Releases: stdiobus/stdiobus-python

Streaming, builder API, notifications, native kernel bundle, and event loop threading

05 Jun 14:28

Highlights

  • Streaming requests — real-time chunk delivery via async generators
  • Notification subscriptions with overflow control
  • Fluent builder API for ergonomic client construction
  • Bundled native kernel — prebuilt libstdio_bus.a for 4 platforms, zero external build steps
  • Native backend: event loop integration replaces background thread polling
  • Binary resolution module for subprocess backend

Streaming API

async for event in bus.stream_request("pool_id", {"prompt": "..."}):
    if event.type == StreamEventType.CHUNK:
        print(event.data, end="", flush=True)  # partial output, printed as it arrives
    elif event.type == StreamEventType.RESULT:
        final = event.data  # final result of the request

Chunks are delivered incrementally as they arrive — not accumulated and returned at the end.
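
To illustrate the delivery model without a running bus, here is a minimal, self-contained sketch built on a stand-in async generator. The names fake_stream and consume and the (kind, data) tuple shape are hypothetical and are not part of the stdiobus API; the point is only that the consumer sees each chunk as soon as it is produced, followed by a final result event.

```python
import asyncio

async def fake_stream(chunks, delay=0.01):
    """Hypothetical stand-in for a streaming request (not stdiobus API)."""
    for chunk in chunks:
        await asyncio.sleep(delay)       # simulate the worker producing output over time
        yield ("chunk", chunk)           # handed to the consumer immediately
    yield ("result", "".join(chunks))    # final event carries the complete payload

async def consume(chunks):
    """Collect (kind, data) pairs in the order they arrive."""
    seen = []
    async for kind, data in fake_stream(chunks):
        seen.append((kind, data))
    return seen

events = asyncio.run(consume(["Hel", "lo", "!"]))
for kind, data in events:
    print(kind, repr(data))
```

An aggregating API would instead hand back only the last event; the async generator form lets the caller act on every intermediate chunk.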

Notification Subscriptions

sub = bus.subscribe("pool_id", overflow_policy=OverflowPolicy.DROP_OLDEST)
async for notification in sub:
    process(notification)
bus.unsubscribe(sub)
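
What a drop-oldest overflow policy means in practice can be sketched with a bounded buffer. This is an illustration under assumptions, not the SDK's implementation: DropOldestBuffer, its capacity parameter, and the dropped counter are hypothetical names chosen for the example.

```python
from collections import deque

class DropOldestBuffer:
    """Hypothetical bounded buffer: when full, the oldest item is discarded."""

    def __init__(self, capacity: int):
        # A deque with maxlen evicts from the left when a new item is appended.
        self._items = deque(maxlen=capacity)
        self.dropped = 0

    def push(self, item) -> None:
        if len(self._items) == self._items.maxlen:
            self.dropped += 1            # count the eviction append() is about to perform
        self._items.append(item)

    def drain(self) -> list:
        """Return buffered items oldest-first and empty the buffer."""
        items = list(self._items)
        self._items.clear()
        return items

buf = DropOldestBuffer(capacity=3)
for n in range(5):                       # a slow consumer lets five notifications pile up
    buf.push(n)
```

With a capacity of 3 and five pushes, the two oldest notifications are discarded and the consumer later sees only the three most recent ones.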

Builder API

bus = (
    StdioBusBuilder()
    .config(BusConfig(pools=[...]))
    .backend("native")
    .native_options(NativeOptions(listen_mode=ListenMode.TCP, tcp_port=9000))
    .build_sync()
)

Introspection

bus.get_worker_count()   # int
bus.get_client_count()   # int
bus.get_listen_mode()    # ListenMode

Native Backend: Event Loop Integration

All C API calls now execute exclusively on the owning asyncio event loop thread, matching the C library's single-thread contract.

  • fd-driven mode (preferred): poll fd registered via loop.add_reader() — zero-latency wake via kqueue/epoll
  • timer-driven mode (fallback): loop.call_later() timer when poll fd unavailable
  • Thread affinity enforced: send() from a foreign thread raises TransportError
  • No background threads, no GIL contention, no race conditions
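
The fd-driven mode and the thread-affinity check can be sketched with the standard library alone. This is a hedged illustration, not the native backend's code: LoopBoundChannel is a hypothetical class, a socketpair stands in for the kernel's poll fd, RuntimeError stands in for TransportError, and the sketch assumes a Unix selector event loop (loop.add_reader() is not available on the Windows proactor loop).

```python
import asyncio
import socket
import threading

class LoopBoundChannel:
    """Hypothetical sketch: fd-driven wake-ups plus a thread-affinity check."""

    def __init__(self, loop: asyncio.AbstractEventLoop):
        self._loop = loop
        self._owner = threading.get_ident()      # thread that owns the loop
        self._rx, self._tx = socket.socketpair() # stand-in for the poll fd
        self._rx.setblocking(False)
        self.received = []
        self.readable = asyncio.Event()
        # The loop calls _on_readable as soon as the fd has data; no polling thread.
        loop.add_reader(self._rx.fileno(), self._on_readable)

    def _on_readable(self) -> None:
        self.received.append(self._rx.recv(4096))
        self.readable.set()

    def send(self, data: bytes) -> None:
        if threading.get_ident() != self._owner:
            raise RuntimeError("called from a foreign thread")
        self._tx.sendall(data)

    def close(self) -> None:
        self._loop.remove_reader(self._rx.fileno())
        self._rx.close()
        self._tx.close()

async def demo():
    chan = LoopBoundChannel(asyncio.get_running_loop())
    chan.send(b"ping")                            # allowed: we are on the loop thread
    await asyncio.wait_for(chan.readable.wait(), timeout=1.0)

    errors = []
    def foreign():
        try:
            chan.send(b"nope")                    # rejected: wrong thread
        except RuntimeError as exc:
            errors.append(str(exc))
    worker = threading.Thread(target=foreign)
    worker.start()
    worker.join()

    received = list(chan.received)
    chan.close()
    return received, errors

received, errors = asyncio.run(demo())
```

The timer-driven fallback would replace the add_reader() registration with a repeating loop.call_later() callback that performs the same read.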

Native Kernel

Prebuilt static libraries in kernel/prebuilds/:

  • aarch64-apple-darwin
  • x86_64-apple-darwin
  • aarch64-unknown-linux-gnu
  • x86_64-unknown-linux-gnu

CLI binaries in kernel/dist/ for darwin-{amd64,arm64}, linux-{amd64,arm64,armv7}, linux-musl-{amd64,arm64}.

pip install stdiobus[native]

Binary Resolution

Subprocess backend now resolves the stdio_bus binary with explicit precedence:

  1. User-provided path (override)
  2. Bundled kernel/dist/stdio_bus launcher
  3. System PATH fallback
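
The precedence above can be sketched as a small lookup function. This is an assumption-laden illustration rather than the module's real code: resolve_binary and its parameters are hypothetical, the bundled directory is passed in by the caller, and the sketch silently falls through when an override path is unusable, whereas a real implementation might prefer to raise.

```python
import os
import shutil
from pathlib import Path
from typing import Optional

def resolve_binary(user_path: Optional[str] = None,
                   bundled_dir: Optional[Path] = None,
                   name: str = "stdio_bus") -> Optional[str]:
    """Return the first usable executable, or None if nothing is found."""
    # 1. An explicit override wins when it points at an executable file.
    if user_path and os.path.isfile(user_path) and os.access(user_path, os.X_OK):
        return user_path
    # 2. Bundled launcher (the directory is supplied by the caller in this sketch).
    if bundled_dir is not None:
        candidate = bundled_dir / name
        if candidate.is_file() and os.access(candidate, os.X_OK):
            return str(candidate)
    # 3. Fall back to whatever the system PATH provides.
    return shutil.which(name)

# Example: no override, no bundled directory, so only PATH is consulted.
print(resolve_binary())
```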

Internal

  • Renamed _native → native (public module)
  • Added setup.py with conditional cffi extension build
  • stream_echo_worker.py for E2E streaming validation
  • test_real_e2e.py — integration tests against live binary
  • test_native_threading.py — thread affinity invariant tests
  • 317+ unit/integration tests

Full Changelog

v2.1.1...v2.2.1

stdiobus v2.1.1

11 Apr 08:27

Python SDK for building AI agents over stdio_bus, a reliable transport layer for ACP/MCP-style workflows.

The SDK provides typed JSON-RPC messaging, session routing, streaming aggregation, and cross-platform backend support for AI agent communication.

Highlights

  • Streaming support — agent_message_chunk notifications are aggregated into result["text"] automatically
  • Cancellation semantics — stop() fails all pending requests with TransportError, no silent drops
  • Notification handler — on_notification() for subscribing to notifications separately from raw messages
  • Cross-SDK contract tests — wire-format parity verification with Rust and Node SDKs
  • CI pipeline — GitHub Actions for unit, E2E, real-binary, and Docker tests

Since 1.0.0

Key evolution across 1.0.0 → 2.0.0 → 2.1.1:

  • SubprocessBackend added — spawns stdio_bus binary via stdin/stdout NDJSON pipes, now the default backend
  • Hello handshake — stdio_bus/hello protocol negotiation with hello() and connect()
  • Protocol extensions — _ext.identity, _ext.audit, agentId routing in JSON-RPC messages
  • Programmatic config — BusConfig / PoolConfig / LimitsConfig dataclasses, no config files needed
  • Graceful shutdown — close stdin → drain timeout → SIGTERM → SIGKILL with orphan process cleanup
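
The shutdown escalation in the last bullet can be sketched with subprocess alone. This is a hedged illustration, not the SubprocessBackend code: the shutdown function, its timeout parameters, and the returned step labels are hypothetical, and orphan process cleanup is not covered.

```python
import subprocess
import sys

def shutdown(proc: subprocess.Popen,
             drain_timeout: float = 2.0,
             term_timeout: float = 2.0) -> str:
    """Stop a child in escalating steps; return the step that ended it."""
    if proc.stdin and not proc.stdin.closed:
        proc.stdin.close()               # EOF on stdin asks the child to exit on its own
    try:
        proc.wait(timeout=drain_timeout) # give it time to drain and exit
        return "stdin-eof"
    except subprocess.TimeoutExpired:
        pass
    proc.terminate()                     # SIGTERM on POSIX
    try:
        proc.wait(timeout=term_timeout)
        return "sigterm"
    except subprocess.TimeoutExpired:
        pass
    proc.kill()                          # SIGKILL on POSIX; cannot be ignored
    proc.wait()
    return "sigkill"

# A cooperative child that exits once its stdin reaches EOF.
child = subprocess.Popen(
    [sys.executable, "-c", "import sys; sys.stdin.read()"],
    stdin=subprocess.PIPE,
)
step = shutdown(child)
print(step)
```

Each step is attempted only if the previous one timed out, so a well-behaved child never receives a signal.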

Full history: CHANGELOG.md

What's Changed in 2.1.1

Added

  • Streaming support — agent_message_chunk notification aggregation into result.text (ACP protocol parity with Rust SDK)
  • Cancellation semantics — stop() fails all pending requests with TransportError("Bus is shutting down")
  • on_notification() handler — subscribe to notifications separately from raw messages
  • Cross-SDK contract tests — wire-format parity verification (sessionId, agentId, _ext, config schema, streaming, cancellation)
  • CI pipeline — GitHub Actions workflow for unit, E2E, real-binary, and Docker tests
  • _PendingRequest internal class — tracks pending requests with streaming chunk aggregation
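
How chunk aggregation and cancellation could fit together is sketched below. The sketch is an assumption, not the SDK's internals: PendingRequest here is a hypothetical simplification of the _PendingRequest idea, TransportError is a local stand-in class, and the {"status": "ok"} payload is purely illustrative.

```python
import asyncio

class TransportError(Exception):
    """Local stand-in for the SDK's TransportError."""

class PendingRequest:
    """Hypothetical sketch: one in-flight request that collects streamed text."""

    def __init__(self, loop: asyncio.AbstractEventLoop):
        self.future = loop.create_future()
        self.chunks = []

    def add_chunk(self, text: str) -> None:
        self.chunks.append(text)

    def resolve(self, result: dict) -> None:
        if self.future.done():
            return
        if self.chunks:
            # Aggregated chunks are exposed under result["text"].
            result = {**result, "text": "".join(self.chunks)}
        self.future.set_result(result)

    def fail(self, exc: Exception) -> None:
        if not self.future.done():
            self.future.set_exception(exc)

async def demo():
    loop = asyncio.get_running_loop()
    first, second = PendingRequest(loop), PendingRequest(loop)
    pending = {1: first, 2: second}

    # Request 1 streams two chunks, then its final response arrives.
    first.add_chunk("Hel")
    first.add_chunk("lo")
    pending.pop(1).resolve({"status": "ok"})

    # stop(): everything still in flight fails loudly instead of hanging.
    for req in pending.values():
        req.fail(TransportError("Bus is shutting down"))
    pending.clear()

    ok = await first.future
    try:
        await second.future
        err = None
    except TransportError as exc:
        err = str(exc)
    return ok, err

ok, err = asyncio.run(demo())
```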

Changed

  • _handle_message now dispatches notifications separately and aggregates streaming chunks
  • stop() now cancels all in-flight requests before stopping backend
  • Backend crash (_on_backend_closed) uses _PendingRequest wrapper

Compatibility

Requirement    Value
Python         3.10, 3.11, 3.12, 3.13, 3.14
OS             Linux, macOS, Windows
Runtime        Native cffi embed, stdio_bus binary in PATH, or Docker
Dependencies   Zero (stdlib only). Optional: cffi for native backend
License        Apache-2.0

Backends

Backend      When used                            Config delivery
subprocess   stdio_bus binary in PATH (default)   --config-fd pipe
native       libstdio_bus.a built with cffi       embed API (in-process)
docker       Docker available, no binary          mounted config file

Auto-selection order: subprocess → native → docker (Unix), subprocess → docker (Windows).
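
The selection order can be sketched as a first-match loop over availability probes. This is an illustration under assumptions: select_backend and the probe dictionary are hypothetical, and how the SDK actually detects each backend is not shown here.

```python
from typing import Callable, Dict, Optional

def select_backend(is_windows: bool,
                   probes: Dict[str, Callable[[], bool]]) -> Optional[str]:
    """Return the first backend in platform order whose probe succeeds."""
    # The native backend is skipped on Windows, per the order stated above.
    order = ["subprocess", "docker"] if is_windows else ["subprocess", "native", "docker"]
    for name in order:
        probe = probes.get(name)
        if probe is not None and probe():
            return name
    return None

# Example probes: no binary on PATH, native library present, Docker present.
probes = {
    "subprocess": lambda: False,
    "native": lambda: True,
    "docker": lambda: True,
}
unix_choice = select_backend(False, probes)
windows_choice = select_backend(True, probes)
```

With identical probe results, the Unix order picks native while the Windows order falls through to docker.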

Install

pip install stdiobus

With native backend support:

pip install stdiobus[native]

Quick Start

import asyncio
from stdiobus import AsyncStdioBus, BusConfig, PoolConfig

async def main():
    async with AsyncStdioBus(
        config=BusConfig(
            pools=[PoolConfig(id="echo", command="python", args=["./echo_worker.py"], instances=1)]
        )
    ) as bus:
        result = await bus.request("echo", {"message": "hello"})
        print(result)

asyncio.run(main())

Known Limitations

  • No automatic reconnect. If the bus process exits, pending requests fail with TransportError. Create a new instance to reconnect.
  • stdout from the bus process must carry NDJSON protocol messages only.
