From 31060c86f009977c05070b32b819c09ad0d7ffe1 Mon Sep 17 00:00:00 2001 From: Aaron Brewbaker Date: Wed, 24 Jun 2026 12:00:20 -0400 Subject: [PATCH] feat(dataset): port dry-run and impact from Go CLI Adds `dataset dry-run` and `dataset impact` subcommands to the existing dataset route, ported from the deprecated Go Observe CLI. - dry-run: previews saving a dataset pipeline via `saveDataset` with `dependencyHandling.saveMode = PreflightDatasetAndDependencies` (nothing persisted); reports the dataset, rematerialized datasets, and compile errors, exiting 1 if any error datasets are returned. - impact: renders a table of downstream datasets affected by a save and sends compile/validation errors to stderr. The Go source queried a `saveDatasetDryRun` mutation and `affectedDatasets { dataset { id name } dependencyType }` field that do not exist in the published GraphQL schema; the operations here were rewritten against the real SDL. Co-Authored-By: Claude Opus 4.8 (1M context) --- AGENTS.md | 2 +- README.md | 2 + src/commands/dataset/dry-run.test.ts | 214 ++++++++++++++++ src/commands/dataset/dry-run.ts | 87 +++++++ src/commands/dataset/impact.test.ts | 234 ++++++++++++++++++ src/commands/dataset/impact.ts | 145 +++++++++++ src/commands/dataset/index.ts | 10 +- src/commands/dataset/input.test.ts | 64 +++++ src/commands/dataset/input.ts | 100 ++++++++ .../datasets-affected-by-update.graphql | 24 ++ .../dataset/datasets-affected-by-update.ts | 28 +++ src/gql/dataset/save-dataset-dry-run.graphql | 28 +++ src/gql/dataset/save-dataset-dry-run.ts | 29 +++ 13 files changed, 964 insertions(+), 3 deletions(-) create mode 100644 src/commands/dataset/dry-run.test.ts create mode 100644 src/commands/dataset/dry-run.ts create mode 100644 src/commands/dataset/impact.test.ts create mode 100644 src/commands/dataset/impact.ts create mode 100644 src/commands/dataset/input.test.ts create mode 100644 src/commands/dataset/input.ts create mode 100644 src/gql/dataset/datasets-affected-by-update.graphql create mode 100644 src/gql/dataset/datasets-affected-by-update.ts create mode 100644 src/gql/dataset/save-dataset-dry-run.graphql create mode 100644 src/gql/dataset/save-dataset-dry-run.ts diff --git a/AGENTS.md b/AGENTS.md index 0198696..6cbd7de 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -40,7 +40,7 @@ src/ │ │ ├── host/ # Host Explorer (install, view) │ │ ├── kubernetes/ # Kubernetes Explorer (install, view) │ │ └── tracing/ # Trace Explorer (install, view) -│ ├── dataset/ # Dataset commands (list, view) +│ ├── dataset/ # Dataset commands (list, view, dry-run, impact) │ ├── datastream/ # Datastream commands (create, list, view, update) │ ├── ingest-token/ # Ingest token commands (create, list, view, update) │ ├── metric/ # Metric commands (list, view) diff --git a/README.md b/README.md index a7f6baa..fa64e30 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,8 @@ To update installed skills after edits in this repo, run `npx skills update`. | `observe tag-key list` | Search tag keys in the knowledge graph | | `observe dataset list` | List datasets with optional filtering | | `observe dataset view` | View dataset details and schema | +| `observe dataset dry-run` | Dry-run saving a dataset pipeline (nothing persisted) | +| `observe dataset impact` | Show downstream datasets affected by a pipeline save | | `observe metric list` | Search and list metrics | | `observe metric view` | View metric details and dimensions | | `observe query` | Execute OPAL queries on datasets | diff --git a/src/commands/dataset/dry-run.test.ts b/src/commands/dataset/dry-run.test.ts new file mode 100644 index 0000000..4ff8c5c --- /dev/null +++ b/src/commands/dataset/dry-run.test.ts @@ -0,0 +1,214 @@ +import { + afterAll, + beforeAll, + beforeEach, + describe, + expect, + mock, + test, +} from "bun:test"; +import { resolve } from "node:path"; +import type { LocalContext } from "../../context"; +import { createWriter } from "../../lib/writer"; + +const repoRoot = resolve(import.meta.dir, "../../.."); +const gqlModulePath = resolve( + repoRoot, + "src/gql/dataset/save-dataset-dry-run.ts", +); + +const loadConfigFn = mock(() => ({ + customerId: "test-customer", + token: "test-token", + domain: "observeinc.com", +})); + +const readDatasetInputFn = mock((_file: string) => ({ + workspaceId: "42379913", + dataset: { label: "MyDataset" }, + query: { + outputStage: "main", + stages: [{ id: "main", pipeline: "filter true", input: [] }], + }, +})); + +const saveDatasetDryRunFn = mock((_config: unknown, _variables: unknown) => + Promise.resolve({ + dataset: { id: "99001", name: "MyDataset" }, + dematerializedDatasets: [{ dataset: { id: "88001", name: "DownstreamA" } }], + errorDatasets: [], + }), +); + +let dryRun: (typeof import("./dry-run"))["dryRun"]; + +let previousNoColor: string | undefined; +let previousForceColor: string | undefined; + +const deps = { + loadConfig: loadConfigFn, + readDatasetInput: readDatasetInputFn, + saveDatasetDryRun: saveDatasetDryRunFn, +} as Parameters<(typeof import("./dry-run"))["dryRun"]>[3]; + +beforeAll(async () => { + previousNoColor = process.env.NO_COLOR; + previousForceColor = process.env.FORCE_COLOR; + process.env.NO_COLOR = "1"; + process.env.FORCE_COLOR = "0"; + + void mock.module(gqlModulePath, () => ({ + saveDatasetDryRun: saveDatasetDryRunFn, + })); + + const mod = await import("./dry-run.ts"); + dryRun = mod.dryRun; +}); + +afterAll(() => { + mock.restore(); + if (previousNoColor === undefined) { + delete process.env.NO_COLOR; + } else { + process.env.NO_COLOR = previousNoColor; + } + if (previousForceColor === undefined) { + delete process.env.FORCE_COLOR; + } else { + process.env.FORCE_COLOR = previousForceColor; + } +}); + +function createMockContext() { + const stdout: string[] = []; + const stderr: string[] = []; + let exitCode: number | undefined; + + const processMock = { + stdout: { + write: (msg: string) => { + stdout.push(msg); + return true; + }, + }, + stderr: { + write: (msg: string) => { + stderr.push(msg); + return true; + }, + }, + exit: (code?: number) => { + exitCode = code ?? 0; + throw new Error("process.exit"); + }, + }; + + const context = { + process: processMock, + writer: createWriter({ process: processMock }), + } as unknown as LocalContext; + + return { context, stdout, stderr, getExitCode: () => exitCode }; +} + +describe("dataset dry-run", () => { + beforeEach(() => { + loadConfigFn.mockClear(); + readDatasetInputFn.mockClear(); + saveDatasetDryRunFn.mockClear(); + saveDatasetDryRunFn.mockImplementation(() => + Promise.resolve({ + dataset: { id: "99001", name: "MyDataset" }, + dematerializedDatasets: [ + { dataset: { id: "88001", name: "DownstreamA" } }, + ], + errorDatasets: [], + }), + ); + }); + + test("prints dataset and rematerialization lines on success", async () => { + const { context, stdout, stderr } = createMockContext(); + await dryRun.call(context, {}, "input.json", deps); + + const out = stdout.join(""); + expect(out).toContain("Dataset: MyDataset (99001)"); + expect(out).toContain("Would rematerialize: DownstreamA (88001)"); + expect(stderr.join("")).toBe(""); + }); + + test("passes mapped variables to saveDatasetDryRun", async () => { + const { context } = createMockContext(); + await dryRun.call(context, {}, "input.json", deps); + + const [, variables] = saveDatasetDryRunFn.mock.calls[0]!; + expect(variables).toEqual({ + workspaceId: "42379913", + dataset: { label: "MyDataset" }, + query: { + outputStage: "main", + stages: [{ id: "main", pipeline: "filter true", input: [] }], + }, + }); + }); + + test("does not print rematerialization line when none returned", async () => { + saveDatasetDryRunFn.mockImplementationOnce(() => + Promise.resolve({ + dataset: { id: "99002", name: "CleanDataset" }, + dematerializedDatasets: [], + errorDatasets: [], + }), + ); + const { context, stdout } = createMockContext(); + await dryRun.call(context, {}, "input.json", deps); + + const out = stdout.join(""); + expect(out).toContain("Dataset: CleanDataset (99002)"); + expect(out).not.toContain("Would rematerialize"); + }); + + test("prints error datasets and exits 1 when errors present", async () => { + saveDatasetDryRunFn.mockImplementationOnce(() => + Promise.resolve({ + dataset: null, + dematerializedDatasets: [], + errorDatasets: [ + { + datasetId: "77001", + datasetName: "BadDataset", + text: "syntax error", + }, + { datasetId: "77002", datasetName: "Err2", text: "second error" }, + ], + }), + ); + const { context, stdout, getExitCode } = createMockContext(); + try { + await dryRun.call(context, {}, "input.json", deps); + throw new Error("expected process.exit"); + } catch (error) { + expect((error as Error).message).toBe("process.exit"); + } + + const out = stdout.join(""); + expect(out).toContain("Error in BadDataset: syntax error"); + expect(out).toContain("Error in Err2: second error"); + expect(getExitCode()).toBe(1); + }); + + test("exits 1 when input cannot be read", async () => { + readDatasetInputFn.mockImplementationOnce(() => { + throw new Error('could not read file "missing.json"'); + }); + const { context, stderr, getExitCode } = createMockContext(); + try { + await dryRun.call(context, {}, "missing.json", deps); + throw new Error("expected process.exit"); + } catch (error) { + expect((error as Error).message).toBe("process.exit"); + } + expect(stderr.join("")).toContain("could not read file"); + expect(getExitCode()).toBe(1); + }); +}); diff --git a/src/commands/dataset/dry-run.ts b/src/commands/dataset/dry-run.ts new file mode 100644 index 0000000..fb1d534 --- /dev/null +++ b/src/commands/dataset/dry-run.ts @@ -0,0 +1,87 @@ +import { buildCommand } from "@stricli/core"; +import type { LocalContext } from "../../context"; +import { saveDatasetDryRun } from "../../gql/dataset/save-dataset-dry-run"; +import { GqlApiError } from "../../gql/gql-request"; +import { loadConfig } from "../../lib/config"; +import { readDatasetInput } from "./input"; + +export interface DryRunDeps { + loadConfig?: typeof loadConfig; + readDatasetInput?: typeof readDatasetInput; + saveDatasetDryRun?: typeof saveDatasetDryRun; +} + +/** + * Dry-run saving a dataset pipeline. Nothing is persisted. Reports the dataset + * that would be saved, datasets that would be dematerialized/rematerialized, + * and any compile/validation errors. Exits 1 if any error datasets are + * returned. + */ +export async function dryRun( + this: LocalContext, + _flags: Record, + file: string, + deps: DryRunDeps = {}, +): Promise { + const { + loadConfig: loadConfigImpl = loadConfig, + readDatasetInput: readDatasetInputImpl = readDatasetInput, + saveDatasetDryRun: saveDatasetDryRunImpl = saveDatasetDryRun, + } = deps; + const { process, writer } = this; + + try { + const variables = readDatasetInputImpl(file); + const config = loadConfigImpl(); + const result = await saveDatasetDryRunImpl(config, variables); + + if (result?.dataset) { + writer.write(`Dataset: ${result.dataset.name} (${result.dataset.id})`); + } + + for (const demat of result?.dematerializedDatasets ?? []) { + if (demat.dataset) { + writer.write( + `Would rematerialize: ${demat.dataset.name} (${demat.dataset.id})`, + ); + } + } + + let hasErrors = false; + for (const errDs of result?.errorDatasets ?? []) { + writer.write(`Error in ${errDs.datasetName}: ${errDs.text}`); + hasErrors = true; + } + + if (hasErrors) { + process.exit(1); + } + } catch (error) { + if (error instanceof GqlApiError) { + writer.error(`API Error (${error.statusCode}): ${error.message}`); + } else { + const message = error instanceof Error ? error.message : String(error); + writer.error(`Error: ${message}`); + } + process.exit(1); + } +} + +export const dryRunCommand = buildCommand({ + loader: async () => dryRun, + parameters: { + positional: { + kind: "tuple", + parameters: [ + { + brief: "Path to dataset pipeline JSON file", + parse: String, + }, + ], + }, + flags: {}, + }, + docs: { + brief: "Dry-run saving a dataset pipeline (nothing is persisted)", + }, +}); diff --git a/src/commands/dataset/impact.test.ts b/src/commands/dataset/impact.test.ts new file mode 100644 index 0000000..f79fc58 --- /dev/null +++ b/src/commands/dataset/impact.test.ts @@ -0,0 +1,234 @@ +import { + afterAll, + beforeAll, + beforeEach, + describe, + expect, + mock, + test, +} from "bun:test"; +import { resolve } from "node:path"; +import type { LocalContext } from "../../context"; +import { createWriter } from "../../lib/writer"; + +const repoRoot = resolve(import.meta.dir, "../../.."); +const affectedModulePath = resolve( + repoRoot, + "src/gql/dataset/datasets-affected-by-update.ts", +); +const dryRunModulePath = resolve( + repoRoot, + "src/gql/dataset/save-dataset-dry-run.ts", +); + +const loadConfigFn = mock(() => ({ + customerId: "test-customer", + token: "test-token", + domain: "observeinc.com", +})); + +const readDatasetInputFn = mock((_file: string) => ({ + workspaceId: "42379913", + dataset: { label: "MyDataset" }, + query: { + outputStage: "main", + stages: [{ id: "main", pipeline: "filter true", input: [] }], + }, +})); + +const getAffectedFn = mock((_config: unknown, _variables: unknown) => + Promise.resolve({ + dematerializedDatasets: [ + { dataset: { id: "55001", name: "Alpha" } }, + { dataset: { id: "55002", name: "Beta" } }, + ], + editForwardDematerializedDatasets: [], + }), +); + +const dryRunFn = mock((_config: unknown, _variables: unknown) => + Promise.resolve({ + dataset: { id: "99001", name: "MyDataset" }, + dematerializedDatasets: [], + errorDatasets: [], + }), +); + +let impact: (typeof import("./impact"))["impact"]; + +let previousNoColor: string | undefined; +let previousForceColor: string | undefined; + +const deps = { + loadConfig: loadConfigFn, + readDatasetInput: readDatasetInputFn, + getDatasetsAffectedByUpdate: getAffectedFn, + saveDatasetDryRun: dryRunFn, +} as Parameters<(typeof import("./impact"))["impact"]>[3]; + +beforeAll(async () => { + previousNoColor = process.env.NO_COLOR; + previousForceColor = process.env.FORCE_COLOR; + process.env.NO_COLOR = "1"; + process.env.FORCE_COLOR = "0"; + + void mock.module(affectedModulePath, () => ({ + getDatasetsAffectedByUpdate: getAffectedFn, + })); + void mock.module(dryRunModulePath, () => ({ + saveDatasetDryRun: dryRunFn, + })); + + const mod = await import("./impact.ts"); + impact = mod.impact; +}); + +afterAll(() => { + mock.restore(); + if (previousNoColor === undefined) { + delete process.env.NO_COLOR; + } else { + process.env.NO_COLOR = previousNoColor; + } + if (previousForceColor === undefined) { + delete process.env.FORCE_COLOR; + } else { + process.env.FORCE_COLOR = previousForceColor; + } +}); + +function createMockContext() { + const stdout: string[] = []; + const stderr: string[] = []; + let exitCode: number | undefined; + + const processMock = { + stdout: { + write: (msg: string) => { + stdout.push(msg); + return true; + }, + }, + stderr: { + write: (msg: string) => { + stderr.push(msg); + return true; + }, + }, + exit: (code?: number) => { + exitCode = code ?? 0; + throw new Error("process.exit"); + }, + }; + + const context = { + process: processMock, + writer: createWriter({ process: processMock }), + } as unknown as LocalContext; + + return { context, stdout, stderr, getExitCode: () => exitCode }; +} + +describe("dataset impact", () => { + beforeEach(() => { + loadConfigFn.mockClear(); + readDatasetInputFn.mockClear(); + getAffectedFn.mockClear(); + dryRunFn.mockClear(); + getAffectedFn.mockImplementation(() => + Promise.resolve({ + dematerializedDatasets: [ + { dataset: { id: "55001", name: "Alpha" } }, + { dataset: { id: "55002", name: "Beta" } }, + ], + editForwardDematerializedDatasets: [], + }), + ); + dryRunFn.mockImplementation(() => + Promise.resolve({ + dataset: { id: "99001", name: "MyDataset" }, + dematerializedDatasets: [], + errorDatasets: [], + }), + ); + }); + + test("renders a table of affected datasets", async () => { + const { context, stdout } = createMockContext(); + await impact.call(context, {}, "input.json", deps); + + const out = stdout.join(""); + expect(out).toContain("NAME"); + expect(out).toContain("ID"); + expect(out).toContain("IMPACT"); + expect(out).toContain("Alpha"); + expect(out).toContain("55001"); + expect(out).toContain("Beta"); + expect(out).toContain("dematerialized"); + }); + + test("emits JSON rows with --json", async () => { + const { context, stdout } = createMockContext(); + await impact.call(context, { json: true }, "input.json", deps); + + const rows = JSON.parse(stdout.join("")); + expect(rows).toEqual([ + { name: "Alpha", id: "55001", impact: "dematerialized" }, + { name: "Beta", id: "55002", impact: "dematerialized" }, + ]); + }); + + test("writes error datasets to stderr", async () => { + dryRunFn.mockImplementationOnce(() => + Promise.resolve({ + dataset: null, + dematerializedDatasets: [], + errorDatasets: [ + { + datasetId: "66001", + datasetName: "ErrorDs", + text: "compilation failed", + }, + ], + }), + ); + const { context, stderr } = createMockContext(); + await impact.call(context, {}, "input.json", deps); + + expect(stderr.join("")).toContain("Error in ErrorDs: compilation failed"); + }); + + test("renders header even when no affected datasets", async () => { + getAffectedFn.mockImplementationOnce(() => + Promise.resolve({ + dematerializedDatasets: [], + editForwardDematerializedDatasets: [], + }), + ); + const { context, stdout } = createMockContext(); + await impact.call(context, {}, "input.json", deps); + + const out = stdout.join(""); + expect(out).toContain("NAME"); + expect(out).toContain("ID"); + expect(out).toContain("IMPACT"); + }); + + test("exits 1 on API error", async () => { + getAffectedFn.mockImplementationOnce(() => { + const err = new Error("forbidden"); + err.name = "GqlApiError"; + (err as unknown as { statusCode: number }).statusCode = 200; + throw err; + }); + const { context, stderr, getExitCode } = createMockContext(); + try { + await impact.call(context, {}, "input.json", deps); + throw new Error("expected process.exit"); + } catch (error) { + expect((error as Error).message).toBe("process.exit"); + } + expect(stderr.join("")).toContain("forbidden"); + expect(getExitCode()).toBe(1); + }); +}); diff --git a/src/commands/dataset/impact.ts b/src/commands/dataset/impact.ts new file mode 100644 index 0000000..6994fc0 --- /dev/null +++ b/src/commands/dataset/impact.ts @@ -0,0 +1,145 @@ +import { buildCommand } from "@stricli/core"; +import type { LocalContext } from "../../context"; +import { getDatasetsAffectedByUpdate } from "../../gql/dataset/datasets-affected-by-update"; +import { saveDatasetDryRun } from "../../gql/dataset/save-dataset-dry-run"; +import { GqlApiError } from "../../gql/gql-request"; +import { loadConfig } from "../../lib/config"; +import { renderAsCSV } from "../../lib/formatters/csv"; +import { createColumnHelper, formatTable } from "../../lib/formatters/table"; +import { muteStatusWriter } from "../../lib/writer"; +import { readDatasetInput } from "./input"; + +type OutputFormat = "json" | "csv"; + +interface ImpactFlags { + format?: OutputFormat; + json?: boolean; +} + +/** A single downstream dataset affected by the proposed update. */ +interface AffectedDataset { + name: string; + id: string; + /** How the dataset is impacted (e.g. dematerialized). */ + impact: string; +} + +export interface ImpactDeps { + loadConfig?: typeof loadConfig; + readDatasetInput?: typeof readDatasetInput; + getDatasetsAffectedByUpdate?: typeof getDatasetsAffectedByUpdate; + saveDatasetDryRun?: typeof saveDatasetDryRun; +} + +const columns = createColumnHelper(); +const tableColumns = [ + columns.accessor((row) => row.name, { header: "NAME", flex: true }), + columns.accessor((row) => row.id, { header: "ID" }), + columns.accessor((row) => row.impact, { header: "IMPACT" }), +]; + +/** + * Report the downstream datasets affected if the given dataset pipeline were + * saved. Affected datasets are rendered as a table (or JSON/CSV); any + * compile/validation errors are written to stderr. Nothing is persisted. + */ +export async function impact( + this: LocalContext, + flags: ImpactFlags, + file: string, + deps: ImpactDeps = {}, +): Promise { + const { + loadConfig: loadConfigImpl = loadConfig, + readDatasetInput: readDatasetInputImpl = readDatasetInput, + getDatasetsAffectedByUpdate: getAffectedImpl = getDatasetsAffectedByUpdate, + saveDatasetDryRun: dryRunImpl = saveDatasetDryRun, + } = deps; + + const format = flags.json ? ("json" as const) : flags.format; + const { process, writer: baseWriter } = this; + const writer = muteStatusWriter(baseWriter, { + muted: format === "json" || format === "csv", + }); + + try { + const variables = readDatasetInputImpl(file); + const config = loadConfigImpl(); + + const affected = await getAffectedImpl(config, variables); + + const rows: AffectedDataset[] = []; + for (const demat of affected?.dematerializedDatasets ?? []) { + if (demat.dataset) { + rows.push({ + name: demat.dataset.name, + id: demat.dataset.id, + impact: "dematerialized", + }); + } + } + for (const demat of affected?.editForwardDematerializedDatasets ?? []) { + if (demat.dataset) { + rows.push({ + name: demat.dataset.name, + id: demat.dataset.id, + impact: "edit-forward-dematerialized", + }); + } + } + + if (format === "json") { + writer.write(JSON.stringify(rows, null, 2)); + } else if (format === "csv") { + writer.write(renderAsCSV(rows)); + } else { + writer.write(formatTable(rows, tableColumns)); + } + + // Surface compile/validation errors on stderr. The affected-datasets query + // does not return errors, so we obtain them from the preflight dry-run. + const dryRunResult = await dryRunImpl(config, variables); + for (const errDs of dryRunResult?.errorDatasets ?? []) { + writer.error(`Error in ${errDs.datasetName}: ${errDs.text}`); + } + } catch (error) { + if (error instanceof GqlApiError) { + writer.error(`API Error (${error.statusCode}): ${error.message}`); + } else { + const message = error instanceof Error ? error.message : String(error); + writer.error(`Error: ${message}`); + } + process.exit(1); + } +} + +export const impactCommand = buildCommand({ + loader: async () => impact, + parameters: { + positional: { + kind: "tuple", + parameters: [ + { + brief: "Path to dataset pipeline JSON file", + parse: String, + }, + ], + }, + flags: { + format: { + kind: "enum", + values: ["json", "csv"], + brief: "Output format (json, csv)", + optional: true, + }, + json: { + kind: "boolean", + brief: "Output as JSON (shorthand for --format=json)", + optional: true, + }, + }, + }, + docs: { + brief: "Show downstream datasets affected by saving a dataset pipeline", + }, +}); diff --git a/src/commands/dataset/index.ts b/src/commands/dataset/index.ts index fd6d4e6..59aa75a 100644 --- a/src/commands/dataset/index.ts +++ b/src/commands/dataset/index.ts @@ -1,4 +1,6 @@ import { buildRouteMap } from "@stricli/core"; +import { dryRunCommand } from "./dry-run"; +import { impactCommand } from "./impact"; import { listCommand } from "./list"; import { viewCommand } from "./view"; @@ -6,6 +8,8 @@ export const datasetRoutes = buildRouteMap({ routes: { list: listCommand, view: viewCommand, + "dry-run": dryRunCommand, + impact: impactCommand, }, docs: { brief: "View observe datasets", @@ -13,8 +17,10 @@ export const datasetRoutes = buildRouteMap({ "View and manage datasets in Observe", "", "Commands:", - " list List datasets in Observe", - " view View details of a dataset", + " list List datasets in Observe", + " view View details of a dataset", + " dry-run Dry-run saving a dataset pipeline (nothing is persisted)", + " impact Show downstream datasets affected by saving a dataset pipeline", ].join("\n"), }, }); diff --git a/src/commands/dataset/input.test.ts b/src/commands/dataset/input.test.ts new file mode 100644 index 0000000..ccaabe2 --- /dev/null +++ b/src/commands/dataset/input.test.ts @@ -0,0 +1,64 @@ +import { describe, expect, test } from "bun:test"; +import { readDatasetInput } from "./input"; + +function withFile(contents: string) { + return { readFileSync: () => contents } as Parameters< + typeof readDatasetInput + >[1]; +} + +describe("readDatasetInput", () => { + test("maps name->label and stageID->id, deriving outputStage", () => { + const json = JSON.stringify({ + workspaceId: "42379913", + dataset: { name: "MyDataset" }, + query: { + stageQueries: [ + { stageID: "a", pipeline: "filter true" }, + { stageID: "main", pipeline: "make_col x:1" }, + ], + }, + }); + + const vars = readDatasetInput("input.json", withFile(json)); + + expect(vars).toEqual({ + workspaceId: "42379913", + dataset: { label: "MyDataset" }, + query: { + outputStage: "main", + stages: [ + { id: "a", pipeline: "filter true", input: [] }, + { id: "main", pipeline: "make_col x:1", input: [] }, + ], + }, + }); + }); + + test("throws a read error when the file is missing", () => { + expect(() => + readDatasetInput("missing.json", { + readFileSync: () => { + throw new Error("ENOENT"); + }, + }), + ).toThrow(/could not read file "missing.json"/); + }); + + test("throws a parse error on malformed JSON", () => { + expect(() => readDatasetInput("bad.json", withFile("not json"))).toThrow( + /could not parse JSON/, + ); + }); + + test("throws a validation error when stageQueries is empty", () => { + const json = JSON.stringify({ + workspaceId: "1", + dataset: { name: "X" }, + query: { stageQueries: [] }, + }); + expect(() => readDatasetInput("input.json", withFile(json))).toThrow( + /at least one stage/, + ); + }); +}); diff --git a/src/commands/dataset/input.ts b/src/commands/dataset/input.ts new file mode 100644 index 0000000..c786ef5 --- /dev/null +++ b/src/commands/dataset/input.ts @@ -0,0 +1,100 @@ +import * as fs from "node:fs"; +import { z } from "zod"; +import type { + DatasetInput, + MultiStageQueryInput, +} from "../../gql/generated/graphql"; + +/** + * Shape of the JSON file accepted by `dataset dry-run` and `dataset impact`. + * Mirrors the input contract of the legacy Go CLI: + * + * { "workspaceId": "...", + * "dataset": { "name": "..." }, + * "query": { "stageQueries": [{ "stageID": "...", "pipeline": "..." }] } } + */ +const StageQuerySchema = z.object({ + stageID: z.string().min(1, "query.stageQueries[].stageID is required"), + pipeline: z.string(), +}); + +const DatasetCmdInputSchema = z.object({ + workspaceId: z.string().min(1, "workspaceId is required"), + dataset: z.object({ + name: z.string().min(1, "dataset.name is required"), + }), + query: z.object({ + stageQueries: z + .array(StageQuerySchema) + .min(1, "query.stageQueries must contain at least one stage"), + }), +}); + +export type DatasetCmdInput = z.infer; + +/** GraphQL variables derived from a parsed input file. */ +export interface DatasetCmdVariables { + workspaceId: string; + dataset: DatasetInput; + query: MultiStageQueryInput; +} + +/** + * Read and parse a dataset command input file, then map it onto the GraphQL + * schema types. The input's `dataset.name` becomes `DatasetInput.label`, and + * each `stageQueries[].stageID` becomes a `StageQueryInput.id`. The pipeline's + * `outputStage` defaults to the id of the last stage, matching how the server + * treats the terminal stage as the dataset output. + */ +export function readDatasetInput( + filePath: string, + deps: { readFileSync?: typeof fs.readFileSync } = {}, +): DatasetCmdVariables { + const { readFileSync = fs.readFileSync } = deps; + + let raw: string; + try { + raw = readFileSync(filePath, "utf-8"); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + throw new Error(`could not read file "${filePath}": ${message}`, { + cause: error, + }); + } + + let parsed: unknown; + try { + parsed = JSON.parse(raw); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + throw new Error(`could not parse JSON from "${filePath}": ${message}`, { + cause: error, + }); + } + + const result = DatasetCmdInputSchema.safeParse(parsed); + if (!result.success) { + throw new Error( + `invalid dataset input: ${result.error.issues + .map((i) => i.message) + .join(", ")}`, + ); + } + const input = result.data; + + const stages = input.query.stageQueries.map((stage) => ({ + id: stage.stageID, + pipeline: stage.pipeline, + input: [], + })); + // The terminal stage is the dataset's output stage. The schema guarantees at + // least one stage, so `lastStage` is always defined here. + const lastStage = stages[stages.length - 1]; + const outputStage = lastStage ? lastStage.id : ""; + + return { + workspaceId: input.workspaceId, + dataset: { label: input.dataset.name }, + query: { outputStage, stages }, + }; +} diff --git a/src/gql/dataset/datasets-affected-by-update.graphql b/src/gql/dataset/datasets-affected-by-update.graphql new file mode 100644 index 0000000..691bb19 --- /dev/null +++ b/src/gql/dataset/datasets-affected-by-update.graphql @@ -0,0 +1,24 @@ +query DatasetsAffectedByUpdate( + $workspaceId: ObjectId + $dataset: DatasetInput! + $query: MultiStageQueryInput +) { + getDatasetsAffectedByDatasetUpdate( + workspaceId: $workspaceId + dataset: $dataset + query: $query + ) { + dematerializedDatasets { + dataset { + id + name + } + } + editForwardDematerializedDatasets { + dataset { + id + name + } + } + } +} diff --git a/src/gql/dataset/datasets-affected-by-update.ts b/src/gql/dataset/datasets-affected-by-update.ts new file mode 100644 index 0000000..2567738 --- /dev/null +++ b/src/gql/dataset/datasets-affected-by-update.ts @@ -0,0 +1,28 @@ +import type { Config } from "../../lib/config"; +import { + DatasetsAffectedByUpdateDocument, + type DatasetsAffectedByUpdateQuery, + type DatasetsAffectedByUpdateQueryVariables, +} from "../generated/graphql"; +import { executeGraphQL } from "../gql-request"; + +/** The DatasetsAffectedByDatasetUpdateResult returned by the impact query. */ +export type DatasetsAffectedResult = + DatasetsAffectedByUpdateQuery["getDatasetsAffectedByDatasetUpdate"]; + +/** + * Compute the downstream datasets that would be affected (dematerialized) if + * the given dataset were saved with the supplied pipeline, without persisting + * anything. + */ +export async function getDatasetsAffectedByUpdate( + config: Config, + variables: DatasetsAffectedByUpdateQueryVariables, +): Promise { + const response = await executeGraphQL( + config, + DatasetsAffectedByUpdateDocument, + variables, + ); + return response.data.getDatasetsAffectedByDatasetUpdate; +} diff --git a/src/gql/dataset/save-dataset-dry-run.graphql b/src/gql/dataset/save-dataset-dry-run.graphql new file mode 100644 index 0000000..bf19af5 --- /dev/null +++ b/src/gql/dataset/save-dataset-dry-run.graphql @@ -0,0 +1,28 @@ +mutation SaveDatasetDryRun( + $workspaceId: ObjectId! + $dataset: DatasetInput! + $query: MultiStageQueryInput! +) { + saveDataset( + workspaceId: $workspaceId + dataset: $dataset + query: $query + dependencyHandling: { saveMode: PreflightDatasetAndDependencies } + ) { + dataset { + id + name + } + dematerializedDatasets { + dataset { + id + name + } + } + errorDatasets { + datasetId + datasetName + text + } + } +} diff --git a/src/gql/dataset/save-dataset-dry-run.ts b/src/gql/dataset/save-dataset-dry-run.ts new file mode 100644 index 0000000..5024174 --- /dev/null +++ b/src/gql/dataset/save-dataset-dry-run.ts @@ -0,0 +1,29 @@ +import type { Config } from "../../lib/config"; +import { + SaveDatasetDryRunDocument, + type SaveDatasetDryRunMutation, + type SaveDatasetDryRunMutationVariables, +} from "../generated/graphql"; +import { executeGraphQL } from "../gql-request"; + +/** The DatasetSaveResult returned from a dry-run saveDataset. */ +export type DryRunSaveResult = SaveDatasetDryRunMutation["saveDataset"]; + +/** + * Dry-run a dataset pipeline save. Uses `saveDataset` with + * `dependencyHandling.saveMode = PreflightDatasetAndDependencies`, which + * computes what the save would do without persisting anything: the dataset + * that would be saved, datasets that would be dematerialized/rematerialized, + * and any compile/validation errors in affected datasets. + */ +export async function saveDatasetDryRun( + config: Config, + variables: SaveDatasetDryRunMutationVariables, +): Promise { + const response = await executeGraphQL( + config, + SaveDatasetDryRunDocument, + variables, + ); + return response.data.saveDataset; +}