diff --git a/server/__tests__/ws-handler-v2.test.ts b/server/__tests__/ws-handler-v2.test.ts index 347eb964..e13abbbe 100644 --- a/server/__tests__/ws-handler-v2.test.ts +++ b/server/__tests__/ws-handler-v2.test.ts @@ -323,6 +323,134 @@ describe('handleReconnect', () => { }); }); +// ─── boot_context replay (sendBootContext helper, tested via handlers) ────── + +describe('boot_context replay', () => { + it('handleReconnect sends boot_context with sessionId from in-memory cache', () => { + const sessionReg = mockSessionRegistry(); + sessionReg.findBySessionId.mockReturnValue({ + clientId: 'c1:sess-1', + session: { bootContext: { source: 'contexgin', tokenCount: 100 } }, + }); + sessionReg.isActive.mockReturnValue(true); + + const eventStore = mockEventStore(); + eventStore.getSession.mockReturnValue({ isActive: true }); + + const ctx = createContext({ + sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], + eventStore: eventStore as unknown as V2HandlerContext['eventStore'], + }); + const transport = mockTransport(); + ctx.connRegistry.register('c1', transport); + + handleReconnect( + 'c1', + { type: 'reconnect', sessions: [{ sessionId: 'sess-1', lastSeq: 0 }] }, + ctx, + ); + + const bootMsg = transport.sent.find((m) => m.type === 'boot_context'); + expect(bootMsg).toBeDefined(); + expect(bootMsg).toHaveProperty('sessionId', 'sess-1'); + expect(bootMsg).toHaveProperty('source', 'contexgin'); + }); + + it('handleReconnect uses cold-path EventStore when no in-memory cache', () => { + const sessionReg = mockSessionRegistry(); + // No in-memory bootContext — session not in registry + sessionReg.findBySessionId.mockReturnValue(null); + + const eventStore = mockEventStore(); + eventStore.getSession.mockReturnValue({ + isActive: false, + bootContext: JSON.stringify({ source: 'contexgin', tokenCount: 50 }), + }); + + const ctx = createContext({ + sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], + eventStore: eventStore as unknown as V2HandlerContext['eventStore'], + }); + const transport = mockTransport(); + ctx.connRegistry.register('c1', transport); + + handleReconnect( + 'c1', + { type: 'reconnect', sessions: [{ sessionId: 'sess-1', lastSeq: 0 }] }, + ctx, + ); + + const bootMsg = transport.sent.find((m) => m.type === 'boot_context'); + expect(bootMsg).toBeDefined(); + expect(bootMsg).toHaveProperty('sessionId', 'sess-1'); + expect(bootMsg).toHaveProperty('source', 'contexgin'); + }); + + it('handleSwitchSession sends boot_context with sessionId', async () => { + const sessionReg = mockSessionRegistry(); + sessionReg.findBySessionId.mockReturnValue({ + clientId: 'c1:sess-1', + session: { bootContext: { source: 'local-fallback', tokenCount: 0 } }, + }); + + const eventStore = mockEventStore(); + eventStore.getSession.mockReturnValue({ + sessionId: 'sess-1', + mode: 'agent', + cwd: '/test', + branch: 'main', + wtId: null, + inputTokens: 0, + outputTokens: 0, + cacheReadTokens: 0, + cacheCreationTokens: 0, + totalCostUsd: 0, + }); + eventStore.getSessionState.mockReturnValue('ACTIVE'); + + const ctx = createContext({ + sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], + eventStore: eventStore as unknown as V2HandlerContext['eventStore'], + }); + const transport = mockTransport(); + ctx.connRegistry.register('c1', transport); + + await handleSwitchSession('c1', { type: 'switch_session', sessionId: 'sess-1' }, ctx); + + const bootMsg = transport.sent.find((m) => m.type === 'boot_context'); + expect(bootMsg).toBeDefined(); + expect(bootMsg).toHaveProperty('sessionId', 'sess-1'); + }); + + it('logs warning on invalid JSON in EventStore cold path', () => { + const sessionReg = mockSessionRegistry(); + sessionReg.findBySessionId.mockReturnValue(null); + + const eventStore = mockEventStore(); + eventStore.getSession.mockReturnValue({ + isActive: false, + bootContext: '{not valid json', + }); + + const ctx = createContext({ + sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], + eventStore: eventStore as unknown as V2HandlerContext['eventStore'], + }); + const transport = mockTransport(); + ctx.connRegistry.register('c1', transport); + + // Should not throw — logs warning instead + handleReconnect( + 'c1', + { type: 'reconnect', sessions: [{ sessionId: 'sess-1', lastSeq: 0 }] }, + ctx, + ); + + const bootMsg = transport.sent.find((m) => m.type === 'boot_context'); + expect(bootMsg).toBeUndefined(); // No boot_context sent on invalid JSON + }); +}); + // ─── handleWatch / handleUnwatch ───────────────────────────────────────────── describe('handleWatch', () => { @@ -3239,7 +3367,7 @@ describe('detectStateMismatch', () => { expect(result.details).toContain('transport detached but state=ACTIVE'); }); - it('detects detached transport but CLOSING state', () => { + it('ignores CLOSING state (graceful shutdown — registry state is indeterminate)', () => { const reg = mockSessionRegistry(); const store = mockEventStore(); reg.findBySessionId.mockReturnValue({ clientId: 'c1', session: {} }); @@ -3251,8 +3379,7 @@ describe('detectStateMismatch', () => { reg as unknown as V2HandlerContext['sessionRegistry'], store as unknown as V2HandlerContext['eventStore'], ); - expect(result.mismatch).toBe(true); - expect(result.details).toContain('transport detached but state=CLOSING'); + expect(result.mismatch).toBe(false); }); it('allows STARTING state in registry regardless of attach state', () => { diff --git a/server/chat.ts b/server/chat.ts index e19a0da3..b4856f57 100644 --- a/server/chat.ts +++ b/server/chat.ts @@ -913,12 +913,23 @@ async function _startChatInner( // Fire-and-forget: fetch boot context from running ContexGin server fetchBootContext(agentName) .then((msg) => { - send(transport, msg); + // Include sessionId for consistency with reconnect/switch replay paths + const sid = options.resume ?? registry.get(clientId)?.sessionId; + send(transport, { ...msg, ...(sid ? { sessionId: sid } : {}) }); // Cache in ManagedSession for replay on reconnect/switch const s = registry.get(clientId); if (s) s.bootContext = msg as unknown as Record; + // Persist to EventStore for cold-path recovery (resumed sessions + // may not run the query loop long enough for it to upsert). + if (sid) { + eventStore.upsertSession({ sessionId: sid, bootContext: JSON.stringify(msg) }); + } }) - .catch(() => {}); + .catch((err: unknown) => { + log.warn('boot context fetch unexpected error', { + error: err instanceof Error ? err.message : String(err), + }); + }); capturePromptComparison(wtId, cwd, systemPromptAppend, repoWorktrees).catch(() => {}); // Resolve SDK session UUID for resume — worktree IDs are not valid SDK session IDs diff --git a/server/ws-handler-v2.ts b/server/ws-handler-v2.ts index 286d95ee..23376b84 100644 --- a/server/ws-handler-v2.ts +++ b/server/ws-handler-v2.ts @@ -114,6 +114,10 @@ export function detectStateMismatch( const found = registry.findBySessionId(sessionId); const state = store.getSessionState(sessionId); + // CLOSING sessions are in graceful shutdown — registry entry may or may not + // exist depending on timing, so don't flag mismatches for them. + if (state === 'CLOSING') return { mismatch: false }; + const registryHas = !!found; const shouldHave = state != null && state !== 'ENDED'; @@ -133,7 +137,7 @@ export function detectStateMismatch( if (found && state) { const attached = registry.isAttached(found.clientId); - const shouldBeAttached = state === 'ACTIVE' || state === 'CLOSING'; + const shouldBeAttached = state === 'ACTIVE'; const shouldBeDetached = state === 'DETACHED' || state === 'SUSPENDED'; if (attached && shouldBeDetached) { @@ -153,6 +157,47 @@ export function detectStateMismatch( return { mismatch: false }; } +// ─── Boot context replay helper ───────────────────────────────────────────── + +/** + * Send cached boot_context to a connection for a given session. + * Hot path: in-memory ManagedSession cache. + * Cold path: serialized JSON from EventStore (ended/restarted sessions). + */ +function sendBootContext(connectionId: string, sessionId: string, ctx: V2HandlerContext): void { + const conn = ctx.connRegistry.get(connectionId); + if (!conn) return; + + // Hot path: running session with in-memory cache + const found = ctx.sessionRegistry.findBySessionId(sessionId); + if (found?.session?.bootContext) { + conn.transport.send({ + type: 'boot_context', + sessionId, + ...found.session.bootContext, + }); + return; + } + + // Cold path: ended/non-running session — read from EventStore + const meta = ctx.eventStore.getSession(sessionId); + if (meta?.bootContext) { + try { + const parsed = JSON.parse(meta.bootContext); + conn.transport.send({ + type: 'boot_context', + sessionId, + ...parsed, + }); + } catch (err) { + log.warn('invalid boot_context JSON in EventStore', { + sessionId, + error: err instanceof Error ? err.message : String(err), + }); + } + } +} + // ─── Handlers ──────────────────────────────────────────────────────────────── export function handleHello( @@ -280,13 +325,9 @@ export function handleReconnect( running, }); - // Re-send cached boot_context so pills reappear after reconnect - if (found && running && found.session?.bootContext) { - ctx.connRegistry.get(connectionId)?.transport.send({ - type: 'boot_context', - ...found.session.bootContext, - }); - } + // Re-send boot_context so pills reappear after reconnect. + // Uses shared helper with hot (in-memory) + cold (EventStore) paths. + sendBootContext(connectionId, entry.sessionId, ctx); log.info('reconnect replay', { connectionId, @@ -395,24 +436,8 @@ export async function handleSwitchSession( }); // Re-send boot_context so pills appear on session switch. - // Hot path: running session in SessionRegistry (in-memory cache). - // Cold path: ended session — read serialized JSON from EventStore. - if (found?.session?.bootContext) { - ctx.connRegistry.get(connectionId)?.transport.send({ - type: 'boot_context', - ...found.session.bootContext, - }); - } else if (sessionMeta.bootContext) { - try { - const parsed = JSON.parse(sessionMeta.bootContext); - ctx.connRegistry.get(connectionId)?.transport.send({ - type: 'boot_context', - ...parsed, - }); - } catch { - // Invalid JSON — skip - } - } + // Uses shared helper with hot (in-memory) + cold (EventStore) paths. + sendBootContext(connectionId, msg.sessionId, ctx); log.info('switch_session', { connectionId, sessionId: msg.sessionId, running }); }, @@ -499,6 +524,8 @@ export function handleSendV2( } // State-based routing (Phase 3): durable state is the single source of truth. + // ACTIVE/DETACHED/SUSPENDED → running path (send to existing query loop) + // CLOSING/ENDED/null → resume path (zombie cleanup first if needed) if ( found && isActive(found.clientId) && @@ -652,6 +679,8 @@ export function handleInterruptV2( } // State-based routing (Phase 3): durable state is the single source of truth. + // ACTIVE/DETACHED/SUSPENDED → running path (interrupt existing query loop) + // CLOSING/ENDED/null → resume path (zombie cleanup first if needed) if ( found && isActive(found.clientId) &&