-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmigrate.go
More file actions
49 lines (43 loc) · 1.57 KB
/
migrate.go
File metadata and controls
49 lines (43 loc) · 1.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package pgqueue
import (
"context"
"embed"
"fmt"
)
// Migrations exports the raw goose-formatted migration files for use with
// goose or compatible tooling. It is populated in init from migrationsFS, so
// it exposes the files matched by the migrations/*.sql embed pattern.
//
// WARNING: Do not pass these files directly to pgx Conn.Exec — they contain
// both Up and Down sections. Use Migrate() for programmatic schema setup.
var Migrations embed.FS

// migrationsFS holds the SQL files embedded at build time from the migrations
// directory. The go:embed directive must remain immediately above this
// declaration for the embedding to take effect.
//
//go:embed migrations/*.sql
var migrationsFS embed.FS

// init publishes the embedded files through the exported Migrations variable.
func init() { Migrations = migrationsFS }
// migrationDDL is the SQL that Migrate executes. It creates the pgqueue_tasks
// table and three indexes:
//
//   - pgqueue_tasks_dequeue_idx on (queue, status, run_at)
//   - pgqueue_tasks_hash_uniq, a unique index on (queue, hash)
//   - pgqueue_tasks_reaper_idx, a partial index on (status, lease_until)
//     restricted to rows with status = 1
//
// Every statement uses IF NOT EXISTS.
//
// NOTE(review): the meaning of the numeric status values (e.g. 1) is not
// defined in this file — confirm against the code that claims and reaps tasks.
const migrationDDL = `
CREATE TABLE IF NOT EXISTS pgqueue_tasks (
id BIGSERIAL PRIMARY KEY,
queue VARCHAR(128) NOT NULL,
status SMALLINT NOT NULL DEFAULT 0,
try SMALLINT NOT NULL DEFAULT 0,
run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_ran_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
claimed_at TIMESTAMPTZ,
lease_until TIMESTAMPTZ,
claim_token VARCHAR(36),
payload BYTEA,
hash VARCHAR(128)
);
CREATE INDEX IF NOT EXISTS pgqueue_tasks_dequeue_idx ON pgqueue_tasks (queue, status, run_at);
CREATE UNIQUE INDEX IF NOT EXISTS pgqueue_tasks_hash_uniq ON pgqueue_tasks (queue, hash);
CREATE INDEX IF NOT EXISTS pgqueue_tasks_reaper_idx ON pgqueue_tasks (status, lease_until) WHERE status = 1;
`
// Migrate sets up the pgqueue schema by executing migrationDDL on the
// connection held by q, creating the pgqueue_tasks table and its indexes
// when they are missing.
//
// Because every statement in migrationDDL uses IF NOT EXISTS, calling Migrate
// again after a successful run does not fail on already-existing objects.
// An error from Exec is returned wrapped with the "pgqueue: migrate" prefix.
func Migrate(ctx context.Context, q *Queue) error {
	if _, execErr := q.tasks.DB.Conn.Exec(ctx, migrationDDL); execErr != nil {
		return fmt.Errorf("pgqueue: migrate: %w", execErr)
	}
	return nil
}