From b20e2311ebc68a70e26d447e2ed27b5e751ab4a5 Mon Sep 17 00:00:00 2001 From: Jason McCallister Date: Mon, 27 Apr 2026 22:38:04 -0400 Subject: [PATCH] handle errors better Signed-off-by: Jason McCallister --- CHANGELOG.md | 15 ++++++++++ README.md | 17 +++++++++++ src/drivers/postgres.ts | 63 ++++++++++++++++++++++++++++++++++------- tests/queue_test.ts | 29 +++++++++++++++++++ 4 files changed, 114 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a416d28..cc8ab19 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,20 @@ # Changelog +## [Unreleased] + +### Fixed + +- `PostgresDriver.listen()` no longer crashes the host process when the worker + loop hits a connection error. The poll loop now catches driver errors, surfaces + them through a new `onError` option (defaulting to `console.error`), and backs + off exponentially (capped at 30 seconds) before retrying, so the error no + longer escapes as an unhandled rejection. + +### Added + +- `PostgresDriverOptions.onError`. Invoked with any error raised by the polling + loop or the `LISTEN`/`NOTIFY` client. + ## [2.0.0] - 2026-04-27 ### Fixed diff --git a/README.md b/README.md index 076e5cd..06c3b1f 100644 --- a/README.md +++ b/README.md @@ -102,6 +102,23 @@ new PostgresDriver({ }); ``` +### Worker error handling + +The polling loop catches driver errors (connection refused, transient network +failures) and backs off exponentially (capped at 30 seconds) instead of letting the rejection +escape and crash the host process. Errors are written to `console.error` by +default. Pass an `onError` callback to route them to your own logger or metrics +pipeline. + +```ts +const driver = new PostgresDriver({ + connectionString: Deno.env.get("DATABASE_URL")!, + onError: (err) => log.warn({ err }, "queue worker error"), +}); +``` + +Call `await queue.close()` to stop the worker cleanly. 
+ ### Local development A `compose.yaml` ships with the project: diff --git a/src/drivers/postgres.ts b/src/drivers/postgres.ts index 551c093..5285c24 100644 --- a/src/drivers/postgres.ts +++ b/src/drivers/postgres.ts @@ -18,6 +18,7 @@ type Client = pg.Client; const DEFAULT_POLL_INTERVAL = 1000; const DEFAULT_RESERVE_TTL = 5 * 60_000; const DEFAULT_TABLE_PREFIX = "snaapi_"; +const MAX_BACKOFF_MS = 30_000; export interface PostgresDriverOptions { /** Postgres connection string (e.g. postgres://user:pass@host:5432/db). */ @@ -42,6 +43,13 @@ export interface PostgresDriverOptions { * the schema yourself or run migrations as a separate deploy step. */ autoMigrate?: boolean; + /** + * Invoked when the worker loop encounters an error (e.g. transient DB + * connection failure). The loop logs and backs off rather than letting + * the rejection escape, so the host process stays up. When unset, errors + * are written to `console.error`. + */ + onError?: (err: unknown) => void; } interface TableNames { @@ -361,9 +369,14 @@ async function insertJob( /** Postgres implementation of `QueueDriver`. */ export class PostgresDriver implements QueueDriver { #pool: Pool; - #opts: Required< - Omit - >; + #opts: + & Required< + Omit< + PostgresDriverOptions, + "connectionString" | "tablePrefix" | "onError" + > + > + & { onError?: (err: unknown) => void }; #connectionString: string; #t: TableNames; #migrationPromise?: Promise; @@ -385,6 +398,7 @@ export class PostgresDriver implements QueueDriver { reserveTtlMs: opts.reserveTtlMs ?? DEFAULT_RESERVE_TTL, poolSize: opts.poolSize ?? 10, autoMigrate: opts.autoMigrate ?? true, + onError: opts.onError, }; this.#t = tableNamesFor(resolvePrefix(opts)); this.#pool = new Pool({ @@ -513,15 +527,31 @@ export class PostgresDriver implements QueueDriver { }; const loop = async () => { - // Make sure tables exist before we try to SELECT from them. + // Errors here are reported by the inner catch on the next iteration. 
await this.#ensureReady().catch(() => {}); + let backoffMs = this.#opts.pollIntervalMs; while (!this.#stopped) { - await drainOnce(); + let waitMs = this.#opts.pollIntervalMs; + try { + await drainOnce(); + backoffMs = this.#opts.pollIntervalMs; + } catch (err) { + // Without this catch, driver errors (connection refused, network + // blip) escape the fire-and-forget loop as an unhandled rejection + // and crash the host process. + this.#reportError(err); + backoffMs = Math.min( + Math.max(backoffMs * 2, this.#opts.pollIntervalMs), + MAX_BACKOFF_MS, + ); + waitMs = backoffMs; + } + if (this.#stopped) break; const wakeupPromise = new Promise((resolve) => { this.#wakeup = resolve; this.#pollTimer = setTimeout( resolve, - this.#opts.pollIntervalMs, + waitMs, ) as unknown as number; }); await wakeupPromise; @@ -532,10 +562,10 @@ export class PostgresDriver implements QueueDriver { } }; - this.#startNotifyClient().catch(() => { - // Notify failure falls back to polling. - }); - loop(); + // Notify is best-effort. Polling makes progress without it. + this.#startNotifyClient().catch((err) => this.#reportError(err)); + // Final safety net so a stray rejection cannot escape the loop. + loop().catch((err) => this.#reportError(err)); this.#cleanupTimer = setInterval(() => { this.#pool.query( @@ -576,6 +606,19 @@ export class PostgresDriver implements QueueDriver { await this.#pool.end(); } + #reportError(err: unknown): void { + const onError = this.#opts.onError; + if (!onError) { + console.error("[snaapi/queue] worker error:", err); + return; + } + try { + onError(err); + } catch (_handlerErr) { + // A throwing onError must not crash the loop. Drop it. 
+ } + } + async #reserveOne(): Promise<{ id: string; envelope: QueueEnvelope } | null> { const client = await this.#pool.connect(); try { diff --git a/tests/queue_test.ts b/tests/queue_test.ts index ed74dd9..811b6cb 100644 --- a/tests/queue_test.ts +++ b/tests/queue_test.ts @@ -446,3 +446,32 @@ pgTest("withoutOverlapping skips concurrent runs", async () => { assertEquals(completed, 1); }, { concurrency: 2 }); }); + +Deno.test({ + name: "worker survives unreachable DB and surfaces errors via onError", + // The pool keeps reconnect timers alive past the test body. + sanitizeOps: false, + sanitizeResources: false, + async fn() { + const errors: unknown[] = []; + const driver = new PostgresDriver({ + // Port 1 is reserved and refuses connections, so every poll fails. + connectionString: "postgres://nobody:nobody@127.0.0.1:1/none", + pollIntervalMs: 50, + autoMigrate: false, + onError: (err) => errors.push(err), + }); + const queue = new Queue(driver); + queue.register("noop", { handle: () => Promise.resolve() }); + + queue.listen(); + await new Promise((r) => setTimeout(r, 400)); + await queue.close(); + + assertEquals( + errors.length > 0, + true, + `expected at least one onError invocation, got ${errors.length}`, + ); + }, +});