diff --git a/AGENTS.md b/AGENTS.md index 0198696..0616c50 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -42,6 +42,7 @@ src/ │ │ └── tracing/ # Trace Explorer (install, view) │ ├── dataset/ # Dataset commands (list, view) │ ├── datastream/ # Datastream commands (create, list, view, update) +│ ├── fleet/ # observe-agent fleet commands (status, host, versions, auth) │ ├── ingest-token/ # Ingest token commands (create, list, view, update) │ ├── metric/ # Metric commands (list, view) │ ├── skill/ # AI agent skill commands (list, view) diff --git a/README.md b/README.md index a7f6baa..c10be83 100644 --- a/README.md +++ b/README.md @@ -74,6 +74,10 @@ To update installed skills after edits in this repo, run `npx skills update`. | `observe datastream view` | View a datastream by ID | | `observe datastream update` | Update a datastream | | `observe datastream-token check-status` | Poll a datastream token until ingest data arrives | +| `observe fleet status` | Current observe-agent inventory (newest first) | +| `observe fleet host` | Event history for one host, incl. agent start time | +| `observe fleet versions` | Agent version distribution across the fleet | +| `observe fleet auth` | Agent auth-check status (failures first) | | `observe cli install` | Configure shell integration (PATH, completions) | | `observe cli uninstall` | Remove shell integration | | `observe cli upgrade` | Upgrade to the latest version | diff --git a/src/app.ts b/src/app.ts index a51b600..e07efe5 100644 --- a/src/app.ts +++ b/src/app.ts @@ -9,6 +9,7 @@ import { cliRoutes } from "./commands/cli/index.js"; import { contentRoutes } from "./commands/content/index.js"; import { datasetRoutes } from "./commands/dataset/index.js"; import { datastreamRoutes } from "./commands/datastream/index.js"; +import { fleetRoutes } from "./commands/fleet/index.js"; import { helpCommand } from "./commands/help.js"; import { ingestTokenRoutes } from "./commands/ingest-token/index.js"; import { metricRoutes } from "./commands/metric/index.js"; @@ -36,6 +37,7 @@ export const routes = buildRouteMap({ "data-connection": dataConnectionRoutes, datastream: datastreamRoutes, "datastream-token": datastreamTokenRoutes, + fleet: fleetRoutes, cli: cliRoutes, }, defaultCommand: "help", diff --git a/src/commands/fleet/auth.ts b/src/commands/fleet/auth.ts new file mode 100644 index 0000000..6635c02 --- /dev/null +++ b/src/commands/fleet/auth.ts @@ -0,0 +1,70 @@ +import { buildCommand } from "@stricli/core"; +import type { LocalContext } from "../../context"; +import { loadConfig } from "../../lib/config"; +import { formatFleetError } from "./format-error"; +import { muteStatusWriter } from "../../lib/writer"; +import { runFleetQuery, type RunFleetQueryDeps } from "./run-query"; +import { writeFleetResult, type FleetOutputFormat } from "./format-output"; +import { FLEET_AUTH_PIPELINE } from "./pipelines"; + +const DEFAULT_WINDOW = "20m"; + +interface FleetAuthFlags { + window: string; + format?: Exclude; +} + +export interface FleetAuthDeps extends RunFleetQueryDeps { + loadConfig?: typeof loadConfig; +} + +export async function auth( + this: LocalContext, + flags: FleetAuthFlags, + deps: FleetAuthDeps = {}, +): Promise { + const { loadConfig: loadConfigImpl = loadConfig, datasetQueryOutput } = deps; + const format: FleetOutputFormat = flags.format ?? "table"; + const { process, writer: rawWriter } = this; + const writer = muteStatusWriter(rawWriter, { + muted: format !== "table", + }); + + try { + const config = loadConfigImpl(); + writer.info("Querying fleet auth status..."); + const result = await runFleetQuery( + { config, pipeline: FLEET_AUTH_PIPELINE, window: flags.window }, + { datasetQueryOutput }, + ); + writeFleetResult(writer, result, format); + } catch (error) { + writer.error(`fleet auth failed: ${formatFleetError(error)}`); + process.exit(1); + } +} + +export const authCommand = buildCommand({ + loader: async () => auth, + parameters: { + positional: { kind: "tuple", parameters: [] }, + flags: { + window: { + kind: "parsed", + parse: String, + brief: "Time window for the query (Go duration, e.g. 20m, 24h, 168h)", + default: DEFAULT_WINDOW, + }, + format: { + kind: "enum", + values: ["json", "csv"], + brief: "Output format (json, csv) (default: table)", + optional: true, + }, + }, + aliases: { w: "window" }, + }, + docs: { + brief: "Auth-check status across the fleet (failures first)", + }, +}); diff --git a/src/commands/fleet/format-error.ts b/src/commands/fleet/format-error.ts new file mode 100644 index 0000000..e064b09 --- /dev/null +++ b/src/commands/fleet/format-error.ts @@ -0,0 +1,18 @@ +import { GqlApiError } from "../../gql/gql-request"; + +/** + * Format an error thrown from the fleet query path into a display string. + * Fleet talks only to the GraphQL query service, so we surface `GqlApiError` + * with its HTTP status and fall back to the message for everything else. + * (Unlike the shared `formatApiError`, this avoids the REST client so the + * GraphQL-only fleet commands don't pull in generated REST runtime types.) + */ +export function formatFleetError(error: unknown): string { + if (error instanceof GqlApiError) { + return `API Error (${error.statusCode}): ${error.message}`; + } + if (error instanceof Error) { + return error.message; + } + return String(error); +} diff --git a/src/commands/fleet/format-output.ts b/src/commands/fleet/format-output.ts new file mode 100644 index 0000000..da862b5 --- /dev/null +++ b/src/commands/fleet/format-output.ts @@ -0,0 +1,91 @@ +import type { Writer } from "../../lib/writer"; +import type { FleetQueryResult, FleetRow } from "./run-query"; +import type { GqlDatasetQueryField } from "../../gql/dataset/dataset-query-output"; +import { DataType } from "../../gql/generated/graphql"; +import { formatTable, type ColumnDef } from "../../lib/formatters/table"; +import { valueToString } from "../../lib/formatters/value"; +import { renderAsCSV } from "../../lib/formatters/csv"; +import { cyan, green, muted, red, yellow } from "../../lib/formatters/colors"; + +export type FleetOutputFormat = "table" | "json" | "csv"; + +/** + * Render a parsed fleet query result in the requested format. Column order is + * preserved from the OPAL `pick_col`/output schema (matching the Go CLI's + * table columns), and row order from the OPAL `sort`. `table` is the default + * human-readable view; `json` and `csv` emit the raw row records. + */ +export function writeFleetResult( + writer: Writer, + result: FleetQueryResult, + format: FleetOutputFormat, +): void { + const { headers, fields, rows } = result; + + if (format === "csv") { + writer.write(renderAsCSV(rows)); + return; + } + + if (format === "json") { + writer.write(JSON.stringify(rows, null, 2)); + return; + } + + if (rows.length === 0) { + writer.info("No results"); + return; + } + + const fieldMap = new Map(); + for (const field of fields) { + if (field.name) fieldMap.set(field.name, field); + } + + const columns: ColumnDef[] = headers.map((h) => ({ + header: h, + accessorFn: (row) => row[h], + format: getFieldFormatter(fieldMap.get(h)), + maxLines: 3, + })); + + writer.write("\n" + formatTable(rows, columns)); + writer.info(`\n${rows.length} row(s)`); +} + +/** + * Get a formatter for a field based on its result type, mirroring the + * coloring used by the `query` command so fleet tables look consistent. + */ +function getFieldFormatter( + field?: GqlDatasetQueryField, +): ((value: unknown) => string) | undefined { + if (!field) return undefined; + + switch (field.type.tag) { + case DataType.Int64: + case DataType.Float64: + return (v) => cyan(valueToString(v)); + case DataType.Bool: + return (v) => (isTruthyBool(v) ? green("true") : red("false")); + case DataType.Timestamp: + return (v) => muted(valueToString(v)); + case DataType.Object: + case DataType.Variant: + case DataType.Array: + return (v) => yellow(valueToString(v)); + default: + return undefined; + } +} + +/** + * The server encodes scalar values as strings in PaginatedResults, so "false" + * arrives as a truthy string. Treat the string "true" (or a real boolean) as + * true; everything else is false. + */ +function isTruthyBool(value: unknown): boolean { + if (typeof value === "boolean") return value; + if (typeof value === "string") return value.toLowerCase() === "true"; + return Boolean(value); +} diff --git a/src/commands/fleet/host.ts b/src/commands/fleet/host.ts new file mode 100644 index 0000000..fe4fbbb --- /dev/null +++ b/src/commands/fleet/host.ts @@ -0,0 +1,79 @@ +import { buildCommand } from "@stricli/core"; +import type { LocalContext } from "../../context"; +import { loadConfig } from "../../lib/config"; +import { formatFleetError } from "./format-error"; +import { muteStatusWriter } from "../../lib/writer"; +import { runFleetQuery, type RunFleetQueryDeps } from "./run-query"; +import { writeFleetResult, type FleetOutputFormat } from "./format-output"; +import { fleetHostPipeline } from "./pipelines"; + +const DEFAULT_WINDOW = "20m"; + +interface FleetHostFlags { + window: string; + format?: Exclude; +} + +export interface FleetHostDeps extends RunFleetQueryDeps { + loadConfig?: typeof loadConfig; +} + +export async function host( + this: LocalContext, + flags: FleetHostFlags, + hostname: string, + deps: FleetHostDeps = {}, +): Promise { + const { loadConfig: loadConfigImpl = loadConfig, datasetQueryOutput } = deps; + const format: FleetOutputFormat = flags.format ?? "table"; + const { process, writer: rawWriter } = this; + const writer = muteStatusWriter(rawWriter, { + muted: format !== "table", + }); + + try { + const config = loadConfigImpl(); + writer.info(`Querying fleet history for host ${hostname}...`); + const result = await runFleetQuery( + { config, pipeline: fleetHostPipeline(hostname), window: flags.window }, + { datasetQueryOutput }, + ); + writeFleetResult(writer, result, format); + } catch (error) { + writer.error(`fleet host failed: ${formatFleetError(error)}`); + process.exit(1); + } +} + +export const hostCommand = buildCommand({ + loader: async () => host, + parameters: { + positional: { + kind: "tuple", + parameters: [ + { + brief: "Hostname (identifiers[\"host.name\"]) to show event history for", + parse: String, + }, + ], + }, + flags: { + window: { + kind: "parsed", + parse: String, + brief: "Time window for the query (Go duration, e.g. 20m, 24h, 168h)", + default: DEFAULT_WINDOW, + }, + format: { + kind: "enum", + values: ["json", "csv"], + brief: "Output format (json, csv) (default: table)", + optional: true, + }, + }, + aliases: { w: "window" }, + }, + docs: { + brief: "Event history for one host, including agent start time", + }, +}); diff --git a/src/commands/fleet/index.ts b/src/commands/fleet/index.ts new file mode 100644 index 0000000..ef4bb16 --- /dev/null +++ b/src/commands/fleet/index.ts @@ -0,0 +1,29 @@ +import { buildRouteMap } from "@stricli/core"; +import { statusCommand } from "./status"; +import { hostCommand } from "./host"; +import { versionsCommand } from "./versions"; +import { authCommand } from "./auth"; + +export const fleetRoutes = buildRouteMap({ + routes: { + status: statusCommand, + host: hostCommand, + versions: versionsCommand, + auth: authCommand, + }, + docs: { + brief: "Query fleet status of observe-agent instances", + fullDescription: [ + "Query the observe-agent fleet from the Default.Observe Agent/Events", + "dataset. Each command runs an OPAL query over the AgentLifecycleEvent", + "stream; use --window to set the lookback (Go duration, e.g. 20m, 24h,", + "168h; default 20m).", + "", + "Commands:", + " status Current agent inventory (newest first)", + " host Event history for one host, incl. agent start time", + " versions Version distribution across the fleet", + " auth Auth-check status (failures first)", + ].join("\n"), + }, +}); diff --git a/src/commands/fleet/pipelines.ts b/src/commands/fleet/pipelines.ts new file mode 100644 index 0000000..6ee9ab7 --- /dev/null +++ b/src/commands/fleet/pipelines.ts @@ -0,0 +1,38 @@ +/** + * OPAL pipelines for the fleet commands, ported verbatim from the Go CLI + * (`cmd_fleet.go`). Each runs against the Observe Agent Events dataset and + * filters to `AgentLifecycleEvent` rows. The `pick_col` order defines the + * output columns and the trailing `sort` defines row order — both are + * preserved by the query path, so output matches the Go CLI. + */ + +/** `fleet status` — current agent inventory, newest first. */ +export const FLEET_STATUS_PIPELINE = + 'filter kind = "AgentLifecycleEvent" | make_col host:string(identifiers["host.name"]), env:string(identifiers["observe.agent.environment"]), version:string(facets["observe.agent.version"]), instance_id:string(identifiers["observe.agent.instance.id"]), data_obj:parse_json(data) | make_col auth_ok:bool(data_obj.authCheck.passed) | pick_col valid_from, host, env, version, auth_ok, instance_id | sort desc(valid_from)'; + +/** `fleet versions` — version distribution across the fleet. */ +export const FLEET_VERSIONS_PIPELINE = + 'filter kind = "AgentLifecycleEvent" | make_col host:string(identifiers["host.name"]), env:string(identifiers["observe.agent.environment"]), version:string(facets["observe.agent.version"]) | pick_col valid_from, host, env, version | sort asc(version), asc(host)'; + +/** `fleet auth` — auth-check status, failures first. */ +export const FLEET_AUTH_PIPELINE = + 'filter kind = "AgentLifecycleEvent" | make_col host:string(identifiers["host.name"]), env:string(identifiers["observe.agent.environment"]), version:string(facets["observe.agent.version"]), data_obj:parse_json(data) | make_col auth_ok:bool(data_obj.authCheck.passed), auth_code:int64(data_obj.authCheck.responseCode), auth_url:string(data_obj.authCheck.url) | pick_col valid_from, host, env, version, auth_ok, auth_code, auth_url | sort asc(auth_ok), desc(valid_from)'; + +/** + * `fleet host ` — event history for one host, newest first, + * including agent start time. The hostname is interpolated into a quoted OPAL + * string literal, matching the Go CLI's `%q` formatting. + */ +export function fleetHostPipeline(hostname: string): string { + return `filter kind = "AgentLifecycleEvent" | filter string(identifiers["host.name"]) = ${quoteOpalString(hostname)} | make_col host:string(identifiers["host.name"]), env:string(identifiers["observe.agent.environment"]), version:string(facets["observe.agent.version"]), data_obj:parse_json(data) | make_col auth_ok:bool(data_obj.authCheck.passed), start_time:from_nanoseconds(int64(data_obj.agentStartTime)*1000000000) | pick_col valid_from, host, env, version, auth_ok, start_time | sort desc(valid_from)`; +} + +/** + * Quote a value as an OPAL double-quoted string literal, escaping characters + * the way Go's `%q` does for the cases that matter here (backslash and + * double-quote). Prevents a hostname from breaking out of the filter. + */ +function quoteOpalString(value: string): string { + const escaped = value.replace(/\\/g, "\\\\").replace(/"/g, '\\"'); + return `"${escaped}"`; +} diff --git a/src/commands/fleet/run-query.test.ts b/src/commands/fleet/run-query.test.ts new file mode 100644 index 0000000..6481a2c --- /dev/null +++ b/src/commands/fleet/run-query.test.ts @@ -0,0 +1,125 @@ +import { describe, expect, test } from "bun:test"; +import { + parseDuration, + getFleetTimeRange, + runFleetQuery, + FLEET_DATASET_PATH, +} from "./run-query"; + +describe("parseDuration", () => { + test("parses single-unit Go durations", () => { + expect(parseDuration("20m")).toBe(20 * 60_000); + expect(parseDuration("24h")).toBe(24 * 3_600_000); + expect(parseDuration("168h")).toBe(168 * 3_600_000); + expect(parseDuration("90s")).toBe(90 * 1_000); + }); + + test("parses compound durations", () => { + expect(parseDuration("1h30m")).toBe(3_600_000 + 30 * 60_000); + }); + + test("parses fractional durations", () => { + expect(parseDuration("1.5h")).toBe(1.5 * 3_600_000); + }); + + test("rejects invalid input", () => { + expect(() => parseDuration("")).toThrow(); + expect(() => parseDuration("abc")).toThrow(); + expect(() => parseDuration("10")).toThrow(); + expect(() => parseDuration("10x")).toThrow(); + }); +}); + +describe("getFleetTimeRange", () => { + test("end is now-15s truncated to the minute; start is end-window", () => { + // 2026-06-24T12:00:40Z -> minus 15s = 12:00:25 -> truncate to 12:00:00 + const now = new Date("2026-06-24T12:00:40.000Z"); + const { startTime, endTime } = getFleetTimeRange("20m", now); + expect(endTime).toBe("2026-06-24T12:00:00.000Z"); + expect(startTime).toBe("2026-06-24T11:40:00.000Z"); + }); + + test("a window under 15s after truncation can land in the prior minute", () => { + // 2026-06-24T12:00:10Z -> minus 15s = 11:59:55 -> truncate to 11:59:00 + const now = new Date("2026-06-24T12:00:10.000Z"); + const { endTime } = getFleetTimeRange("24h", now); + expect(endTime).toBe("2026-06-24T11:59:00.000Z"); + }); +}); + +describe("runFleetQuery", () => { + function makeDataResult() { + return [ + { + queryId: "q1", + stageId: "query", + resultKind: "ResultKindData", + paginatedResults: { + columns: [ + ["2026-06-24T12:00:00Z", "2026-06-24T11:00:00Z"], + ["host-a", "host-b"], + ], + }, + resultSchema: { + fieldList: [ + { name: "valid_from", type: { tag: "Timestamp" } }, + { name: "host", type: { tag: "String" } }, + ], + }, + errors: null, + }, + ]; + } + + test("sends a datasetPath input and parses column-major rows in order", async () => { + let captured: unknown; + const datasetQueryOutput = (args: unknown) => { + captured = args; + return Promise.resolve(makeDataResult() as never); + }; + + const result = await runFleetQuery( + { + config: { customerId: "c", domain: "d", token: "t" }, + pipeline: "filter true", + window: "20m", + }, + { datasetQueryOutput: datasetQueryOutput as never }, + ); + + const vars = (captured as { variables: { query: { inputs: unknown[] }[] } }) + .variables; + expect(vars.query[0]?.inputs[0]).toEqual({ + inputName: "_", + datasetPath: FLEET_DATASET_PATH, + }); + + expect(result.headers).toEqual(["valid_from", "host"]); + expect(result.rows).toEqual([ + { valid_from: "2026-06-24T12:00:00Z", host: "host-a" }, + { valid_from: "2026-06-24T11:00:00Z", host: "host-b" }, + ]); + }); + + test("throws when a task result reports errors", async () => { + const datasetQueryOutput = () => + Promise.resolve([ + { + stageId: "query", + resultKind: "ResultKindData", + errors: [{ message: "boom", text: "boom" }], + }, + ] as never); + + await expect( + runFleetQuery( + { + config: { customerId: "c", domain: "d", token: "t" }, + pipeline: "filter true", + window: "20m", + }, + { datasetQueryOutput: datasetQueryOutput as never }, + ), + ).rejects.toThrow("boom"); + }); +}); diff --git a/src/commands/fleet/run-query.ts b/src/commands/fleet/run-query.ts new file mode 100644 index 0000000..644caa7 --- /dev/null +++ b/src/commands/fleet/run-query.ts @@ -0,0 +1,201 @@ +import type { Config } from "../../lib/config"; +import { + datasetQueryOutput as datasetQueryOutputDefault, + type GqlDatasetQueryField, + type PaginatedResults, +} from "../../gql/dataset/dataset-query-output"; +import { + type StageInput, + ResultKind, + VariantEncodingMode, + RollupMode, +} from "../../gql/generated/graphql"; +import { transposeColumnsToRows } from "../../lib/transpose"; + +/** + * The dataset that the observe-agent fleet writes lifecycle events to. Fleet + * does not use meta CRUD: it runs an OPAL query against this dataset by path + * (format `projectlabel.datasetlabel`), mirroring the Go CLI's + * `Default.Observe Agent/Events` target. + */ +export const FLEET_DATASET_PATH = "Default.Observe Agent/Events"; + +const DEFAULT_LIMIT = 1000; + +/** A single result row keyed by the OPAL output column name. */ +export type FleetRow = Record; + +/** Parsed fleet query result: ordered field metadata plus row records. */ +export interface FleetQueryResult { + fields: GqlDatasetQueryField[]; + headers: string[]; + rows: FleetRow[]; +} + +export interface RunFleetQueryDeps { + datasetQueryOutput?: typeof datasetQueryOutputDefault; +} + +/** + * Execute a fleet OPAL pipeline against the Observe Agent Events dataset and + * return the rows in OPAL output order. Reuses the new CLI's existing OPAL + * execution path (`datasetQueryOutput` → `StageInput`) rather than the Go + * CLI's `/v1/meta/export/query` REST endpoint; column order and sorting are + * carried by the OPAL `pick_col`/`sort` operators in `pipeline`. + */ +export async function runFleetQuery( + { + config, + pipeline, + window, + }: { + config: Config; + pipeline: string; + window: string; + }, + deps: RunFleetQueryDeps = {}, +): Promise { + const { datasetQueryOutput = datasetQueryOutputDefault } = deps; + + const stage: StageInput = { + stageId: "query", + pipeline, + inputs: [ + { + inputName: "_", + datasetPath: FLEET_DATASET_PATH, + }, + ], + pagination: { + initialRows: `${DEFAULT_LIMIT}`, + }, + presentation: { + resultKinds: [ResultKind.ResultKindSchema, ResultKind.ResultKindData], + rollup: {}, + rollupMode: RollupMode.Never, + variantEncodingMode: VariantEncodingMode.String, + }, + bestEffortBinding: true, + }; + + const taskResults = await datasetQueryOutput({ + config, + variables: { + query: [stage], + params: getFleetTimeRange(window), + }, + }); + + const taskResultErrors = taskResults.filter((r) => !!r.errors?.length); + if (taskResultErrors.length > 0) { + const message = taskResultErrors + .map((e) => e.errors?.map((err) => err.message).join(", ")) + .join(", "); + throw new Error(message); + } + + const stageTaskResults = taskResults.filter((r) => r.stageId === stage.stageId); + const stageDataResult = stageTaskResults.find( + (r) => r.resultKind === ResultKind.ResultKindData && r.paginatedResults != null, + ); + const stageSchemaResult = stageDataResult?.resultSchema + ? stageDataResult + : stageTaskResults.find((r) => r.resultKind === ResultKind.ResultKindSchema); + + if (!stageSchemaResult) { + throw new Error("No schema returned"); + } + + const schemaErrors = stageSchemaResult.errors?.map((e) => e.text); + if (schemaErrors && schemaErrors.length > 0) { + throw new Error(schemaErrors.join("; ")); + } + + const fields = stageSchemaResult.resultSchema?.fieldList ?? []; + const headers = fields + .map((f) => f.name) + .filter((n): n is string => typeof n === "string"); + + const paginated = stageDataResult?.paginatedResults as + | PaginatedResults + | undefined; + const rowArrays = transposeColumnsToRows(paginated?.columns); + const rows: FleetRow[] = rowArrays.map((row) => + Object.fromEntries(headers.map((h, i) => [h, row[i]])), + ); + + return { fields, headers, rows }; +} + +const DURATION_UNIT_MS: Record = { + ns: 1e-6, + us: 1e-3, + µs: 1e-3, + ms: 1, + s: 1_000, + m: 60_000, + h: 3_600_000, +}; + +/** + * Parse a Go-style duration string (e.g. `20m`, `24h`, `168h`, `1h30m`, + * `90s`) into milliseconds. Mirrors the subset of `time.ParseDuration` the + * Go fleet command accepts via `--window`. + */ +export function parseDuration(value: string): number { + const trimmed = value.trim(); + if (trimmed === "") { + throw new Error(`Invalid duration: "${value}"`); + } + + let rest = trimmed; + let sign = 1; + if (rest.startsWith("+")) { + rest = rest.slice(1); + } else if (rest.startsWith("-")) { + sign = -1; + rest = rest.slice(1); + } + + const partRe = /(\d+(?:\.\d+)?)(ns|us|µs|ms|s|m|h)/g; + let total = 0; + let consumed = 0; + let match: RegExpExecArray | null; + while ((match = partRe.exec(rest)) !== null) { + const amount = parseFloat(match[1] ?? "0"); + const unit = match[2] ?? ""; + const factor = DURATION_UNIT_MS[unit]; + if (factor === undefined) { + throw new Error(`Invalid duration unit: "${unit}"`); + } + total += amount * factor; + consumed += match[0].length; + } + + if (consumed !== rest.length || consumed === 0) { + throw new Error( + `Invalid duration: "${value}". Expected a Go duration like "20m", "24h", or "168h".`, + ); + } + + return sign * total; +} + +/** + * Compute the fleet query time window, matching the Go CLI: the end time is + * 15 seconds before now, truncated down to the minute; the start time is the + * end time minus the requested window. Returns ISO-8601 (RFC 3339) bounds. + */ +export function getFleetTimeRange( + window: string, + now: Date = new Date(), +): { startTime: string; endTime: string } { + const windowMs = parseDuration(window); + const nowMs = Math.floor(now.getTime() / 1000) * 1000; + const endMs = Math.floor((nowMs - 15_000) / 60_000) * 60_000; + const startMs = endMs - windowMs; + return { + startTime: new Date(startMs).toISOString(), + endTime: new Date(endMs).toISOString(), + }; +} diff --git a/src/commands/fleet/status.test.ts b/src/commands/fleet/status.test.ts new file mode 100644 index 0000000..f7155dd --- /dev/null +++ b/src/commands/fleet/status.test.ts @@ -0,0 +1,174 @@ +import { afterAll, beforeAll, describe, expect, test } from "bun:test"; +import type { LocalContext } from "../../context"; +import { createWriter } from "../../lib/writer"; +import { status } from "./status"; +import { host } from "./host"; + +const config = { customerId: "c", domain: "observeinc.com", token: "t" }; + +const loadConfig = (() => config) as never; + +function makeResult() { + return [ + { + stageId: "query", + resultKind: "ResultKindData", + paginatedResults: { + columns: [ + ["2026-06-24T12:00:00Z"], + ["host-a"], + ["prod"], + ["1.2.3"], + ["true"], + ["inst-1"], + ], + }, + resultSchema: { + fieldList: [ + { name: "valid_from", type: { tag: "Timestamp" } }, + { name: "host", type: { tag: "String" } }, + { name: "env", type: { tag: "String" } }, + { name: "version", type: { tag: "String" } }, + { name: "auth_ok", type: { tag: "Bool" } }, + { name: "instance_id", type: { tag: "String" } }, + ], + }, + errors: null, + }, + ]; +} + +const datasetQueryOutput = (() => Promise.resolve(makeResult() as never)) as never; + +let previousNoColor: string | undefined; +let previousForceColor: string | undefined; + +beforeAll(() => { + previousNoColor = process.env.NO_COLOR; + previousForceColor = process.env.FORCE_COLOR; + process.env.NO_COLOR = "1"; + process.env.FORCE_COLOR = "0"; +}); + +afterAll(() => { + if (previousNoColor === undefined) delete process.env.NO_COLOR; + else process.env.NO_COLOR = previousNoColor; + if (previousForceColor === undefined) delete process.env.FORCE_COLOR; + else process.env.FORCE_COLOR = previousForceColor; +}); + +function createMockContext() { + const stdout: string[] = []; + const stderr: string[] = []; + let exitCode: number | undefined; + + const processMock = { + stdout: { + write: (msg: string) => { + stdout.push(msg); + return true; + }, + }, + stderr: { + write: (msg: string) => { + stderr.push(msg); + return true; + }, + }, + exit: (code?: number) => { + exitCode = code ?? 0; + throw new Error("process.exit"); + }, + }; + + const context = { + process: processMock, + writer: createWriter({ process: processMock }), + } as unknown as LocalContext; + + return { context, stdout, stderr, getExitCode: () => exitCode }; +} + +describe("fleet status", () => { + test("emits JSON rows in OPAL column order with --format json", async () => { + const { context, stdout } = createMockContext(); + await status.call( + context, + { window: "20m", format: "json" }, + { loadConfig, datasetQueryOutput }, + ); + + const output = JSON.parse(stdout.join("")); + expect(output).toEqual([ + { + valid_from: "2026-06-24T12:00:00Z", + host: "host-a", + env: "prod", + version: "1.2.3", + auth_ok: "true", + instance_id: "inst-1", + }, + ]); + }); + + test("emits CSV with header row in column order", async () => { + const { context, stdout } = createMockContext(); + await status.call( + context, + { window: "20m", format: "csv" }, + { loadConfig, datasetQueryOutput }, + ); + + const out = stdout.join(""); + expect(out.split("\n")[0]).toBe( + "valid_from,host,env,version,auth_ok,instance_id", + ); + expect(out).toContain("host-a"); + }); + + test("renders a table by default", async () => { + const { context, stdout } = createMockContext(); + await status.call( + context, + { window: "20m" }, + { loadConfig, datasetQueryOutput }, + ); + + const out = stdout.join(""); + expect(out).toContain("host"); + expect(out).toContain("host-a"); + expect(out).toContain("1 row(s)"); + }); + + test("exits 1 when the query path throws", async () => { + const failing = (() => Promise.reject(new Error("query boom"))) as never; + const { context, stderr, getExitCode } = createMockContext(); + try { + await status.call( + context, + { window: "20m" }, + { loadConfig, datasetQueryOutput: failing }, + ); + throw new Error("expected process.exit"); + } catch (error) { + expect((error as Error).message).toBe("process.exit"); + } + expect(getExitCode()).toBe(1); + expect(stderr.join("")).toContain("fleet status failed"); + }); +}); + +describe("fleet host", () => { + test("passes the hostname through and outputs rows", async () => { + const { context, stdout } = createMockContext(); + await host.call( + context, + { window: "24h", format: "json" }, + "host-a", + { loadConfig, datasetQueryOutput }, + ); + + const output = JSON.parse(stdout.join("")); + expect(output[0].host).toBe("host-a"); + }); +}); diff --git a/src/commands/fleet/status.ts b/src/commands/fleet/status.ts new file mode 100644 index 0000000..d49f379 --- /dev/null +++ b/src/commands/fleet/status.ts @@ -0,0 +1,70 @@ +import { buildCommand } from "@stricli/core"; +import type { LocalContext } from "../../context"; +import { loadConfig } from "../../lib/config"; +import { formatFleetError } from "./format-error"; +import { muteStatusWriter } from "../../lib/writer"; +import { runFleetQuery, type RunFleetQueryDeps } from "./run-query"; +import { writeFleetResult, type FleetOutputFormat } from "./format-output"; +import { FLEET_STATUS_PIPELINE } from "./pipelines"; + +const DEFAULT_WINDOW = "20m"; + +interface FleetStatusFlags { + window: string; + format?: Exclude; +} + +export interface FleetStatusDeps extends RunFleetQueryDeps { + loadConfig?: typeof loadConfig; +} + +export async function status( + this: LocalContext, + flags: FleetStatusFlags, + deps: FleetStatusDeps = {}, +): Promise { + const { loadConfig: loadConfigImpl = loadConfig, datasetQueryOutput } = deps; + const format: FleetOutputFormat = flags.format ?? "table"; + const { process, writer: rawWriter } = this; + const writer = muteStatusWriter(rawWriter, { + muted: format !== "table", + }); + + try { + const config = loadConfigImpl(); + writer.info("Querying fleet status..."); + const result = await runFleetQuery( + { config, pipeline: FLEET_STATUS_PIPELINE, window: flags.window }, + { datasetQueryOutput }, + ); + writeFleetResult(writer, result, format); + } catch (error) { + writer.error(`fleet status failed: ${formatFleetError(error)}`); + process.exit(1); + } +} + +export const statusCommand = buildCommand({ + loader: async () => status, + parameters: { + positional: { kind: "tuple", parameters: [] }, + flags: { + window: { + kind: "parsed", + parse: String, + brief: "Time window for the query (Go duration, e.g. 20m, 24h, 168h)", + default: DEFAULT_WINDOW, + }, + format: { + kind: "enum", + values: ["json", "csv"], + brief: "Output format (json, csv) (default: table)", + optional: true, + }, + }, + aliases: { w: "window" }, + }, + docs: { + brief: "Current agent inventory (host, env, version, auth, instance id)", + }, +}); diff --git a/src/commands/fleet/versions.ts b/src/commands/fleet/versions.ts new file mode 100644 index 0000000..35868f6 --- /dev/null +++ b/src/commands/fleet/versions.ts @@ -0,0 +1,70 @@ +import { buildCommand } from "@stricli/core"; +import type { LocalContext } from "../../context"; +import { loadConfig } from "../../lib/config"; +import { formatFleetError } from "./format-error"; +import { muteStatusWriter } from "../../lib/writer"; +import { runFleetQuery, type RunFleetQueryDeps } from "./run-query"; +import { writeFleetResult, type FleetOutputFormat } from "./format-output"; +import { FLEET_VERSIONS_PIPELINE } from "./pipelines"; + +const DEFAULT_WINDOW = "20m"; + +interface FleetVersionsFlags { + window: string; + format?: Exclude; +} + +export interface FleetVersionsDeps extends RunFleetQueryDeps { + loadConfig?: typeof loadConfig; +} + +export async function versions( + this: LocalContext, + flags: FleetVersionsFlags, + deps: FleetVersionsDeps = {}, +): Promise { + const { loadConfig: loadConfigImpl = loadConfig, datasetQueryOutput } = deps; + const format: FleetOutputFormat = flags.format ?? "table"; + const { process, writer: rawWriter } = this; + const writer = muteStatusWriter(rawWriter, { + muted: format !== "table", + }); + + try { + const config = loadConfigImpl(); + writer.info("Querying fleet versions..."); + const result = await runFleetQuery( + { config, pipeline: FLEET_VERSIONS_PIPELINE, window: flags.window }, + { datasetQueryOutput }, + ); + writeFleetResult(writer, result, format); + } catch (error) { + writer.error(`fleet versions failed: ${formatFleetError(error)}`); + process.exit(1); + } +} + +export const versionsCommand = buildCommand({ + loader: async () => versions, + parameters: { + positional: { kind: "tuple", parameters: [] }, + flags: { + window: { + kind: "parsed", + parse: String, + brief: "Time window for the query (Go duration, e.g. 20m, 24h, 168h)", + default: DEFAULT_WINDOW, + }, + format: { + kind: "enum", + values: ["json", "csv"], + brief: "Output format (json, csv) (default: table)", + optional: true, + }, + }, + aliases: { w: "window" }, + }, + docs: { + brief: "Version distribution across the fleet", + }, +});