diff --git a/AGENTS.md b/AGENTS.md index 5fd889f..b633703 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -90,7 +90,10 @@ tests can pass fakes. `agent-match.ts` (phonetic fuzzy window matching + session-grouped selection buttons), `routes.ts` (message_id→pane map for reply recognition + LRU/MRU picker), `hook-normalize.ts` (raw harness hook payload → ButtonRequest), `hook-install.ts` (idempotent q→buttons hook - merge for `tg-ctl install-hooks`), and `voice.ts` (inbound VOICE→text: `voice:` config block + merge for `tg-ctl install-hooks`), `defer.ts` (defer-while-waiting queue model: inbound text is + QUEUED per-pane while that pane has an open question and flushed on answer, so it is never pasted + into the prompt — `driveFlush` re-checks the pane before EACH paste so a follow-up question + re-defers the untouched tail), and `voice.ts` (inbound VOICE→text: `voice:` config block parse/resolve/upsert, ffmpeg + whisper argv builders, transcript cleaning, and the onboarding decision). `voice-probe.ts` is the ONE impure module here — it scans `~/xp` for an existing Whisper install (whisper.cpp binary + ggml model, or a faster-whisper venv) and checks for diff --git a/features/tg-ctl/defer.ts b/features/tg-ctl/defer.ts new file mode 100644 index 0000000..3ef6365 --- /dev/null +++ b/features/tg-ctl/defer.ts @@ -0,0 +1,140 @@ +// Defer-while-waiting queue model for the tg-ctl daemon (spec tg#30). +// +// THE DANGER this exists to prevent: tg-ctl injects inbound Telegram messages +// into the agent's tmux pane via `send-keys`. If a question/permission prompt is +// OPEN in that pane, injected text is typed INTO the prompt — lost, or worse, +// corrupting the user's answer. So while a pane has a pending question the daemon +// QUEUES inbound text here instead of blasting it into the pane, and flushes the +// queue once the question is answered (button reply → flush). +// +// PURE: no I/O. 
// The daemon owns tmux spawns and the pendingButtons map; it feeds
// this model the live pane-busy state and an inject callback.
//
// Past bug (the reason driveFlush re-checks PER ITEM): the agent very often opens
// a SECOND question the instant the first is answered. A flush that drained the
// whole queue up-front and pasted without re-checking would type the queued text
// straight into the new prompt — re-introducing the exact hanging-question bug.
// driveFlush therefore consults isPaneBusy() before EACH paste and re-defers the
// untouched tail the moment a new question appears.

// Per-pane FIFO queues of already-wrapped inbound texts awaiting their pane's
// open question to be answered. Keyed by tmux pane id ("%N").
export class DeferQueues {
  // paneId → queued texts, oldest first. Using only this class's own methods, an
  // entry exists only while it holds at least one item: enqueue/redefer create
  // it, take/drop delete it. Explicit type arguments keep every operation below
  // checked (a bare `new Map()` would infer Map<any, any>).
  private readonly byPane = new Map<string, string[]>();

  // Append one wrapped message to the back of a pane's queue, creating the
  // queue on first use.
  enqueue(paneId: string, wrappedText: string): void {
    const existing = this.byPane.get(paneId);
    if (existing === undefined) {
      this.byPane.set(paneId, [wrappedText]);
      return;
    }
    existing.push(wrappedText);
  }

  // True when at least one message is queued for the pane.
  has(paneId: string): boolean {
    const queued = this.byPane.get(paneId);
    return queued !== undefined && queued.length > 0;
  }

  // Read-only view of a pane's queue (tests; never mutate the result). For a
  // known pane this is the LIVE backing array, not a copy; for an unknown pane
  // it is a fresh empty array.
  peek(paneId: string): readonly string[] {
    return this.byPane.get(paneId) ?? [];
  }

  // Drain a pane's queue for a flush attempt and remove the entry. The caller
  // runs driveFlush on the result and re-defers whatever it could not inject.
  // Returns [] when nothing is queued. Because the entry is deleted first,
  // messages enqueued afterwards start a new queue and never touch the returned
  // array.
  take(paneId: string): string[] {
    const drained = this.byPane.get(paneId);
    if (drained === undefined) return [];
    this.byPane.delete(paneId);
    return drained;
  }

  // Put un-flushed items back at the FRONT, ahead of anything that arrived for
  // this pane while the flush was in flight — so the user's ordering survives a
  // re-defer (the tail of an interrupted flush precedes newer messages).
  // An empty `items` is a no-op, so no empty entry is ever created. `items` is
  // copied, never retained, hence the readonly parameter type.
  redefer(paneId: string, items: readonly string[]): void {
    if (items.length === 0) return;
    const arrivedMeanwhile = this.byPane.get(paneId) ?? [];
    this.byPane.set(paneId, [...items, ...arrivedMeanwhile]);
  }

  // Drop a pane's whole queue and return what was dropped. Used to dead-letter a
  // backlog when its question is removed WITHOUT an answer: leaving it would let
  // those messages resurface, stale and out of order, on a LATER unrelated
  // question's flush. The caller tells the user (the messages never reached the
  // agent) rather than silently losing them.
  drop(paneId: string): string[] {
    return this.take(paneId);
  }
}

// Drive a queue flush, re-checking the pane between EVERY item via isPaneBusy().
// The check matters because the daemon awaits the actual tmux paste between
// items, and a freshly-answered question is commonly followed by another one:
// the first injection unblocks the agent, which may immediately re-prompt. The
// moment isPaneBusy() reports a new open question, the loop STOPS and hands the
// untouched tail back to the caller to re-defer — that tail flushes when THAT
// question is answered, so nothing is ever pasted into an open prompt.
//
// A new-question abort is the ONLY reason to re-defer. An inject FAILURE (pane no
// longer hosts an agent, a tmux error) must NOT re-defer the tail: there is no
// pending question to flush it later, so re-deferring would wedge every following
// message in memory forever, marked "queued", while the user thinks it landed
// (review finding — head-of-line blocking + indefinite stuck). On failure we log
// (via the inject callback) and SKIP that one item, continuing with the rest —
// the same forward-progress the pre-defer loop had.
//
// The concurrent-flush hazard `isAbandoned` closes: a follow-up question can open
// on this pane DURING the flush — newer inbound defers behind it (FIFO) — and then
// be abandoned (hook timeout / socket close / send failure) with NO terminal
// answer. The agent is still blocked locally on that prompt, but isPaneBusy() now
// reports idle (the pending button is gone).
Without this signal the loop would +// paste that orphaned tail straight into the still-open prompt — the very "stale +// resurface" this module exists to kill, on the concurrent-flush path. The daemon +// sets the flag in onQuestionAbandoned and the loop, seeing it, returns the +// untouched tail as `abandoned` so the caller DEAD-LETTERS it (never pastes, +// never re-defers — there is no future answer to flush it). +// +// PURE control flow: all I/O (isPaneBusy, isAbandoned, inject) is injected, so +// tests drive it with plain callbacks and assert exactly which items landed vs +// were re-deferred vs dead-lettered. +// +// Residual race (same bounded window isPaneBusy already lives with): both guards +// are read BEFORE each inject(). If a question opens or is abandoned DURING the +// awaited paste of the current item, that one item still lands; the guard only +// protects the items AFTER it. The daemon mutates this state from event-loop +// callbacks (button-answer / socket-close), which cannot preempt a synchronous +// guard read, so the window is exactly one in-flight paste — accepted, not closed, +// because no observable bridge signal marks the boundary inside a single paste. 
// Result of one driveFlush() pass. When driveFlush resolves, every item of the
// input queue is in exactly one of these four lists, in original queue order.
export interface FlushOutcome {
  injected: string[]; // inject() resolved true for these
  failed: string[]; // attempted but inject() returned false (logged, dropped)
  reDeferred: string[]; // a new question opened — flush these on its answer
  abandoned: string[]; // a question was abandoned mid-flush — dead-letter these
}

// Paste `queue` into the pane one item at a time, consulting both guards before
// every item.
//
//   queue       items to flush, oldest first; never mutated.
//   isPaneBusy  true while the pane has an open question.
//   inject      performs one paste; resolves true on success, false on failure.
//   isAbandoned true once a question was abandoned during this flush
//               (defaults to "never").
//
// Resolves with the FlushOutcome described above. At most one of
// reDeferred/abandoned is non-empty, because the loop returns at the first guard
// that fires. If inject() REJECTS, that rejection propagates to the caller and no
// outcome is produced.
export async function driveFlush(
  queue: readonly string[],
  isPaneBusy: () => boolean,
  inject: (text: string) => Promise<boolean>,
  isAbandoned: () => boolean = () => false,
): Promise<FlushOutcome> {
  const injected: string[] = [];
  const failed: string[] = [];
  // Both guards are RE-READ at the top of every iteration (not cached once): the
  // daemon mutates the underlying state from event-loop callbacks between the
  // awaited pastes, so a flag that flips mid-flush must be observed on the next
  // item. A "cache the flag once" optimization would silently break that.
  for (const [index, text] of queue.entries()) {
    // A question was abandoned mid-flush → stop and label the untouched tail
    // `abandoned`. Checked before isPaneBusy because the two can race and the
    // resolution differs: abandoned messages have no answer of their own coming.
    // driveFlush does NOT itself decide dead-letter vs re-defer — it only marks the
    // tail; the caller chooses (dead-letter if the pane is idle, re-defer if a
    // DIFFERENT question is still live and will flush the queue on its answer).
    if (isAbandoned()) {
      return { injected, failed, reDeferred: [], abandoned: queue.slice(index) };
    }
    // A new question opened → stop and re-defer everything not yet attempted.
    if (isPaneBusy()) {
      return { injected, failed, reDeferred: queue.slice(index), abandoned: [] };
    }
    const landed = await inject(text);
    if (landed) injected.push(text);
    else failed.push(text); // pane gone / tmux error — drop, don't wedge
  }
  return { injected, failed, reDeferred: [], abandoned: [] };
}

// ---- Remainder of this source line, carried over verbatim: the start of the
// ---- tests/ctl-defer-integration.test.ts diff (it continues on the next line).
// diff --git a/tests/ctl-defer-integration.test.ts b/tests/ctl-defer-integration.test.ts new file mode 100644 index 0000000..9c24ea4 --- /dev/null +++ b/tests/ctl-defer-integration.test.ts @@ -0,0 +1,806 @@ +import { afterEach, expect, test } from 'bun:test'; +import { closeSync, existsSync, mkdirSync, mkdtempSync, openSync, readFileSync, writeFileSync } from 'fs'; +import { tmpdir } from 'os'; +import { join } from 'path'; +import type { Subprocess } from 'bun'; + +// End-to-end defer-while-waiting (spec tg#30): the real daemon, a fake Telegram +// server, and a fake tmux/ps that reports ONE claude pane (%1) and LOGS every +// injected payload. The danger under test: an inbound message that arrives while +// pane %1 has an OPEN question must NOT be pasted into that prompt — it is +// queued (✍️ reaction) and flushed only once the question is answered. + +const TG_CTL = join(import.meta.dir, '..', 'tg-ctl'); + +const procs: Subprocess[] = []; +const servers: Array<{ stop: (closeActiveConnections?: boolean) => Promise | void }> = []; + +afterEach(async () => { + for (const p of procs.splice(0)) { + if (p.exitCode === null) { + p.kill(9); + await p.exited; + } + } + // await stop: with active connections Bun returns a Promise; not awaiting it + // leaks the listener into the next test (port/listener race).
+ for (const s of servers.splice(0)) await s.stop(true); +}); + +const PANE_ID = '%1'; +const PANE_PID = 4242; + +// A fake tmux that: answers list-panes with one claude-hosting pane rooted at +// $TG_TEST_CWD; logs every send-keys -l / load-buffer payload to $TG_INJECT_LOG +// (one line per injected text) so the test can assert WHAT landed and WHEN; and +// exits 0 for everything else (Enter, paste-buffer, verify side-effects). +function fakeTmux(cwd: string, injectLog: string): string { + return `#!/bin/sh +sub="$1"; shift +case "$sub" in + list-panes) + # Fields passed as %s args — the pane id is literally '%1', which must NOT + # reach the printf FORMAT string (printf would read it as a directive). + printf '%s\\t%s\\t%s\\t%s\\t%s\\t%s\\n' 'main' '0' '${PANE_ID}' '${PANE_PID}' 'claude' '${cwd}' + ;; + display-message) + printf 'main\\n' + ;; + send-keys) + # Scan for the literal flag: 'send-keys -t %1 -l ' logs . A bare + # 'send-keys -t %1 Enter' (no -l) is the submit keystroke — never logged. + while [ $# -gt 0 ]; do + if [ "$1" = "-l" ]; then printf '%s\\n' "$2" >> '${injectLog}'; break; fi + shift + done + ;; + load-buffer) + # multi-line paste payload arrives on stdin + cat >> '${injectLog}' + printf '\\n' >> '${injectLog}' + ;; +esac +exit 0 +`; +} + +// ps -axo pid=,ppid=,command= → the pane pid runs claude, so findAgentInPane +// resolves %1 to a claude agent. +function fakePs(): string { + return `#!/bin/sh +printf '%s %s %s\\n' '${PANE_PID}' '1' 'claude' +exit 0 +`; +} + +function makeCfgDir(): string { + const cfgDir = mkdtempSync(join(tmpdir(), 'tgctl-defer-')); + const injectLog = join(cfgDir, 'inject.log'); + writeFileSync(join(cfgDir, '.env'), 'TG_BOT_TOKEN=123:abc\nTG_CHAT_ID=1\n'); + writeFileSync(join(cfgDir, 'config.yaml'), 'control:\n enabled: true\n'); + // registration pins the target pane so discovery is deterministic. 
+ writeFileSync( + join(cfgDir, 'tg-ctl.123.registration.json'), + JSON.stringify({ paneId: PANE_ID, cwd: cfgDir }), + ); + mkdirSync(join(cfgDir, 'bin')); + writeFileSync(join(cfgDir, 'bin', 'tmux'), fakeTmux(cfgDir, injectLog), { mode: 0o755 }); + writeFileSync(join(cfgDir, 'bin', 'ps'), fakePs(), { mode: 0o755 }); + return cfgDir; +} + +function injected(cfgDir: string): string[] { + const p = join(cfgDir, 'inject.log'); + if (!existsSync(p)) return []; + return readFileSync(p, 'utf8').split('\n').filter((l) => l.length > 0); +} + +// The daemon's stdout/stderr log (markers like the dead-letter / re-defer lines) +// — polled by tests to gate on an async handler having run, rather than sleeping +// a fixed time that a loaded CI runner can outrun. +function daemonLogText(cfgDir: string): string { + const p = join(cfgDir, 'daemon.log'); + return existsSync(p) ? readFileSync(p, 'utf8') : ''; +} + +async function startDaemon(cfgDir: string, apiPort: number): Promise { + const logFd = openSync(join(cfgDir, 'daemon.log'), 'a'); + const daemon = Bun.spawn([process.execPath, TG_CTL, 'run'], { + env: { + // ONLY the fake bin dir on PATH for tmux/ps; node/bun resolve via execPath. + PATH: `${join(cfgDir, 'bin')}:/usr/bin:/bin`, + HOME: cfgDir, + TG_CTL_CONFIG_DIR: cfgDir, + TG_API_BASE: `http://127.0.0.1:${apiPort}`, + }, + stdio: ['ignore', logFd, logFd], + }); + closeSync(logFd); + const socket = join(cfgDir, 'tg-ctl.123.sock'); + const t0 = Date.now(); + while (Date.now() - t0 < 5000 && !existsSync(socket)) await Bun.sleep(50); + expect(existsSync(socket)).toBe(true); + return daemon; +} + +function startAsk(cfgDir: string, apiPort: number, request: Record): Subprocess { + const ask = Bun.spawn([process.execPath, TG_CTL, 'ask'], { + env: { + PATH: `${join(cfgDir, 'bin')}:/usr/bin:/bin`, + HOME: cfgDir, + TG_CTL_CONFIG_DIR: cfgDir, + TG_API_BASE: `http://127.0.0.1:${apiPort}`, + // ask reads TMUX_PANE for the request's paneId; pin it to our pane. 
+ TMUX_PANE: PANE_ID, + }, + stdin: 'pipe', + stdout: 'pipe', + stderr: 'pipe', + }); + ask.stdin.write(JSON.stringify({ cwd: cfgDir, paneId: PANE_ID, ...request }) + '\n'); + ask.stdin.end(); + return ask; +} + +test('inbound text deferred while a question is open, flushed (and not lost) after the answer', async () => { + const cfgDir = makeCfgDir(); + + // A scripted Telegram server. updateQueue feeds getUpdates one batch at a + // time; the test drives the sequence by pushing updates between assertions. + const updateQueue: unknown[][] = []; + const reactions: Array> = []; + let buttonMessageId = 0; + let questionCallbackData = ''; + + const server = Bun.serve({ + port: 0, + async fetch(req) { + const url = new URL(req.url); + if (url.pathname.endsWith('/getUpdates')) { + const batch = updateQueue.shift(); + if (batch) return Response.json({ ok: true, result: batch }); + await Bun.sleep(80); + return Response.json({ ok: true, result: [] }); + } + if (url.pathname.endsWith('/sendMessage')) { + const body = (await req.json()) as Record; + const kb = (body.reply_markup as { inline_keyboard?: Array> } | undefined) + ?.inline_keyboard; + if (kb?.length) { + buttonMessageId += 1; + questionCallbackData = kb[0][0].callback_data; + return Response.json({ ok: true, result: { message_id: buttonMessageId } }); + } + return Response.json({ ok: true, result: { message_id: 900 } }); + } + if (url.pathname.endsWith('/setMessageReaction')) { + reactions.push((await req.json()) as Record); + return Response.json({ ok: true, result: true }); + } + if (url.pathname.endsWith('/answerCallbackQuery') || url.pathname.endsWith('/editMessageText')) { + return Response.json({ ok: true, result: true }); + } + return Response.json({ ok: false, description: `unexpected: ${url.pathname}` }, { status: 404 }); + }, + }); + servers.push(server); + + const daemon = await startDaemon(cfgDir, server.port); + procs.push(daemon); + + // 1) Open a question on pane %1 (hook → inline buttons). 
It blocks until the + // callback answers it. + const ask = startAsk(cfgDir, server.port, { + requestId: 'q_block', + agent: 'claude', + kind: 'question', + question: 'Proceed?', + options: [{ label: 'Yes' }, { label: 'No' }], + }); + procs.push(ask); + + // Wait for the button message to be posted. + { + const t0 = Date.now(); + while (Date.now() - t0 < 5000 && questionCallbackData === '') await Bun.sleep(50); + expect(questionCallbackData).not.toBe(''); + } + + // 2) TWO inbound texts arrive WHILE the question is open → both DEFERRED, in + // order. The second message tests both deferral AND that flush preserves + // FIFO across multiple queued items. + const nowSec = Math.floor(Date.now() / 1000); + updateQueue.push([ + { + update_id: 10, + message: { + message_id: 11, + from: { id: 1, first_name: 'Alex' }, + chat: { id: 1 }, + date: nowSec, + text: 'first thing', + }, + }, + ]); + // Wait for the first to be deferred (its ✍️ reaction) before sending the + // second, so the daemon enqueues them in a deterministic order. + { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && reactions.length === 0) await Bun.sleep(50); + } + expect(injected(cfgDir)).toEqual([]); // nothing pasted into the open prompt + expect(reactions.at(-1)).toMatchObject({ message_id: 11, reaction: [{ type: 'emoji', emoji: '✍️' }] }); + + updateQueue.push([ + { + update_id: 11, + message: { + message_id: 12, + from: { id: 1, first_name: 'Alex' }, + chat: { id: 1 }, + date: nowSec, + text: 'second thing', + }, + }, + ]); + { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && reactions.length < 2) await Bun.sleep(50); + } + expect(injected(cfgDir)).toEqual([]); // still nothing pasted + expect(reactions.at(-1)).toMatchObject({ message_id: 12, reaction: [{ type: 'emoji', emoji: '✍️' }] }); + + // 3) Answer the question via the inline button → unblocks the agent → flush. 
+ // The callback's source message_id must match the button message the daemon + // posted (captured above), else the daemon rejects a stale tap. + updateQueue.push([ + { + update_id: 12, + callback_query: { + id: 'cb1', + from: { id: 1, first_name: 'Alex' }, + message: { message_id: buttonMessageId, chat: { id: 1 }, date: nowSec }, + data: questionCallbackData, + }, + }, + ]); + + // ask resolves with the hook output once the callback lands. + const askOut = await new Response(ask.stdout).text(); + await ask.exited; + expect(askOut.length).toBeGreaterThan(0); + + // 4) Both deferred messages flush into the pane in FIFO order (flushDeferred + // sleeps 800ms before the first paste). Exactly two, first then second. + { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && injected(cfgDir).length < 2) await Bun.sleep(100); + } + const landed = injected(cfgDir); + expect(landed).toHaveLength(2); + expect(landed[0]).toContain('first thing'); + expect(landed[1]).toContain('second thing'); + + daemon.kill('SIGTERM'); + await daemon.exited; +}, 30_000); + +test('a follow-up question ABANDONED mid-flush dead-letters its backlog instead of pasting it into the open prompt', async () => { + // Regression for the codex P2 concurrent-flush race. Sequence: + // 1. Q1 is answered → flushDeferred(%1) starts and sleeps 800ms (flushing). + // 2. DURING that settle a follow-up Q2 opens on the SAME pane; an inbound + // message defers behind it (the pane is flushing AND has a pending + // question, so it queues — strict FIFO). + // 3. Q2 is abandoned with NO Telegram answer (its hook socket closes). The + // pending button is deleted BEFORE onAbandon runs, so paneHasPendingQuestion + // is now false for it. + // 4. The flush loop wakes. 
With the BUG, onQuestionAbandoned returned early + // purely on the flushingPanes guard (no dead-letter), and the loop — seeing + // no pending question — pasted the orphaned backlog into the still-open + // terminal prompt (the agent is still blocked locally on Q2). FIXED: the + // abandonment is recorded, the loop dead-letters the residual queue and + // warns the user; nothing is pasted. + const cfgDir = makeCfgDir(); + const updateQueue: unknown[][] = []; + const reactions: Array> = []; + const plainMessages: string[] = []; // sendMessage bodies without inline buttons + let buttonMessageId = 0; + let questionCallbackData = ''; + + const server = Bun.serve({ + port: 0, + async fetch(req) { + const url = new URL(req.url); + if (url.pathname.endsWith('/getUpdates')) { + const batch = updateQueue.shift(); + if (batch) return Response.json({ ok: true, result: batch }); + await Bun.sleep(80); + return Response.json({ ok: true, result: [] }); + } + if (url.pathname.endsWith('/sendMessage')) { + const body = (await req.json()) as Record; + const kb = (body.reply_markup as { inline_keyboard?: Array> } | undefined) + ?.inline_keyboard; + if (kb?.length) { + buttonMessageId += 1; + questionCallbackData = kb[0][0].callback_data; + return Response.json({ ok: true, result: { message_id: buttonMessageId } }); + } + plainMessages.push(String(body.text ?? '')); + return Response.json({ ok: true, result: { message_id: 900 } }); + } + if (url.pathname.endsWith('/setMessageReaction')) { + reactions.push((await req.json()) as Record); + return Response.json({ ok: true, result: true }); + } + // answerCallbackQuery / editMessageText + return Response.json({ ok: true, result: true }); + }, + }); + servers.push(server); + + const daemon = await startDaemon(cfgDir, server.port); + procs.push(daemon); + + // 1) Open Q1 and defer one message behind it so the flush has a backlog to + // drain when Q1 is answered (flushDeferred early-returns on an empty queue). 
+ const ask1 = startAsk(cfgDir, server.port, { + requestId: 'q1', + agent: 'claude', + kind: 'question', + question: 'First?', + options: [{ label: 'Yes' }], + }); + procs.push(ask1); + { + const t0 = Date.now(); + while (Date.now() - t0 < 5000 && questionCallbackData === '') await Bun.sleep(50); + expect(questionCallbackData).not.toBe(''); + } + const q1Callback = questionCallbackData; + const q1MessageId = buttonMessageId; + + const nowSec = Math.floor(Date.now() / 1000); + updateQueue.push([ + { + update_id: 30, + message: { message_id: 31, from: { id: 1, first_name: 'Alex' }, chat: { id: 1 }, date: nowSec, text: 'q1 backlog' }, + }, + ]); + { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && reactions.length === 0) await Bun.sleep(50); + } + expect(reactions.at(-1)).toMatchObject({ message_id: 31, reaction: [{ type: 'emoji', emoji: '✍️' }] }); + expect(injected(cfgDir)).toEqual([]); + + // 2) Answer Q1 → flushDeferred(%1) starts and immediately sleeps 800ms. The + // flush is now IN FLIGHT (flushingPanes has %1) for the whole settle window. + questionCallbackData = ''; + updateQueue.push([ + { + update_id: 31, + callback_query: { + id: 'cbq1', + from: { id: 1, first_name: 'Alex' }, + message: { message_id: q1MessageId, chat: { id: 1 }, date: nowSec }, + data: q1Callback, + }, + }, + ]); + await new Response(ask1.stdout).text(); + await ask1.exited; + + // 3) DURING the 800ms settle, a follow-up Q2 opens on the same pane. + const ask2 = startAsk(cfgDir, server.port, { + requestId: 'q2', + agent: 'claude', + kind: 'question', + question: 'Second?', + options: [{ label: 'Ok' }], + }); + procs.push(ask2); + { + const t0 = Date.now(); + while (Date.now() - t0 < 4000 && questionCallbackData === '') await Bun.sleep(50); + expect(questionCallbackData).not.toBe(''); + } + + // ...and an inbound arrives behind Q2 (pane is flushing AND has a pending + // question → it defers, strict FIFO). 
+ updateQueue.push([ + { + update_id: 32, + message: { message_id: 33, from: { id: 1, first_name: 'Alex' }, chat: { id: 1 }, date: nowSec, text: 'q2 orphan' }, + }, + ]); + { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && !reactions.some((r) => r.message_id === 33)) await Bun.sleep(50); + } + expect(reactions.find((r) => r.message_id === 33)).toMatchObject({ reaction: [{ type: 'emoji', emoji: '✍️' }] }); + + // 4) Abandon Q2 with NO answer: kill its hook → socket close → onAbandon. The + // pending button is deleted first, so by the time the flush loop next checks, + // paneHasPendingQuestion(%1) is false. The FIX records the abandonment and the + // loop dead-letters the residual instead of pasting it. + ask2.kill(9); + await ask2.exited; + + // Wait for the flush to actually WAKE from its 800ms settle and reach a verdict, + // racing the two mutually-exclusive outcomes rather than sleeping a fixed time + // (a fixed sleep can pass vacuously if the buggy paste simply hasn't fired yet): + // - BUG → the flush pastes 'q2 orphan' into the still-open prompt (it appears + // in the inject log), and no dead-letter notice is sent. + // - FIX → the flush dead-letters the residual: nothing is pasted and the user + // gets the 'were NOT delivered' notice. + // Poll until whichever lands first; the assertions below then decide pass/fail. + // This fails FAST on the regression instead of hoping a fixed window covered it. + const orphanPasted = (): boolean => injected(cfgDir).some((l) => l.includes('q2 orphan')); + const deadLetterSent = (): boolean => plainMessages.some((m) => m.includes('were NOT delivered')); + { + const t0 = Date.now(); + while (Date.now() - t0 < 10_000 && !orphanPasted() && !deadLetterSent()) await Bun.sleep(50); + } + + // THE REGRESSION: the orphaned 'q2 orphan' must NEVER reach the open prompt. + // (With the bug it injects; fixed, it is dead-lettered.) 
+ expect(orphanPasted()).toBe(false); + // Nothing at all should have been pasted — the pane's prompt is still open. + expect(injected(cfgDir)).toEqual([]); + // The flush woke and dead-lettered: the user was told the messages did not land. + // (This is also the positive proof the flush ran — a no-op flush sends nothing.) + expect(deadLetterSent()).toBe(true); + + // 5) The flag must not LEAK: a later, legitimate question on the same pane must + // still flush normally. If abandonedDuringFlush(%1) were left set, this next + // flush would wrongly dead-letter a valid backlog (the most dangerous failure + // mode of the new flag — silently dropping good messages). Open Q3, defer a + // message behind it, answer it, and assert the message actually lands. + questionCallbackData = ''; + const ask3 = startAsk(cfgDir, server.port, { + requestId: 'q3', + agent: 'claude', + kind: 'question', + question: 'Third?', + options: [{ label: 'Go' }], + }); + procs.push(ask3); + { + const t0 = Date.now(); + while (Date.now() - t0 < 5000 && questionCallbackData === '') await Bun.sleep(50); + expect(questionCallbackData).not.toBe(''); + } + const q3Callback = questionCallbackData; + const q3MessageId = buttonMessageId; + updateQueue.push([ + { + update_id: 33, + message: { message_id: 34, from: { id: 1, first_name: 'Alex' }, chat: { id: 1 }, date: nowSec, text: 'q3 good' }, + }, + ]); + { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && !reactions.some((r) => r.message_id === 34)) await Bun.sleep(50); + } + expect(reactions.find((r) => r.message_id === 34)).toMatchObject({ reaction: [{ type: 'emoji', emoji: '✍️' }] }); + updateQueue.push([ + { + update_id: 34, + callback_query: { + id: 'cbq3', + from: { id: 1, first_name: 'Alex' }, + message: { message_id: q3MessageId, chat: { id: 1 }, date: nowSec }, + data: q3Callback, + }, + }, + ]); + await new Response(ask3.stdout).text(); + await ask3.exited; + { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && 
!injected(cfgDir).some((l) => l.includes('q3 good'))) await Bun.sleep(100); + } + // The legitimate Q3 backlog flushed (flag was cleared, not leaked). + expect(injected(cfgDir).some((l) => l.includes('q3 good'))).toBe(true); + // ...and the orphan still never resurfaced in this flush. + expect(injected(cfgDir).some((l) => l.includes('q2 orphan'))).toBe(false); + + daemon.kill('SIGTERM'); + await daemon.exited; +}, 30_000); + +test('an abandonment racing a NEW live question re-defers (does not drop the live question backlog)', async () => { + // Regression for review finding #1: the abandoned-during-flush flag is keyed by + // PANE, but a pane can host a second LIVE question whose backlog must still be + // delivered. Sequence: + // 1. Q1 answered → flushDeferred(%1) starts, sleeps 800ms (flushing). + // 2. During the settle Q2 opens; m2 defers behind it; Q2 is abandoned (flag set). + // 3. ALSO during the settle Q3 opens (a real, still-pending question); m3 defers. + // 4. The flush wakes: the flag is set, but Q3 is pending. It must RE-DEFER the + // residual (Q3's answer will flush it), NOT dead-letter it — dropping here + // would silently lose m3, a legitimately-queued message of a live question. + // 5. Answer Q3 → m3 lands. (m2 rides along; per the per-pane design that is the + // accepted lesser evil vs. dropping the live question's message.) 
+ const cfgDir = makeCfgDir(); + const updateQueue: unknown[][] = []; + const reactions: Array> = []; + const plainMessages: string[] = []; + let buttonMessageId = 0; + let questionCallbackData = ''; + + const server = Bun.serve({ + port: 0, + async fetch(req) { + const url = new URL(req.url); + if (url.pathname.endsWith('/getUpdates')) { + const batch = updateQueue.shift(); + if (batch) return Response.json({ ok: true, result: batch }); + await Bun.sleep(80); + return Response.json({ ok: true, result: [] }); + } + if (url.pathname.endsWith('/sendMessage')) { + const body = (await req.json()) as Record; + const kb = (body.reply_markup as { inline_keyboard?: Array> } | undefined) + ?.inline_keyboard; + if (kb?.length) { + buttonMessageId += 1; + questionCallbackData = kb[0][0].callback_data; + return Response.json({ ok: true, result: { message_id: buttonMessageId } }); + } + plainMessages.push(String(body.text ?? '')); + return Response.json({ ok: true, result: { message_id: 900 } }); + } + if (url.pathname.endsWith('/setMessageReaction')) { + reactions.push((await req.json()) as Record); + return Response.json({ ok: true, result: true }); + } + return Response.json({ ok: true, result: true }); + }, + }); + servers.push(server); + + const daemon = await startDaemon(cfgDir, server.port); + procs.push(daemon); + + const waitForButton = async (): Promise => { + const t0 = Date.now(); + while (Date.now() - t0 < 5000 && questionCallbackData === '') await Bun.sleep(50); + expect(questionCallbackData).not.toBe(''); + }; + const waitForReaction = async (id: number): Promise => { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && !reactions.some((r) => r.message_id === id)) await Bun.sleep(50); + expect(reactions.find((r) => r.message_id === id)).toMatchObject({ reaction: [{ type: 'emoji', emoji: '✍️' }] }); + }; + const nowSec = Math.floor(Date.now() / 1000); + + // Q1 + its backlog so the flush has work. 
+ const ask1 = startAsk(cfgDir, server.port, { requestId: 'q1', agent: 'claude', kind: 'question', question: 'First?', options: [{ label: 'Y' }] }); + procs.push(ask1); + await waitForButton(); + const q1Cb = questionCallbackData; + const q1Msg = buttonMessageId; + updateQueue.push([{ update_id: 40, message: { message_id: 41, from: { id: 1, first_name: 'Alex' }, chat: { id: 1 }, date: nowSec, text: 'q1 backlog' } }]); + await waitForReaction(41); + + // Answer Q1 → flush starts and sleeps 800ms. + questionCallbackData = ''; + updateQueue.push([{ update_id: 41, callback_query: { id: 'c1', from: { id: 1, first_name: 'Alex' }, message: { message_id: q1Msg, chat: { id: 1 }, date: nowSec }, data: q1Cb } }]); + await new Response(ask1.stdout).text(); + await ask1.exited; + + // During the settle: Q2 opens, m2 defers, Q2 abandoned. + const ask2 = startAsk(cfgDir, server.port, { requestId: 'q2', agent: 'claude', kind: 'question', question: 'Second?', options: [{ label: 'Y' }] }); + procs.push(ask2); + await waitForButton(); + questionCallbackData = ''; + updateQueue.push([{ update_id: 42, message: { message_id: 43, from: { id: 1, first_name: 'Alex' }, chat: { id: 1 }, date: nowSec, text: 'q2 orphan' } }]); + await waitForReaction(43); + + // Q3 opens (a real, still-pending question) and m3 defers behind it — BEFORE we + // abandon Q2, so the flush is guaranteed to see Q3 pending when it wakes. + const ask3 = startAsk(cfgDir, server.port, { requestId: 'q3', agent: 'claude', kind: 'question', question: 'Third?', options: [{ label: 'Go' }] }); + procs.push(ask3); + await waitForButton(); + const q3Cb = questionCallbackData; + const q3Msg = buttonMessageId; + updateQueue.push([{ update_id: 43, message: { message_id: 44, from: { id: 1, first_name: 'Alex' }, chat: { id: 1 }, date: nowSec, text: 'q3 live' } }]); + await waitForReaction(44); + + // Now abandon Q2 (socket close → flag set). 
The flush, on waking from its 800ms + // settle, sees the flag AND Q3 pending → must re-defer, not dead-letter. + ask2.kill(9); + await ask2.exited; + + // Wait for the daemon to log the re-defer decision BEFORE answering Q3. This is + // the deterministic proof the flush woke while Q3 was still pending and chose to + // re-defer the residual (the finding-#1 behavior). Answering Q3 earlier could + // race the 800ms settle and remove Q3 before the flush evaluates the gate. + { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && !daemonLogText(cfgDir).includes('abandon raced a new question')) await Bun.sleep(50); + } + // The flush re-deferred (did NOT dead-letter) because a live question was pending. + expect(daemonLogText(cfgDir)).toContain('abandon raced a new question'); + // ...and it must NOT have dropped the queue. + expect(daemonLogText(cfgDir)).not.toContain('were NOT delivered'); + expect(daemonLogText(cfgDir)).not.toMatch(/dead-lettered \d+ queued message/); + + // Answer Q3 → its flush delivers m3 (the live question's message). The fix is + // proven by m3 landing; with the bug (drop-everything) m3 would be lost. + questionCallbackData = ''; + updateQueue.push([{ update_id: 44, callback_query: { id: 'c3', from: { id: 1, first_name: 'Alex' }, message: { message_id: q3Msg, chat: { id: 1 }, date: nowSec }, data: q3Cb } }]); + await new Response(ask3.stdout).text(); + await ask3.exited; + + { + const t0 = Date.now(); + while (Date.now() - t0 < 10_000 && !injected(cfgDir).some((l) => l.includes('q3 live'))) await Bun.sleep(100); + } + // The live question's message was delivered, NOT dropped (review finding #1). 
+ expect(injected(cfgDir).some((l) => l.includes('q3 live'))).toBe(true); + + daemon.kill('SIGTERM'); + await daemon.exited; +}, 30_000); + +test('a question removed WITHOUT an answer (hook socket closes) does not wedge later inbound', async () => { + // Regression for the review finding: extending defer to "queue non-empty" would + // permanently wedge a pane once its question expired without a Telegram answer + // (timeout / socket close / send failure delete the pending button but never + // flush). After the question is gone, a NEW inbound must inject normally, not + // sit deferred forever behind an undrainable backlog. + const cfgDir = makeCfgDir(); + const updateQueue: unknown[][] = []; + const reactions: Array> = []; + let buttonMessageId = 0; + let questionCallbackData = ''; + + const server = Bun.serve({ + port: 0, + async fetch(req) { + const url = new URL(req.url); + if (url.pathname.endsWith('/getUpdates')) { + const batch = updateQueue.shift(); + if (batch) return Response.json({ ok: true, result: batch }); + await Bun.sleep(80); + return Response.json({ ok: true, result: [] }); + } + if (url.pathname.endsWith('/sendMessage')) { + const body = (await req.json()) as Record; + const kb = (body.reply_markup as { inline_keyboard?: Array> } | undefined) + ?.inline_keyboard; + if (kb?.length) { + buttonMessageId += 1; + questionCallbackData = kb[0][0].callback_data; + return Response.json({ ok: true, result: { message_id: buttonMessageId } }); + } + return Response.json({ ok: true, result: { message_id: 900 } }); + } + if (url.pathname.endsWith('/setMessageReaction')) { + reactions.push((await req.json()) as Record); + return Response.json({ ok: true, result: true }); + } + // answerCallbackQuery / editMessageText + return Response.json({ ok: true, result: true }); + }, + }); + servers.push(server); + + const daemon = await startDaemon(cfgDir, server.port); + procs.push(daemon); + + // Open a question; an inbound arrives and is deferred behind it. 
+ const ask = startAsk(cfgDir, server.port, { + requestId: 'q_expire', + agent: 'claude', + kind: 'question', + question: 'Proceed?', + options: [{ label: 'Yes' }], + }); + procs.push(ask); + { + const t0 = Date.now(); + while (Date.now() - t0 < 5000 && questionCallbackData === '') await Bun.sleep(50); + expect(questionCallbackData).not.toBe(''); + } + + const nowSec = Math.floor(Date.now() / 1000); + updateQueue.push([ + { + update_id: 20, + message: { message_id: 21, from: { id: 1, first_name: 'Alex' }, chat: { id: 1 }, date: nowSec, text: 'deferred one' }, + }, + ]); + { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && reactions.length === 0) await Bun.sleep(50); + } + expect(reactions.at(-1)).toMatchObject({ message_id: 21, reaction: [{ type: 'emoji', emoji: '✍️' }] }); + expect(injected(cfgDir)).toEqual([]); + + // The question is removed WITHOUT an answer: kill the ask hook → its socket + // closes → the daemon deletes the pending button AND dead-letters the pane's + // backlog (so it can never resurface, stale + out of order, on a later + // unrelated question's answer). The user is told the queued message did not land. + questionCallbackData = ''; + ask.kill(9); + await ask.exited; + // Wait for the socket-close handler to actually run onAbandon (dead-letter the + // idle pane) before sending the next inbound. Poll the daemon.log marker instead + // of a fixed sleep: under a loaded CI runner a blind 300ms can fire the next + // message before the abandon is processed, racing pending-question state. + { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && !daemonLogText(cfgDir).includes('question abandoned on %1 (idle)')) { + await Bun.sleep(50); + } + expect(daemonLogText(cfgDir)).toContain('question abandoned on %1 (idle)'); + } + + // A NEW inbound now arrives. With the bug it would be deferred (queue non-empty + // → ✍️, stuck forever). Fixed: no pending question, no flush in flight → it + // injects live and earns a 👀. 
+ updateQueue.push([ + { + update_id: 21, + message: { message_id: 22, from: { id: 1, first_name: 'Alex' }, chat: { id: 1 }, date: nowSec, text: 'after expiry' }, + }, + ]); + // Wait for the new message to actually land in the pane (the real proof it was + // NOT wedged). Injection runs verify-pane + paced send-keys, so allow time. + { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && !injected(cfgDir).some((l) => l.includes('after expiry'))) { + await Bun.sleep(100); + } + } + // The new message landed; the abandoned 'deferred one' was dropped (not wedged). + expect(injected(cfgDir)).toContain('[TG from Alex tg#22] after expiry'); + expect(injected(cfgDir).some((l) => l.includes('deferred one'))).toBe(false); + // It was DELIVERED (👀), not deferred (✍️). + { + const t0 = Date.now(); + const got22 = (): Record | undefined => + reactions.find((r) => r.message_id === 22); + while (Date.now() - t0 < 4000 && !got22()) await Bun.sleep(50); + expect(got22()).toMatchObject({ message_id: 22, reaction: [{ type: 'emoji', emoji: '👀' }] }); + } + + // The definitive regression (review P1): open a SECOND, unrelated question and + // ANSWER it. The abandoned 'deferred one' from Q1 must NOT resurface in this + // flush — it was dead-lettered, not left in the queue. 
+ const ask2 = startAsk(cfgDir, server.port, { + requestId: 'q_second', + agent: 'claude', + kind: 'question', + question: 'Second?', + options: [{ label: 'Ok' }], + }); + procs.push(ask2); + { + const t0 = Date.now(); + while (Date.now() - t0 < 5000 && questionCallbackData === '') await Bun.sleep(50); + expect(questionCallbackData).not.toBe(''); + } + updateQueue.push([ + { + update_id: 22, + callback_query: { + id: 'cb2', + from: { id: 1, first_name: 'Alex' }, + message: { message_id: buttonMessageId, chat: { id: 1 }, date: nowSec }, + data: questionCallbackData, + }, + }, + ]); + await new Response(ask2.stdout).text(); + await ask2.exited; + // Give a would-be stale flush ample time to (wrongly) fire, then assert it did not. + await Bun.sleep(1500); + expect(injected(cfgDir).some((l) => l.includes('deferred one'))).toBe(false); + + daemon.kill('SIGTERM'); + await daemon.exited; +}, 30_000); diff --git a/tests/ctl-defer.test.ts b/tests/ctl-defer.test.ts new file mode 100644 index 0000000..bcf8a87 --- /dev/null +++ b/tests/ctl-defer.test.ts @@ -0,0 +1,251 @@ +// Unit tests for the defer-while-waiting queue model (features/tg-ctl/defer.ts). +// +// The DANGER this guards (docs note tg#30): tg-ctl injects inbound Telegram +// messages into the agent's tmux pane via send-keys. If a question/permission +// prompt is OPEN in that pane, injected text is typed INTO the prompt and +// lost/corrupts it. The daemon therefore DEFERS inbound text while a pane has a +// pending question and flushes the queue once the question is answered. +// +// The subtle failure these tests pin down: the agent frequently asks a SECOND +// question the instant the first is answered. The flush must re-check the +// pane-busy state BEFORE each paste and re-defer the remainder, or it pastes +// straight into the new prompt — re-introducing the exact bug. 
+ +import { describe, expect, test } from 'bun:test'; +import { DeferQueues, driveFlush } from '../features/tg-ctl/defer'; + +describe('DeferQueues', () => { + test('enqueue preserves per-pane FIFO order', () => { + const q = new DeferQueues(); + q.enqueue('%1', 'a'); + q.enqueue('%1', 'b'); + q.enqueue('%2', 'c'); + expect(q.peek('%1')).toEqual(['a', 'b']); + expect(q.peek('%2')).toEqual(['c']); + }); + + test('has reflects whether a pane has queued text', () => { + const q = new DeferQueues(); + expect(q.has('%1')).toBe(false); + q.enqueue('%1', 'a'); + expect(q.has('%1')).toBe(true); + }); + + test('take drains and removes the pane queue', () => { + const q = new DeferQueues(); + q.enqueue('%1', 'a'); + q.enqueue('%1', 'b'); + expect(q.take('%1')).toEqual(['a', 'b']); + expect(q.has('%1')).toBe(false); + expect(q.take('%1')).toEqual([]); + }); + + test('redefer puts items back at the FRONT, before anything enqueued meanwhile', () => { + const q = new DeferQueues(); + // While a flush was draining ['a','b'], a new message 'c' arrived for the + // same pane. The un-flushed tail ['b'] must land BEFORE 'c' so the user's + // ordering survives. 
+ q.enqueue('%1', 'c'); + q.redefer('%1', ['b']); + expect(q.peek('%1')).toEqual(['b', 'c']); + }); + + test('redefer of multiple items keeps their relative order', () => { + const q = new DeferQueues(); + q.enqueue('%1', 'd'); + q.redefer('%1', ['b', 'c']); + expect(q.peek('%1')).toEqual(['b', 'c', 'd']); + }); + + test('drop empties the pane queue and returns the dropped items', () => { + const q = new DeferQueues(); + q.enqueue('%1', 'a'); + q.enqueue('%1', 'b'); + expect(q.drop('%1')).toEqual(['a', 'b']); + expect(q.has('%1')).toBe(false); + expect(q.drop('%1')).toEqual([]); // idempotent on an empty pane + }); +}); + +describe('driveFlush', () => { + test('injects everything when the pane never becomes busy again', async () => { + const injected: string[] = []; + const out = await driveFlush(['a', 'b', 'c'], () => false, async (t) => { + injected.push(t); + return true; + }); + expect(injected).toEqual(['a', 'b', 'c']); + expect(out.injected).toEqual(['a', 'b', 'c']); + expect(out.failed).toEqual([]); + expect(out.reDeferred).toEqual([]); + expect(out.abandoned).toEqual([]); + }); + + test('stops once a new question opens MID-flush, re-deferring the rest', async () => { + // The pane is free for the first paste; a new question opens before the + // second. 'b' and 'c' must NOT be pasted into that open prompt — they go + // back to the queue. (This is the exact follow-up-question race the fix + // closes: re-checked between awaited pastes, not just once up front.) 
+ const injected: string[] = []; + let questionOpen = false; + const out = await driveFlush(['a', 'b', 'c'], () => questionOpen, async (t) => { + injected.push(t); + questionOpen = true; // the answered agent immediately re-prompts + return true; + }); + expect(injected).toEqual(['a']); + expect(out.injected).toEqual(['a']); + expect(out.reDeferred).toEqual(['b', 'c']); + expect(out.failed).toEqual([]); + expect(out.abandoned).toEqual([]); + }); + + test('re-defers the WHOLE queue when a question is already open at flush time', async () => { + const injected: string[] = []; + const out = await driveFlush(['a', 'b'], () => true, async (t) => { + injected.push(t); + return true; + }); + expect(injected).toEqual([]); + expect(out.injected).toEqual([]); + expect(out.reDeferred).toEqual(['a', 'b']); + expect(out.abandoned).toEqual([]); + }); + + test('a failed paste is dropped (logged), flush CONTINUES — no head-of-line block', async () => { + // inject() returning false (e.g. the pane lost its agent) must NOT re-defer + // the tail: there may be no future question to flush it, so re-deferring would + // wedge every later message forever. The failed item is dropped; the rest go. + const injected: string[] = []; + const out = await driveFlush(['a', 'b', 'c'], () => false, async (t) => { + if (t === 'b') return false; // transient failure on the middle item + injected.push(t); + return true; + }); + expect(injected).toEqual(['a', 'c']); + expect(out.injected).toEqual(['a', 'c']); + expect(out.failed).toEqual(['b']); + expect(out.reDeferred).toEqual([]); // nothing wedged + expect(out.abandoned).toEqual([]); + }); + + test('a new question takes priority over a pending failure on the next item', async () => { + // busy is checked BEFORE inject, so an open question re-defers the tail even + // if that tail's first inject would have failed. 
+ const out = await driveFlush(['a', 'b'], () => true, async () => false); + expect(out.injected).toEqual([]); + expect(out.failed).toEqual([]); + expect(out.reDeferred).toEqual(['a', 'b']); + expect(out.abandoned).toEqual([]); + }); + + test('empty queue yields an empty outcome', async () => { + const out = await driveFlush([], () => false, async () => true); + expect(out.injected).toEqual([]); + expect(out.failed).toEqual([]); + expect(out.reDeferred).toEqual([]); + expect(out.abandoned).toEqual([]); + }); + + test('a mid-flush abandonment STOPS and hands the tail back to be DEAD-LETTERED', async () => { + // The concurrent-flush hazard (codex P2): a follow-up question opened during + // the flush and was then abandoned with no terminal answer. isPaneBusy() now + // reports idle (its pending button is gone), but the agent is still blocked on + // that prompt — the orphaned tail must be dead-lettered, NOT pasted, and NOT + // re-deferred (no answer is coming to flush it). + const injected: string[] = []; + let abandoned = false; + const out = await driveFlush( + ['a', 'b', 'c'], + () => false, + async (t) => { + injected.push(t); + abandoned = true; // a follow-up question opened then was abandoned mid-flush + return true; + }, + () => abandoned, + ); + expect(injected).toEqual(['a']); + expect(out.injected).toEqual(['a']); + expect(out.abandoned).toEqual(['b', 'c']); // dead-letter, do not paste + expect(out.reDeferred).toEqual([]); + expect(out.failed).toEqual([]); + }); + + test('abandonment is checked BEFORE pane-busy: a racing question still dead-letters', async () => { + // If an abandonment and a fresh question race, the orphaned messages have no + // answer coming, so they must be dead-lettered rather than re-deferred behind + // the new prompt. The whole queue is abandoned when both are set up front. 
+ const out = await driveFlush(['a', 'b'], () => true, async () => true, () => true); + expect(out.injected).toEqual([]); + expect(out.abandoned).toEqual(['a', 'b']); + expect(out.reDeferred).toEqual([]); + expect(out.failed).toEqual([]); + }); + + test('a failed paste and a later abandonment coexist in one outcome', async () => { + // 'a' injects and, while it does, a follow-up question is abandoned. The + // abandon flag is checked BEFORE each paste, so the loop stops ahead of 'b': + // the paste failure wired up for 'b' is never attempted and never recorded in + // `failed` — 'b' and 'c' are both the untouched tail and are dead-lettered. + const injected: string[] = []; + let abandoned = false; + const out = await driveFlush( + ['a', 'b', 'c'], + () => false, + async (t) => { + if (t === 'b') return false; // would fail, but is never reached (abandon stops the loop first) + injected.push(t); + if (t === 'a') abandoned = true; // a follow-up question is abandoned mid-flush + return true; + }, + () => abandoned, + ); + // After 'a' the abandon flag is set, so the loop stops BEFORE attempting 'b': + // 'b' and 'c' are the untouched tail → both dead-lettered, nothing failed. + expect(injected).toEqual(['a']); + expect(out.injected).toEqual(['a']); + expect(out.abandoned).toEqual(['b', 'c']); + expect(out.failed).toEqual([]); + expect(out.reDeferred).toEqual([]); + }); + + test('a failure recorded BEFORE an abandonment keeps both in the outcome', async () => { + // Here the abandon flag flips only AFTER 'b' has already failed, so 'b' lands + // in `failed` and the remaining 'c' in `abandoned` — the two coexist.
+ const injected: string[] = []; + let abandoned = false; + const out = await driveFlush( + ['a', 'b', 'c'], + () => false, + async (t) => { + if (t === 'b') { + abandoned = true; // abandonment observed at the same time the paste fails + return false; + } + injected.push(t); + return true; + }, + () => abandoned, + ); + expect(injected).toEqual(['a']); + expect(out.injected).toEqual(['a']); + expect(out.failed).toEqual(['b']); // failed paste — logged + dropped + expect(out.abandoned).toEqual(['c']); // tail after the abandon — dead-lettered + expect(out.reDeferred).toEqual([]); + }); + + test('an empty queue yields empty `abandoned` even when abandoned is set', async () => { + // driveFlush never invents items: with nothing to drain it returns empty + // `abandoned` regardless of isAbandoned(). This pins the contract the daemon + // relies on — it must also check its own round-boundary flag (an abandonment + // landing AFTER a round drained the queue produces no tail here), so the + // daemon's `outcome.abandoned.length > 0 || flagSet` guard cannot be reduced + // to `outcome.abandoned.length > 0` alone. 
+ const out = await driveFlush([], () => false, async () => true, () => true); + expect(out.injected).toEqual([]); + expect(out.abandoned).toEqual([]); + expect(out.reDeferred).toEqual([]); + expect(out.failed).toEqual([]); + }); +}); diff --git a/tg-ctl b/tg-ctl index 91dfa62..fa31c6d 100755 --- a/tg-ctl +++ b/tg-ctl @@ -22,6 +22,7 @@ import { type AgentCandidate, } from "./features/tg-ctl/agent-match" import { parseControlConfig, resolveControlConfig } from "./features/tg-ctl/config" +import { DeferQueues, driveFlush } from "./features/tg-ctl/defer" import { findAgentInPane, parsePaneList, parseProcList, pickTargetPane } from "./features/tg-ctl/discover" import { buildKeyInjectPlan, buildTextInjectPlan, wrapInbound } from "./features/tg-ctl/inject" import { botIdFromToken, ctlPaths, pidStatus, readPidFile } from "./features/tg-ctl/lock" @@ -726,7 +727,12 @@ function hookMatchesRegistration(ctx: Ctx, req: ButtonRequest): boolean { return registrationAllowsHook(readRegistration(ctx.paths), req, resolve) } -function startHookServer(ctx: Ctx, pending: Map, activeKeys: Set): Server | null { +function startHookServer( + ctx: Ctx, + pending: Map, + activeKeys: Set, + onAbandon: (paneId: string) => void, +): Server | null { try { try { unlinkSync(ctx.paths.socket) @@ -745,7 +751,7 @@ function startHookServer(ctx: Ctx, pending: Map, activeKe } const line = raw.slice(0, newline).trim() raw = raw.slice(newline + 1) - void handleHookSocketLine(ctx, pending, activeKeys, socket, line) + void handleHookSocketLine(ctx, pending, activeKeys, onAbandon, socket, line) }) }) server.on("error", (err) => { @@ -769,6 +775,11 @@ async function handleHookSocketLine( ctx: Ctx, pending: Map, activeKeys: Set, + // Called when a forwarded question is removed WITHOUT a Telegram answer + // (timeout, hook socket close, send failure). The daemon drops that pane's + // deferred backlog so it can't resurface, stale + out of order, on a LATER + // unrelated question's answer (review finding). 
+ onAbandon: (paneId: string) => void, socket: Socket, line: string, ): Promise { @@ -813,6 +824,7 @@ async function handleHookSocketLine( if (pending.get(pendingKey) !== entry) return pending.delete(pendingKey) activeKeys.delete(pendingKey) + if (entry.req.paneId) onAbandon(entry.req.paneId) if (entry.messageId !== null) void editMessageText(ctx, entry.messageId, "expired — answer in terminal") else entry.finalText = "expired — answer in terminal" socket.end("null\n") @@ -827,6 +839,7 @@ async function handleHookSocketLine( clearTimeout(entry.timer) pending.delete(pendingKey) activeKeys.delete(pendingKey) + if (entry.req.paneId) onAbandon(entry.req.paneId) if (entry.messageId !== null) void editMessageText(ctx, entry.messageId, "expired — answer in terminal") else entry.finalText = "expired — answer in terminal" }) @@ -838,6 +851,7 @@ async function handleHookSocketLine( pending.delete(pendingKey) activeKeys.delete(pendingKey) clearTimeout(entry.timer) + if (entry.req.paneId) onAbandon(entry.req.paneId) if (!socketClosed) socket.end("null\n") } return @@ -962,32 +976,213 @@ async function runDaemon(ctx: Ctx): Promise { let agentTokenSeq = 0 // Inbound queued behind an agent's outstanding question (defer-while-waiting): // pane → already-wrapped texts, flushed when the question is answered. The - // flag tells the trailing `ack` to mark the source message ⏳ instead of 👀. - const deferredByPane = new Map() + // flag tells the trailing `ack` to mark the source message with DEFER_REACTION + // (✍️ "noted, queued") instead of 👀. + const deferred = new DeferQueues() + // Panes with an in-flight flushDeferred. While a flush drains a pane, its queue + // is momentarily empty (taken out per round), so deferred.has() alone would let + // a live message inject AHEAD of items still being flushed. This flag keeps new + // inbound deferring for the whole flush, preserving strict FIFO; the drain + // loop's next round picks up whatever was enqueued meanwhile. 
+ const flushingPanes = new Set() + // Panes whose forwarded question was abandoned (timeout / socket close / send + // failure, no terminal answer) WHILE a flush was draining that pane. The flush + // loop owns the queue here, so onQuestionAbandoned cannot drop it directly; it + // records the pane and the drain loop dead-letters the residual queue instead of + // pasting it into the still-open prompt (codex P2 — concurrent-flush resurface). + const abandonedDuringFlush = new Set() let lastWasDeferred = false const paneHasPendingQuestion = (paneId: string): boolean => { for (const pb of pendingButtons.values()) if (pb.req.paneId && pb.req.paneId === paneId) return true return false } + // A live inbound message defers behind an OPEN question OR an in-flight flush + // (which sets flushingPanes for its whole duration, incl. the 800ms settle and + // every inter-paste gap). Those two cover every window where injecting now would + // either hit an open prompt or jump ahead of older queued items. + // + // It deliberately does NOT defer merely because the queue is non-empty: a queue + // can outlive its question (the Telegram prompt expires / its hook socket closes + // WITHOUT a terminal answer — the agent is still blocked locally, so we must not + // flush, but no future answer will arrive to drain it either). Treating a + // non-empty queue as "defer" would then wedge EVERY later message to that pane + // forever (review finding). FIFO during a real flush is preserved by + // flushingPanes; a question (live or expired-in-terminal) is preserved by + // paneHasPendingQuestion while the prompt is open. + const shouldDeferInbound = (paneId: string): boolean => + paneHasPendingQuestion(paneId) || flushingPanes.has(paneId) // Enqueue only — the caller sets lastWasDeferred when this came from a MESSAGE // action (which earns a ⏳ ack); a button-callback defer edits its prompt // instead and must NOT leak the flag onto the next message's ack. 
const enqueueDeferred = (paneId: string, wrappedText: string): void => { - const q = deferredByPane.get(paneId) - if (q) q.push(wrappedText) - else deferredByPane.set(paneId, [wrappedText]) - } - const flushDeferred = async (paneId: string): Promise => { - const queue = deferredByPane.get(paneId) - if (!queue?.length) return - deferredByPane.delete(paneId) - // Let the agent consume the just-delivered answer before the next paste. - await Bun.sleep(800) - for (const text of queue) { + deferred.enqueue(paneId, wrappedText) + } + const injectDeferredOne = async (paneId: string, text: string): Promise => { + // Must NEVER throw: a throw here rejects driveFlush, which (a) loses the + // already-taken batch and (b) becomes an unhandled rejection in the + // fire-and-forget flushDeferred call. executeInjectSteps normally returns + // {ok:false} on tmux errors, but a spawn/exception could still escape — treat + // any throw as a failed paste (drop + log), exactly like {ok:false}. + try { const res = await executeInjectSteps(buildTextInjectPlan(paneId, text)) if (!res.ok) log(`deferred flush into ${paneId} failed: ${res.error}`) + return res.ok + } catch (err) { + log(`deferred flush into ${paneId} threw: ${err}`) + return false + } + } + // Tell the user a pane's deferred backlog was dropped without reaching the agent + // (its question was abandoned before an answer). Declared above flushDeferred so + // the latter can reference it (via deadLetterAbandoned) without a TDZ hazard. + // User-facing dead-letter notice (no-op when nothing was queued — no ping if + // there's nothing to resend). Callers own the daemon.log record so the resolution + // is always traced even at count 0; this only handles the Telegram side. 
+ const notifyDeadLettered = (paneId: string, count: number): void => { + if (count === 0) return + // Guard the fire-and-forget send: this is a safety-critical notice (the user's + // only signal that messages were dropped), so a transport failure must at least + // hit the daemon log rather than vanish silently (review finding). NOTE: like + // every void-send in this daemon, an in-flight send can still be cut off by a + // shutdown right after a dead-letter — a pre-existing outbound-durability gap, + // not introduced here. + sendMessage( + ctx, + `⚠️ the question expired before it was answered — ${count} queued message(s) were NOT delivered. Resend them.`, + ).catch((err) => log(`failed to notify dead-letter on ${paneId}: ${err}`)) + } + const flushDeferred = async (paneId: string): Promise => { + if (!deferred.has(paneId)) return + // Re-entrancy guard: two answers for the same pane could both reach here (the + // action loop awaits each, but flushDeferred is fire-and-forget). The flag + // also routes new inbound into the queue for the whole flush (FIFO). + if (flushingPanes.has(paneId)) return + flushingPanes.add(paneId) + // The flag is cleared in the finally below, and onQuestionAbandoned only sets + // it while flushingPanes holds this pane — which the re-entrancy guard above + // makes mutually exclusive with a second concurrent flush — so it is always + // clean on entry; no defensive clear is needed here. + // + // Dead-letter the orphaned backlog for a pane whose abandoned question has no + // answer coming. `tail` is the slice driveFlush already removed from the live + // queue (via the round's take()); `deferred.drop()` returns whatever else is + // still queued. They are disjoint — take() emptied the pane entry at the start + // of the round, so anything drop() returns was enqueued after it — so the sum + // is the exact count of messages that never reached the agent. 
+ const deadLetterAbandoned = (tail: readonly string[]): void => { + const dropped = deferred.drop(paneId) + const count = tail.length + dropped.length + // Always log the resolution (even a zero-count one) so daemon.log records + // that the abandonment flag was seen and handled — distinguishing "flagged + // and resolved" from "never flagged" (review finding). The user-facing notice + // (notifyDeadLettered) stays count-gated: nothing to resend means no ping. + log(`flush on ${paneId} dead-lettered ${count} orphaned message(s) (abandoned, no answer coming)`) + notifyDeadLettered(paneId, count) + } + try { + // Let the agent consume the just-delivered answer before the first paste. + await Bun.sleep(800) + // Drain in rounds: each round takes the current queue and feeds it to + // driveFlush (which re-checks paneHasPendingQuestion before EVERY paste). + // A round can leave a tail three ways: a NEW question opened (re-defer it — + // it flushes when that question is answered); a question was ABANDONED + // mid-flush (dead-letter the residual — no answer is coming, see codex P2); + // or fresh messages were enqueued DURING the round (drain them next round, + // while the pane stays free — otherwise they'd sit until the next unrelated + // question). Inject FAILURES do NOT re-defer: there may be no future question + // to flush them, so they are logged + dropped rather than wedging the queue + // forever (review finding). + // Resolve a mid-flush abandonment. The queue is keyed by PANE, not question, + // so when a question is abandoned we cannot tell its orphaned messages from + // those of ANOTHER question still live on the same pane. Gate strictly on + // pane-idle (codex P2: "dead-letter once no question is pending"): + // - a question is STILL pending → a live prompt owns the queue; re-defer + // the residual so its answer flushes it. Dropping here would lose that + // live question's legitimately-queued messages (review finding #1). 
+ // - no question pending → the messages are truly orphaned (no answer is + // coming); dead-letter them rather than paste into the still-open prompt. + // `tail` is driveFlush's untouched slice (already removed from the queue). The + // pending read and the redefer/drop that follows run in ONE synchronous tick + // (no await between), so no inbound can interleave and be misrouted. + const resolveAbandonment = (tail: readonly string[]): void => { + if (paneHasPendingQuestion(paneId)) { + deferred.redefer(paneId, [...tail]) + log(`deferred flush into ${paneId} paused — ${tail.length} item(s) re-queued (abandon raced a new question)`) + } else { + deadLetterAbandoned(tail) + } + } + for (;;) { + // A question abandoned exactly at a round boundary (after driveFlush + // returned, before this take) leaves no tail to hand back, but the queue + // still must not be pasted — resolve it (dead-letter if idle, re-defer if a + // new question is now pending) and stop. + if (abandonedDuringFlush.has(paneId)) { + resolveAbandonment([]) + return + } + const queue = deferred.take(paneId) + if (queue.length === 0) return + const outcome = await driveFlush( + queue, + () => paneHasPendingQuestion(paneId), + (text) => injectDeferredOne(paneId, text), + () => abandonedDuringFlush.has(paneId), + ) + if (outcome.abandoned.length > 0 || abandonedDuringFlush.has(paneId)) { + resolveAbandonment(outcome.abandoned) + return + } + if (outcome.reDeferred.length > 0) { + deferred.redefer(paneId, outcome.reDeferred) + log(`deferred flush into ${paneId} paused — ${outcome.reDeferred.length} item(s) re-queued behind a new question`) + return + } + // Stop if a question is now open (its answer re-triggers the flush) or the + // queue is empty; else loop to drain anything enqueued during this round. 
+ if (paneHasPendingQuestion(paneId) || !deferred.has(paneId)) return + } + } finally { + flushingPanes.delete(paneId) + abandonedDuringFlush.delete(paneId) } } + // A forwarded question was removed WITHOUT a Telegram answer (timeout / hook + // socket close / send failure). The agent's local prompt is still open, so we + // must NOT flush — but we must also not leave the pane's backlog to resurface, + // stale and out of order, when some LATER unrelated question on this pane is + // answered. Dead-letter it and tell the user (the messages never landed). + // + // The queue is keyed by PANE, not by question, so it may hold items that belong + // to a DIFFERENT outstanding handler on the same pane. Drop ONLY when the pane + // is otherwise idle: skip if a flush is in flight (it owns + will deliver the + // queue) or another question is still pending (its answer will flush). Dropping + // in either case would lose another handler's messages (review finding). + const onQuestionAbandoned = (paneId: string): void => { + // A flush is draining this pane: it owns the queue (taken out per round), so + // dropping here would race the loop and could lose nothing OR the wrong items. + // Record the pane instead — the drain loop checks the flag and dead-letters the + // residual once it sees no pending question, rather than pasting those orphaned + // messages into the still-open terminal prompt (codex P2 concurrent-flush race). + if (flushingPanes.has(paneId)) { + abandonedDuringFlush.add(paneId) + // Timestamp the abandonment itself: the flush resolves it later (re-defer or + // dead-letter), but logging here makes the event visible in daemon.log for + // post-incident review even if the resolution path is quiet (review finding). 
+ log(`question abandoned on ${paneId} mid-flush — flagged for the drain loop`) + return + } + // Another question is still pending on this pane: its answer will flush the + // queue legitimately, so leave it (the queue is keyed by PANE, not question). + if (paneHasPendingQuestion(paneId)) return + // Pane is idle and no flush owns the queue: drop the orphaned backlog now. The + // pending check above and this drop run in the SAME synchronous tick (no await + // between), so no inbound for a freshly-opened question can interleave and be + // dropped by mistake — the daemon's inbound handling is event-loop callbacks. + const droppedCount = deferred.drop(paneId).length + log(`question abandoned on ${paneId} (idle) — dead-lettered ${droppedCount} queued message(s)`) + notifyDeadLettered(paneId, droppedCount) + } let hookServer: Server | null = null const cleanExit = (code: number): never => { try { @@ -1004,7 +1199,7 @@ async function runDaemon(ctx: Ctx): Promise { process.on("SIGTERM", () => cleanExit(0)) process.on("SIGINT", () => cleanExit(0)) - hookServer = startHookServer(ctx, pendingButtons, activeButtonKeys) + hookServer = startHookServer(ctx, pendingButtons, activeButtonKeys, onQuestionAbandoned) if (ctx.cfg.transport === "channel") { log("transport 'channel' is reserved for v1.2+ — falling back to tmux") @@ -1045,9 +1240,10 @@ async function runDaemon(ctx: Ctx): Promise { return false } const paneId = found.target.pane.paneId - // Defer a text message behind the pane's outstanding question (⏳); control - // verbs (Escape, no deferText) always go through to interrupt it. - if (deferText !== undefined && paneHasPendingQuestion(paneId)) { + // Defer a text message behind the pane's outstanding question OR an existing + // backlog (⏳); control verbs (Escape, no deferText) always go through to + // interrupt it. 
+ if (deferText !== undefined && shouldDeferInbound(paneId)) { enqueueDeferred(paneId, deferText) lastWasDeferred = true return true @@ -1069,7 +1265,7 @@ async function runDaemon(ctx: Ctx): Promise { msg: string, ): Promise<{ ok: boolean; error?: string; deferred?: boolean }> => { const wrapped = wrapInbound(ctx.cfg.injectWrap, from, msg) - if (paneHasPendingQuestion(paneId)) { + if (shouldDeferInbound(paneId)) { enqueueDeferred(paneId, wrapped) return { ok: true, deferred: true } } @@ -1173,7 +1369,7 @@ async function runDaemon(ctx: Ctx): Promise { // callback edits its own prompt, so it must NOT set lastWasDeferred. let res: { ok: boolean; error?: string; deferred?: boolean } if (pending.prewrapped) { - if (paneHasPendingQuestion(target.paneId)) { + if (shouldDeferInbound(target.paneId)) { enqueueDeferred(target.paneId, pending.message) res = { ok: true, deferred: true } } else { @@ -1201,7 +1397,7 @@ async function runDaemon(ctx: Ctx): Promise { const routes = parseRoutes(readFileOrNull(ctx.paths.routes)) const recognized = recognizeRoute(routes, action.replyToMessageId) const injectVerbatim = async (paneId: string): Promise => { - if (paneHasPendingQuestion(paneId)) { + if (shouldDeferInbound(paneId)) { enqueueDeferred(paneId, action.injectText) lastWasDeferred = true return true @@ -1473,7 +1669,11 @@ async function runDaemon(ctx: Ctx): Promise { void answerCallbackQuery(ctx, action.callbackQueryId, "answered") if (pending.messageId !== null) void editMessageText(ctx, pending.messageId, finalText) // The agent is unblocked → deliver anything queued behind this question. - if (pending.req.paneId) void flushDeferred(pending.req.paneId) + // Fire-and-forget, but guard the promise: an error here must never become + // an unhandled rejection that takes down the daemon. + if (pending.req.paneId) { + flushDeferred(pending.req.paneId).catch((err) => log(`flushDeferred error: ${err}`)) + } return true } case "status":