Relay PTY Output Streaming #390

@jahala

Problem

When an agent is spawned via relay-pty, the spawner and other observers have no visibility into what the agent is doing. The only signals that escape are explicit relay messages the agent sends (->relay-file:msg). Internal reasoning, tool calls, thinking, and intermediate output are invisible.

This makes it impossible to build real-time status UIs that show what a spawned agent is working on.

What Already Exists

relay-pty already captures all PTY output in its event loop:

relay-pty/src/main.rs (lines 466-748)

PTY read loop:
  1. Reads chunk from PTY          → async_pty.recv()
  2. Writes to stdout               → stdout.write_all(&data)     [line 716]
  3. Writes to log file (optional)  → log_file.write_all(&data)   [line 720]
  4. Parses for relay commands      → parser.process(&data)       [line 727]
  5. Emits parsed commands as JSON  → eprintln!("{}", json)        [line 735]
     ↑ only when --json-output flag is set

The daemon side also has infrastructure:

packages/daemon/src/spawn-manager.ts
  - Already has pty.on('output', listener) wiring     [line ~1390]
  - Already broadcasts via __broadcastLogOutput

packages/daemon/src/server.ts
  - Already has onLogOutput callback                   [line 136]
  - Already handles LOG message type                   [line 1280]
  - Dashboard already consumes this callback

The Gap

Step 5 above only emits parsed relay commands ({"type":"relay_command",...}). The raw output text from step 1 is never emitted as a JSON event. It goes to stdout and the log file, but not back to the daemon.

Proposed Change

1. relay-pty (Rust) — ~20 lines

In relay-pty/src/main.rs, after the stdout and log file writes (lines 716 and 720) and before the parser runs (line 727), add output chunk emission:

// After the log file write (line 720), before parser.process (line 727):
if json_output {
    // Lossy decode: a multi-byte character split across two reads becomes U+FFFD
    let text = String::from_utf8_lossy(&data);
    // Skip empty chunks and pure whitespace
    if !text.trim().is_empty() {
        let chunk_event = serde_json::json!({
            "type": "output",
            "ts": std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64,
            "data": text,
        });
        if let Ok(json) = serde_json::to_string(&chunk_event) {
            eprintln!("{}", json);
        }
    }
}
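
For consumers, the line written to stderr by the snippet above could be described with types like the following. This is a sketch only: the type names and the `isOutputEvent` guard are hypothetical, and only the `type`, `ts`, and `data` field names come from the proposal.

```typescript
// Hypothetical type names; the field names follow the Rust snippet above.
interface RelayCommandEvent {
  type: 'relay_command';
  [key: string]: unknown; // remaining fields are not specified in this issue
}

interface OutputEvent {
  type: 'output';
  ts: number; // milliseconds since the Unix epoch
  data: string; // PTY text, lossily decoded as UTF-8
}

type RelayPtyEvent = RelayCommandEvent | OutputEvent;

// Runtime check so a malformed stderr line is never treated as an output event.
function isOutputEvent(value: unknown): value is OutputEvent {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return v.type === 'output' && typeof v.ts === 'number' && typeof v.data === 'string';
}

const sample: unknown = JSON.parse(
  '{"type":"output","ts":1700000000000,"data":"Reading file...\\n"}',
);
if (isOutputEvent(sample)) {
  const event: RelayPtyEvent = sample;
  console.log(event.type, sample.ts, JSON.stringify(sample.data));
}
```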

Rate limiting consideration: Agent output can be high-volume. Two options:

  • Option A (simple): Buffer output for 100ms and emit it as a single chunk. This reduces event frequency from one per PTY read to roughly 10 per second, plus early flushes whenever the buffer passes its size cap.
  • Option B (flag): Add --stream-output flag separate from --json-output, so consumers opt in explicitly.

Recommend Option A — buffer with a 100ms flush timer:

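// In event loop, alongside existing timers:
let mut output_buffer = String::new();
let mut last_flush = Instant::now();

// In PTY read handler (buffer only when JSON output is enabled):
if json_output {
    output_buffer.push_str(&String::from_utf8_lossy(&data));
    if last_flush.elapsed() >= Duration::from_millis(100) || output_buffer.len() > 4096 {
        flush_output(&mut output_buffer);
        last_flush = Instant::now();
    }
}

// Also call flush_output from a 100ms timer tick and once at PTY EOF. Without that,
// the tail of a burst stays buffered until the next read arrives.
fn flush_output(buf: &mut String) {
    if !buf.trim().is_empty() {
        let ts = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64;
        let event = serde_json::json!({"type": "output", "ts": ts, "data": buf.as_str()});
        eprintln!("{}", event); // Value's Display impl prints compact JSON
    }
    buf.clear();
}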
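
The flush policy can be exercised in isolation if the clock is passed in rather than read from the system. The following TypeScript sketch mirrors the time-or-size rule from Option A; `OutputCoalescer` and its method names are hypothetical and not part of relay-pty. Note that `length` here counts UTF-16 code units, whereas the Rust `len()` counts bytes.

```typescript
// Hypothetical helper that mirrors the 100ms / 4096 policy with an injected clock.
interface OutputChunkEvent {
  type: 'output';
  ts: number;
  data: string;
}

class OutputCoalescer {
  private buffer = '';
  private lastFlush: number;
  private readonly intervalMs: number;
  private readonly maxChars: number;

  constructor(startMs: number, intervalMs = 100, maxChars = 4096) {
    this.lastFlush = startMs;
    this.intervalMs = intervalMs;
    this.maxChars = maxChars;
  }

  /** Append a chunk; returns an event once the time or size threshold is crossed. */
  push(data: string, nowMs: number): OutputChunkEvent | null {
    this.buffer += data;
    const due = nowMs - this.lastFlush >= this.intervalMs;
    const full = this.buffer.length > this.maxChars;
    return due || full ? this.flush(nowMs) : null;
  }

  /** Emit whatever is buffered; meant for a periodic timer tick and for EOF. */
  flush(nowMs: number): OutputChunkEvent | null {
    const data = this.buffer;
    this.buffer = '';
    this.lastFlush = nowMs;
    // Whitespace-only buffers are dropped, as in the Rust version.
    return data.trim() === '' ? null : { type: 'output', ts: nowMs, data };
  }
}

const coalescer = new OutputCoalescer(0);
coalescer.push('Reading ', 10); // buffered, nothing emitted yet
coalescer.push('file...', 50); // still inside the 100ms window
console.log(coalescer.push('\n', 120)); // window elapsed, one combined event
```

Calling `flush` from a timer as well as from `push` is what keeps the last chunk of a burst from sitting in the buffer while the agent is idle.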

2. Daemon Wrapper — ~50 lines

In packages/wrapper/src/relay-pty-orchestrator.ts, parse stderr JSON events:

// In spawn setup, after creating the child process:
// Ensure --json-output is passed in args (may already be the case)

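let stderrBuffer = '';
// Decode in the stream so multi-byte characters split across chunks stay intact
this.process.stderr?.setEncoding('utf8');
this.process.stderr?.on('data', (chunk: string) => {
  stderrBuffer += chunk;
  const lines = stderrBuffer.split('\n');
  stderrBuffer = lines.pop() ?? ''; // Keep incomplete line in buffer

  for (const line of lines) {
    if (!line.trim()) continue;
    let event: unknown;
    try {
      event = JSON.parse(line);
    } catch {
      continue; // Non-JSON stderr line: ignore or log
    }
    // Dispatch outside the try block so handler errors are not swallowed as parse failures
    if (typeof event === 'object' && event !== null && typeof (event as { type?: unknown }).type === 'string') {
      this.handleEvent(event as { type: string; [key: string]: unknown });
    }
  }
});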
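
The line framing can also be pulled out into a small function that is testable without a child process. The sketch below assumes nothing about the wrapper class: `createJsonLineReader` is a hypothetical name, and it uses the standard `TextDecoder` in streaming mode so that a multi-byte character split across two chunks is decoded correctly. Node `Buffer` chunks can be passed as they are, since `Buffer` is a `Uint8Array`.

```typescript
// Hypothetical helper: feed it raw stderr chunks, get back the parsed JSON
// value of every complete line received so far.
function createJsonLineReader(): (chunk: Uint8Array) => unknown[] {
  const decoder = new TextDecoder('utf-8');
  let pending = '';

  return (chunk: Uint8Array): unknown[] => {
    // stream: true holds back an incomplete multi-byte sequence until the next call
    pending += decoder.decode(chunk, { stream: true });
    const lines = pending.split('\n');
    pending = lines.pop() ?? ''; // last element is the incomplete line, if any

    const events: unknown[] = [];
    for (const line of lines) {
      if (line.trim() === '') continue;
      try {
        events.push(JSON.parse(line));
      } catch {
        // Non-JSON stderr line (for example a panic message): skipped here
      }
    }
    return events;
  };
}

const read = createJsonLineReader();
const bytes = new TextEncoder().encode('{"type":"output","ts":1,"data":"ok"}\n{"type":"out');
console.log(read(bytes.slice(0, 10))); // no complete line yet
console.log(read(bytes.slice(10))); // first event; the partial second line stays buffered
```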

private handleEvent(event: { type: string; [key: string]: unknown }): void {
  switch (event.type) {
    case 'relay_command':
      // Existing handling (if not already here)
      break;
    case 'output':
      this.emit('output', {
        agent: this.config.name,
        data: event.data as string,
        ts: event.ts as number,
      });
      break;
    case 'continuity':
      // Existing handling
      break;
  }
}

3. Daemon Server — ~40 lines (optional but recommended)

Add a subscription mechanism so SDK clients can subscribe to a worker's output:

// New message type (add to protocol/src/types.ts):
// SUBSCRIBE_WORKER_OUTPUT = 'SUBSCRIBE_WORKER_OUTPUT'
// WORKER_OUTPUT = 'WORKER_OUTPUT'

// In server.ts message handler:
case 'SUBSCRIBE_WORKER_OUTPUT': {
  const { agent } = envelope.payload as { agent: string };
  if (!this.outputSubscriptions) this.outputSubscriptions = new Map();

  let subs = this.outputSubscriptions.get(agent);
  if (!subs) {
    subs = new Set();
    this.outputSubscriptions.set(agent, subs);
  }
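  // Register the connection and its cleanup handler only once per agent,
  // so repeated SUBSCRIBE messages do not stack 'close' listeners
  if (!subs.has(connection)) {
    subs.add(connection);
    connection.on('close', () => subs?.delete(connection));
  }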
  break;
}

// In spawn-manager output handler:
// When pty emits 'output', forward to subscribers:
pty.on('output', ({ agent, data, ts }) => {
  const subs = this.server.outputSubscriptions?.get(agent);
  if (subs) {
    const envelope = createEnvelope('WORKER_OUTPUT', { agent, data, ts });
    for (const conn of subs) {
      conn.send(envelope);
    }
  }
});
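
The subscription bookkeeping could also live in a small class of its own, which keeps the Map-of-Sets handling out of the message handler and makes it easy to test. This is a transport-agnostic sketch: `OutputSubscriptions` and its methods are hypothetical, and the type parameter `C` stands in for whatever connection type the daemon uses.

```typescript
// Hypothetical registry; C stands in for the daemon's connection type.
class OutputSubscriptions<C> {
  private readonly byAgent = new Map<string, Set<C>>();

  subscribe(agent: string, conn: C): void {
    let subs = this.byAgent.get(agent);
    if (!subs) {
      subs = new Set<C>();
      this.byAgent.set(agent, subs);
    }
    subs.add(conn); // Set semantics make repeated subscribes idempotent
  }

  unsubscribe(agent: string, conn: C): void {
    const subs = this.byAgent.get(agent);
    if (!subs) return;
    subs.delete(conn);
    if (subs.size === 0) this.byAgent.delete(agent); // do not accumulate empty sets
  }

  /** Drop a connection from every agent, e.g. from a single 'close' handler. */
  removeConnection(conn: C): void {
    for (const agent of Array.from(this.byAgent.keys())) {
      this.unsubscribe(agent, conn);
    }
  }

  subscribers(agent: string): C[] {
    return Array.from(this.byAgent.get(agent) ?? []);
  }
}

const registry = new OutputSubscriptions<string>();
registry.subscribe('Interviewer-abc12345', 'conn-1');
registry.subscribe('Interviewer-abc12345', 'conn-1'); // duplicate is a no-op
console.log(registry.subscribers('Interviewer-abc12345'));
```

With a registry like this, one `close` listener per connection that calls `removeConnection` would be enough, regardless of how many agents the connection subscribed to.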

4. SDK Client — ~20 lines (optional)

// In packages/sdk/src/client.ts:
subscribeWorkerOutput(
  agentName: string,
  callback: (data: string, ts: number) => void
): () => void {
  this.send(createEnvelope('SUBSCRIBE_WORKER_OUTPUT', { agent: agentName }));

  const handler = (envelope: Envelope) => {
    if (envelope.type === 'WORKER_OUTPUT' && envelope.payload.agent === agentName) {
      callback(envelope.payload.data, envelope.payload.ts);
    }
  };
  this.on('message', handler);
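  // Note: this only stops local delivery. No unsubscribe message is defined above,
  // so the daemon keeps sending WORKER_OUTPUT until the connection closes.
  return () => this.off('message', handler);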
}

Summary

| Layer | File | Lines | Required? |
|---|---|---|---|
| Rust | relay-pty/src/main.rs | ~20-30 | Yes |
| TS wrapper | packages/wrapper/src/relay-pty-orchestrator.ts | ~50 | Yes |
| Protocol | packages/protocol/src/types.ts | ~10 | For subscription |
| Daemon | packages/daemon/src/server.ts + spawn-manager.ts | ~40 | For subscription |
| SDK | packages/sdk/src/client.ts | ~20 | For subscription |
| Total | | ~140-150 | |

Minimum viable (Rust + wrapper only): ~70-80 lines. The daemon already broadcasts via the onLogOutput callback, so the dashboard would receive output as soon as the wrapper emits it, without the subscription protocol.

Use Case

Once implemented, a planner server can:

// Subscribe to Interviewer output
const unsub = relayClient.subscribeWorkerOutput('Interviewer-abc12345', (data, ts) => {
  // Parse for thinking indicators, tool calls, etc.
  // Broadcast to browser via agent_status_update with thought field
  emitAgentStatusUpdate('ideation-interviewer', 'working', {
    thought: extractThinking(data),
    activity: extractActivity(data),
  });
});
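
Because the data comes straight from a PTY, it will typically contain ANSI escape sequences and carriage returns, so helpers such as `extractThinking` would probably need to clean the text first. A minimal sketch of that step follows; `stripAnsi` and `lastVisibleLine` are hypothetical helpers, and the pattern only covers CSI and OSC sequences, not every terminal control code.

```typescript
// CSI sequences (colours, cursor movement) and OSC sequences (e.g. window titles).
const ANSI_PATTERN = /\x1b\[[0-?]*[ -\/]*[@-~]|\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)/g;

function stripAnsi(text: string): string {
  return text.replace(ANSI_PATTERN, '');
}

/** Last non-empty line of a chunk, with escape sequences and line endings removed. */
function lastVisibleLine(chunk: string): string | null {
  const lines = stripAnsi(chunk)
    .split(/\r?\n|\r/)
    .map((line) => line.trim())
    .filter((line) => line.length > 0);
  return lines[lines.length - 1] ?? null;
}

const raw = '\x1b[1mReading src/main.rs\x1b[0m\r\n\x1b[32mEditing file\x1b[0m\r\n';
console.log(lastVisibleLine(raw));
```

A status UI could show the result of `lastVisibleLine` as the agent's current activity, falling back to the previous value when a chunk contains only control sequences.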

This enables real-time "what is the agent doing?" visibility in any UI, without requiring agents to self-report.
