Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ src/
│ ├── datastream/ # Datastream commands (create, list, view, update)
│ ├── ingest-token/ # Ingest token commands (create, list, view, update)
│ ├── metric/ # Metric commands (list, view)
│ ├── opal/ # OPAL commands (check, verbs, functions, validate-ingest)
│ ├── skill/ # AI agent skill commands (list, view)
│ ├── tag-key/ # Tag key commands (list)
│ ├── tag-value/ # Tag value commands (list)
Expand All @@ -57,6 +58,7 @@ src/
│ ├── datastream/ # Datastream queries/mutations
│ ├── ingest-token/ # Ingest token queries/mutations
│ ├── metric/ # Metric queries
│ ├── opal/ # OPAL validation/inspection queries
│ ├── workspace/ # Workspace queries
│ ├── gql-request.ts # GraphQL client/executor
│ └── gql-codegen.config.ts # Codegen configuration
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ To update installed skills after edits in this repo, run `npx skills update`.
| `observe datastream view` | View a datastream by ID |
| `observe datastream update` | Update a datastream |
| `observe datastream-token check-status` | Poll a datastream token until ingest data arrives |
| `observe opal check` | Validate an OPAL pipeline |
| `observe opal verbs` | List all OPAL verbs |
| `observe opal functions` | List all OPAL functions |
| `observe opal validate-ingest` | Validate an OPAL ingest filter expression |
| `observe cli install` | Configure shell integration (PATH, completions) |
| `observe cli uninstall` | Remove shell integration |
| `observe cli upgrade` | Upgrade to the latest version |
Expand Down
2 changes: 2 additions & 0 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { datastreamRoutes } from "./commands/datastream/index.js";
import { helpCommand } from "./commands/help.js";
import { ingestTokenRoutes } from "./commands/ingest-token/index.js";
import { metricRoutes } from "./commands/metric/index.js";
import { opalRoutes } from "./commands/opal/index.js";
import { queryCommand } from "./commands/query.js";
import { skillRoutes } from "./commands/skill/index.js";
import { tagKeyRoutes } from "./commands/tag-key/index.js";
Expand All @@ -36,6 +37,7 @@ export const routes = buildRouteMap({
"data-connection": dataConnectionRoutes,
datastream: datastreamRoutes,
"datastream-token": datastreamTokenRoutes,
opal: opalRoutes,
cli: cliRoutes,
},
defaultCommand: "help",
Expand Down
249 changes: 249 additions & 0 deletions src/commands/opal/check.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
import {
afterAll,
beforeAll,
beforeEach,
describe,
expect,
mock,
test,
} from "bun:test";
import { resolve } from "node:path";
import type { LocalContext } from "../../context";
import { createWriter } from "../../lib/writer";

const repoRoot = resolve(import.meta.dir, "../../..");
const gqlModulePath = resolve(repoRoot, "src/gql/opal/check-queries.ts");

const loadConfigFn = mock(() => ({
customerId: "test-customer",
token: "test-token",
domain: "observeinc.com",
}));

const checkQueriesFn = mock((_config: unknown, _variables: unknown) =>
Promise.resolve<unknown>({
parsedPipeline: { errors: [], warnings: [] },
resultSchema: { fieldList: [{ name: "timestamp" }, { name: "log" }] },
}),
);

let check: (typeof import("./check"))["check"];

let previousNoColor: string | undefined;
let previousForceColor: string | undefined;

const deps = {
loadConfig: loadConfigFn,
checkQueries: checkQueriesFn,
} as Parameters<(typeof import("./check"))["check"]>[2];

beforeAll(async () => {
previousNoColor = process.env.NO_COLOR;
previousForceColor = process.env.FORCE_COLOR;
process.env.NO_COLOR = "1";
process.env.FORCE_COLOR = "0";

void mock.module(gqlModulePath, () => ({
checkQueries: checkQueriesFn,
}));

const mod = await import("./check.ts");
check = mod.check;
});

afterAll(() => {
mock.restore();
if (previousNoColor === undefined) {
delete process.env.NO_COLOR;
} else {
process.env.NO_COLOR = previousNoColor;
}
if (previousForceColor === undefined) {
delete process.env.FORCE_COLOR;
} else {
process.env.FORCE_COLOR = previousForceColor;
}
});

function createMockContext() {
const stdout: string[] = [];
const stderr: string[] = [];
let exitCode: number | undefined;

const processMock = {
stdout: {
write: (msg: string) => {
stdout.push(msg);
return true;
},
},
stderr: {
write: (msg: string) => {
stderr.push(msg);
return true;
},
},
exit: (code?: number) => {
exitCode = code ?? 0;
throw new Error("process.exit");
},
};

const context = {
process: processMock,
writer: createWriter({ process: processMock }),
} as unknown as LocalContext;

return { context, stdout, stderr, getExitCode: () => exitCode };
}

describe("opal check", () => {
beforeEach(() => {
loadConfigFn.mockClear();
checkQueriesFn.mockClear();
});

test("prints OK and schema fields on success", async () => {
const { context, stdout } = createMockContext();
await check.call(context, {}, "filter true", deps);

expect(checkQueriesFn).toHaveBeenCalledTimes(1);
const [, variables] = checkQueriesFn.mock.calls[0]!;
expect(variables).toEqual({
queries: {
outputStage: "stage-1",
stages: [{ id: "stage-1", pipeline: "filter true", input: [] }],
},
});
const out = stdout.join("");
expect(out).toContain("OK");
expect(out).toContain("timestamp");
expect(out).toContain("log");
});

test("prints ERROR row:col: text and exits 1 on errors", async () => {
checkQueriesFn.mockImplementationOnce(() =>
Promise.resolve<unknown>({
parsedPipeline: {
errors: [{ col: "1", row: "2", text: "not_a_verb" }],
warnings: [],
},
resultSchema: null,
}),
);

const { context, stdout, getExitCode } = createMockContext();
try {
await check.call(context, {}, "not_a_verb 123", deps);
throw new Error("expected process.exit");
} catch (error) {
expect((error as Error).message).toBe("process.exit");
}
expect(getExitCode()).toBe(1);
const out = stdout.join("");
expect(out).toContain("ERROR 2:1: not_a_verb");
expect(out).not.toContain("OK");
});

test("treats empty-text errors as OK (compilation requires input dataset)", async () => {
checkQueriesFn.mockImplementationOnce(() =>
Promise.resolve<unknown>({
parsedPipeline: {
errors: [{ col: "0", row: "0", text: "" }],
warnings: [],
},
resultSchema: { fieldList: [] },
}),
);

const { context, stdout } = createMockContext();
await check.call(context, {}, "filter true", deps);
expect(stdout.join("")).toContain("OK");
});

test("prints WARN kind row:col then OK on warnings only", async () => {
checkQueriesFn.mockImplementationOnce(() =>
Promise.resolve<unknown>({
parsedPipeline: {
errors: [],
warnings: [{ kind: "deprecation", symbol: { col: "3", row: "4" } }],
},
resultSchema: { fieldList: [] },
}),
);

const { context, stdout } = createMockContext();
await check.call(context, {}, "filter true", deps);
const out = stdout.join("");
expect(out).toContain("WARN deprecation 4:3");
expect(out).toContain("OK");
});

test("reads pipeline from --file", async () => {
const readFile = mock(() => "filter true");
const { context, stdout } = createMockContext();
await check.call(context, { file: "/tmp/p.opal" }, undefined, {
...deps,
readFile,
});

expect(readFile).toHaveBeenCalledWith("/tmp/p.opal");
const [, variables] = checkQueriesFn.mock.calls[0]!;
expect(
(variables as { queries: { stages: { pipeline: string }[] } }).queries
.stages[0]!.pipeline,
).toBe("filter true");
expect(stdout.join("")).toContain("OK");
});

test("exits 1 with usage error when no pipeline and no file", async () => {
const { context, stderr, getExitCode } = createMockContext();
try {
await check.call(context, {}, undefined, deps);
throw new Error("expected process.exit");
} catch (error) {
expect((error as Error).message).toBe("process.exit");
}
expect(getExitCode()).toBe(1);
expect(stderr.join("")).toContain("usage:");
expect(checkQueriesFn).not.toHaveBeenCalled();
});

test("exits 1 when --file cannot be read", async () => {
const readFile = mock(() => {
throw new Error("ENOENT");
});
const { context, stderr, getExitCode } = createMockContext();
try {
await check.call(context, { file: "/nope" }, undefined, {
...deps,
readFile,
});
throw new Error("expected process.exit");
} catch (error) {
expect((error as Error).message).toBe("process.exit");
}
expect(getExitCode()).toBe(1);
expect(stderr.join("")).toContain("could not read file");
expect(checkQueriesFn).not.toHaveBeenCalled();
});

test("exits 1 on API error", async () => {
checkQueriesFn.mockImplementationOnce(() => {
const err = new Error("Server error");
err.name = "GqlApiError";
(err as unknown as { statusCode: number }).statusCode = 500;
throw err;
});

const { context, stderr, getExitCode } = createMockContext();
try {
await check.call(context, {}, "filter true", deps);
throw new Error("expected process.exit");
} catch (error) {
expect((error as Error).message).toBe("process.exit");
}
expect(getExitCode()).toBe(1);
expect(stderr.join("")).toContain("Error");
});
});
Loading