Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ tests can pass fakes.
`agent-match.ts` (phonetic fuzzy window matching + session-grouped selection buttons),
`routes.ts` (message_id→pane map for reply recognition + LRU/MRU picker), `hook-normalize.ts`
(raw harness hook payload → ButtonRequest), `hook-install.ts` (idempotent q→buttons hook
merge for `tg-ctl install-hooks`), and `voice.ts` (inbound VOICE→text: `voice:` config block
merge for `tg-ctl install-hooks`), `defer.ts` (defer-while-waiting queue model: inbound text is
QUEUED per-pane while that pane has an open question and flushed on answer, so it is never pasted
into the prompt — `driveFlush` re-checks the pane before EACH paste so a follow-up question
re-defers the untouched tail), and `voice.ts` (inbound VOICE→text: `voice:` config block
parse/resolve/upsert, ffmpeg + whisper argv builders, transcript cleaning, and the onboarding
decision). `voice-probe.ts` is the ONE impure module here — it scans `~/xp` for an existing
Whisper install (whisper.cpp binary + ggml model, or a faster-whisper venv) and checks for
Expand Down
107 changes: 107 additions & 0 deletions features/tg-ctl/defer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Defer-while-waiting queue model for the tg-ctl daemon (spec tg#30).
//
// THE DANGER this exists to prevent: tg-ctl injects inbound Telegram messages
// into the agent's tmux pane via `send-keys`. If a question/permission prompt is
// OPEN in that pane, injected text is typed INTO the prompt — lost, or worse,
// corrupting the user's answer. So while a pane has a pending question the daemon
// QUEUES inbound text here instead of blasting it into the pane, and flushes the
// queue once the question is answered (button reply → flush).
//
// PURE: no I/O. The daemon owns tmux spawns and the pendingButtons map; it feeds
// this model the live pane-busy state and an inject callback.
//
// Past bug (the reason driveFlush re-checks PER ITEM): the agent very often opens
// a SECOND question the instant the first is answered. A flush that drained the
// whole queue up-front and pasted without re-checking would type the queued text
// straight into the new prompt — re-introducing the exact hanging-question bug.
// driveFlush therefore consults isPaneBusy() before EACH paste and re-defers the
// untouched tail the moment a new question appears.

// Per-pane FIFO queues of already-wrapped inbound texts awaiting their pane's
// open question to be answered. Keyed by tmux pane id ("%N").
export class DeferQueues {
private readonly byPane = new Map<string, string[]>();

// Append one wrapped message to the back of a pane's queue.
enqueue(paneId: string, wrappedText: string): void {
const q = this.byPane.get(paneId);
if (q) q.push(wrappedText);
else this.byPane.set(paneId, [wrappedText]);
}

has(paneId: string): boolean {
return (this.byPane.get(paneId)?.length ?? 0) > 0;
}

// Read-only view of a pane's queue (tests; never mutate the result).
peek(paneId: string): readonly string[] {
return this.byPane.get(paneId) ?? [];
}

// Drain a pane's queue for a flush attempt and remove the entry. The caller
// runs driveFlush on the result and re-defers whatever it could not inject.
take(paneId: string): string[] {
const q = this.byPane.get(paneId);
if (!q) return [];
this.byPane.delete(paneId);
return q;
}

// Put un-flushed items back at the FRONT, ahead of anything that arrived for
// this pane while the flush was in flight — so the user's ordering survives a
// re-defer (the tail of an interrupted flush precedes newer messages).
redefer(paneId: string, items: string[]): void {
if (items.length === 0) return;
const later = this.byPane.get(paneId) ?? [];
this.byPane.set(paneId, [...items, ...later]);
}

// Drop a pane's whole queue and return what was dropped. Used to dead-letter a
// backlog when its question is removed WITHOUT an answer: leaving it would let
// those messages resurface, stale and out of order, on a LATER unrelated
// question's flush. The caller tells the user (the messages never reached the
// agent) rather than silently losing them.
drop(paneId: string): string[] {
return this.take(paneId);
}
}

// Drive a queue flush, re-checking the pane between EVERY item via isPaneBusy().
// The check matters because the daemon awaits the actual tmux paste between
// items, and a freshly-answered question is commonly followed by another one:
// the first injection unblocks the agent, which may immediately re-prompt. The
// moment isPaneBusy() reports a new open question, the loop STOPS and hands the
// untouched tail back to the caller to re-defer — that tail flushes when THAT
// question is answered, so nothing is ever pasted into an open prompt.
//
// A new-question abort is the ONLY reason to re-defer. An inject FAILURE (pane no
// longer hosts an agent, a tmux error) must NOT re-defer the tail: there is no
// pending question to flush it later, so re-deferring would wedge every following
// message in memory forever, marked "queued", while the user thinks it landed
// (review finding — head-of-line blocking + indefinite stuck). On failure we log
// (via the inject callback) and SKIP that one item, continuing with the rest —
// the same forward-progress the pre-defer loop had.
//
// PURE control flow: all I/O (isPaneBusy, inject) is injected, so tests drive it
// with plain callbacks and assert exactly which items landed vs were re-deferred.
export interface FlushOutcome {
injected: string[];
failed: string[]; // attempted but inject() returned false (logged, dropped)
reDeferred: string[]; // a new question opened — flush these on its answer
}

export async function driveFlush(
queue: readonly string[],
isPaneBusy: () => boolean,
inject: (text: string) => Promise<boolean>,
): Promise<FlushOutcome> {
const injected: string[] = [];
const failed: string[] = [];
for (let i = 0; i < queue.length; i++) {
// A new question opened → stop and re-defer everything not yet attempted.
if (isPaneBusy()) return { injected, failed, reDeferred: queue.slice(i) };
if (await inject(queue[i])) injected.push(queue[i]);
else failed.push(queue[i]); // pane gone / tmux error — drop, don't wedge
}
return { injected, failed, reDeferred: [] };
}
Loading
Loading