From 64ed7c62617dd89b423eaf3e0f992322702482cb Mon Sep 17 00:00:00 2001 From: Alex Ultra Date: Tue, 16 Jun 2026 01:06:01 +0200 Subject: [PATCH 1/2] feat(tg-ctl): harden defer-while-waiting against follow-up questions, wedges, and stale resurface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Inbound Telegram text is injected into the agent's tmux pane via send-keys. If a question/permission prompt is OPEN in that pane, injected text lands IN the prompt and is lost/corrupted. The daemon already queued inbound behind an open question and flushed on answer, but the flush had several real hazards: - Follow-up question: the agent commonly re-prompts the instant its question is answered. The old flush drained the whole queue up front and pasted without re-checking, typing queued text straight into the new prompt. driveFlush now re-checks the pane before EVERY paste and re-defers the untouched tail. - Head-of-line block: a transient inject failure (pane lost its agent) re-deferred the whole tail behind a question that may never come, wedging every later message forever. Failures now log + drop + continue. - Out-of-order during a flush: a message arriving in the 800ms settle window or between pastes could jump ahead of older queued items. A flushingPanes flag keeps new inbound deferring (strict FIFO) for the whole flush; the drain loop delivers it. - Stale resurface: a question removed WITHOUT a Telegram answer (timeout / hook socket close / send failure) left its backlog to flush, stale and out of order, on a LATER unrelated question's answer. Those paths now dead-letter the pane's queue (only when the pane is otherwise idle) and tell the user the messages did not land. - Throw safety: injectDeferredOne can no longer reject driveFlush, and the fire-and-forget flushDeferred call is .catch-guarded. The queue model (DeferQueues + driveFlush) is a pure, fully unit-tested module; the daemon owns tmux I/O and the pendingButtons map. 
Adds unit tests for the queue + flush logic and two daemon-level integration tests (deferred→FIFO flush; question abandoned-without-answer does not wedge or resurface later inbound). Known pre-existing limitation (not introduced here): when a Telegram question expires but the agent's TERMINAL prompt stays open, the daemon loses its only "busy" signal and the next inbound can inject into that open prompt. Solving it needs prompt-state the bridge cannot observe; tracked separately. Co-Authored-By: Claude Opus 4.8 --- AGENTS.md | 5 +- features/tg-ctl/defer.ts | 107 +++++++ tests/ctl-defer-integration.test.ts | 428 ++++++++++++++++++++++++++++ tests/ctl-defer.test.ts | 143 ++++++++++ tg-ctl | 147 ++++++++-- 5 files changed, 806 insertions(+), 24 deletions(-) create mode 100644 features/tg-ctl/defer.ts create mode 100644 tests/ctl-defer-integration.test.ts create mode 100644 tests/ctl-defer.test.ts diff --git a/AGENTS.md b/AGENTS.md index 24bbeb8..fa6e40c 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -84,7 +84,10 @@ tests can pass fakes. `agent-match.ts` (phonetic fuzzy window matching + session-grouped selection buttons), `routes.ts` (message_id→pane map for reply recognition + LRU/MRU picker), `hook-normalize.ts` (raw harness hook payload → ButtonRequest), `hook-install.ts` (idempotent q→buttons hook - merge for `tg-ctl install-hooks`), and `voice.ts` (inbound VOICE→text: `voice:` config block + merge for `tg-ctl install-hooks`), `defer.ts` (defer-while-waiting queue model: inbound text is + QUEUED per-pane while that pane has an open question and flushed on answer, so it is never pasted + into the prompt — `driveFlush` re-checks the pane before EACH paste so a follow-up question + re-defers the untouched tail), and `voice.ts` (inbound VOICE→text: `voice:` config block parse/resolve/upsert, ffmpeg + whisper argv builders, transcript cleaning, and the onboarding decision). 
// Defer-while-waiting queue model for the tg-ctl daemon (spec tg#30).
//
// THE DANGER this exists to prevent: tg-ctl injects inbound Telegram messages
// into the agent's tmux pane via `send-keys`. If a question/permission prompt is
// OPEN in that pane, injected text is typed INTO the prompt — lost, or worse,
// corrupting the user's answer. So while a pane has a pending question the daemon
// QUEUES inbound text here instead of blasting it into the pane, and flushes the
// queue once the question is answered (button reply → flush).
//
// PURE: no I/O. The daemon owns tmux spawns and the pendingButtons map; it feeds
// this model the live pane-busy state and an inject callback.
//
// Past bug (the reason driveFlush re-checks PER ITEM): the agent very often opens
// a SECOND question the instant the first is answered. A flush that drained the
// whole queue up-front and pasted without re-checking would type the queued text
// straight into the new prompt — re-introducing the exact hanging-question bug.
// driveFlush therefore consults isPaneBusy() before EACH paste and re-defers the
// untouched tail the moment a new question appears.

// Per-pane FIFO queues of already-wrapped inbound texts awaiting their pane's
// open question to be answered. Keyed by tmux pane id ("%N").
export class DeferQueues {
  // Explicit type arguments on purpose: a bare `new Map()` infers
  // Map<any, any>, which silently leaks `any` through every accessor below.
  // Invariant kept by this class: a pane id is only present with >= 1 item
  // (take() deletes the entry, redefer() ignores empty input).
  private readonly byPane = new Map<string, string[]>();

  // Append one wrapped message to the back of a pane's queue, creating the
  // queue on first use.
  enqueue(paneId: string, wrappedText: string): void {
    const q = this.byPane.get(paneId);
    if (q) q.push(wrappedText);
    else this.byPane.set(paneId, [wrappedText]);
  }

  // True when the pane has at least one queued message.
  has(paneId: string): boolean {
    return (this.byPane.get(paneId)?.length ?? 0) > 0;
  }

  // Read-only view of a pane's queue (tests; never mutate the result). Returns
  // the live backing array when one exists, or a fresh empty array otherwise.
  peek(paneId: string): readonly string[] {
    return this.byPane.get(paneId) ?? [];
  }

  // Drain a pane's queue for a flush attempt and remove the entry. The caller
  // runs driveFlush on the result and re-defers whatever it could not inject.
  // Ownership of the returned array passes to the caller: the entry is deleted
  // first, so later enqueue() calls start a new array and cannot alias it.
  take(paneId: string): string[] {
    const q = this.byPane.get(paneId);
    if (!q) return [];
    this.byPane.delete(paneId);
    return q;
  }

  // Put un-flushed items back at the FRONT, ahead of anything that arrived for
  // this pane while the flush was in flight — so the user's ordering survives a
  // re-defer (the tail of an interrupted flush precedes newer messages).
  // `items` is copied, never retained; an empty list is a no-op so no empty
  // queue entry is ever created.
  redefer(paneId: string, items: readonly string[]): void {
    if (items.length === 0) return;
    const later = this.byPane.get(paneId) ?? [];
    this.byPane.set(paneId, [...items, ...later]);
  }

  // Drop a pane's whole queue and return what was dropped. Used to dead-letter a
  // backlog when its question is removed WITHOUT an answer: leaving it would let
  // those messages resurface, stale and out of order, on a LATER unrelated
  // question's flush. The caller tells the user (the messages never reached the
  // agent) rather than silently losing them.
  drop(paneId: string): string[] {
    return this.take(paneId);
  }
}
// Result of one driveFlush pass. Every input item ends up in exactly one list.
export interface FlushOutcome {
  injected: string[]; // inject() resolved true — the text reached the pane
  failed: string[]; // attempted but inject() returned false (logged, dropped)
  reDeferred: string[]; // a new question opened — flush these on its answer
}

// Drive a queue flush, re-checking the pane before EVERY item via isPaneBusy().
// The check matters because the paste is awaited between items, and a
// freshly-answered question is commonly followed by another one: the first
// injection unblocks the agent, which may immediately re-prompt. The moment
// isPaneBusy() reports a new open question, the loop STOPS and hands the
// untouched tail back to the caller to re-defer — that tail flushes when THAT
// question is answered, so nothing is ever pasted into an open prompt.
//
// A new-question abort is the ONLY reason to re-defer. An inject FAILURE (pane no
// longer hosts an agent, a tmux error) must NOT re-defer the tail: there is no
// pending question to flush it later, so re-deferring would wedge every following
// message in memory forever. On failure the item is recorded in `failed` and the
// loop continues with the rest.
//
// PURE control flow: all I/O (isPaneBusy, inject) is injected, so tests drive it
// with plain callbacks and assert exactly which items landed vs were re-deferred.
//
// Parameters:
//   queue      — items to flush, oldest first; never mutated.
//   isPaneBusy — synchronous probe, consulted immediately before each item.
//   inject     — delivers one item; resolves true on success, false on failure.
//                It is not guarded here: a rejection propagates to the caller
//                and no FlushOutcome is produced, so callers should pass a
//                callback that reports failure by resolving false.
// Returns the per-item outcome; `reDeferred` is empty unless the pane went busy.
export async function driveFlush(
  queue: readonly string[],
  isPaneBusy: () => boolean,
  inject: (text: string) => Promise<boolean>,
): Promise<FlushOutcome> {
  const injected: string[] = [];
  const failed: string[] = [];
  // entries() yields a definitely-typed item alongside its index, so this stays
  // sound under noUncheckedIndexedAccess (queue[i] would be string | undefined).
  for (const [i, text] of queue.entries()) {
    // A new question opened → stop and re-defer everything not yet attempted.
    if (isPaneBusy()) return { injected, failed, reDeferred: queue.slice(i) };
    if (await inject(text)) injected.push(text);
    else failed.push(text); // pane gone / tmux error — drop, don't wedge
  }
  return { injected, failed, reDeferred: [] };
}
+ for (const s of servers.splice(0)) await s.stop(true); +}); + +const PANE_ID = '%1'; +const PANE_PID = 4242; + +// A fake tmux that: answers list-panes with one claude-hosting pane rooted at +// $TG_TEST_CWD; logs every send-keys -l / load-buffer payload to $TG_INJECT_LOG +// (one line per injected text) so the test can assert WHAT landed and WHEN; and +// exits 0 for everything else (Enter, paste-buffer, verify side-effects). +function fakeTmux(cwd: string, injectLog: string): string { + return `#!/bin/sh +sub="$1"; shift +case "$sub" in + list-panes) + # Fields passed as %s args — the pane id is literally '%1', which must NOT + # reach the printf FORMAT string (printf would read it as a directive). + printf '%s\\t%s\\t%s\\t%s\\t%s\\t%s\\n' 'main' '0' '${PANE_ID}' '${PANE_PID}' 'claude' '${cwd}' + ;; + display-message) + printf 'main\\n' + ;; + send-keys) + # Scan for the literal flag: 'send-keys -t %1 -l ' logs . A bare + # 'send-keys -t %1 Enter' (no -l) is the submit keystroke — never logged. + while [ $# -gt 0 ]; do + if [ "$1" = "-l" ]; then printf '%s\\n' "$2" >> '${injectLog}'; break; fi + shift + done + ;; + load-buffer) + # multi-line paste payload arrives on stdin + cat >> '${injectLog}' + printf '\\n' >> '${injectLog}' + ;; +esac +exit 0 +`; +} + +// ps -axo pid=,ppid=,command= → the pane pid runs claude, so findAgentInPane +// resolves %1 to a claude agent. +function fakePs(): string { + return `#!/bin/sh +printf '%s %s %s\\n' '${PANE_PID}' '1' 'claude' +exit 0 +`; +} + +function makeCfgDir(): string { + const cfgDir = mkdtempSync(join(tmpdir(), 'tgctl-defer-')); + const injectLog = join(cfgDir, 'inject.log'); + writeFileSync(join(cfgDir, '.env'), 'TG_BOT_TOKEN=123:abc\nTG_CHAT_ID=1\n'); + writeFileSync(join(cfgDir, 'config.yaml'), 'control:\n enabled: true\n'); + // registration pins the target pane so discovery is deterministic. 
+ writeFileSync( + join(cfgDir, 'tg-ctl.123.registration.json'), + JSON.stringify({ paneId: PANE_ID, cwd: cfgDir }), + ); + mkdirSync(join(cfgDir, 'bin')); + writeFileSync(join(cfgDir, 'bin', 'tmux'), fakeTmux(cfgDir, injectLog), { mode: 0o755 }); + writeFileSync(join(cfgDir, 'bin', 'ps'), fakePs(), { mode: 0o755 }); + return cfgDir; +} + +function injected(cfgDir: string): string[] { + const p = join(cfgDir, 'inject.log'); + if (!existsSync(p)) return []; + return readFileSync(p, 'utf8').split('\n').filter((l) => l.length > 0); +} + +async function startDaemon(cfgDir: string, apiPort: number): Promise { + const logFd = openSync(join(cfgDir, 'daemon.log'), 'a'); + const daemon = Bun.spawn([process.execPath, TG_CTL, 'run'], { + env: { + // ONLY the fake bin dir on PATH for tmux/ps; node/bun resolve via execPath. + PATH: `${join(cfgDir, 'bin')}:/usr/bin:/bin`, + HOME: cfgDir, + TG_CTL_CONFIG_DIR: cfgDir, + TG_API_BASE: `http://127.0.0.1:${apiPort}`, + }, + stdio: ['ignore', logFd, logFd], + }); + closeSync(logFd); + const socket = join(cfgDir, 'tg-ctl.123.sock'); + const t0 = Date.now(); + while (Date.now() - t0 < 5000 && !existsSync(socket)) await Bun.sleep(50); + expect(existsSync(socket)).toBe(true); + return daemon; +} + +function startAsk(cfgDir: string, apiPort: number, request: Record): Subprocess { + const ask = Bun.spawn([process.execPath, TG_CTL, 'ask'], { + env: { + PATH: `${join(cfgDir, 'bin')}:/usr/bin:/bin`, + HOME: cfgDir, + TG_CTL_CONFIG_DIR: cfgDir, + TG_API_BASE: `http://127.0.0.1:${apiPort}`, + // ask reads TMUX_PANE for the request's paneId; pin it to our pane. + TMUX_PANE: PANE_ID, + }, + stdin: 'pipe', + stdout: 'pipe', + stderr: 'pipe', + }); + ask.stdin.write(JSON.stringify({ cwd: cfgDir, paneId: PANE_ID, ...request }) + '\n'); + ask.stdin.end(); + return ask; +} + +test('inbound text deferred while a question is open, flushed (and not lost) after the answer', async () => { + const cfgDir = makeCfgDir(); + + // A scripted Telegram server. 
updateQueue feeds getUpdates one batch at a + // time; the test drives the sequence by pushing updates between assertions. + const updateQueue: unknown[][] = []; + const reactions: Array> = []; + let buttonMessageId = 0; + let questionCallbackData = ''; + + const server = Bun.serve({ + port: 0, + async fetch(req) { + const url = new URL(req.url); + if (url.pathname.endsWith('/getUpdates')) { + const batch = updateQueue.shift(); + if (batch) return Response.json({ ok: true, result: batch }); + await Bun.sleep(80); + return Response.json({ ok: true, result: [] }); + } + if (url.pathname.endsWith('/sendMessage')) { + const body = (await req.json()) as Record; + const kb = (body.reply_markup as { inline_keyboard?: Array> } | undefined) + ?.inline_keyboard; + if (kb?.length) { + buttonMessageId += 1; + questionCallbackData = kb[0][0].callback_data; + return Response.json({ ok: true, result: { message_id: buttonMessageId } }); + } + return Response.json({ ok: true, result: { message_id: 900 } }); + } + if (url.pathname.endsWith('/setMessageReaction')) { + reactions.push((await req.json()) as Record); + return Response.json({ ok: true, result: true }); + } + if (url.pathname.endsWith('/answerCallbackQuery') || url.pathname.endsWith('/editMessageText')) { + return Response.json({ ok: true, result: true }); + } + return Response.json({ ok: false, description: `unexpected: ${url.pathname}` }, { status: 404 }); + }, + }); + servers.push(server); + + const daemon = await startDaemon(cfgDir, server.port); + procs.push(daemon); + + // 1) Open a question on pane %1 (hook → inline buttons). It blocks until the + // callback answers it. + const ask = startAsk(cfgDir, server.port, { + requestId: 'q_block', + agent: 'claude', + kind: 'question', + question: 'Proceed?', + options: [{ label: 'Yes' }, { label: 'No' }], + }); + procs.push(ask); + + // Wait for the button message to be posted. 
+ { + const t0 = Date.now(); + while (Date.now() - t0 < 5000 && questionCallbackData === '') await Bun.sleep(50); + expect(questionCallbackData).not.toBe(''); + } + + // 2) TWO inbound texts arrive WHILE the question is open → both DEFERRED, in + // order. The second message tests both deferral AND that flush preserves + // FIFO across multiple queued items. + const nowSec = Math.floor(Date.now() / 1000); + updateQueue.push([ + { + update_id: 10, + message: { + message_id: 11, + from: { id: 1, first_name: 'Alex' }, + chat: { id: 1 }, + date: nowSec, + text: 'first thing', + }, + }, + ]); + // Wait for the first to be deferred (its ✍️ reaction) before sending the + // second, so the daemon enqueues them in a deterministic order. + { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && reactions.length === 0) await Bun.sleep(50); + } + expect(injected(cfgDir)).toEqual([]); // nothing pasted into the open prompt + expect(reactions.at(-1)).toMatchObject({ message_id: 11, reaction: [{ type: 'emoji', emoji: '✍️' }] }); + + updateQueue.push([ + { + update_id: 11, + message: { + message_id: 12, + from: { id: 1, first_name: 'Alex' }, + chat: { id: 1 }, + date: nowSec, + text: 'second thing', + }, + }, + ]); + { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && reactions.length < 2) await Bun.sleep(50); + } + expect(injected(cfgDir)).toEqual([]); // still nothing pasted + expect(reactions.at(-1)).toMatchObject({ message_id: 12, reaction: [{ type: 'emoji', emoji: '✍️' }] }); + + // 3) Answer the question via the inline button → unblocks the agent → flush. + // The callback's source message_id must match the button message the daemon + // posted (captured above), else the daemon rejects a stale tap. 
+ updateQueue.push([ + { + update_id: 12, + callback_query: { + id: 'cb1', + from: { id: 1, first_name: 'Alex' }, + message: { message_id: buttonMessageId, chat: { id: 1 }, date: nowSec }, + data: questionCallbackData, + }, + }, + ]); + + // ask resolves with the hook output once the callback lands. + const askOut = await new Response(ask.stdout).text(); + await ask.exited; + expect(askOut.length).toBeGreaterThan(0); + + // 4) Both deferred messages flush into the pane in FIFO order (flushDeferred + // sleeps 800ms before the first paste). Exactly two, first then second. + { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && injected(cfgDir).length < 2) await Bun.sleep(100); + } + const landed = injected(cfgDir); + expect(landed).toHaveLength(2); + expect(landed[0]).toContain('first thing'); + expect(landed[1]).toContain('second thing'); + + daemon.kill('SIGTERM'); + await daemon.exited; +}, 30_000); + +test('a question removed WITHOUT an answer (hook socket closes) does not wedge later inbound', async () => { + // Regression for the review finding: extending defer to "queue non-empty" would + // permanently wedge a pane once its question expired without a Telegram answer + // (timeout / socket close / send failure delete the pending button but never + // flush). After the question is gone, a NEW inbound must inject normally, not + // sit deferred forever behind an undrainable backlog. 
+ const cfgDir = makeCfgDir(); + const updateQueue: unknown[][] = []; + const reactions: Array> = []; + let buttonMessageId = 0; + let questionCallbackData = ''; + + const server = Bun.serve({ + port: 0, + async fetch(req) { + const url = new URL(req.url); + if (url.pathname.endsWith('/getUpdates')) { + const batch = updateQueue.shift(); + if (batch) return Response.json({ ok: true, result: batch }); + await Bun.sleep(80); + return Response.json({ ok: true, result: [] }); + } + if (url.pathname.endsWith('/sendMessage')) { + const body = (await req.json()) as Record; + const kb = (body.reply_markup as { inline_keyboard?: Array> } | undefined) + ?.inline_keyboard; + if (kb?.length) { + buttonMessageId += 1; + questionCallbackData = kb[0][0].callback_data; + return Response.json({ ok: true, result: { message_id: buttonMessageId } }); + } + return Response.json({ ok: true, result: { message_id: 900 } }); + } + if (url.pathname.endsWith('/setMessageReaction')) { + reactions.push((await req.json()) as Record); + return Response.json({ ok: true, result: true }); + } + // answerCallbackQuery / editMessageText + return Response.json({ ok: true, result: true }); + }, + }); + servers.push(server); + + const daemon = await startDaemon(cfgDir, server.port); + procs.push(daemon); + + // Open a question; an inbound arrives and is deferred behind it. 
+ const ask = startAsk(cfgDir, server.port, { + requestId: 'q_expire', + agent: 'claude', + kind: 'question', + question: 'Proceed?', + options: [{ label: 'Yes' }], + }); + procs.push(ask); + { + const t0 = Date.now(); + while (Date.now() - t0 < 5000 && questionCallbackData === '') await Bun.sleep(50); + expect(questionCallbackData).not.toBe(''); + } + + const nowSec = Math.floor(Date.now() / 1000); + updateQueue.push([ + { + update_id: 20, + message: { message_id: 21, from: { id: 1, first_name: 'Alex' }, chat: { id: 1 }, date: nowSec, text: 'deferred one' }, + }, + ]); + { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && reactions.length === 0) await Bun.sleep(50); + } + expect(reactions.at(-1)).toMatchObject({ message_id: 21, reaction: [{ type: 'emoji', emoji: '✍️' }] }); + expect(injected(cfgDir)).toEqual([]); + + // The question is removed WITHOUT an answer: kill the ask hook → its socket + // closes → the daemon deletes the pending button AND dead-letters the pane's + // backlog (so it can never resurface, stale + out of order, on a later + // unrelated question's answer). The user is told the queued message did not land. + questionCallbackData = ''; + ask.kill(9); + await ask.exited; + await Bun.sleep(300); // let the socket-close handler run onAbandon + + // A NEW inbound now arrives. With the bug it would be deferred (queue non-empty + // → ✍️, stuck forever). Fixed: no pending question, no flush in flight → it + // injects live and earns a 👀. + updateQueue.push([ + { + update_id: 21, + message: { message_id: 22, from: { id: 1, first_name: 'Alex' }, chat: { id: 1 }, date: nowSec, text: 'after expiry' }, + }, + ]); + // Wait for the new message to actually land in the pane (the real proof it was + // NOT wedged). Injection runs verify-pane + paced send-keys, so allow time. 
+ { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && !injected(cfgDir).some((l) => l.includes('after expiry'))) { + await Bun.sleep(100); + } + } + // The new message landed; the abandoned 'deferred one' was dropped (not wedged). + expect(injected(cfgDir)).toContain('[TG from Alex #22] after expiry'); + expect(injected(cfgDir).some((l) => l.includes('deferred one'))).toBe(false); + // It was DELIVERED (👀), not deferred (✍️). + { + const t0 = Date.now(); + const got22 = (): Record | undefined => + reactions.find((r) => r.message_id === 22); + while (Date.now() - t0 < 4000 && !got22()) await Bun.sleep(50); + expect(got22()).toMatchObject({ message_id: 22, reaction: [{ type: 'emoji', emoji: '👀' }] }); + } + + // The definitive regression (review P1): open a SECOND, unrelated question and + // ANSWER it. The abandoned 'deferred one' from Q1 must NOT resurface in this + // flush — it was dead-lettered, not left in the queue. + const ask2 = startAsk(cfgDir, server.port, { + requestId: 'q_second', + agent: 'claude', + kind: 'question', + question: 'Second?', + options: [{ label: 'Ok' }], + }); + procs.push(ask2); + { + const t0 = Date.now(); + while (Date.now() - t0 < 5000 && questionCallbackData === '') await Bun.sleep(50); + expect(questionCallbackData).not.toBe(''); + } + updateQueue.push([ + { + update_id: 22, + callback_query: { + id: 'cb2', + from: { id: 1, first_name: 'Alex' }, + message: { message_id: buttonMessageId, chat: { id: 1 }, date: nowSec }, + data: questionCallbackData, + }, + }, + ]); + await new Response(ask2.stdout).text(); + await ask2.exited; + // Give a would-be stale flush ample time to (wrongly) fire, then assert it did not. 
+ await Bun.sleep(1500); + expect(injected(cfgDir).some((l) => l.includes('deferred one'))).toBe(false); + + daemon.kill('SIGTERM'); + await daemon.exited; +}, 30_000); diff --git a/tests/ctl-defer.test.ts b/tests/ctl-defer.test.ts new file mode 100644 index 0000000..afabeed --- /dev/null +++ b/tests/ctl-defer.test.ts @@ -0,0 +1,143 @@ +// Unit tests for the defer-while-waiting queue model (features/tg-ctl/defer.ts). +// +// The DANGER this guards (docs note tg#30): tg-ctl injects inbound Telegram +// messages into the agent's tmux pane via send-keys. If a question/permission +// prompt is OPEN in that pane, injected text is typed INTO the prompt and +// lost/corrupts it. The daemon therefore DEFERS inbound text while a pane has a +// pending question and flushes the queue once the question is answered. +// +// The subtle failure these tests pin down: the agent frequently asks a SECOND +// question the instant the first is answered. The flush must re-check the +// pane-busy state BEFORE each paste and re-defer the remainder, or it pastes +// straight into the new prompt — re-introducing the exact bug. 
+ +import { describe, expect, test } from 'bun:test'; +import { DeferQueues, driveFlush } from '../features/tg-ctl/defer'; + +describe('DeferQueues', () => { + test('enqueue preserves per-pane FIFO order', () => { + const q = new DeferQueues(); + q.enqueue('%1', 'a'); + q.enqueue('%1', 'b'); + q.enqueue('%2', 'c'); + expect(q.peek('%1')).toEqual(['a', 'b']); + expect(q.peek('%2')).toEqual(['c']); + }); + + test('has reflects whether a pane has queued text', () => { + const q = new DeferQueues(); + expect(q.has('%1')).toBe(false); + q.enqueue('%1', 'a'); + expect(q.has('%1')).toBe(true); + }); + + test('take drains and removes the pane queue', () => { + const q = new DeferQueues(); + q.enqueue('%1', 'a'); + q.enqueue('%1', 'b'); + expect(q.take('%1')).toEqual(['a', 'b']); + expect(q.has('%1')).toBe(false); + expect(q.take('%1')).toEqual([]); + }); + + test('redefer puts items back at the FRONT, before anything enqueued meanwhile', () => { + const q = new DeferQueues(); + // While a flush was draining ['a','b'], a new message 'c' arrived for the + // same pane. The un-flushed tail ['b'] must land BEFORE 'c' so the user's + // ordering survives. 
+ q.enqueue('%1', 'c'); + q.redefer('%1', ['b']); + expect(q.peek('%1')).toEqual(['b', 'c']); + }); + + test('redefer of multiple items keeps their relative order', () => { + const q = new DeferQueues(); + q.enqueue('%1', 'd'); + q.redefer('%1', ['b', 'c']); + expect(q.peek('%1')).toEqual(['b', 'c', 'd']); + }); + + test('drop empties the pane queue and returns the dropped items', () => { + const q = new DeferQueues(); + q.enqueue('%1', 'a'); + q.enqueue('%1', 'b'); + expect(q.drop('%1')).toEqual(['a', 'b']); + expect(q.has('%1')).toBe(false); + expect(q.drop('%1')).toEqual([]); // idempotent on an empty pane + }); +}); + +describe('driveFlush', () => { + test('injects everything when the pane never becomes busy again', async () => { + const injected: string[] = []; + const out = await driveFlush(['a', 'b', 'c'], () => false, async (t) => { + injected.push(t); + return true; + }); + expect(injected).toEqual(['a', 'b', 'c']); + expect(out.injected).toEqual(['a', 'b', 'c']); + expect(out.failed).toEqual([]); + expect(out.reDeferred).toEqual([]); + }); + + test('stops once a new question opens MID-flush, re-deferring the rest', async () => { + // The pane is free for the first paste; a new question opens before the + // second. 'b' and 'c' must NOT be pasted into that open prompt — they go + // back to the queue. (This is the exact follow-up-question race the fix + // closes: re-checked between awaited pastes, not just once up front.) 
+ const injected: string[] = []; + let questionOpen = false; + const out = await driveFlush(['a', 'b', 'c'], () => questionOpen, async (t) => { + injected.push(t); + questionOpen = true; // the answered agent immediately re-prompts + return true; + }); + expect(injected).toEqual(['a']); + expect(out.injected).toEqual(['a']); + expect(out.reDeferred).toEqual(['b', 'c']); + expect(out.failed).toEqual([]); + }); + + test('re-defers the WHOLE queue when a question is already open at flush time', async () => { + const injected: string[] = []; + const out = await driveFlush(['a', 'b'], () => true, async (t) => { + injected.push(t); + return true; + }); + expect(injected).toEqual([]); + expect(out.injected).toEqual([]); + expect(out.reDeferred).toEqual(['a', 'b']); + }); + + test('a failed paste is dropped (logged), flush CONTINUES — no head-of-line block', async () => { + // inject() returning false (e.g. the pane lost its agent) must NOT re-defer + // the tail: there may be no future question to flush it, so re-deferring would + // wedge every later message forever. The failed item is dropped; the rest go. + const injected: string[] = []; + const out = await driveFlush(['a', 'b', 'c'], () => false, async (t) => { + if (t === 'b') return false; // transient failure on the middle item + injected.push(t); + return true; + }); + expect(injected).toEqual(['a', 'c']); + expect(out.injected).toEqual(['a', 'c']); + expect(out.failed).toEqual(['b']); + expect(out.reDeferred).toEqual([]); // nothing wedged + }); + + test('a new question takes priority over a pending failure on the next item', async () => { + // busy is checked BEFORE inject, so an open question re-defers the tail even + // if that tail's first inject would have failed. 
+ const out = await driveFlush(['a', 'b'], () => true, async () => false); + expect(out.injected).toEqual([]); + expect(out.failed).toEqual([]); + expect(out.reDeferred).toEqual(['a', 'b']); + }); + + test('empty queue yields an empty outcome', async () => { + const out = await driveFlush([], () => false, async () => true); + expect(out.injected).toEqual([]); + expect(out.failed).toEqual([]); + expect(out.reDeferred).toEqual([]); + }); +}); diff --git a/tg-ctl b/tg-ctl index 91dfa62..0c1c5a4 100755 --- a/tg-ctl +++ b/tg-ctl @@ -22,6 +22,7 @@ import { type AgentCandidate, } from "./features/tg-ctl/agent-match" import { parseControlConfig, resolveControlConfig } from "./features/tg-ctl/config" +import { DeferQueues, driveFlush } from "./features/tg-ctl/defer" import { findAgentInPane, parsePaneList, parseProcList, pickTargetPane } from "./features/tg-ctl/discover" import { buildKeyInjectPlan, buildTextInjectPlan, wrapInbound } from "./features/tg-ctl/inject" import { botIdFromToken, ctlPaths, pidStatus, readPidFile } from "./features/tg-ctl/lock" @@ -726,7 +727,12 @@ function hookMatchesRegistration(ctx: Ctx, req: ButtonRequest): boolean { return registrationAllowsHook(readRegistration(ctx.paths), req, resolve) } -function startHookServer(ctx: Ctx, pending: Map, activeKeys: Set): Server | null { +function startHookServer( + ctx: Ctx, + pending: Map, + activeKeys: Set, + onAbandon: (paneId: string) => void, +): Server | null { try { try { unlinkSync(ctx.paths.socket) @@ -745,7 +751,7 @@ function startHookServer(ctx: Ctx, pending: Map, activeKe } const line = raw.slice(0, newline).trim() raw = raw.slice(newline + 1) - void handleHookSocketLine(ctx, pending, activeKeys, socket, line) + void handleHookSocketLine(ctx, pending, activeKeys, onAbandon, socket, line) }) }) server.on("error", (err) => { @@ -769,6 +775,11 @@ async function handleHookSocketLine( ctx: Ctx, pending: Map, activeKeys: Set, + // Called when a forwarded question is removed WITHOUT a Telegram answer + 
// (timeout, hook socket close, send failure). The daemon drops that pane's + // deferred backlog so it can't resurface, stale + out of order, on a LATER + // unrelated question's answer (review finding). + onAbandon: (paneId: string) => void, socket: Socket, line: string, ): Promise { @@ -813,6 +824,7 @@ async function handleHookSocketLine( if (pending.get(pendingKey) !== entry) return pending.delete(pendingKey) activeKeys.delete(pendingKey) + if (entry.req.paneId) onAbandon(entry.req.paneId) if (entry.messageId !== null) void editMessageText(ctx, entry.messageId, "expired — answer in terminal") else entry.finalText = "expired — answer in terminal" socket.end("null\n") @@ -827,6 +839,7 @@ async function handleHookSocketLine( clearTimeout(entry.timer) pending.delete(pendingKey) activeKeys.delete(pendingKey) + if (entry.req.paneId) onAbandon(entry.req.paneId) if (entry.messageId !== null) void editMessageText(ctx, entry.messageId, "expired — answer in terminal") else entry.finalText = "expired — answer in terminal" }) @@ -838,6 +851,7 @@ async function handleHookSocketLine( pending.delete(pendingKey) activeKeys.delete(pendingKey) clearTimeout(entry.timer) + if (entry.req.paneId) onAbandon(entry.req.paneId) if (!socketClosed) socket.end("null\n") } return @@ -962,32 +976,114 @@ async function runDaemon(ctx: Ctx): Promise { let agentTokenSeq = 0 // Inbound queued behind an agent's outstanding question (defer-while-waiting): // pane → already-wrapped texts, flushed when the question is answered. The - // flag tells the trailing `ack` to mark the source message ⏳ instead of 👀. - const deferredByPane = new Map() + // flag tells the trailing `ack` to mark the source message with DEFER_REACTION + // (✍️ "noted, queued") instead of 👀. + const deferred = new DeferQueues() + // Panes with an in-flight flushDeferred. 
While a flush drains a pane, its queue + // is momentarily empty (taken out per round), so deferred.has() alone would let + // a live message inject AHEAD of items still being flushed. This flag keeps new + // inbound deferring for the whole flush, preserving strict FIFO; the drain + // loop's next round picks up whatever was enqueued meanwhile. + const flushingPanes = new Set() let lastWasDeferred = false const paneHasPendingQuestion = (paneId: string): boolean => { for (const pb of pendingButtons.values()) if (pb.req.paneId && pb.req.paneId === paneId) return true return false } + // A live inbound message defers behind an OPEN question OR an in-flight flush + // (which sets flushingPanes for its whole duration, incl. the 800ms settle and + // every inter-paste gap). Those two cover every window where injecting now would + // either hit an open prompt or jump ahead of older queued items. + // + // It deliberately does NOT defer merely because the queue is non-empty: a queue + // can outlive its question (the Telegram prompt expires / its hook socket closes + // WITHOUT a terminal answer — the agent is still blocked locally, so we must not + // flush, but no future answer will arrive to drain it either). Treating a + // non-empty queue as "defer" would then wedge EVERY later message to that pane + // forever (review finding). FIFO during a real flush is preserved by + // flushingPanes; a question (live or expired-in-terminal) is preserved by + // paneHasPendingQuestion while the prompt is open. + const shouldDeferInbound = (paneId: string): boolean => + paneHasPendingQuestion(paneId) || flushingPanes.has(paneId) // Enqueue only — the caller sets lastWasDeferred when this came from a MESSAGE // action (which earns a ⏳ ack); a button-callback defer edits its prompt // instead and must NOT leak the flag onto the next message's ack. 
const enqueueDeferred = (paneId: string, wrappedText: string): void => { - const q = deferredByPane.get(paneId) - if (q) q.push(wrappedText) - else deferredByPane.set(paneId, [wrappedText]) - } - const flushDeferred = async (paneId: string): Promise => { - const queue = deferredByPane.get(paneId) - if (!queue?.length) return - deferredByPane.delete(paneId) - // Let the agent consume the just-delivered answer before the next paste. - await Bun.sleep(800) - for (const text of queue) { + deferred.enqueue(paneId, wrappedText) + } + const injectDeferredOne = async (paneId: string, text: string): Promise => { + // Must NEVER throw: a throw here rejects driveFlush, which (a) loses the + // already-taken batch and (b) becomes an unhandled rejection in the + // fire-and-forget flushDeferred call. executeInjectSteps normally returns + // {ok:false} on tmux errors, but a spawn/exception could still escape — treat + // any throw as a failed paste (drop + log), exactly like {ok:false}. + try { const res = await executeInjectSteps(buildTextInjectPlan(paneId, text)) if (!res.ok) log(`deferred flush into ${paneId} failed: ${res.error}`) + return res.ok + } catch (err) { + log(`deferred flush into ${paneId} threw: ${err}`) + return false + } + } + const flushDeferred = async (paneId: string): Promise => { + if (!deferred.has(paneId)) return + // Re-entrancy guard: two answers for the same pane could both reach here (the + // action loop awaits each, but flushDeferred is fire-and-forget). The flag + // also routes new inbound into the queue for the whole flush (FIFO). + if (flushingPanes.has(paneId)) return + flushingPanes.add(paneId) + try { + // Let the agent consume the just-delivered answer before the first paste. + await Bun.sleep(800) + // Drain in rounds: each round takes the current queue and feeds it to + // driveFlush (which re-checks paneHasPendingQuestion before EVERY paste). 
+ // A round can leave a tail two ways: a NEW question opened (re-defer it — it + // flushes when that question is answered), or fresh messages were enqueued + // DURING the round (drain them next round, while the pane stays free — + // otherwise they'd sit until the next unrelated question). Inject FAILURES do + // NOT re-defer: there may be no future question to flush them, so they are + // logged + dropped rather than wedging the queue forever (review finding). + for (;;) { + const queue = deferred.take(paneId) + if (queue.length === 0) return + const outcome = await driveFlush(queue, () => paneHasPendingQuestion(paneId), (text) => + injectDeferredOne(paneId, text), + ) + if (outcome.reDeferred.length > 0) { + deferred.redefer(paneId, outcome.reDeferred) + log(`deferred flush into ${paneId} paused — ${outcome.reDeferred.length} item(s) re-queued behind a new question`) + return + } + // Stop if a question is now open (its answer re-triggers the flush) or the + // queue is empty; else loop to drain anything enqueued during this round. + if (paneHasPendingQuestion(paneId) || !deferred.has(paneId)) return + } + } finally { + flushingPanes.delete(paneId) } } + // A forwarded question was removed WITHOUT a Telegram answer (timeout / hook + // socket close / send failure). The agent's local prompt is still open, so we + // must NOT flush — but we must also not leave the pane's backlog to resurface, + // stale and out of order, when some LATER unrelated question on this pane is + // answered. Dead-letter it and tell the user (the messages never landed). + // + // The queue is keyed by PANE, not by question, so it may hold items that belong + // to a DIFFERENT outstanding handler on the same pane. Drop ONLY when the pane + // is otherwise idle: skip if a flush is in flight (it owns + will deliver the + // queue) or another question is still pending (its answer will flush). Dropping + // in either case would lose another handler's messages (review finding). 
+ const onQuestionAbandoned = (paneId: string): void => { + if (flushingPanes.has(paneId) || paneHasPendingQuestion(paneId)) return + const dropped = deferred.drop(paneId) + if (dropped.length === 0) return + log(`question abandoned on ${paneId} — dropped ${dropped.length} queued message(s)`) + void sendMessage( + ctx, + `⚠️ the question expired before it was answered — ${dropped.length} queued message(s) were NOT delivered. Resend them.`, + ) + } let hookServer: Server | null = null const cleanExit = (code: number): never => { try { @@ -1004,7 +1100,7 @@ async function runDaemon(ctx: Ctx): Promise { process.on("SIGTERM", () => cleanExit(0)) process.on("SIGINT", () => cleanExit(0)) - hookServer = startHookServer(ctx, pendingButtons, activeButtonKeys) + hookServer = startHookServer(ctx, pendingButtons, activeButtonKeys, onQuestionAbandoned) if (ctx.cfg.transport === "channel") { log("transport 'channel' is reserved for v1.2+ — falling back to tmux") @@ -1045,9 +1141,10 @@ async function runDaemon(ctx: Ctx): Promise { return false } const paneId = found.target.pane.paneId - // Defer a text message behind the pane's outstanding question (⏳); control - // verbs (Escape, no deferText) always go through to interrupt it. - if (deferText !== undefined && paneHasPendingQuestion(paneId)) { + // Defer a text message behind the pane's outstanding question OR an in-flight + // flush (DEFER_REACTION ✍️); control verbs (Escape, no deferText) always go + // through to interrupt it.
+ if (deferText !== undefined && shouldDeferInbound(paneId)) { enqueueDeferred(paneId, deferText) lastWasDeferred = true return true @@ -1069,7 +1166,7 @@ async function runDaemon(ctx: Ctx): Promise { msg: string, ): Promise<{ ok: boolean; error?: string; deferred?: boolean }> => { const wrapped = wrapInbound(ctx.cfg.injectWrap, from, msg) - if (paneHasPendingQuestion(paneId)) { + if (shouldDeferInbound(paneId)) { enqueueDeferred(paneId, wrapped) return { ok: true, deferred: true } } @@ -1173,7 +1270,7 @@ async function runDaemon(ctx: Ctx): Promise { // callback edits its own prompt, so it must NOT set lastWasDeferred. let res: { ok: boolean; error?: string; deferred?: boolean } if (pending.prewrapped) { - if (paneHasPendingQuestion(target.paneId)) { + if (shouldDeferInbound(target.paneId)) { enqueueDeferred(target.paneId, pending.message) res = { ok: true, deferred: true } } else { @@ -1201,7 +1298,7 @@ async function runDaemon(ctx: Ctx): Promise { const routes = parseRoutes(readFileOrNull(ctx.paths.routes)) const recognized = recognizeRoute(routes, action.replyToMessageId) const injectVerbatim = async (paneId: string): Promise => { - if (paneHasPendingQuestion(paneId)) { + if (shouldDeferInbound(paneId)) { enqueueDeferred(paneId, action.injectText) lastWasDeferred = true return true @@ -1473,7 +1570,11 @@ async function runDaemon(ctx: Ctx): Promise { void answerCallbackQuery(ctx, action.callbackQueryId, "answered") if (pending.messageId !== null) void editMessageText(ctx, pending.messageId, finalText) // The agent is unblocked → deliver anything queued behind this question. - if (pending.req.paneId) void flushDeferred(pending.req.paneId) + // Fire-and-forget, but guard the promise: an error here must never become + // an unhandled rejection that takes down the daemon. 
+ if (pending.req.paneId) { + flushDeferred(pending.req.paneId).catch((err) => log(`flushDeferred error: ${err}`)) + } return true } case "status": From e47740086c15922c6ebfe730ee7c8986b244a0d4 Mon Sep 17 00:00:00 2001 From: Alex Ultra Date: Wed, 17 Jun 2026 17:16:08 +0200 Subject: [PATCH 2/2] fix(tg-ctl): dead-letter orphaned backlog when a follow-up question is abandoned mid-flush MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The defer-while-waiting flush had a concurrent-flush correctness bug (codex P2 on PR #35): when an answer-flush is in flight (`flushingPanes` set) and a NEW question opens on the same pane then is ABANDONED (timeout / socket close / send failure, no terminal answer), every abandonment path deletes the pending button BEFORE calling `onAbandon`. So `onQuestionAbandoned` returned early purely on the `flushingPanes` guard and did NOT dead-letter; the in-flight drain loop's next round then saw `paneHasPendingQuestion` false and pasted those orphaned messages into the still-open terminal prompt — the exact "stale resurface" this PR set out to kill, on the concurrent-flush path. Fix — stateful coordination via an `abandonedDuringFlush` epoch flag: - `onQuestionAbandoned`, when a flush owns the pane, records the pane in `abandonedDuringFlush` instead of returning silently. - `driveFlush` takes an `isAbandoned()` predicate (checked before each paste, before the pane-busy check) and returns the untouched tail in a new `abandoned` field — it only LABELS the tail; the caller decides its fate. - The drain loop resolves an abandonment by pane state (codex: "dead-letter once no question is pending"): if a DIFFERENT question is now live, re-defer the residual so its answer flushes it (never drop a live question's messages); only when the pane is idle does it dead-letter and warn the user. The flag is cleared on flush entry/exit so it can never leak into a later legitimate flush. 
Regression coverage (fails before, passes after): - integration: a follow-up question abandoned mid-flush dead-letters its backlog instead of pasting it into the open prompt; a leaked flag does not wrongly drop a later legitimate flush; an abandonment racing a NEW live question re-defers (does not drop the live question's backlog). - unit: driveFlush abandonment paths (mid-flush stop, abandon-before-busy, failed+abandoned coexistence, empty-queue-with-flag contract). Also hardens the dead-letter notice (`.catch`-guarded send) and adds always-on daemon.log records so an abandonment is traceable even at zero residual. Co-Authored-By: Claude Opus 4.8 --- features/tg-ctl/defer.ts | 41 +++- tests/ctl-defer-integration.test.ts | 361 ++++++++++++++++++++++++++++ tests/ctl-defer.test.ts | 108 +++++++++ tg-ctl | 131 ++++++++-- 4 files changed, 621 insertions(+), 20 deletions(-) diff --git a/features/tg-ctl/defer.ts b/features/tg-ctl/defer.ts index 97a1a4d..3ef6365 100644 --- a/features/tg-ctl/defer.ts +++ b/features/tg-ctl/defer.ts @@ -82,26 +82,59 @@ export class DeferQueues { // (via the inject callback) and SKIP that one item, continuing with the rest — // the same forward-progress the pre-defer loop had. // -// PURE control flow: all I/O (isPaneBusy, inject) is injected, so tests drive it -// with plain callbacks and assert exactly which items landed vs were re-deferred. +// The concurrent-flush hazard `isAbandoned` closes: a follow-up question can open +// on this pane DURING the flush — newer inbound defers behind it (FIFO) — and then +// be abandoned (hook timeout / socket close / send failure) with NO terminal +// answer. The agent is still blocked locally on that prompt, but isPaneBusy() now +// reports idle (the pending button is gone). Without this signal the loop would +// paste that orphaned tail straight into the still-open prompt — the very "stale +// resurface" this module exists to kill, on the concurrent-flush path. 
The daemon +// sets the flag in onQuestionAbandoned and the loop, seeing it, returns the +// untouched tail as `abandoned` so it is never pasted: the caller DEAD-LETTERS it +// when the pane is idle, or re-defers it if a DIFFERENT question is still live. +// +// PURE control flow: all I/O (isPaneBusy, isAbandoned, inject) is injected, so +// tests drive it with plain callbacks and assert exactly which items landed vs +// were re-deferred vs dead-lettered. +// +// Residual race (same bounded window isPaneBusy already lives with): both guards +// are read BEFORE each inject(). If a question opens or is abandoned DURING the +// awaited paste of the current item, that one item still lands; the guard only +// protects the items AFTER it. The daemon mutates this state from event-loop +// callbacks (button-answer / socket-close), which cannot preempt a synchronous +// guard read, so the window is exactly one in-flight paste — accepted, not closed, +// because no observable bridge signal marks the boundary inside a single paste. export interface FlushOutcome { injected: string[]; failed: string[]; // attempted but inject() returned false (logged, dropped) reDeferred: string[]; // a new question opened — flush these on its answer + abandoned: string[]; // a question was abandoned mid-flush — never paste; caller dead-letters or re-defers } export async function driveFlush( queue: readonly string[], isPaneBusy: () => boolean, inject: (text: string) => Promise, + isAbandoned: () => boolean = () => false, ): Promise { const injected: string[] = []; const failed: string[] = []; + // Both guards are RE-READ at the top of every iteration (not cached once): the + // daemon mutates the underlying state from event-loop callbacks between the + // awaited pastes, so a flag that flips mid-flush must be observed on the next + // item. A "cache the flag once" optimization would silently break that. for (let i = 0; i < queue.length; i++) { + // A question was abandoned mid-flush → stop and label the untouched tail + // `abandoned`.
Checked before isPaneBusy because the two can race and the + // resolution differs: abandoned messages have no answer of their own coming. + // driveFlush does NOT itself decide dead-letter vs re-defer — it only marks the + // tail; the caller chooses (dead-letter if the pane is idle, re-defer if a + // DIFFERENT question is still live and will flush the queue on its answer). + if (isAbandoned()) return { injected, failed, reDeferred: [], abandoned: queue.slice(i) }; // A new question opened → stop and re-defer everything not yet attempted. - if (isPaneBusy()) return { injected, failed, reDeferred: queue.slice(i) }; + if (isPaneBusy()) return { injected, failed, reDeferred: queue.slice(i), abandoned: [] }; if (await inject(queue[i])) injected.push(queue[i]); else failed.push(queue[i]); // pane gone / tmux error — drop, don't wedge } - return { injected, failed, reDeferred: [] }; + return { injected, failed, reDeferred: [], abandoned: [] }; } diff --git a/tests/ctl-defer-integration.test.ts b/tests/ctl-defer-integration.test.ts index 6a99635..d014bcc 100644 --- a/tests/ctl-defer-integration.test.ts +++ b/tests/ctl-defer-integration.test.ts @@ -277,6 +277,367 @@ test('inbound text deferred while a question is open, flushed (and not lost) aft await daemon.exited; }, 30_000); +test('a follow-up question ABANDONED mid-flush dead-letters its backlog instead of pasting it into the open prompt', async () => { + // Regression for the codex P2 concurrent-flush race. Sequence: + // 1. Q1 is answered → flushDeferred(%1) starts and sleeps 800ms (flushing). + // 2. DURING that settle a follow-up Q2 opens on the SAME pane; an inbound + // message defers behind it (the pane is flushing AND has a pending + // question, so it queues — strict FIFO). + // 3. Q2 is abandoned with NO Telegram answer (its hook socket closes). The + // pending button is deleted BEFORE onAbandon runs, so paneHasPendingQuestion + // is now false for it. + // 4. The flush loop wakes. 
With the BUG, onQuestionAbandoned returned early + // purely on the flushingPanes guard (no dead-letter), and the loop — seeing + // no pending question — pasted the orphaned backlog into the still-open + // terminal prompt (the agent is still blocked locally on Q2). FIXED: the + // abandonment is recorded, the loop dead-letters the residual queue and + // warns the user; nothing is pasted. + const cfgDir = makeCfgDir(); + const updateQueue: unknown[][] = []; + const reactions: Array> = []; + const plainMessages: string[] = []; // sendMessage bodies without inline buttons + let buttonMessageId = 0; + let questionCallbackData = ''; + + const server = Bun.serve({ + port: 0, + async fetch(req) { + const url = new URL(req.url); + if (url.pathname.endsWith('/getUpdates')) { + const batch = updateQueue.shift(); + if (batch) return Response.json({ ok: true, result: batch }); + await Bun.sleep(80); + return Response.json({ ok: true, result: [] }); + } + if (url.pathname.endsWith('/sendMessage')) { + const body = (await req.json()) as Record; + const kb = (body.reply_markup as { inline_keyboard?: Array> } | undefined) + ?.inline_keyboard; + if (kb?.length) { + buttonMessageId += 1; + questionCallbackData = kb[0][0].callback_data; + return Response.json({ ok: true, result: { message_id: buttonMessageId } }); + } + plainMessages.push(String(body.text ?? '')); + return Response.json({ ok: true, result: { message_id: 900 } }); + } + if (url.pathname.endsWith('/setMessageReaction')) { + reactions.push((await req.json()) as Record); + return Response.json({ ok: true, result: true }); + } + // answerCallbackQuery / editMessageText + return Response.json({ ok: true, result: true }); + }, + }); + servers.push(server); + + const daemon = await startDaemon(cfgDir, server.port); + procs.push(daemon); + + // 1) Open Q1 and defer one message behind it so the flush has a backlog to + // drain when Q1 is answered (flushDeferred early-returns on an empty queue). 
+ const ask1 = startAsk(cfgDir, server.port, { + requestId: 'q1', + agent: 'claude', + kind: 'question', + question: 'First?', + options: [{ label: 'Yes' }], + }); + procs.push(ask1); + { + const t0 = Date.now(); + while (Date.now() - t0 < 5000 && questionCallbackData === '') await Bun.sleep(50); + expect(questionCallbackData).not.toBe(''); + } + const q1Callback = questionCallbackData; + const q1MessageId = buttonMessageId; + + const nowSec = Math.floor(Date.now() / 1000); + updateQueue.push([ + { + update_id: 30, + message: { message_id: 31, from: { id: 1, first_name: 'Alex' }, chat: { id: 1 }, date: nowSec, text: 'q1 backlog' }, + }, + ]); + { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && reactions.length === 0) await Bun.sleep(50); + } + expect(reactions.at(-1)).toMatchObject({ message_id: 31, reaction: [{ type: 'emoji', emoji: '✍️' }] }); + expect(injected(cfgDir)).toEqual([]); + + // 2) Answer Q1 → flushDeferred(%1) starts and immediately sleeps 800ms. The + // flush is now IN FLIGHT (flushingPanes has %1) for the whole settle window. + questionCallbackData = ''; + updateQueue.push([ + { + update_id: 31, + callback_query: { + id: 'cbq1', + from: { id: 1, first_name: 'Alex' }, + message: { message_id: q1MessageId, chat: { id: 1 }, date: nowSec }, + data: q1Callback, + }, + }, + ]); + await new Response(ask1.stdout).text(); + await ask1.exited; + + // 3) DURING the 800ms settle, a follow-up Q2 opens on the same pane. + const ask2 = startAsk(cfgDir, server.port, { + requestId: 'q2', + agent: 'claude', + kind: 'question', + question: 'Second?', + options: [{ label: 'Ok' }], + }); + procs.push(ask2); + { + const t0 = Date.now(); + while (Date.now() - t0 < 4000 && questionCallbackData === '') await Bun.sleep(50); + expect(questionCallbackData).not.toBe(''); + } + + // ...and an inbound arrives behind Q2 (pane is flushing AND has a pending + // question → it defers, strict FIFO). 
+ updateQueue.push([ + { + update_id: 32, + message: { message_id: 33, from: { id: 1, first_name: 'Alex' }, chat: { id: 1 }, date: nowSec, text: 'q2 orphan' }, + }, + ]); + { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && !reactions.some((r) => r.message_id === 33)) await Bun.sleep(50); + } + expect(reactions.find((r) => r.message_id === 33)).toMatchObject({ reaction: [{ type: 'emoji', emoji: '✍️' }] }); + + // 4) Abandon Q2 with NO answer: kill its hook → socket close → onAbandon. The + // pending button is deleted first, so by the time the flush loop next checks, + // paneHasPendingQuestion(%1) is false. The FIX records the abandonment and the + // loop dead-letters the residual instead of pasting it. + ask2.kill(9); + await ask2.exited; + + // Wait for the flush to actually WAKE from its 800ms settle and reach a verdict, + // racing the two mutually-exclusive outcomes rather than sleeping a fixed time + // (a fixed sleep can pass vacuously if the buggy paste simply hasn't fired yet): + // - BUG → the flush pastes 'q2 orphan' into the still-open prompt (it appears + // in the inject log), and no dead-letter notice is sent. + // - FIX → the flush dead-letters the residual: nothing is pasted and the user + // gets the 'were NOT delivered' notice. + // Poll until whichever lands first; the assertions below then decide pass/fail. + // This fails FAST on the regression instead of hoping a fixed window covered it. + const orphanPasted = (): boolean => injected(cfgDir).some((l) => l.includes('q2 orphan')); + const deadLetterSent = (): boolean => plainMessages.some((m) => m.includes('were NOT delivered')); + { + const t0 = Date.now(); + while (Date.now() - t0 < 10_000 && !orphanPasted() && !deadLetterSent()) await Bun.sleep(50); + } + + // THE REGRESSION: the orphaned 'q2 orphan' must NEVER reach the open prompt. + // (With the bug it injects; fixed, it is dead-lettered.) 
+ expect(orphanPasted()).toBe(false); + // Nothing at all should have been pasted — the pane's prompt is still open. + expect(injected(cfgDir)).toEqual([]); + // The flush woke and dead-lettered: the user was told the messages did not land. + // (This is also the positive proof the flush ran — a no-op flush sends nothing.) + expect(deadLetterSent()).toBe(true); + + // 5) The flag must not LEAK: a later, legitimate question on the same pane must + // still flush normally. If abandonedDuringFlush(%1) were left set, this next + // flush would wrongly dead-letter a valid backlog (the most dangerous failure + // mode of the new flag — silently dropping good messages). Open Q3, defer a + // message behind it, answer it, and assert the message actually lands. + questionCallbackData = ''; + const ask3 = startAsk(cfgDir, server.port, { + requestId: 'q3', + agent: 'claude', + kind: 'question', + question: 'Third?', + options: [{ label: 'Go' }], + }); + procs.push(ask3); + { + const t0 = Date.now(); + while (Date.now() - t0 < 5000 && questionCallbackData === '') await Bun.sleep(50); + expect(questionCallbackData).not.toBe(''); + } + const q3Callback = questionCallbackData; + const q3MessageId = buttonMessageId; + updateQueue.push([ + { + update_id: 33, + message: { message_id: 34, from: { id: 1, first_name: 'Alex' }, chat: { id: 1 }, date: nowSec, text: 'q3 good' }, + }, + ]); + { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && !reactions.some((r) => r.message_id === 34)) await Bun.sleep(50); + } + expect(reactions.find((r) => r.message_id === 34)).toMatchObject({ reaction: [{ type: 'emoji', emoji: '✍️' }] }); + updateQueue.push([ + { + update_id: 34, + callback_query: { + id: 'cbq3', + from: { id: 1, first_name: 'Alex' }, + message: { message_id: q3MessageId, chat: { id: 1 }, date: nowSec }, + data: q3Callback, + }, + }, + ]); + await new Response(ask3.stdout).text(); + await ask3.exited; + { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && 
!injected(cfgDir).some((l) => l.includes('q3 good'))) await Bun.sleep(100); + } + // The legitimate Q3 backlog flushed (flag was cleared, not leaked). + expect(injected(cfgDir).some((l) => l.includes('q3 good'))).toBe(true); + // ...and the orphan still never resurfaced in this flush. + expect(injected(cfgDir).some((l) => l.includes('q2 orphan'))).toBe(false); + + daemon.kill('SIGTERM'); + await daemon.exited; +}, 30_000); + +test('an abandonment racing a NEW live question re-defers (does not drop the live question backlog)', async () => { + // Regression for review finding #1: the abandoned-during-flush flag is keyed by + // PANE, but a pane can host a second LIVE question whose backlog must still be + // delivered. Sequence: + // 1. Q1 answered → flushDeferred(%1) starts, sleeps 800ms (flushing). + // 2. During the settle Q2 opens; m2 defers behind it; Q2 is abandoned (flag set). + // 3. ALSO during the settle Q3 opens (a real, still-pending question); m3 defers. + // 4. The flush wakes: the flag is set, but Q3 is pending. It must RE-DEFER the + // residual (Q3's answer will flush it), NOT dead-letter it — dropping here + // would silently lose m3, a legitimately-queued message of a live question. + // 5. Answer Q3 → m3 lands. (m2 rides along; per the per-pane design that is the + // accepted lesser evil vs. dropping the live question's message.) 
+ const cfgDir = makeCfgDir(); + const updateQueue: unknown[][] = []; + const reactions: Array> = []; + const plainMessages: string[] = []; + let buttonMessageId = 0; + let questionCallbackData = ''; + + const server = Bun.serve({ + port: 0, + async fetch(req) { + const url = new URL(req.url); + if (url.pathname.endsWith('/getUpdates')) { + const batch = updateQueue.shift(); + if (batch) return Response.json({ ok: true, result: batch }); + await Bun.sleep(80); + return Response.json({ ok: true, result: [] }); + } + if (url.pathname.endsWith('/sendMessage')) { + const body = (await req.json()) as Record; + const kb = (body.reply_markup as { inline_keyboard?: Array> } | undefined) + ?.inline_keyboard; + if (kb?.length) { + buttonMessageId += 1; + questionCallbackData = kb[0][0].callback_data; + return Response.json({ ok: true, result: { message_id: buttonMessageId } }); + } + plainMessages.push(String(body.text ?? '')); + return Response.json({ ok: true, result: { message_id: 900 } }); + } + if (url.pathname.endsWith('/setMessageReaction')) { + reactions.push((await req.json()) as Record); + return Response.json({ ok: true, result: true }); + } + return Response.json({ ok: true, result: true }); + }, + }); + servers.push(server); + + const daemon = await startDaemon(cfgDir, server.port); + procs.push(daemon); + + const waitForButton = async (): Promise => { + const t0 = Date.now(); + while (Date.now() - t0 < 5000 && questionCallbackData === '') await Bun.sleep(50); + expect(questionCallbackData).not.toBe(''); + }; + const waitForReaction = async (id: number): Promise => { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && !reactions.some((r) => r.message_id === id)) await Bun.sleep(50); + expect(reactions.find((r) => r.message_id === id)).toMatchObject({ reaction: [{ type: 'emoji', emoji: '✍️' }] }); + }; + const nowSec = Math.floor(Date.now() / 1000); + + // Q1 + its backlog so the flush has work. 
+ const ask1 = startAsk(cfgDir, server.port, { requestId: 'q1', agent: 'claude', kind: 'question', question: 'First?', options: [{ label: 'Y' }] }); + procs.push(ask1); + await waitForButton(); + const q1Cb = questionCallbackData; + const q1Msg = buttonMessageId; + updateQueue.push([{ update_id: 40, message: { message_id: 41, from: { id: 1, first_name: 'Alex' }, chat: { id: 1 }, date: nowSec, text: 'q1 backlog' } }]); + await waitForReaction(41); + + // Answer Q1 → flush starts and sleeps 800ms. + questionCallbackData = ''; + updateQueue.push([{ update_id: 41, callback_query: { id: 'c1', from: { id: 1, first_name: 'Alex' }, message: { message_id: q1Msg, chat: { id: 1 }, date: nowSec }, data: q1Cb } }]); + await new Response(ask1.stdout).text(); + await ask1.exited; + + // During the settle: Q2 opens, m2 defers, Q2 abandoned. + const ask2 = startAsk(cfgDir, server.port, { requestId: 'q2', agent: 'claude', kind: 'question', question: 'Second?', options: [{ label: 'Y' }] }); + procs.push(ask2); + await waitForButton(); + questionCallbackData = ''; + updateQueue.push([{ update_id: 42, message: { message_id: 43, from: { id: 1, first_name: 'Alex' }, chat: { id: 1 }, date: nowSec, text: 'q2 orphan' } }]); + await waitForReaction(43); + + // Q3 opens (a real, still-pending question) and m3 defers behind it — BEFORE we + // abandon Q2, so the flush is guaranteed to see Q3 pending when it wakes. + const ask3 = startAsk(cfgDir, server.port, { requestId: 'q3', agent: 'claude', kind: 'question', question: 'Third?', options: [{ label: 'Go' }] }); + procs.push(ask3); + await waitForButton(); + const q3Cb = questionCallbackData; + const q3Msg = buttonMessageId; + updateQueue.push([{ update_id: 43, message: { message_id: 44, from: { id: 1, first_name: 'Alex' }, chat: { id: 1 }, date: nowSec, text: 'q3 live' } }]); + await waitForReaction(44); + + // Now abandon Q2 (socket close → flag set). 
The flush, on waking from its 800ms + // settle, sees the flag AND Q3 pending → must re-defer, not dead-letter. + ask2.kill(9); + await ask2.exited; + + // Wait for the daemon to log the re-defer decision BEFORE answering Q3. This is + // the deterministic proof the flush woke while Q3 was still pending and chose to + // re-defer the residual (the finding-#1 behavior). Answering Q3 earlier could + // race the 800ms settle and remove Q3 before the flush evaluates the gate. + const daemonLog = (): string => (existsSync(join(cfgDir, 'daemon.log')) ? readFileSync(join(cfgDir, 'daemon.log'), 'utf8') : ''); + { + const t0 = Date.now(); + while (Date.now() - t0 < 8000 && !daemonLog().includes('abandon raced a new question')) await Bun.sleep(50); + } + // The flush re-deferred (did NOT dead-letter) because a live question was pending. + expect(daemonLog()).toContain('abandon raced a new question'); + // ...and it must NOT have dropped the queue. + expect(daemonLog()).not.toContain('were NOT delivered'); + expect(daemonLog()).not.toMatch(/dropped \d+ queued message/); + + // Answer Q3 → its flush delivers m3 (the live question's message). The fix is + // proven by m3 landing; with the bug (drop-everything) m3 would be lost. + questionCallbackData = ''; + updateQueue.push([{ update_id: 44, callback_query: { id: 'c3', from: { id: 1, first_name: 'Alex' }, message: { message_id: q3Msg, chat: { id: 1 }, date: nowSec }, data: q3Cb } }]); + await new Response(ask3.stdout).text(); + await ask3.exited; + + { + const t0 = Date.now(); + while (Date.now() - t0 < 10_000 && !injected(cfgDir).some((l) => l.includes('q3 live'))) await Bun.sleep(100); + } + // The live question's message was delivered, NOT dropped (review finding #1). 
+ expect(injected(cfgDir).some((l) => l.includes('q3 live'))).toBe(true); + + daemon.kill('SIGTERM'); + await daemon.exited; +}, 30_000); + test('a question removed WITHOUT an answer (hook socket closes) does not wedge later inbound', async () => { // Regression for the review finding: extending defer to "queue non-empty" would // permanently wedge a pane once its question expired without a Telegram answer diff --git a/tests/ctl-defer.test.ts b/tests/ctl-defer.test.ts index afabeed..bcf8a87 100644 --- a/tests/ctl-defer.test.ts +++ b/tests/ctl-defer.test.ts @@ -78,6 +78,7 @@ describe('driveFlush', () => { expect(out.injected).toEqual(['a', 'b', 'c']); expect(out.failed).toEqual([]); expect(out.reDeferred).toEqual([]); + expect(out.abandoned).toEqual([]); }); test('stops once a new question opens MID-flush, re-deferring the rest', async () => { @@ -96,6 +97,7 @@ describe('driveFlush', () => { expect(out.injected).toEqual(['a']); expect(out.reDeferred).toEqual(['b', 'c']); expect(out.failed).toEqual([]); + expect(out.abandoned).toEqual([]); }); test('re-defers the WHOLE queue when a question is already open at flush time', async () => { @@ -107,6 +109,7 @@ describe('driveFlush', () => { expect(injected).toEqual([]); expect(out.injected).toEqual([]); expect(out.reDeferred).toEqual(['a', 'b']); + expect(out.abandoned).toEqual([]); }); test('a failed paste is dropped (logged), flush CONTINUES — no head-of-line block', async () => { @@ -123,6 +126,7 @@ describe('driveFlush', () => { expect(out.injected).toEqual(['a', 'c']); expect(out.failed).toEqual(['b']); expect(out.reDeferred).toEqual([]); // nothing wedged + expect(out.abandoned).toEqual([]); }); test('a new question takes priority over a pending failure on the next item', async () => { @@ -132,6 +136,7 @@ describe('driveFlush', () => { expect(out.injected).toEqual([]); expect(out.failed).toEqual([]); expect(out.reDeferred).toEqual(['a', 'b']); + expect(out.abandoned).toEqual([]); }); test('empty queue yields an 
empty outcome', async () => { @@ -139,5 +144,108 @@ describe('driveFlush', () => { expect(out.injected).toEqual([]); expect(out.failed).toEqual([]); expect(out.reDeferred).toEqual([]); + expect(out.abandoned).toEqual([]); + }); + + test('a mid-flush abandonment STOPS and hands the tail back to be DEAD-LETTERED', async () => { + // The concurrent-flush hazard (codex P2): a follow-up question opened during + // the flush and was then abandoned with no terminal answer. isPaneBusy() now + // reports idle (its pending button is gone), but the agent is still blocked on + // that prompt — the orphaned tail must be dead-lettered, NOT pasted, and NOT + // re-deferred (no answer is coming to flush it). + const injected: string[] = []; + let abandoned = false; + const out = await driveFlush( + ['a', 'b', 'c'], + () => false, + async (t) => { + injected.push(t); + abandoned = true; // a follow-up question opened then was abandoned mid-flush + return true; + }, + () => abandoned, + ); + expect(injected).toEqual(['a']); + expect(out.injected).toEqual(['a']); + expect(out.abandoned).toEqual(['b', 'c']); // dead-letter, do not paste + expect(out.reDeferred).toEqual([]); + expect(out.failed).toEqual([]); + }); + + test('abandonment is checked BEFORE pane-busy: a racing question still dead-letters', async () => { + // If an abandonment and a fresh question race, the orphaned messages have no + // answer coming, so they must be dead-lettered rather than re-deferred behind + // the new prompt. The whole queue is abandoned when both are set up front. 
+ const out = await driveFlush(['a', 'b'], () => true, async () => true, () => true); + expect(out.injected).toEqual([]); + expect(out.abandoned).toEqual(['a', 'b']); + expect(out.reDeferred).toEqual([]); + expect(out.failed).toEqual([]); + }); + + test('a failed paste and a later abandonment coexist in one outcome', async () => { + // 'a' injects and, in the same step, a follow-up question is abandoned, so the + // flush stops BEFORE attempting 'b' — the paste failure scripted for 'b' is never + // reached. Pins that the abandon check runs ahead of each paste: the untouched + // tail ('b', 'c') is dead-lettered and nothing lands in `failed` or `reDeferred`. + const injected: string[] = []; + let abandoned = false; + const out = await driveFlush( + ['a', 'b', 'c'], + () => false, + async (t) => { + if (t === 'b') return false; // transient paste failure + injected.push(t); + if (t === 'a') abandoned = true; // a follow-up question is abandoned mid-flush + return true; + }, + () => abandoned, + ); + // After 'a' the abandon flag is set, so the loop stops BEFORE attempting 'b': + // 'b' and 'c' are the untouched tail → both dead-lettered, nothing failed. + expect(injected).toEqual(['a']); + expect(out.injected).toEqual(['a']); + expect(out.abandoned).toEqual(['b', 'c']); + expect(out.failed).toEqual([]); + expect(out.reDeferred).toEqual([]); + }); + + test('a failure recorded BEFORE an abandonment keeps both in the outcome', async () => { + // Here the abandon flag flips only AFTER 'b' has already failed, so 'b' lands + // in `failed` and the remaining 'c' in `abandoned` — the two coexist.
+ const injected: string[] = []; + let abandoned = false; + const out = await driveFlush( + ['a', 'b', 'c'], + () => false, + async (t) => { + if (t === 'b') { + abandoned = true; // abandonment observed at the same time the paste fails + return false; + } + injected.push(t); + return true; + }, + () => abandoned, + ); + expect(injected).toEqual(['a']); + expect(out.injected).toEqual(['a']); + expect(out.failed).toEqual(['b']); // failed paste — logged + dropped + expect(out.abandoned).toEqual(['c']); // tail after the abandon — dead-lettered + expect(out.reDeferred).toEqual([]); + }); + + test('an empty queue yields empty `abandoned` even when abandoned is set', async () => { + // driveFlush never invents items: with nothing to drain it returns empty + // `abandoned` regardless of isAbandoned(). This pins the contract the daemon + // relies on — it must also check its own round-boundary flag (an abandonment + // landing AFTER a round drained the queue produces no tail here), so the + // daemon's `outcome.abandoned.length > 0 || flagSet` guard cannot be reduced + // to `outcome.abandoned.length > 0` alone. + const out = await driveFlush([], () => false, async () => true, () => true); + expect(out.injected).toEqual([]); + expect(out.abandoned).toEqual([]); + expect(out.reDeferred).toEqual([]); + expect(out.failed).toEqual([]); }); }); diff --git a/tg-ctl b/tg-ctl index 0c1c5a4..fa31c6d 100755 --- a/tg-ctl +++ b/tg-ctl @@ -985,6 +985,12 @@ async function runDaemon(ctx: Ctx): Promise { // inbound deferring for the whole flush, preserving strict FIFO; the drain // loop's next round picks up whatever was enqueued meanwhile. const flushingPanes = new Set() + // Panes whose forwarded question was abandoned (timeout / socket close / send + // failure, no terminal answer) WHILE a flush was draining that pane. 
The flush + // loop owns the queue here, so onQuestionAbandoned cannot drop it directly; it + // records the pane and the drain loop dead-letters the residual queue instead of + // pasting it into the still-open prompt (codex P2 — concurrent-flush resurface). + const abandonedDuringFlush = new Set() let lastWasDeferred = false const paneHasPendingQuestion = (paneId: string): boolean => { for (const pb of pendingButtons.values()) if (pb.req.paneId && pb.req.paneId === paneId) return true @@ -1026,6 +1032,25 @@ async function runDaemon(ctx: Ctx): Promise { return false } } + // Tell the user a pane's deferred backlog was dropped without reaching the agent + // (its question was abandoned before an answer). Declared above flushDeferred so + // the latter can reference it (via deadLetterAbandoned) without a TDZ hazard. + // User-facing dead-letter notice (no-op when nothing was queued — no ping if + // there's nothing to resend). Callers own the daemon.log record so the resolution + // is always traced even at count 0; this only handles the Telegram side. + const notifyDeadLettered = (paneId: string, count: number): void => { + if (count === 0) return + // Guard the fire-and-forget send: this is a safety-critical notice (the user's + // only signal that messages were dropped), so a transport failure must at least + // hit the daemon log rather than vanish silently (review finding). NOTE: like + // every void-send in this daemon, an in-flight send can still be cut off by a + // shutdown right after a dead-letter — a pre-existing outbound-durability gap, + // not introduced here. + sendMessage( + ctx, + `⚠️ the question expired before it was answered — ${count} queued message(s) were NOT delivered. 
Resend them.`, + ).catch((err) => log(`failed to notify dead-letter on ${paneId}: ${err}`)) + } const flushDeferred = async (paneId: string): Promise => { if (!deferred.has(paneId)) return // Re-entrancy guard: two answers for the same pane could both reach here (the @@ -1033,23 +1058,81 @@ async function runDaemon(ctx: Ctx): Promise { // also routes new inbound into the queue for the whole flush (FIFO). if (flushingPanes.has(paneId)) return flushingPanes.add(paneId) + // The flag is cleared in the finally below, and onQuestionAbandoned only sets + // it while flushingPanes holds this pane — which the re-entrancy guard above + // makes mutually exclusive with a second concurrent flush — so it is always + // clean on entry; no defensive clear is needed here. + // + // Dead-letter the orphaned backlog for a pane whose abandoned question has no + // answer coming. `tail` is the slice driveFlush already removed from the live + // queue (via the round's take()); `deferred.drop()` returns whatever else is + // still queued. They are disjoint — take() emptied the pane entry at the start + // of the round, so anything drop() returns was enqueued after it — so the sum + // is the exact count of messages that never reached the agent. + const deadLetterAbandoned = (tail: readonly string[]): void => { + const dropped = deferred.drop(paneId) + const count = tail.length + dropped.length + // Always log the resolution (even a zero-count one) so daemon.log records + // that the abandonment flag was seen and handled — distinguishing "flagged + // and resolved" from "never flagged" (review finding). The user-facing notice + // (notifyDeadLettered) stays count-gated: nothing to resend means no ping. + log(`flush on ${paneId} dead-lettered ${count} orphaned message(s) (abandoned, no answer coming)`) + notifyDeadLettered(paneId, count) + } try { // Let the agent consume the just-delivered answer before the first paste. 
await Bun.sleep(800) // Drain in rounds: each round takes the current queue and feeds it to // driveFlush (which re-checks paneHasPendingQuestion before EVERY paste). - // A round can leave a tail two ways: a NEW question opened (re-defer it — it - // flushes when that question is answered), or fresh messages were enqueued - // DURING the round (drain them next round, while the pane stays free — - // otherwise they'd sit until the next unrelated question). Inject FAILURES do - // NOT re-defer: there may be no future question to flush them, so they are - // logged + dropped rather than wedging the queue forever (review finding). + // A round can leave a tail three ways: a NEW question opened (re-defer it — + // it flushes when that question is answered); a question was ABANDONED + // mid-flush (dead-letter the residual — no answer is coming, see codex P2); + // or fresh messages were enqueued DURING the round (drain them next round, + // while the pane stays free — otherwise they'd sit until the next unrelated + // question). Inject FAILURES do NOT re-defer: there may be no future question + // to flush them, so they are logged + dropped rather than wedging the queue + // forever (review finding). + // Resolve a mid-flush abandonment. The queue is keyed by PANE, not question, + // so when a question is abandoned we cannot tell its orphaned messages from + // those of ANOTHER question still live on the same pane. Gate strictly on + // pane-idle (codex P2: "dead-letter once no question is pending"): + // - a question is STILL pending → a live prompt owns the queue; re-defer + // the residual so its answer flushes it. Dropping here would lose that + // live question's legitimately-queued messages (review finding #1). + // - no question pending → the messages are truly orphaned (no answer is + // coming); dead-letter them rather than paste into the still-open prompt. + // `tail` is driveFlush's untouched slice (already removed from the queue). 
The + // pending read and the redefer/drop that follows run in ONE synchronous tick + // (no await between), so no inbound can interleave and be misrouted. + const resolveAbandonment = (tail: readonly string[]): void => { + if (paneHasPendingQuestion(paneId)) { + deferred.redefer(paneId, [...tail]) + log(`deferred flush into ${paneId} paused — ${tail.length} item(s) re-queued (abandon raced a new question)`) + } else { + deadLetterAbandoned(tail) + } + } for (;;) { + // A question abandoned exactly at a round boundary (after driveFlush + // returned, before this take) leaves no tail to hand back, but the queue + // still must not be pasted — resolve it (dead-letter if idle, re-defer if a + // new question is now pending) and stop. + if (abandonedDuringFlush.has(paneId)) { + resolveAbandonment([]) + return + } const queue = deferred.take(paneId) if (queue.length === 0) return - const outcome = await driveFlush(queue, () => paneHasPendingQuestion(paneId), (text) => - injectDeferredOne(paneId, text), + const outcome = await driveFlush( + queue, + () => paneHasPendingQuestion(paneId), + (text) => injectDeferredOne(paneId, text), + () => abandonedDuringFlush.has(paneId), ) + if (outcome.abandoned.length > 0 || abandonedDuringFlush.has(paneId)) { + resolveAbandonment(outcome.abandoned) + return + } if (outcome.reDeferred.length > 0) { deferred.redefer(paneId, outcome.reDeferred) log(`deferred flush into ${paneId} paused — ${outcome.reDeferred.length} item(s) re-queued behind a new question`) @@ -1061,6 +1144,7 @@ async function runDaemon(ctx: Ctx): Promise { } } finally { flushingPanes.delete(paneId) + abandonedDuringFlush.delete(paneId) } } // A forwarded question was removed WITHOUT a Telegram answer (timeout / hook @@ -1075,14 +1159,29 @@ async function runDaemon(ctx: Ctx): Promise { // queue) or another question is still pending (its answer will flush). Dropping // in either case would lose another handler's messages (review finding). 
const onQuestionAbandoned = (paneId: string): void => { - if (flushingPanes.has(paneId) || paneHasPendingQuestion(paneId)) return - const dropped = deferred.drop(paneId) - if (dropped.length === 0) return - log(`question abandoned on ${paneId} — dropped ${dropped.length} queued message(s)`) - void sendMessage( - ctx, - `⚠️ the question expired before it was answered — ${dropped.length} queued message(s) were NOT delivered. Resend them.`, - ) + // A flush is draining this pane: it owns the queue (taken out per round), so + // dropping here would race the loop and could lose nothing OR the wrong items. + // Record the pane instead — the drain loop checks the flag and dead-letters the + // residual once it sees no pending question, rather than pasting those orphaned + // messages into the still-open terminal prompt (codex P2 concurrent-flush race). + if (flushingPanes.has(paneId)) { + abandonedDuringFlush.add(paneId) + // Timestamp the abandonment itself: the flush resolves it later (re-defer or + // dead-letter), but logging here makes the event visible in daemon.log for + // post-incident review even if the resolution path is quiet (review finding). + log(`question abandoned on ${paneId} mid-flush — flagged for the drain loop`) + return + } + // Another question is still pending on this pane: its answer will flush the + // queue legitimately, so leave it (the queue is keyed by PANE, not question). + if (paneHasPendingQuestion(paneId)) return + // Pane is idle and no flush owns the queue: drop the orphaned backlog now. The + // pending check above and this drop run in the SAME synchronous tick (no await + // between), so no inbound for a freshly-opened question can interleave and be + // dropped by mistake — the daemon's inbound handling is event-loop callbacks. 
+ const droppedCount = deferred.drop(paneId).length + log(`question abandoned on ${paneId} (idle) — dead-lettered ${droppedCount} queued message(s)`) + notifyDeadLettered(paneId, droppedCount) } let hookServer: Server | null = null const cleanExit = (code: number): never => {