feat(sql): add pre-execution cost firewall for warehouse queries #907
base: main
Changes from all commits
3c7dec5
4ced882
321f0b7
fcfaf89
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,7 +2,7 @@ | |||||||||
| * BigQuery driver using the `@google-cloud/bigquery` package. | ||||||||||
| */ | ||||||||||
|
|
||||||||||
| -import type { ConnectionConfig, Connector, ConnectorResult, ExecuteOptions, SchemaColumn } from "./types" | ||||||||||
| +import type { ConnectionConfig, Connector, ConnectorResult, CostEstimate, ExecuteOptions, SchemaColumn } from "./types" | ||||||||||
|
|
||||||||||
| export async function connect(config: ConnectionConfig): Promise<Connector> { | ||||||||||
| let BigQueryModule: any | ||||||||||
|
|
@@ -71,6 +71,31 @@ export async function connect(config: ConnectionConfig): Promise<Connector> { | |||||||||
| } | ||||||||||
| }, | ||||||||||
|
|
||||||||||
| // Estimate scan cost via a BigQuery dry-run. The dry-run validates and | ||||||||||
| // plans the query server-side and returns the bytes it would process | ||||||||||
| // (an upper bound for clustered tables), without running it or incurring | ||||||||||
| // charges. Of the estimators in this PR, it is the most accurate. | ||||||||||
| async estimateCost(sql: string): Promise<CostEstimate> { | ||||||||||
| const query = sql.replace(/;\s*$/, "") | ||||||||||
| const options: Record<string, unknown> = { query, dryRun: true } | ||||||||||
|
Contributor
[LOW · web-researcher] PR uses @google-cloud/bigquery v4.12.0+ compatible dry-run response parsing with null safety, matching the library's updated schema.
💡 Suggestion: Pin the @google-cloud/bigquery dependency version in package.json to v4.12.0 or higher to ensure compatibility.
Confidence: 88/100
||||||||||
| if (config.dataset) { | ||||||||||
| options.defaultDataset = { | ||||||||||
| datasetId: config.dataset, | ||||||||||
| projectId: config.project, | ||||||||||
| } | ||||||||||
| } | ||||||||||
| const [job] = await client.createQueryJob(options) | ||||||||||
| const stats = job.metadata?.statistics ?? {} | ||||||||||
| // BigQuery reports total bytes processed at the statistics root and, | ||||||||||
| // redundantly, under statistics.query — prefer whichever is present. | ||||||||||
| const raw = stats.totalBytesProcessed ?? stats.query?.totalBytesProcessed | ||||||||||
| const bytesScanned = raw != null ? Number(raw) : undefined | ||||||||||
|
Comment on lines +91 to +92
Contributor
[🔵 LOW] According to the code quality guidelines, using [...] Suggested change: [...]
||||||||||
| return { | ||||||||||
| bytesScanned: Number.isFinite(bytesScanned) ? bytesScanned : undefined, | ||||||||||
| note: "BigQuery dry-run (exact bytes processed)", | ||||||||||
| } | ||||||||||
| }, | ||||||||||
|
|
||||||||||
| async listSchemas(): Promise<string[]> { | ||||||||||
| const [datasets] = await client.getDatasets() | ||||||||||
| return datasets.map((ds: any) => ds.id as string) | ||||||||||
|
|
||||||||||
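The byte-extraction step of the BigQuery `estimateCost` above can be exercised on its own. The following is a minimal sketch, not the PR's code: `DryRunStatistics` and `extractBytesProcessed` are hypothetical names, and the shape covers only the two fields the driver reads. It assumes the byte count may arrive as a string, which is why it goes through `Number(...)`, and it additionally rejects empty strings and negative values.

```typescript
// Hypothetical shape: only the fields the driver code reads from job.metadata.statistics.
interface DryRunStatistics {
  totalBytesProcessed?: string | number | null
  query?: { totalBytesProcessed?: string | number | null }
}

// Hypothetical helper mirroring the extraction logic in estimateCost.
function extractBytesProcessed(stats: DryRunStatistics | undefined): number | undefined {
  // Prefer the root-level figure, then fall back to statistics.query.
  const raw = stats?.totalBytesProcessed ?? stats?.query?.totalBytesProcessed
  if (raw == null) return undefined
  // Number("") is 0, which would silently read as "free", so reject blanks explicitly.
  if (typeof raw === "string" && raw.trim() === "") return undefined
  const n = Number(raw)
  return Number.isFinite(n) && n >= 0 ? n : undefined
}

console.log(extractBytesProcessed({ totalBytesProcessed: "1073741824" }))
console.log(extractBytesProcessed({ query: { totalBytesProcessed: "42" } }))
console.log(extractBytesProcessed({ totalBytesProcessed: "not-a-number" }))
```

Keeping the parsing in a pure function like this would make the null-safety behaviour testable without a live BigQuery client.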
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -3,7 +3,7 @@ | |||||
| */ | ||||||
|
|
||||||
| import * as fs from "fs" | ||||||
| -import type { ConnectionConfig, Connector, ConnectorResult, ExecuteOptions, SchemaColumn } from "./types" | ||||||
| +import type { ConnectionConfig, Connector, ConnectorResult, CostEstimate, ExecuteOptions, SchemaColumn } from "./types" | ||||||
|
|
||||||
| export async function connect(config: ConnectionConfig): Promise<Connector> { | ||||||
| let snowflake: any | ||||||
|
|
@@ -258,6 +258,38 @@ export async function connect(config: ConnectionConfig): Promise<Connector> { | |||||
| } | ||||||
| }, | ||||||
|
|
||||||
| // Estimate scan cost via `EXPLAIN USING JSON`, which compiles the query | ||||||
| // and returns the planner's estimated bytes/partitions to scan WITHOUT | ||||||
| // executing it or resuming a warehouse (compilation is metadata-only). | ||||||
| // | ||||||
| // Caveat surfaced in `note`: Snowflake bills by warehouse *credits* | ||||||
| // (compute time), not bytes scanned, so the bytes figure is an accurate | ||||||
| // expense proxy but the derived USD is approximate. The `max_bytes_scanned` | ||||||
| // guard is the meaningful threshold for Snowflake. | ||||||
| async estimateCost(sql: string): Promise<CostEstimate> { | ||||||
| const query = sql.replace(/;\s*$/, "") | ||||||
| const explain = await executeQuery(`EXPLAIN USING JSON ${query}`) | ||||||
| const raw = explain.rows?.[0]?.[0] | ||||||
| if (raw == null) { | ||||||
|
Contributor
[🔵 LOW] According to the review checklist, using [...] Suggested change: [...]
||||||
| return { note: "Snowflake EXPLAIN returned no plan; bytes estimate unavailable" } | ||||||
| } | ||||||
| // EXPLAIN USING JSON yields one VARIANT cell — a JSON string via the | ||||||
| // Node SDK, or an already-parsed object depending on the driver version. | ||||||
| let plan: any | ||||||
| try { | ||||||
| plan = typeof raw === "string" ? JSON.parse(raw) : raw | ||||||
| } catch { | ||||||
| return { note: "Snowflake EXPLAIN plan was not parseable JSON" } | ||||||
| } | ||||||
| const globalStats = plan?.GlobalStats ?? {} | ||||||
| const assigned = globalStats.bytesAssigned | ||||||
| const bytesScanned = assigned != null ? Number(assigned) : undefined | ||||||
|
Contributor
[🔵 LOW] According to the review checklist, using [...] Suggested change: [...]
||||||
| return { | ||||||
| bytesScanned: Number.isFinite(bytesScanned) ? bytesScanned : undefined, | ||||||
| note: "Snowflake EXPLAIN estimate — bytes scanned (Snowflake bills by warehouse credits, so USD is a rough proxy)", | ||||||
| } | ||||||
| }, | ||||||
|
|
||||||
| async listSchemas(): Promise<string[]> { | ||||||
| const result = await executeQuery("SHOW SCHEMAS") | ||||||
| // SHOW SCHEMAS returns rows with a "name" column | ||||||
|
|
||||||
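The plan-parsing step of the Snowflake `estimateCost` above can be sketched as a standalone function. This is an illustration under stated assumptions: `parseBytesAssigned` is a hypothetical name, and the `GlobalStats.bytesAssigned` path is taken from the diff rather than verified against Snowflake's documentation. Unlike the diff, it narrows `unknown` instead of using `any`.

```typescript
// Hypothetical helper: accepts the single EXPLAIN USING JSON cell, which the diff
// says may be a JSON string or an already-parsed object.
function parseBytesAssigned(raw: unknown): number | undefined {
  if (raw == null) return undefined
  let plan: unknown
  try {
    plan = typeof raw === "string" ? JSON.parse(raw) : raw
  } catch {
    return undefined // cell was a string but not valid JSON
  }
  if (typeof plan !== "object" || plan === null) return undefined
  const globalStats = (plan as { GlobalStats?: unknown }).GlobalStats
  if (typeof globalStats !== "object" || globalStats === null) return undefined
  const assigned = (globalStats as { bytesAssigned?: unknown }).bytesAssigned
  if (typeof assigned === "string" && assigned.trim() === "") return undefined
  if (typeof assigned !== "number" && typeof assigned !== "string") return undefined
  const n = Number(assigned)
  return Number.isFinite(n) && n >= 0 ? n : undefined
}

console.log(parseBytesAssigned('{"GlobalStats":{"bytesAssigned":2048}}'))
console.log(parseBytesAssigned({ GlobalStats: { bytesAssigned: "4096" } }))
console.log(parseBytesAssigned("not json"))
```

Returning `undefined` for every failure mode keeps the caller's "bytes estimate unavailable" path uniform; the diff instead distinguishes the cases through different `note` strings.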
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -14,6 +14,8 @@ import { runDataDiff } from "./data-diff" | |||||||||||||||||||||||||||||||||||
| import type { | ||||||||||||||||||||||||||||||||||||
| SqlExecuteParams, | ||||||||||||||||||||||||||||||||||||
| SqlExecuteResult, | ||||||||||||||||||||||||||||||||||||
| SqlEstimateCostParams, | ||||||||||||||||||||||||||||||||||||
| SqlEstimateCostResult, | ||||||||||||||||||||||||||||||||||||
| SqlExplainParams, | ||||||||||||||||||||||||||||||||||||
| SqlExplainResult, | ||||||||||||||||||||||||||||||||||||
| SqlAutocompleteParams, | ||||||||||||||||||||||||||||||||||||
|
|
@@ -438,6 +440,54 @@ register("sql.execute", async (params: SqlExecuteParams): Promise<SqlExecuteResu | |||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| // --- sql.estimate_cost (cost firewall) --- | ||||||||||||||||||||||||||||||||||||
| // Bytes in one TiB (2^40). Cost = bytes / TIB_BYTES * cost_per_tib_usd. | ||||||||||||||||||||||||||||||||||||
| const TIB_BYTES = 1_099_511_627_776 | ||||||||||||||||||||||||||||||||||||
| // On-demand scan pricing defaults to BigQuery's published $6.25/TiB. Callers | ||||||||||||||||||||||||||||||||||||
| // can override per warehouse via cost_per_tib_usd. | ||||||||||||||||||||||||||||||||||||
| const DEFAULT_COST_PER_TIB_USD = 6.25 | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| register("sql.estimate_cost", async (params: SqlEstimateCostParams): Promise<SqlEstimateCostResult> => { | ||||||||||||||||||||||||||||||||||||
| const warehouseType = getWarehouseType(params.warehouse) | ||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||
| // Resolve the connector the same way sql.execute does (named warehouse, else first). | ||||||||||||||||||||||||||||||||||||
| let connector | ||||||||||||||||||||||||||||||||||||
| if (params.warehouse) { | ||||||||||||||||||||||||||||||||||||
| connector = await Registry.get(params.warehouse) | ||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||
| const warehouses = Registry.list().warehouses | ||||||||||||||||||||||||||||||||||||
| if (warehouses.length === 0) { | ||||||||||||||||||||||||||||||||||||
| return { supported: false, warehouse_type: warehouseType, error: "No warehouse configured." } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| connector = await Registry.get(warehouses[0].name) | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| if (typeof connector.estimateCost !== "function") { | ||||||||||||||||||||||||||||||||||||
| return { | ||||||||||||||||||||||||||||||||||||
| supported: false, | ||||||||||||||||||||||||||||||||||||
| warehouse_type: warehouseType, | ||||||||||||||||||||||||||||||||||||
| note: `Cost estimation is not supported for warehouse type ${JSON.stringify(warehouseType)}.`, | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| const estimate = await connector.estimateCost(params.sql) | ||||||||||||||||||||||||||||||||||||
| const costPerTib = params.cost_per_tib_usd ?? DEFAULT_COST_PER_TIB_USD | ||||||||||||||||||||||||||||||||||||
|
P1: Missing validation of [...]
||||||||||||||||||||||||||||||||||||
| const estimatedCost = | ||||||||||||||||||||||||||||||||||||
| estimate.bytesScanned != null ? (estimate.bytesScanned / TIB_BYTES) * costPerTib : undefined | ||||||||||||||||||||||||||||||||||||
|
Comment on lines +473 to +476
Contributor
[🟠 MEDIUM] The generic cost estimation logic uses a BigQuery-specific default value ([...]) Suggested change: [...]
Comment on lines +474 to +476
Contributor
[🔵 LOW] Using [...] Suggested change: [...]
||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
|
Comment on lines +474 to +477
Validate estimator numeric inputs before computing cost
At Line 474-477, invalid numeric values ([...])
Suggested fix:
- const costPerTib = params.cost_per_tib_usd ?? DEFAULT_COST_PER_TIB_USD
- const estimatedCost =
- estimate.bytesScanned != null ? (estimate.bytesScanned / TIB_BYTES) * costPerTib : undefined
+ const rawCostPerTib = params.cost_per_tib_usd ?? DEFAULT_COST_PER_TIB_USD
+ const costPerTib =
+ Number.isFinite(rawCostPerTib) && rawCostPerTib > 0 ? rawCostPerTib : DEFAULT_COST_PER_TIB_USD
+
+ const bytesScanned =
+ typeof estimate.bytesScanned === "number" &&
+ Number.isFinite(estimate.bytesScanned) &&
+ estimate.bytesScanned >= 0
+ ? estimate.bytesScanned
+ : undefined
+
+ const estimatedCost =
+ bytesScanned != null ? (bytesScanned / TIB_BYTES) * costPerTib : undefined
@@
- bytes_scanned: estimate.bytesScanned,
+ bytes_scanned: bytesScanned,
||||||||||||||||||||||||||||||||||||
| return { | ||||||||||||||||||||||||||||||||||||
| supported: true, | ||||||||||||||||||||||||||||||||||||
| warehouse_type: warehouseType, | ||||||||||||||||||||||||||||||||||||
| bytes_scanned: estimate.bytesScanned, | ||||||||||||||||||||||||||||||||||||
| estimated_cost_usd: estimatedCost, | ||||||||||||||||||||||||||||||||||||
| cost_per_tib_usd: costPerTib, | ||||||||||||||||||||||||||||||||||||
| note: estimate.note, | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } catch (e) { | ||||||||||||||||||||||||||||||||||||
| return { supported: false, warehouse_type: warehouseType, error: String(e) } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| // --- sql.explain --- | ||||||||||||||||||||||||||||||||||||
| register("sql.explain", async (params: SqlExplainParams): Promise<SqlExplainResult> => { | ||||||||||||||||||||||||||||||||||||
| let warehouseName: string | undefined | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
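The cost formula in the `sql.estimate_cost` handler, `bytes / TIB_BYTES * cost_per_tib_usd`, can be worked through in isolation. The sketch below combines it with the input validation proposed in the review comment above; `estimateCostUsd` is a hypothetical name, and falling back to the default rate for a non-positive or non-finite override is one possible policy, not something the PR has settled.

```typescript
const TIB_BYTES = 1_099_511_627_776 // 2^40 bytes in one TiB
const DEFAULT_COST_PER_TIB_USD = 6.25 // default taken from the diff

// Hypothetical helper: returns undefined when the byte count is unusable.
function estimateCostUsd(bytesScanned: number | undefined, costPerTibUsd?: number): number | undefined {
  // Ignore NaN, Infinity, zero, and negative overrides rather than pricing with them.
  const rate =
    typeof costPerTibUsd === "number" && Number.isFinite(costPerTibUsd) && costPerTibUsd > 0
      ? costPerTibUsd
      : DEFAULT_COST_PER_TIB_USD
  if (typeof bytesScanned !== "number" || !Number.isFinite(bytesScanned) || bytesScanned < 0) {
    return undefined
  }
  return (bytesScanned / TIB_BYTES) * rate
}

console.log(estimateCostUsd(TIB_BYTES)) // one full TiB at the default rate
console.log(estimateCostUsd(TIB_BYTES / 2, 10)) // half a TiB at a custom rate
console.log(estimateCostUsd(Number.NaN)) // unusable byte count
```

For example, a query scanning exactly one TiB at the default rate comes to 1 × 6.25 USD, and half a TiB at 10 USD/TiB comes to 0.5 × 10 USD.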
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,70 @@ | ||||||||||||||
| import z from "zod" | ||||||||||||||
| import { Tool } from "../../tool/tool" | ||||||||||||||
| import { Dispatcher } from "../native" | ||||||||||||||
| import { Config } from "@/config/config" | ||||||||||||||
|
|
||||||||||||||
| /** Format a byte count as a human-readable string (e.g. "4.20 GB"); units are 1024-based. */ | ||||||||||||||
| export function formatBytes(bytes: number): string { | ||||||||||||||
| if (!Number.isFinite(bytes) || bytes < 0) return "unknown" | ||||||||||||||
| if (bytes === 0) return "0 B" | ||||||||||||||
| const units = ["B", "KB", "MB", "GB", "TB", "PB"] | ||||||||||||||
| const i = Math.min(Math.floor(Math.log(bytes) / Math.log(1024)), units.length - 1) | ||||||||||||||
| const value = bytes / Math.pow(1024, i) | ||||||||||||||
| return `${value.toFixed(i === 0 ? 0 : 2)} ${units[i]}` | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /** Format a USD cost, using more precision for small values. */ | ||||||||||||||
| export function formatCost(usd: number): string { | ||||||||||||||
| if (!Number.isFinite(usd)) return "unknown" | ||||||||||||||
| if (usd < 0.01) return `$${usd.toFixed(4)}` | ||||||||||||||
| return `$${usd.toFixed(2)}` | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| export const SqlCostEstimateTool = Tool.define("sql_cost_estimate", { | ||||||||||||||
| description: | ||||||||||||||
| "Estimate how much data a SQL query will scan and what it will cost, WITHOUT running it. Uses a BigQuery dry-run (bytes processed) or a Snowflake EXPLAIN plan (estimated bytes) where supported. Use this before running large analytical queries to avoid surprise warehouse bills. Returns 'estimation unsupported' for warehouses that cannot estimate cheaply.", | ||||||||||||||
| parameters: z.object({ | ||||||||||||||
| query: z.string().describe("SQL query to estimate. Inline all values — bind placeholders are not supported."), | ||||||||||||||
| warehouse: z | ||||||||||||||
| .string() | ||||||||||||||
| .optional() | ||||||||||||||
| .describe("Warehouse connection name. Omit to use the first configured warehouse."), | ||||||||||||||
| }), | ||||||||||||||
| async execute(args, _ctx) { | ||||||||||||||
| const cfg = await Config.get().catch(() => ({}) as Awaited<ReturnType<typeof Config.get>>) | ||||||||||||||
| const costPerTib = cfg.governance?.cost_per_tib_usd | ||||||||||||||
|
Comment on lines +34 to +35
Silent fallback masks configuration failures and can misprice estimates.
Catching all [...]
Suggested adjustment:
- const cfg = await Config.get().catch(() => ({}) as Awaited<ReturnType<typeof Config.get>>)
- const costPerTib = cfg.governance?.cost_per_tib_usd
+ let configLoadError: string | undefined
+ const cfg = await Config.get().catch((err) => {
+ configLoadError = String(err)
+ return {} as Awaited<ReturnType<typeof Config.get>>
+ })
+ const costPerTib = cfg.governance?.cost_per_tib_usd
Then include [...]
||||||||||||||
|
|
||||||||||||||
| const result = await Dispatcher.call("sql.estimate_cost", { | ||||||||||||||
| sql: args.query, | ||||||||||||||
| warehouse: args.warehouse, | ||||||||||||||
| cost_per_tib_usd: costPerTib, | ||||||||||||||
| }) | ||||||||||||||
|
|
||||||||||||||
| if (!result.supported) { | ||||||||||||||
| const reason = result.error ?? result.note ?? "Cost estimation is not supported for this warehouse." | ||||||||||||||
| return { | ||||||||||||||
|
Contributor
[MEDIUM · web-researcher] PR uses integer bytes and fixed-rate conversion to USD to avoid floating-point precision errors in billing calculations, aligning with financial system best practices and avoiding CVE-2026-12345.
💡 Suggestion: Ensure all cost calculations use a decimal library (e.g., decimal.js) for monetary precision, even when inputs are integers.
Confidence: 95/100
||||||||||||||
| title: "Cost estimate: unsupported", | ||||||||||||||
| metadata: { supported: false, warehouse_type: result.warehouse_type, error: result.error }, | ||||||||||||||
| output: `Cost estimation unavailable for ${result.warehouse_type}: ${reason}`, | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| const lines: string[] = [] | ||||||||||||||
| if (result.bytes_scanned != null) lines.push(`Bytes scanned (est.): ${formatBytes(result.bytes_scanned)}`) | ||||||||||||||
| if (result.estimated_cost_usd != null) { | ||||||||||||||
| lines.push(`Estimated cost: ${formatCost(result.estimated_cost_usd)} (at ${formatCost(result.cost_per_tib_usd ?? 0)}/TiB)`) | ||||||||||||||
|
P2: Fallbacking missing [...]
||||||||||||||
| } | ||||||||||||||
|
Comment on lines +54 to +56
Contributor
[🟠 MEDIUM] If [...] Since [...] Suggested change: [...]
||||||||||||||
| if (result.note) lines.push(`Method: ${result.note}`) | ||||||||||||||
|
|
||||||||||||||
| return { | ||||||||||||||
| title: `Cost estimate: ${result.estimated_cost_usd != null ? formatCost(result.estimated_cost_usd) : "n/a"}`, | ||||||||||||||
| metadata: { | ||||||||||||||
| supported: true, | ||||||||||||||
| warehouse_type: result.warehouse_type, | ||||||||||||||
| bytes_scanned: result.bytes_scanned, | ||||||||||||||
| estimated_cost_usd: result.estimated_cost_usd, | ||||||||||||||
| }, | ||||||||||||||
| output: lines.join("\n") || "No estimate available.", | ||||||||||||||
| } | ||||||||||||||
| }, | ||||||||||||||
| }) | ||||||||||||||
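To make the formatting behaviour of this file concrete, the two helpers are reproduced below (without `export`, so the snippet runs as a plain script) alongside a few sample calls. Note that `formatBytes` divides by powers of 1024 while labelling the result with decimal-style unit names, so "1.50 KB" here means 1536 bytes.

```typescript
// Copied from the diff above; only the `export` keyword is dropped.
function formatBytes(bytes: number): string {
  if (!Number.isFinite(bytes) || bytes < 0) return "unknown"
  if (bytes === 0) return "0 B"
  const units = ["B", "KB", "MB", "GB", "TB", "PB"]
  const i = Math.min(Math.floor(Math.log(bytes) / Math.log(1024)), units.length - 1)
  const value = bytes / Math.pow(1024, i)
  return `${value.toFixed(i === 0 ? 0 : 2)} ${units[i]}`
}

function formatCost(usd: number): string {
  if (!Number.isFinite(usd)) return "unknown"
  if (usd < 0.01) return `$${usd.toFixed(4)}`
  return `$${usd.toFixed(2)}`
}

console.log(formatBytes(512)) // "512 B"
console.log(formatBytes(1536)) // "1.50 KB"
console.log(formatBytes(-1)) // "unknown"
console.log(formatCost(0.00123)) // "$0.0012"
console.log(formatCost(12.5)) // "$12.50"
console.log(formatCost(Number.NaN)) // "unknown"
```

Sub-cent costs get four decimal places so that small dry-run estimates do not all collapse to "$0.00".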
[LOW · web-researcher] BigQuery dry-run implementation is aligned with official Google Cloud best practices for zero-cost query estimation.
💡 Suggestion: Add a comment in the code linking to the official Google Cloud dry-run documentation for maintainability.
Confidence: 95/100
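The PR title describes a pre-execution cost firewall, and the Snowflake comment mentions a `max_bytes_scanned` guard, but the enforcement step itself is not part of the diff shown here. The following is a rough sketch of how such a guard might consume the `sql.estimate_cost` result. Everything in it is an assumption: `checkCostFirewall`, `FirewallLimits`, and `max_cost_usd` are hypothetical names, and allowing queries whose estimate is unavailable is just one possible policy.

```typescript
// Field names bytes_scanned / estimated_cost_usd follow the handler's result;
// the limit and decision shapes are hypothetical.
interface EstimateLike {
  bytes_scanned?: number
  estimated_cost_usd?: number
}
interface FirewallLimits {
  max_bytes_scanned?: number
  max_cost_usd?: number // hypothetical; not named anywhere in the diff
}
interface FirewallDecision {
  allowed: boolean
  reason: string
}

function checkCostFirewall(estimate: EstimateLike, limits: FirewallLimits): FirewallDecision {
  const bytes = estimate.bytes_scanned
  const cost = estimate.estimated_cost_usd
  if (limits.max_bytes_scanned != null && bytes != null && bytes > limits.max_bytes_scanned) {
    return { allowed: false, reason: `estimated scan of ${bytes} bytes exceeds max_bytes_scanned` }
  }
  if (limits.max_cost_usd != null && cost != null && cost > limits.max_cost_usd) {
    return { allowed: false, reason: `estimated cost of ${cost} USD exceeds max_cost_usd` }
  }
  if (bytes == null && cost == null) {
    // Assumed policy: do not block when the warehouse cannot produce an estimate.
    return { allowed: true, reason: "no estimate available" }
  }
  return { allowed: true, reason: "within configured limits" }
}

console.log(checkCostFirewall({ bytes_scanned: 5_000 }, { max_bytes_scanned: 1_000 }))
console.log(checkCostFirewall({ bytes_scanned: 500, estimated_cost_usd: 0.01 }, { max_bytes_scanned: 1_000 }))
console.log(checkCostFirewall({}, { max_bytes_scanned: 1_000 }))
```

Checking bytes before dollars reflects the Snowflake caveat in the diff: the byte figure is the planner's own estimate, whereas the USD value is derived from a per-TiB rate that may not match how the warehouse actually bills.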