-
Notifications
You must be signed in to change notification settings - Fork 0
feat(server): ContexGin HTTP boot context + replay fixes #365
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -114,6 +114,10 @@ export function detectStateMismatch( | |
| const found = registry.findBySessionId(sessionId); | ||
| const state = store.getSessionState(sessionId); | ||
|
|
||
| // CLOSING sessions are in graceful shutdown — registry entry may or may not | ||
| // exist depending on timing, so don't flag mismatches for them. | ||
| if (state === 'CLOSING') return { mismatch: false }; | ||
|
|
||
| const registryHas = !!found; | ||
| const shouldHave = state != null && state !== 'ENDED'; | ||
|
|
||
|
|
@@ -133,7 +137,7 @@ export function detectStateMismatch( | |
|
|
||
| if (found && state) { | ||
| const attached = registry.isAttached(found.clientId); | ||
| const shouldBeAttached = state === 'ACTIVE' || state === 'CLOSING'; | ||
| const shouldBeAttached = state === 'ACTIVE'; | ||
| const shouldBeDetached = state === 'DETACHED' || state === 'SUSPENDED'; | ||
|
|
||
| if (attached && shouldBeDetached) { | ||
|
|
@@ -153,6 +157,47 @@ export function detectStateMismatch( | |
| return { mismatch: false }; | ||
| } | ||
|
|
||
| // ─── Boot context replay helper ───────────────────────────────────────────── | ||
|
|
||
| /** | ||
| * Send cached boot_context to a connection for a given session. | ||
| * Hot path: in-memory ManagedSession cache. | ||
| * Cold path: serialized JSON from EventStore (ended/restarted sessions). | ||
| */ | ||
| function sendBootContext(connectionId: string, sessionId: string, ctx: V2HandlerContext): void { | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 missing_tests: The new |
||
| const conn = ctx.connRegistry.get(connectionId); | ||
| if (!conn) return; | ||
|
|
||
| // Hot path: running session with in-memory cache | ||
| const found = ctx.sessionRegistry.findBySessionId(sessionId); | ||
| if (found?.session?.bootContext) { | ||
| conn.transport.send({ | ||
| type: 'boot_context', | ||
| sessionId, | ||
| ...found.session.bootContext, | ||
| }); | ||
| return; | ||
| } | ||
|
|
||
| // Cold path: ended/non-running session — read from EventStore | ||
| const meta = ctx.eventStore.getSession(sessionId); | ||
| if (meta?.bootContext) { | ||
| try { | ||
| const parsed = JSON.parse(meta.bootContext); | ||
| conn.transport.send({ | ||
| type: 'boot_context', | ||
| sessionId, | ||
| ...parsed, | ||
| }); | ||
| } catch (err) { | ||
| log.warn('invalid boot_context JSON in EventStore', { | ||
| sessionId, | ||
| error: err instanceof Error ? err.message : String(err), | ||
| }); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // ─── Handlers ──────────────────────────────────────────────────────────────── | ||
|
|
||
| export function handleHello( | ||
|
|
@@ -280,13 +325,9 @@ export function handleReconnect( | |
| running, | ||
| }); | ||
|
|
||
| // Re-send cached boot_context so pills reappear after reconnect | ||
| if (found && running && found.session?.bootContext) { | ||
| ctx.connRegistry.get(connectionId)?.transport.send({ | ||
| type: 'boot_context', | ||
| ...found.session.bootContext, | ||
| }); | ||
| } | ||
| // Re-send boot_context so pills reappear after reconnect. | ||
| // Uses shared helper with hot (in-memory) + cold (EventStore) paths. | ||
| sendBootContext(connectionId, entry.sessionId, ctx); | ||
|
|
||
| log.info('reconnect replay', { | ||
| connectionId, | ||
|
|
@@ -395,24 +436,8 @@ export async function handleSwitchSession( | |
| }); | ||
|
|
||
| // Re-send boot_context so pills appear on session switch. | ||
| // Hot path: running session in SessionRegistry (in-memory cache). | ||
| // Cold path: ended session — read serialized JSON from EventStore. | ||
| if (found?.session?.bootContext) { | ||
| ctx.connRegistry.get(connectionId)?.transport.send({ | ||
| type: 'boot_context', | ||
| ...found.session.bootContext, | ||
| }); | ||
| } else if (sessionMeta.bootContext) { | ||
| try { | ||
| const parsed = JSON.parse(sessionMeta.bootContext); | ||
| ctx.connRegistry.get(connectionId)?.transport.send({ | ||
| type: 'boot_context', | ||
| ...parsed, | ||
| }); | ||
| } catch { | ||
| // Invalid JSON — skip | ||
| } | ||
| } | ||
| // Uses shared helper with hot (in-memory) + cold (EventStore) paths. | ||
| sendBootContext(connectionId, msg.sessionId, ctx); | ||
|
|
||
| log.info('switch_session', { connectionId, sessionId: msg.sessionId, running }); | ||
| }, | ||
|
|
@@ -499,6 +524,8 @@ export function handleSendV2( | |
| } | ||
|
|
||
| // State-based routing (Phase 3): durable state is the single source of truth. | ||
| // ACTIVE/DETACHED/SUSPENDED → running path (send to existing query loop) | ||
| // CLOSING/ENDED/null → resume path (zombie cleanup first if needed) | ||
| if ( | ||
| found && | ||
| isActive(found.clientId) && | ||
|
|
@@ -652,6 +679,8 @@ export function handleInterruptV2( | |
| } | ||
|
|
||
| // State-based routing (Phase 3): durable state is the single source of truth. | ||
| // ACTIVE/DETACHED/SUSPENDED → running path (interrupt existing query loop) | ||
| // CLOSING/ENDED/null → resume path (zombie cleanup first if needed) | ||
| if ( | ||
| found && | ||
| isActive(found.clientId) && | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟡 regressions:
s.bootContext = msg as unknown as Record<string, unknown>stores the full BootContextMessage (which includestype: 'boot_context'). WhensendBootContextin ws-handler-v2.ts replays this via{ type: 'boot_context', sessionId, ...found.session.bootContext }, the spread already containstype: 'boot_context'from the cached message, so the explicittypeis redundant but harmless. However, the cached object does NOT containsessionId, so the replay correctly adds it. This is fine but the doubletypefield is messy — consider strippingtypebefore caching.[fixable]