micheletriaca/coniglio

πŸ‡ coniglio

A minimal, elegant, and robust async RabbitMQ client for Node.js



coniglio (Italian for "rabbit") is a modern wrapper around RabbitMQ designed to be dead-simple to use, resilient in production, and fully composable with async iterators and streaming libraries. Inspired by libraries like postgres, it gives you just the right abstraction for real-world systems without hiding the power of AMQP.



🚀 Features

  • ✅ for await…of streaming API — process messages naturally with backpressure
  • ✅ Auto reconnect — connection and channel recovery handled transparently with exponential backoff + jitter
  • ✅ JSON decoding by default — or opt out for raw Buffer access
  • ✅ Manual ack() / nack() — total control over message flow
  • ✅ Composable — works beautifully with p-map, exstream.js, highland, standard Node streams, and more
  • ✅ Multiple isolated connections — supports multi-tenant, multi-env, and dynamic routing
  • ✅ TypeScript support — full type inference over event data
  • ✅ Minimal dependencies — just amqplib under the hood

📦 Installation

npm install coniglio

✨ Usage

import coniglio from 'coniglio'

const conn = await coniglio('amqp://localhost')

// Configure queues and exchanges
// (optional, useful for bootstrapping or tests)
await conn.configure({
  queues: [
    { name: 'jobs.email', durable: true }
  ],
  exchanges: [
    { name: 'domain.events', type: 'topic', durable: true }
  ]
})

// Consume messages from a queue
for await (const msg of conn.listen('jobs.email')) {
  await sendEmail(msg.data)
  conn.ack(msg)
}

// Publish messages to an exchange
setInterval(() => {
  conn.publish('domain.events', 'jobs.email', { message: 'hello world' })
}, 200)

📦 Table of contents

  1. Why Coniglio?
  2. API
  3. Philosophy
  4. Resilience
  5. Multiple connections
  6. TypeScript Integration
  7. Usage in Production Systems
  8. Coming Soon
  9. License

πŸ‡ Why Coniglio?

If you’ve used amqplib, you know it’s the canonical RabbitMQ library for Node.js β€” powerful, stable, and low-level. But it leaves you wiring:

  • reconnect logic
  • message decoding
  • ack/nack control
  • error-safe consumption
  • backpressure handling
  • channel separation for pub/sub

Coniglio wraps amqplib with a modern, minimal layer built for real apps. You get the same underlying power, but with an API that feels natural and production-ready.

✅ A quick comparison

| Feature                      | amqplib             | coniglio              |
| ---------------------------- | ------------------- | --------------------- |
| Promise API                  | ⚠️ Basic (thenable) | ✅ Fully async/await  |
| Manual reconnects            | ❌ You handle it    | ✅ Built-in           |
| Message streaming            | ❌ No               | ✅ for await...of     |
| Built-in JSON decoding       | ❌ Raw Buffer       | ✅ On by default      |
| Safe manual ack() / nack()   | ✅ Yes              | ✅ Ergonomic handling |
| Channel separation (pub/sub) | ❌ Manual           | ✅ Automatic          |
| Backpressure-friendly        | ❌ Needs plumbing   | ✅ Native support     |
| TypeScript types             | ⚠️ Community        | ✅ First-class        |

πŸ‡ Coniglio is Italian for β€œrabbit” β€” simple, fast, and alert.

for await (const msg of conn.listen('my-queue')) {
  try {
    await handle(msg.data)
    conn.ack(msg)
  } catch (err) {
    conn.nack(msg)
  }
}

📖 API

const conn = await coniglio(url, opts?)

Creates a new connection instance.

const conn = await coniglio('amqp://localhost', {
  logger: console, // optional custom logger
  json: true,      // default: true (parse JSON messages)
  prefetch: 10,    // default: 10 (number of unacknowledged messages)
})

If you want to use a custom logger, it should implement the Logger interface:

export interface Logger {
  debug(...args: any[]): void
  info(...args: any[]): void
  warn(...args: any[]): void
  error(...args: any[]): void
}
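
For instance, a console-based logger satisfying this interface can be sketched in a few lines (createPrefixedLogger is a hypothetical helper, not part of coniglio):

```javascript
// Hypothetical helper (not part of coniglio): builds an object satisfying
// the Logger interface by prefixing every level and forwarding to console.
function createPrefixedLogger(prefix) {
  const wrap = (method) => (...args) => console[method](`[${prefix}]`, ...args)
  return {
    debug: wrap('debug'),
    info: wrap('info'),
    warn: wrap('warn'),
    error: wrap('error'),
  }
}

const log = createPrefixedLogger('coniglio')
log.info('connected') // prints "[coniglio] connected"
```

Any object exposing these four methods works.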

Example with pino:

import pino from 'pino'
const log = pino({ level: 'debug' })
const conn = await coniglio('amqp://localhost', { logger: log })

conn.listen(queue, opts?)

Returns an async iterator of messages consumed from a queue.

interface ListenOptions {
  json?: boolean           // default: true (parse JSON)
  prefetch?: number        // default: 10
  routingKeys?: (keyof T)[] // optional filter, TypeScript-safe
}

Each yielded msg: Message<T> has:

interface Message<T> {
  raw: amqp.ConsumeMessage    // original AMQP message
  content: Buffer             // raw message body
  data?: T                    // parsed JSON if json = true
  contentIsJson: boolean      // true if JSON parsed successfully
  routingKey: string          // the routing key
}

JSON-enabled example

for await (const msg of conn.listen('my-queue', { prefetch: 100 })) {
  // msg.data is parsed JSON
  if (msg.contentIsJson) {
    processData(msg.data)
  } else {
    // handle parsing error
    console.warn('Failed to parse JSON:', msg.content.toString())
  }
  conn.ack(msg)
}

JSON-opt-out example

for await (const msg of conn.listen('my-queue', { json: false })) {
  // msg.data === undefined
  // use msg.content (Buffer) directly
  processRaw(msg.content)
  conn.ack(msg)
}

conn.ack(msg) / conn.nack(msg, requeue = false)

Manually acknowledge or reject a message.

conn.ack(msg)
conn.nack(msg, true) // requeue = true

conn.publish(exchange, routingKey, payload, opts?)

Publish a message. Payload is serialized with JSON.stringify under the hood.

await conn.publish('domain.events', 'user.created', { userId: '123' }, { priority: 5 })

  • Retry: on failure, retries indefinitely with exponential backoff + jitter (1s → 30s).
  • Confirm channel: ensures broker receipt before resolving.
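
The retry schedule described above can be sketched as follows (an illustrative calculation assuming "full jitter", not coniglio's actual internals):

```javascript
// Illustrative sketch of exponential backoff with full jitter, matching the
// documented 1s to 30s schedule. Not coniglio's actual implementation.
function backoffDelay(attempt, base = 1000, cap = 30000) {
  const ceiling = Math.min(cap, base * 2 ** attempt) // 1s, 2s, 4s, ... capped at 30s
  return Math.random() * ceiling // full jitter: uniform in [0, ceiling)
}
```

Each failed publish would then wait roughly `backoffDelay(attempt)` milliseconds before the next try.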

conn.configure({ queues, exchanges })

Declare queues and exchanges explicitly. Useful for bootstrapping, tests, and dynamic setups.

await conn.configure({
  exchanges: [
    { name: 'domain.events', type: 'topic', durable: true }
  ],
  queues: [
    {
      name: 'jobs.email',
      durable: true,
      bindTo: [{ exchange: 'domain.events', routingKey: 'jobs.email' }]
    }
  ]
})
interface ConfigureOptions {
  exchanges?: {
    name: string
    type: 'topic' | 'fanout' | 'direct' | 'headers'
    durable?: boolean
    autoDelete?: boolean
    internal?: boolean
    arguments?: Record<string, any>
  }[]
  queues?: {
    name: string
    durable?: boolean
    exclusive?: boolean
    autoDelete?: boolean
    deadLetterExchange?: string
    messageTtl?: number
    maxLength?: number
    arguments?: Record<string, any>
    bindTo?: {
      exchange: string
      routingKey: string
      arguments?: Record<string, any>
    }[]
  }[]
}

await conn.close()

Gracefully close all channels and the underlying connection. Use this during application shutdown.


🧠 Philosophy

coniglio doesn't manage your concurrency. It just delivers messages as an async generator. Use your favorite tool:

import pMap from 'p-map'

await pMap(
  conn.listen('jobs.video'),
  async msg => {
    await transcode(msg.data)
    conn.ack(msg)
  },
  { concurrency: 5 }
)

Or go reactive:

import { pipeline } from 'exstream'

await pipeline(
  conn.listen('metrics'),
  s => s.map(msg => parse(msg.data)),
  s => s.forEach(logMetric)
)

💪 Resilience by design

  • Detects and recovers from connection or channel failures automatically
  • Messages aren't lost or left stuck unacknowledged
  • Keeps the developer in control — no magic retries or swallowed errors

✅ Multiple connections

const prod = await coniglio('amqp://prod-host')
const qa = await coniglio('amqp://qa-host')

await prod.publish('events', 'prod.ready', { ok: true })
await qa.publish('events', 'qa.ready', { ok: true })

🧩 TypeScript Integration

If you are using TypeScript and want full type safety over your messages, you can pass a generic type map to coniglio():

type RouteKeyMap = {
  'user.created': { userId: string }
  'invoice.sent': { invoiceId: string; total: number }
}

const conn = await coniglio<RouteKeyMap>('amqp://localhost')

// Consume all events from a queue
for await (const msg of conn.listen('queue.main')) {
  switch (msg.routingKey) {
    case 'user.created': {
      msg.data.userId // βœ… typed as string
      break
    }
    case 'invoice.sent': {
      msg.data.invoiceId // βœ… typed as string
      msg.data.total // βœ… typed as number
      break
    }
  }
}

// Or filter statically by known events (with narrow typing)
for await (const msg of conn.listen('queue.main', { routingKeys: ['user.created'] })) {
  msg.data.userId // βœ… typed as string
}

πŸ—οΈ Usage in Production Systems

Coniglio is designed to thrive in real-world setups with high message throughput and resilience requirements.

βœ”οΈ Backpressure support Using native for await...of, you get natural backpressure without buffering hell.

βœ”οΈ Controlled flow Acknowledge or retry only when you're ready β€” no leaking messages or auto-ack surprises.

βœ”οΈ Crash-safe reconnect If RabbitMQ restarts, coniglio handles reconnection, re-initialization, and queue rebinds with zero config.

βœ”οΈ Works in microservices & monoliths Use it in a Fastify server, a background worker, or a Kubernetes job β€” it's just an async iterator.

βœ”οΈ Easy to observe and test You own the consumer loop. Add metrics, tracing, or mocks wherever you need β€” no magic, no black boxes.


🪄 Coming soon (planned)

  • test suite with real RabbitMQ integration
  • conn.stream() for full ReadableStream interop (with AbortSignal)
  • Built-in metrics and logging hooks
  • Retry helpers (e.g. DLQ support, customizable backoff)

📘 License

MIT — as simple and open as the API itself.
