
ADD: ByteChunker for streaming decode-to-bytes #129

Closed
wtn wants to merge 1 commit into databento:dev from wtn:batch

Conversation

Contributor

@wtn wtn commented Apr 22, 2026

Adds decode::ByteChunker and decode::RecordFilter: composable wrappers around any DecodeRecordRef. ByteChunker aggregates raw record bytes into bounded chunks; RecordFilter (itself a DecodeRecordRef) drops records by instrument_id, publisher_id, or a half-open time window [start_ts, end_ts). Compose as ByteChunker::new(RecordFilter::builder(decoder).…build()?) or use either alone.

I realize this is a large patch; I understand if you need to reject it on architectural grounds, or defer it to later.

Motivation

My use case requires much faster header-based filtering from interpreted languages, which we can achieve by skipping per-record FFI allocation entirely. Inspecting a header field currently requires the full decode path; for low-selectivity filters (where only a small proportion of records is kept), most of that work is wasted. Pairing a pre-decode header filter with raw-byte output lets the binding hand back contiguous bytes the caller can forward, recompress, or batch-deserialize in one shot.
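To illustrate why header-only inspection is cheap, a couple of fixed-offset reads suffice. The sketch below packs a synthetic header and reads the fields a filter needs; the assumed layout (length u8 in 4-byte units, rtype u8, publisher_id u16, instrument_id u32, ts_event u64, all little-endian) matches the offsets the benchmark script below relies on, and the concrete values are hypothetical:

```python
import struct

# Pack a synthetic DBN-style record header (assumed layout: length u8 in
# 4-byte units, rtype u8, publisher_id u16 LE, instrument_id u32 LE,
# ts_event u64 LE -- values are illustrative).
hdr = struct.pack("<BBHIQ", 4, 0x01, 2, 12345, 1_700_000_000_000_000_000)

# A pre-decode filter needs only a few fixed-offset reads, no allocation:
rec_size = hdr[0] * 4                                # total record size in bytes
instrument_id = struct.unpack_from("<I", hdr, 4)[0]  # u32 LE at offset 4
ts = struct.unpack_from("<Q", hdr, 8)[0]             # u64 LE at offset 8
```

A filter can test `instrument_id` and `ts` against its predicates and advance by `rec_size`, never materializing a record object for dropped records.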

A local prototype Python binding (~75 lines of pyo3) over an earlier version of this patch, filtering an uncompressed XNAS MBP-1 file to 1 symbol (1.421% selectivity):

baseline                     input throughput   filtered + chunked (input)   speedup
DBNDecoder + Python filter   14.12M rec/s       55.79M rec/s                 4.0×

Benchmarked on aarch64-darwin with a 6.7 GB MBP-1 file (decompressed from 2.2 GB zst), 83.4M input records; both paths emit the same 1.18M filtered records, across 3 trials. This is complementary to databento-python's NDArrayStreamIterator for header-filtered, heterogeneous, or live streams and for raw-byte forwarding.

API

impl<D> ByteChunker<D> {
    pub const DEFAULT_MAX_BYTES: NonZeroUsize; // 4 MiB
    pub fn new(decoder: D) -> Self;
    pub fn with_max_records(self, n: NonZeroUsize) -> Self;
    pub fn with_max_bytes(self, n: NonZeroUsize) -> Self;
    pub fn into_inner(self) -> D;
}
impl<D: DecodeRecordRef> ByteChunker<D> {
    pub fn next_chunk(&mut self) -> Result<Option<ByteChunk<'_>>>;
}

impl<D> RecordFilter<D> {
    pub fn builder(decoder: D) -> RecordFilterBuilder<D>;
    pub fn into_parts(self) -> (D, Option<RecordBuf>);
    pub fn tripping_record(&self) -> Option<&RecordBuf>;
    pub fn stats(&self) -> FilterStats;
}

impl<D> RecordFilterBuilder<D> {
    pub fn instrument_ids<I: IntoIterator<Item = u32>>(self, ids: I) -> Self;
    pub fn publisher_ids<I, P>(self, ids: I) -> Self
        where I: IntoIterator<Item = P>, P: Into<u16>;
    pub fn start_ts(self, start_ts: u64) -> Self;
    pub fn end_ts(self, end_ts: u64) -> Self;
    pub fn build(self) -> Result<RecordFilter<D>>;
}

#[non_exhaustive]
pub struct FilterStats { pub emitted: u64, pub dropped_by_time: u64,
    pub dropped_by_instrument_id: u64, pub dropped_by_publisher_id: u64 }

Both wrappers pass through DbnMetadata; RecordFilter also implements DecodeRecord and DecodeStream. Filters are ANDed; time filters key on Record::raw_index_ts. end_ts terminates iteration at the first record at or past the bound; that record is consumed and stashed as a RecordBuf so callers can prepend it on resume. Output bytes follow the wrapped decoder's VersionUpgradePolicy, so prepending its metadata yields a valid stream.
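The filter semantics above (ANDed predicates, half-open [start_ts, end_ts) window, iteration stopping at and stashing the first record at or past end_ts) can be modeled in a few lines of Python; Rec and filter_records are hypothetical stand-ins for illustration, not the crate's API:

```python
from dataclasses import dataclass

@dataclass
class Rec:  # hypothetical stand-in for a decoded record header
    instrument_id: int
    publisher_id: int
    ts: int  # models Record::raw_index_ts

def filter_records(records, instrument_ids=None, publisher_ids=None,
                   start_ts=None, end_ts=None):
    """Return records passing all configured predicates, plus the first
    record with ts >= end_ts as the stashed 'tripping record' (None if
    the bound is never reached). All predicates are ANDed."""
    kept, tripped = [], None
    for rec in records:
        if end_ts is not None and rec.ts >= end_ts:
            # Stash only if the record would have passed the header
            # filters, mirroring the filter-aware stash described below.
            if ((instrument_ids is None or rec.instrument_id in instrument_ids)
                    and (publisher_ids is None or rec.publisher_id in publisher_ids)):
                tripped = rec  # caller can prepend this on resume
            break
        if ((start_ts is None or rec.ts >= start_ts)
                and (instrument_ids is None or rec.instrument_id in instrument_ids)
                and (publisher_ids is None or rec.publisher_id in publisher_ids)):
            kept.append(rec)
    return kept, tripped

recs = [Rec(1, 1, 10), Rec(2, 1, 20), Rec(1, 1, 30), Rec(1, 1, 40)]
kept, tripped = filter_records(recs, instrument_ids={1}, start_ts=10, end_ts=40)
# kept holds ts 10 and 30; the ts=40 record trips the bound and is stashed
```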

Design notes

  • Two types instead of one. Filtering and chunking are independent; splitting them lets a caller chunk unfiltered, filter without chunking, or compose both in one line.
  • Filter lists use linear Vec::contains: faster than HashSet for the small lists this targets (tens of ids).
  • end_ts requires monotonically non-decreasing primary timestamps. DBN files from the Databento API satisfy this; documented on end_ts.
  • Decoder errors are terminal; the in-progress chunk is dropped. A partial chunk would make the "one chunk = a contiguous DBN slice" contract conditional on callers remembering to drain.
  • Tripping-record stash is filter-aware. A record that would have failed the header filters isn't stashed, so resuming through the inner decoder doesn't reintroduce a record the caller had filtered out.
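As a rough model of the ByteChunker contract described above — chunks contain only whole records, bounded by both bytes and record count, with the final partial chunk flushed — here is a hedged Python sketch; chunk_records and its bounds are illustrative, not the crate's implementation (whose default max bytes is 4 MiB):

```python
def chunk_records(record_bufs, max_bytes=64, max_records=4):
    """Aggregate raw record byte strings into bounded chunks. A record is
    never split across chunks, so each chunk is a contiguous DBN slice."""
    chunk, count = bytearray(), 0
    for buf in record_bufs:
        # Flush before appending if this record would exceed either bound.
        if chunk and (len(chunk) + len(buf) > max_bytes or count >= max_records):
            yield bytes(chunk), count
            chunk, count = bytearray(), 0
        chunk += buf
        count += 1
    if chunk:
        yield bytes(chunk), count  # flush the final partial chunk

records = [b"\x00" * 24] * 5  # five illustrative 24-byte records
chunks = list(chunk_records(records, max_bytes=64, max_records=4))
```

With a 64-byte bound, the third 24-byte record would overflow the chunk, so records are emitted two at a time with a final single-record chunk; an oversized record still goes out alone rather than being split.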
Benchmark script
"""Benchmark ByteChunker vs DBNDecoder on a DBN file.

Compares full iteration ("touch .instrument_id per record") and
low-selectivity filter (one instrument_id, ~1-2% share) between the
existing DBNDecoder path and the prototype ByteChunker Python binding.

Usage: benchmark.py path/to/file.dbn(.zst)
"""
import statistics
import struct
import sys
import time
from collections import Counter
from pathlib import Path

from databento_dbn import (
    ByteChunker,
    Compression,
    DBNDecoder,
    MBOMsg,
    MBP1Msg,
)

CHUNK = 1 << 16   # 64 KB reads for the baseline DBNDecoder loop
TRIALS = 3


def pick_type(path: Path):
    if ".mbp-1" in path.name:
        return MBP1Msg
    if ".mbo" in path.name:
        return MBOMsg
    raise RuntimeError(f"unknown schema for {path.name}")


def is_zstd(path: Path) -> bool:
    return path.suffix == ".zst"


def profile_instrument_ids(path: Path) -> tuple[int, Counter]:
    """One ByteChunker pass; walk record headers in Python to count
    records per instrument_id (length byte at offset 0, instrument_id
    u32 LE at offset 4)."""
    sc = ByteChunker(str(path))
    total = 0
    counter: Counter = Counter()
    for buf, count in sc:
        mv = memoryview(buf)
        offset = 0
        buf_len = len(mv)
        while offset < buf_len:
            length = mv[offset]
            rec_size = length * 4
            inst_id = struct.unpack_from("<I", mv, offset + 4)[0]
            counter[inst_id] += 1
            offset += rec_size
        total += count
    return total, counter


def bench_baseline_full(path: Path, rec_type, compression: Compression):
    count = 0
    acc = 0
    t0 = time.perf_counter()
    dec = DBNDecoder(compression=compression)
    with open(path, "rb") as f:
        while True:
            chunk = f.read(CHUNK)
            if not chunk:
                break
            dec.write(chunk)
            for rec in dec.decode():
                if isinstance(rec, rec_type):
                    acc += rec.instrument_id
                    count += 1
        for rec in dec.decode():
            if isinstance(rec, rec_type):
                acc += rec.instrument_id
                count += 1
    return count, time.perf_counter() - t0


def bench_chunker_full(path: Path):
    count = 0
    t0 = time.perf_counter()
    for _, c in ByteChunker(str(path)):
        count += c
    return count, time.perf_counter() - t0


def bench_baseline_filter(path: Path, rec_type, compression, target_ids: set[int]):
    count = 0
    t0 = time.perf_counter()
    dec = DBNDecoder(compression=compression)
    with open(path, "rb") as f:
        while True:
            chunk = f.read(CHUNK)
            if not chunk:
                break
            dec.write(chunk)
            for rec in dec.decode():
                if isinstance(rec, rec_type) and rec.instrument_id in target_ids:
                    count += 1
        for rec in dec.decode():
            if isinstance(rec, rec_type) and rec.instrument_id in target_ids:
                count += 1
    return count, time.perf_counter() - t0


def bench_chunker_filter(path: Path, target_ids: list[int]):
    count = 0
    t0 = time.perf_counter()
    for _, c in ByteChunker(str(path), instrument_ids=target_ids):
        count += c
    return count, time.perf_counter() - t0


def run_trials(label, fn, *args):
    times = []
    count = 0
    for i in range(TRIALS):
        c, e = fn(*args)
        times.append(e)
        count = c
        print(f"    {label} trial {i+1}: {e:>6.2f}s ({c:,} records)")
    return count, statistics.median(times)


def mrec_s(n, t):
    return f"{n / t / 1e6:.2f}M rec/s"


def main():
    path = Path(sys.argv[1])
    rec_type = pick_type(path)
    compression = Compression.ZSTD if is_zstd(path) else Compression.NONE

    print("\nprofiling instrument_id distribution...")
    total, counter = profile_instrument_ids(path)
    target_id, target_n = counter.most_common()[-1]
    for iid, n in counter.most_common():
        if 0.005 <= n / total <= 0.02:
            target_id, target_n = iid, n
            break
    selectivity = target_n / total
    print(f"  total={total:,}, filter target instrument_id={target_id} ({selectivity*100:.3f}%)")

    print("\n== Full iteration ==")
    c1, t1 = run_trials("DBNDecoder ", bench_baseline_full, path, rec_type, compression)
    c2, t2 = run_trials("ByteChunker", bench_chunker_full, path)
    print(f"\n  DBNDecoder : {mrec_s(c1, t1)}")
    print(f"  ByteChunker: {mrec_s(c2, t2)}   ({t1/t2:.1f}x)")

    print(f"\n== Filter ({selectivity*100:.3f}%) ==")
    c1, t1 = run_trials("DBNDecoder ", bench_baseline_filter, path, rec_type, compression, {target_id})
    c2, t2 = run_trials("ByteChunker", bench_chunker_filter, path, [target_id])
    print(f"\n  DBNDecoder : {mrec_s(total, t1)} input throughput")
    print(f"  ByteChunker: {mrec_s(total, t2)} input throughput  ({t1/t2:.1f}x)")


if __name__ == "__main__":
    main()

Type of change

  • New feature (non-breaking change which adds functionality)
  • This change requires a documentation update

Checklist

  • My code builds locally with no new warnings (scripts/build.sh)
  • My code follows the style guidelines (scripts/lint.sh and scripts/format.sh)
  • New and existing unit tests pass locally with my changes (scripts/test.sh)
  • I have made corresponding changes to the documentation
  • I have added tests that prove my fix is effective or that my feature works

Declaration

I confirm this contribution is made under an Apache 2.0 license and that I have the authority
necessary to make this contribution on behalf of its copyright owner.

@threecgreen
Contributor

I'm hesitant to merge it as it seems pretty specific, though it has some similarities to #117. Is this something you could implement in your own crate/pyo3 Python extension library? There doesn't seem to be a trait interaction requiring it to be in dbn.

You could also look at using DbnFsm::process_multiple instead of rebuffering the records into ByteChunk.

@wtn
Contributor Author

wtn commented Apr 23, 2026

Thank you!! DbnFsm is ~1.4× faster for my use case; I've re-implemented this as a standalone crate: wtn/dbn-chunks

@wtn wtn closed this Apr 23, 2026
@threecgreen
Contributor

awesome, glad that worked for you

@wtn wtn deleted the batch branch May 2, 2026 04:02
