Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,22 @@ on:
jobs:
test:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:16-alpine
env:
POSTGRES_USER: queue
POSTGRES_PASSWORD: queue
POSTGRES_DB: queue
ports:
- 5432:5432
options: >-
--health-cmd "pg_isready -U queue -d queue"
--health-interval 2s
--health-timeout 2s
--health-retries 20
env:
DATABASE_URL: postgres://queue:queue@localhost:5432/queue
steps:
- name: Checkout
uses: actions/checkout@v6
Expand Down
38 changes: 38 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,43 @@
# Changelog

## [Unreleased]

### Breaking

- Removed the Deno KV driver and the `createQueue()` factory. The queue now runs
exclusively on Postgres. Construct a `PostgresDriver` and pass it to
`new Queue(driver)`. KV Connect on Deno Deploy did not support `enqueue`,
which made the dual driver story misleading on the platform we care about
most.
- `JobContext` no longer exposes `kv`. Use `ctx.locks` and `ctx.counters` from
middleware.
- `FailedJobDriver.store(record)` is now `store(record, envelope)`. Drivers must
persist the original `QueueEnvelope` so retries reconstruct dispatch metadata
(max attempts, backoff schedule, unique key/ttl, chain).

### Added

- `PostgresDriver` with `LISTEN`/`NOTIFY` wakeups, polling fallback, and
`SELECT FOR UPDATE SKIP LOCKED` reservation.
- `reserveTtlMs` option. Reserved jobs are deleted only after the handler
returns, so a worker crash leaves the row reserved until peers can reclaim it.
- `tablePrefix` option (and `QUEUE_TABLE_PREFIX` env var) for sharing a database
with other applications.
- Auto migration on first use, with a standalone `deno task db:migrate` task and
`compose.yaml` for local development.

### Changed

- Unique dispatch is now enforced via a TTL row in the locks table rather than a
row level constraint. The dedupe window now matches the requested `uniqueTtl`
instead of ending when the job is reserved.
- The `jobs` ready index dropped its partial `WHERE reserved_at IS NULL`
predicate so the reservation query (which also reclaims expired reservations)
can use it.
- `PgFailedStore.retry()` rebuilds the envelope from the stored `QueueEnvelope`,
preserving `maxAttempts`, `backoffSchedule`, `uniqueKey`, `uniqueTtl`, and
`chain`.

## [1.0.4] - 2026-04-19

### Other
Expand Down
108 changes: 94 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
# @snaapi/queue

A job queue for Deno built on top of Deno KV. No external dependencies, no
separate queue server. Jobs are dispatched, persisted, retried, and chained
using the KV queue primitives that ship with Deno.
A Postgres backed job queue for Deno. Designed for Deno Deploy and any
environment where you need durable, multi worker job processing. Uses
`LISTEN`/`NOTIFY` for low latency wakeups and `SELECT FOR UPDATE SKIP LOCKED`
for safe concurrent reservation.

## Install

```ts
import { Queue } from "@snaapi/queue";
import { PostgresDriver, Queue } from "@snaapi/queue";
```

## Quick Start

```ts
import { Queue } from "@snaapi/queue";
import { PostgresDriver, Queue } from "@snaapi/queue";
import type { JobHandler } from "@snaapi/queue";

const kv = await Deno.openKv();
const driver = new PostgresDriver({
connectionString: Deno.env.get("DATABASE_URL")!,
});
const queue = new Queue(driver);

const sendEmail: JobHandler<{ to: string; subject: string }> = {
handle(payload, ctx) {
Expand All @@ -26,7 +30,6 @@ const sendEmail: JobHandler<{ to: string; subject: string }> = {
},
};

const queue = new Queue(kv);
queue.register("send-email", sendEmail);
queue.listen();

Expand All @@ -36,10 +39,86 @@ await queue.dispatch("send-email", {
}).send();
```

Note that `queue.listen()` attaches to the `Deno.Kv` handle and must run in the
same long lived process that holds that handle. A script that only dispatches
jobs and then exits will enqueue work, but nothing will consume it until a
process with `listen()` is running against the same KV store.
Note that `queue.listen()` attaches to the backing store and must run in the
same long lived process. A script that only dispatches jobs and then exits will
enqueue work, but nothing will consume it until a process with `listen()` is
running against the same database.

## Driver

```ts
import { PostgresDriver, Queue } from "@snaapi/queue";

const driver = new PostgresDriver({
connectionString: Deno.env.get("DATABASE_URL")!,
pollIntervalMs: 1000, // fallback when LISTEN/NOTIFY misses (default 1000)
concurrency: 1, // in-flight jobs per listener (default 1)
reserveTtlMs: 5 * 60_000, // how long a worker can hold a job before peers can reclaim it
tablePrefix: "snaapi_", // see "Table prefix" below
});
const queue = new Queue(driver);
```

The driver wakes up via `LISTEN`/`NOTIFY` when a new job is enqueued and falls
back to a polling timer for delayed jobs. Multiple worker processes can
`listen()` against the same database. `SELECT FOR UPDATE SKIP LOCKED` ensures
each ready job is reserved by exactly one worker. Reserved jobs are deleted only
after the handler returns, so a worker crash leaves the row reserved until
`reserveTtlMs` elapses, at which point a peer can reclaim it.

### Auto migration

The Postgres driver runs `migrate()` lazily on first use. The schema is
idempotent (`CREATE TABLE IF NOT EXISTS`), so it is safe to leave on across
deploys. Disable it (and run migrations as a separate step) by passing
`autoMigrate: false`:

```ts
const driver = new PostgresDriver({
connectionString: Deno.env.get("DATABASE_URL")!,
autoMigrate: false,
});
await driver.migrate(); // run explicitly if you prefer
```

The standalone task is also available:

```bash
DATABASE_URL=postgres://... deno task db:migrate
```

### Table prefix

By default the driver creates four tables: `snaapi_jobs`, `snaapi_failed_jobs`,
`snaapi_locks`, `snaapi_counters`. The prefix (default `snaapi_`) is
configurable via the `tablePrefix` option or the `QUEUE_TABLE_PREFIX` env var.
The prefix must match `[a-zA-Z_][a-zA-Z0-9_]*`.

```ts
new PostgresDriver({
connectionString: url,
tablePrefix: "myapp_",
// tables become myapp_jobs, myapp_failed_jobs, myapp_locks, myapp_counters
});
```

### Local development

A `compose.yaml` ships with the project:

```bash
docker compose up -d postgres
export DATABASE_URL=postgres://queue:queue@localhost:5444/queue
deno task test
```

### Migrating from 1.x

Versions before 2.0 took a `Deno.Kv` directly and exposed `ctx.kv` to handlers.
The 2.x line drops Deno KV support entirely (KV Connect on Deno Deploy does not
support `enqueue`). Pass a `PostgresDriver` to the `Queue` constructor and use
`ctx.locks` / `ctx.counters` from middleware instead of `ctx.kv`. Pending jobs
do not migrate between stores; drain the KV queue before cutting over.

## Dispatch Options

Expand Down Expand Up @@ -129,8 +208,8 @@ await queue.chain([

## Failed Jobs

Jobs that exhaust all retry attempts are stored in KV. You can inspect, retry,
or remove them.
Jobs that exhaust all retry attempts are persisted to the backing store. You can
inspect, retry, or remove them.

```ts
// List failed jobs
Expand Down Expand Up @@ -163,7 +242,8 @@ interface JobContext {
maxAttempts: number; // configured max attempts
queue: string; // queue name
id: string; // unique dispatch ID
kv: Deno.Kv; // KV store reference
locks: LockDriver; // mutex primitive used by withoutOverlapping
counters: CounterDriver; // counter primitive used by rateLimit
}
```

Expand Down
19 changes: 19 additions & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
services:
postgres:
image: postgres:16-alpine
environment:
POSTGRES_USER: queue
POSTGRES_PASSWORD: queue
POSTGRES_DB: queue
ports:
- "5444:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U queue -d queue"]
interval: 2s
timeout: 2s
retries: 20
volumes:
- queue_pg:/var/lib/postgresql/data

volumes:
queue_pg:
8 changes: 6 additions & 2 deletions deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
"exports": "./mod.ts",
"tasks": {
"check": "deno check mod.ts",
"test": "deno test --unstable-kv -A",
"test": "deno test -A tests/queue_test.ts",
"db:up": "docker compose up -d postgres",
"db:down": "docker compose down",
"db:migrate": "deno run -A scripts/migrate.ts",
"release": "deno run -A scripts/release.ts"
},
"imports": {
"@std/assert": "jsr:@std/assert@1"
"@std/assert": "jsr:@std/assert@1",
"pg": "npm:pg@^8.11"
}
}
76 changes: 74 additions & 2 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,18 @@ export { PendingDispatch } from "./src/dispatcher.ts";
export { FailedJobStore } from "./src/failed.ts";
export { rateLimit, withoutOverlapping } from "./src/middleware.ts";

export { PostgresDriver } from "./src/drivers/postgres.ts";
export type { PostgresDriverOptions } from "./src/drivers/postgres.ts";

export type {
CounterDriver,
EnqueueOptions,
FailedJobDriver,
Listener,
LockDriver,
QueueDriver,
} from "./src/drivers/types.ts";

export type {
DispatchOptions,
FailedJob,
Expand Down
18 changes: 18 additions & 0 deletions scripts/migrate.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Run the Postgres schema migration for @snaapi/queue.
// Usage: DATABASE_URL=postgres://... deno run -A scripts/migrate.ts

import { PostgresDriver } from "../mod.ts";

const url = Deno.env.get("DATABASE_URL");
if (!url) {
console.error("DATABASE_URL is required");
Deno.exit(1);
}

const driver = new PostgresDriver({ connectionString: url });
try {
await driver.migrate();
console.log("snaapi_queue: schema applied");
} finally {
await driver.close();
}
Loading