diff --git a/packages/opencode/src/altimate/observability/trace-consumer.ts b/packages/opencode/src/altimate/observability/trace-consumer.ts new file mode 100644 index 000000000..cea184393 --- /dev/null +++ b/packages/opencode/src/altimate/observability/trace-consumer.ts @@ -0,0 +1,443 @@ +/** + * Shared event-stream → trace consumer. + * + * Feeds bus events (message.updated, message.part.updated, session.updated) + * into per-session Trace instances so every front-end that observes the event + * stream writes the same trace files to ~/.local/share/altimate-code/traces/. + * + * Extracted from cli/cmd/tui/worker.ts so the headless server + * (`altimate serve`, used by the VS Code "Altimate Code" chat panel) produces + * traces too — previously only the terminal entrypoints (TUI and `run`) + * instantiated a tracer, and chat sessions were never traced. + * + * The per-event logic here is a 1:1 port of the worker's inline handling so + * both front-ends behave identically: + * - traces are NOT finalized on `session.status: idle` — idle fires after + * every turn, not at session end; finalization happens on flush() + * (shutdown) and on MAX_TRACES eviction. Long-lived sessions keep their + * Trace in cache across turns. + * - cache-miss re-creation rehydrates the rich on-disk trace + * (`rehydrateFromFile`) instead of starting fresh and clobbering it. + * - a monotonic stream generation guards getOrCreateTrace against a + * concurrent reset() (the consumer's equivalent of the worker's + * startEventStream cache-clear) tearing the cache out mid-await. + * + * Consumers: + * - cli/cmd/tui/worker.ts — feeds events from its own SDK event loop + * - cli/cmd/serve.ts — uses subscribeTraceConsumer() below + */ +import { createOpencodeClient } from "@opencode-ai/sdk/v2" +import { setTimeout as sleep } from "node:timers/promises" +import { Config } from "@/config/config" +import { Log } from "@/util/log" +import { Server } from "@/server/server" +import { Flag } from "@/flag/flag" +import { Trace, FileExporter, HttpExporter, type TraceExporter } from "./tracing" + +const MAX_TRACES = 100 + +/** Minimal structural view of a bus event — narrowed at each read site. */ +interface BusEventLike { + type?: string + properties?: Record +} + +export class TraceConsumer { + private sessionTraces = new Map() + // Per-session user message IDs (cleaned up on session end) + private sessionUserMsgIds = new Map>() + // Monotonic stream generation. Bumped on every reset() so an in-flight + // getOrCreateTrace() can detect that its owning stream was torn down while + // it was suspended at the rehydrate await. Keyed on a counter rather than + // any object identity so the guard doesn't depend on caller behaviour. + private streamGeneration = 0 + + // Cached tracing config — loaded once at first use + private configLoaded = false + private enabled = true + private exporters: TraceExporter[] | undefined + private maxFiles: number | undefined + + /** + * Optional overrides bypass config loading entirely — used by tests to + * inject a FileExporter pointed at a temp directory. + */ + constructor(overrides?: { exporters?: TraceExporter[]; maxFiles?: number; enabled?: boolean }) { + if (overrides) { + this.configLoaded = true + this.enabled = overrides.enabled ?? true + this.exporters = overrides.exporters + this.maxFiles = overrides.maxFiles + } + } + + /** Load tracing config once. Safe to call repeatedly. */ + async loadConfig() { + if (this.configLoaded) return + this.configLoaded = true + try { + const cfg = await Config.get() + const tc = cfg.tracing + if (tc?.enabled === false) { + this.enabled = false + return + } + const exporters: TraceExporter[] = [new FileExporter(tc?.dir)] + if (tc?.exporters) { + for (const exp of tc.exporters) { + exporters.push(new HttpExporter(exp.name, exp.endpoint, exp.headers)) + } + } + this.exporters = exporters + this.maxFiles = tc?.maxFiles + } catch (error) { + // Config failure must not prevent the host (TUI/serve) from tracing: + // leave `enabled` true and `exporters` undefined so getOrCreateTrace + // falls back to Trace.create()'s default FileExporter. Warn so the + // fallback isn't silent (the original concern was the lack of any signal, + // not the fallback itself). + Log.Default.warn("[tracing] failed to load config, using default tracer", { + error: error instanceof Error ? error.message : String(error), + }) + } + } + + private async getOrCreateTrace(sessionID: string): Promise { + if (!sessionID || !this.enabled) return null + if (this.sessionTraces.has(sessionID)) return this.sessionTraces.get(sessionID)! + // Capture the stream generation that owns this call so we can detect a + // concurrent reset() that cleared the cache while we were suspended at the + // rehydrate await below. + const generationAtEntry = this.streamGeneration + try { + if (this.sessionTraces.size >= MAX_TRACES) { + const oldest = this.sessionTraces.keys().next().value + if (oldest) { + Log.Default.warn(`[tracing] Evicting trace for session ${oldest} — ${MAX_TRACES} concurrent sessions reached`) + this.sessionTraces + .get(oldest) + ?.endTrace() + .catch(() => {}) + this.sessionTraces.delete(oldest) + this.sessionUserMsgIds.delete(oldest) + } + } + const trace = this.exporters + ? Trace.withExporters([...this.exporters], { maxFiles: this.maxFiles }) + : Trace.create() + // Prefer disk-rehydration on cache miss for an existing session (worker + // restart, MAX_TRACES eviction, post-turn re-creation). startTrace would + // push a fresh root span into empty `this.spans` and the immediate + // snapshot would clobber the rich on-disk file. + if (!(await trace.rehydrateFromFile(sessionID))) { + trace.startTrace(sessionID, {}) + } + // If a reset() replaced our stream while we were awaiting rehydrate, this + // Trace belongs to a stream that's already torn down and its cache + // cleared. Inserting it now would resurrect an orphan writer into the + // freshly-cleared map. Discard it and defer to the live stream. The check + // and the set below run in the same synchronous turn (no await between), + // so the insert can't race a later reset(). + if (this.streamGeneration !== generationAtEntry) { + void trace.endTrace().catch(() => {}) + return this.sessionTraces.get(sessionID) ?? null + } + // Intentionally NOT calling Trace.setActive() here: it sets a single + // process-global active trace, which is meaningless (and a footgun) for a + // multi-session consumer where serve runs many sessions concurrently — + // whichever session's event arrived last would win. Per-session routing + // is via the sessionTraces map; nothing reads Trace.active on this path. + this.sessionTraces.set(sessionID, trace) + return trace + } catch (error) { + Log.Default.debug("[tracing] getOrCreateTrace failed", { + error: error instanceof Error ? error.message : String(error), + }) + return null + } + } + + /** Feed one bus event into the per-session traces. Never throws. */ + async handleEvent(event: unknown): Promise { + try { + const e = event as BusEventLike + if (e.type === "message.updated") { + const info = e.properties?.info as Record | undefined + // Resolve sessionID: use info.sessionID directly, or fall back to + // finding the session via info.parentID (assistant messages may only + // carry the parent message ID, not the session ID). + let resolvedSessionID = info?.sessionID as string | undefined + if (!resolvedSessionID && info?.parentID) { + for (const [sid, msgIds] of this.sessionUserMsgIds) { + if (msgIds.has(info.parentID)) { + resolvedSessionID = sid + break + } + } + } + if (info && resolvedSessionID) { + // Create trace eagerly on user message (arrives before part events) + const trace = + this.sessionTraces.get(resolvedSessionID) ?? + (info.role === "user" ? await this.getOrCreateTrace(resolvedSessionID) : null) + if (info.role === "user") { + if (info.id) { + if (!this.sessionUserMsgIds.has(resolvedSessionID)) + this.sessionUserMsgIds.set(resolvedSessionID, new Set()) + this.sessionUserMsgIds.get(resolvedSessionID)!.add(info.id) + } + if (trace) { + const title = info.summary?.title || info.summary?.body + if (title) trace.setTitle(String(title).slice(0, 80), String(title)) + } + } + if (info.role === "assistant") { + const r = trace ?? (await this.getOrCreateTrace(resolvedSessionID)) + r?.enrichFromAssistant({ + modelID: info.modelID, + providerID: info.providerID, + agent: info.agent, + variant: info.variant, + }) + } + } + } + if (e.type === "message.part.updated") { + const part = e.properties?.part as Record | undefined + if (part) { + // Create trace on first event for this session (lazy creation) + const trace = this.sessionTraces.get(part.sessionID) ?? (await this.getOrCreateTrace(part.sessionID)) + if (trace) { + if (part.type === "step-start") trace.logStepStart(part as Parameters[0]) + if (part.type === "step-finish") trace.logStepFinish(part as Parameters[0]) + // altimate_change start — split the user-vs-assistant text routes. + // User text parts arrive without `time.end` set (it's a meaningful + // concept only for processing-end of assistant chunks), so the old + // `&& part.time?.end` gate dropped the prompt entirely. We trust + // `sessionUserMsgIds.has(messageID)` as the user-text signal and + // call `setPrompt(text)` only — never `setTitle` — to avoid racing + // the auto-generated title from `session.updated` (Path C). + if (part.type === "text") { + // altimate_change start — skip synthetic / ignored text parts. + // `Session.createUserMessage` (prompt.ts) attaches many `synthetic: true` + // text parts to the user message — MCP resource banners, decoded file + // contents, retry/reminder text, plan-mode reminders, agent-handoff + // tags. They all share the user's `messageID` so they would otherwise + // pass the `sessionUserMsgIds` check below and override `metadata.prompt` + // with the LAST synthetic blob (typically file content) and render one + // fake "▶ You" bubble per synthetic part in the chat tab. The synthetic + // and ignored flags exist precisely to mark non-authored content; this + // is exactly the place to consult them. We skip silently rather than + // `continue`-ing the event-loop iteration because the outer loop still + // needs to forward the event downstream via `Rpc.emit`. + const isAuthoredText = !part.synthetic && !part.ignored + // altimate_change end + if (isAuthoredText && part.messageID && this.sessionUserMsgIds.get(part.sessionID)?.has(part.messageID)) { + const text = String(part.text || "") + if (text) { + trace.setPrompt(text) + // altimate_change start — record each user message as a span + // so the chat tab can render multi-turn conversations. + // Without a span, the viewer can only display `metadata.prompt` + // (singular) and every subsequent user message is silently + // dropped from the conversation rendering. + trace.logUserMessage(text) + // altimate_change end + } + } else if (isAuthoredText && part.time?.end) { + // Assistant response text (only counts when processing-end fires) + trace.logText(part as Parameters[0]) + } + } + // altimate_change end + if (part.type === "tool" && (part.state?.status === "completed" || part.state?.status === "error")) { + trace.logToolCall(part as Parameters[0]) + } + } + } + } + // Capture session title from session.updated events + if (e.type === "session.updated") { + const info = e.properties?.info as Record | undefined + if (info?.id && info?.title) { + const trace = this.sessionTraces.get(info.id) + if (trace) trace.setTitle(String(info.title)) + } + } + // Finalize and evict on a real session-end signal. `session.deleted` is a + // true end-of-session event (unlike `idle`, which fires every turn), so + // it is the correct place to flush the trace and release per-session + // state. Without this, in a long-lived `serve` process the trace and its + // sessionUserMsgIds set live until MAX_TRACES eviction (or never, for + // <100 sessions), since reset()/flush() don't fire during normal serve. + if (e.type === "session.deleted") { + const info = e.properties?.info as Record | undefined + if (info?.id) { + const trace = this.sessionTraces.get(info.id) + if (trace) void trace.endTrace().catch(() => {}) + this.sessionTraces.delete(info.id) + this.sessionUserMsgIds.delete(info.id) + } + } + // DO NOT finalize the trace on session.status=idle. `idle` fires after + // every turn (busy → idle), not at session end. Finalizing per-turn would + // treat each turn as session end and the next turn's events would hit a + // cache miss; getOrCreateTrace rehydrates from disk as defense in depth, + // but the correct behaviour is to keep the Trace live across turns and + // finalize only on flush() (shutdown) and MAX_TRACES eviction. + } catch (error) { + // Trace must never interrupt event forwarding — log at debug only. + Log.Default.debug("[tracing] handleEvent error", { + error: error instanceof Error ? error.message : String(error), + }) + } + } + + /** + * Bump the stream generation and end all in-flight traces fire-and-forget, + * then clear state. The worker calls this at the top of every + * startEventStream so stale per-stream state doesn't leak across stream + * instances; the generation bump invalidates any getOrCreateTrace suspended + * at its rehydrate await. + */ + reset() { + this.streamGeneration++ + for (const [, trace] of this.sessionTraces) { + void trace.endTrace().catch(() => {}) + } + this.sessionTraces.clear() + this.sessionUserMsgIds.clear() + } + + /** End all in-flight traces and wait for them. Used on shutdown. */ + async flush() { + for (const [, trace] of this.sessionTraces) { + await trace.endTrace().catch(() => {}) + } + this.sessionTraces.clear() + this.sessionUserMsgIds.clear() + } +} + +/** + * Subscribe to the in-process event stream and feed every event to a + * TraceConsumer. Mirrors the TUI worker's event loop for hosts that don't + * have one of their own — i.e. `altimate serve`, where sessions are driven + * over HTTP (the VS Code chat panel) and no other event consumer exists. + * + * Trace failures must never affect the server, so every step is best-effort. + */ +/** One subscription's event stream. */ +export interface TraceEventSource { + stream: AsyncIterable +} + +export interface TraceSubscribeOptions { + /** Test seam: inject the consumer (e.g. with a temp-dir FileExporter). */ + consumer?: TraceConsumer + /** + * Test seam: provide the event source. Resolves to a stream of bus events, + * or `undefined` to trigger a backoff+retry. Defaults to the in-process SDK + * subscription. Called once per (re)connect with the shutdown signal. + */ + subscribe?: (signal: AbortSignal) => Promise +} + +export function subscribeTraceConsumer( + input: { directory: string }, + options?: TraceSubscribeOptions, +): { stop: () => Promise } { + const consumer = options?.consumer ?? new TraceConsumer() + const abort = new AbortController() + const signal = abort.signal + + // Default event source: the in-process SDK subscription — same pattern as the + // TUI worker. The Bus is process-wide, so events published by sessions served + // by the TCP listener arrive on this subscription too. The SDK client is + // built once; `subscribe` is invoked per (re)connect. + const subscribe = + options?.subscribe ?? + (() => { + const fetchFn = (async (fetchInput: RequestInfo | URL, init?: RequestInit) => { + const request = new Request(fetchInput, init) + const password = Flag.OPENCODE_SERVER_PASSWORD + if (password) { + const username = Flag.OPENCODE_SERVER_USERNAME ?? "opencode" + request.headers.set("Authorization", `Basic ${Buffer.from(`${username}:${password}`).toString("base64")}`) + } + return Server.Default().fetch(request) + }) as typeof globalThis.fetch + + const sdk = createOpencodeClient({ + baseUrl: "http://altimate-code.internal", + directory: input.directory, + fetch: fetchFn, + signal, + }) + + return (sig: AbortSignal): Promise => + Promise.resolve(sdk.event.subscribe({}, { signal: sig })) + .then((r) => r as unknown as TraceEventSource) + .catch(() => undefined) + })() + + // Abortable sleep so a pending reconnect-backoff doesn't delay shutdown. + const BASE_BACKOFF_MS = 250 + const MAX_BACKOFF_MS = 30_000 + const backoffSleep = (ms: number) => sleep(ms, undefined, { signal }).catch(() => {}) + + const loopPromise = (async () => { + await consumer.loadConfig() + // Exponential backoff on repeated failure so a durably-down stream doesn't + // hot-loop; reset to the base delay after a successful subscription. + let backoff = BASE_BACKOFF_MS + while (!signal.aborted) { + const events = await subscribe(signal).catch(() => undefined) + + if (!events) { + await backoffSleep(backoff) + backoff = Math.min(backoff * 2, MAX_BACKOFF_MS) + continue + } + backoff = BASE_BACKOFF_MS + + // The try/catch MUST sit inside the while loop: a mid-stream throw + // (network disconnect, server hiccup) would otherwise escape to the + // outer .catch and kill the loop permanently for the server's lifetime. + try { + for await (const event of events.stream) { + await consumer.handleEvent(event) + } + } catch (err) { + if (!signal.aborted) { + Log.Default.warn("[tracing] trace event stream disconnected, reconnecting", { + error: err instanceof Error ? err.message : String(err), + }) + } + } + + if (!signal.aborted) { + await backoffSleep(backoff) + backoff = Math.min(backoff * 2, MAX_BACKOFF_MS) + } + } + })() + + loopPromise.catch((error: unknown) => { + Log.Default.error("trace event stream error", { + error: error instanceof Error ? error.message : error, + }) + }) + + return { + stop: async () => { + // Stop accepting new work, then DRAIN the loop before flushing so we + // don't finalize traces while a handleEvent() is still mid-event. + // Bounded by a timeout so an unresponsive stream can't hang shutdown — + // flush() finalizes whatever is in the cache either way. + abort.abort() + await Promise.race([loopPromise.catch(() => {}), sleep(1000)]) + await consumer.flush() + }, + } +} diff --git a/packages/opencode/src/cli/cmd/serve.ts b/packages/opencode/src/cli/cmd/serve.ts index cc7cb3c3c..76396951c 100644 --- a/packages/opencode/src/cli/cmd/serve.ts +++ b/packages/opencode/src/cli/cmd/serve.ts @@ -5,6 +5,9 @@ import { Flag } from "../../flag/flag" import { Workspace } from "../../control-plane/workspace" import { Project } from "../../project/project" import { Installation } from "../../installation" +// altimate_change start — trace: session tracing in headless serve +import { subscribeTraceConsumer } from "../../altimate/observability/trace-consumer" +// altimate_change end export const ServeCommand = cmd({ command: "serve", @@ -20,7 +23,35 @@ export const ServeCommand = cmd({ console.log(`altimate-code server listening on http://${server.hostname}:${server.port}`) // altimate_change end + // altimate_change start — trace: session tracing in headless serve + // Sessions driven over HTTP (e.g. the VS Code chat panel) have no TUI + // worker observing the event stream, so traces were never written in + // serve mode. Subscribe the shared trace consumer to the in-process + // event stream so serve sessions produce the same trace files as the + // terminal entrypoints. + const traceSub = subscribeTraceConsumer({ directory: process.cwd() }) + + // Finalize traces on shutdown. `serve` blocks forever on the promise below + // and otherwise dies abruptly on signal, so without these handlers the + // consumer's stop()/flush()/endTrace() never runs and serve traces are + // left un-finalized (status never "completed", no summary/narrative). + // Mirrors the SIGINT/SIGTERM/beforeExit pattern in cli/cmd/run.ts. + let isShuttingDown = false + const shutdown = async (code: number) => { + if (isShuttingDown) return + isShuttingDown = true + await traceSub.stop() + await server.stop() + process.exit(code) + } + // Exit with signal-conventional codes (128 + signal number) so a + // SIGINT/SIGTERM isn't masked as a successful (0) run. beforeExit is a + // normal drain, so it exits 0. Matches cli/cmd/run.ts. + process.once("SIGINT", () => void shutdown(130)) + process.once("SIGTERM", () => void shutdown(143)) + process.once("beforeExit", () => void shutdown(0)) + // altimate_change end + await new Promise(() => {}) - await server.stop() }, }) diff --git a/packages/opencode/src/cli/cmd/tui/worker.ts b/packages/opencode/src/cli/cmd/tui/worker.ts index 6c2829c8c..be4c12fa7 100644 --- a/packages/opencode/src/cli/cmd/tui/worker.ts +++ b/packages/opencode/src/cli/cmd/tui/worker.ts @@ -12,7 +12,7 @@ import type { BunWebSocketData } from "hono/bun" import { Flag } from "@/flag/flag" import { setTimeout as sleep } from "node:timers/promises" // altimate_change start — trace: session tracing in TUI -import { Trace, FileExporter, HttpExporter, type TraceExporter } from "@/altimate/observability/tracing" +import { TraceConsumer } from "@/altimate/observability/trace-consumer" // altimate_change end await Log.init({ @@ -47,113 +47,22 @@ const eventStream = { abort: undefined as AbortController | undefined, } -// altimate_change start — trace: monotonic stream generation. Bumped on every -// startEventStream() so an in-flight getOrCreateTrace() can detect that its -// owning stream was torn down while it was suspended at an await. Keyed on a -// counter rather than the AbortController's object identity so the guard does -// not silently depend on startEventStream always allocating a fresh controller. -let streamGeneration = 0 -// altimate_change end - -// altimate_change start — trace: per-session traces -const sessionTraces = new Map() -const sessionUserMsgIds = new Map>() // Per-session user message IDs (cleaned up on session end) -const MAX_TRACES = 100 - -// Cached tracing config — loaded once at first use -let tracingConfigLoaded = false -let tracingEnabled = true -let tracingExporters: TraceExporter[] | undefined -let tracingMaxFiles: number | undefined - -async function loadTracingConfig() { - if (tracingConfigLoaded) return - tracingConfigLoaded = true - try { - const cfg = await Config.get() - const tc = cfg.tracing - if (tc?.enabled === false) { tracingEnabled = false; return } - const exporters: TraceExporter[] = [new FileExporter(tc?.dir)] - if (tc?.exporters) { - for (const exp of tc.exporters) { - exporters.push(new HttpExporter(exp.name, exp.endpoint, exp.headers)) - } - } - tracingExporters = exporters - tracingMaxFiles = tc?.maxFiles - } catch { - // Config failure should not prevent TUI from working - } -} -// altimate_change end - -// altimate_change start — trace: get or create per-session trace -async function getOrCreateTrace(sessionID: string): Promise { - if (!sessionID || !tracingEnabled) return null - if (sessionTraces.has(sessionID)) return sessionTraces.get(sessionID)! - // altimate_change start — capture the stream generation that owns this call so - // we can detect a concurrent startEventStream() (e.g. setWorkspace) that - // aborted us and cleared the cache while we were suspended at the rehydrate - // await below. A counter (not AbortController identity) so we don't depend on - // startEventStream's allocation strategy. - const generationAtEntry = streamGeneration - // altimate_change end - try { - if (sessionTraces.size >= MAX_TRACES) { - const oldest = sessionTraces.keys().next().value - if (oldest) { - Log.Default.warn(`[tracing] Evicting trace for session ${oldest} — ${MAX_TRACES} concurrent sessions reached`) - sessionTraces.get(oldest)?.endTrace().catch(() => {}) - sessionTraces.delete(oldest) - sessionUserMsgIds.delete(oldest) - } - } - const trace = tracingExporters - ? Trace.withExporters([...tracingExporters], { maxFiles: tracingMaxFiles }) - : Trace.create() - // altimate_change start — prefer disk-rehydration on cache miss for an - // existing session (worker restart, MAX_TRACES eviction). startTrace would - // push a fresh root span into empty `this.spans` and the immediate - // snapshot would clobber the rich on-disk file. Defense in depth in - // addition to keeping the cache alive across turns. - // Async to keep the event-stream loop unblocked on large existing traces. - if (!(await trace.rehydrateFromFile(sessionID))) { - trace.startTrace(sessionID, {}) - } - // altimate_change end - // altimate_change start — if a new stream replaced ours while we were - // awaiting rehydrate, this Trace belongs to a stream that's already been - // aborted and its cache cleared. Inserting it now would resurrect an orphan - // writer into the freshly-cleared map. Discard it and defer to whatever the - // live stream has. The check and the set below run in the same synchronous - // turn (no await between them), so the insert can't race a later - // startEventStream — this closes the suspend-at-await hole specifically. - if (streamGeneration !== generationAtEntry) { - void trace.endTrace().catch(() => {}) - return sessionTraces.get(sessionID) ?? null - } - Trace.setActive(trace) - sessionTraces.set(sessionID, trace) - return trace - // altimate_change end - } catch { - return null - } -} +// altimate_change start — trace: per-session traces (shared consumer) +// All per-session trace state + event handling lives in TraceConsumer so the +// headless `serve` entrypoint (VS Code chat panel) gets identical behaviour. +// reset() bumps the consumer's stream generation (the equivalent of the old +// inline cache-clear) to invalidate any in-flight rehydrate. +const traceConsumer = new TraceConsumer() // altimate_change end const startEventStream = (input: { directory: string; workspaceID?: string }) => { if (eventStream.abort) eventStream.abort.abort() - // altimate_change start — new stream generation; invalidates any in-flight - // getOrCreateTrace() suspended at its rehydrate await (see generationAtEntry). - streamGeneration++ + // altimate_change start — trace: clear stale per-stream trace state before + // starting a new stream instance. reset() also bumps the consumer's stream + // generation, invalidating any in-flight getOrCreateTrace() suspended at its + // rehydrate await. + traceConsumer.reset() // altimate_change end - // Clear stale per-stream trace state before starting a new stream instance - for (const [, trace] of sessionTraces) { - void trace.endTrace().catch(() => {}) - } - sessionTraces.clear() - sessionUserMsgIds.clear() const abort = new AbortController() eventStream.abort = abort @@ -175,8 +84,9 @@ const startEventStream = (input: { directory: string; workspaceID?: string }) => }) ;(async () => { - // Load tracing config once before processing events - await loadTracingConfig() + // altimate_change start — trace: load tracing config once before processing events + await traceConsumer.loadConfig() + // altimate_change end while (!signal.aborted) { const events = await Promise.resolve( sdk.event.subscribe( @@ -194,135 +104,7 @@ const startEventStream = (input: { directory: string; workspaceID?: string }) => for await (const event of events.stream) { // altimate_change start — trace: feed events to per-session trace - try { - if (event.type === "message.updated") { - const info = (event as any).properties?.info - // Resolve sessionID: use info.sessionID directly, or fall back to - // finding the session via info.parentID (assistant messages may only - // carry the parent message ID, not the session ID). - let resolvedSessionID = info?.sessionID as string | undefined - if (!resolvedSessionID && info?.parentID) { - for (const [sid, msgIds] of sessionUserMsgIds) { - if (msgIds.has(info.parentID)) { - resolvedSessionID = sid - break - } - } - } - if (resolvedSessionID) { - // Create trace eagerly on user message (arrives before part events) - const trace = - sessionTraces.get(resolvedSessionID) ?? - (info.role === "user" ? await getOrCreateTrace(resolvedSessionID) : null) - if (info.role === "user") { - if (info.id) { - if (!sessionUserMsgIds.has(resolvedSessionID)) sessionUserMsgIds.set(resolvedSessionID, new Set()) - sessionUserMsgIds.get(resolvedSessionID)!.add(info.id) - } - if (trace) { - const title = (info as any).summary?.title || (info as any).summary?.body - if (title) trace.setTitle(String(title).slice(0, 80), String(title)) - } - } - if (info.role === "assistant") { - const r = trace ?? (await getOrCreateTrace(resolvedSessionID)) - r?.enrichFromAssistant({ - modelID: info.modelID, - providerID: info.providerID, - agent: info.agent, - variant: info.variant, - }) - } - } - } - // altimate_change end - // altimate_change start — trace: part events - if (event.type === "message.part.updated") { - const part = (event as any).properties?.part - if (part) { - // Create trace on first event for this session (lazy creation) - const trace = sessionTraces.get(part.sessionID) ?? (await getOrCreateTrace(part.sessionID)) - if (trace) { - if (part.type === "step-start") trace.logStepStart(part) - if (part.type === "step-finish") trace.logStepFinish(part) - // altimate_change start — split the user-vs-assistant text routes. - // User text parts arrive without `time.end` set (it's a meaningful - // concept only for processing-end of assistant chunks), so the old - // `&& part.time?.end` gate dropped the prompt entirely. We trust - // `sessionUserMsgIds.has(messageID)` as the user-text signal and - // call `setPrompt(text)` only — never `setTitle` — to avoid racing - // the auto-generated title from `session.updated` (Path C). - if (part.type === "text") { - // altimate_change start — skip synthetic / ignored text parts. - // `Session.createUserMessage` (prompt.ts) attaches many `synthetic: true` - // text parts to the user message — MCP resource banners, decoded file - // contents, retry/reminder text, plan-mode reminders, agent-handoff - // tags. They all share the user's `messageID` so they would otherwise - // pass the `sessionUserMsgIds` check below and override `metadata.prompt` - // with the LAST synthetic blob (typically file content) and render one - // fake "▶ You" bubble per synthetic part in the chat tab. The synthetic - // and ignored flags exist precisely to mark non-authored content; this - // is exactly the place to consult them. We skip silently rather than - // `continue`-ing the event-loop iteration because the outer loop still - // needs to forward the event downstream via `Rpc.emit`. - const isAuthoredText = !part.synthetic && !part.ignored - // altimate_change end - if ( - isAuthoredText && - part.messageID && - sessionUserMsgIds.get(part.sessionID)?.has(part.messageID) - ) { - const text = String(part.text || "") - if (text) { - trace.setPrompt(text) - // altimate_change start — record each user message as a span - // so the chat tab can render multi-turn conversations. - // Without a span, the viewer can only display `metadata.prompt` - // (singular) and every subsequent user message is silently - // dropped from the conversation rendering. - trace.logUserMessage(text) - // altimate_change end - } - } else if (isAuthoredText && part.time?.end) { - // Assistant response text (only counts when processing-end fires) - trace.logText(part) - } - } - // altimate_change end - if (part.type === "tool" && (part.state?.status === "completed" || part.state?.status === "error")) { - trace.logToolCall(part) - } - } - } - } - // altimate_change end - // altimate_change start — trace: session title capture and finalization - // Capture session title from session.updated events - if (event.type === "session.updated") { - const info = (event as any).properties?.info - if (info?.id && info?.title) { - const trace = sessionTraces.get(info.id) - if (trace) trace.setTitle(String(info.title)) - } - } - // altimate_change start — DO NOT finalize the trace on session.status=idle. - // `idle` fires after every turn (busy → idle transition), not at session end. - // Calling `endTrace` + `sessionTraces.delete` here treats each turn as the - // end of the session: the next event for the same session in a later turn - // hits a cache miss in getOrCreateTrace, constructs a fresh Trace.create() - // with empty `this.spans`, and the immediate `snapshot()` clobbers the - // rich on-disk `ses_.json` with a single root-span file. Symptoms: - // - waterfall view collapses to the system-prompt span after every turn - // - "What was asked / No prompt recorded" because metadata.prompt was - // captured on the destroyed instance, never on the replacement - // Sessions in altimate-code are long-lived across many turns; the Trace - // should live as long as the worker has the session in cache. Finalization - // happens on `shutdown` (worker.ts:312) and on MAX_TRACES eviction - // (worker.ts:87). No per-turn finalization is correct. - // altimate_change end - } catch { - // Trace must never interrupt event forwarding - } + await traceConsumer.handleEvent(event) // altimate_change end Rpc.emit("event", event as Event) @@ -404,11 +186,7 @@ export const rpc = { Log.Default.info("worker shutting down") if (eventStream.abort) eventStream.abort.abort() // altimate_change start — trace: flush all active traces on shutdown - for (const [sid, trace] of sessionTraces) { - await trace.endTrace().catch(() => {}) - } - sessionTraces.clear() - sessionUserMsgIds.clear() + await traceConsumer.flush() // altimate_change end await Instance.disposeAll() if (server) server.stop(true) diff --git a/packages/opencode/test/altimate/trace-consumer.test.ts b/packages/opencode/test/altimate/trace-consumer.test.ts new file mode 100644 index 000000000..8bbe29b32 --- /dev/null +++ b/packages/opencode/test/altimate/trace-consumer.test.ts @@ -0,0 +1,403 @@ +/** + * Tests for the shared event-stream → trace consumer. + * + * The consumer is the extracted form of the TUI worker's inline tracing + * logic, now also wired into `altimate serve` so headless sessions (e.g. + * the VS Code chat panel) write trace files. These tests feed realistic + * bus-event sequences and assert trace files land on disk. + * + * Behaviour contracts mirrored from the worker: + * - traces are NOT finalized on `session.status: idle` (idle fires every + * turn); incremental snapshots are written as events arrive, and the + * trace is finalized on flush() (shutdown) / reset() / eviction. + * - cache-miss re-creation rehydrates the rich on-disk trace rather than + * clobbering it with a fresh empty one. + */ + +import { describe, expect, test, beforeEach, afterEach } from "bun:test" +import fs from "fs/promises" +import path from "path" +import os from "os" +import { + TraceConsumer, + subscribeTraceConsumer, + type TraceEventSource, +} from "../../src/altimate/observability/trace-consumer" +import { FileExporter, type TraceFile } from "../../src/altimate/observability/tracing" + +let tmpDir: string + +beforeEach(async () => { + tmpDir = path.join(os.tmpdir(), `trace-consumer-${Date.now()}-${Math.random().toString(36).slice(2)}`) + await fs.mkdir(tmpDir, { recursive: true }) +}) + +afterEach(async () => { + await fs.rm(tmpDir, { recursive: true, force: true }).catch(() => {}) +}) + +function makeConsumer() { + return new TraceConsumer({ exporters: [new FileExporter(tmpDir)] }) +} + +async function readTraceFile(sessionID: string): Promise { + const raw = await fs.readFile(path.join(tmpDir, `${sessionID}.json`), "utf8") + return JSON.parse(raw) as TraceFile +} + +async function feed(consumer: TraceConsumer, events: unknown[]) { + for (const event of events) { + await consumer.handleEvent(event) + } +} + +/** + * Poll until `predicate` holds, instead of a fixed sleep — snapshot/endTrace + * writes are async + debounced, so a fixed delay is flaky under CI load. + */ +async function waitFor(read: () => Promise, predicate: (v: T) => boolean, timeoutMs = 5000): Promise { + const start = Date.now() + for (;;) { + try { + const v = await read() + if (predicate(v)) return v + } catch { + // not ready yet (e.g. file not written) — keep polling + } + if (Date.now() - start > timeoutMs) throw new Error("waitFor: condition not met within timeout") + await new Promise((r) => setTimeout(r, 10)) + } +} + +/** + * Build a single-use event source for the `subscribeTraceConsumer` test seam. + * Optionally throws mid-stream (to exercise reconnect) or holds the stream open + * until the shutdown signal fires (to exercise stop()/drain). + */ +function eventSource( + events: unknown[], + opts?: { throwAfter?: number; holdUntilAbort?: AbortSignal }, +): TraceEventSource { + async function* gen() { + let i = 0 + for (const e of events) { + yield e + i++ + if (opts?.throwAfter !== undefined && i >= opts.throwAfter) throw new Error("simulated stream disconnect") + } + const sig = opts?.holdUntilAbort + if (sig) { + await new Promise((resolve) => { + if (sig.aborted) return resolve() + sig.addEventListener("abort", () => resolve(), { once: true }) + }) + } + } + return { stream: gen() } +} + +/** Event sequence mirroring what a real session emits over the bus. */ +function sessionEvents(sessionID: string) { + const now = Date.now() + return [ + { + type: "message.updated", + properties: { info: { id: "msg-user-1", sessionID, role: "user", time: { created: now } } }, + }, + { + type: "message.part.updated", + properties: { + part: { sessionID, messageID: "msg-user-1", type: "text", text: "list my files", time: { end: now } }, + }, + }, + { + type: "message.updated", + properties: { + info: { + id: "msg-asst-1", + sessionID, + parentID: "msg-user-1", + role: "assistant", + modelID: "gpt-4o", + providerID: "openai", + agent: "general", + time: { created: now }, + }, + }, + }, + { type: "message.part.updated", properties: { part: { sessionID, type: "step-start", id: "step-1" } } }, + { + type: "message.part.updated", + properties: { + part: { + sessionID, + type: "tool", + tool: "bash", + callID: "c1", + state: { + status: "completed", + input: { command: "ls" }, + output: "file1.ts", + time: { start: now - 1000, end: now }, + }, + }, + }, + }, + { + type: "message.part.updated", + properties: { + part: { sessionID, messageID: "msg-asst-1", type: "text", text: "Found 1 file.", time: { end: now } }, + }, + }, + { + type: "message.part.updated", + properties: { + part: { + sessionID, + type: "step-finish", + id: "step-1", + reason: "stop", + cost: 0.005, + tokens: { input: 500, output: 100, reasoning: 0, cache: { read: 0, write: 0 } }, + }, + }, + }, + { type: "session.updated", properties: { info: { id: sessionID, title: "List files" } } }, + { type: "session.status", properties: { sessionID, status: { type: "idle" } } }, + ] +} + +describe("TraceConsumer", () => { + test("a full session sequence is captured and finalized on flush", async () => { + const consumer = makeConsumer() + await feed(consumer, sessionEvents("ses_consumer_1")) + + // idle does NOT finalize the trace; flush() (shutdown) does — and writes + // the completed file with full metadata + summary. + await consumer.flush() + const trace = await readTraceFile("ses_consumer_1") + expect(trace.summary.status).toBe("completed") + expect(trace.metadata.model).toBe("openai/gpt-4o") + expect(trace.metadata.agent).toBe("general") + expect(trace.metadata.title).toBe("List files") + expect(trace.metadata.prompt).toBe("list my files") + expect(trace.spans.some((s) => s.kind === "tool")).toBe(true) + expect(trace.summary.totalToolCalls).toBe(1) + expect(trace.summary.totalCost).toBeCloseTo(0.005) + }) + + test("idle does not finalize — a second turn keeps appending to the same trace", async () => { + const consumer = makeConsumer() + await feed(consumer, sessionEvents("ses_consumer_multiturn")) + + // Second turn on the same session AFTER idle. Pre-fix this hit a cache + // miss and clobbered the rich file with an empty one; now the trace is + // still live (no idle finalize) so it just appends. + await feed(consumer, [ + { + type: "message.updated", + properties: { + info: { id: "msg-user-2", sessionID: "ses_consumer_multiturn", role: "user", time: { created: Date.now() } }, + }, + }, + { + type: "message.part.updated", + properties: { + part: { + sessionID: "ses_consumer_multiturn", + type: "tool", + tool: "grep", + callID: "c2", + state: { + status: "completed", + input: { pattern: "x" }, + output: "hit", + time: { start: Date.now() - 10, end: Date.now() }, + }, + }, + }, + }, + { type: "session.status", properties: { sessionID: "ses_consumer_multiturn", status: { type: "idle" } } }, + ]) + await consumer.flush() + + const trace = await readTraceFile("ses_consumer_multiturn") + // Both turns' tool calls survived in one trace. + expect(trace.summary.totalToolCalls).toBe(2) + expect(trace.spans.filter((s) => s.kind === "tool").length).toBe(2) + }) + + test("cache-miss re-creation rehydrates the rich on-disk trace, not an empty one", async () => { + const sessionID = "ses_consumer_rehydrate" + const c1 = makeConsumer() + await feed(c1, sessionEvents(sessionID)) + await c1.flush() + const before = await readTraceFile(sessionID) + expect(before.summary.totalToolCalls).toBe(1) + + // A brand-new consumer (simulating worker restart) gets a late event for + // the same session. It must rehydrate the existing file, not overwrite it. + const c2 = makeConsumer() + await c2.handleEvent({ + type: "message.part.updated", + properties: { + part: { + sessionID, + type: "tool", + tool: "ls", + callID: "c-late", + state: { status: "completed", input: {}, output: "ok", time: { start: Date.now() - 5, end: Date.now() } }, + }, + }, + }) + await c2.flush() + + const after = await readTraceFile(sessionID) + // Original tool call preserved + the late one appended — nothing clobbered. + expect(after.summary.totalToolCalls).toBe(2) + }) + + test("two interleaved sessions write separate trace files", async () => { + const consumer = makeConsumer() + const a = sessionEvents("ses_consumer_a") + const b = sessionEvents("ses_consumer_b") + for (let i = 0; i < Math.max(a.length, b.length); i++) { + if (a[i]) await consumer.handleEvent(a[i]) + if (b[i]) await consumer.handleEvent(b[i]) + } + await consumer.flush() + + const traceA = await readTraceFile("ses_consumer_a") + const traceB = await readTraceFile("ses_consumer_b") + expect(traceA.summary.status).toBe("completed") + expect(traceB.summary.status).toBe("completed") + expect(traceA.summary.totalToolCalls).toBe(1) + expect(traceB.summary.totalToolCalls).toBe(1) + }) + + test("malformed events never throw", async () => { + const consumer = makeConsumer() + const malformed = [ + null, + undefined, + {}, + { type: "message.updated" }, + { type: "message.updated", properties: null }, + { type: "message.updated", properties: { info: null } }, + { type: "message.part.updated", properties: null }, + { type: "message.part.updated", properties: { part: { type: "tool" } } }, + { type: "session.updated", properties: {} }, + { type: "session.status", properties: { status: null } }, + { type: "session.status", properties: { sessionID: "nope", status: { type: "idle" } } }, + "not-an-object", + 42, + ] + for (const event of malformed) { + await expect(consumer.handleEvent(event)).resolves.toBeUndefined() + } + }) + + test("disabled consumer writes nothing", async () => { + const consumer = new TraceConsumer({ exporters: [new FileExporter(tmpDir)], enabled: false }) + await feed(consumer, sessionEvents("ses_consumer_off")) + await consumer.flush() + const files = await fs.readdir(tmpDir) + expect(files.length).toBe(0) + }) + + test("reset finalizes in-flight traces and clears state", async () => { + const consumer = makeConsumer() + await feed(consumer, sessionEvents("ses_consumer_reset")) + consumer.reset() + // reset finalizes fire-and-forget; poll for the write instead of a fixed + // sleep so the test isn't flaky under CI load. + const trace = await waitFor( + () => readTraceFile("ses_consumer_reset"), + (t) => t.spans.some((s) => s.kind === "tool"), + ) + expect(trace.spans.some((s) => s.kind === "tool")).toBe(true) + }) + + test("incremental snapshots land on disk BEFORE any flush (the serve path)", async () => { + // serve never calls flush() during normal operation — it relies entirely + // on the incremental snapshots written as events arrive. This is the path + // every other test skips by flushing first. + const consumer = makeConsumer() + await feed(consumer, sessionEvents("ses_consumer_noflush")) + const trace = await waitFor( + () => readTraceFile("ses_consumer_noflush"), + (t) => t.spans.some((s) => s.kind === "tool"), + ) + expect(trace.summary.totalToolCalls).toBe(1) + expect(trace.metadata.prompt).toBe("list my files") + }) + + test("session.deleted finalizes the trace and releases per-session state", async () => { + const consumer = makeConsumer() + await feed(consumer, sessionEvents("ses_consumer_del")) + await consumer.handleEvent({ type: "session.deleted", properties: { info: { id: "ses_consumer_del" } } }) + // endTrace is fire-and-forget on session.deleted; poll for finalization. + const trace = await waitFor( + () => readTraceFile("ses_consumer_del"), + (t) => t.summary.status === "completed", + ) + expect(trace.summary.status).toBe("completed") + // State is already released, so a later flush must be a harmless no-op. + await consumer.flush() + }) +}) + +describe("subscribeTraceConsumer (serve integration)", () => { + test("stop() drains the loop and flushes — serve traces are finalized", async () => { + const consumer = makeConsumer() + let calls = 0 + const sub = subscribeTraceConsumer( + { directory: tmpDir }, + { + consumer, + subscribe: async (signal) => { + calls++ + // Deliver a full session on the first connect, then hold the stream + // open until shutdown so stop() exercises the drain path. + return eventSource(calls === 1 ? sessionEvents("ses_sub_stop") : [], { holdUntilAbort: signal }) + }, + }, + ) + // Wait until the session's events have been consumed (snapshot on disk). + await waitFor( + () => readTraceFile("ses_sub_stop"), + (t) => t.spans.some((s) => s.kind === "tool"), + ) + // Pre-stop the trace is not yet finalized; stop() must flush → "completed". + await sub.stop() + const trace = await readTraceFile("ses_sub_stop") + expect(trace.summary.status).toBe("completed") + expect(trace.summary.totalToolCalls).toBe(1) + }) + + test("a mid-stream throw does not kill the loop — it reconnects", async () => { + const consumer = makeConsumer() + let calls = 0 + const sub = subscribeTraceConsumer( + { directory: tmpDir }, + { + consumer, + subscribe: async (signal) => { + calls++ + // 1st connect throws mid-stream; 2nd connect delivers a different + // session in full. If the throw killed the loop, session B's file + // would never appear and the wait below would time out. + if (calls === 1) return eventSource(sessionEvents("ses_sub_A"), { throwAfter: 2 }) + return eventSource(sessionEvents("ses_sub_B"), { holdUntilAbort: signal }) + }, + }, + ) + const trace = await waitFor( + () => readTraceFile("ses_sub_B"), + (t) => t.summary.totalToolCalls >= 1, + ) + expect(trace.summary.totalToolCalls).toBe(1) + expect(calls).toBeGreaterThanOrEqual(2) + await sub.stop() + }) +}) diff --git a/packages/opencode/test/altimate/tracing-followups.test.ts b/packages/opencode/test/altimate/tracing-followups.test.ts index a0803cecb..df3035e71 100644 --- a/packages/opencode/test/altimate/tracing-followups.test.ts +++ b/packages/opencode/test/altimate/tracing-followups.test.ts @@ -138,44 +138,47 @@ describe("#903 — capSpansForSerialization bounds long-lived traces", () => { }) describe("#902 — getOrCreateTrace guards against resurrecting a Trace into a cleared cache", () => { - // worker.ts has module-scope side effects (it starts an event stream on - // import), so it can't be unit-tested in-process. Lock the guard's shape with - // a scope-bounded source contract, the same approach as - // worker-trace-clearing.test.ts. + // The getOrCreateTrace guard lives in the shared TraceConsumer now (so + // `altimate serve` traces too); the worker delegates to it. Lock the guard's + // shape with a scope-bounded source contract. The counter is a class member + // (`this.streamGeneration`) bumped in reset() — the consumer's equivalent of + // the worker's old startEventStream cache-clear. test("captures the stream generation before the rehydrate await and re-checks it after", async () => { - const workerSrc = await fs.readFile( - path.join(__dirname, "../../src/cli/cmd/tui/worker.ts"), + const consumerSrc = await fs.readFile( + path.join(__dirname, "../../src/altimate/observability/trace-consumer.ts"), "utf-8", ) // Ownership is keyed on a monotonic counter, not AbortController identity. - expect(workerSrc).toMatch(/let streamGeneration = 0/) + expect(consumerSrc).toMatch(/streamGeneration = 0/) // The owning generation is captured at entry, before any await. - expect(workerSrc).toMatch(/const generationAtEntry = streamGeneration/) - // A new stream bumps the counter, invalidating in-flight calls. - expect(workerSrc).toMatch(/streamGeneration\+\+/) - - // After awaiting rehydrate, if a new stream replaced ours, the freshly built - // Trace is discarded (ended) instead of being inserted into the cleared map. - const guard = workerSrc.match( - /if \(streamGeneration !== generationAtEntry\)[\s\S]*?trace\.endTrace\(\)[\s\S]*?return sessionTraces\.get\(sessionID\) \?\? null/, + expect(consumerSrc).toMatch(/const generationAtEntry = this\.streamGeneration/) + // A new stream (reset) bumps the counter, invalidating in-flight calls. + expect(consumerSrc).toMatch(/this\.streamGeneration\+\+/) + + // After awaiting rehydrate, if a reset() replaced our stream, the freshly + // built Trace is discarded (ended) instead of being inserted into the + // cleared map. + const guard = consumerSrc.match( + /if \(this\.streamGeneration !== generationAtEntry\)[\s\S]*?trace\.endTrace\(\)[\s\S]*?return this\.sessionTraces\.get\(sessionID\) \?\? null/, ) expect(guard).not.toBeNull() // The guard must sit AFTER the rehydrate await and BEFORE the cache insert, // otherwise it can't prevent the orphan write. - const awaitIdx = workerSrc.indexOf("await trace.rehydrateFromFile(sessionID)") - const guardIdx = workerSrc.indexOf("if (streamGeneration !== generationAtEntry)") - const setIdx = workerSrc.indexOf("sessionTraces.set(sessionID, trace)") + const awaitIdx = consumerSrc.indexOf("await trace.rehydrateFromFile(sessionID)") + const guardIdx = consumerSrc.indexOf("if (this.streamGeneration !== generationAtEntry)") + const setIdx = consumerSrc.indexOf("this.sessionTraces.set(sessionID, trace)") expect(awaitIdx).toBeGreaterThan(-1) expect(guardIdx).toBeGreaterThan(awaitIdx) expect(setIdx).toBeGreaterThan(guardIdx) - // The bump happens inside startEventStream so a workspace switch invalidates - // any suspended getOrCreateTrace. - const startIdx = workerSrc.indexOf("const startEventStream =") - const bumpIdx = workerSrc.indexOf("streamGeneration++") - expect(bumpIdx).toBeGreaterThan(startIdx) + // The bump happens inside reset() so a workspace switch (which calls reset) + // invalidates any suspended getOrCreateTrace. + const resetIdx = consumerSrc.indexOf("reset() {") + const bumpIdx = consumerSrc.indexOf("this.streamGeneration++") + expect(resetIdx).toBeGreaterThan(-1) + expect(bumpIdx).toBeGreaterThan(resetIdx) }) }) diff --git a/packages/opencode/test/cli/tui/worker-trace-clearing.test.ts b/packages/opencode/test/cli/tui/worker-trace-clearing.test.ts index 925eb2e4b..b0fbb1fb8 100644 --- a/packages/opencode/test/cli/tui/worker-trace-clearing.test.ts +++ b/packages/opencode/test/cli/tui/worker-trace-clearing.test.ts @@ -52,10 +52,7 @@ describe("trace-clearing-on-workspace-set regression", () => { }) test("session route's workspaceID effect uses `on()` so it only fires when workspaceID actually changes", async () => { - const routeSrc = await fs.readFile( - path.join(ROOT, "src/cli/cmd/tui/routes/session/index.tsx"), - "utf-8", - ) + const routeSrc = await fs.readFile(path.join(ROOT, "src/cli/cmd/tui/routes/session/index.tsx"), "utf-8") // The previous shape (`createEffect(() => { if (session()?.workspaceID) ... })`) // re-runs on every session() signal change. The fixed shape uses `on()` with @@ -85,11 +82,16 @@ describe("trace-clearing-on-workspace-set regression", () => { // replacement. The handler is now a no-op; finalization happens on worker // shutdown and MAX_TRACES eviction only. test("worker does NOT call endTrace+delete on session.status=idle", async () => { + // Idle handling lives in the shared TraceConsumer now; the destructive + // shape must not exist there (nor anywhere the worker delegates). const workerSrc = await fs.readFile(path.join(ROOT, "src/cli/cmd/tui/worker.ts"), "utf-8") + const consumerSrc = await fs.readFile(path.join(ROOT, "src/altimate/observability/trace-consumer.ts"), "utf-8") - // The destructive shape must not exist anywhere in the file. - expect(workerSrc).not.toMatch(/status === "idle"[\s\S]{0,200}sessionTraces\.delete/) - expect(workerSrc).not.toMatch(/status === "idle"[\s\S]{0,200}trace\.endTrace\(\)/) + // The destructive shape must not exist in either file. + for (const src of [workerSrc, consumerSrc]) { + expect(src).not.toMatch(/status === "idle"[\s\S]{0,200}sessionTraces\.delete/) + expect(src).not.toMatch(/status === "idle"[\s\S]{0,200}trace\.endTrace\(\)/) + } }) // Defense-in-depth: `getOrCreateTrace` on cache miss must try to load an @@ -97,7 +99,9 @@ describe("trace-clearing-on-workspace-set regression", () => { // worker restart or MAX_TRACES eviction recreates the trace empty and // the next snapshot clobbers the rich file. test("getOrCreateTrace prefers rehydrateFromFile over startTrace on cache miss", async () => { - const workerSrc = await fs.readFile(path.join(ROOT, "src/cli/cmd/tui/worker.ts"), "utf-8") + // The per-session trace logic was extracted into the shared TraceConsumer + // (so `altimate serve` traces too); the worker delegates to it. + const workerSrc = await fs.readFile(path.join(ROOT, "src/altimate/observability/trace-consumer.ts"), "utf-8") // `rehydrateFromFile` is async (off-loads disk I/O from the event-stream // hot path per cubic P2 review on tracing.ts:515) so the call must be // awaited inside getOrCreateTrace. @@ -112,7 +116,7 @@ describe("trace-clearing-on-workspace-set regression", () => { // title-agent's auto-generated title from `session.updated` and overwrites // it (e.g. "Greeting" → "hi"). See codex review feedback this round. test("user text part is captured via setPrompt, drops time.end precondition, never touches title", async () => { - const workerSrc = await fs.readFile(path.join(ROOT, "src/cli/cmd/tui/worker.ts"), "utf-8") + const workerSrc = await fs.readFile(path.join(ROOT, "src/altimate/observability/trace-consumer.ts"), "utf-8") // The old shape gated on `part.time?.end` for the entire user/assistant // branch — that shape must not be present anymore, because user-input @@ -129,7 +133,9 @@ describe("trace-clearing-on-workspace-set regression", () => { // The user-text branch must call setPrompt (not setTitle) so the auto- // generated session title from Path C isn't overwritten by raw user text. - expect(workerSrc).toMatch(/sessionUserMsgIds\.get\(part\.sessionID\)\?\.has\(part\.messageID\)[\s\S]{0,200}trace\.setPrompt/) + expect(workerSrc).toMatch( + /sessionUserMsgIds\.get\(part\.sessionID\)\?\.has\(part\.messageID\)[\s\S]{0,200}trace\.setPrompt/, + ) // The user-text branch must NOT call setTitle. expect(workerSrc).not.toMatch( /sessionUserMsgIds\.get\(part\.sessionID\)\?\.has\(part\.messageID\)[\s\S]{0,200}trace\.setTitle\(text/, @@ -145,12 +151,10 @@ describe("trace-clearing-on-workspace-set regression", () => { // a file blob) and the chat tab renders one fake "▶ You" bubble per synthetic // span — defeating the two display surfaces this PR fixes. test("user-text branch skips synthetic/ignored parts before calling setPrompt+logUserMessage", async () => { - const workerSrc = await fs.readFile(path.join(ROOT, "src/cli/cmd/tui/worker.ts"), "utf-8") + const workerSrc = await fs.readFile(path.join(ROOT, "src/altimate/observability/trace-consumer.ts"), "utf-8") // The synthetic+ignored gate must build the `isAuthoredText` predicate // from BOTH flags. Stronger than just searching for the literal anywhere. - expect(workerSrc).toMatch( - /const\s+isAuthoredText\s*=\s*!part\.synthetic\s*&&\s*!part\.ignored/, - ) + expect(workerSrc).toMatch(/const\s+isAuthoredText\s*=\s*!part\.synthetic\s*&&\s*!part\.ignored/) // Both write paths must sit inside the user-text branch (gated on the // `sessionUserMsgIds...has(...)` membership check) AND inside an // `if (text)` body whose contents don't cross block boundaries. @@ -165,10 +169,7 @@ describe("trace-clearing-on-workspace-set regression", () => { }) test("Trace.setPrompt exists and only mutates metadata.prompt", async () => { - const tracingSrc = await fs.readFile( - path.join(ROOT, "src/altimate/observability/tracing.ts"), - "utf-8", - ) + const tracingSrc = await fs.readFile(path.join(ROOT, "src/altimate/observability/tracing.ts"), "utf-8") // Method must exist with the documented signature. expect(tracingSrc).toMatch(/setPrompt\(prompt: string\)\s*\{[\s\S]{0,200}this\.metadata\.prompt = prompt/) // Must NOT touch metadata.title. diff --git a/packages/opencode/test/upstream/bridge-merge-invariants.test.ts b/packages/opencode/test/upstream/bridge-merge-invariants.test.ts index 3cd67205c..7e1b7d73d 100644 --- a/packages/opencode/test/upstream/bridge-merge-invariants.test.ts +++ b/packages/opencode/test/upstream/bridge-merge-invariants.test.ts @@ -161,7 +161,12 @@ describe("invariant: event-type literals match between producer and consumers", file: "cli/cmd/tui/context/sync.tsx", types: ["message.updated", "message.removed", "message.part.updated", "message.part.removed"], }, - { file: "cli/cmd/tui/worker.ts", types: ["message.updated"] }, + // worker.ts's inline event handling moved into the shared trace consumer + // (also used by `serve`) — the literals live there now. + { + file: "altimate/observability/trace-consumer.ts", + types: ["message.updated", "message.part.updated", "session.updated", "session.deleted"], + }, { file: "cli/cmd/run.ts", types: ["message.updated", "message.part.updated"] }, { file: "acp/agent.ts", types: ["message.part.updated"] }, ]