Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 29 additions & 35 deletions packages/database-jobs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ pgpm deploy --createdb --database mydb1

The `app_jobs.jobs` table stores active jobs with the following key fields:
- `id`: Unique job identifier
- `database_id`: Database/tenant identifier
- `database_id`: Database/tenant identifier (nullable, read from JWT claims internally)
- `actor_id`: User who triggered the job (nullable, read from JWT claims internally)
- `queue_name`: Optional queue name (default: NULL)
- `task_identifier`: Job type/handler name
- `payload`: JSON data for the job
- `priority`: Lower numbers = higher priority (default: 0)
Expand All @@ -96,6 +98,7 @@ The `app_jobs.jobs` table stores active jobs with the following key fields:
- `max_attempts`: Maximum retry attempts (default: 25)
- `locked_by`: Worker ID that locked this job
- `locked_at`: When the job was locked
- `is_available`: Generated column (`locked_at IS NULL AND attempts < max_attempts`) — used by partial indexes for fast dequeue
- `key`: Optional unique key for upsert semantics

### Scheduled Jobs Table
Expand All @@ -106,21 +109,27 @@ The `app_jobs.scheduled_jobs` table stores recurring jobs with cron-style or rul

The `app_jobs.job_queues` table tracks queue statistics and locking state.

### Performance

The job queue uses partial covering indexes for fast dequeue:
- `jobs_main_index`: `btree (priority, run_at) INCLUDE (id, queue_name) WHERE is_available = true` — only indexes pending jobs
- `jobs_no_queue_index`: `btree (priority, run_at) INCLUDE (id) WHERE is_available = true AND queue_name IS NULL` — optimizes the common case (no named queue)

The `is_available` generated column automatically maintains index membership as jobs complete. `database_id` and `actor_id` have **no indexes** — they are envelope metadata, not used in the queue's hot path.

## Usage

### Adding Jobs

```sql
-- Add a simple job
-- Add a simple job (database_id and actor_id are read from JWT claims automatically)
SELECT app_jobs.add_job(
db_id := '5b720132-17d5-424d-9bcb-ee7b17c13d43'::uuid,
identifier := 'send_email',
payload := '{"to": "user@example.com", "subject": "Hello"}'::json
);

-- Add a job with priority and delayed execution
SELECT app_jobs.add_job(
db_id := '5b720132-17d5-424d-9bcb-ee7b17c13d43'::uuid,
identifier := 'generate_report',
payload := '{"report_id": 123}'::json,
run_at := now() + interval '1 hour',
Expand All @@ -130,7 +139,6 @@ SELECT app_jobs.add_job(

-- Add a job with a unique key (upsert semantics)
SELECT app_jobs.add_job(
db_id := '5b720132-17d5-424d-9bcb-ee7b17c13d43'::uuid,
identifier := 'daily_summary',
payload := '{"date": "2025-01-15"}'::json,
job_key := 'daily_summary_2025_01_15',
Expand Down Expand Up @@ -176,26 +184,9 @@ SELECT app_jobs.fail_job(
### Scheduled Jobs

```sql
-- Schedule a job with cron-style timing
INSERT INTO app_jobs.scheduled_jobs (
database_id,
task_identifier,
payload,
schedule_info
) VALUES (
'5b720132-17d5-424d-9bcb-ee7b17c13d43'::uuid,
'cleanup_old_data',
'{"days": 30}'::json,
'{
"hour": [2],
"minute": [0],
"dayOfWeek": [0, 1, 2, 3, 4, 5, 6]
}'::json
);

-- Schedule a job with a rule (every minute for 3 minutes)
-- database_id and actor_id are read from JWT claims automatically
SELECT app_jobs.add_scheduled_job(
db_id := '5b720132-17d5-424d-9bcb-ee7b17c13d43'::uuid,
identifier := 'heartbeat',
payload := '{}'::json,
schedule_info := json_build_object(
Expand All @@ -213,14 +204,13 @@ SELECT * FROM app_jobs.run_scheduled_job(scheduled_job_id := 1);

### app_jobs.add_job(...)

Adds a new job to the queue or updates an existing job if a key is provided.
Adds a new job to the queue or updates an existing job if a key is provided. `database_id` and `actor_id` are read internally from JWT claims — no need to pass them.

**Parameters:**
- `db_id` (uuid): Database/tenant identifier
- `identifier` (text): Job type/handler name
- `payload` (json): Job data (default: `{}`)
- `job_key` (text): Optional unique key for upsert (default: NULL)
- `queue_name` (text): Optional queue name (default: random UUID)
- `queue_name` (text): Optional queue name (default: NULL)
- `run_at` (timestamptz): When to run (default: now())
- `max_attempts` (integer): Maximum retries (default: 25)
- `priority` (integer): Job priority (default: 0)
Expand All @@ -234,7 +224,7 @@ Adds a new job to the queue or updates an existing job if a key is provided.

### app_jobs.get_job(...)

Fetches and locks the next available job for a worker.
Fetches and locks the next available job for a worker. Uses partial indexes on `is_available = true` for fast dequeue.

**Parameters:**
- `worker_id` (text): Unique worker identifier
Expand All @@ -244,7 +234,7 @@ Fetches and locks the next available job for a worker.
**Returns:** `app_jobs.jobs` row or NULL

**Behavior:**
- Selects jobs by priority, run_at, and id
- Selects jobs by priority, run_at, and id using partial covering indexes
- Locks the job and its queue
- Increments attempt counter
- Uses `FOR UPDATE SKIP LOCKED` for concurrency
Expand Down Expand Up @@ -277,17 +267,16 @@ Marks a job as failed and schedules retry if attempts remain.

### app_jobs.add_scheduled_job(...)

Creates a scheduled job with cron-style or rule-based timing.
Creates a scheduled job with cron-style or rule-based timing. `database_id` and `actor_id` are read internally from JWT claims.

**Parameters:**
- `db_id` (uuid): Database/tenant identifier
- `identifier` (text): Job type/handler name
- `payload` (json): Job data
- `schedule_info` (json): Scheduling configuration
- `payload` (json): Job data (default: `{}`)
- `schedule_info` (json): Scheduling configuration (default: `{}`)
- `job_key` (text): Optional unique key
- `queue_name` (text): Optional queue name
- `max_attempts` (integer): Maximum retries
- `priority` (integer): Job priority
- `max_attempts` (integer): Maximum retries (default: 25)
- `priority` (integer): Job priority (default: 0)

**Returns:** `app_jobs.scheduled_jobs` row

Expand Down Expand Up @@ -325,10 +314,15 @@ END LOOP;
The package includes several triggers for automatic management:

- **timestamps**: Automatically sets created_at/updated_at
- **notify_worker**: Sends LISTEN/NOTIFY events when jobs are added
- **notify_worker**: Statement-level NOTIFY trigger — sends a single `jobs:insert` notification per statement (not per row)
- **increase_job_queue_count**: Updates queue statistics on insert
- **decrease_job_queue_count**: Updates queue statistics on delete/update

### Additional Functions

- **`app_jobs.remove_job(job_key text)`**: Removes a job by its key
- **`app_jobs.force_unlock_workers(worker_ids text[])`**: Forcefully unlocks all jobs held by the specified workers

## Dependencies

- PGPM roles (anonymous, authenticated, administrator)
Expand Down
Loading