From a0a1220c0d90b6c995721a869965d67e3d4ac667 Mon Sep 17 00:00:00 2001 From: "Ralph Sto. Domingo" <79431566+ralphstodomingo@users.noreply.github.com> Date: Thu, 4 Jun 2026 18:34:35 +0800 Subject: [PATCH 1/7] fix: write session traces in headless serve mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Session tracing was implemented only at the terminal entrypoints — the TUI worker and `run` construct the tracer and feed it events, but `serve` had no trace wiring at all. Sessions driven over HTTP (e.g. the VS Code chat panel, which spawns `altimate serve` and POSTs to `/session/{id}/prompt_async`) never wrote `ses_*.json` files to `~/.local/share/altimate-code/traces/`: the only hook in the shared session pipeline is `Tracer.active?.logSpan`, and `Tracer.active` was never set in serve mode. - Extract the TUI worker's inline event-stream→trace logic into a shared `TraceConsumer` (`altimate/observability/trace-consumer.ts`): config loading, per-session trace map, MAX_TRACES eviction, event feeding, reset/flush - Rewire `cli/cmd/tui/worker.ts` to use the shared consumer (behavior unchanged) - Add `subscribeTraceConsumer()` — an in-process `/event` subscription mirroring the worker's loop — and start it in `ServeCommand`, so serve sessions produce the same trace files as the terminal - Tests: full event-sequence → completed trace file, interleaved sessions, malformed events, disabled config, flush/reset semantics Co-Authored-By: Claude Opus 4.8 (1M context) --- .../altimate/observability/trace-consumer.ts | 292 ++++++++++++++++++ packages/opencode/src/cli/cmd/serve.ts | 12 + packages/opencode/src/cli/cmd/tui/worker.ts | 167 +--------- .../test/altimate/trace-consumer.test.ts | 211 +++++++++++++ 4 files changed, 522 insertions(+), 160 deletions(-) create mode 100644 packages/opencode/src/altimate/observability/trace-consumer.ts create mode 100644 packages/opencode/test/altimate/trace-consumer.test.ts diff --git a/packages/opencode/src/altimate/observability/trace-consumer.ts b/packages/opencode/src/altimate/observability/trace-consumer.ts new file mode 100644 index 0000000000..82d707d3bb --- /dev/null +++ b/packages/opencode/src/altimate/observability/trace-consumer.ts @@ -0,0 +1,292 @@ +/** + * Shared event-stream → trace consumer. + * + * Feeds bus events (message.updated, message.part.updated, session.updated, + * session.status) into per-session Trace instances so every front-end that + * observes the event stream writes the same trace files to + * ~/.local/share/altimate-code/traces/. + * + * Extracted from cli/cmd/tui/worker.ts so the headless server + * (`altimate serve`, used by the VS Code "Altimate Code" chat panel) produces + * traces too — previously only the terminal entrypoints (TUI and `run`) + * instantiated a tracer, and chat sessions were never traced. + * + * Consumers: + * - cli/cmd/tui/worker.ts — feeds events from its own SDK event loop + * - cli/cmd/serve.ts — uses subscribeTraceConsumer() below + */ +import { createOpencodeClient } from "@opencode-ai/sdk/v2" +import { setTimeout as sleep } from "node:timers/promises" +import { Config } from "@/config/config" +import { Log } from "@/util/log" +import { Server } from "@/server/server" +import { Flag } from "@/flag/flag" +import { Trace, FileExporter, HttpExporter, type TraceExporter } from "./tracing" + +const MAX_TRACES = 100 + +/** Minimal structural view of a bus event — narrowed at each read site. */ +interface BusEventLike { + type?: string + properties?: Record +} + +export class TraceConsumer { + private sessionTraces = new Map() + // Per-session user message IDs (cleaned up on session end) + private sessionUserMsgIds = new Map>() + + // Cached tracing config — loaded once at first use + private configLoaded = false + private enabled = true + private exporters: TraceExporter[] | undefined + private maxFiles: number | undefined + + /** + * Optional overrides bypass config loading entirely — used by tests to + * inject a FileExporter pointed at a temp directory. + */ + constructor(overrides?: { exporters?: TraceExporter[]; maxFiles?: number; enabled?: boolean }) { + if (overrides) { + this.configLoaded = true + this.enabled = overrides.enabled ?? true + this.exporters = overrides.exporters + this.maxFiles = overrides.maxFiles + } + } + + /** Load tracing config once. Safe to call repeatedly. */ + async loadConfig() { + if (this.configLoaded) return + this.configLoaded = true + try { + const cfg = await Config.get() + const tc = cfg.tracing + if (tc?.enabled === false) { + this.enabled = false + return + } + const exporters: TraceExporter[] = [new FileExporter(tc?.dir)] + if (tc?.exporters) { + for (const exp of tc.exporters) { + exporters.push(new HttpExporter(exp.name, exp.endpoint, exp.headers)) + } + } + this.exporters = exporters + this.maxFiles = tc?.maxFiles + } catch { + // Config failure should not prevent the host (TUI/serve) from working + } + } + + private getOrCreateTrace(sessionID: string): Trace | null { + if (!sessionID || !this.enabled) return null + if (this.sessionTraces.has(sessionID)) return this.sessionTraces.get(sessionID)! + try { + if (this.sessionTraces.size >= MAX_TRACES) { + const oldest = this.sessionTraces.keys().next().value + if (oldest) { + Log.Default.warn(`[tracing] Evicting trace for session ${oldest} — ${MAX_TRACES} concurrent sessions reached`) + this.sessionTraces + .get(oldest) + ?.endTrace() + .catch(() => {}) + this.sessionTraces.delete(oldest) + this.sessionUserMsgIds.delete(oldest) + } + } + const trace = this.exporters + ? Trace.withExporters([...this.exporters], { maxFiles: this.maxFiles }) + : Trace.create() + trace.startTrace(sessionID, {}) + Trace.setActive(trace) + this.sessionTraces.set(sessionID, trace) + return trace + } catch { + return null + } + } + + /** Feed one bus event into the per-session traces. Never throws. */ + handleEvent(event: unknown) { + try { + const e = event as BusEventLike + if (e.type === "message.updated") { + const info = e.properties?.info as Record | undefined + // Resolve sessionID: use info.sessionID directly, or fall back to + // finding the session via info.parentID (assistant messages may only + // carry the parent message ID, not the session ID). + let resolvedSessionID = info?.sessionID as string | undefined + if (!resolvedSessionID && info?.parentID) { + for (const [sid, msgIds] of this.sessionUserMsgIds) { + if (msgIds.has(info.parentID)) { + resolvedSessionID = sid + break + } + } + } + if (info && resolvedSessionID) { + // Create trace eagerly on user message (arrives before part events) + const trace = + this.sessionTraces.get(resolvedSessionID) ?? + (info.role === "user" ? this.getOrCreateTrace(resolvedSessionID) : null) + if (info.role === "user") { + if (info.id) { + if (!this.sessionUserMsgIds.has(resolvedSessionID)) + this.sessionUserMsgIds.set(resolvedSessionID, new Set()) + this.sessionUserMsgIds.get(resolvedSessionID)!.add(info.id) + } + if (trace) { + const title = info.summary?.title || info.summary?.body + if (title) trace.setTitle(String(title).slice(0, 80), String(title)) + } + } + if (info.role === "assistant") { + const r = trace ?? this.getOrCreateTrace(resolvedSessionID) + r?.enrichFromAssistant({ + modelID: info.modelID, + providerID: info.providerID, + agent: info.agent, + variant: info.variant, + }) + } + } + } + if (e.type === "message.part.updated") { + const part = e.properties?.part as Record | undefined + if (part) { + // Create trace on first event for this session (lazy creation) + const trace = this.sessionTraces.get(part.sessionID) ?? this.getOrCreateTrace(part.sessionID) + if (trace) { + if (part.type === "step-start") trace.logStepStart(part as Parameters[0]) + if (part.type === "step-finish") trace.logStepFinish(part as Parameters[0]) + if (part.type === "text" && part.time?.end) { + if (part.messageID && this.sessionUserMsgIds.get(part.sessionID)?.has(part.messageID)) { + // This is user prompt text — capture as title/prompt + const text = String(part.text || "") + if (text) trace.setTitle(text.slice(0, 80), text) + } else { + // This is assistant response text + trace.logText(part as Parameters[0]) + } + } + if (part.type === "tool" && (part.state?.status === "completed" || part.state?.status === "error")) { + trace.logToolCall(part as Parameters[0]) + } + } + } + } + // Capture session title from session.updated events + if (e.type === "session.updated") { + const info = e.properties?.info as Record | undefined + if (info?.id && info?.title) { + const trace = this.sessionTraces.get(info.id) + if (trace) trace.setTitle(String(info.title)) + } + } + // Finalize trace when session reaches idle (completed) + if (e.type === "session.status") { + const props = e.properties as Record | undefined + const sid = props?.sessionID + const status = props?.status?.type + if (status === "idle" && sid) { + const trace = this.sessionTraces.get(sid) + if (trace) { + void trace.endTrace().catch(() => {}) + this.sessionTraces.delete(sid) + this.sessionUserMsgIds.delete(sid) + } + } + } + } catch { + // Trace must never interrupt event forwarding + } + } + + /** + * End all in-flight traces fire-and-forget and clear state. + * Used before (re)starting an event stream so stale per-stream state + * doesn't leak across stream instances. + */ + reset() { + for (const [, trace] of this.sessionTraces) { + void trace.endTrace().catch(() => {}) + } + this.sessionTraces.clear() + this.sessionUserMsgIds.clear() + } + + /** End all in-flight traces and wait for them. Used on shutdown. */ + async flush() { + for (const [, trace] of this.sessionTraces) { + await trace.endTrace().catch(() => {}) + } + this.sessionTraces.clear() + this.sessionUserMsgIds.clear() + } +} + +/** + * Subscribe to the in-process event stream and feed every event to a + * TraceConsumer. Mirrors the TUI worker's event loop for hosts that don't + * have one of their own — i.e. `altimate serve`, where sessions are driven + * over HTTP (the VS Code chat panel) and no other event consumer exists. + * + * Trace failures must never affect the server, so every step is best-effort. + */ +export function subscribeTraceConsumer(input: { directory: string }): { stop: () => Promise } { + const consumer = new TraceConsumer() + const abort = new AbortController() + const signal = abort.signal + + // In-process fetch against the default app — same pattern as the TUI + // worker. The Bus is process-wide, so events published by sessions served + // by the TCP listener arrive on this subscription too. + const fetchFn = (async (fetchInput: RequestInfo | URL, init?: RequestInit) => { + const request = new Request(fetchInput, init) + const password = Flag.OPENCODE_SERVER_PASSWORD + if (password) { + const username = Flag.OPENCODE_SERVER_USERNAME ?? "opencode" + request.headers.set("Authorization", `Basic ${btoa(`${username}:${password}`)}`) + } + return Server.Default().fetch(request) + }) as typeof globalThis.fetch + + const sdk = createOpencodeClient({ + baseUrl: "http://altimate-code.internal", + directory: input.directory, + fetch: fetchFn, + signal, + }) + + ;(async () => { + await consumer.loadConfig() + while (!signal.aborted) { + const events = await Promise.resolve(sdk.event.subscribe({}, { signal })).catch(() => undefined) + + if (!events) { + await sleep(250) + continue + } + + for await (const event of events.stream) { + consumer.handleEvent(event) + } + + if (!signal.aborted) { + await sleep(250) + } + } + })().catch((error) => { + Log.Default.error("trace event stream error", { + error: error instanceof Error ? error.message : error, + }) + }) + + return { + stop: async () => { + abort.abort() + await consumer.flush() + }, + } +} diff --git a/packages/opencode/src/cli/cmd/serve.ts b/packages/opencode/src/cli/cmd/serve.ts index cc7cb3c3ca..b5a59ab3e5 100644 --- a/packages/opencode/src/cli/cmd/serve.ts +++ b/packages/opencode/src/cli/cmd/serve.ts @@ -5,6 +5,9 @@ import { Flag } from "../../flag/flag" import { Workspace } from "../../control-plane/workspace" import { Project } from "../../project/project" import { Installation } from "../../installation" +// altimate_change start — trace: session tracing in headless serve +import { subscribeTraceConsumer } from "../../altimate/observability/trace-consumer" +// altimate_change end export const ServeCommand = cmd({ command: "serve", @@ -20,6 +23,15 @@ export const ServeCommand = cmd({ console.log(`altimate-code server listening on http://${server.hostname}:${server.port}`) // altimate_change end + // altimate_change start — trace: session tracing in headless serve + // Sessions driven over HTTP (e.g. the VS Code chat panel) have no TUI + // worker observing the event stream, so traces were never written in + // serve mode. Subscribe the shared trace consumer to the in-process + // event stream so serve sessions produce the same trace files as the + // terminal entrypoints. + subscribeTraceConsumer({ directory: process.cwd() }) + // altimate_change end + await new Promise(() => {}) await server.stop() }, diff --git a/packages/opencode/src/cli/cmd/tui/worker.ts b/packages/opencode/src/cli/cmd/tui/worker.ts index 3e2b78de15..c1cac50d3c 100644 --- a/packages/opencode/src/cli/cmd/tui/worker.ts +++ b/packages/opencode/src/cli/cmd/tui/worker.ts @@ -12,7 +12,7 @@ import type { BunWebSocketData } from "hono/bun" import { Flag } from "@/flag/flag" import { setTimeout as sleep } from "node:timers/promises" // altimate_change start — trace: session tracing in TUI -import { Trace, FileExporter, HttpExporter, type TraceExporter } from "@/altimate/observability/tracing" +import { TraceConsumer } from "@/altimate/observability/trace-consumer" // altimate_change end await Log.init({ @@ -47,73 +47,14 @@ const eventStream = { abort: undefined as AbortController | undefined, } -// altimate_change start — trace: per-session traces -const sessionTraces = new Map() -const sessionUserMsgIds = new Map>() // Per-session user message IDs (cleaned up on session end) -const MAX_TRACES = 100 - -// Cached tracing config — loaded once at first use -let tracingConfigLoaded = false -let tracingEnabled = true -let tracingExporters: TraceExporter[] | undefined -let tracingMaxFiles: number | undefined - -async function loadTracingConfig() { - if (tracingConfigLoaded) return - tracingConfigLoaded = true - try { - const cfg = await Config.get() - const tc = cfg.tracing - if (tc?.enabled === false) { tracingEnabled = false; return } - const exporters: TraceExporter[] = [new FileExporter(tc?.dir)] - if (tc?.exporters) { - for (const exp of tc.exporters) { - exporters.push(new HttpExporter(exp.name, exp.endpoint, exp.headers)) - } - } - tracingExporters = exporters - tracingMaxFiles = tc?.maxFiles - } catch { - // Config failure should not prevent TUI from working - } -} -// altimate_change end - -// altimate_change start — trace: get or create per-session trace -function getOrCreateTrace(sessionID: string): Trace | null { - if (!sessionID || !tracingEnabled) return null - if (sessionTraces.has(sessionID)) return sessionTraces.get(sessionID)! - try { - if (sessionTraces.size >= MAX_TRACES) { - const oldest = sessionTraces.keys().next().value - if (oldest) { - Log.Default.warn(`[tracing] Evicting trace for session ${oldest} — ${MAX_TRACES} concurrent sessions reached`) - sessionTraces.get(oldest)?.endTrace().catch(() => {}) - sessionTraces.delete(oldest) - sessionUserMsgIds.delete(oldest) - } - } - const trace = tracingExporters - ? Trace.withExporters([...tracingExporters], { maxFiles: tracingMaxFiles }) - : Trace.create() - trace.startTrace(sessionID, {}) - Trace.setActive(trace) - sessionTraces.set(sessionID, trace) - return trace - } catch { - return null - } -} +// altimate_change start — trace: per-session traces (shared consumer) +const traceConsumer = new TraceConsumer() // altimate_change end const startEventStream = (input: { directory: string; workspaceID?: string }) => { if (eventStream.abort) eventStream.abort.abort() // Clear stale per-stream trace state before starting a new stream instance - for (const [, trace] of sessionTraces) { - void trace.endTrace().catch(() => {}) - } - sessionTraces.clear() - sessionUserMsgIds.clear() + traceConsumer.reset() const abort = new AbortController() eventStream.abort = abort @@ -136,7 +77,7 @@ const startEventStream = (input: { directory: string; workspaceID?: string }) => ;(async () => { // Load tracing config once before processing events - await loadTracingConfig() + await traceConsumer.loadConfig() while (!signal.aborted) { const events = await Promise.resolve( sdk.event.subscribe( @@ -154,97 +95,7 @@ const startEventStream = (input: { directory: string; workspaceID?: string }) => for await (const event of events.stream) { // altimate_change start — trace: feed events to per-session trace - try { - if (event.type === "message.updated") { - const info = (event as any).properties?.info - // Resolve sessionID: use info.sessionID directly, or fall back to - // finding the session via info.parentID (assistant messages may only - // carry the parent message ID, not the session ID). - let resolvedSessionID = info?.sessionID as string | undefined - if (!resolvedSessionID && info?.parentID) { - for (const [sid, msgIds] of sessionUserMsgIds) { - if (msgIds.has(info.parentID)) { - resolvedSessionID = sid - break - } - } - } - if (resolvedSessionID) { - // Create trace eagerly on user message (arrives before part events) - const trace = sessionTraces.get(resolvedSessionID) ?? (info.role === "user" ? getOrCreateTrace(resolvedSessionID) : null) - if (info.role === "user") { - if (info.id) { - if (!sessionUserMsgIds.has(resolvedSessionID)) sessionUserMsgIds.set(resolvedSessionID, new Set()) - sessionUserMsgIds.get(resolvedSessionID)!.add(info.id) - } - if (trace) { - const title = (info as any).summary?.title || (info as any).summary?.body - if (title) trace.setTitle(String(title).slice(0, 80), String(title)) - } - } - if (info.role === "assistant") { - const r = trace ?? getOrCreateTrace(resolvedSessionID) - r?.enrichFromAssistant({ - modelID: info.modelID, - providerID: info.providerID, - agent: info.agent, - variant: info.variant, - }) - } - } - } - // altimate_change end - // altimate_change start — trace: part events - if (event.type === "message.part.updated") { - const part = (event as any).properties?.part - if (part) { - // Create trace on first event for this session (lazy creation) - const trace = sessionTraces.get(part.sessionID) ?? getOrCreateTrace(part.sessionID) - if (trace) { - if (part.type === "step-start") trace.logStepStart(part) - if (part.type === "step-finish") trace.logStepFinish(part) - if (part.type === "text" && part.time?.end) { - if (part.messageID && sessionUserMsgIds.get(part.sessionID)?.has(part.messageID)) { - // This is user prompt text — capture as title/prompt - const text = String(part.text || "") - if (text) trace.setTitle(text.slice(0, 80), text) - } else { - // This is assistant response text - trace.logText(part) - } - } - if (part.type === "tool" && (part.state?.status === "completed" || part.state?.status === "error")) { - trace.logToolCall(part) - } - } - } - } - // altimate_change end - // altimate_change start — trace: session title capture and finalization - // Capture session title from session.updated events - if (event.type === "session.updated") { - const info = (event as any).properties?.info - if (info?.id && info?.title) { - const trace = sessionTraces.get(info.id) - if (trace) trace.setTitle(String(info.title)) - } - } - // Finalize trace when session reaches idle (completed) - if (event.type === "session.status") { - const sid = (event as any).properties?.sessionID - const status = (event as any).properties?.status?.type - if (status === "idle" && sid) { - const trace = sessionTraces.get(sid) - if (trace) { - void trace.endTrace().catch(() => {}) - sessionTraces.delete(sid) - sessionUserMsgIds.delete(sid) - } - } - } - } catch { - // Trace must never interrupt event forwarding - } + traceConsumer.handleEvent(event) // altimate_change end Rpc.emit("event", event as Event) @@ -312,11 +163,7 @@ export const rpc = { Log.Default.info("worker shutting down") if (eventStream.abort) eventStream.abort.abort() // altimate_change start — trace: flush all active traces on shutdown - for (const [sid, trace] of sessionTraces) { - await trace.endTrace().catch(() => {}) - } - sessionTraces.clear() - sessionUserMsgIds.clear() + await traceConsumer.flush() // altimate_change end await Instance.disposeAll() if (server) server.stop(true) diff --git a/packages/opencode/test/altimate/trace-consumer.test.ts b/packages/opencode/test/altimate/trace-consumer.test.ts new file mode 100644 index 0000000000..baa0842503 --- /dev/null +++ b/packages/opencode/test/altimate/trace-consumer.test.ts @@ -0,0 +1,211 @@ +/** + * Tests for the shared event-stream → trace consumer. + * + * The consumer is the extracted form of the TUI worker's inline tracing + * logic, now also wired into `altimate serve` so headless sessions (e.g. + * the VS Code chat panel) write trace files. These tests feed realistic + * bus-event sequences and assert trace files land on disk. + */ + +import { describe, expect, test, beforeEach, afterEach } from "bun:test" +import fs from "fs/promises" +import path from "path" +import os from "os" +import { TraceConsumer } from "../../src/altimate/observability/trace-consumer" +import { FileExporter, type TraceFile } from "../../src/altimate/observability/tracing" + +let tmpDir: string + +beforeEach(async () => { + tmpDir = path.join(os.tmpdir(), `trace-consumer-${Date.now()}-${Math.random().toString(36).slice(2)}`) + await fs.mkdir(tmpDir, { recursive: true }) +}) + +afterEach(async () => { + await fs.rm(tmpDir, { recursive: true, force: true }).catch(() => {}) +}) + +function makeConsumer() { + return new TraceConsumer({ exporters: [new FileExporter(tmpDir)] }) +} + +async function readTraceFile(sessionID: string): Promise { + const raw = await fs.readFile(path.join(tmpDir, `${sessionID}.json`), "utf8") + return JSON.parse(raw) as TraceFile +} + +/** Event sequence mirroring what a real session emits over the bus. */ +function sessionEvents(sessionID: string) { + const now = Date.now() + return [ + { + type: "message.updated", + properties: { info: { id: "msg-user-1", sessionID, role: "user", time: { created: now } } }, + }, + { + type: "message.part.updated", + properties: { + part: { sessionID, messageID: "msg-user-1", type: "text", text: "list my files", time: { end: now } }, + }, + }, + { + type: "message.updated", + properties: { + info: { + id: "msg-asst-1", + sessionID, + parentID: "msg-user-1", + role: "assistant", + modelID: "gpt-4o", + providerID: "openai", + agent: "general", + time: { created: now }, + }, + }, + }, + { type: "message.part.updated", properties: { part: { sessionID, type: "step-start", id: "step-1" } } }, + { + type: "message.part.updated", + properties: { + part: { + sessionID, + type: "tool", + tool: "bash", + callID: "c1", + state: { + status: "completed", + input: { command: "ls" }, + output: "file1.ts", + time: { start: now - 1000, end: now }, + }, + }, + }, + }, + { + type: "message.part.updated", + properties: { + part: { sessionID, messageID: "msg-asst-1", type: "text", text: "Found 1 file.", time: { end: now } }, + }, + }, + { + type: "message.part.updated", + properties: { + part: { + sessionID, + type: "step-finish", + id: "step-1", + reason: "stop", + cost: 0.005, + tokens: { input: 500, output: 100, reasoning: 0, cache: { read: 0, write: 0 } }, + }, + }, + }, + { type: "session.updated", properties: { info: { id: sessionID, title: "List files" } } }, + { type: "session.status", properties: { sessionID, status: { type: "idle" } } }, + ] +} + +describe("TraceConsumer", () => { + test("full session event sequence writes a completed trace file", async () => { + const consumer = makeConsumer() + for (const event of sessionEvents("ses_consumer_1")) { + consumer.handleEvent(event) + } + // session.status idle finalizes asynchronously + await new Promise((r) => setTimeout(r, 200)) + + const trace = await readTraceFile("ses_consumer_1") + expect(trace.summary.status).toBe("completed") + expect(trace.metadata.model).toBe("openai/gpt-4o") + expect(trace.metadata.agent).toBe("general") + expect(trace.metadata.title).toBe("List files") + expect(trace.metadata.prompt).toBe("list my files") + expect(trace.spans.some((s) => s.kind === "tool")).toBe(true) + expect(trace.summary.totalToolCalls).toBe(1) + expect(trace.summary.totalCost).toBeCloseTo(0.005) + }) + + test("two interleaved sessions write separate trace files", async () => { + const consumer = makeConsumer() + const a = sessionEvents("ses_consumer_a") + const b = sessionEvents("ses_consumer_b") + // Interleave the two sessions' events + for (let i = 0; i < Math.max(a.length, b.length); i++) { + if (a[i]) consumer.handleEvent(a[i]) + if (b[i]) consumer.handleEvent(b[i]) + } + await new Promise((r) => setTimeout(r, 200)) + + const traceA = await readTraceFile("ses_consumer_a") + const traceB = await readTraceFile("ses_consumer_b") + expect(traceA.summary.status).toBe("completed") + expect(traceB.summary.status).toBe("completed") + expect(traceA.summary.totalToolCalls).toBe(1) + expect(traceB.summary.totalToolCalls).toBe(1) + }) + + test("malformed events never throw", () => { + const consumer = makeConsumer() + const malformed = [ + null, + undefined, + {}, + { type: "message.updated" }, + { type: "message.updated", properties: null }, + { type: "message.updated", properties: { info: null } }, + { type: "message.part.updated", properties: null }, + { type: "message.part.updated", properties: { part: { type: "tool" } } }, + { type: "session.updated", properties: {} }, + { type: "session.status", properties: { status: null } }, + { type: "session.status", properties: { sessionID: "nope", status: { type: "idle" } } }, + "not-an-object", + 42, + ] + for (const event of malformed) { + expect(() => consumer.handleEvent(event)).not.toThrow() + } + }) + + test("disabled consumer writes nothing", async () => { + const consumer = new TraceConsumer({ exporters: [new FileExporter(tmpDir)], enabled: false }) + for (const event of sessionEvents("ses_consumer_off")) { + consumer.handleEvent(event) + } + await new Promise((r) => setTimeout(r, 200)) + const files = await fs.readdir(tmpDir) + expect(files.length).toBe(0) + }) + + test("flush finalizes in-flight traces that never reached idle", async () => { + const consumer = makeConsumer() + // Feed everything except the final session.status idle event + const events = sessionEvents("ses_consumer_flush") + for (const event of events.slice(0, -1)) { + consumer.handleEvent(event) + } + await consumer.flush() + + const trace = await readTraceFile("ses_consumer_flush") + expect(trace.summary.totalToolCalls).toBe(1) + // flush() after the events still finalizes the trace file on disk + expect(trace.spans.length).toBeGreaterThan(0) + }) + + test("reset clears state so a reused sessionID starts a fresh trace", async () => { + const consumer = makeConsumer() + const events = sessionEvents("ses_consumer_reset") + for (const event of events.slice(0, 4)) { + consumer.handleEvent(event) + } + consumer.reset() + await new Promise((r) => setTimeout(r, 200)) + + // After reset, feeding the full sequence again produces a complete trace + for (const event of events) { + consumer.handleEvent(event) + } + await new Promise((r) => setTimeout(r, 200)) + const trace = await readTraceFile("ses_consumer_reset") + expect(trace.summary.status).toBe("completed") + }) +}) From 518033e9539ab7145df6a36a68148bc63d2b9707 Mon Sep 17 00:00:00 2001 From: "Ralph Sto. Domingo" <79431566+ralphstodomingo@users.noreply.github.com> Date: Fri, 5 Jun 2026 11:08:30 +0800 Subject: [PATCH 2/7] fix: don't clobber trace files with post-idle straggler events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Real serve sessions emit trailing events after `session.status: idle` — the user message is re-published with its generated `summary` once title generation completes. Finalizing immediately on idle deleted the trace from the map, and the straggler then lazily re-created an EMPTY trace for the same sessionID whose finalization overwrote the rich `.json` file (observed end-to-end in docker code-server: 42KB trace clobbered down to 749 bytes). Fix: schedule finalization 2s after idle instead of finalizing inline; any event for the session during the grace window pushes the deadline back and is absorbed into the live trace. As a bonus the generated session title now lands in trace metadata. flush()/reset()/eviction clear pending timers. Verified: chat session in docker code-server (glob tool call + 2 generations) produces a complete 42KB trace with title, model, tool span, and token totals. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../altimate/observability/trace-consumer.ts | 78 ++++++++++++++++--- .../test/altimate/trace-consumer.test.ts | 41 +++++++++- 2 files changed, 107 insertions(+), 12 deletions(-) diff --git a/packages/opencode/src/altimate/observability/trace-consumer.ts b/packages/opencode/src/altimate/observability/trace-consumer.ts index 82d707d3bb..d794059a87 100644 --- a/packages/opencode/src/altimate/observability/trace-consumer.ts +++ b/packages/opencode/src/altimate/observability/trace-consumer.ts @@ -25,6 +25,19 @@ import { Trace, FileExporter, HttpExporter, type TraceExporter } from "./tracing const MAX_TRACES = 100 +/** + * How long to wait after `session.status: idle` before finalizing a trace. + * + * Sessions emit trailing events after going idle — e.g. the user message is + * re-published with its generated `summary` once title generation completes. + * Finalizing immediately on idle would delete the trace from the map, and + * the straggler would then lazily re-create an EMPTY trace for the same + * sessionID whose finalization overwrites the rich `.json` file. + * Any event arriving during the grace window is absorbed into the live + * trace and pushes the finalization back. + */ +const FINALIZE_GRACE_MS = 2_000 + /** Minimal structural view of a bus event — narrowed at each read site. */ interface BusEventLike { type?: string @@ -35,23 +48,33 @@ export class TraceConsumer { private sessionTraces = new Map() // Per-session user message IDs (cleaned up on session end) private sessionUserMsgIds = new Map>() + // Sessions that went idle and are waiting out the finalize grace window + private pendingEnd = new Map>() // Cached tracing config — loaded once at first use private configLoaded = false private enabled = true private exporters: TraceExporter[] | undefined private maxFiles: number | undefined + private finalizeGraceMs = FINALIZE_GRACE_MS /** * Optional overrides bypass config loading entirely — used by tests to - * inject a FileExporter pointed at a temp directory. + * inject a FileExporter pointed at a temp directory and shrink the + * finalize grace window. */ - constructor(overrides?: { exporters?: TraceExporter[]; maxFiles?: number; enabled?: boolean }) { + constructor(overrides?: { + exporters?: TraceExporter[] + maxFiles?: number + enabled?: boolean + finalizeGraceMs?: number + }) { if (overrides) { this.configLoaded = true this.enabled = overrides.enabled ?? true this.exporters = overrides.exporters this.maxFiles = overrides.maxFiles + if (overrides.finalizeGraceMs !== undefined) this.finalizeGraceMs = overrides.finalizeGraceMs } } @@ -93,6 +116,11 @@ export class TraceConsumer { .catch(() => {}) this.sessionTraces.delete(oldest) this.sessionUserMsgIds.delete(oldest) + const pendingTimer = this.pendingEnd.get(oldest) + if (pendingTimer) { + clearTimeout(pendingTimer) + this.pendingEnd.delete(oldest) + } } } const trace = this.exporters @@ -107,10 +135,40 @@ export class TraceConsumer { } } + /** + * Schedule (or push back) finalization of a session's trace. Fires after + * the grace window unless another event for the session arrives first. + */ + private scheduleEnd(sessionID: string) { + const existing = this.pendingEnd.get(sessionID) + if (existing) clearTimeout(existing) + this.pendingEnd.set( + sessionID, + setTimeout(() => { + this.pendingEnd.delete(sessionID) + const trace = this.sessionTraces.get(sessionID) + if (trace) { + void trace.endTrace().catch(() => {}) + this.sessionTraces.delete(sessionID) + this.sessionUserMsgIds.delete(sessionID) + } + }, this.finalizeGraceMs), + ) + } + /** Feed one bus event into the per-session traces. Never throws. */ handleEvent(event: unknown) { try { const e = event as BusEventLike + // Any activity for a session that is waiting out the finalize grace + // window pushes the finalization back — trailing events (e.g. the + // user message re-published with its generated summary) are absorbed + // into the live trace instead of re-creating an empty one. + { + const props = e.properties as Record | undefined + const sid = props?.sessionID ?? props?.info?.sessionID ?? props?.part?.sessionID ?? props?.info?.id + if (typeof sid === "string" && this.pendingEnd.has(sid)) this.scheduleEnd(sid) + } if (e.type === "message.updated") { const info = e.properties?.info as Record | undefined // Resolve sessionID: use info.sessionID directly, or fall back to @@ -184,18 +242,14 @@ export class TraceConsumer { if (trace) trace.setTitle(String(info.title)) } } - // Finalize trace when session reaches idle (completed) + // Finalize trace when session reaches idle (completed) — after the + // grace window, so post-idle stragglers don't clobber the trace file. if (e.type === "session.status") { const props = e.properties as Record | undefined const sid = props?.sessionID const status = props?.status?.type - if (status === "idle" && sid) { - const trace = this.sessionTraces.get(sid) - if (trace) { - void trace.endTrace().catch(() => {}) - this.sessionTraces.delete(sid) - this.sessionUserMsgIds.delete(sid) - } + if (status === "idle" && sid && this.sessionTraces.has(sid)) { + this.scheduleEnd(sid) } } } catch { @@ -209,6 +263,8 @@ export class TraceConsumer { * doesn't leak across stream instances. */ reset() { + for (const [, timer] of this.pendingEnd) clearTimeout(timer) + this.pendingEnd.clear() for (const [, trace] of this.sessionTraces) { void trace.endTrace().catch(() => {}) } @@ -218,6 +274,8 @@ export class TraceConsumer { /** End all in-flight traces and wait for them. Used on shutdown. */ async flush() { + for (const [, timer] of this.pendingEnd) clearTimeout(timer) + this.pendingEnd.clear() for (const [, trace] of this.sessionTraces) { await trace.endTrace().catch(() => {}) } diff --git a/packages/opencode/test/altimate/trace-consumer.test.ts b/packages/opencode/test/altimate/trace-consumer.test.ts index baa0842503..24cff1ef83 100644 --- a/packages/opencode/test/altimate/trace-consumer.test.ts +++ b/packages/opencode/test/altimate/trace-consumer.test.ts @@ -26,7 +26,7 @@ afterEach(async () => { }) function makeConsumer() { - return new TraceConsumer({ exporters: [new FileExporter(tmpDir)] }) + return new TraceConsumer({ exporters: [new FileExporter(tmpDir)], finalizeGraceMs: 50 }) } async function readTraceFile(sessionID: string): Promise { @@ -166,8 +166,45 @@ describe("TraceConsumer", () => { } }) + test("post-idle straggler events don't clobber the finalized trace", async () => { + const consumer = makeConsumer() + const sessionID = "ses_consumer_straggler" + for (const event of sessionEvents(sessionID)) { + consumer.handleEvent(event) + } + // Straggler: the user message re-published with its generated summary + // arrives AFTER session.status idle (observed in real `serve` sessions — + // title generation completes after the session goes idle). Without the + // finalize grace window this re-created an empty trace whose + // finalization overwrote the rich trace file. + consumer.handleEvent({ + type: "message.updated", + properties: { + info: { + id: "msg-user-1", + sessionID, + role: "user", + summary: { title: "Straggler title" }, + time: { created: Date.now() }, + }, + }, + }) + consumer.handleEvent({ type: "session.status", properties: { sessionID, status: { type: "idle" } } }) + // Wait past the grace window for finalization + await new Promise((r) => setTimeout(r, 300)) + + const trace = await readTraceFile(sessionID) + expect(trace.summary.status).toBe("completed") + // The rich content survived — generation + tool spans intact + expect(trace.spans.some((s) => s.kind === "tool")).toBe(true) + expect(trace.summary.totalToolCalls).toBe(1) + expect(trace.summary.totalCost).toBeCloseTo(0.005) + // And the straggler was absorbed into the live trace + expect(trace.metadata.title).toBe("Straggler title") + }) + test("disabled consumer writes nothing", async () => { - const consumer = new TraceConsumer({ exporters: [new FileExporter(tmpDir)], enabled: false }) + const consumer = new TraceConsumer({ exporters: [new FileExporter(tmpDir)], enabled: false, finalizeGraceMs: 50 }) for (const event of sessionEvents("ses_consumer_off")) { consumer.handleEvent(event) } From 9fb60d9bd96927a7b8868cd52f4a85598a7cc33c Mon Sep 17 00:00:00 2001 From: "Ralph Sto. Domingo" <79431566+ralphstodomingo@users.noreply.github.com> Date: Fri, 5 Jun 2026 12:48:02 +0800 Subject: [PATCH 3/7] test: update event-literal invariant for trace-consumer extraction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The worker's inline event handling moved into the shared altimate/observability/trace-consumer.ts — point the cross-module event-type literal invariant at the new consumer (and cover the full set it consumes) instead of cli/cmd/tui/worker.ts. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../opencode/test/upstream/bridge-merge-invariants.test.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/packages/opencode/test/upstream/bridge-merge-invariants.test.ts b/packages/opencode/test/upstream/bridge-merge-invariants.test.ts index 3cd67205ca..cd498fb402 100644 --- a/packages/opencode/test/upstream/bridge-merge-invariants.test.ts +++ b/packages/opencode/test/upstream/bridge-merge-invariants.test.ts @@ -161,7 +161,12 @@ describe("invariant: event-type literals match between producer and consumers", file: "cli/cmd/tui/context/sync.tsx", types: ["message.updated", "message.removed", "message.part.updated", "message.part.removed"], }, - { file: "cli/cmd/tui/worker.ts", types: ["message.updated"] }, + // worker.ts's inline event handling moved into the shared trace consumer + // (also used by `serve`) — the literals live there now. + { + file: "altimate/observability/trace-consumer.ts", + types: ["message.updated", "message.part.updated", "session.updated", "session.status"], + }, { file: "cli/cmd/run.ts", types: ["message.updated", "message.part.updated"] }, { file: "acp/agent.ts", types: ["message.part.updated"] }, ] From b6c6946ba4293d19fb00c9e635a276d0607a0a67 Mon Sep 17 00:00:00 2001 From: "Ralph Sto. Domingo" <79431566+ralphstodomingo@users.noreply.github.com> Date: Mon, 8 Jun 2026 13:24:10 +0800 Subject: [PATCH 4/7] chore: wrap startEventStream trace reset in altimate_change markers worker.ts is an upstream-shared file; the traceConsumer.reset() call (which replaced main's inline sessionTraces clear) must be wrapped so the upstream merge tooling protects it. Caught by script/upstream/analyze.ts --markers --strict (the Marker Guard CI check). --- packages/opencode/src/cli/cmd/tui/worker.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/opencode/src/cli/cmd/tui/worker.ts b/packages/opencode/src/cli/cmd/tui/worker.ts index 5d1b8b5d48..b412dda6d6 100644 --- a/packages/opencode/src/cli/cmd/tui/worker.ts +++ b/packages/opencode/src/cli/cmd/tui/worker.ts @@ -57,10 +57,12 @@ const traceConsumer = new TraceConsumer() const startEventStream = (input: { directory: string; workspaceID?: string }) => { if (eventStream.abort) eventStream.abort.abort() - // Clear stale per-stream trace state before starting a new stream instance. - // reset() also bumps the consumer's stream generation, invalidating any - // in-flight getOrCreateTrace() suspended at its rehydrate await. + // altimate_change start — trace: clear stale per-stream trace state before + // starting a new stream instance. reset() also bumps the consumer's stream + // generation, invalidating any in-flight getOrCreateTrace() suspended at its + // rehydrate await. traceConsumer.reset() + // altimate_change end const abort = new AbortController() eventStream.abort = abort From 500fed850c59f6f243f3c56d953c03e0e4397fbb Mon Sep 17 00:00:00 2001 From: "Ralph Sto. Domingo" <79431566+ralphstodomingo@users.noreply.github.com> Date: Mon, 8 Jun 2026 13:25:49 +0800 Subject: [PATCH 5/7] chore: wrap traceConsumer.loadConfig call in altimate_change markers --- packages/opencode/src/cli/cmd/tui/worker.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/opencode/src/cli/cmd/tui/worker.ts b/packages/opencode/src/cli/cmd/tui/worker.ts index b412dda6d6..be4c12fa76 100644 --- a/packages/opencode/src/cli/cmd/tui/worker.ts +++ b/packages/opencode/src/cli/cmd/tui/worker.ts @@ -84,8 +84,9 @@ const startEventStream = (input: { directory: string; workspaceID?: string }) => }) ;(async () => { - // Load tracing config once before processing events + // altimate_change start — trace: load tracing config once before processing events await traceConsumer.loadConfig() + // altimate_change end while (!signal.aborted) { const events = await Promise.resolve( sdk.event.subscribe( From a701fdf97ab1ba5f406e98c3806cb7ed25be78d0 Mon Sep 17 00:00:00 2001 From: anandgupta42 Date: Sun, 7 Jun 2026 22:51:01 -0700 Subject: [PATCH 6/7] fix: finalize serve traces on shutdown + harden trace consumer (#886) Addresses the multi-model consensus review on PR #886. The `TraceConsumer` extraction was sound, but the `serve` integration had lifecycle gaps. `cli/cmd/serve.ts`: - Capture the `subscribeTraceConsumer` handle and wire `SIGINT`/`SIGTERM`/ `beforeExit` to `stop()` so traces are finalized on shutdown instead of left un-`completed`. Mirrors the existing pattern in `cli/cmd/run.ts`. `altimate/observability/trace-consumer.ts`: - `stop()` now drains the event loop (bounded by a 1s timeout) before `flush()`, so finalization can't race an in-flight `handleEvent`. - The `for await` stream loop catches mid-stream errors and reconnects with exponential backoff (250ms -> 30s) instead of dying permanently; backoff sleeps are abortable so shutdown isn't delayed. - Finalize and evict per-session state on `session.deleted` (a real end-of-session signal) so long-lived serve processes don't accumulate `sessionUserMsgIds` / cached traces. - `loadConfig` failure now fails safe (`enabled = false`) instead of silently tracing to the default dir, and logs at debug. - `getOrCreateTrace` and `handleEvent` catch blocks log at debug (still never throw) for headless diagnosability. - Auth header uses `Buffer.from(...).toString("base64")` (UTF-8 safe) instead of `btoa`. - Added a test seam: `subscribeTraceConsumer` accepts an optional injected `consumer` and `subscribe` source (production behaviour unchanged via defaults). Tests: - New `subscribeTraceConsumer` coverage: `stop()` drains + finalizes; a mid-stream throw reconnects rather than killing the loop. - New `TraceConsumer` coverage: incremental snapshots land on disk BEFORE any flush (the real serve path), and `session.deleted` finalizes + releases state. - De-flaked the `reset()` test (poll instead of fixed `setTimeout`). - Registered the `session.deleted` literal in `bridge-merge-invariants`. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../altimate/observability/trace-consumer.ts | 145 ++++++++++++++---- packages/opencode/src/cli/cmd/serve.ts | 20 ++- .../test/altimate/trace-consumer.test.ts | 144 ++++++++++++++++- .../upstream/bridge-merge-invariants.test.ts | 2 +- 4 files changed, 272 insertions(+), 39 deletions(-) diff --git a/packages/opencode/src/altimate/observability/trace-consumer.ts b/packages/opencode/src/altimate/observability/trace-consumer.ts index 77fb483e5d..b054b204cf 100644 --- a/packages/opencode/src/altimate/observability/trace-consumer.ts +++ b/packages/opencode/src/altimate/observability/trace-consumer.ts @@ -90,8 +90,12 @@ export class TraceConsumer { } this.exporters = exporters this.maxFiles = tc?.maxFiles - } catch { + } catch (error) { // Config failure should not prevent the host (TUI/serve) from working + this.enabled = false + Log.Default.debug("[tracing] failed to load config, disabling", { + error: error instanceof Error ? error.message : String(error), + }) } } @@ -138,7 +142,10 @@ export class TraceConsumer { Trace.setActive(trace) this.sessionTraces.set(sessionID, trace) return trace - } catch { + } catch (error) { + Log.Default.debug("[tracing] getOrCreateTrace failed", { + error: error instanceof Error ? error.message : String(error), + }) return null } } @@ -250,14 +257,32 @@ export class TraceConsumer { if (trace) trace.setTitle(String(info.title)) } } + // Finalize and evict on a real session-end signal. `session.deleted` is a + // true end-of-session event (unlike `idle`, which fires every turn), so + // it is the correct place to flush the trace and release per-session + // state. Without this, in a long-lived `serve` process the trace and its + // sessionUserMsgIds set live until MAX_TRACES eviction (or never, for + // <100 sessions), since reset()/flush() don't fire during normal serve. + if (e.type === "session.deleted") { + const info = e.properties?.info as Record | undefined + if (info?.id) { + const trace = this.sessionTraces.get(info.id) + if (trace) void trace.endTrace().catch(() => {}) + this.sessionTraces.delete(info.id) + this.sessionUserMsgIds.delete(info.id) + } + } // DO NOT finalize the trace on session.status=idle. `idle` fires after // every turn (busy → idle), not at session end. Finalizing per-turn would // treat each turn as session end and the next turn's events would hit a // cache miss; getOrCreateTrace rehydrates from disk as defense in depth, // but the correct behaviour is to keep the Trace live across turns and // finalize only on flush() (shutdown) and MAX_TRACES eviction. - } catch { - // Trace must never interrupt event forwarding + } catch (error) { + // Trace must never interrupt event forwarding — log at debug only. + Log.Default.debug("[tracing] handleEvent error", { + error: error instanceof Error ? error.message : String(error), + }) } } @@ -295,50 +320,103 @@ export class TraceConsumer { * * Trace failures must never affect the server, so every step is best-effort. */ -export function subscribeTraceConsumer(input: { directory: string }): { stop: () => Promise } { - const consumer = new TraceConsumer() +/** One subscription's event stream. */ +export interface TraceEventSource { + stream: AsyncIterable +} + +export interface TraceSubscribeOptions { + /** Test seam: inject the consumer (e.g. with a temp-dir FileExporter). */ + consumer?: TraceConsumer + /** + * Test seam: provide the event source. Resolves to a stream of bus events, + * or `undefined` to trigger a backoff+retry. Defaults to the in-process SDK + * subscription. Called once per (re)connect with the shutdown signal. + */ + subscribe?: (signal: AbortSignal) => Promise +} + +export function subscribeTraceConsumer( + input: { directory: string }, + options?: TraceSubscribeOptions, +): { stop: () => Promise } { + const consumer = options?.consumer ?? new TraceConsumer() const abort = new AbortController() const signal = abort.signal - // In-process fetch against the default app — same pattern as the TUI - // worker. The Bus is process-wide, so events published by sessions served - // by the TCP listener arrive on this subscription too. - const fetchFn = (async (fetchInput: RequestInfo | URL, init?: RequestInit) => { - const request = new Request(fetchInput, init) - const password = Flag.OPENCODE_SERVER_PASSWORD - if (password) { - const username = Flag.OPENCODE_SERVER_USERNAME ?? "opencode" - request.headers.set("Authorization", `Basic ${btoa(`${username}:${password}`)}`) - } - return Server.Default().fetch(request) - }) as typeof globalThis.fetch + // Default event source: the in-process SDK subscription — same pattern as the + // TUI worker. The Bus is process-wide, so events published by sessions served + // by the TCP listener arrive on this subscription too. The SDK client is + // built once; `subscribe` is invoked per (re)connect. + const subscribe = + options?.subscribe ?? + (() => { + const fetchFn = (async (fetchInput: RequestInfo | URL, init?: RequestInit) => { + const request = new Request(fetchInput, init) + const password = Flag.OPENCODE_SERVER_PASSWORD + if (password) { + const username = Flag.OPENCODE_SERVER_USERNAME ?? "opencode" + request.headers.set("Authorization", `Basic ${Buffer.from(`${username}:${password}`).toString("base64")}`) + } + return Server.Default().fetch(request) + }) as typeof globalThis.fetch - const sdk = createOpencodeClient({ - baseUrl: "http://altimate-code.internal", - directory: input.directory, - fetch: fetchFn, - signal, - }) + const sdk = createOpencodeClient({ + baseUrl: "http://altimate-code.internal", + directory: input.directory, + fetch: fetchFn, + signal, + }) + + return (sig: AbortSignal): Promise => + Promise.resolve(sdk.event.subscribe({}, { signal: sig })) + .then((r) => r as unknown as TraceEventSource) + .catch(() => undefined) + })() - ;(async () => { + // Abortable sleep so a pending reconnect-backoff doesn't delay shutdown. + const BASE_BACKOFF_MS = 250 + const MAX_BACKOFF_MS = 30_000 + const backoffSleep = (ms: number) => sleep(ms, undefined, { signal }).catch(() => {}) + + const loopPromise = (async () => { await consumer.loadConfig() + // Exponential backoff on repeated failure so a durably-down stream doesn't + // hot-loop; reset to the base delay after a successful subscription. + let backoff = BASE_BACKOFF_MS while (!signal.aborted) { - const events = await Promise.resolve(sdk.event.subscribe({}, { signal })).catch(() => undefined) + const events = await subscribe(signal).catch(() => undefined) if (!events) { - await sleep(250) + await backoffSleep(backoff) + backoff = Math.min(backoff * 2, MAX_BACKOFF_MS) continue } + backoff = BASE_BACKOFF_MS - for await (const event of events.stream) { - await consumer.handleEvent(event) + // The try/catch MUST sit inside the while loop: a mid-stream throw + // (network disconnect, server hiccup) would otherwise escape to the + // outer .catch and kill the loop permanently for the server's lifetime. + try { + for await (const event of events.stream) { + await consumer.handleEvent(event) + } + } catch (err) { + if (!signal.aborted) { + Log.Default.warn("[tracing] trace event stream disconnected, reconnecting", { + error: err instanceof Error ? err.message : String(err), + }) + } } if (!signal.aborted) { - await sleep(250) + await backoffSleep(backoff) + backoff = Math.min(backoff * 2, MAX_BACKOFF_MS) } } - })().catch((error) => { + })() + + loopPromise.catch((error: unknown) => { Log.Default.error("trace event stream error", { error: error instanceof Error ? error.message : error, }) @@ -346,7 +424,12 @@ export function subscribeTraceConsumer(input: { directory: string }): { stop: () return { stop: async () => { + // Stop accepting new work, then DRAIN the loop before flushing so we + // don't finalize traces while a handleEvent() is still mid-event. + // Bounded by a timeout so an unresponsive stream can't hang shutdown — + // flush() finalizes whatever is in the cache either way. abort.abort() + await Promise.race([loopPromise.catch(() => {}), sleep(1000)]) await consumer.flush() }, } diff --git a/packages/opencode/src/cli/cmd/serve.ts b/packages/opencode/src/cli/cmd/serve.ts index b5a59ab3e5..7c1c08b2d0 100644 --- a/packages/opencode/src/cli/cmd/serve.ts +++ b/packages/opencode/src/cli/cmd/serve.ts @@ -29,10 +29,26 @@ export const ServeCommand = cmd({ // serve mode. Subscribe the shared trace consumer to the in-process // event stream so serve sessions produce the same trace files as the // terminal entrypoints. - subscribeTraceConsumer({ directory: process.cwd() }) + const traceSub = subscribeTraceConsumer({ directory: process.cwd() }) + + // Finalize traces on shutdown. `serve` blocks forever on the promise below + // and otherwise dies abruptly on signal, so without these handlers the + // consumer's stop()/flush()/endTrace() never runs and serve traces are + // left un-finalized (status never "completed", no summary/narrative). + // Mirrors the SIGINT/SIGTERM/beforeExit pattern in cli/cmd/run.ts. + let isShuttingDown = false + const shutdown = async () => { + if (isShuttingDown) return + isShuttingDown = true + await traceSub.stop() + await server.stop() + process.exit(0) + } + process.once("SIGINT", () => void shutdown()) + process.once("SIGTERM", () => void shutdown()) + process.once("beforeExit", () => void shutdown()) // altimate_change end await new Promise(() => {}) - await server.stop() }, }) diff --git a/packages/opencode/test/altimate/trace-consumer.test.ts b/packages/opencode/test/altimate/trace-consumer.test.ts index 7b251c8283..8bbe29b321 100644 --- a/packages/opencode/test/altimate/trace-consumer.test.ts +++ b/packages/opencode/test/altimate/trace-consumer.test.ts @@ -18,7 +18,11 @@ import { describe, expect, test, beforeEach, afterEach } from "bun:test" import fs from "fs/promises" import path from "path" import os from "os" -import { TraceConsumer } from "../../src/altimate/observability/trace-consumer" +import { + TraceConsumer, + subscribeTraceConsumer, + type TraceEventSource, +} from "../../src/altimate/observability/trace-consumer" import { FileExporter, type TraceFile } from "../../src/altimate/observability/tracing" let tmpDir: string @@ -47,6 +51,51 @@ async function feed(consumer: TraceConsumer, events: unknown[]) { } } +/** + * Poll until `predicate` holds, instead of a fixed sleep — snapshot/endTrace + * writes are async + debounced, so a fixed delay is flaky under CI load. + */ +async function waitFor(read: () => Promise, predicate: (v: T) => boolean, timeoutMs = 5000): Promise { + const start = Date.now() + for (;;) { + try { + const v = await read() + if (predicate(v)) return v + } catch { + // not ready yet (e.g. file not written) — keep polling + } + if (Date.now() - start > timeoutMs) throw new Error("waitFor: condition not met within timeout") + await new Promise((r) => setTimeout(r, 10)) + } +} + +/** + * Build a single-use event source for the `subscribeTraceConsumer` test seam. + * Optionally throws mid-stream (to exercise reconnect) or holds the stream open + * until the shutdown signal fires (to exercise stop()/drain). + */ +function eventSource( + events: unknown[], + opts?: { throwAfter?: number; holdUntilAbort?: AbortSignal }, +): TraceEventSource { + async function* gen() { + let i = 0 + for (const e of events) { + yield e + i++ + if (opts?.throwAfter !== undefined && i >= opts.throwAfter) throw new Error("simulated stream disconnect") + } + const sig = opts?.holdUntilAbort + if (sig) { + await new Promise((resolve) => { + if (sig.aborted) return resolve() + sig.addEventListener("abort", () => resolve(), { once: true }) + }) + } + } + return { stream: gen() } +} + /** Event sequence mirroring what a real session emits over the bus. */ function sessionEvents(sessionID: string) { const now = Date.now() @@ -260,10 +309,95 @@ describe("TraceConsumer", () => { const consumer = makeConsumer() await feed(consumer, sessionEvents("ses_consumer_reset")) consumer.reset() - // reset finalizes fire-and-forget; give the async endTrace a tick. - await new Promise((r) => setTimeout(r, 100)) - - const trace = await readTraceFile("ses_consumer_reset") + // reset finalizes fire-and-forget; poll for the write instead of a fixed + // sleep so the test isn't flaky under CI load. + const trace = await waitFor( + () => readTraceFile("ses_consumer_reset"), + (t) => t.spans.some((s) => s.kind === "tool"), + ) expect(trace.spans.some((s) => s.kind === "tool")).toBe(true) }) + + test("incremental snapshots land on disk BEFORE any flush (the serve path)", async () => { + // serve never calls flush() during normal operation — it relies entirely + // on the incremental snapshots written as events arrive. This is the path + // every other test skips by flushing first. + const consumer = makeConsumer() + await feed(consumer, sessionEvents("ses_consumer_noflush")) + const trace = await waitFor( + () => readTraceFile("ses_consumer_noflush"), + (t) => t.spans.some((s) => s.kind === "tool"), + ) + expect(trace.summary.totalToolCalls).toBe(1) + expect(trace.metadata.prompt).toBe("list my files") + }) + + test("session.deleted finalizes the trace and releases per-session state", async () => { + const consumer = makeConsumer() + await feed(consumer, sessionEvents("ses_consumer_del")) + await consumer.handleEvent({ type: "session.deleted", properties: { info: { id: "ses_consumer_del" } } }) + // endTrace is fire-and-forget on session.deleted; poll for finalization. + const trace = await waitFor( + () => readTraceFile("ses_consumer_del"), + (t) => t.summary.status === "completed", + ) + expect(trace.summary.status).toBe("completed") + // State is already released, so a later flush must be a harmless no-op. + await consumer.flush() + }) +}) + +describe("subscribeTraceConsumer (serve integration)", () => { + test("stop() drains the loop and flushes — serve traces are finalized", async () => { + const consumer = makeConsumer() + let calls = 0 + const sub = subscribeTraceConsumer( + { directory: tmpDir }, + { + consumer, + subscribe: async (signal) => { + calls++ + // Deliver a full session on the first connect, then hold the stream + // open until shutdown so stop() exercises the drain path. + return eventSource(calls === 1 ? sessionEvents("ses_sub_stop") : [], { holdUntilAbort: signal }) + }, + }, + ) + // Wait until the session's events have been consumed (snapshot on disk). + await waitFor( + () => readTraceFile("ses_sub_stop"), + (t) => t.spans.some((s) => s.kind === "tool"), + ) + // Pre-stop the trace is not yet finalized; stop() must flush → "completed". + await sub.stop() + const trace = await readTraceFile("ses_sub_stop") + expect(trace.summary.status).toBe("completed") + expect(trace.summary.totalToolCalls).toBe(1) + }) + + test("a mid-stream throw does not kill the loop — it reconnects", async () => { + const consumer = makeConsumer() + let calls = 0 + const sub = subscribeTraceConsumer( + { directory: tmpDir }, + { + consumer, + subscribe: async (signal) => { + calls++ + // 1st connect throws mid-stream; 2nd connect delivers a different + // session in full. If the throw killed the loop, session B's file + // would never appear and the wait below would time out. + if (calls === 1) return eventSource(sessionEvents("ses_sub_A"), { throwAfter: 2 }) + return eventSource(sessionEvents("ses_sub_B"), { holdUntilAbort: signal }) + }, + }, + ) + const trace = await waitFor( + () => readTraceFile("ses_sub_B"), + (t) => t.summary.totalToolCalls >= 1, + ) + expect(trace.summary.totalToolCalls).toBe(1) + expect(calls).toBeGreaterThanOrEqual(2) + await sub.stop() + }) }) diff --git a/packages/opencode/test/upstream/bridge-merge-invariants.test.ts b/packages/opencode/test/upstream/bridge-merge-invariants.test.ts index f00a733ab3..7e1b7d73d7 100644 --- a/packages/opencode/test/upstream/bridge-merge-invariants.test.ts +++ b/packages/opencode/test/upstream/bridge-merge-invariants.test.ts @@ -165,7 +165,7 @@ describe("invariant: event-type literals match between producer and consumers", // (also used by `serve`) — the literals live there now. { file: "altimate/observability/trace-consumer.ts", - types: ["message.updated", "message.part.updated", "session.updated"], + types: ["message.updated", "message.part.updated", "session.updated", "session.deleted"], }, { file: "cli/cmd/run.ts", types: ["message.updated", "message.part.updated"] }, { file: "acp/agent.ts", types: ["message.part.updated"] }, From c09a4867d70f20d4d47c2474e6dcfa6e481704a0 Mon Sep 17 00:00:00 2001 From: anandgupta42 Date: Mon, 8 Jun 2026 01:37:29 -0700 Subject: [PATCH 7/7] fix: address cubic review on serve trace lifecycle (#886) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to the consensus-review fixes, addressing cubic-dev-ai comments: - serve.ts: SIGINT/SIGTERM shutdown now exits with signal-conventional codes (130 / 143) instead of 0, so a signalled stop isn't masked as a successful run. beforeExit still exits 0. Matches cli/cmd/run.ts. - trace-consumer.ts (loadConfig): a config-load error no longer disables tracing — it keeps `enabled` true and falls back to Trace.create()'s default exporter (the original "must not prevent the host from working" intent), but now logs a warning so the fallback isn't silent. - trace-consumer.ts (getOrCreateTrace): drop the dead `Trace.setActive(trace)` call. It sets a process-global active trace that nothing reads and that is a footgun in multi-session serve (last-event-wins); per-session routing is via the sessionTraces map. Verified: marker guard, typecheck, and trace-consumer / tracing-followups / worker-trace-clearing / bridge-merge-invariants suites all green. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/altimate/observability/trace-consumer.ts | 15 +++++++++++---- packages/opencode/src/cli/cmd/serve.ts | 13 ++++++++----- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/packages/opencode/src/altimate/observability/trace-consumer.ts b/packages/opencode/src/altimate/observability/trace-consumer.ts index b054b204cf..cea184393d 100644 --- a/packages/opencode/src/altimate/observability/trace-consumer.ts +++ b/packages/opencode/src/altimate/observability/trace-consumer.ts @@ -91,9 +91,12 @@ export class TraceConsumer { this.exporters = exporters this.maxFiles = tc?.maxFiles } catch (error) { - // Config failure should not prevent the host (TUI/serve) from working - this.enabled = false - Log.Default.debug("[tracing] failed to load config, disabling", { + // Config failure must not prevent the host (TUI/serve) from tracing: + // leave `enabled` true and `exporters` undefined so getOrCreateTrace + // falls back to Trace.create()'s default FileExporter. Warn so the + // fallback isn't silent (the original concern was the lack of any signal, + // not the fallback itself). + Log.Default.warn("[tracing] failed to load config, using default tracer", { error: error instanceof Error ? error.message : String(error), }) } @@ -139,7 +142,11 @@ export class TraceConsumer { void trace.endTrace().catch(() => {}) return this.sessionTraces.get(sessionID) ?? null } - Trace.setActive(trace) + // Intentionally NOT calling Trace.setActive() here: it sets a single + // process-global active trace, which is meaningless (and a footgun) for a + // multi-session consumer where serve runs many sessions concurrently — + // whichever session's event arrived last would win. Per-session routing + // is via the sessionTraces map; nothing reads Trace.active on this path. this.sessionTraces.set(sessionID, trace) return trace } catch (error) { diff --git a/packages/opencode/src/cli/cmd/serve.ts b/packages/opencode/src/cli/cmd/serve.ts index 7c1c08b2d0..76396951cf 100644 --- a/packages/opencode/src/cli/cmd/serve.ts +++ b/packages/opencode/src/cli/cmd/serve.ts @@ -37,16 +37,19 @@ export const ServeCommand = cmd({ // left un-finalized (status never "completed", no summary/narrative). // Mirrors the SIGINT/SIGTERM/beforeExit pattern in cli/cmd/run.ts. let isShuttingDown = false - const shutdown = async () => { + const shutdown = async (code: number) => { if (isShuttingDown) return isShuttingDown = true await traceSub.stop() await server.stop() - process.exit(0) + process.exit(code) } - process.once("SIGINT", () => void shutdown()) - process.once("SIGTERM", () => void shutdown()) - process.once("beforeExit", () => void shutdown()) + // Exit with signal-conventional codes (128 + signal number) so a + // SIGINT/SIGTERM isn't masked as a successful (0) run. beforeExit is a + // normal drain, so it exits 0. Matches cli/cmd/run.ts. + process.once("SIGINT", () => void shutdown(130)) + process.once("SIGTERM", () => void shutdown(143)) + process.once("beforeExit", () => void shutdown(0)) // altimate_change end await new Promise(() => {})