Fastify plugin for pg-boss. It registers
typed queues, schedules, and workers from plugin options, and decorates the
Fastify instance with the real PgBoss object.
The plugin keeps pg-boss itself visible. Use fastify.pgBoss or getPgBoss(app)
to call the full pg-boss API directly.
- Node.js
>=22.12.0 - Fastify
^5.0.0 - pg-boss
^12.15.0 - PostgreSQL supported by pg-boss
npm install fastify-pg-boss pg-bossimport Fastify from 'fastify'
import fastifyPgBoss, { definePgBossWorker, getPgBoss } from 'fastify-pg-boss'
const app = Fastify({ logger: true })
type EmailJob = {
userId: string
}
await app.register(fastifyPgBoss, {
connectionString: process.env.POSTGRES_URL,
workers: [
definePgBossWorker<EmailJob>({
name: 'email-worker',
queue: 'email/send',
createQueue: true,
options: {
pollingIntervalSeconds: 10,
},
async handler(jobs) {
for (const job of jobs) {
app.log.info({ jobId: job.id, userId: job.data.userId }, 'sending email')
}
},
}),
],
})
await getPgBoss(app).send('email/send', {
userId: 'user_123',
})When the plugin is enabled, fastify.pgBoss is a PgBoss instance. When the
plugin is disabled, it is decorated as null.
- Creates or accepts a
PgBossinstance. - Starts pg-boss during plugin registration by default.
- Registers queues, schedules, and workers in that order.
- Lets worker factories receive the Fastify instance.
- Logs pg-boss
errorevents throughfastify.logby default. - Calls
offWorkfor registered workers and stops pg-boss from FastifyonCloseby default. - Leaves the complete pg-boss API available through the original instance.
Provide one of connectionString, constructorOptions, or boss.
await app.register(fastifyPgBoss, {
connectionString: process.env.POSTGRES_URL,
})await app.register(fastifyPgBoss, {
constructorOptions: {
connectionString: process.env.POSTGRES_URL,
schema: 'jobs',
},
})import { PgBoss } from 'pg-boss'
const boss = new PgBoss({
connectionString: process.env.POSTGRES_URL,
})
await app.register(fastifyPgBoss, {
boss,
})boss may also be a connection string, pg-boss constructor options, an existing
PgBoss instance, or a factory that receives the Fastify instance.
await app.register(fastifyPgBoss, {
boss: async (app) => {
app.log.info('creating pg-boss')
return new PgBoss({
connectionString: process.env.POSTGRES_URL,
})
},
})This plugin does not wrap pg-boss methods. Anything available on pg-boss is
available on fastify.pgBoss.
await app.pgBoss?.send('queue-name', { ok: true })
await app.pgBoss?.schedule('queue-name', '0 8 * * *', { source: 'cron' })Use getPgBoss(app) when you want a non-nullable instance or a clear error if
pg-boss is not available.
import { getPgBoss } from 'fastify-pg-boss'
const boss = getPgBoss(app)
await boss.send('queue-name', { ok: true })Use queues when you want the plugin to create queues before schedules and
workers are registered.
await app.register(fastifyPgBoss, {
connectionString: process.env.POSTGRES_URL,
queues: [
'email/send',
{
name: 'reports/daily',
retryLimit: 3,
retryDelay: 30,
},
],
})definePgBossQueue is a typed identity helper for exporting queue definitions
from another file.
import { definePgBossQueue } from 'fastify-pg-boss'
export const emailQueue = definePgBossQueue({
name: 'email/send',
retryLimit: 3,
})Use schedules for standalone pg-boss schedules.
await app.register(fastifyPgBoss, {
connectionString: process.env.POSTGRES_URL,
schedules: [
{
name: 'reports/daily',
cron: '0 8 * * *',
data: {
source: 'schedule',
},
key: 'daily-report',
options: {
tz: 'UTC',
retryLimit: 1,
},
},
],
})Set enabled: false to keep a schedule definition in code without registering
it.
{
name: 'reports/daily',
cron: '0 8 * * *',
enabled: false,
}definePgBossSchedule is a typed identity helper for exported schedule
definitions.
Workers are registered after queues and schedules. A worker uses name as its
queue name unless queue is provided.
import { definePgBossWorker } from 'fastify-pg-boss'
type ReportJob = {
date?: string
}
export const dailyReportWorker = definePgBossWorker<ReportJob>({
name: 'daily-report-worker',
queue: 'reports/daily',
createQueue: true,
queueOptions: {
retryBackoff: true,
retryLimit: 3,
},
options: {
pollingIntervalSeconds: 10,
},
async handler(jobs) {
for (const job of jobs) {
// generate report
}
},
})Set enabled: false to skip worker registration.
Worker handlers receive only the jobs fetched by pg-boss. If a worker needs
Fastify services, pass a worker factory to definePgBossWorker. The factory
runs during plugin registration and receives the Fastify instance.
import type { FastifyInstance } from 'fastify'
import type { Job, WorkHandler } from 'pg-boss'
import { definePgBossWorker } from 'fastify-pg-boss'
type OnThisDayJob = {
date?: string
urlTemplate?: string
}
export function createOnThisDayWorker(app: FastifyInstance): WorkHandler<OnThisDayJob> {
return async (jobs: Job<OnThisDayJob>[]) => {
for (const job of jobs) {
app.log.info({ jobId: job.id, queue: job.name }, 'processing notifications')
}
}
}
export const onThisDayWorker = definePgBossWorker<OnThisDayJob>((app) => ({
name: 'on-this-day',
queue: 'notifications/on-this-day/daily',
createQueue: true,
handler: createOnThisDayWorker(app),
}))A worker can register its own schedule. The schedule defaults to the worker's queue.
definePgBossWorker<ReportJob>({
name: 'daily-report-worker',
queue: 'reports/daily',
createQueue: true,
schedule: {
cron: '0 8 * * *',
data: {},
key: 'daily-report',
tz: 'Asia/Bangkok',
},
async handler(jobs) {
for (const job of jobs) {
// generate scheduled report
}
},
})For the smallest scheduled worker, schedule can be only the cron expression.
definePgBossWorker({
name: 'daily-summary',
createQueue: true,
schedule: '0 8 * * *',
async handler(jobs) {
// process jobs
},
})Use schedule.name when the scheduled queue should differ from the worker queue.
Use schedule.enabled: false to skip registering the schedule.
Set includeMetadata: true when you want pg-boss metadata on fetched jobs.
definePgBossWorker<ReportJob>({
name: 'metadata-worker',
queue: 'reports/metadata',
createQueue: true,
includeMetadata: true,
options: {
batchSize: 1,
includeMetadata: true,
pollingIntervalSeconds: 10,
},
async handler(jobs) {
for (const job of jobs) {
// process job.state, job.priority, and other metadata
}
},
})On Fastify close, the plugin calls offWork for each registered worker and then
stops pg-boss. Use worker-level options to tune that behavior.
definePgBossWorker({
name: 'long-running-worker',
offWorkOptions: {
wait: true,
},
async handler(jobs) {
// process jobs
},
})Set offWorkOnClose: false for workers that should not be passed to
boss.offWork during plugin shutdown.
The plugin logs pg-boss error events through fastify.log.error by default.
Provide events to attach custom handlers. Each handler receives the Fastify
instance first.
await app.register(fastifyPgBoss, {
connectionString: process.env.POSTGRES_URL,
events: {
error(app, error) {
app.log.error({ err: error }, 'custom pg-boss error')
},
warning(app, warning) {
app.log.warn({ warning }, 'pg-boss warning')
},
stopped(app) {
app.log.info('pg-boss stopped')
},
},
})Supported event keys are error, warning, wip, stopped, and bam.
If a custom event handler throws, the plugin catches the failure and logs it as
pg-boss event handler failed.
Set logErrors: false to disable the default error logger.
| Option | Description |
|---|---|
enabled |
Set false to decorate pgBoss as null and skip pg-boss creation and startup. Defaults to true. |
boss |
Existing PgBoss instance, connection string, constructor options, or `(fastify) => PgBoss |
connectionString |
PostgreSQL connection string. Used when boss and constructorOptions are omitted. |
constructorOptions |
pg-boss constructor options. |
start |
Start pg-boss during plugin registration. Defaults to true. |
stopOnClose |
Run worker offWork and PgBoss.stop() in Fastify onClose. Defaults to true. |
stopOptions |
Options passed to PgBoss.stop(). |
queues |
Queue names or pg-boss queue definitions to create before schedules and workers. |
schedules |
Schedule definitions to register before workers. |
workers |
Worker definitions or worker factories to register after queues and schedules. |
events |
Custom pg-boss event handlers. |
logErrors |
Set false to disable the default pg-boss error logger. |
export {
fastifyPgBoss,
definePgBossQueue,
definePgBossSchedule,
definePgBossWorker,
getPgBoss,
}The package also exports TypeScript types for plugin options, worker
definitions, schedules, queues, events, and worker handlers. Import runtime
pg-boss classes, helpers, and job types directly from pg-boss.
Install dependencies:
npm installStart the test PostgreSQL database:
npm run db:upRun typecheck, build, tests, and coverage thresholds:
npm testRun only the TSTyche type tests:
npm run type-testThe test suite expects PostgreSQL at:
postgres://fastify_pg_boss:fastify_pg_boss@localhost:55432/fastify_pg_bossUse POSTGRES_URL to point tests at a different database.
Stop and remove the test database:
npm run db:downOther useful scripts:
npm run typecheck
npm run type-test
npm run build