diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f70ed67..5499bf2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,6 +8,22 @@ on: jobs: test: runs-on: ubuntu-latest + services: + postgres: + image: postgres:16-alpine + env: + POSTGRES_USER: queue + POSTGRES_PASSWORD: queue + POSTGRES_DB: queue + ports: + - 5432:5432 + options: >- + --health-cmd "pg_isready -U queue -d queue" + --health-interval 2s + --health-timeout 2s + --health-retries 20 + env: + DATABASE_URL: postgres://queue:queue@localhost:5432/queue steps: - name: Checkout uses: actions/checkout@v6 diff --git a/CHANGELOG.md b/CHANGELOG.md index df590ed..542d196 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,43 @@ # Changelog +## [Unreleased] + +### Breaking + +- Removed the Deno KV driver and the `createQueue()` factory. The queue now runs + exclusively on Postgres. Construct a `PostgresDriver` and pass it to + `new Queue(driver)`. KV Connect on Deno Deploy did not support `enqueue`, + which made the dual driver story misleading on the platform we care about + most. +- `JobContext` no longer exposes `kv`. Use `ctx.locks` and `ctx.counters` from + middleware. +- `FailedJobDriver.store(record)` is now `store(record, envelope)`. Drivers must + persist the original `QueueEnvelope` so retries reconstruct dispatch metadata + (max attempts, backoff schedule, unique key/ttl, chain). + +### Added + +- `PostgresDriver` with `LISTEN`/`NOTIFY` wakeups, polling fallback, and + `SELECT FOR UPDATE SKIP LOCKED` reservation. +- `reserveTtlMs` option. Reserved jobs are deleted only after the handler + returns, so a worker crash leaves the row reserved until peers can reclaim it. +- `tablePrefix` option (and `QUEUE_TABLE_PREFIX` env var) for sharing a database + with other applications. +- Auto migration on first use, with a standalone `deno task db:migrate` task and + `compose.yaml` for local development. + +### Changed + +- Unique dispatch is now enforced via a TTL row in the locks table rather than a + row level constraint. The dedupe window now matches the requested `uniqueTtl` + instead of ending when the job is reserved. +- The `jobs` ready index dropped its partial `WHERE reserved_at IS NULL` + predicate so the reservation query (which also reclaims expired reservations) + can use it. +- `PgFailedStore.retry()` rebuilds the envelope from the stored `QueueEnvelope`, + preserving `maxAttempts`, `backoffSchedule`, `uniqueKey`, `uniqueTtl`, and + `chain`. + ## [1.0.4] - 2026-04-19 ### Other diff --git a/README.md b/README.md index 60c7f1c..076e5cd 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,26 @@ # @snaapi/queue -A job queue for Deno built on top of Deno KV. No external dependencies, no -separate queue server. Jobs are dispatched, persisted, retried, and chained -using the KV queue primitives that ship with Deno. +A Postgres backed job queue for Deno. Designed for Deno Deploy and any +environment where you need durable, multi worker job processing. Uses +`LISTEN`/`NOTIFY` for low latency wakeups and `SELECT FOR UPDATE SKIP LOCKED` +for safe concurrent reservation. ## Install ```ts -import { Queue } from "@snaapi/queue"; +import { PostgresDriver, Queue } from "@snaapi/queue"; ``` ## Quick Start ```ts -import { Queue } from "@snaapi/queue"; +import { PostgresDriver, Queue } from "@snaapi/queue"; import type { JobHandler } from "@snaapi/queue"; -const kv = await Deno.openKv(); +const driver = new PostgresDriver({ + connectionString: Deno.env.get("DATABASE_URL")!, +}); +const queue = new Queue(driver); const sendEmail: JobHandler<{ to: string; subject: string }> = { handle(payload, ctx) { @@ -26,7 +30,6 @@ const sendEmail: JobHandler<{ to: string; subject: string }> = { }, }; -const queue = new Queue(kv); queue.register("send-email", sendEmail); queue.listen(); @@ -36,10 +39,86 @@ await queue.dispatch("send-email", { }).send(); ``` -Note that `queue.listen()` attaches to the `Deno.Kv` handle and must run in the -same long lived process that holds that handle. A script that only dispatches -jobs and then exits will enqueue work, but nothing will consume it until a -process with `listen()` is running against the same KV store. +Note that `queue.listen()` attaches to the backing store and must run in the +same long lived process. A script that only dispatches jobs and then exits will +enqueue work, but nothing will consume it until a process with `listen()` is +running against the same database. + +## Driver + +```ts +import { PostgresDriver, Queue } from "@snaapi/queue"; + +const driver = new PostgresDriver({ + connectionString: Deno.env.get("DATABASE_URL")!, + pollIntervalMs: 1000, // fallback when LISTEN/NOTIFY misses (default 1000) + concurrency: 1, // in-flight jobs per listener (default 1) + reserveTtlMs: 5 * 60_000, // how long a worker can hold a job before peers can reclaim it + tablePrefix: "snaapi_", // see "Table prefix" below +}); +const queue = new Queue(driver); +``` + +The driver wakes up via `LISTEN`/`NOTIFY` when a new job is enqueued and falls +back to a polling timer for delayed jobs. Multiple worker processes can +`listen()` against the same database. `SELECT FOR UPDATE SKIP LOCKED` ensures +each ready job is reserved by exactly one worker. Reserved jobs are deleted only +after the handler returns, so a worker crash leaves the row reserved until +`reserveTtlMs` elapses, at which point a peer can reclaim it. + +### Auto migration + +The Postgres driver runs `migrate()` lazily on first use. The schema is +idempotent (`CREATE TABLE IF NOT EXISTS`), so it is safe to leave on across +deploys. Disable it (and run migrations as a separate step) by passing +`autoMigrate: false`: + +```ts +const driver = new PostgresDriver({ + connectionString: Deno.env.get("DATABASE_URL")!, + autoMigrate: false, +}); +await driver.migrate(); // run explicitly if you prefer +``` + +The standalone task is also available: + +```bash +DATABASE_URL=postgres://... deno task db:migrate +``` + +### Table prefix + +By default the driver creates four tables: `snaapi_jobs`, `snaapi_failed_jobs`, +`snaapi_locks`, `snaapi_counters`. The prefix (default `snaapi_`) is +configurable via the `tablePrefix` option or the `QUEUE_TABLE_PREFIX` env var. +The prefix must match `[a-zA-Z_][a-zA-Z0-9_]*`. + +```ts +new PostgresDriver({ + connectionString: url, + tablePrefix: "myapp_", + // tables become myapp_jobs, myapp_failed_jobs, myapp_locks, myapp_counters +}); +``` + +### Local development + +A `compose.yaml` ships with the project: + +```bash +docker compose up -d postgres +export DATABASE_URL=postgres://queue:queue@localhost:5444/queue +deno task test +``` + +### Migrating from 1.x + +Versions before 2.0 took a `Deno.Kv` directly and exposed `ctx.kv` to handlers. +The 2.x line drops Deno KV support entirely (KV Connect on Deno Deploy does not +support `enqueue`). Pass a `PostgresDriver` to the `Queue` constructor and use +`ctx.locks` / `ctx.counters` from middleware instead of `ctx.kv`. Pending jobs +do not migrate between stores; drain the KV queue before cutting over. ## Dispatch Options @@ -129,8 +208,8 @@ await queue.chain([ ## Failed Jobs -Jobs that exhaust all retry attempts are stored in KV. You can inspect, retry, -or remove them. +Jobs that exhaust all retry attempts are persisted to the backing store. You can +inspect, retry, or remove them. ```ts // List failed jobs @@ -163,7 +242,8 @@ interface JobContext { maxAttempts: number; // configured max attempts queue: string; // queue name id: string; // unique dispatch ID - kv: Deno.Kv; // KV store reference + locks: LockDriver; // mutex primitive used by withoutOverlapping + counters: CounterDriver; // counter primitive used by rateLimit } ``` diff --git a/compose.yaml b/compose.yaml new file mode 100644 index 0000000..0e5f1f3 --- /dev/null +++ b/compose.yaml @@ -0,0 +1,19 @@ +services: + postgres: + image: postgres:16-alpine + environment: + POSTGRES_USER: queue + POSTGRES_PASSWORD: queue + POSTGRES_DB: queue + ports: + - "5444:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U queue -d queue"] + interval: 2s + timeout: 2s + retries: 20 + volumes: + - queue_pg:/var/lib/postgresql/data + +volumes: + queue_pg: diff --git a/deno.json b/deno.json index 26cb88e..fdc2389 100644 --- a/deno.json +++ b/deno.json @@ -4,10 +4,14 @@ "exports": "./mod.ts", "tasks": { "check": "deno check mod.ts", - "test": "deno test --unstable-kv -A", + "test": "deno test -A tests/queue_test.ts", + "db:up": "docker compose up -d postgres", + "db:down": "docker compose down", + "db:migrate": "deno run -A scripts/migrate.ts", "release": "deno run -A scripts/release.ts" }, "imports": { - "@std/assert": "jsr:@std/assert@1" + "@std/assert": "jsr:@std/assert@1", + "pg": "npm:pg@^8.11" } } diff --git a/deno.lock b/deno.lock index 8845d44..20d97ea 100644 --- a/deno.lock +++ b/deno.lock @@ -2,7 +2,8 @@ "version": "5", "specifiers": { "jsr:@std/assert@1": "1.0.19", - "jsr:@std/internal@^1.0.12": "1.0.12" + "jsr:@std/internal@^1.0.12": "1.0.12", + "npm:pg@^8.11.0": "8.20.0" }, "jsr": { "@std/assert@1.0.19": { @@ -15,9 +16,80 @@ "integrity": "972a634fd5bc34b242024402972cd5143eac68d8dffaca5eaa4dba30ce17b027" } }, + "npm": { + "pg-cloudflare@1.3.0": { + "integrity": "sha512-6lswVVSztmHiRtD6I8hw4qP/nDm1EJbKMRhf3HCYaqud7frGysPv7FYJ5noZQdhQtN2xJnimfMtvQq21pdbzyQ==" + }, + "pg-connection-string@2.12.0": { + "integrity": "sha512-U7qg+bpswf3Cs5xLzRqbXbQl85ng0mfSV/J0nnA31MCLgvEaAo7CIhmeyrmJpOr7o+zm0rXK+hNnT5l9RHkCkQ==" + }, + "pg-int8@1.0.1": { + "integrity": "sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==" + }, + "pg-pool@3.13.0_pg@8.20.0": { + "integrity": "sha512-gB+R+Xud1gLFuRD/QgOIgGOBE2KCQPaPwkzBBGC9oG69pHTkhQeIuejVIk3/cnDyX39av2AxomQiyPT13WKHQA==", + "dependencies": [ + "pg" + ] + }, + "pg-protocol@1.13.0": { + "integrity": "sha512-zzdvXfS6v89r6v7OcFCHfHlyG/wvry1ALxZo4LqgUoy7W9xhBDMaqOuMiF3qEV45VqsN6rdlcehHrfDtlCPc8w==" + }, + "pg-types@2.2.0": { + "integrity": "sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==", + "dependencies": [ + "pg-int8", + "postgres-array", + "postgres-bytea", + "postgres-date", + "postgres-interval" + ] + }, + "pg@8.20.0": { + "integrity": "sha512-ldhMxz2r8fl/6QkXnBD3CR9/xg694oT6DZQ2s6c/RI28OjtSOpxnPrUCGOBJ46RCUxcWdx3p6kw/xnDHjKvaRA==", + "dependencies": [ + "pg-connection-string", + "pg-pool", + "pg-protocol", + "pg-types", + "pgpass" + ], + "optionalDependencies": [ + "pg-cloudflare" + ] + }, + "pgpass@1.0.5": { + "integrity": "sha512-FdW9r/jQZhSeohs1Z3sI1yxFQNFvMcnmfuj4WBMUTxOrAyLMaTcE1aAMBiTlbMNaXvBCQuVi0R7hd8udDSP7ug==", + "dependencies": [ + "split2" + ] + }, + "postgres-array@2.0.0": { + "integrity": "sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==" + }, + "postgres-bytea@1.0.1": { + "integrity": "sha512-5+5HqXnsZPE65IJZSMkZtURARZelel2oXUEO8rH83VS/hxH5vv1uHquPg5wZs8yMAfdv971IU+kcPUczi7NVBQ==" + }, + "postgres-date@1.0.7": { + "integrity": "sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==" + }, + "postgres-interval@1.2.0": { + "integrity": "sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==", + "dependencies": [ + "xtend" + ] + }, + "split2@4.2.0": { + "integrity": "sha512-UcjcJOWknrNkF6PLX83qcHM6KHgVKNkV62Y8a5uYDVv9ydGQVwAHMKqHdJje1VTWpljG0WYpCDhrCdAOYH4TWg==" + }, + "xtend@4.0.2": { + "integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==" + } + }, "workspace": { "dependencies": [ - "jsr:@std/assert@1" + "jsr:@std/assert@1", + "npm:pg@^8.11.0" ] } } diff --git a/mod.ts b/mod.ts index dc18bf2..7cbfafe 100644 --- a/mod.ts +++ b/mod.ts @@ -3,6 +3,18 @@ export { PendingDispatch } from "./src/dispatcher.ts"; export { FailedJobStore } from "./src/failed.ts"; export { rateLimit, withoutOverlapping } from "./src/middleware.ts"; +export { PostgresDriver } from "./src/drivers/postgres.ts"; +export type { PostgresDriverOptions } from "./src/drivers/postgres.ts"; + +export type { + CounterDriver, + EnqueueOptions, + FailedJobDriver, + Listener, + LockDriver, + QueueDriver, +} from "./src/drivers/types.ts"; + export type { DispatchOptions, FailedJob, diff --git a/scripts/migrate.ts b/scripts/migrate.ts new file mode 100644 index 0000000..e456bb9 --- /dev/null +++ b/scripts/migrate.ts @@ -0,0 +1,18 @@ +// Run the Postgres schema migration for @snaapi/queue. +// Usage: DATABASE_URL=postgres://... deno run -A scripts/migrate.ts + +import { PostgresDriver } from "../mod.ts"; + +const url = Deno.env.get("DATABASE_URL"); +if (!url) { + console.error("DATABASE_URL is required"); + Deno.exit(1); +} + +const driver = new PostgresDriver({ connectionString: url }); +try { + await driver.migrate(); + console.log("snaapi_queue: schema applied"); +} finally { + await driver.close(); +} diff --git a/src/chain.ts b/src/chain.ts index 1040e9c..462d7e5 100644 --- a/src/chain.ts +++ b/src/chain.ts @@ -1,10 +1,10 @@ import type { JobChainStep } from "./types.ts"; import type { QueueEnvelope } from "./envelope.ts"; -import { Keys } from "./keys.ts"; +import type { QueueDriver } from "./drivers/types.ts"; /** Dispatch a chain of jobs. The first job is enqueued immediately; remaining steps are carried in the envelope. */ export async function dispatchChain( - kv: Deno.Kv, + driver: QueueDriver, steps: JobChainStep[], ): Promise { if (steps.length === 0) return; @@ -24,15 +24,13 @@ export async function dispatchChain( chain: rest.length > 0 ? rest : undefined, }; - await kv.enqueue(envelope, { - keysIfUndelivered: [Keys.undelivered(envelope.id)], - }); + await driver.enqueue(envelope, {}); } /** Enqueue the next step in a chain. Called by the worker after a successful job. */ export async function enqueueChainStep( - kv: Deno.Kv, + driver: QueueDriver, remainingSteps: JobChainStep[], ): Promise { - await dispatchChain(kv, remainingSteps); + await dispatchChain(driver, remainingSteps); } diff --git a/src/dispatcher.ts b/src/dispatcher.ts index ffdf73b..6e65a33 100644 --- a/src/dispatcher.ts +++ b/src/dispatcher.ts @@ -1,5 +1,5 @@ import type { QueueEnvelope } from "./envelope.ts"; -import { Keys } from "./keys.ts"; +import type { EnqueueOptions, QueueDriver } from "./drivers/types.ts"; const DEFAULT_QUEUE = "default"; const DEFAULT_MAX_ATTEMPTS = 3; @@ -8,7 +8,7 @@ const DEFAULT_UNIQUE_TTL = 300_000; // 5 minutes /** Chainable builder for dispatching a job. Call .send() to enqueue. */ export class PendingDispatch { - #kv: Deno.Kv; + #driver: QueueDriver; #jobName: string; #payload: unknown; #delay?: number; @@ -20,12 +20,12 @@ export class PendingDispatch { #chain?: QueueEnvelope["chain"]; constructor( - kv: Deno.Kv, + driver: QueueDriver, jobName: string, payload: unknown, chain?: QueueEnvelope["chain"], ) { - this.#kv = kv; + this.#driver = driver; this.#jobName = jobName; this.#payload = payload; this.#chain = chain; @@ -55,7 +55,7 @@ export class PendingDispatch { return this; } - /** Make this a unique job — prevents duplicates with the same key. */ + /** Make this a unique job. Prevents duplicates with the same key. */ unique(key: string, ttl?: number): this { this.#uniqueKey = key; if (ttl !== undefined) this.#uniqueTtl = ttl; @@ -80,29 +80,19 @@ export class PendingDispatch { chain: this.#chain, }; - const enqueueOpts: { delay?: number; keysIfUndelivered?: Deno.KvKey[] } = { - keysIfUndelivered: [Keys.undelivered(id)], - }; - if (this.#delay !== undefined) { - enqueueOpts.delay = this.#delay; - } + const opts: EnqueueOptions = {}; + if (this.#delay !== undefined) opts.delay = this.#delay; if (this.#uniqueKey) { - // Atomic: only enqueue if the lock key does NOT exist (versionstamp null) - const lockKey = Keys.unique(this.#uniqueKey); - - const result = await this.#kv.atomic() - .check({ key: lockKey, versionstamp: null }) - .enqueue(envelope, enqueueOpts) - .set(lockKey, true, { expireIn: this.#uniqueTtl }) - .commit(); - - if (!result.ok) { - // Lock already exists — duplicate job, silently skip - return; - } + // Driver decides how to atomically gate the enqueue on a lock. + await this.#driver.enqueueUnique( + envelope, + opts, + this.#uniqueKey, + this.#uniqueTtl, + ); } else { - await this.#kv.enqueue(envelope, enqueueOpts); + await this.#driver.enqueue(envelope, opts); } } } diff --git a/src/drivers/postgres.ts b/src/drivers/postgres.ts new file mode 100644 index 0000000..551c093 --- /dev/null +++ b/src/drivers/postgres.ts @@ -0,0 +1,630 @@ +import pg from "pg"; +import type { FailedJob } from "../types.ts"; +import type { QueueEnvelope } from "../envelope.ts"; +import type { + CounterDriver, + EnqueueOptions, + FailedJobDriver, + Listener, + LockDriver, + QueueDriver, +} from "./types.ts"; + +const { Pool, Client } = pg; +type Pool = pg.Pool; +type PoolClient = pg.PoolClient; +type Client = pg.Client; + +const DEFAULT_POLL_INTERVAL = 1000; +const DEFAULT_RESERVE_TTL = 5 * 60_000; +const DEFAULT_TABLE_PREFIX = "snaapi_"; + +export interface PostgresDriverOptions { + /** Postgres connection string (e.g. postgres://user:pass@host:5432/db). */ + connectionString: string; + /** Polling fallback interval in ms. Default: 1000. */ + pollIntervalMs?: number; + /** Max concurrent in-flight jobs per listener. Default: 1. */ + concurrency?: number; + /** How long a worker can hold a job before another can steal it. Default: 5m. */ + reserveTtlMs?: number; + /** Pool size. Default: 10. */ + poolSize?: number; + /** + * Prefix for the queue tables. Defaults to `snaapi_` (so tables are + * `snaapi_jobs`, `snaapi_failed_jobs`, `snaapi_locks`, `snaapi_counters`). + * The `LISTEN`/`NOTIFY` channel is derived from the prefix as well. + * Falls back to the `QUEUE_TABLE_PREFIX` env var when omitted. + */ + tablePrefix?: string; + /** + * Auto-run `migrate()` on first use. Default: true. Disable if you manage + * the schema yourself or run migrations as a separate deploy step. + */ + autoMigrate?: boolean; +} + +interface TableNames { + jobs: string; + failedJobs: string; + locks: string; + counters: string; + notifyChannel: string; +} + +function resolvePrefix(opts: PostgresDriverOptions): string { + return opts.tablePrefix ?? + Deno.env.get("QUEUE_TABLE_PREFIX") ?? + DEFAULT_TABLE_PREFIX; +} + +function uniqueLockKey(key: string): string { + // Namespace the unique-dispatch lock to avoid colliding with other lock + // keys consumers may use through ctx.locks (`withoutOverlapping`, etc). + return `unique:${key}`; +} + +function tableNamesFor(prefix: string): TableNames { + // Identifiers come from config, not user input; bare interpolation is safe. + // We also validate the prefix to keep that promise honest. + if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(prefix)) { + throw new Error( + `PostgresDriver: tablePrefix "${prefix}" must match [a-zA-Z_][a-zA-Z0-9_]*`, + ); + } + return { + jobs: `${prefix}jobs`, + failedJobs: `${prefix}failed_jobs`, + locks: `${prefix}locks`, + counters: `${prefix}counters`, + notifyChannel: `${prefix}jobs_channel`, + }; +} + +function schemaSql(t: TableNames): string { + return ` +CREATE TABLE IF NOT EXISTS ${t.jobs} ( + id UUID PRIMARY KEY, + queue TEXT NOT NULL, + envelope JSONB NOT NULL, + available_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + reserved_at TIMESTAMPTZ, + unique_key TEXT UNIQUE +); +DROP INDEX IF EXISTS ${t.jobs}_ready_idx; +CREATE INDEX IF NOT EXISTS ${t.jobs}_available_idx + ON ${t.jobs} (available_at); + +CREATE TABLE IF NOT EXISTS ${t.failedJobs} ( + id UUID PRIMARY KEY, + job_name TEXT NOT NULL, + queue TEXT NOT NULL, + payload JSONB NOT NULL, + error TEXT NOT NULL, + failed_at TIMESTAMPTZ NOT NULL, + attempts INT NOT NULL, + envelope JSONB NOT NULL +); +CREATE INDEX IF NOT EXISTS ${t.failedJobs}_queue_idx + ON ${t.failedJobs} (queue, failed_at DESC); + +CREATE TABLE IF NOT EXISTS ${t.locks} ( + key TEXT PRIMARY KEY, + expires_at TIMESTAMPTZ NOT NULL +); + +CREATE TABLE IF NOT EXISTS ${t.counters} ( + key TEXT PRIMARY KEY, + count BIGINT NOT NULL, + expires_at TIMESTAMPTZ NOT NULL +); +`; +} + +class PgFailedStore implements FailedJobDriver { + #pool: Pool; + #t: TableNames; + #ensureReady: () => Promise; + + constructor(pool: Pool, t: TableNames, ensureReady: () => Promise) { + this.#pool = pool; + this.#t = t; + this.#ensureReady = ensureReady; + } + + async store(record: FailedJob, envelope: QueueEnvelope): Promise { + await this.#ensureReady(); + await this.#pool.query( + `INSERT INTO ${this.#t.failedJobs} + (id, job_name, queue, payload, error, failed_at, attempts, envelope) + VALUES ($1, $2, $3, $4, $5, to_timestamp($6 / 1000.0), $7, $8) + ON CONFLICT (id) DO UPDATE SET + error = EXCLUDED.error, + failed_at = EXCLUDED.failed_at, + attempts = EXCLUDED.attempts, + envelope = EXCLUDED.envelope`, + [ + record.id, + record.jobName, + record.queue, + JSON.stringify(record.payload), + record.error, + record.failedAt, + record.attempts, + JSON.stringify(envelope), + ], + ); + } + + async list(queue: string | undefined, limit: number): Promise { + await this.#ensureReady(); + const params: unknown[] = []; + let where = ""; + if (queue) { + params.push(queue); + where = "WHERE queue = $1"; + } + params.push(limit); + const limitParam = `$${params.length}`; + const res = await this.#pool.query( + `SELECT id, job_name, queue, payload, error, + extract(epoch from failed_at) * 1000 AS failed_at_ms, + attempts + FROM ${this.#t.failedJobs} + ${where} + ORDER BY failed_at DESC + LIMIT ${limitParam}`, + params, + ); + return res.rows.map(rowToFailed); + } + + async get(id: string, queue: string): Promise { + await this.#ensureReady(); + const res = await this.#pool.query( + `SELECT id, job_name, queue, payload, error, + extract(epoch from failed_at) * 1000 AS failed_at_ms, + attempts + FROM ${this.#t.failedJobs} + WHERE id = $1 AND queue = $2`, + [id, queue], + ); + if (res.rowCount === 0) return null; + return rowToFailed(res.rows[0]); + } + + async retry(id: string, queue: string): Promise { + await this.#ensureReady(); + const client = await this.#pool.connect(); + try { + await client.query("BEGIN"); + const res = await client.query<{ envelope: QueueEnvelope }>( + `DELETE FROM ${this.#t.failedJobs} + WHERE id = $1 AND queue = $2 + RETURNING envelope`, + [id, queue], + ); + if (res.rowCount === 0) { + await client.query("ROLLBACK"); + return false; + } + const original = res.rows[0].envelope; + const fresh: QueueEnvelope = { + __snaapi_queue: true, + id: crypto.randomUUID(), + jobName: original.jobName, + payload: original.payload, + queue: original.queue, + attempt: 1, + maxAttempts: original.maxAttempts, + backoffSchedule: original.backoffSchedule, + uniqueKey: original.uniqueKey, + uniqueTtl: original.uniqueTtl, + chain: original.chain, + }; + await insertJob(client, this.#t, fresh, 0); + await client.query("COMMIT"); + return true; + } catch (err) { + await client.query("ROLLBACK").catch(() => {}); + throw err; + } finally { + client.release(); + } + } + + async retryAll(queue: string): Promise { + const list = await this.list(queue, 1000); + let count = 0; + for (const failed of list) { + if (await this.retry(failed.id, queue)) count++; + } + return count; + } + + async forget(id: string, queue: string): Promise { + await this.#ensureReady(); + await this.#pool.query( + `DELETE FROM ${this.#t.failedJobs} WHERE id = $1 AND queue = $2`, + [id, queue], + ); + } + + async purge(queue?: string): Promise { + await this.#ensureReady(); + const res = queue + ? await this.#pool.query( + `DELETE FROM ${this.#t.failedJobs} WHERE queue = $1`, + [queue], + ) + : await this.#pool.query(`DELETE FROM ${this.#t.failedJobs}`); + return res.rowCount ?? 0; + } +} + +function rowToFailed(row: Record): FailedJob { + return { + id: String(row.id), + jobName: String(row.job_name), + queue: String(row.queue), + payload: row.payload, + error: String(row.error), + failedAt: Number(row.failed_at_ms), + attempts: Number(row.attempts), + }; +} + +class PgLockDriver implements LockDriver { + #pool: Pool; + #t: TableNames; + #ensureReady: () => Promise; + + constructor(pool: Pool, t: TableNames, ensureReady: () => Promise) { + this.#pool = pool; + this.#t = t; + this.#ensureReady = ensureReady; + } + + async acquire(key: string, ttlMs: number): Promise { + await this.#ensureReady(); + const res = await this.#pool.query<{ acquired: boolean }>( + `INSERT INTO ${this.#t.locks} (key, expires_at) + VALUES ($1, NOW() + ($2 || ' milliseconds')::INTERVAL) + ON CONFLICT (key) DO UPDATE + SET expires_at = EXCLUDED.expires_at + WHERE ${this.#t.locks}.expires_at <= NOW() + RETURNING true AS acquired`, + [key, String(ttlMs)], + ); + return (res.rowCount ?? 0) > 0; + } + + async release(key: string): Promise { + await this.#ensureReady(); + await this.#pool.query( + `DELETE FROM ${this.#t.locks} WHERE key = $1`, + [key], + ); + } +} + +class PgCounterDriver implements CounterDriver { + #pool: Pool; + #t: TableNames; + #ensureReady: () => Promise; + + constructor(pool: Pool, t: TableNames, ensureReady: () => Promise) { + this.#pool = pool; + this.#t = t; + this.#ensureReady = ensureReady; + } + + async increment(key: string, windowMs: number): Promise { + await this.#ensureReady(); + const windowKey = `${key}:${Math.floor(Date.now() / windowMs)}`; + const res = await this.#pool.query<{ count: string }>( + `INSERT INTO ${this.#t.counters} (key, count, expires_at) + VALUES ($1, 1, NOW() + ($2 || ' milliseconds')::INTERVAL) + ON CONFLICT (key) DO UPDATE + SET count = ${this.#t.counters}.count + 1, + expires_at = EXCLUDED.expires_at + RETURNING count`, + [windowKey, String(windowMs)], + ); + return Number(res.rows[0].count); + } +} + +async function insertJob( + client: PoolClient | Client | Pool, + t: TableNames, + envelope: QueueEnvelope, + delayMs: number, +): Promise { + // The row PK is independent of envelope.id so a mid-flight retry + // (which keeps the same envelope.id) does not collide with the + // still-reserved original row. unique_key is NULL because dedup is + // enforced via the locks table; see PostgresDriver.enqueueUnique. + const res = await client.query( + `INSERT INTO ${t.jobs} (id, queue, envelope, available_at) + VALUES ($1, $2, $3, NOW() + ($4 || ' milliseconds')::INTERVAL)`, + [ + crypto.randomUUID(), + envelope.queue, + JSON.stringify(envelope), + String(delayMs), + ], + ); + return (res.rowCount ?? 0) > 0; +} + +/** Postgres implementation of `QueueDriver`. */ +export class PostgresDriver implements QueueDriver { + #pool: Pool; + #opts: Required< + Omit + >; + #connectionString: string; + #t: TableNames; + #migrationPromise?: Promise; + #notifyClient?: Client; + #pollTimer?: number; + #cleanupTimer?: number; + #stopped = false; + #wakeup: () => void = () => {}; + #inFlight = 0; + readonly failed: FailedJobDriver; + readonly locks: LockDriver; + readonly counters: CounterDriver; + + constructor(opts: PostgresDriverOptions) { + this.#connectionString = opts.connectionString; + this.#opts = { + pollIntervalMs: opts.pollIntervalMs ?? DEFAULT_POLL_INTERVAL, + concurrency: opts.concurrency ?? 1, + reserveTtlMs: opts.reserveTtlMs ?? DEFAULT_RESERVE_TTL, + poolSize: opts.poolSize ?? 10, + autoMigrate: opts.autoMigrate ?? true, + }; + this.#t = tableNamesFor(resolvePrefix(opts)); + this.#pool = new Pool({ + connectionString: this.#connectionString, + max: this.#opts.poolSize, + }); + const ensureReady = () => this.#ensureReady(); + this.failed = new PgFailedStore(this.#pool, this.#t, ensureReady); + this.locks = new PgLockDriver(this.#pool, this.#t, ensureReady); + this.counters = new PgCounterDriver(this.#pool, this.#t, ensureReady); + } + + /** Create or update the queue tables. Idempotent. */ + async migrate(): Promise { + await this.#pool.query(schemaSql(this.#t)); + } + + #ensureReady(): Promise { + if (!this.#opts.autoMigrate) return Promise.resolve(); + if (!this.#migrationPromise) { + this.#migrationPromise = this.migrate().catch((err) => { + // Reset so a later call can retry instead of being permanently broken. + this.#migrationPromise = undefined; + throw err; + }); + } + return this.#migrationPromise; + } + + async enqueue(envelope: QueueEnvelope, opts: EnqueueOptions): Promise { + await this.#ensureReady(); + await insertJob(this.#pool, this.#t, envelope, opts.delay ?? 0); + await this.#pool.query(`SELECT pg_notify($1, $2)`, [ + this.#t.notifyChannel, + envelope.queue, + ]); + } + + async enqueueUnique( + envelope: QueueEnvelope, + opts: EnqueueOptions, + lockKey: string, + lockTtlMs: number, + ): Promise { + await this.#ensureReady(); + // Hold a TTL-bounded row in the locks table so the dedupe window + // matches the requested lockTtlMs (the unique_key column on jobs would + // only dedupe until the row is deleted on completion, which is shorter + // than the documented contract). + const acquired = await this.#acquireUniqueLock(lockKey, lockTtlMs); + if (!acquired) return false; + + try { + const inserted = await insertJob( + this.#pool, + this.#t, + envelope, + opts.delay ?? 0, + ); + if (!inserted) { + await this.clearUniqueLock(lockKey); + return false; + } + await this.#pool.query(`SELECT pg_notify($1, $2)`, [ + this.#t.notifyChannel, + envelope.queue, + ]); + return true; + } catch (err) { + await this.clearUniqueLock(lockKey).catch(() => {}); + throw err; + } + } + + async clearUniqueLock(uniqueKey: string): Promise { + await this.#ensureReady(); + await this.#pool.query( + `DELETE FROM ${this.#t.locks} WHERE key = $1`, + [uniqueLockKey(uniqueKey)], + ); + } + + async #acquireUniqueLock( + uniqueKey: string, + ttlMs: number, + ): Promise { + const res = await this.#pool.query( + `INSERT INTO ${this.#t.locks} (key, expires_at) + VALUES ($1, NOW() + ($2 || ' milliseconds')::INTERVAL) + ON CONFLICT (key) DO UPDATE + SET expires_at = EXCLUDED.expires_at + WHERE ${this.#t.locks}.expires_at <= NOW() + RETURNING key`, + [uniqueLockKey(uniqueKey), String(ttlMs)], + ); + return (res.rowCount ?? 0) > 0; + } + + listen(handler: (envelope: QueueEnvelope) => Promise): Listener { + this.#stopped = false; + + const drainOnce = async () => { + if (this.#stopped) return; + while ( + !this.#stopped && this.#inFlight < this.#opts.concurrency + ) { + const reserved = await this.#reserveOne(); + if (!reserved) break; + this.#inFlight++; + (async () => { + try { + await handler(reserved.envelope); + } catch (_err) { + // Worker.ts already handles errors. Anything reaching here is a + // bug in the worker pipeline. + } finally { + // Ack the reservation. The row is only removed once the handler + // has returned, so a worker crash leaves the row reserved and a + // peer can reclaim it after reserveTtlMs. + await this.#ackReservation(reserved.id).catch(() => {}); + this.#inFlight--; + this.#wakeup(); + } + })(); + } + }; + + const loop = async () => { + // Make sure tables exist before we try to SELECT from them. + await this.#ensureReady().catch(() => {}); + while (!this.#stopped) { + await drainOnce(); + const wakeupPromise = new Promise((resolve) => { + this.#wakeup = resolve; + this.#pollTimer = setTimeout( + resolve, + this.#opts.pollIntervalMs, + ) as unknown as number; + }); + await wakeupPromise; + if (this.#pollTimer !== undefined) { + clearTimeout(this.#pollTimer); + this.#pollTimer = undefined; + } + } + }; + + this.#startNotifyClient().catch(() => { + // Notify failure falls back to polling. + }); + loop(); + + this.#cleanupTimer = setInterval(() => { + this.#pool.query( + `DELETE FROM ${this.#t.locks} WHERE expires_at <= NOW(); + DELETE FROM ${this.#t.counters} WHERE expires_at <= NOW();`, + ).catch(() => {}); + }, 60_000) as unknown as number; + + return { + stop: async () => { + this.#stopped = true; + this.#wakeup(); + if (this.#pollTimer !== undefined) clearTimeout(this.#pollTimer); + if (this.#cleanupTimer !== undefined) clearInterval(this.#cleanupTimer); + if (this.#notifyClient) { + try { + await this.#notifyClient.end(); + } catch (_err) { /* ignore */ } + this.#notifyClient = undefined; + } + const deadline = Date.now() + 5_000; + while (this.#inFlight > 0 && Date.now() < deadline) { + await new Promise((r) => setTimeout(r, 25)); + } + }, + }; + } + + async close(): Promise { + this.#stopped = true; + if (this.#cleanupTimer !== undefined) clearInterval(this.#cleanupTimer); + if (this.#notifyClient) { + try { + await this.#notifyClient.end(); + } catch (_err) { /* ignore */ } + this.#notifyClient = undefined; + } + await this.#pool.end(); + } + + async #reserveOne(): Promise<{ id: string; envelope: QueueEnvelope } | null> { + const client = await this.#pool.connect(); + try { + await client.query("BEGIN"); + const res = await client.query<{ id: string; envelope: QueueEnvelope }>( + `WITH next AS ( + SELECT id FROM ${this.#t.jobs} + WHERE available_at <= NOW() + AND ( + reserved_at IS NULL + OR reserved_at <= NOW() - ($1::bigint * INTERVAL '1 millisecond') + ) + ORDER BY available_at + LIMIT 1 + FOR UPDATE SKIP LOCKED + ) + UPDATE ${this.#t.jobs} AS jobs + SET reserved_at = NOW() + FROM next + WHERE jobs.id = next.id + RETURNING jobs.id, jobs.envelope`, + [this.#opts.reserveTtlMs], + ); + await client.query("COMMIT"); + if (res.rowCount === 0) return null; + return { id: res.rows[0].id, envelope: res.rows[0].envelope }; + } catch (err) { + await client.query("ROLLBACK").catch(() => {}); + throw err; + } finally { + client.release(); + } + } + + async #ackReservation(id: string): Promise { + await this.#pool.query( + `DELETE FROM ${this.#t.jobs} WHERE id = $1`, + [id], + ); + } + + async #startNotifyClient(): Promise { + const client = new Client({ connectionString: this.#connectionString }); + await client.connect(); + client.on("notification", () => this.#wakeup()); + client.on("error", () => { + this.#notifyClient = undefined; + }); + await client.query(`LISTEN ${this.#t.notifyChannel}`); + this.#notifyClient = client; + } +} diff --git a/src/drivers/types.ts b/src/drivers/types.ts new file mode 100644 index 0000000..e6b2b82 --- /dev/null +++ b/src/drivers/types.ts @@ -0,0 +1,69 @@ +import type { FailedJob } from "../types.ts"; +import type { QueueEnvelope } from "../envelope.ts"; + +/** Options passed to a driver's enqueue call. */ +export interface EnqueueOptions { + /** Delay in milliseconds before delivery. */ + delay?: number; +} + +/** Returned by `QueueDriver.listen`. Stop the consumer with `stop()`. */ +export interface Listener { + stop(): Promise; +} + +/** Backing-store specific operations for failed jobs. */ +export interface FailedJobDriver { + /** + * Persist a failed job. The original envelope is stored alongside the + * record so retry() can reconstruct the dispatch metadata + * (maxAttempts, backoffSchedule, uniqueKey/ttl, chain) accurately. + */ + store(record: FailedJob, envelope: QueueEnvelope): Promise; + list(queue: string | undefined, limit: number): Promise; + get(id: string, queue: string): Promise; + /** Re-enqueue a failed job and remove it from the failed store. */ + retry(id: string, queue: string): Promise; + /** Re-enqueue every failed job in the queue. Returns count retried. */ + retryAll(queue: string): Promise; + forget(id: string, queue: string): Promise; + purge(queue?: string): Promise; +} + +/** Mutex-style lock primitive used by the `withoutOverlapping` middleware. */ +export interface LockDriver { + /** Acquire a TTL-bounded lock. Returns true iff the caller now owns it. */ + acquire(key: string, ttlMs: number): Promise; + release(key: string): Promise; +} + +/** Windowed counter primitive used by the `rateLimit` middleware. */ +export interface CounterDriver { + /** Atomically increment the counter and return the new value. */ + increment(key: string, windowMs: number): Promise; +} + +/** The transport-agnostic interface every backing store implements. */ +export interface QueueDriver { + enqueue(envelope: QueueEnvelope, opts: EnqueueOptions): Promise; + /** + * Atomically: enqueue iff `lockKey` is unheld, then claim the lock for + * `lockTtlMs`. Returns false when the lock was already held (duplicate). + */ + enqueueUnique( + envelope: QueueEnvelope, + opts: EnqueueOptions, + lockKey: string, + lockTtlMs: number, + ): Promise; + /** Start consuming jobs. The handler runs once per delivered envelope. */ + listen(handler: (envelope: QueueEnvelope) => Promise): Listener; + /** Release a unique-dispatch lock after the job runs (success or terminal failure). */ + clearUniqueLock(uniqueKey: string): Promise; + + readonly failed: FailedJobDriver; + readonly locks: LockDriver; + readonly counters: CounterDriver; + + close(): Promise; +} diff --git a/src/envelope.ts b/src/envelope.ts index a405b88..47b58c7 100644 --- a/src/envelope.ts +++ b/src/envelope.ts @@ -1,6 +1,6 @@ import type { JobChainStep } from "./types.ts"; -/** The message envelope serialized into Deno KV queue. */ +/** The message envelope persisted into the queue store. */ export interface QueueEnvelope { /** Discriminator so the listener knows this message belongs to us */ __snaapi_queue: true; diff --git a/src/failed.ts b/src/failed.ts index 67fc79e..717bb66 100644 --- a/src/failed.ts +++ b/src/failed.ts @@ -1,120 +1,41 @@ import type { FailedJob } from "./types.ts"; -import type { QueueEnvelope } from "./envelope.ts"; -import { Keys } from "./keys.ts"; - -/** Store a failed job record in KV. Used internally by the worker. */ -export async function storeFailed( - kv: Deno.Kv, - envelope: QueueEnvelope, - error: string, -): Promise { - const record: FailedJob = { - id: envelope.id, - jobName: envelope.jobName, - queue: envelope.queue, - payload: envelope.payload, - error, - failedAt: Date.now(), - attempts: envelope.attempt, - }; - await kv.set(Keys.failed(envelope.queue, envelope.id), record); -} +import type { QueueDriver } from "./drivers/types.ts"; /** Provides access to failed job records for inspection and retry. */ export class FailedJobStore { - #kv: Deno.Kv; + #driver: QueueDriver; - constructor(kv: Deno.Kv) { - this.#kv = kv; + constructor(driver: QueueDriver) { + this.#driver = driver; } /** List failed jobs, optionally filtered by queue name. */ - async list(queue?: string, limit = 100): Promise { - const prefix = queue ? Keys.failedPrefix(queue) : Keys.failedAll(); - const results: FailedJob[] = []; - - for await (const entry of this.#kv.list({ prefix }, { limit })) { - results.push(entry.value); - } - return results; + list(queue?: string, limit = 100): Promise { + return this.#driver.failed.list(queue, limit); } /** Get a single failed job by ID. Searches the given queue or "default". */ - async get(id: string, queue = "default"): Promise { - const entry = await this.#kv.get(Keys.failed(queue, id)); - return entry.value; + get(id: string, queue = "default"): Promise { + return this.#driver.failed.get(id, queue); } /** Retry a failed job: re-enqueue it with attempt reset to 1, then remove from failed store. */ - async retry(id: string, queue = "default"): Promise { - const entry = await this.#kv.get(Keys.failed(queue, id)); - if (!entry.value) return false; - - const record = entry.value; - const envelope: QueueEnvelope = { - __snaapi_queue: true, - id: crypto.randomUUID(), - jobName: record.jobName, - payload: record.payload, - queue: record.queue, - attempt: 1, - maxAttempts: record.attempts, // preserve original max - backoffSchedule: [1000, 5000, 30000], - }; - - const result = await this.#kv.atomic() - .check(entry) - .enqueue(envelope) - .delete(Keys.failed(queue, id)) - .commit(); - - return result.ok; + retry(id: string, queue = "default"): Promise { + return this.#driver.failed.retry(id, queue); } /** Retry all failed jobs in a queue. Returns count of retried jobs. */ - async retryAll(queue = "default"): Promise { - let count = 0; - for await ( - const entry of this.#kv.list({ - prefix: Keys.failedPrefix(queue), - }) - ) { - const record = entry.value; - const envelope: QueueEnvelope = { - __snaapi_queue: true, - id: crypto.randomUUID(), - jobName: record.jobName, - payload: record.payload, - queue: record.queue, - attempt: 1, - maxAttempts: record.attempts, - backoffSchedule: [1000, 5000, 30000], - }; - - const result = await this.#kv.atomic() - .check(entry) - .enqueue(envelope) - .delete(entry.key) - .commit(); - - if (result.ok) count++; - } - return count; + retryAll(queue = "default"): Promise { + return this.#driver.failed.retryAll(queue); } /** Delete a failed job record without retrying. */ - async forget(id: string, queue = "default"): Promise { - await this.#kv.delete(Keys.failed(queue, id)); + forget(id: string, queue = "default"): Promise { + return this.#driver.failed.forget(id, queue); } /** Purge all failed jobs in a queue. Returns count of purged records. */ - async purge(queue?: string): Promise { - const prefix = queue ? Keys.failedPrefix(queue) : Keys.failedAll(); - let count = 0; - for await (const entry of this.#kv.list({ prefix })) { - await this.#kv.delete(entry.key); - count++; - } - return count; + purge(queue?: string): Promise { + return this.#driver.failed.purge(queue); } } diff --git a/src/keys.ts b/src/keys.ts deleted file mode 100644 index d08cd49..0000000 --- a/src/keys.ts +++ /dev/null @@ -1,28 +0,0 @@ -const PREFIX = "snaapi_queue" as const; - -export const Keys = { - /** Failed job by ID: ["snaapi_queue", "failed", queue, id] */ - failed(queue: string, id: string): Deno.KvKey { - return [PREFIX, "failed", queue, id]; - }, - - /** Prefix for all failed jobs in a queue */ - failedPrefix(queue: string): Deno.KvKey { - return [PREFIX, "failed", queue]; - }, - - /** Prefix for all failed jobs across all queues */ - failedAll(): Deno.KvKey { - return [PREFIX, "failed"]; - }, - - /** Unique job lock: ["snaapi_queue", "unique", uniqueKey] */ - unique(uniqueKey: string): Deno.KvKey { - return [PREFIX, "unique", uniqueKey]; - }, - - /** Undelivered fallback key: ["snaapi_queue", "undelivered", id] */ - undelivered(id: string): Deno.KvKey { - return [PREFIX, "undelivered", id]; - }, -} as const; diff --git a/src/middleware.ts b/src/middleware.ts index 78f1e70..a846350 100644 --- a/src/middleware.ts +++ b/src/middleware.ts @@ -1,5 +1,4 @@ import type { JobContext, JobMiddleware } from "./types.ts"; -import { Keys } from "./keys.ts"; /** Execute a middleware chain, then the final handler. */ export async function runMiddlewareChain( @@ -20,61 +19,36 @@ export async function runMiddlewareChain( await next(); } -/** Rate-limit job execution within a time window using KV counters. */ +/** Rate-limit job execution within a time window. */ export function rateLimit(opts: { key: string; maxPerWindow: number; windowMs: number; }): JobMiddleware { return async (ctx, _payload, next) => { - const windowKey = [ - "snaapi_queue", - "ratelimit", - opts.key, - Math.floor(Date.now() / opts.windowMs), - ]; - const counter = await ctx.kv.get(windowKey); - const current = counter.value ? Number(counter.value) : 0; - - if (current >= opts.maxPerWindow) { + const count = await ctx.counters.increment(opts.key, opts.windowMs); + if (count > opts.maxPerWindow) { throw new Error( - `Rate limit exceeded for "${opts.key}": ${current}/${opts.maxPerWindow}`, + `Rate limit exceeded for "${opts.key}": ${count}/${opts.maxPerWindow}`, ); } - - const atomic = ctx.kv.atomic() - .sum(windowKey, 1n) - .commit(); - await atomic; await next(); }; } -/** Prevent overlapping execution of a job. Acquires a KV lock, skips if already held. */ +/** Prevent overlapping execution of a job. Skips silently if the lock is held. */ export function withoutOverlapping(opts: { key: string; expiresIn?: number; }): JobMiddleware { - const ttl = opts.expiresIn ?? 300_000; // 5 min default - + const ttl = opts.expiresIn ?? 300_000; return async (ctx, _payload, next) => { - const lockKey = Keys.unique(`overlap:${opts.key}`); - - // Try to acquire lock atomically — only succeeds if key doesn't exist - const result = await ctx.kv.atomic() - .check({ key: lockKey, versionstamp: null }) - .set(lockKey, true, { expireIn: ttl }) - .commit(); - - if (!result.ok) { - // Lock held — skip this execution silently - return; - } - + const acquired = await ctx.locks.acquire(opts.key, ttl); + if (!acquired) return; try { await next(); } finally { - await ctx.kv.delete(lockKey); + await ctx.locks.release(opts.key); } }; } diff --git a/src/queue.ts b/src/queue.ts index 38c017b..9495246 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -3,18 +3,20 @@ import { PendingDispatch } from "./dispatcher.ts"; import { FailedJobStore } from "./failed.ts"; import { dispatchChain } from "./chain.ts"; import { createListener } from "./worker.ts"; +import type { Listener, QueueDriver } from "./drivers/types.ts"; /** The main queue interface. Register jobs, dispatch work, and start listening. */ export class Queue { - #kv: Deno.Kv; + #driver: QueueDriver; #handlers = new Map(); #middlewareMap = new Map(); #globalMiddleware: JobMiddleware[] = []; #failedStore: FailedJobStore; + #listener?: Listener; - constructor(kv: Deno.Kv) { - this.#kv = kv; - this.#failedStore = new FailedJobStore(kv); + constructor(driver: QueueDriver) { + this.#driver = driver; + this.#failedStore = new FailedJobStore(this.#driver); } /** Register a named job handler. */ @@ -36,29 +38,43 @@ export class Queue { return this; } - /** Dispatch a job. Returns a chainable builder — call .send() to enqueue. */ + /** Dispatch a job. Returns a chainable builder. Call .send() to enqueue. */ dispatch(jobName: string, payload: T): PendingDispatch { - return new PendingDispatch(this.#kv, jobName, payload); + return new PendingDispatch(this.#driver, jobName, payload); } /** Dispatch a chain of jobs to run sequentially. */ async chain(steps: JobChainStep[]): Promise { - await dispatchChain(this.#kv, steps); + await dispatchChain(this.#driver, steps); } /** Start listening for queue messages. Call once per worker process. */ listen(): void { - const listener = createListener({ - kv: this.#kv, + const handler = createListener({ + driver: this.#driver, handlers: this.#handlers, middlewareMap: this.#middlewareMap, globalMiddleware: this.#globalMiddleware, }); - this.#kv.listenQueue(listener); + this.#listener = this.#driver.listen(handler); + } + + /** Stop the listener and release driver resources. */ + async close(): Promise { + if (this.#listener) { + await this.#listener.stop(); + this.#listener = undefined; + } + await this.#driver.close(); } /** Access the failed job store for inspection and retry. */ get failed(): FailedJobStore { return this.#failedStore; } + + /** Underlying driver. Exposed for advanced use cases. */ + get driver(): QueueDriver { + return this.#driver; + } } diff --git a/src/types.ts b/src/types.ts index 82033e0..81a5531 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,3 +1,5 @@ +import type { CounterDriver, LockDriver } from "./drivers/types.ts"; + /** The core job interface. Consumers implement this to define job handlers. */ export interface JobHandler { handle(payload: T, ctx: JobContext): Promise | void; @@ -5,33 +7,35 @@ export interface JobHandler { /** Context passed to every job handler during execution. */ export interface JobContext { - /** The job name as registered */ + /** The job name as registered. */ jobName: string; - /** Which attempt this is (1-based) */ + /** Which attempt this is (1-based). */ attempt: number; - /** Maximum attempts configured */ + /** Maximum attempts configured. */ maxAttempts: number; - /** The queue name this job was dispatched to */ + /** The queue name this job was dispatched to. */ queue: string; - /** Unique ID for this job dispatch */ + /** Unique ID for this job dispatch. */ id: string; - /** Access to the KV store for job-level operations */ - kv: Deno.Kv; + /** Lock primitive used by `withoutOverlapping` and available to user middleware. */ + locks: LockDriver; + /** Counter primitive used by `rateLimit` and available to user middleware. */ + counters: CounterDriver; } /** Options when dispatching a job. */ export interface DispatchOptions { - /** Delay in milliseconds before delivery */ + /** Delay in milliseconds before delivery. */ delay?: number; - /** Named queue (default: "default") */ + /** Named queue (default: "default"). */ queue?: string; - /** Max attempts before moving to failed jobs (default: 3) */ + /** Max attempts before moving to failed jobs (default: 3). */ maxAttempts?: number; - /** Backoff delays in ms between retries (default: [1000, 5000, 30000]) */ + /** Backoff delays in ms between retries (default: [1000, 5000, 30000]). */ backoffSchedule?: number[]; - /** Unique key — if set, prevents duplicate jobs with same key */ + /** Unique key. If set, prevents duplicate jobs with the same key. */ uniqueKey?: string; - /** TTL for uniqueness lock in ms (default: 300000 = 5 min) */ + /** TTL for uniqueness lock in ms (default: 300000 = 5 min). */ uniqueTtl?: number; } @@ -49,7 +53,7 @@ export interface JobChainStep { options?: Omit; } -/** A failed job record stored in KV. */ +/** A failed job record stored in the backing store. */ export interface FailedJob { id: string; jobName: string; diff --git a/src/worker.ts b/src/worker.ts index 82e2315..f37498c 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -1,33 +1,33 @@ -import type { JobContext, JobHandler, JobMiddleware } from "./types.ts"; -import { isQueueEnvelope, type QueueEnvelope } from "./envelope.ts"; -import { Keys } from "./keys.ts"; +import type { + FailedJob, + JobContext, + JobHandler, + JobMiddleware, +} from "./types.ts"; +import type { QueueEnvelope } from "./envelope.ts"; +import type { QueueDriver } from "./drivers/types.ts"; import { runMiddlewareChain } from "./middleware.ts"; -import { storeFailed } from "./failed.ts"; import { enqueueChainStep } from "./chain.ts"; export interface WorkerConfig { - kv: Deno.Kv; + driver: QueueDriver; handlers: Map; middlewareMap: Map; globalMiddleware: JobMiddleware[]; } -/** Create the listener function to pass to kv.listenQueue(). */ +/** Build the per-envelope handler the driver invokes for each delivered job. */ export function createListener( config: WorkerConfig, -): (msg: unknown) => Promise { - const { kv, handlers, middlewareMap, globalMiddleware } = config; +): (envelope: QueueEnvelope) => Promise { + const { driver, handlers, middlewareMap, globalMiddleware } = config; - return async (msg: unknown) => { - // Ignore messages not from this library - if (!isQueueEnvelope(msg)) return; - - const envelope: QueueEnvelope = msg; + return async (envelope: QueueEnvelope) => { const handler = handlers.get(envelope.jobName); if (!handler) { await storeFailed( - kv, + driver, envelope, `No handler registered for "${envelope.jobName}"`, ); @@ -40,10 +40,10 @@ export function createListener( maxAttempts: envelope.maxAttempts, queue: envelope.queue, id: envelope.id, - kv, + locks: driver.locks, + counters: driver.counters, }; - // Build middleware chain: global first, then per-job const jobMw = middlewareMap.get(envelope.jobName) ?? []; const allMiddleware = [...globalMiddleware, ...jobMw]; @@ -57,18 +57,17 @@ export function createListener( }, ); - // Success: clear unique lock if present + // Success: clear unique lock if present. if (envelope.uniqueKey) { - await kv.delete(Keys.unique(envelope.uniqueKey)); + await driver.clearUniqueLock(envelope.uniqueKey); } - // Advance chain if there are remaining steps + // Advance chain. if (envelope.chain && envelope.chain.length > 0) { - await enqueueChainStep(kv, envelope.chain); + await enqueueChainStep(driver, envelope.chain); } } catch (error) { if (envelope.attempt < envelope.maxAttempts) { - // Re-enqueue with incremented attempt and backoff delay const backoffIndex = envelope.attempt - 1; const delay = envelope.backoffSchedule[backoffIndex] ?? envelope.backoffSchedule[envelope.backoffSchedule.length - 1] ?? @@ -78,19 +77,34 @@ export function createListener( ...envelope, attempt: envelope.attempt + 1, }; - await kv.enqueue(retryEnvelope, { delay }); + await driver.enqueue(retryEnvelope, { delay }); } else { - // Max attempts exhausted: store as failed const errorMessage = error instanceof Error ? error.message : String(error); - await storeFailed(kv, envelope, errorMessage); + await storeFailed(driver, envelope, errorMessage); - // Clear unique lock so the job can be retried manually if (envelope.uniqueKey) { - await kv.delete(Keys.unique(envelope.uniqueKey)); + await driver.clearUniqueLock(envelope.uniqueKey); } } } }; } + +async function storeFailed( + driver: QueueDriver, + envelope: QueueEnvelope, + error: string, +): Promise { + const record: FailedJob = { + id: envelope.id, + jobName: envelope.jobName, + queue: envelope.queue, + payload: envelope.payload, + error, + failedAt: Date.now(), + attempts: envelope.attempt, + }; + await driver.failed.store(record, envelope); +} diff --git a/tests/queue_test.ts b/tests/queue_test.ts index 2c7db62..ed74dd9 100644 --- a/tests/queue_test.ts +++ b/tests/queue_test.ts @@ -1,19 +1,62 @@ import { assertEquals } from "@std/assert"; -import { Queue } from "../mod.ts"; +import { PostgresDriver, Queue } from "../mod.ts"; import type { JobHandler } from "../mod.ts"; -async function withKv(fn: (kv: Deno.Kv) => Promise) { - const kv = await Deno.openKv(":memory:"); +const DATABASE_URL = Deno.env.get("DATABASE_URL"); + +if (!DATABASE_URL) { + console.warn( + "DATABASE_URL not set; skipping queue tests. " + + "Run `docker compose up -d postgres && DATABASE_URL=postgres://queue:queue@localhost:5444/queue deno task test`.", + ); +} + +async function withPg( + fn: (queue: Queue, driver: PostgresDriver) => Promise, + opts: { concurrency?: number } = {}, +) { + if (!DATABASE_URL) return; + const driver = new PostgresDriver({ + connectionString: DATABASE_URL, + pollIntervalMs: 100, + concurrency: opts.concurrency ?? 1, + }); + await driver.migrate(); + await truncate(); + const queue = new Queue(driver); try { - await fn(kv); + await fn(queue, driver); } finally { - kv.close(); + await queue.close(); } } -Deno.test("dispatch and process a simple job", async () => { - await withKv(async (kv) => { - // Arrange +const pgTest = (name: string, fn: () => Promise) => + Deno.test({ + name, + fn, + // npm:pg holds keep-alive timers and connection pools; close() drains + // the pool, but Deno's leak detector still flags transient timers. + sanitizeOps: false, + sanitizeResources: false, + }); + +async function truncate() { + const { default: pg } = await import("pg"); + const client = new pg.Client({ connectionString: DATABASE_URL }); + await client.connect(); + try { + await client.query( + `TRUNCATE snaapi_jobs, snaapi_failed_jobs, + snaapi_locks, snaapi_counters`, + ); + } finally { + await client.end(); + } +} + +pgTest("dispatch and process a simple job", async () => { + await withPg(async (queue) => { const results: string[] = []; const done = Promise.withResolvers(); const greetJob: JobHandler<{ name: string }> = { @@ -22,22 +65,17 @@ Deno.test("dispatch and process a simple job", async () => { done.resolve(); }, }; - const queue = new Queue(kv); queue.register("greet", greetJob); queue.listen(); - // Act await queue.dispatch("greet", { name: "World" }).send(); await done.promise; - - // Assert assertEquals(results, ["Hello World"]); }); }); -Deno.test("dispatch with delay", async () => { - await withKv(async (kv) => { - // Arrange +pgTest("dispatch with delay", async () => { + await withPg(async (queue) => { const done = Promise.withResolvers(); let receivedAt = 0; const job: JobHandler = { @@ -46,87 +84,77 @@ Deno.test("dispatch with delay", async () => { done.resolve(); }, }; - const queue = new Queue(kv); queue.register("delayed", job); queue.listen(); - // Act const sentAt = Date.now(); - await queue.dispatch("delayed", null).delay(100).send(); + await queue.dispatch("delayed", null).delay(300).send(); await done.promise; - - // Assert const elapsed = receivedAt - sentAt; assertEquals( - elapsed >= 50, + elapsed >= 250, true, - `Expected delay >= 50ms, got ${elapsed}ms`, + `Expected delay >= 250ms, got ${elapsed}ms`, ); }); }); -Deno.test("job context is populated correctly", async () => { - await withKv(async (kv) => { - // Arrange +pgTest("job context is populated correctly", async () => { + await withPg(async (queue) => { const done = Promise.withResolvers(); - let capturedCtx: Record = {}; + let captured: Record = {}; const job: JobHandler = { handle(_payload, ctx) { - capturedCtx = { + captured = { jobName: ctx.jobName, attempt: ctx.attempt, maxAttempts: ctx.maxAttempts, queue: ctx.queue, hasId: typeof ctx.id === "string" && ctx.id.length > 0, - hasKv: ctx.kv !== undefined, + hasLocks: typeof ctx.locks?.acquire === "function", + hasCounters: typeof ctx.counters?.increment === "function", }; done.resolve(); }, }; - const queue = new Queue(kv); - queue.register("ctx-test", job); + queue.register("ctx", job); queue.listen(); - // Act - await queue.dispatch("ctx-test", "data").onQueue("high").attempts(5).send(); + await queue.dispatch("ctx", "data").onQueue("high").attempts(5).send(); await done.promise; - // Assert - assertEquals(capturedCtx.jobName, "ctx-test"); - assertEquals(capturedCtx.attempt, 1); - assertEquals(capturedCtx.maxAttempts, 5); - assertEquals(capturedCtx.queue, "high"); - assertEquals(capturedCtx.hasId, true); - assertEquals(capturedCtx.hasKv, true); + assertEquals(captured.jobName, "ctx"); + assertEquals(captured.attempt, 1); + assertEquals(captured.maxAttempts, 5); + assertEquals(captured.queue, "high"); + assertEquals(captured.hasId, true); + assertEquals(captured.hasLocks, true); + assertEquals(captured.hasCounters, true); }); }); -Deno.test("failed job is stored after max attempts", async () => { - await withKv(async (kv) => { - // Arrange +pgTest("failed job is stored after max attempts", async () => { + await withPg(async (queue) => { const done = Promise.withResolvers(); let attemptCount = 0; const failingJob: JobHandler = { handle() { attemptCount++; if (attemptCount >= 2) { - setTimeout(() => done.resolve(), 100); + setTimeout(() => done.resolve(), 200); } throw new Error("always fails"); }, }; - const queue = new Queue(kv); queue.register("fail-job", failingJob); queue.listen(); - // Act await queue.dispatch("fail-job", "test") .attempts(2) .backoff([50]) .send(); await done.promise; - // Assert const failed = await queue.failed.list("default"); assertEquals(failed.length, 1); assertEquals(failed[0].jobName, "fail-job"); @@ -135,9 +163,8 @@ Deno.test("failed job is stored after max attempts", async () => { }); }); -Deno.test("global middleware runs for all jobs", async () => { - await withKv(async (kv) => { - // Arrange +pgTest("global middleware runs for all jobs", async () => { + await withPg(async (queue) => { const done = Promise.withResolvers(); const order: string[] = []; const job: JobHandler = { @@ -146,69 +173,24 @@ Deno.test("global middleware runs for all jobs", async () => { done.resolve(); }, }; - const queue = new Queue(kv); queue.use(async (_ctx, _payload, next) => { order.push("middleware-before"); await next(); order.push("middleware-after"); }); - queue.register("mw-test", job); + queue.register("mw", job); queue.listen(); - // Act - await queue.dispatch("mw-test", null).send(); + await queue.dispatch("mw", null).send(); await done.promise; - await new Promise((r) => setTimeout(r, 50)); + await new Promise((r) => setTimeout(r, 100)); - // Assert assertEquals(order, ["middleware-before", "handler", "middleware-after"]); }); }); -Deno.test("job-specific middleware only runs for that job", async () => { - await withKv(async (kv) => { - // Arrange - const done1 = Promise.withResolvers(); - const done2 = Promise.withResolvers(); - const logs: string[] = []; - const jobA: JobHandler = { - handle() { - logs.push("jobA"); - done1.resolve(); - }, - }; - const jobB: JobHandler = { - handle() { - logs.push("jobB"); - done2.resolve(); - }, - }; - const queue = new Queue(kv); - queue.register("a", jobA); - queue.register("b", jobB); - queue.middleware("a", async (_ctx, _payload, next) => { - logs.push("mw-a"); - await next(); - }); - queue.listen(); - - // Act - await queue.dispatch("a", null).send(); - await done1.promise; - await queue.dispatch("b", null).send(); - await done2.promise; - - // Assert - assertEquals(logs.includes("mw-a"), true); - assertEquals(logs.includes("jobA"), true); - assertEquals(logs.includes("jobB"), true); - assertEquals(logs.indexOf("mw-a") < logs.indexOf("jobA"), true); - }); -}); - -Deno.test("job chain executes in order", async () => { - await withKv(async (kv) => { - // Arrange +pgTest("job chain executes in order", async () => { + await withPg(async (queue) => { const results: string[] = []; const done = Promise.withResolvers(); const stepHandler = ( @@ -219,13 +201,11 @@ Deno.test("job chain executes in order", async () => { resolve?.(); }, }); - const queue = new Queue(kv); queue.register("step1", stepHandler()); queue.register("step2", stepHandler()); queue.register("step3", stepHandler(done.resolve)); queue.listen(); - // Act await queue.chain([ { jobName: "step1", payload: { step: 1 } }, { jobName: "step2", payload: { step: 2 } }, @@ -233,14 +213,12 @@ Deno.test("job chain executes in order", async () => { ]); await done.promise; - // Assert assertEquals(results, ["step-1", "step-2", "step-3"]); }); }); -Deno.test("unique job prevents duplicates", async () => { - await withKv(async (kv) => { - // Arrange +pgTest("unique job prevents duplicates", async () => { + await withPg(async (queue) => { let callCount = 0; const done = Promise.withResolvers(); const job: JobHandler = { @@ -249,24 +227,20 @@ Deno.test("unique job prevents duplicates", async () => { done.resolve(); }, }; - const queue = new Queue(kv); queue.register("unique-job", job); - // Act — dispatch twice with same unique key before listening await queue.dispatch("unique-job", null).unique("same-key", 60_000).send(); await queue.dispatch("unique-job", null).unique("same-key", 60_000).send(); queue.listen(); await done.promise; - await new Promise((r) => setTimeout(r, 200)); + await new Promise((r) => setTimeout(r, 300)); - // Assert assertEquals(callCount, 1); }); }); -Deno.test("failed job can be retried", async () => { - await withKv(async (kv) => { - // Arrange +pgTest("failed job can be retried", async () => { + await withPg(async (queue) => { let attemptCount = 0; const failDone = Promise.withResolvers(); const retryDone = Promise.withResolvers(); @@ -274,32 +248,201 @@ Deno.test("failed job can be retried", async () => { handle() { attemptCount++; if (attemptCount === 1) { - setTimeout(() => failDone.resolve(), 100); + setTimeout(() => failDone.resolve(), 200); throw new Error("first try fails"); } retryDone.resolve(); }, }; - const queue = new Queue(kv); queue.register("retry-job", job); queue.listen(); - // Act — dispatch and wait for initial failure await queue.dispatch("retry-job", "data").attempts(1).send(); await failDone.promise; const failed = await queue.failed.list("default"); - - // Assert — verify failure was recorded assertEquals(failed.length, 1); - // Act — retry the failed job const retried = await queue.failed.retry(failed[0].id, "default"); await retryDone.promise; - // Assert — verify retry succeeded and store is cleared assertEquals(retried, true); assertEquals(attemptCount, 2); const remaining = await queue.failed.list("default"); assertEquals(remaining.length, 0); }); }); + +pgTest("rate limit middleware throws when exceeded", async () => { + await withPg(async (queue) => { + const { rateLimit } = await import("../mod.ts"); + const done = Promise.withResolvers(); + let attempts = 0; + const job: JobHandler = { + handle() { + attempts++; + }, + }; + queue.register("rl", job); + queue.middleware( + "rl", + rateLimit({ key: "rl-test", maxPerWindow: 1, windowMs: 60_000 }), + ); + queue.listen(); + + // First dispatch succeeds (count=1, limit=1). + await queue.dispatch("rl", null).attempts(1).send(); + // Second dispatch trips the limit (count=2 > limit=1) and goes to failed. + await queue.dispatch("rl", null).attempts(1).send(); + + // Wait for both to settle. + const deadline = Date.now() + 3_000; + while (Date.now() < deadline) { + const failed = await queue.failed.list("default"); + if (attempts === 1 && failed.length === 1) { + done.resolve(); + break; + } + await new Promise((r) => setTimeout(r, 50)); + } + await done.promise; + assertEquals(attempts, 1); + }); +}); + +pgTest("schema is auto-migrated on first use", async () => { + if (!DATABASE_URL) return; + // Drop all tables; the driver should recreate them on first call. + const { default: pg } = await import("pg"); + const setup = new pg.Client({ connectionString: DATABASE_URL }); + await setup.connect(); + try { + await setup.query( + `DROP TABLE IF EXISTS snaapi_jobs, snaapi_failed_jobs, + snaapi_locks, snaapi_counters CASCADE`, + ); + } finally { + await setup.end(); + } + + const driver = new PostgresDriver({ + connectionString: DATABASE_URL, + pollIntervalMs: 100, + // autoMigrate defaults to true. + }); + const queue = new Queue(driver); + try { + const done = Promise.withResolvers(); + queue.register("auto", { handle: () => done.resolve() }); + queue.listen(); + await queue.dispatch("auto", null).send(); + await done.promise; + } finally { + await queue.close(); + } +}); + +pgTest("custom tablePrefix creates and uses prefixed tables", async () => { + if (!DATABASE_URL) return; + const prefix = "qtest_"; + const { default: pg } = await import("pg"); + + const setup = new pg.Client({ connectionString: DATABASE_URL }); + await setup.connect(); + try { + await setup.query( + `DROP TABLE IF EXISTS ${prefix}jobs, ${prefix}failed_jobs, + ${prefix}locks, ${prefix}counters CASCADE`, + ); + } finally { + await setup.end(); + } + + const driver = new PostgresDriver({ + connectionString: DATABASE_URL, + pollIntervalMs: 100, + tablePrefix: prefix, + }); + const queue = new Queue(driver); + try { + const done = Promise.withResolvers(); + queue.register("prefixed", { handle: () => done.resolve() }); + queue.listen(); + await queue.dispatch("prefixed", null).send(); + await done.promise; + + // Verify the prefixed tables exist (and the default ones were not + // touched by this driver instance). + const verify = new pg.Client({ connectionString: DATABASE_URL }); + await verify.connect(); + try { + const res = await verify.query( + `SELECT table_name FROM information_schema.tables + WHERE table_name LIKE $1 ORDER BY table_name`, + [`${prefix}%`], + ); + const names = (res.rows as Array<{ table_name: string }>) + .map((r) => r.table_name).sort(); + assertEquals(names, [ + `${prefix}counters`, + `${prefix}failed_jobs`, + `${prefix}jobs`, + `${prefix}locks`, + ]); + } finally { + await verify.end(); + } + } finally { + await queue.close(); + // Clean up so the test is repeatable. + const cleanup = new pg.Client({ connectionString: DATABASE_URL }); + await cleanup.connect(); + try { + await cleanup.query( + `DROP TABLE IF EXISTS ${prefix}jobs, ${prefix}failed_jobs, + ${prefix}locks, ${prefix}counters CASCADE`, + ); + } finally { + await cleanup.end(); + } + } +}); + +pgTest("withoutOverlapping skips concurrent runs", async () => { + await withPg(async (queue) => { + const { withoutOverlapping } = await import("../mod.ts"); + let active = 0; + let peak = 0; + let started = 0; + let completed = 0; + const inflight = Promise.withResolvers(); + const job: JobHandler = { + async handle() { + started++; + active++; + peak = Math.max(peak, active); + await inflight.promise; + active--; + completed++; + }, + }; + queue.register("solo", job); + queue.middleware( + "solo", + withoutOverlapping({ key: "solo-test", expiresIn: 60_000 }), + ); + queue.listen(); + + // Two dispatches enqueued together; both pulled by the concurrency=2 + // consumer at roughly the same time. Lock arbitration happens inside + // the middleware: only one wins, the other returns silently. + await queue.dispatch("solo", null).send(); + await queue.dispatch("solo", null).send(); + await new Promise((r) => setTimeout(r, 400)); + inflight.resolve(); + await new Promise((r) => setTimeout(r, 200)); + + assertEquals(peak, 1); + assertEquals(started, 1); + assertEquals(completed, 1); + }, { concurrency: 2 }); +});