From f6e8c1afe360290d61ba2c0a168f2da52f4f88c0 Mon Sep 17 00:00:00 2001 From: Aaron Brewbaker Date: Wed, 24 Jun 2026 09:14:59 -0400 Subject: [PATCH 1/2] feat(opal): port opal validation from Go CLI Port the opal validation/inspection commands from the deprecated Go Observe CLI: check, verbs, functions, and validate-ingest. Output contracts (ERROR row:col, WARN kind, OK + schema fields, exit codes) mirror the Go behavior. Co-Authored-By: Claude Opus 4.8 (1M context) --- AGENTS.md | 2 + README.md | 4 + src/app.ts | 2 + src/commands/opal/check.test.ts | 249 ++++++++++++++++++++ src/commands/opal/check.ts | 141 +++++++++++ src/commands/opal/functions.test.ts | 167 +++++++++++++ src/commands/opal/functions.ts | 92 ++++++++ src/commands/opal/index.ts | 26 ++ src/commands/opal/validate-ingest.test.ts | 182 ++++++++++++++ src/commands/opal/validate-ingest.ts | 90 +++++++ src/commands/opal/verbs.test.ts | 163 +++++++++++++ src/commands/opal/verbs.ts | 89 +++++++ src/gql/opal/check-queries.graphql | 23 ++ src/gql/opal/check-queries.ts | 21 ++ src/gql/opal/validate-ingest-filter.graphql | 8 + src/gql/opal/validate-ingest-filter.ts | 27 +++ src/gql/opal/verbs-and-functions.graphql | 15 ++ src/gql/opal/verbs-and-functions.ts | 17 ++ 18 files changed, 1318 insertions(+) create mode 100644 src/commands/opal/check.test.ts create mode 100644 src/commands/opal/check.ts create mode 100644 src/commands/opal/functions.test.ts create mode 100644 src/commands/opal/functions.ts create mode 100644 src/commands/opal/index.ts create mode 100644 src/commands/opal/validate-ingest.test.ts create mode 100644 src/commands/opal/validate-ingest.ts create mode 100644 src/commands/opal/verbs.test.ts create mode 100644 src/commands/opal/verbs.ts create mode 100644 src/gql/opal/check-queries.graphql create mode 100644 src/gql/opal/check-queries.ts create mode 100644 src/gql/opal/validate-ingest-filter.graphql create mode 100644 src/gql/opal/validate-ingest-filter.ts create mode 100644 src/gql/opal/verbs-and-functions.graphql create mode 100644 src/gql/opal/verbs-and-functions.ts diff --git a/AGENTS.md b/AGENTS.md index 0198696..199419d 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -44,6 +44,7 @@ src/ │ ├── datastream/ # Datastream commands (create, list, view, update) │ ├── ingest-token/ # Ingest token commands (create, list, view, update) │ ├── metric/ # Metric commands (list, view) +│ ├── opal/ # OPAL commands (check, verbs, functions, validate-ingest) │ ├── skill/ # AI agent skill commands (list, view) │ ├── tag-key/ # Tag key commands (list) │ ├── tag-value/ # Tag value commands (list) @@ -57,6 +58,7 @@ src/ │ ├── datastream/ # Datastream queries/mutations │ ├── ingest-token/ # Ingest token queries/mutations │ ├── metric/ # Metric queries +│ ├── opal/ # OPAL validation/inspection queries │ ├── workspace/ # Workspace queries │ ├── gql-request.ts # GraphQL client/executor │ └── gql-codegen.config.ts # Codegen configuration diff --git a/README.md b/README.md index a7f6baa..62c84f9 100644 --- a/README.md +++ b/README.md @@ -74,6 +74,10 @@ To update installed skills after edits in this repo, run `npx skills update`. | `observe datastream view` | View a datastream by ID | | `observe datastream update` | Update a datastream | | `observe datastream-token check-status` | Poll a datastream token until ingest data arrives | +| `observe opal check` | Validate an OPAL pipeline | +| `observe opal verbs` | List all OPAL verbs | +| `observe opal functions` | List all OPAL functions | +| `observe opal validate-ingest` | Validate an OPAL ingest filter expression | | `observe cli install` | Configure shell integration (PATH, completions) | | `observe cli uninstall` | Remove shell integration | | `observe cli upgrade` | Upgrade to the latest version | diff --git a/src/app.ts b/src/app.ts index a51b600..582ae07 100644 --- a/src/app.ts +++ b/src/app.ts @@ -12,6 +12,7 @@ import { datastreamRoutes } from "./commands/datastream/index.js"; import { helpCommand } from "./commands/help.js"; import { ingestTokenRoutes } from "./commands/ingest-token/index.js"; import { metricRoutes } from "./commands/metric/index.js"; +import { opalRoutes } from "./commands/opal/index.js"; import { queryCommand } from "./commands/query.js"; import { skillRoutes } from "./commands/skill/index.js"; import { tagKeyRoutes } from "./commands/tag-key/index.js"; @@ -36,6 +37,7 @@ export const routes = buildRouteMap({ "data-connection": dataConnectionRoutes, datastream: datastreamRoutes, "datastream-token": datastreamTokenRoutes, + opal: opalRoutes, cli: cliRoutes, }, defaultCommand: "help", diff --git a/src/commands/opal/check.test.ts b/src/commands/opal/check.test.ts new file mode 100644 index 0000000..573ef1c --- /dev/null +++ b/src/commands/opal/check.test.ts @@ -0,0 +1,249 @@ +import { + afterAll, + beforeAll, + beforeEach, + describe, + expect, + mock, + test, +} from "bun:test"; +import { resolve } from "node:path"; +import type { LocalContext } from "../../context"; +import { createWriter } from "../../lib/writer"; + +const repoRoot = resolve(import.meta.dir, "../../.."); +const gqlModulePath = resolve(repoRoot, "src/gql/opal/check-queries.ts"); + +const loadConfigFn = mock(() => ({ + customerId: "test-customer", + token: "test-token", + domain: "observeinc.com", +})); + +const checkQueriesFn = mock((_config: unknown, _variables: unknown) => + Promise.resolve({ + parsedPipeline: { errors: [], warnings: [] }, + resultSchema: { fieldList: [{ name: "timestamp" }, { name: "log" }] }, + }), +); + +let check: (typeof import("./check"))["check"]; + +let previousNoColor: string | undefined; +let previousForceColor: string | undefined; + +const deps = { + loadConfig: loadConfigFn, + checkQueries: checkQueriesFn, +} as Parameters<(typeof import("./check"))["check"]>[2]; + +beforeAll(async () => { + previousNoColor = process.env.NO_COLOR; + previousForceColor = process.env.FORCE_COLOR; + process.env.NO_COLOR = "1"; + process.env.FORCE_COLOR = "0"; + + void mock.module(gqlModulePath, () => ({ + checkQueries: checkQueriesFn, + })); + + const mod = await import("./check.ts"); + check = mod.check; +}); + +afterAll(() => { + mock.restore(); + if (previousNoColor === undefined) { + delete process.env.NO_COLOR; + } else { + process.env.NO_COLOR = previousNoColor; + } + if (previousForceColor === undefined) { + delete process.env.FORCE_COLOR; + } else { + process.env.FORCE_COLOR = previousForceColor; + } +}); + +function createMockContext() { + const stdout: string[] = []; + const stderr: string[] = []; + let exitCode: number | undefined; + + const processMock = { + stdout: { + write: (msg: string) => { + stdout.push(msg); + return true; + }, + }, + stderr: { + write: (msg: string) => { + stderr.push(msg); + return true; + }, + }, + exit: (code?: number) => { + exitCode = code ?? 0; + throw new Error("process.exit"); + }, + }; + + const context = { + process: processMock, + writer: createWriter({ process: processMock }), + } as unknown as LocalContext; + + return { context, stdout, stderr, getExitCode: () => exitCode }; +} + +describe("opal check", () => { + beforeEach(() => { + loadConfigFn.mockClear(); + checkQueriesFn.mockClear(); + }); + + test("prints OK and schema fields on success", async () => { + const { context, stdout } = createMockContext(); + await check.call(context, {}, "filter true", deps); + + expect(checkQueriesFn).toHaveBeenCalledTimes(1); + const [, variables] = checkQueriesFn.mock.calls[0]!; + expect(variables).toEqual({ + queries: { + outputStage: "stage-1", + stages: [{ stageID: "stage-1", pipeline: "filter true", input: [] }], + }, + }); + const out = stdout.join(""); + expect(out).toContain("OK"); + expect(out).toContain("timestamp"); + expect(out).toContain("log"); + }); + + test("prints ERROR row:col: text and exits 1 on errors", async () => { + checkQueriesFn.mockImplementationOnce(() => + Promise.resolve({ + parsedPipeline: { + errors: [{ col: "1", row: "2", text: "not_a_verb" }], + warnings: [], + }, + resultSchema: null, + }), + ); + + const { context, stdout, getExitCode } = createMockContext(); + try { + await check.call(context, {}, "not_a_verb 123", deps); + throw new Error("expected process.exit"); + } catch (error) { + expect((error as Error).message).toBe("process.exit"); + } + expect(getExitCode()).toBe(1); + const out = stdout.join(""); + expect(out).toContain("ERROR 2:1: not_a_verb"); + expect(out).not.toContain("OK"); + }); + + test("treats empty-text errors as OK (compilation requires input dataset)", async () => { + checkQueriesFn.mockImplementationOnce(() => + Promise.resolve({ + parsedPipeline: { + errors: [{ col: "0", row: "0", text: "" }], + warnings: [], + }, + resultSchema: { fieldList: [] }, + }), + ); + + const { context, stdout } = createMockContext(); + await check.call(context, {}, "filter true", deps); + expect(stdout.join("")).toContain("OK"); + }); + + test("prints WARN kind row:col then OK on warnings only", async () => { + checkQueriesFn.mockImplementationOnce(() => + Promise.resolve({ + parsedPipeline: { + errors: [], + warnings: [{ kind: "deprecation", symbol: { col: "3", row: "4" } }], + }, + resultSchema: { fieldList: [] }, + }), + ); + + const { context, stdout } = createMockContext(); + await check.call(context, {}, "filter true", deps); + const out = stdout.join(""); + expect(out).toContain("WARN deprecation 4:3"); + expect(out).toContain("OK"); + }); + + test("reads pipeline from --file", async () => { + const readFile = mock(() => "filter true"); + const { context, stdout } = createMockContext(); + await check.call(context, { file: "/tmp/p.opal" }, undefined, { + ...deps, + readFile, + }); + + expect(readFile).toHaveBeenCalledWith("/tmp/p.opal"); + const [, variables] = checkQueriesFn.mock.calls[0]!; + expect( + (variables as { queries: { stages: { pipeline: string }[] } }).queries + .stages[0]!.pipeline, + ).toBe("filter true"); + expect(stdout.join("")).toContain("OK"); + }); + + test("exits 1 with usage error when no pipeline and no file", async () => { + const { context, stderr, getExitCode } = createMockContext(); + try { + await check.call(context, {}, undefined, deps); + throw new Error("expected process.exit"); + } catch (error) { + expect((error as Error).message).toBe("process.exit"); + } + expect(getExitCode()).toBe(1); + expect(stderr.join("")).toContain("usage:"); + expect(checkQueriesFn).not.toHaveBeenCalled(); + }); + + test("exits 1 when --file cannot be read", async () => { + const readFile = mock(() => { + throw new Error("ENOENT"); + }); + const { context, stderr, getExitCode } = createMockContext(); + try { + await check.call(context, { file: "/nope" }, undefined, { + ...deps, + readFile, + }); + throw new Error("expected process.exit"); + } catch (error) { + expect((error as Error).message).toBe("process.exit"); + } + expect(getExitCode()).toBe(1); + expect(stderr.join("")).toContain("could not read file"); + expect(checkQueriesFn).not.toHaveBeenCalled(); + }); + + test("exits 1 on API error", async () => { + checkQueriesFn.mockImplementationOnce(() => { + const err = new Error("Server error"); + err.name = "GqlApiError"; + (err as unknown as { statusCode: number }).statusCode = 500; + throw err; + }); + + const { context, stderr, getExitCode } = createMockContext(); + try { + await check.call(context, {}, "filter true", deps); + throw new Error("expected process.exit"); + } catch (error) { + expect((error as Error).message).toBe("process.exit"); + } + expect(getExitCode()).toBe(1); + expect(stderr.join("")).toContain("Error"); + }); +}); diff --git a/src/commands/opal/check.ts b/src/commands/opal/check.ts new file mode 100644 index 0000000..6d433f3 --- /dev/null +++ b/src/commands/opal/check.ts @@ -0,0 +1,141 @@ +import { buildCommand } from "@stricli/core"; +import { readFileSync } from "node:fs"; +import type { LocalContext } from "../../context"; +import { checkQueries } from "../../gql/opal/check-queries"; +import { GqlApiError } from "../../gql/gql-request"; +import { loadConfig } from "../../lib/config"; + +interface CheckFlags { + file?: string; +} + +export interface CheckDeps { + loadConfig?: typeof loadConfig; + checkQueries?: typeof checkQueries; + readFile?: (path: string) => string; +} + +export async function check( + this: LocalContext, + flags: CheckFlags, + pipelineArg: string | undefined, + deps: CheckDeps = {}, +): Promise { + const { + loadConfig: loadConfigImpl = loadConfig, + checkQueries: checkQueriesImpl = checkQueries, + readFile: readFileImpl = (path: string) => readFileSync(path, "utf8"), + } = deps; + const { process, writer } = this; + + try { + let pipeline: string; + if (flags.file) { + try { + pipeline = readFileImpl(flags.file); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + writer.error( + `opal check: could not read file "${flags.file}": ${message}`, + ); + process.exit(1); + return; + } + } else if (pipelineArg !== undefined) { + pipeline = pipelineArg; + } else { + writer.error( + "usage: observe opal check | observe opal check --file ", + ); + process.exit(1); + return; + } + + const config = loadConfigImpl(); + const result = await checkQueriesImpl(config, { + queries: { + outputStage: "stage-1", + stages: [ + { + stageID: "stage-1", + pipeline, + input: [], + }, + ], + }, + }); + + const parsed = result?.parsedPipeline; + + // Errors with empty text mean "compilation requires an input dataset" and + // are not real syntax errors in the pipeline itself — suppress them. + const errors = (parsed?.errors ?? []).filter((e) => (e.text ?? "") !== ""); + const warnings = parsed?.warnings ?? []; + + if (errors.length > 0) { + for (const e of errors) { + writer.write(`ERROR ${e.row}:${e.col}: ${e.text}`); + } + process.exit(1); + return; + } + + for (const w of warnings) { + writer.write(`WARN ${w.kind} ${w.symbol?.row}:${w.symbol?.col}`); + } + + writer.write("OK"); + for (const field of result?.resultSchema?.fieldList ?? []) { + writer.write(` ${field.name}`); + } + } catch (error) { + if (error instanceof GqlApiError) { + writer.error(`API Error (${error.statusCode}): ${error.message}`); + } else { + const message = error instanceof Error ? error.message : String(error); + writer.error(`Error: ${message}`); + } + process.exit(1); + } +} + +export const checkCommand = buildCommand({ + loader: async () => check, + parameters: { + positional: { + kind: "tuple", + parameters: [ + { + brief: "OPAL pipeline to validate (omit when using --file)", + parse: String, + optional: true, + }, + ], + }, + flags: { + file: { + kind: "parsed", + parse: String, + brief: "Read OPAL pipeline from file instead of argument", + optional: true, + }, + }, + aliases: { + f: "file", + }, + }, + docs: { + brief: "Validate an OPAL pipeline", + fullDescription: [ + "Validate an OPAL pipeline via the checkQueries API.", + "", + "On error, prints each as 'ERROR row:col: message' and exits 1.", + "On warnings only, prints 'WARN kind row:col' and exits 0.", + "On success, prints 'OK' followed by the result schema field names.", + "", + "Examples:", + " observe opal check 'filter true'", + " observe opal check --file ./pipeline.opal", + ].join("\n"), + }, +}); diff --git a/src/commands/opal/functions.test.ts b/src/commands/opal/functions.test.ts new file mode 100644 index 0000000..30d39a2 --- /dev/null +++ b/src/commands/opal/functions.test.ts @@ -0,0 +1,167 @@ +import { + afterAll, + beforeAll, + beforeEach, + describe, + expect, + mock, + test, +} from "bun:test"; +import { resolve } from "node:path"; +import type { LocalContext } from "../../context"; +import { createWriter } from "../../lib/writer"; + +const repoRoot = resolve(import.meta.dir, "../../.."); +const gqlModulePath = resolve(repoRoot, "src/gql/opal/verbs-and-functions.ts"); + +const loadConfigFn = mock(() => ({ + customerId: "test-customer", + token: "test-token", + domain: "observeinc.com", +})); + +const verbsAndFunctionsFn = mock((_config: unknown) => + Promise.resolve({ + verbs: [], + functions: [ + { + name: "sum", + description: "Sum values", + categories: ["Aggregation"], + returnType: "float64", + }, + { + name: "count", + description: "Count rows", + categories: ["Aggregation"], + returnType: "int64", + }, + { + name: "avg", + description: "Average values", + categories: ["Aggregation"], + returnType: "float64", + }, + ], + }), +); + +let functions: (typeof import("./functions"))["functions"]; + +let previousNoColor: string | undefined; +let previousForceColor: string | undefined; + +const deps = { + loadConfig: loadConfigFn, + verbsAndFunctions: verbsAndFunctionsFn, +} as Parameters<(typeof import("./functions"))["functions"]>[1]; + +beforeAll(async () => { + previousNoColor = process.env.NO_COLOR; + previousForceColor = process.env.FORCE_COLOR; + process.env.NO_COLOR = "1"; + process.env.FORCE_COLOR = "0"; + + void mock.module(gqlModulePath, () => ({ + verbsAndFunctions: verbsAndFunctionsFn, + })); + + const mod = await import("./functions.ts"); + functions = mod.functions; +}); + +afterAll(() => { + mock.restore(); + if (previousNoColor === undefined) { + delete process.env.NO_COLOR; + } else { + process.env.NO_COLOR = previousNoColor; + } + if (previousForceColor === undefined) { + delete process.env.FORCE_COLOR; + } else { + process.env.FORCE_COLOR = previousForceColor; + } +}); + +function createMockContext() { + const stdout: string[] = []; + const stderr: string[] = []; + let exitCode: number | undefined; + + const processMock = { + stdout: { + write: (msg: string) => { + stdout.push(msg); + return true; + }, + }, + stderr: { + write: (msg: string) => { + stderr.push(msg); + return true; + }, + }, + exit: (code?: number) => { + exitCode = code ?? 0; + throw new Error("process.exit"); + }, + }; + + const context = { + process: processMock, + writer: createWriter({ process: processMock }), + } as unknown as LocalContext; + + return { context, stdout, stderr, getExitCode: () => exitCode }; +} + +describe("opal functions", () => { + beforeEach(() => { + loadConfigFn.mockClear(); + verbsAndFunctionsFn.mockClear(); + }); + + test("outputs TSV sorted by name including return type", async () => { + const { context, stdout } = createMockContext(); + await functions.call(context, {}, deps); + + const lines = stdout.join("").trimEnd().split("\n"); + expect(lines).toEqual([ + "avg\tAggregation\tfloat64\tAverage values", + "count\tAggregation\tint64\tCount rows", + "sum\tAggregation\tfloat64\tSum values", + ]); + }); + + test("supports --format json", async () => { + const { context, stdout } = createMockContext(); + await functions.call(context, { format: "json" }, deps); + const out = JSON.parse(stdout.join("")); + expect(out[0]).toEqual({ + name: "avg", + categories: "Aggregation", + returnType: "float64", + description: "Average values", + }); + }); + + test("exits 1 on API error", async () => { + verbsAndFunctionsFn.mockImplementationOnce(() => { + const err = new Error("Server error"); + err.name = "GqlApiError"; + (err as unknown as { statusCode: number }).statusCode = 500; + throw err; + }); + + const { context, stderr, getExitCode } = createMockContext(); + try { + await functions.call(context, {}, deps); + throw new Error("expected process.exit"); + } catch (error) { + expect((error as Error).message).toBe("process.exit"); + } + expect(getExitCode()).toBe(1); + expect(stderr.join("")).toContain("Error"); + }); +}); diff --git a/src/commands/opal/functions.ts b/src/commands/opal/functions.ts new file mode 100644 index 0000000..9219a48 --- /dev/null +++ b/src/commands/opal/functions.ts @@ -0,0 +1,92 @@ +import { buildCommand } from "@stricli/core"; +import type { LocalContext } from "../../context"; +import { verbsAndFunctions } from "../../gql/opal/verbs-and-functions"; +import { GqlApiError } from "../../gql/gql-request"; +import { loadConfig } from "../../lib/config"; +import { renderAsCSV } from "../../lib/formatters/csv"; + +type OutputFormat = "json" | "csv"; + +interface FunctionsFlags { + format?: OutputFormat; +} + +export interface FunctionsDeps { + loadConfig?: typeof loadConfig; + verbsAndFunctions?: typeof verbsAndFunctions; +} + +export async function functions( + this: LocalContext, + flags: FunctionsFlags, + deps: FunctionsDeps = {}, +): Promise { + const { + loadConfig: loadConfigImpl = loadConfig, + verbsAndFunctions: verbsAndFunctionsImpl = verbsAndFunctions, + } = deps; + const { process, writer } = this; + + try { + const config = loadConfigImpl(); + const result = await verbsAndFunctionsImpl(config); + + const rows = [...result.functions] + .map((f) => ({ + name: f.name ?? "", + categories: (f.categories ?? []).join(","), + returnType: f.returnType ?? "", + description: f.description ?? "", + })) + .sort((a, b) => (a.name < b.name ? -1 : a.name > b.name ? 1 : 0)); + + if (flags.format === "json") { + writer.write(JSON.stringify(rows, null, 2)); + return; + } + if (flags.format === "csv") { + writer.write(renderAsCSV(rows)); + return; + } + + for (const f of rows) { + writer.write( + `${f.name}\t${f.categories}\t${f.returnType}\t${f.description}`, + ); + } + } catch (error) { + if (error instanceof GqlApiError) { + writer.error(`API Error (${error.statusCode}): ${error.message}`); + } else { + const message = error instanceof Error ? error.message : String(error); + writer.error(`Error: ${message}`); + } + process.exit(1); + } +} + +export const functionsCommand = buildCommand({ + loader: async () => functions, + parameters: { + positional: { + kind: "tuple", + parameters: [], + }, + flags: { + format: { + kind: "enum", + values: ["json", "csv"], + brief: "Output format (json, csv) (default: TSV)", + optional: true, + }, + }, + }, + docs: { + brief: "List all OPAL functions", + fullDescription: [ + "List all OPAL functions sorted by name.", + "", + "Default output is tab-separated: name, categories, return type, description.", + ].join("\n"), + }, +}); diff --git a/src/commands/opal/index.ts b/src/commands/opal/index.ts new file mode 100644 index 0000000..7483887 --- /dev/null +++ b/src/commands/opal/index.ts @@ -0,0 +1,26 @@ +import { buildRouteMap } from "@stricli/core"; +import { checkCommand } from "./check"; +import { verbsCommand } from "./verbs"; +import { functionsCommand } from "./functions"; +import { validateIngestCommand } from "./validate-ingest"; + +export const opalRoutes = buildRouteMap({ + routes: { + check: checkCommand, + verbs: verbsCommand, + functions: functionsCommand, + "validate-ingest": validateIngestCommand, + }, + docs: { + brief: "Validate and inspect OPAL pipelines and functions", + fullDescription: [ + "Validate and inspect OPAL pipelines, verbs, and functions.", + "", + "Commands:", + " check Validate an OPAL pipeline", + " verbs List all OPAL verbs", + " functions List all OPAL functions", + " validate-ingest Validate an OPAL ingest filter expression", + ].join("\n"), + }, +}); diff --git a/src/commands/opal/validate-ingest.test.ts b/src/commands/opal/validate-ingest.test.ts new file mode 100644 index 0000000..8dcad70 --- /dev/null +++ b/src/commands/opal/validate-ingest.test.ts @@ -0,0 +1,182 @@ +import { + afterAll, + beforeAll, + beforeEach, + describe, + expect, + mock, + test, +} from "bun:test"; +import { resolve } from "node:path"; +import type { LocalContext } from "../../context"; +import { createWriter } from "../../lib/writer"; + +const repoRoot = resolve(import.meta.dir, "../../.."); +const gqlModulePath = resolve( + repoRoot, + "src/gql/opal/validate-ingest-filter.ts", +); + +const loadConfigFn = mock(() => ({ + customerId: "test-customer", + token: "test-token", + domain: "observeinc.com", +})); + +const validateIngestFilterFn = mock( + (_config: unknown, _variables: unknown) => + Promise.resolve(null) as Promise, +); + +let validateIngest: (typeof import("./validate-ingest"))["validateIngest"]; + +let previousNoColor: string | undefined; +let previousForceColor: string | undefined; + +const deps = { + loadConfig: loadConfigFn, + validateIngestFilter: validateIngestFilterFn, +} as Parameters<(typeof import("./validate-ingest"))["validateIngest"]>[2]; + +beforeAll(async () => { + previousNoColor = process.env.NO_COLOR; + previousForceColor = process.env.FORCE_COLOR; + process.env.NO_COLOR = "1"; + process.env.FORCE_COLOR = "0"; + + void mock.module(gqlModulePath, () => ({ + validateIngestFilter: validateIngestFilterFn, + })); + + const mod = await import("./validate-ingest.ts"); + validateIngest = mod.validateIngest; +}); + +afterAll(() => { + mock.restore(); + if (previousNoColor === undefined) { + delete process.env.NO_COLOR; + } else { + process.env.NO_COLOR = previousNoColor; + } + if (previousForceColor === undefined) { + delete process.env.FORCE_COLOR; + } else { + process.env.FORCE_COLOR = previousForceColor; + } +}); + +function createMockContext() { + const stdout: string[] = []; + const stderr: string[] = []; + let exitCode: number | undefined; + + const processMock = { + stdout: { + write: (msg: string) => { + stdout.push(msg); + return true; + }, + }, + stderr: { + write: (msg: string) => { + stderr.push(msg); + return true; + }, + }, + exit: (code?: number) => { + exitCode = code ?? 0; + throw new Error("process.exit"); + }, + }; + + const context = { + process: processMock, + writer: createWriter({ process: processMock }), + } as unknown as LocalContext; + + return { context, stdout, stderr, getExitCode: () => exitCode }; +} + +describe("opal validate-ingest", () => { + beforeEach(() => { + loadConfigFn.mockClear(); + validateIngestFilterFn.mockClear(); + }); + + test("prints OK on null (valid) response", async () => { + const { context, stdout } = createMockContext(); + await validateIngest.call( + context, + { dataset: "42918275" }, + "filter true", + deps, + ); + + const [, variables] = validateIngestFilterFn.mock.calls[0]!; + expect(variables).toEqual({ + pipeline: "filter true", + sourceDatasetID: "42918275", + }); + expect(stdout.join("")).toContain("OK"); + }); + + test("prints OK on empty array response", async () => { + validateIngestFilterFn.mockImplementationOnce(() => Promise.resolve([])); + const { context, stdout } = createMockContext(); + await validateIngest.call( + context, + { dataset: "42918275" }, + "filter true", + deps, + ); + expect(stdout.join("")).toContain("OK"); + }); + + test("prints ERROR: message and exits 1 on diagnostics", async () => { + validateIngestFilterFn.mockImplementationOnce(() => + Promise.resolve([{ message: '1,1: unknown verb "bad_filter"' }]), + ); + + const { context, stdout, getExitCode } = createMockContext(); + try { + await validateIngest.call( + context, + { dataset: "42918275" }, + "bad_filter", + deps, + ); + throw new Error("expected process.exit"); + } catch (error) { + expect((error as Error).message).toBe("process.exit"); + } + expect(getExitCode()).toBe(1); + const out = stdout.join(""); + expect(out).toContain('ERROR: 1,1: unknown verb "bad_filter"'); + expect(out).not.toContain("OK"); + }); + + test("exits 1 on API error", async () => { + validateIngestFilterFn.mockImplementationOnce(() => { + const err = new Error("Server error"); + err.name = "GqlApiError"; + (err as unknown as { statusCode: number }).statusCode = 500; + throw err; + }); + + const { context, stderr, getExitCode } = createMockContext(); + try { + await validateIngest.call( + context, + { dataset: "42918275" }, + "filter true", + deps, + ); + throw new Error("expected process.exit"); + } catch (error) { + expect((error as Error).message).toBe("process.exit"); + } + expect(getExitCode()).toBe(1); + expect(stderr.join("")).toContain("Error"); + }); +}); diff --git a/src/commands/opal/validate-ingest.ts b/src/commands/opal/validate-ingest.ts new file mode 100644 index 0000000..d194936 --- /dev/null +++ b/src/commands/opal/validate-ingest.ts @@ -0,0 +1,90 @@ +import { buildCommand } from "@stricli/core"; +import type { LocalContext } from "../../context"; +import { validateIngestFilter } from "../../gql/opal/validate-ingest-filter"; +import { GqlApiError } from "../../gql/gql-request"; +import { loadConfig } from "../../lib/config"; + +interface ValidateIngestFlags { + dataset: string; +} + +export interface ValidateIngestDeps { + loadConfig?: typeof loadConfig; + validateIngestFilter?: typeof validateIngestFilter; +} + +export async function validateIngest( + this: LocalContext, + flags: ValidateIngestFlags, + pipeline: string, + deps: ValidateIngestDeps = {}, +): Promise { + const { + loadConfig: loadConfigImpl = loadConfig, + validateIngestFilter: validateIngestFilterImpl = validateIngestFilter, + } = deps; + const { process, writer } = this; + + try { + const config = loadConfigImpl(); + const result = await validateIngestFilterImpl(config, { + pipeline, + sourceDatasetID: flags.dataset, + }); + + // A null result means valid. A non-empty array means errors. + const diagnostics = result ?? []; + if (diagnostics.length > 0) { + for (const d of diagnostics) { + writer.write(`ERROR: ${d.message}`); + } + process.exit(1); + return; + } + + writer.write("OK"); + } catch (error) { + if (error instanceof GqlApiError) { + writer.error(`API Error (${error.statusCode}): ${error.message}`); + } else { + const message = error instanceof Error ? error.message : String(error); + writer.error(`Error: ${message}`); + } + process.exit(1); + } +} + +export const validateIngestCommand = buildCommand({ + loader: async () => validateIngest, + parameters: { + positional: { + kind: "tuple", + parameters: [ + { + brief: "OPAL ingest filter expression to validate", + parse: String, + }, + ], + }, + flags: { + dataset: { + kind: "parsed", + parse: String, + brief: "Source dataset ID to validate the ingest filter against", + }, + }, + }, + docs: { + brief: "Validate an OPAL ingest filter expression", + fullDescription: [ + "Validate an OPAL ingest filter expression against a source dataset", + "via the validateIngestFilterExpression API.", + "", + "On error, prints each as 'ERROR: message' and exits 1.", + "On success, prints 'OK'.", + "", + "Example:", + " observe opal validate-ingest --dataset 42918275 'filter true'", + ].join("\n"), + }, +}); diff --git a/src/commands/opal/verbs.test.ts b/src/commands/opal/verbs.test.ts new file mode 100644 index 0000000..aab2c70 --- /dev/null +++ b/src/commands/opal/verbs.test.ts @@ -0,0 +1,163 @@ +import { + afterAll, + beforeAll, + beforeEach, + describe, + expect, + mock, + test, +} from "bun:test"; +import { resolve } from "node:path"; +import type { LocalContext } from "../../context"; +import { createWriter } from "../../lib/writer"; + +const repoRoot = resolve(import.meta.dir, "../../.."); +const gqlModulePath = resolve(repoRoot, "src/gql/opal/verbs-and-functions.ts"); + +const loadConfigFn = mock(() => ({ + customerId: "test-customer", + token: "test-token", + domain: "observeinc.com", +})); + +const verbsAndFunctionsFn = mock((_config: unknown) => + Promise.resolve({ + verbs: [ + { name: "filter", description: "Filter rows", categories: ["Filter"] }, + { + name: "aggregate", + description: "Aggregate rows", + categories: ["Metrics", "Aggregate"], + }, + { name: "limit", description: "Limit rows", categories: ["Filter"] }, + ], + functions: [], + }), +); + +let verbs: (typeof import("./verbs"))["verbs"]; + +let previousNoColor: string | undefined; +let previousForceColor: string | undefined; + +const deps = { + loadConfig: loadConfigFn, + verbsAndFunctions: verbsAndFunctionsFn, +} as Parameters<(typeof import("./verbs"))["verbs"]>[1]; + +beforeAll(async () => { + previousNoColor = process.env.NO_COLOR; + previousForceColor = process.env.FORCE_COLOR; + process.env.NO_COLOR = "1"; + process.env.FORCE_COLOR = "0"; + + void mock.module(gqlModulePath, () => ({ + verbsAndFunctions: verbsAndFunctionsFn, + })); + + const mod = await import("./verbs.ts"); + verbs = mod.verbs; +}); + +afterAll(() => { + mock.restore(); + if (previousNoColor === undefined) { + delete process.env.NO_COLOR; + } else { + process.env.NO_COLOR = previousNoColor; + } + if (previousForceColor === undefined) { + delete process.env.FORCE_COLOR; + } else { + process.env.FORCE_COLOR = previousForceColor; + } +}); + +function createMockContext() { + const stdout: string[] = []; + const stderr: string[] = []; + let exitCode: number | undefined; + + const processMock = { + stdout: { + write: (msg: string) => { + stdout.push(msg); + return true; + }, + }, + stderr: { + write: (msg: string) => { + stderr.push(msg); + return true; + }, + }, + exit: (code?: number) => { + exitCode = code ?? 0; + throw new Error("process.exit"); + }, + }; + + const context = { + process: processMock, + writer: createWriter({ process: processMock }), + } as unknown as LocalContext; + + return { context, stdout, stderr, getExitCode: () => exitCode }; +} + +describe("opal verbs", () => { + beforeEach(() => { + loadConfigFn.mockClear(); + verbsAndFunctionsFn.mockClear(); + }); + + test("outputs TSV sorted by name with comma-joined categories", async () => { + const { context, stdout } = createMockContext(); + await verbs.call(context, {}, deps); + + const lines = stdout.join("").trimEnd().split("\n"); + expect(lines).toEqual([ + "aggregate\tMetrics,Aggregate\tAggregate rows", + "filter\tFilter\tFilter rows", + "limit\tFilter\tLimit rows", + ]); + }); + + test("supports --format json", async () => { + const { context, stdout } = createMockContext(); + await verbs.call(context, { format: "json" }, deps); + const out = JSON.parse(stdout.join("")); + expect(out[0]).toEqual({ + name: "aggregate", + categories: "Metrics,Aggregate", + description: "Aggregate rows", + }); + }); + + test("supports --format csv", async () => { + const { context, stdout } = createMockContext(); + await verbs.call(context, { format: "csv" }, deps); + const out = stdout.join(""); + expect(out).toContain("name,categories,description"); + expect(out).toContain('aggregate,"Metrics,Aggregate",Aggregate rows'); + }); + + test("exits 1 on API error", async () => { + verbsAndFunctionsFn.mockImplementationOnce(() => { + const err = new Error("Server error"); + err.name = "GqlApiError"; + (err as unknown as { statusCode: number }).statusCode = 500; + throw err; + }); + + const { context, stderr, getExitCode } = createMockContext(); + try { + await verbs.call(context, {}, deps); + throw new Error("expected process.exit"); + } catch (error) { + expect((error as Error).message).toBe("process.exit"); + } + expect(getExitCode()).toBe(1); + expect(stderr.join("")).toContain("Error"); + }); +}); diff --git a/src/commands/opal/verbs.ts b/src/commands/opal/verbs.ts new file mode 100644 index 0000000..24881de --- /dev/null +++ b/src/commands/opal/verbs.ts @@ -0,0 +1,89 @@ +import { buildCommand } from "@stricli/core"; +import type { LocalContext } from "../../context"; +import { verbsAndFunctions } from "../../gql/opal/verbs-and-functions"; +import { GqlApiError } from "../../gql/gql-request"; +import { loadConfig } from "../../lib/config"; +import { renderAsCSV } from "../../lib/formatters/csv"; + +type OutputFormat = "json" | "csv"; + +interface VerbsFlags { + format?: OutputFormat; +} + +export interface VerbsDeps { + loadConfig?: typeof loadConfig; + verbsAndFunctions?: typeof verbsAndFunctions; +} + +export async function verbs( + this: LocalContext, + flags: VerbsFlags, + deps: VerbsDeps = {}, +): Promise { + const { + loadConfig: loadConfigImpl = loadConfig, + verbsAndFunctions: verbsAndFunctionsImpl = verbsAndFunctions, + } = deps; + const { process, writer } = this; + + try { + const config = loadConfigImpl(); + const result = await verbsAndFunctionsImpl(config); + + const rows = [...result.verbs] + .map((v) => ({ + name: v.name ?? "", + categories: (v.categories ?? []).join(","), + description: v.description ?? "", + })) + .sort((a, b) => (a.name < b.name ? -1 : a.name > b.name ? 1 : 0)); + + if (flags.format === "json") { + writer.write(JSON.stringify(rows, null, 2)); + return; + } + if (flags.format === "csv") { + writer.write(renderAsCSV(rows)); + return; + } + + for (const v of rows) { + writer.write(`${v.name}\t${v.categories}\t${v.description}`); + } + } catch (error) { + if (error instanceof GqlApiError) { + writer.error(`API Error (${error.statusCode}): ${error.message}`); + } else { + const message = error instanceof Error ? error.message : String(error); + writer.error(`Error: ${message}`); + } + process.exit(1); + } +} + +export const verbsCommand = buildCommand({ + loader: async () => verbs, + parameters: { + positional: { + kind: "tuple", + parameters: [], + }, + flags: { + format: { + kind: "enum", + values: ["json", "csv"], + brief: "Output format (json, csv) (default: TSV)", + optional: true, + }, + }, + }, + docs: { + brief: "List all OPAL verbs", + fullDescription: [ + "List all OPAL verbs sorted by name.", + "", + "Default output is tab-separated: name, categories, description.", + ].join("\n"), + }, +}); diff --git a/src/gql/opal/check-queries.graphql b/src/gql/opal/check-queries.graphql new file mode 100644 index 0000000..7941f5c --- /dev/null +++ b/src/gql/opal/check-queries.graphql @@ -0,0 +1,23 @@ +query CheckQueries($queries: MultiStageQueryInput!) { + checkQueries(queries: $queries) { + parsedPipeline { + errors { + col + row + text + } + warnings { + kind + symbol { + col + row + } + } + } + resultSchema { + fieldList { + name + } + } + } +} diff --git a/src/gql/opal/check-queries.ts b/src/gql/opal/check-queries.ts new file mode 100644 index 0000000..d1cdc3d --- /dev/null +++ b/src/gql/opal/check-queries.ts @@ -0,0 +1,21 @@ +import type { Config } from "../../lib/config"; +import { + CheckQueriesDocument, + type CheckQueriesQuery, + type CheckQueriesQueryVariables, +} from "../generated/graphql"; +import { executeGraphQL } from "../gql-request"; + +export type CheckQueriesResult = CheckQueriesQuery["checkQueries"][number]; + +export async function checkQueries( + config: Config, + variables: CheckQueriesQueryVariables, +): Promise { + const response = await executeGraphQL( + config, + CheckQueriesDocument, + variables, + ); + return response.data.checkQueries[0]; +} diff --git a/src/gql/opal/validate-ingest-filter.graphql b/src/gql/opal/validate-ingest-filter.graphql new file mode 100644 index 0000000..6531b74 --- /dev/null +++ b/src/gql/opal/validate-ingest-filter.graphql @@ -0,0 +1,8 @@ +query ValidateIngestFilter($pipeline: String!, $sourceDatasetID: ObjectId!) { + validateIngestFilterExpression( + pipeline: $pipeline + sourceDatasetID: $sourceDatasetID + ) { + message + } +} diff --git a/src/gql/opal/validate-ingest-filter.ts b/src/gql/opal/validate-ingest-filter.ts new file mode 100644 index 0000000..d6ba046 --- /dev/null +++ b/src/gql/opal/validate-ingest-filter.ts @@ -0,0 +1,27 @@ +import type { Config } from "../../lib/config"; +import { + ValidateIngestFilterDocument, + type ValidateIngestFilterQuery, + type ValidateIngestFilterQueryVariables, +} from "../generated/graphql"; +import { executeGraphQL } from "../gql-request"; + +export type IngestFilterDiagnostic = + ValidateIngestFilterQuery["validateIngestFilterExpression"] extends + | (infer T)[] + | null + | undefined + ? T + : never; + +export async function validateIngestFilter( + config: Config, + variables: ValidateIngestFilterQueryVariables, +): Promise { + const response = await executeGraphQL( + config, + ValidateIngestFilterDocument, + variables, + ); + return response.data.validateIngestFilterExpression; +} diff --git a/src/gql/opal/verbs-and-functions.graphql b/src/gql/opal/verbs-and-functions.graphql new file mode 100644 index 0000000..3e09b12 --- /dev/null +++ b/src/gql/opal/verbs-and-functions.graphql @@ -0,0 +1,15 @@ +query VerbsAndFunctions { + verbsAndFunctions { + verbs { + name + description + categories + } + functions { + name + description + categories + returnType + } + } +} diff --git a/src/gql/opal/verbs-and-functions.ts b/src/gql/opal/verbs-and-functions.ts new file mode 100644 index 0000000..b7a9f24 --- /dev/null +++ b/src/gql/opal/verbs-and-functions.ts @@ -0,0 +1,17 @@ +import type { Config } from "../../lib/config"; +import { + VerbsAndFunctionsDocument, + type VerbsAndFunctionsQuery, +} from "../generated/graphql"; +import { executeGraphQL } from "../gql-request"; + +export type VerbsAndFunctions = VerbsAndFunctionsQuery["verbsAndFunctions"]; +export type OpalVerb = VerbsAndFunctions["verbs"][number]; +export type OpalFunction = VerbsAndFunctions["functions"][number]; + +export async function verbsAndFunctions( + config: Config, +): Promise { + const response = await executeGraphQL(config, VerbsAndFunctionsDocument); + return response.data.verbsAndFunctions; +} From ef633316cbc1329eb88fe122435fa57114e9838c Mon Sep 17 00:00:00 2001 From: Aaron Brewbaker Date: Wed, 24 Jun 2026 11:45:42 -0400 Subject: [PATCH 2/2] fix(opal): use StageQueryInput.id (not stageID) in check stages Type-verified against the published Observe GraphQL schema (from observeinc/terraform-provider-observe): StageQueryInput's field is `id` (server binds it to StageID); `stageId` is deprecated. The prior `stageID` literal failed type-checking against the real schema. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/commands/opal/check.test.ts | 2 +- src/commands/opal/check.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/commands/opal/check.test.ts b/src/commands/opal/check.test.ts index 573ef1c..c6ee4b5 100644 --- a/src/commands/opal/check.test.ts +++ b/src/commands/opal/check.test.ts @@ -112,7 +112,7 @@ describe("opal check", () => { expect(variables).toEqual({ queries: { outputStage: "stage-1", - stages: [{ stageID: "stage-1", pipeline: "filter true", input: [] }], + stages: [{ id: "stage-1", pipeline: "filter true", input: [] }], }, }); const out = stdout.join(""); diff --git a/src/commands/opal/check.ts b/src/commands/opal/check.ts index 6d433f3..344e9b4 100644 --- a/src/commands/opal/check.ts +++ b/src/commands/opal/check.ts @@ -57,7 +57,7 @@ export async function check( outputStage: "stage-1", stages: [ { - stageID: "stage-1", + id: "stage-1", pipeline, input: [], },