Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 130 additions & 3 deletions server/__tests__/ws-handler-v2.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,134 @@ describe('handleReconnect', () => {
});
});

// ─── boot_context replay (sendBootContext helper, tested via handlers) ──────

describe('boot_context replay', () => {
it('handleReconnect sends boot_context with sessionId from in-memory cache', () => {
const sessionReg = mockSessionRegistry();
sessionReg.findBySessionId.mockReturnValue({
clientId: 'c1:sess-1',
session: { bootContext: { source: 'contexgin', tokenCount: 100 } },
});
sessionReg.isActive.mockReturnValue(true);

const eventStore = mockEventStore();
eventStore.getSession.mockReturnValue({ isActive: true });

const ctx = createContext({
sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'],
eventStore: eventStore as unknown as V2HandlerContext['eventStore'],
});
const transport = mockTransport();
ctx.connRegistry.register('c1', transport);

handleReconnect(
'c1',
{ type: 'reconnect', sessions: [{ sessionId: 'sess-1', lastSeq: 0 }] },
ctx,
);

const bootMsg = transport.sent.find((m) => m.type === 'boot_context');
expect(bootMsg).toBeDefined();
expect(bootMsg).toHaveProperty('sessionId', 'sess-1');
expect(bootMsg).toHaveProperty('source', 'contexgin');
});

it('handleReconnect uses cold-path EventStore when no in-memory cache', () => {
const sessionReg = mockSessionRegistry();
// No in-memory bootContext — session not in registry
sessionReg.findBySessionId.mockReturnValue(null);

const eventStore = mockEventStore();
eventStore.getSession.mockReturnValue({
isActive: false,
bootContext: JSON.stringify({ source: 'contexgin', tokenCount: 50 }),
});

const ctx = createContext({
sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'],
eventStore: eventStore as unknown as V2HandlerContext['eventStore'],
});
const transport = mockTransport();
ctx.connRegistry.register('c1', transport);

handleReconnect(
'c1',
{ type: 'reconnect', sessions: [{ sessionId: 'sess-1', lastSeq: 0 }] },
ctx,
);

const bootMsg = transport.sent.find((m) => m.type === 'boot_context');
expect(bootMsg).toBeDefined();
expect(bootMsg).toHaveProperty('sessionId', 'sess-1');
expect(bootMsg).toHaveProperty('source', 'contexgin');
});

it('handleSwitchSession sends boot_context with sessionId', async () => {
const sessionReg = mockSessionRegistry();
sessionReg.findBySessionId.mockReturnValue({
clientId: 'c1:sess-1',
session: { bootContext: { source: 'local-fallback', tokenCount: 0 } },
});

const eventStore = mockEventStore();
eventStore.getSession.mockReturnValue({
sessionId: 'sess-1',
mode: 'agent',
cwd: '/test',
branch: 'main',
wtId: null,
inputTokens: 0,
outputTokens: 0,
cacheReadTokens: 0,
cacheCreationTokens: 0,
totalCostUsd: 0,
});
eventStore.getSessionState.mockReturnValue('ACTIVE');

const ctx = createContext({
sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'],
eventStore: eventStore as unknown as V2HandlerContext['eventStore'],
});
const transport = mockTransport();
ctx.connRegistry.register('c1', transport);

await handleSwitchSession('c1', { type: 'switch_session', sessionId: 'sess-1' }, ctx);

const bootMsg = transport.sent.find((m) => m.type === 'boot_context');
expect(bootMsg).toBeDefined();
expect(bootMsg).toHaveProperty('sessionId', 'sess-1');
});

it('logs warning on invalid JSON in EventStore cold path', () => {
const sessionReg = mockSessionRegistry();
sessionReg.findBySessionId.mockReturnValue(null);

const eventStore = mockEventStore();
eventStore.getSession.mockReturnValue({
isActive: false,
bootContext: '{not valid json',
});

const ctx = createContext({
sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'],
eventStore: eventStore as unknown as V2HandlerContext['eventStore'],
});
const transport = mockTransport();
ctx.connRegistry.register('c1', transport);

// Should not throw — logs warning instead
handleReconnect(
'c1',
{ type: 'reconnect', sessions: [{ sessionId: 'sess-1', lastSeq: 0 }] },
ctx,
);

const bootMsg = transport.sent.find((m) => m.type === 'boot_context');
expect(bootMsg).toBeUndefined(); // No boot_context sent on invalid JSON
});
});

// ─── handleWatch / handleUnwatch ─────────────────────────────────────────────

describe('handleWatch', () => {
Expand Down Expand Up @@ -3239,7 +3367,7 @@ describe('detectStateMismatch', () => {
expect(result.details).toContain('transport detached but state=ACTIVE');
});

it('detects detached transport but CLOSING state', () => {
it('ignores CLOSING state (graceful shutdown — registry state is indeterminate)', () => {
const reg = mockSessionRegistry();
const store = mockEventStore();
reg.findBySessionId.mockReturnValue({ clientId: 'c1', session: {} });
Expand All @@ -3251,8 +3379,7 @@ describe('detectStateMismatch', () => {
reg as unknown as V2HandlerContext['sessionRegistry'],
store as unknown as V2HandlerContext['eventStore'],
);
expect(result.mismatch).toBe(true);
expect(result.details).toContain('transport detached but state=CLOSING');
expect(result.mismatch).toBe(false);
});

it('allows STARTING state in registry regardless of attach state', () => {
Expand Down
15 changes: 13 additions & 2 deletions server/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -913,12 +913,23 @@ async function _startChatInner(
// Fire-and-forget: fetch boot context from running ContexGin server
fetchBootContext(agentName)
.then((msg) => {
send(transport, msg);
// Include sessionId for consistency with reconnect/switch replay paths
const sid = options.resume ?? registry.get(clientId)?.sessionId;
send(transport, { ...msg, ...(sid ? { sessionId: sid } : {}) });
// Cache in ManagedSession for replay on reconnect/switch
const s = registry.get(clientId);
if (s) s.bootContext = msg as unknown as Record<string, unknown>;
// Persist to EventStore for cold-path recovery (resumed sessions
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 regressions: s.bootContext = msg as unknown as Record<string, unknown> stores the full BootContextMessage (which includes type: 'boot_context'). When sendBootContext in ws-handler-v2.ts replays this via { type: 'boot_context', sessionId, ...found.session.bootContext }, the spread already contains type: 'boot_context' from the cached message, so the explicit type is redundant but harmless. However, the cached object does NOT contain sessionId, so the replay correctly adds it. This is fine but the double type field is messy — consider stripping type before caching. [fixable]

// may not run the query loop long enough for it to upsert).
if (sid) {
eventStore.upsertSession({ sessionId: sid, bootContext: JSON.stringify(msg) });
}
})
.catch(() => {});
.catch((err: unknown) => {
log.warn('boot context fetch unexpected error', {
error: err instanceof Error ? err.message : String(err),
});
});
capturePromptComparison(wtId, cwd, systemPromptAppend, repoWorktrees).catch(() => {});

// Resolve SDK session UUID for resume — worktree IDs are not valid SDK session IDs
Expand Down
81 changes: 55 additions & 26 deletions server/ws-handler-v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ export function detectStateMismatch(
const found = registry.findBySessionId(sessionId);
const state = store.getSessionState(sessionId);

// CLOSING sessions are in graceful shutdown — registry entry may or may not
// exist depending on timing, so don't flag mismatches for them.
if (state === 'CLOSING') return { mismatch: false };

const registryHas = !!found;
const shouldHave = state != null && state !== 'ENDED';

Expand All @@ -133,7 +137,7 @@ export function detectStateMismatch(

if (found && state) {
const attached = registry.isAttached(found.clientId);
const shouldBeAttached = state === 'ACTIVE' || state === 'CLOSING';
const shouldBeAttached = state === 'ACTIVE';
const shouldBeDetached = state === 'DETACHED' || state === 'SUSPENDED';

if (attached && shouldBeDetached) {
Expand All @@ -153,6 +157,47 @@ export function detectStateMismatch(
return { mismatch: false };
}

// ─── Boot context replay helper ─────────────────────────────────────────────

/**
* Send cached boot_context to a connection for a given session.
* Hot path: in-memory ManagedSession cache.
* Cold path: serialized JSON from EventStore (ended/restarted sessions).
*/
function sendBootContext(connectionId: string, sessionId: string, ctx: V2HandlerContext): void {
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 missing_tests: The new sendBootContext helper function has zero test coverage. No tests in ws-handler-v2.test.ts exercise boot_context replay (hot path from SessionRegistry or cold path from EventStore). The cold path includes JSON.parse which could fail — only the function's own catch handles it, but correctness of the hot/cold path selection and sessionId injection are untested. [fixable]

const conn = ctx.connRegistry.get(connectionId);
if (!conn) return;

// Hot path: running session with in-memory cache
const found = ctx.sessionRegistry.findBySessionId(sessionId);
if (found?.session?.bootContext) {
conn.transport.send({
type: 'boot_context',
sessionId,
...found.session.bootContext,
});
return;
}

// Cold path: ended/non-running session — read from EventStore
const meta = ctx.eventStore.getSession(sessionId);
if (meta?.bootContext) {
try {
const parsed = JSON.parse(meta.bootContext);
conn.transport.send({
type: 'boot_context',
sessionId,
...parsed,
});
} catch (err) {
log.warn('invalid boot_context JSON in EventStore', {
sessionId,
error: err instanceof Error ? err.message : String(err),
});
}
}
}

// ─── Handlers ────────────────────────────────────────────────────────────────

export function handleHello(
Expand Down Expand Up @@ -280,13 +325,9 @@ export function handleReconnect(
running,
});

// Re-send cached boot_context so pills reappear after reconnect
if (found && running && found.session?.bootContext) {
ctx.connRegistry.get(connectionId)?.transport.send({
type: 'boot_context',
...found.session.bootContext,
});
}
// Re-send boot_context so pills reappear after reconnect.
// Uses shared helper with hot (in-memory) + cold (EventStore) paths.
sendBootContext(connectionId, entry.sessionId, ctx);

log.info('reconnect replay', {
connectionId,
Expand Down Expand Up @@ -395,24 +436,8 @@ export async function handleSwitchSession(
});

// Re-send boot_context so pills appear on session switch.
// Hot path: running session in SessionRegistry (in-memory cache).
// Cold path: ended session — read serialized JSON from EventStore.
if (found?.session?.bootContext) {
ctx.connRegistry.get(connectionId)?.transport.send({
type: 'boot_context',
...found.session.bootContext,
});
} else if (sessionMeta.bootContext) {
try {
const parsed = JSON.parse(sessionMeta.bootContext);
ctx.connRegistry.get(connectionId)?.transport.send({
type: 'boot_context',
...parsed,
});
} catch {
// Invalid JSON — skip
}
}
// Uses shared helper with hot (in-memory) + cold (EventStore) paths.
sendBootContext(connectionId, msg.sessionId, ctx);

log.info('switch_session', { connectionId, sessionId: msg.sessionId, running });
},
Expand Down Expand Up @@ -499,6 +524,8 @@ export function handleSendV2(
}

// State-based routing (Phase 3): durable state is the single source of truth.
// ACTIVE/DETACHED/SUSPENDED → running path (send to existing query loop)
// CLOSING/ENDED/null → resume path (zombie cleanup first if needed)
if (
found &&
isActive(found.clientId) &&
Expand Down Expand Up @@ -652,6 +679,8 @@ export function handleInterruptV2(
}

// State-based routing (Phase 3): durable state is the single source of truth.
// ACTIVE/DETACHED/SUSPENDED → running path (interrupt existing query loop)
// CLOSING/ENDED/null → resume path (zombie cleanup first if needed)
if (
found &&
isActive(found.clientId) &&
Expand Down
Loading