Add GET /v1/shader/stream SSE endpoint for cognitive shader scene player #334

@AdaWorldAPI

Description

Worker: A3 (ensemble: ontology spine, independent)
Crate: crates/cognitive-shader-driver
Verified against: live source tree at HEAD of default branch (axum 0.8 + tokio 1, pub fn router(driver: ShaderDriver) -> Router in src/serve.rs registering 14 routes under /v1/shader/*).

Why

The cognitive shader driver runs a tight perception-inference cycle: each tick processes inputs, emits scene transitions, and may commit triplets into the NARS-flavoured semantic graph. Today consumers can only observe this cycle by polling discrete /v1/shader/* endpoints — there is no push channel.

The q2 cockpit (the "scene player" surface) needs to subscribe to the cycle stream so it can render scene transitions in real time without polling. Polling a 14-endpoint surface at frame rate is wasteful and racy; missed transitions lose narrative continuity.

Server-Sent Events (SSE) is the right transport here:

  • One-way (server → cockpit) matches the data flow.
  • Plain HTTP, no protocol upgrade, works through every proxy q2 already runs behind.
  • Native EventSource support in browsers, and trivially consumable from any reasonable HTTP client.
  • Trivially co-locates with the existing axum 0.8 router — no new transport stack.

A WebSocket would also work but adds bidirectionality the cockpit does not need, plus a heartbeat / framing layer we'd otherwise re-implement on top of axum's already-built SSE keep-alive.

What

Add a single new route to the existing cognitive-shader-driver axum router:

  • Endpoint: GET /v1/shader/stream
  • Content-Type: text/event-stream
  • Event cadence: one SSE event per completed shader cycle.
  • Event body (JSON):
    • cycle — cycle metadata (sequence number, monotonic timestamp, duration, status).
    • triplets — inferred triplets committed during this cycle (subject / predicate / object / truth-frequency / truth-confidence).
    • sensorium — sensorium state snapshot at end of cycle (full snapshot for v1; delta encoding deferred — see Out of scope).
    • dropped — optional non-zero counter when the subscriber lagged behind the broadcast buffer (see backpressure).
  • Heartbeat: axum's axum::response::sse::KeepAlive::new().interval(Duration::from_secs(30)) to keep the connection alive through proxies and to let clients detect a dead server within ~30s.
  • Backpressure: tokio::sync::broadcast channel (drop-oldest semantics). Subscribers grab tx.subscribe() outside the ServerState mutex so the lock is not held across the SSE stream lifetime. When a subscriber lags, tokio_stream::wrappers::BroadcastStream yields Err(BroadcastStreamRecvError::Lagged(n)); we surface n as a dropped counter event back to the client (so the cockpit can decide to resync), then continue.
  • Auth: explicitly deferred. v1 ships with no auth on this endpoint — assumed external gating (API gateway, mTLS, etc.). Tracked as a separate item.
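
For concreteness, one event on the wire would look roughly like this (field values are illustrative, not from the real driver; the named dropped frame follows the recommendation in Open questions #3):

    event: cycle
    data: {"cycle":{"seq":42,"ts":1712345678,"duration_us":830,"status":"ok"},"triplets":[{"s":"scene:a","p":"transitions_to","o":"scene:b","f":0.9,"c":0.8}],"sensorium":{}}

    event: dropped
    data: 3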

Implementation sketch (~100 LOC budget)

  • crates/cognitive-shader-driver/Cargo.toml: add tokio-stream = { version = "0.1", features = ["sync"], optional = true } under the existing serve feature.
  • ServerState (currently Arc<Mutex<...>>) gains a tx: tokio::sync::broadcast::Sender<ShaderEvent>.
  • ShaderDriver gains a minimal cycle_completed callback hook (smallest possible callback surface) so the serve layer can tx.send(event) after each cycle. This hook is in-scope for the same ~100 LOC if it does not already exist.
  • New stream_handler(State(state)): Sse<…> builds a BroadcastStream from tx.subscribe(), maps each item to Event::default().json_data(payload), maps Lagged(n) to a dropped event, attaches KeepAlive with a 30s interval.
  • One extra line in router: .route("/v1/shader/stream", get(stream_handler)).
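
A sketch of the handler under the assumptions above (AppState and the ShaderEvent fields are placeholders for whatever ServerState/ShaderEvent actually look like in the crate; this is illustrative, not the final implementation):

```rust
use std::{convert::Infallible, time::Duration};

use axum::{
    extract::State,
    response::sse::{Event, KeepAlive, Sse},
    routing::get,
    Router,
};
use futures::{Stream, StreamExt};
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};

// Hypothetical event type; the real ShaderEvent lives in the driver crate.
#[derive(Clone, serde::Serialize)]
struct ShaderEvent { /* cycle, triplets, sensorium … */ }

// Stand-in for the broadcast sender hung off ServerState.
#[derive(Clone)]
struct AppState {
    tx: tokio::sync::broadcast::Sender<ShaderEvent>,
}

async fn stream_handler(
    State(state): State<AppState>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // Subscribe outside any driver lock; the receiver lives as long as the stream.
    let rx = state.tx.subscribe();
    let stream = BroadcastStream::new(rx).map(|item| {
        Ok(match item {
            Ok(ev) => Event::default()
                .event("cycle")
                .json_data(&ev)
                .expect("ShaderEvent serializes"),
            // Subscriber lagged: surface the drop count, keep the stream open.
            Err(BroadcastStreamRecvError::Lagged(n)) => {
                Event::default().event("dropped").data(n.to_string())
            }
        })
    });
    Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(30)))
}

fn router(state: AppState) -> Router {
    Router::new()
        .route("/v1/shader/stream", get(stream_handler))
        .with_state(state)
}
```

Note that BroadcastStream yields the Lagged error as a stream item rather than terminating, which is exactly what lets us map it to a dropped event and continue.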

Architecture

   shader cycle tick                        q2 cockpit (browser / TUI)
   ──────────────────                       ──────────────────────────
   ShaderDriver::tick()                     EventSource("/v1/shader/stream")
            │                                          ▲
            │ cycle_completed(event)                   │ text/event-stream
            ▼                                          │
   broadcast::Sender<ShaderEvent>                      │
            │                                          │
            ▼                                          │
   broadcast::Receiver  ──▶ BroadcastStream ──▶ Sse<…> ┘
                              │
                              └─ Lagged(n) ──▶ {"dropped": n} event

  • The cycle source emits one ShaderEvent per completed cycle.
  • The broadcast channel fans out to N concurrent SSE subscribers without any one slow client back-pressuring the cycle loop (drop-oldest at the per-subscriber buffer is the right policy for a real-time scene player — newer state always wins over stale state).
  • The cockpit consumes the stream via the standard EventSource API; no custom client SDK needed.
  • The endpoint is independent of the contract surface (A1) and the medcare/bridge work (A2/B*): it only depends on what already exists in cognitive-shader-driver.

Acceptance criteria

  • GET /v1/shader/stream registered in crates/cognitive-shader-driver/src/serve.rs::router and reachable when the binary is run as cargo run -p cognitive-shader-driver --features serve --bin shader-serve.
  • Response carries Content-Type: text/event-stream.
  • Each completed shader cycle emits exactly one SSE event whose JSON body contains cycle metadata, triplets, and sensorium fields.
  • Integration test: connect an SSE client (e.g. reqwest + manual line-parser, or eventsource-client), drive 10 shader cycles through the driver, assert exactly 10 events received and that their cycle sequence numbers are monotonic.
  • Lag handling: when the broadcast buffer overflows for a subscriber, a dropped counter (carrying the Lagged(n) value) is surfaced to the client as a distinguishable event, and the stream continues without disconnecting.
  • Heartbeat verified: with no cycles for >30s, the client observes axum's keep-alive comment frame and the connection stays open.
  • No regressions: the other 14 /v1/shader/* routes still pass their existing tests.
  • tokio-stream is added under the existing serve feature gate; default-feature build of the crate is unchanged.
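
The integration test's client side can stay dependency-light: a minimal SSE frame splitter plus a sequence-number check is enough to assert the "exactly 10 events, monotonic seq" criterion. The sketch below is pure std; the "seq" field name and event framing are assumptions matching the event body described above:

```rust
// Split a raw SSE byte stream into the `data:` payload of each event.
// Events are separated by a blank line per the SSE wire format.
fn sse_data_payloads(raw: &str) -> Vec<String> {
    raw.split("\n\n")
        .filter_map(|frame| {
            frame
                .lines()
                .find_map(|l| l.strip_prefix("data: ").map(str::to_owned))
        })
        .collect()
}

// Pull a numeric "seq":N value out of a JSON payload without a JSON crate.
// (A real test would use serde_json; this keeps the sketch self-contained.)
fn seq_of(payload: &str) -> Option<u64> {
    let idx = payload.find("\"seq\":")? + "\"seq\":".len();
    let digits: String = payload[idx..]
        .chars()
        .take_while(|c| c.is_ascii_digit())
        .collect();
    digits.parse().ok()
}

fn main() {
    let raw = "event: cycle\ndata: {\"cycle\":{\"seq\":1}}\n\n\
               event: cycle\ndata: {\"cycle\":{\"seq\":2}}\n\n";
    let seqs: Vec<u64> = sse_data_payloads(raw).iter().filter_map(|p| seq_of(p)).collect();
    assert!(seqs.windows(2).all(|w| w[0] < w[1]), "cycle seq must be monotonic");
    println!("{:?}", seqs);
}
```

In the real test the raw buffer would come from a reqwest (or eventsource-client) response body after driving 10 cycles through the driver.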

Out of scope

  • The shader cycle implementation itself — assumed to exist in ShaderDriver. This issue only adds a fan-out hook + an SSE endpoint.
  • Authentication for the stream endpoint — deferred, tracked separately (assume external gating for v1).
  • Multi-tenant subscription (per-tenant filtering, per-tenant rate limiting). v1 fans the same stream to every subscriber.
  • Scene-graph delta encoding. v1 ships full snapshot per event for sensorium. Delta / patch encoding is a follow-up once we have profiling data showing the snapshot size matters.
  • gRPC / Arrow Flight variant. SSE only for v1.
  • q2 cockpit consumer wiring (the EventSource subscription on the cockpit side) — tracked in q2.

Dependencies

  • Independent. Does not block on A1 (contract surface) or A2 (medcare bridge) and is not blocked by them.
  • Touches only crates/cognitive-shader-driver/ — no contract crate changes.

Open questions

  1. Does ShaderDriver already expose a cycle_completed hook, or do we need to add the smallest possible callback / channel surface as part of this issue? (In-scope either way for the ~100 LOC budget.)
  2. Buffer size for broadcast::channel(N) — propose N = 256 cycles as a starting point; tune once we have a real cycle rate measurement.
  3. Should the dropped counter be a top-level field on the next normal event, or its own named SSE frame (event: dropped)? Recommend the named frame form so the cockpit can route it independently.
