Worker: A3 (ensemble: ontology spine, independent)
Crate: crates/cognitive-shader-driver
Verified against: live source tree at HEAD of default branch (axum 0.8 + tokio 1, pub fn router(driver: ShaderDriver) -> Router in src/serve.rs registering 14 routes under /v1/shader/*).
Why
The cognitive shader driver runs a tight perception-inference cycle: each tick processes inputs, emits scene transitions, and may commit triplets into the NARS-flavoured semantic graph. Today consumers can only observe this cycle by polling discrete /v1/shader/* endpoints — there is no push channel.
The q2 cockpit (the "scene player" surface) needs to subscribe to the cycle stream so it can render scene transitions in real time without polling. Polling a 14-endpoint surface at frame rate is wasteful and racy; missed transitions lose narrative continuity.
Server-Sent Events (SSE) is the right transport here:
- One-way (server → cockpit) matches the data flow.
- Plain HTTP, no protocol upgrade, works through every proxy q2 already runs behind.
- Native EventSource support in browsers and any reasonable HTTP client.
- Trivially co-locates with the existing axum 0.8 router — no new transport stack.
A WebSocket would also work, but it adds bidirectionality the cockpit does not need, plus a heartbeat / framing layer we would otherwise re-implement by hand; axum's SSE support already ships a built-in keep-alive.
What
Add a single new route to the existing cognitive-shader-driver axum router:
- Endpoint: GET /v1/shader/stream
- Content-Type: text/event-stream
- Event cadence: one SSE event per completed shader cycle.
- Event body (JSON):
  - cycle — cycle metadata (sequence number, monotonic timestamp, duration, status).
  - triplets — inferred triplets committed during this cycle (subject / predicate / object / truth-frequency / truth-confidence).
  - sensorium — sensorium state snapshot at end of cycle (full snapshot for v1; delta encoding deferred — see Out of scope).
  - dropped — optional non-zero counter when the subscriber lagged behind the broadcast buffer (see backpressure).
- Heartbeat: axum::response::sse::KeepAlive::new().interval(Duration::from_secs(30)) to keep the connection alive through proxies and to let clients detect a dead server within ~30s.
- Backpressure: tokio::sync::broadcast channel (drop-oldest semantics). Subscribers grab tx.subscribe() outside the ServerState mutex so the lock is not held across the SSE stream lifetime. When a subscriber lags, tokio_stream::wrappers::BroadcastStream yields Err(BroadcastStreamRecvError::Lagged(n)); we surface n as a dropped counter event back to the client (so the cockpit can decide to resync), then continue.
- Auth: explicitly deferred. v1 ships with no auth on this endpoint — assumed external gating (API gateway, mTLS, etc.). Tracked as a separate item.
Implementation sketch (~100 LOC budget)
crates/cognitive-shader-driver/Cargo.toml: add tokio-stream = { version = "0.1", features = ["sync"], optional = true } under the existing serve feature.
ServerState (currently Arc<Mutex<...>>) gains a tx: tokio::sync::broadcast::Sender<ShaderEvent>.
ShaderDriver gains a minimal cycle_completed callback hook (smallest possible callback surface) so the serve layer can tx.send(event) after each cycle. This hook is in-scope for the same ~100 LOC if it does not already exist.
- New handler stream_handler(State(state)) -> Sse<…> builds a BroadcastStream from tx.subscribe(), maps each item to Event::default().json_data(payload), maps Lagged(n) to a dropped event, and attaches KeepAlive with a 30s interval.
- One extra line in router: .route("/v1/shader/stream", get(stream_handler)).
Architecture
shader cycle tick q2 cockpit (browser / TUI)
────────────────── ──────────────────────────
ShaderDriver::tick() EventSource("/v1/shader/stream")
│ ▲
│ cycle_completed(event) │ text/event-stream
▼ │
broadcast::Sender<ShaderEvent> │
│ │
▼ │
broadcast::Receiver ──▶ BroadcastStream ──▶ Sse<…> ┘
│
└─ Lagged(n) ──▶ {"dropped": n} event
- The cycle source emits one ShaderEvent per completed cycle.
- The broadcast channel fans out to N concurrent SSE subscribers without any one slow client back-pressuring the cycle loop (drop-oldest at the per-subscriber buffer is the right policy for a real-time scene player — newer state always wins over stale state).
- The cockpit consumes the stream via the standard EventSource API; no custom client SDK needed.
- The endpoint is independent of the contract surface (A1) and the medcare/bridge work (A2/B*): it only depends on what already exists in cognitive-shader-driver.
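On the consuming side, an integration test can drive the endpoint with a plain HTTP client plus a minimal line parser. The sketch below handles only the event: and data: fields and ignores keep-alive comment lines; the SseFrame type is hypothetical, and a real client should implement the full EventSource grammar (multi-line data, id, retry).

```rust
/// Minimal SSE frame parser for test use only. Assumes frames are separated
/// by blank lines ("\n\n") and ignores fields other than `event:` / `data:`.
#[derive(Debug, PartialEq)]
pub struct SseFrame {
    pub event: Option<String>, // value of the `event:` field, e.g. "dropped"
    pub data: String,          // concatenated `data:` lines
}

pub fn parse_sse(body: &str) -> Vec<SseFrame> {
    body.split("\n\n")
        .filter(|block| !block.trim().is_empty())
        .filter_map(|block| {
            let mut event = None;
            let mut data_lines = Vec::new();
            for line in block.lines() {
                if let Some(v) = line.strip_prefix("event:") {
                    event = Some(v.trim().to_string());
                } else if let Some(v) = line.strip_prefix("data:") {
                    data_lines.push(v.trim_start().to_string());
                }
                // Lines starting with ':' are keep-alive comments: skipped.
            }
            if data_lines.is_empty() && event.is_none() {
                None // comment-only block (keep-alive), not a real event
            } else {
                Some(SseFrame { event, data: data_lines.join("\n") })
            }
        })
        .collect()
}
```

This is enough to assert, in a test, that N driven cycles yield exactly N frames with monotonic sequence numbers, and that a lagged subscriber sees a frame whose event is "dropped".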
Acceptance criteria
- GET /v1/shader/stream is registered in crates/cognitive-shader-driver/src/serve.rs::router and reachable when the binary is run as cargo run -p cognitive-shader-driver --features serve --bin shader-serve.
- Responses carry Content-Type: text/event-stream.
- Every event body contains the cycle metadata, triplets, and sensorium fields.
- Integration test (reqwest plus a manual line parser, or eventsource-client): drive 10 shader cycles through the driver, assert exactly 10 events are received and that their cycle sequence numbers are monotonic.
- The dropped counter (carrying the Lagged(n) value) is surfaced to the client as a distinguishable event, and the stream continues without disconnecting.
- Existing /v1/shader/* routes still pass their existing tests.
- tokio-stream is added under the existing serve feature gate; a default-feature build of the crate is unchanged.
Out of scope
- The shader cycle implementation itself — assumed to exist in ShaderDriver. This issue only adds a fan-out hook plus an SSE endpoint.
- Authentication for the stream endpoint — deferred, tracked separately (assume external gating for v1).
- Multi-tenant subscription (per-tenant filtering, per-tenant rate limiting). v1 fans the same stream to every subscriber.
- Scene-graph delta encoding. v1 ships a full sensorium snapshot per event. Delta / patch encoding is a follow-up once profiling data shows the snapshot size matters.
- gRPC / Arrow Flight variant. SSE only for v1.
- q2 cockpit consumer wiring (the EventSource subscription on the cockpit side) — tracked in q2.
Dependencies
- Independent. Does not block on A1 (contract surface) or A2 (medcare bridge) and is not blocked by them.
- Touches only crates/cognitive-shader-driver/ — no contract crate changes.
Open questions
- Does ShaderDriver already expose a cycle_completed hook, or do we need to add the smallest possible callback / channel surface as part of this issue? (In scope either way under the ~100 LOC budget.)
- Buffer size for broadcast::channel(N) — propose N = 256 cycles as a starting point; tune once we have a real cycle-rate measurement.
- Should the dropped counter be a top-level field on the next normal event, or its own named SSE frame (event: dropped)? Recommend the named frame so the cockpit can route it independently.