Skip to content

Cardano-Forge/anvil-barrow

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

23 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Barrow Version

Barrow is a framework for building blockchain indexing tools. It provides a simple API for defining and running indexing jobs on the Cardano blockchain.

Installation

npm i @ada-anvil/barrow

Usage

Controller

The Controller class is the main entry point for defining and running indexing jobs.

Constructor:

new Controller(config, startOpts?)

Parameters:

  1. config (required): Configuration object with the following properties:

    • syncClient: An instance of SyncClient that provides a generator for sync events.
    • errorHandler (optional): An instance of ErrorHandler that handles errors during sync events.
    • logger (optional): A function that handles log events.
    • tracingConfig (optional): An object that configures tracing for the controller.
  2. startOpts (optional): Default options to use for all start() calls. These will be merged with options passed to start(), with start() options taking precedence. See Sync Job Configuration for available options (excluding point).

SyncClient

The SyncClient interface defines a method for generating sync events.

Currently, the only implementation of SyncClient is OgmiosSyncClient, which uses the Ogmios API to sync with the blockchain.

Future support is planned for other sync clients such as Dolos.

ErrorHandler

The ErrorHandler class is responsible for handling errors during sync events.

Methods:

  • register(filter, handler): Registers an error handler or retry policy for a specific error type or class.
  • handle(error): Processes an error and returns the handling result.
  • reset(): Resets the error handler to its initial state.
Retry policies

Built-in retry handlers:

  • ErrorHandler.retry(options): Retries the sync event after a specified delay.
  • ErrorHandler.retryWithBackoff(options): Retries the sync event with exponential backoff.
Error filters

A filter can be:

  • An Error class (only instances of that class will be handled)
  • A function that takes an error and returns a boolean indicating whether to handle it
Retry options

Options for ErrorHandler.retry and ErrorHandler.retryWithBackoff:

  • maxRetries (optional): Maximum number of retries (default: 3)
  • baseDelay (optional): Base delay in milliseconds between retries (default: 1000)
  • backoff (optional): Use exponential backoff (default: false)
  • persistent (optional): Preserve error handler state between retries (default: false)

Getting Started

Step 1: Install Dependencies

Install the Ogmios client:

npm i @cardano-ogmios/client

Step 2: Create a Sync Client

Create an instance of OgmiosSyncClient:

import { OgmiosSyncClient } from "@ada-anvil/barrow/ogmios";

const syncClient = new OgmiosSyncClient({
  host: "localhost",
  port: 1337,
  tls: false,
});

Configuration options:

  • host: Ogmios node hostname
  • port: Ogmios node port
  • tls: Enable TLS connection

Step 3: Create a Controller

Create a Controller instance with your sync client:

import { Controller, ErrorHandler } from "@ada-anvil/barrow";

const controller = new Controller({
  syncClient,
  errorHandler: new ErrorHandler(),
});

You can optionally provide default start options as a second parameter. These defaults will be merged with options passed to start():

const controller = new Controller(
  {
    syncClient,
    errorHandler: new ErrorHandler(),
  },
  {
    // Default options for all start() calls
    throttle: [100, "milliseconds"],
    fn: (event) => {
      console.log(event);
    },
  },
);

Step 4: Start Syncing

Start the controller with a sync job configuration:

await controller.start({
  fn: (event) => {
    console.log(event);
  },
  point: {
    slot: 101163751,
    id: "fa5a6a51632b90557665fcb33970f4fb372dff6ad0191e083ff3b6b221f2b87e",
  },
  throttle: [100, "milliseconds"],
});

// Wait for sync completion
await controller.waitForCompletion();

Controlling Sync Jobs

Pause and Resume:

await controller.pause(); // Preserves state
await controller.resume(); // Resumes from paused point

Restart:

Calling start() on a paused job resets the state and starts from scratch.

Job Completion

A sync job can complete in two ways:

  1. Using takeUntil: The takeUntil function returns true

    await controller.start({
      fn: (event) => {
        /* process event */
      },
      point: startPoint,
      takeUntil: ({ state }) => state.meta.syncTip?.slot >= targetSlot,
    });

    Note: takeUntil runs on ALL events, including filtered ones. This allows you to stop syncing based on conditions that don't depend on event processing:

    await controller.start({
      fn: (event) => {
        /* process event */
      },
      filter: (event) => event.type === "apply", // Only process apply events
      point: startPoint,
      takeUntil: ({ state }) => state.counters.filterCount >= 100, // Stop after 100 filtered events
    });

    To run takeUntil only on processed events, you can use the isFilteredOut property:

    await controller.start({
      fn: (event) => {
        /* process event */
      },
      filter: (event) => event.type === "apply",
      point: startPoint,
      takeUntil: ({ lastEvent, state }) => {
        // Return early if the event was filtered out
        if (lastEvent.isFilteredOut) return false;
    
        // Only count processed events
        return state.counters.applyCount >= 10;
      },
    });
  2. Using handler return value: The fn handler returns { done: true }

    await controller.start({
      fn: (event) => {
        // Process event
        if (someCondition) {
          return { done: true };
        }
      },
      point: startPoint,
    });

Throttling

The throttle option allows you to control the rate of event processing by adding delays between events. Throttle is applied to ALL events, including filtered ones:

await controller.start({
  fn: (event) => {
    /* process event */
  },
  filter: (event) => event.type === "apply", // Only process apply events
  point: startPoint,
  throttle: [100, "milliseconds"], // Delays after ALL events (filtered and processed)
});

Sync Job Configuration

Configuration properties:

  • point (required): Starting point for syncing (slot and block ID)
  • fn (optional): Function that handles sync events
  • throttle (optional): Throttle duration for sync events. Throttle applies to ALL events (including filtered events)
  • filter (optional): Function to filter sync events
  • takeUntil (optional): Function that returns true to stop syncing. This function runs on ALL events (including filtered events). Use lastEvent.isFilteredOut to handle filtered events differently

Data Structures

Sync Event:

  • type: Event type
  • block: Block that was synced
  • tip: Current chain tip

Point:

  • slot: Slot number
  • id: Block hash

Tip:

  • slot: Slot number
  • id: Block hash
  • height: Block height

Block:

  • type: Block type
  • era: Cardano era
  • id: Block hash
  • height: Block height
  • slot (optional): Slot number

Logger

Barrow provides built-in logging support using Pino.

Setup

Install Pino:

npm i pino

Configure the logger:

import { pinoLogger } from "@ada-anvil/barrow/pino";
import { pino } from "pino";

const controller = new Controller({
  syncClient: new OgmiosSyncClient({
    host: "localhost",
    port: 1337,
    tls: false,
  }),
  logger: pinoLogger(pino()),
});

Tracing

Barrow supports OpenTelemetry for metrics and tracing.

Setup

Install OpenTelemetry:

npm i @opentelemetry/api

Configure tracing:

import { otelTracingConfig } from "@ada-anvil/barrow/otel";

const controller = new Controller({
  syncClient: new OgmiosSyncClient({
    host: "localhost",
    port: 1337,
    tls: false,
  }),
  tracingConfig: otelTracingConfig(),
});

The otelTracingConfig function accepts either:

  • A Meter instance
  • A configuration object with name, version (optional), and opts (optional)

Available Metrics

Metric Definitions

Metric Key Type Name Description Value Type Unit
status gauge status Controller status int -
syncTipSlot gauge sync_tip_slot Sync tip slot int -
syncTipHeight gauge sync_tip_height Sync tip height int -
chainTipSlot gauge chain_tip_slot Chain tip slot int -
chainTipHeight gauge chain_tip_height Chain tip height int -
isSynced gauge is_synced Is synced (1 = yes, 0 = no) int -
processingTime histogram processing_time Time it takes to process an event - milliseconds
arrivalTime histogram arrival_time Time it takes to receive an event - milliseconds
applyCount gauge apply_count Number of apply events int -
resetCount gauge reset_count Number of reset events int -
filterCount gauge filter_count Number of filtered events int -
errorCount gauge error_count Number of errors int -

Examples

Example implementations are available in the src/examples directory.

Running Examples

  1. Install dependencies:

    npm i
  2. Create a .env file in the project root:

    OGMIOS_NODE_HOST=<ogmios-node-host>
    OGMIOS_NODE_PORT=<ogmios-node-port>
    OGMIOS_NODE_TLS=<ogmios-node-tls>
  3. Run an example:

    npm run example kitchen-sink

About

Chain indexing tools for the Cardano blockchain

Resources

Stars

Watchers

Forks

Packages

 
 
 

Contributors