Non-blocking PHP client for pgmq. See the extension installation guide.

composer require thesis/pgmq

Since you most likely expect exactly-once semantics from a database-based queue, every operation, whether sending a message or running business logic together with message acknowledgment, must be transactional.
A transaction object is short-lived: it cannot be used after rollback() or commit(), so it cannot be injected as a long-lived dependency.
That is why the whole API is built on functions that take Amp\Postgres\PostgresLink as their first parameter, which can be either a transaction object or a plain connection.
Only the consumer accepts Amp\Postgres\PostgresConnection, because it opens its own transactions to read and acknowledge messages atomically.
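As a rough illustration of the transactional style described above, the sketch below writes a business row and sends a message through the same transaction. The `placeOrder` function and the `orders` table are hypothetical, and passing the transaction to `Pgmq\createQueue` is an assumption based on the statement that API functions accept any `PostgresLink`.

```php
<?php

declare(strict_types=1);

use Amp\Postgres\PostgresConnection;
use Thesis\Pgmq;

/**
 * Stores an order and publishes an event atomically.
 * Hypothetical example: the `orders` table is not part of the library.
 */
function placeOrder(PostgresConnection $pg, int $orderId): void
{
    // A fresh transaction per unit of work: it is unusable after commit() or rollback().
    $tx = $pg->beginTransaction();

    try {
        $tx->execute('insert into orders (id) values (?)', [$orderId]);

        // Assumption: createQueue() accepts the transaction, so the send is scoped to it.
        $queue = Pgmq\createQueue($tx, 'events');
        $queue->send(new Pgmq\SendMessage(json_encode(['id' => $orderId], JSON_THROW_ON_ERROR)));
    } catch (\Throwable $e) {
        // Neither the row nor the message becomes visible.
        $tx->rollback();

        throw $e;
    }

    $tx->commit();
}
```

If the insert or the send fails, both are rolled back together, which is the property the transactional API is meant to give.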
- Create queue
- Create unlogged queue
- Create partitioned queue
- List queues
- List queue metrics
- List queue metadata
- Drop queue
- Purge queue
- Send message
- Send message with relative delay
- Send message with absolute delay
- Send batch
- Send batch with relative delay
- Send batch with absolute delay
- Read message
- Read batch
- Pop message
- Read batch with poll
- Set visibility timeout
- Archive message
- Archive batch
- Delete message
- Delete batch
- Enable notify insert
- Disable notify insert
- Bind topic
- Unbind topic
- Send topic
- Send topic with delay
- Test routing
- Validate routing key
- Validate topic pattern
- Read grouped
- Read grouped round-robin
- Read grouped head
- Read grouped with poll
- Read grouped round-robin with poll
- Create FIFO index
- Create FIFO index for all queues
- Consume messages

Create queue
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');

Create unlogged queue
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createUnloggedQueue($pg, 'events');

Create partitioned queue
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createPartitionedQueue(
    pg: $pg,
    queue: 'events',
    partitionInterval: 10000,
    retentionInterval: 100000,
);

List queues
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
foreach (Pgmq\listQueues($pg) as $queue) {
    $md = $queue->metadata();
    var_dump($md);
}

List queue metrics
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
foreach (Pgmq\metrics($pg) as $metrics) {
    var_dump($metrics);
}

List queue metadata
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
foreach (Pgmq\listQueueMetadata($pg) as $md) {
    var_dump($md);
}

Drop queue
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$queue->drop();

Purge queue
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
var_dump($queue->purge());

Send message
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'));

Send message with relative delay
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(
    new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
    TimeSpan::fromSeconds(5),
);

Send message with absolute delay
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(
    new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
    new \DateTimeImmutable('+5 seconds'),
);

Send batch
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch([
    new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
    new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
]);

Send batch with relative delay
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch(
    [
        new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
        new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
    ],
    TimeSpan::fromSeconds(5),
);

Send batch with absolute delay
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch(
    [
        new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
        new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
    ],
    new \DateTimeImmutable('+5 seconds'),
);

Read message
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read(TimeSpan::fromSeconds(20));

Read batch
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$messages = $queue->readBatch(10, TimeSpan::fromSeconds(20));

Pop message
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->pop();

Read batch with poll
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$messages = $queue->readPoll(
    batch: 10,
    maxPoll: TimeSpan::fromSeconds(5),
    pollInterval: TimeSpan::fromMilliseconds(250),
);

Set visibility timeout
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();
if ($message !== null) {
    // handle the message
    $queue->setVisibilityTimeout($message->id, TimeSpan::fromSeconds(10));
}

Archive message
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();
if ($message !== null) {
    $queue->archive($message->id);
}

Archive batch
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$messages = [...$queue->readBatch(5)];
if ($messages !== []) {
    // Map each message to its id; the callback must use its own $message parameter.
    $queue->archiveBatch(array_map(
        static fn(Pgmq\Message $message): int => $message->id,
        $messages,
    ));
}

Delete message
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();
if ($message !== null) {
    $queue->delete($message->id);
}

Delete batch
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$messages = [...$queue->readBatch(5)];
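if ($messages !== []) {
    // Map each message to its id; the callback must use its own $message parameter.
    $queue->deleteBatch(array_map(
        static fn(Pgmq\Message $message): int => $message->id,
        $messages,
    ));
}

Enable notify insert
use Thesis\Pgmq;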
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$channel = $queue->enableNotifyInsert(); // returns the Postgres channel to listen on

Disable notify insert
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$queue->disableNotifyInsert();

Bind topic
Bind a queue to a topic pattern. Messages sent with a routing key matching the pattern will be delivered to the queue.
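To make the idea of a routing key matching a pattern concrete, here is a minimal in-memory sketch. It assumes that `*` stands for exactly one dot-separated segment, as the `notifications.*` example suggests. Any other wildcard the extension may support is not handled, and the function names are hypothetical rather than part of the library.

```php
<?php

declare(strict_types=1);

/**
 * Compiles a dot-separated topic pattern into a PCRE expression.
 * Assumption: "*" stands for exactly one segment; other wildcards are not handled here.
 */
function compileTopicPattern(string $pattern): string
{
    $segments = array_map(
        static fn(string $segment): string => $segment === '*'
            ? '[^.]+'                    // one non-empty segment without dots
            : preg_quote($segment, '/'), // literal segment
        explode('.', $pattern),
    );

    return '/^' . implode('\.', $segments) . '$/';
}

function routingKeyMatches(string $pattern, string $routingKey): bool
{
    return preg_match(compileTopicPattern($pattern), $routingKey) === 1;
}

var_dump(routingKeyMatches('notifications.*', 'notifications.email'));      // bool(true)
var_dump(routingKeyMatches('notifications.*', 'notifications.email.sent')); // bool(false)
```

The real matching happens inside the database; this sketch only helps reason about which keys a binding is likely to catch.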
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'emails');
Pgmq\bindTopic($pg, 'notifications.*', $queue->name);

Unbind topic
Remove a queue binding from a topic pattern.
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
Pgmq\unbindTopic($pg, 'notifications.*', 'emails');

Send topic
Send a message to all queues bound to patterns matching the given routing key.
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$matched = Pgmq\sendTopic($pg, 'notifications.email', new Pgmq\SendMessage('{"user": 1}'));

Send topic with delay
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$matched = Pgmq\sendTopic(
    $pg,
    'notifications.email',
    new Pgmq\SendMessage('{"user": 1}'),
    TimeSpan::fromSeconds(5),
);

Test routing
Test which queues would receive a message for a given routing key without actually sending a message.
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
foreach (Pgmq\testRouting($pg, 'notifications.email') as $route) {
    var_dump($route->pattern, $route->queue, $route->compiledRegex);
}

Validate routing key
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
Pgmq\validateRoutingKey($pg, 'events.created'); // throws PostgresQueryError if invalid

Validate topic pattern
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
Pgmq\validateTopicPattern($pg, 'events.*'); // throws PostgresQueryError if invalid

Read grouped
Read messages respecting FIFO ordering within groups. Messages are grouped by the x-pgmq-group header. Only the oldest unprocessed message from each group is returned.
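Because grouping depends on the x-pgmq-group header being present and spelled correctly, it can help to build the headers JSON in one place. The helper below is a hypothetical convenience, not part of the library. It only produces the string that would be passed as the second argument of Pgmq\SendMessage.

```php
<?php

declare(strict_types=1);

/**
 * Builds the headers JSON for a message that belongs to a FIFO group.
 *
 * @param array<string, mixed> $headers extra headers; they cannot override the group key
 */
function groupHeaders(string $group, array $headers = []): string
{
    // Array union keeps the left-hand value when a key exists on both sides.
    return json_encode(['x-pgmq-group' => $group] + $headers, JSON_THROW_ON_ERROR);
}

echo groupHeaders('order-1'), "\n"; // {"x-pgmq-group":"order-1"}
```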
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'orders');
$queue->send(new Pgmq\SendMessage('{"action": "created"}', '{"x-pgmq-group": "order-1"}'));
$queue->send(new Pgmq\SendMessage('{"action": "paid"}', '{"x-pgmq-group": "order-1"}'));
$queue->send(new Pgmq\SendMessage('{"action": "created"}', '{"x-pgmq-group": "order-2"}'));
$messages = $queue->readGrouped(10, TimeSpan::fromSeconds(30));

Read grouped round-robin
Read messages with round-robin distribution across groups.
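One way to picture round-robin distribution is the in-memory sketch below: it takes the next message from each group in turn until the requested quantity is reached, keeping arrival order inside every group. This is an illustration under that assumption, not the extension's actual query, and `roundRobin` is a hypothetical name.

```php
<?php

declare(strict_types=1);

/**
 * @param list<array{id: int, group: string}> $messages ordered by id
 * @return list<int> message ids in round-robin order
 */
function roundRobin(array $messages, int $qty): array
{
    // Bucket ids per group, preserving arrival order inside each group.
    $groups = [];
    foreach ($messages as $message) {
        $groups[$message['group']][] = $message['id'];
    }

    $picked = [];
    while ($groups !== [] && count($picked) < $qty) {
        // foreach iterates over a snapshot, so updating $groups inside is safe.
        foreach ($groups as $name => $ids) {
            if (count($picked) >= $qty) {
                break;
            }
            $picked[] = array_shift($ids);
            if ($ids === []) {
                unset($groups[$name]);
            } else {
                $groups[$name] = $ids;
            }
        }
    }

    return $picked;
}

$messages = [
    ['id' => 1, 'group' => 'a'],
    ['id' => 2, 'group' => 'a'],
    ['id' => 3, 'group' => 'b'],
    ['id' => 4, 'group' => 'a'],
    ['id' => 5, 'group' => 'c'],
];
echo implode(',', roundRobin($messages, 4)), "\n"; // 1,3,5,2
```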
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'orders');
$messages = $queue->readGroupedRR(10);

Read grouped head
Read exactly one message per FIFO group, namely the head (oldest, lowest msg_id) message of each group, across up to qty groups in a single operation. Only groups with a visible, unlocked head message are included.
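The selection rule above can be sketched in memory as follows. The `visible` flag is a simplified stand-in for both the visibility timeout and row locking, and `groupHeads` is a hypothetical name. The point is only that a group whose head is unavailable contributes nothing, even if later messages in it are visible.

```php
<?php

declare(strict_types=1);

/**
 * @param list<array{id: int, group: string, visible: bool}> $messages
 * @return list<int> ids of the head messages, lowest first
 */
function groupHeads(array $messages, int $qty): array
{
    // Find the lowest-id message of every group, regardless of visibility.
    $heads = [];
    foreach ($messages as $message) {
        $current = $heads[$message['group']] ?? null;
        if ($current === null || $message['id'] < $current['id']) {
            $heads[$message['group']] = $message;
        }
    }

    // A group whose head is not visible is skipped entirely.
    $ids = [];
    foreach ($heads as $head) {
        if ($head['visible']) {
            $ids[] = $head['id'];
        }
    }
    sort($ids);

    return array_slice($ids, 0, max(0, $qty));
}

$messages = [
    ['id' => 1, 'group' => 'a', 'visible' => false],
    ['id' => 2, 'group' => 'a', 'visible' => true],
    ['id' => 3, 'group' => 'b', 'visible' => true],
    ['id' => 4, 'group' => 'c', 'visible' => true],
    ['id' => 5, 'group' => 'b', 'visible' => true],
];
echo implode(',', groupHeads($messages, 10)), "\n"; // 3,4
```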
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'orders');
$messages = $queue->readGroupedHead(10);

Read grouped with poll
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'orders');
$messages = $queue->readGroupedWithPoll(
    count: 10,
    maxPoll: TimeSpan::fromSeconds(5),
    pollInterval: TimeSpan::fromMilliseconds(250),
);

Read grouped round-robin with poll
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'orders');
$messages = $queue->readGroupedRRWithPoll(
    count: 10,
    maxPoll: TimeSpan::fromSeconds(5),
    pollInterval: TimeSpan::fromMilliseconds(250),
);

Create FIFO index
Create a GIN index on the headers column for FIFO queue performance optimization. This is required before using grouped read functions.
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'orders');
Pgmq\createFifoIndex($pg, 'orders');

Create FIFO index for all queues
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
Pgmq\createFifoIndexAll($pg);

Consume messages
This functionality is not a standard feature of the pgmq extension. The library provides it as an add-on for reliable processing of message batches from the queue, with the ability to ack, nack (with a delay), and archive (term) messages.
- First of all, create the extension if it doesn't exist yet:
use Thesis\Pgmq;
Pgmq\createExtension($pg);
- Then create a queue:
use Thesis\Pgmq;
Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');
- Next, create the consumer object:
use Thesis\Pgmq;
Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');
$consumer = Pgmq\createConsumer($pg);
- Now we can proceed to configure the queue consumer handler:
use Thesis\Pgmq;
Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');
$consumer = Pgmq\createConsumer($pg);
$context = $consumer->consume(
    static function (array $messages, Pgmq\ConsumeController $ctrl): void {
        var_dump($messages);
        $ctrl->ack($messages);
    },
    new Pgmq\ConsumeConfig(
        queue: 'events',
    ),
);

Through Pgmq\ConsumeConfig you can configure:
- the batch size of received messages;
- the message visibility timeout;
- monitoring of queue inserts via the LISTEN/NOTIFY mechanism;
- the polling interval.
At least one of these settings — listenForInserts or pollTimeout — must be specified.
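A configuration that satisfies this rule might look like the sketch below. The parameter names `listenForInserts` and `pollTimeout` come from the sentence above, but their types (a boolean and a `TimeSpan`) are assumptions, and `eventsConsumeConfig` is a hypothetical helper.

```php
<?php

declare(strict_types=1);

use Thesis\Pgmq;
use Thesis\Time\TimeSpan;

/** Hypothetical factory that enables both wake-up mechanisms for the "events" queue. */
function eventsConsumeConfig(): Pgmq\ConsumeConfig
{
    return new Pgmq\ConsumeConfig(
        queue: 'events',
        // Assumption: a boolean flag that turns on LISTEN/NOTIFY wake-ups.
        listenForInserts: true,
        // Assumption: a TimeSpan used as the polling interval.
        pollTimeout: TimeSpan::fromSeconds(1),
    );
}
```

Setting both is a reasonable default if notifications can be missed, for example across reconnects, since polling then acts as a fallback.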
Through the Pgmq\ConsumeController, you can:
- ack messages, causing them to be deleted from the queue;
- nack messages with a delay, setting a visibility timeout for them;
- terminate messages that can no longer be retried, which archives them;
- stop the consumer.
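The choice between these outcomes can be sketched as a small routing function. The `BatchController` interface below is a hypothetical stand-in and not the library's `ConsumeController`, whose real method names and signatures may differ. The array-shaped messages and the `readCount` field are likewise assumptions made for illustration.

```php
<?php

declare(strict_types=1);

/** Hypothetical stand-in for the controller; the library's real API may differ. */
interface BatchController
{
    public function ack(array $messages): void;

    public function nack(array $messages, int $delaySeconds): void;

    public function term(array $messages): void;
}

/**
 * Splits a batch into acked, retried, and terminated messages.
 *
 * @param list<array{id: int, readCount: int}> $messages
 * @param callable(array): bool $handle returns true when the message was processed
 */
function settleBatch(array $messages, BatchController $ctrl, callable $handle, int $maxAttempts = 3): void
{
    $done = $retry = $dead = [];
    foreach ($messages as $message) {
        if ($handle($message)) {
            $done[] = $message;
        } elseif ($message['readCount'] < $maxAttempts) {
            $retry[] = $message; // still has attempts left
        } else {
            $dead[] = $message;  // retries exhausted
        }
    }

    if ($done !== []) {
        $ctrl->ack($done);
    }
    if ($retry !== []) {
        $ctrl->nack($retry, 30); // make them visible again in 30 seconds
    }
    if ($dead !== []) {
        $ctrl->term($dead);
    }
}
```

Capping retries by read count keeps a poison message from being redelivered forever; archiving it instead preserves it for inspection.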
Since receiving messages and acking/nacking them occur within the same transaction, for your own database queries you must use the ConsumeController::$tx object to ensure exactly-once semantics for message processing.
use Thesis\Pgmq;
Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');
$consumer = Pgmq\createConsumer($pg);
$context = $consumer->consume(
    static function (array $messages, Pgmq\ConsumeController $ctrl): void {
        $ctrl->tx->execute('...some business logic');
        $ctrl->ack($messages);
    },
    new Pgmq\ConsumeConfig(
        queue: 'events',
    ),
);

Using ConsumeContext, you can gracefully stop the consumer, waiting for the current batch to finish processing.
use Thesis\Pgmq;
use function Amp\trapSignal;
Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');
$consumer = Pgmq\createConsumer($pg);
$context = $consumer->consume(
    static function (array $messages, Pgmq\ConsumeController $ctrl): void {
        $ctrl->tx->execute('...some business logic');
        $ctrl->ack($messages);
    },
    new Pgmq\ConsumeConfig(
        queue: 'events',
    ),
);
trapSignal([\SIGINT, \SIGTERM]); // suspends until one of the signals arrives
$context->stop();
$context->awaitCompletion();

Or stop all current consumers using $consumer->stop():
use Thesis\Pgmq;
use function Amp\trapSignal;
Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');
$consumer = Pgmq\createConsumer($pg);
$context = $consumer->consume(...);
trapSignal([\SIGINT, \SIGTERM]); // suspends until one of the signals arrives
$consumer->stop();
$context->awaitCompletion();

The MIT License (MIT). Please see License File for more information.