# coniglio

A minimal, elegant, and robust async RabbitMQ client for Node.js.

coniglio (Italian for "rabbit") is a modern wrapper around RabbitMQ designed to be dead-simple to use, resilient in production, and fully composable with async iterators and streaming libraries. Inspired by libraries like `postgres`, it gives you just the right abstraction for real-world systems without hiding the power of AMQP.
- ✅ `for await…of` streaming API – process messages naturally with backpressure
- ✅ Auto reconnect – connection and channel recovery handled transparently with exponential backoff + jitter
- ✅ JSON decoding by default – or opt out for raw Buffer access
- ✅ Manual `ack()`/`nack()` – total control over message flow
- ✅ Composable – works beautifully with `p-map`, `exstream.js`, `highland`, standard Node streams, and more
- ✅ Multiple isolated connections – supports multi-tenant, multi-env, and dynamic routing
- ✅ TypeScript support – full type inference over event data
- ✅ Zero dependencies (core version) – just `amqplib` under the hood
## Installation

```bash
npm install coniglio
```

## Quick start

```js
import coniglio from 'coniglio'

const conn = await coniglio('amqp://localhost')

// Configure queues and exchanges
// (optional, useful for bootstrapping or tests)
await conn.configure({
  queues: [
    { name: 'jobs.email', durable: true }
  ],
  exchanges: [
    { name: 'domain.events', type: 'topic', durable: true }
  ]
})

// Consume messages from a queue
for await (const msg of conn.listen('jobs.email')) {
  await sendEmail(msg.data)
  conn.ack(msg)
}

// Publish messages to an exchange
setInterval(() => {
  conn.publish('domain.events', 'jobs.email', { message: 'hello world' })
}, 200)
```

## Table of contents

- Why Coniglio?
- API
- Philosophy
- Resilience
- Multiple connections
- TypeScript Integration
- Usage in Production Systems
- Coming Soon
- License
## Why Coniglio?

If you've used amqplib, you know it's the canonical RabbitMQ library for Node.js: powerful, stable, and low-level. But it leaves you wiring up:
- reconnect logic
- message decoding
- ack/nack control
- error-safe consumption
- backpressure handling
- channel separation for pub/sub
Coniglio wraps amqplib with a modern, minimal layer built for real apps.
You get the same underlying power, but with an API that feels natural and production-ready.
| Feature | amqplib | coniglio |
|---|---|---|
| Promise API | ✅ Yes | ✅ Fully async/await |
| Reconnects | ❌ You handle it | ✅ Built-in |
| Message streaming | ❌ No | ✅ `for await...of` |
| Built-in JSON decoding | ❌ Raw Buffer | ✅ On by default |
| Safe manual `ack()` / `nack()` | ✅ Yes | ✅ Ergonomic handling |
| Channel separation (pub/sub) | ❌ Manual | ✅ Automatic |
| Backpressure-friendly | ❌ Needs plumbing | ✅ Native support |
| TypeScript types | ❌ Via `@types/amqplib` | ✅ First-class |
🐇 Coniglio is Italian for "rabbit": simple, fast, and alert.

```js
for await (const msg of conn.listen('my-queue')) {
  try {
    await handle(msg.data)
    conn.ack(msg)
  } catch (err) {
    conn.nack(msg)
  }
}
```

## API

### `coniglio(url, options?)`

Creates a new connection instance.
```js
const conn = await coniglio('amqp://localhost', {
  logger: console, // optional custom logger
  json: true,      // default: true (parse JSON messages)
  prefetch: 10,    // default: 10 (number of unacknowledged messages)
})
```

If you want to use a custom logger, it should implement the `Logger` interface:
```ts
export interface Logger {
  debug(...args: any[]): void
  info(...args: any[]): void
  warn(...args: any[]): void
  error(...args: any[]): void
}
```

Example with pino:

```js
import pino from 'pino'

const log = pino({ level: 'debug' })
const conn = await coniglio('amqp://localhost', { logger: log })
```

### `conn.listen(queue, options?)`

Returns an async iterator of messages consumed from a queue.
```ts
interface ListenOptions {
  json?: boolean            // default: true (parse JSON)
  prefetch?: number         // default: 10
  routingKeys?: (keyof T)[] // optional filter, TypeScript-safe
}
```

Each yielded `msg: Message<T>` has:
```ts
interface Message<T> {
  raw: amqp.ConsumeMessage // original AMQP message
  content: Buffer          // raw message body
  data?: T                 // parsed JSON if json = true
  contentIsJson: boolean   // true if JSON parsed successfully
  routingKey: string       // the routing key
}
```

```js
for await (const msg of conn.listen('my-queue', { prefetch: 100 })) {
  // msg.data is parsed JSON
  if (msg.contentIsJson) {
    processData(msg.data)
  } else {
    // handle parsing error
    console.warn('Failed to parse JSON:', msg.content.toString())
  }
  conn.ack(msg)
}
```

```js
for await (const msg of conn.listen('my-queue', { json: false })) {
  // msg.data === undefined
  // use msg.content (Buffer) directly
  processRaw(msg.content)
  conn.ack(msg)
}
```

### `conn.ack(msg)` / `conn.nack(msg, requeue?)`

Manually acknowledge or reject a message.

```js
conn.ack(msg)
conn.nack(msg, true) // requeue = true
```

### `conn.publish(exchange, routingKey, payload, options?)`

Publish a message. The payload is serialized with JSON.stringify under the hood.

```js
await conn.publish('domain.events', 'user.created', { userId: '123' }, { priority: 5 })
```

- Retry: on failure, retries indefinitely with exponential backoff + jitter (1s → 30s).
- Confirm channel: ensures broker receipt before resolving.
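The retry-forever behavior can be modeled as a small loop around the underlying confirm. This is a simplified sketch of the idea, not coniglio's actual internals; `retryForever`, `op`, `delayFn`, and `sleep` are illustrative names:

```javascript
// Retry an async operation until it succeeds, waiting between attempts.
// delayFn maps the attempt number to a delay in ms (e.g. exponential backoff).
async function retryForever(op, delayFn, sleep = ms => new Promise(r => setTimeout(r, ms))) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await op() // resolves once the broker confirms
    } catch {
      await sleep(delayFn(attempt)) // wait, then try again
    }
  }
}
```

Because the promise only resolves on success, callers can simply `await` the publish and never see transient broker hiccups.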
### `conn.configure(options)`

Declare queues and exchanges explicitly. Useful for bootstrapping, tests, and dynamic setups.

```js
await conn.configure({
  exchanges: [
    { name: 'domain.events', type: 'topic', durable: true }
  ],
  queues: [
    {
      name: 'jobs.email',
      durable: true,
      bindTo: [{ exchange: 'domain.events', routingKey: 'jobs.email' }]
    }
  ]
})
```

```ts
interface ConfigureOptions {
  exchanges?: {
    name: string
    type: 'topic' | 'fanout' | 'direct' | 'headers'
    durable?: boolean
    autoDelete?: boolean
    internal?: boolean
    arguments?: Record<string, any>
  }[]
  queues?: {
    name: string
    durable?: boolean
    exclusive?: boolean
    autoDelete?: boolean
    deadLetterExchange?: string
    messageTtl?: number
    maxLength?: number
    arguments?: Record<string, any>
    bindTo?: {
      exchange: string
      routingKey: string
      arguments?: Record<string, any>
    }[]
  }[]
}
```

### `conn.close()`

Gracefully close all channels and the underlying connection. Use this during application shutdown.
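During shutdown it's easy to trigger close twice (e.g. on both SIGINT and SIGTERM). A tiny guard that runs a close function at most once, sketched here with a hypothetical `makeShutdown` helper that is not part of coniglio's API:

```javascript
// Returns a handler that invokes closeFn at most once and reuses the
// resulting promise on every subsequent call.
function makeShutdown(closeFn) {
  let closing = null
  return () => (closing ??= closeFn())
}
```

Wire the same handler to both signals, e.g. `process.once('SIGTERM', makeShutdown(() => conn.close()))`.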
## Philosophy

coniglio doesn't manage your concurrency. It just delivers messages as an async generator. Use your favorite tool:

```js
import pMap from 'p-map'

await pMap(
  conn.listen('jobs.video'),
  async msg => {
    await transcode(msg.data)
    conn.ack(msg)
  },
  { concurrency: 5 }
)
```

Or go reactive:

```js
import { pipeline } from 'exstream'

await pipeline(
  conn.listen('metrics'),
  s => s.map(msg => parse(msg.data)),
  s => s.forEach(logMetric)
)
```
## Resilience

- Detects and recovers from connection or channel failures automatically
- Messages aren't lost or stuck unacknowledged
- Keeps the developer in control: no magic retries or swallowed errors
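The reconnect delay schedule described earlier (exponential backoff + jitter, 1s → 30s) can be sketched as a pure function. The constants and the "full jitter" strategy here are illustrative assumptions, not coniglio's exact tuning:

```javascript
// attempt 0 -> up to 1s, attempt 1 -> up to 2s, ..., ceiling capped at 30s.
// "Full jitter": pick a uniform random delay below the exponential ceiling,
// so many reconnecting clients don't hammer the broker in lockstep.
function backoffDelay(attempt, { baseMs = 1000, capMs = 30000, random = Math.random } = {}) {
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt)
  return random() * ceiling
}
```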
## Multiple connections

```js
const prod = await coniglio('amqp://prod-host')
const qa = await coniglio('amqp://qa-host')

await prod.publish('events', 'prod.ready', { ok: true })
await qa.publish('events', 'qa.ready', { ok: true })
```

## TypeScript Integration

If you are using TypeScript and want full type safety over your messages, you can pass a generic type map to `coniglio()`:
```ts
type RouteKeyMap = {
  'user.created': { userId: string }
  'invoice.sent': { invoiceId: string; total: number }
}

const conn = await coniglio<RouteKeyMap>('amqp://localhost')

// Consume all events from a queue
for await (const msg of conn.listen('queue.main')) {
  switch (msg.routingKey) {
    case 'user.created': {
      msg.data.userId // ✅ typed as string
      break
    }
    case 'invoice.sent': {
      msg.data.invoiceId // ✅ typed as string
      msg.data.total     // ✅ typed as number
      break
    }
  }
}

// Or filter statically by known routing keys (with narrow typing)
for await (const msg of conn.listen('queue.main', { routingKeys: ['user.created'] })) {
  msg.data.userId // ✅ typed as string
}
```

## Usage in Production Systems

Coniglio is designed to thrive in real-world setups with high message throughput and strict resilience requirements.
✔️ **Backpressure support**
Using native `for await...of`, you get natural backpressure without buffering hell.

✔️ **Controlled flow**
Acknowledge or retry only when you're ready: no leaking messages or auto-ack surprises.

✔️ **Crash-safe reconnect**
If RabbitMQ restarts, coniglio handles reconnection, re-initialization, and queue rebinds with zero config.

✔️ **Works in microservices & monoliths**
Use it in a Fastify server, a background worker, or a Kubernetes job: it's just an async iterator.

✔️ **Easy to observe and test**
You own the consumer loop. Add metrics, tracing, or mocks wherever you need: no magic, no black boxes.
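Because the consumer is a plain async iterable, instrumentation is just a wrapper function. A sketch, where the `withCounter` helper is illustrative and not part of coniglio:

```javascript
// Wrap any async iterable, invoking onCount with a running total as each
// item passes through (e.g. to increment a Prometheus counter).
async function* withCounter(source, onCount) {
  let n = 0
  for await (const item of source) {
    onCount(++n)
    yield item
  }
}
```

You would then consume `withCounter(conn.listen('jobs.email'), n => counter.inc())` exactly as you would the bare iterator.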
## Coming Soon

- Test suite with real RabbitMQ integration
- `conn.stream()` for full `ReadableStream` interop (with `AbortSignal`)
- Built-in metrics and logging hooks
- Retry helpers (e.g. DLQ support, customizable backoff)
## License

MIT: as simple and open as the API itself.
