diff --git a/packages/database-jobs/README.md b/packages/database-jobs/README.md index 1015d10e..930e00a5 100644 --- a/packages/database-jobs/README.md +++ b/packages/database-jobs/README.md @@ -87,7 +87,9 @@ pgpm deploy --createdb --database mydb1 The `app_jobs.jobs` table stores active jobs with the following key fields: - `id`: Unique job identifier -- `database_id`: Database/tenant identifier +- `database_id`: Database/tenant identifier (nullable, read from JWT claims internally) +- `actor_id`: User who triggered the job (nullable, read from JWT claims internally) +- `queue_name`: Optional queue name (default: NULL) - `task_identifier`: Job type/handler name - `payload`: JSON data for the job - `priority`: Lower numbers = higher priority (default: 0) @@ -96,6 +98,7 @@ The `app_jobs.jobs` table stores active jobs with the following key fields: - `max_attempts`: Maximum retry attempts (default: 25) - `locked_by`: Worker ID that locked this job - `locked_at`: When the job was locked +- `is_available`: Generated column (`locked_at IS NULL AND attempts < max_attempts`) — used by partial indexes for fast dequeue - `key`: Optional unique key for upsert semantics ### Scheduled Jobs Table @@ -106,21 +109,27 @@ The `app_jobs.scheduled_jobs` table stores recurring jobs with cron-style or rul The `app_jobs.job_queues` table tracks queue statistics and locking state. +### Performance + +The job queue uses partial covering indexes for fast dequeue: +- `jobs_main_index`: `btree (priority, run_at) INCLUDE (id, queue_name) WHERE is_available = true` — only indexes pending jobs +- `jobs_no_queue_index`: `btree (priority, run_at) INCLUDE (id) WHERE is_available = true AND queue_name IS NULL` — optimizes the common case (no named queue) + +The `is_available` generated column automatically maintains index membership as jobs complete. `database_id` and `actor_id` have **no indexes** — they are envelope metadata, not used in the queue's hot path. + ## Usage ### Adding Jobs ```sql --- Add a simple job +-- Add a simple job (database_id and actor_id are read from JWT claims automatically) SELECT app_jobs.add_job( - db_id := '5b720132-17d5-424d-9bcb-ee7b17c13d43'::uuid, identifier := 'send_email', payload := '{"to": "user@example.com", "subject": "Hello"}'::json ); -- Add a job with priority and delayed execution SELECT app_jobs.add_job( - db_id := '5b720132-17d5-424d-9bcb-ee7b17c13d43'::uuid, identifier := 'generate_report', payload := '{"report_id": 123}'::json, run_at := now() + interval '1 hour', @@ -130,7 +139,6 @@ SELECT app_jobs.add_job( -- Add a job with a unique key (upsert semantics) SELECT app_jobs.add_job( - db_id := '5b720132-17d5-424d-9bcb-ee7b17c13d43'::uuid, identifier := 'daily_summary', payload := '{"date": "2025-01-15"}'::json, job_key := 'daily_summary_2025_01_15', @@ -176,26 +184,9 @@ SELECT app_jobs.fail_job( ### Scheduled Jobs ```sql --- Schedule a job with cron-style timing -INSERT INTO app_jobs.scheduled_jobs ( - database_id, - task_identifier, - payload, - schedule_info -) VALUES ( - '5b720132-17d5-424d-9bcb-ee7b17c13d43'::uuid, - 'cleanup_old_data', - '{"days": 30}'::json, - '{ - "hour": [2], - "minute": [0], - "dayOfWeek": [0, 1, 2, 3, 4, 5, 6] - }'::json -); - -- Schedule a job with a rule (every minute for 3 minutes) +-- database_id and actor_id are read from JWT claims automatically SELECT app_jobs.add_scheduled_job( - db_id := '5b720132-17d5-424d-9bcb-ee7b17c13d43'::uuid, identifier := 'heartbeat', payload := '{}'::json, schedule_info := json_build_object( @@ -213,14 +204,13 @@ SELECT * FROM app_jobs.run_scheduled_job(scheduled_job_id := 1); ### app_jobs.add_job(...) -Adds a new job to the queue or updates an existing job if a key is provided. +Adds a new job to the queue or updates an existing job if a key is provided. `database_id` and `actor_id` are read internally from JWT claims — no need to pass them. **Parameters:** -- `db_id` (uuid): Database/tenant identifier - `identifier` (text): Job type/handler name - `payload` (json): Job data (default: `{}`) - `job_key` (text): Optional unique key for upsert (default: NULL) -- `queue_name` (text): Optional queue name (default: random UUID) +- `queue_name` (text): Optional queue name (default: NULL) - `run_at` (timestamptz): When to run (default: now()) - `max_attempts` (integer): Maximum retries (default: 25) - `priority` (integer): Job priority (default: 0) @@ -234,7 +224,7 @@ Adds a new job to the queue or updates an existing job if a key is provided. ### app_jobs.get_job(...) -Fetches and locks the next available job for a worker. +Fetches and locks the next available job for a worker. Uses partial indexes on `is_available = true` for fast dequeue. **Parameters:** - `worker_id` (text): Unique worker identifier @@ -244,7 +234,7 @@ Fetches and locks the next available job for a worker. **Returns:** `app_jobs.jobs` row or NULL **Behavior:** -- Selects jobs by priority, run_at, and id +- Selects jobs by priority, run_at, and id using partial covering indexes - Locks the job and its queue - Increments attempt counter - Uses `FOR UPDATE SKIP LOCKED` for concurrency @@ -277,17 +267,16 @@ Marks a job as failed and schedules retry if attempts remain. ### app_jobs.add_scheduled_job(...) -Creates a scheduled job with cron-style or rule-based timing. +Creates a scheduled job with cron-style or rule-based timing. `database_id` and `actor_id` are read internally from JWT claims. **Parameters:** -- `db_id` (uuid): Database/tenant identifier - `identifier` (text): Job type/handler name -- `payload` (json): Job data -- `schedule_info` (json): Scheduling configuration +- `payload` (json): Job data (default: `{}`) +- `schedule_info` (json): Scheduling configuration (default: `{}`) - `job_key` (text): Optional unique key - `queue_name` (text): Optional queue name -- `max_attempts` (integer): Maximum retries -- `priority` (integer): Job priority +- `max_attempts` (integer): Maximum retries (default: 25) +- `priority` (integer): Job priority (default: 0) **Returns:** `app_jobs.scheduled_jobs` row @@ -325,10 +314,15 @@ END LOOP; The package includes several triggers for automatic management: - **timestamps**: Automatically sets created_at/updated_at -- **notify_worker**: Sends LISTEN/NOTIFY events when jobs are added +- **notify_worker**: Statement-level NOTIFY trigger — sends a single `jobs:insert` notification per statement (not per row) - **increase_job_queue_count**: Updates queue statistics on insert - **decrease_job_queue_count**: Updates queue statistics on delete/update +### Additional Functions + +- **`app_jobs.remove_job(job_key text)`**: Removes a job by its key +- **`app_jobs.force_unlock_workers(worker_ids text[])`**: Forcefully unlocks all jobs held by the specified workers + ## Dependencies - PGPM roles (anonymous, authenticated, administrator)