Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
# Changelog

## [Unreleased]

### Fixed

- `PostgresDriver.listen()` no longer crashes the host process when the worker
loop hits a connection error. The poll loop now catches driver errors, surfaces
them through a new `onError` option (defaulting to `console.error`), and backs
off exponentially (capped at 30 seconds) before retrying, so the error no
longer escapes as an unhandled rejection.

### Added

- `PostgresDriverOptions.onError`. Invoked with any error raised by the polling
loop or the `LISTEN`/`NOTIFY` client.

## [2.0.0] - 2026-04-27

### Fixed
Expand Down
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,23 @@ new PostgresDriver({
});
```

### Worker error handling

The polling loop catches driver errors (connection refused, transient network
failures) and backs off (capped at 30 seconds) instead of letting the rejection
escape and crash the host process. Errors are written to `console.error` by
default. Pass an `onError` callback to route them to your own logger or metrics
pipeline.

```ts
const driver = new PostgresDriver({
connectionString: Deno.env.get("DATABASE_URL")!,
onError: (err) => log.warn({ err }, "queue worker error"),
});
```

Call `await queue.close()` to stop the worker cleanly.

### Local development

A `compose.yaml` ships with the project:
Expand Down
63 changes: 53 additions & 10 deletions src/drivers/postgres.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Client = pg.Client;
// Presumably the fallback for `pollIntervalMs` (1000 ms); the assignment is
// not visible in this part of the file — confirm against the constructor.
const DEFAULT_POLL_INTERVAL = 1000;
// Fallback for `reserveTtlMs` when the option is omitted: 5 minutes, in ms.
const DEFAULT_RESERVE_TTL = 5 * 60_000;
// Presumably the table-name prefix used when `tablePrefix` is not supplied —
// confirm against `resolvePrefix`.
const DEFAULT_TABLE_PREFIX = "snaapi_";
// Upper bound, in milliseconds, on the worker loop's retry delay after a
// failed poll. The delay doubles on each consecutive failure up to this cap.
const MAX_BACKOFF_MS = 30_000;

export interface PostgresDriverOptions {
/** Postgres connection string (e.g. postgres://user:pass@host:5432/db). */
Expand All @@ -42,6 +43,13 @@ export interface PostgresDriverOptions {
* the schema yourself or run migrations as a separate deploy step.
*/
autoMigrate?: boolean;
/**
* Invoked when the worker loop encounters an error (e.g. transient DB
* connection failure). The loop logs and backs off rather than letting
* the rejection escape, so the host process stays up. When unset, errors
* are written to `console.error`.
*/
onError?: (err: unknown) => void;
}

interface TableNames {
Expand Down Expand Up @@ -361,9 +369,14 @@ async function insertJob(
/** Postgres implementation of `QueueDriver`. */
export class PostgresDriver implements QueueDriver {
#pool: Pool;
#opts: Required<
Omit<PostgresDriverOptions, "connectionString" | "tablePrefix">
>;
#opts:
& Required<
Omit<
PostgresDriverOptions,
"connectionString" | "tablePrefix" | "onError"
>
>
& { onError?: (err: unknown) => void };
#connectionString: string;
#t: TableNames;
#migrationPromise?: Promise<void>;
Expand All @@ -385,6 +398,7 @@ export class PostgresDriver implements QueueDriver {
reserveTtlMs: opts.reserveTtlMs ?? DEFAULT_RESERVE_TTL,
poolSize: opts.poolSize ?? 10,
autoMigrate: opts.autoMigrate ?? true,
onError: opts.onError,
};
this.#t = tableNamesFor(resolvePrefix(opts));
this.#pool = new Pool({
Expand Down Expand Up @@ -513,15 +527,31 @@ export class PostgresDriver implements QueueDriver {
};

const loop = async () => {
// Make sure tables exist before we try to SELECT from them.
// Errors here are reported by the inner catch on the next iteration.
await this.#ensureReady().catch(() => {});
let backoffMs = this.#opts.pollIntervalMs;
while (!this.#stopped) {
await drainOnce();
let waitMs = this.#opts.pollIntervalMs;
try {
await drainOnce();
backoffMs = this.#opts.pollIntervalMs;
} catch (err) {
// Without this catch, driver errors (connection refused, network
// blip) escape the fire-and-forget loop as an unhandled rejection
// and crash the host process.
this.#reportError(err);
backoffMs = Math.min(
Math.max(backoffMs * 2, this.#opts.pollIntervalMs),
MAX_BACKOFF_MS,
);
waitMs = backoffMs;
}
if (this.#stopped) break;
const wakeupPromise = new Promise<void>((resolve) => {
this.#wakeup = resolve;
this.#pollTimer = setTimeout(
resolve,
this.#opts.pollIntervalMs,
waitMs,
) as unknown as number;
});
await wakeupPromise;
Expand All @@ -532,10 +562,10 @@ export class PostgresDriver implements QueueDriver {
}
};

this.#startNotifyClient().catch(() => {
// Notify failure falls back to polling.
});
loop();
// Notify is best-effort. Polling makes progress without it.
this.#startNotifyClient().catch((err) => this.#reportError(err));
// Final safety net so a stray rejection cannot escape the loop.
loop().catch((err) => this.#reportError(err));

this.#cleanupTimer = setInterval(() => {
this.#pool.query(
Expand Down Expand Up @@ -576,6 +606,19 @@ export class PostgresDriver implements QueueDriver {
await this.#pool.end();
}

/**
 * Routes a worker error to the user-supplied `onError` handler, or to
 * `console.error` when no handler was configured.
 *
 * The handler can never fail the caller: a synchronous throw is swallowed,
 * and so is a rejected promise returned by an async handler.
 *
 * @param err The error raised by the polling loop or the notify client.
 */
#reportError(err: unknown): void {
  const onError = this.#opts.onError;
  if (!onError) {
    console.error("[snaapi/queue] worker error:", err);
    return;
  }
  try {
    // `onError` is typed as returning void, but an async function is still
    // assignable to that type. Without the `.catch`, a rejection from such a
    // handler would surface as an unhandled rejection and could take down
    // the host process, which is what this method exists to prevent.
    Promise.resolve(onError(err)).catch(() => {});
  } catch (_handlerErr) {
    // A throwing onError must not crash the loop. Drop it.
  }
}

async #reserveOne(): Promise<{ id: string; envelope: QueueEnvelope } | null> {
const client = await this.#pool.connect();
try {
Expand Down
29 changes: 29 additions & 0 deletions tests/queue_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -446,3 +446,32 @@ pgTest("withoutOverlapping skips concurrent runs", async () => {
assertEquals(completed, 1);
}, { concurrency: 2 });
});

// Regression test: with an unreachable database the worker must keep running
// and hand every failure to the `onError` callback instead of crashing.
Deno.test({
  name: "worker survives unreachable DB and surfaces errors via onError",
  // The pool keeps reconnect timers alive past the test body.
  sanitizeOps: false,
  sanitizeResources: false,
  fn: async () => {
    const reported: unknown[] = [];
    const recordError = (err: unknown): void => {
      reported.push(err);
    };

    // Port 1 is reserved and refuses connections, so every poll fails.
    const unreachableDriver = new PostgresDriver({
      connectionString: "postgres://nobody:nobody@127.0.0.1:1/none",
      pollIntervalMs: 50,
      autoMigrate: false,
      onError: recordError,
    });
    const workerQueue = new Queue(unreachableDriver);
    workerQueue.register("noop", { handle: () => Promise.resolve() });

    // Let the worker spin through several failing polls before shutting down.
    workerQueue.listen();
    await new Promise<void>((resolve) => {
      setTimeout(resolve, 400);
    });
    await workerQueue.close();

    const sawError = reported.length > 0;
    assertEquals(
      sawError,
      true,
      `expected at least one onError invocation, got ${reported.length}`,
    );
  },
});