Skip to content

feat(discovery): operations stack — invoke, broadcast, subscribe, where, CLI#29

Open
soupat wants to merge 8 commits into
updated-hierarchical-discoveryfrom
discovery-operations
Open

feat(discovery): operations stack — invoke, broadcast, subscribe, where, CLI#29
soupat wants to merge 8 commits into
updated-hierarchical-discoveryfrom
discovery-operations

Conversation

@soupat
Copy link
Copy Markdown
Collaborator

@soupat soupat commented May 10, 2026

Summary

Stacked on top of PR #28. Completes the discovery design's operations
layer (phases 4-6 of the spec) and the optional CEL where predicate.
With this PR merged, the discovery API ships end-to-end: labels,
selectors, discovery, invocation, async fan-out, and operator CLI.

This PR targets updated-hierarchical-discovery so the diff shows only
the operations work. GitHub auto-retargets to main once #28 merges.

What's new

  • invoke(selector, params, llm_reasoning) — selector-driven sync
    call. Selector must resolve to one (device, function) tuple; zero or
    multiple matches return structured errors (no_match,
    ambiguous_match). invoke_device becomes advisory-deprecated.
  • invoke_many(selector, params, timeout, max_concurrency) — sync
    parallel fan-out. Partial-failure semantics: per-target results and
    errors are returned even if some fail. Per-target 30s timeout default.
  • broadcast(selector, params, where=, bindings=, fire_at=, on_late=)
    — async fan-out. Returns correlation_id immediately; replies stream
    on device-connect.<zone>.<device_id>.event.async_reply.<correlation_id>.
  • CEL where predicate evaluated at each candidate device against
    {identity, labels, status, bindings}. Self-deselection is silent;
    compile-validated at the dispatcher before publication. Optional
    dependency: pip install device-connect-agent-tools[predicate].
  • Synchronized fan-out via fire_at (wall-clock epoch seconds) +
    on_late (skip|fire). Each device holds the message and fires
    from its own clock at the deadline. NTP-typical spread is 5-10 ms.
  • subscribe(selector) returns a Subscription handle for live
    events. Two selector forms: "correlation:<id>" for broadcast
    replies, or event-scoped selectors (event(<name>) /
    device(...).event(<name>)). Supports read(), iter(timeout),
    context-manager with, and the standard for msg in sub: protocol.
  • await_replies(correlation_id, timeout, until) — sync helper for
    the common broadcast-then-collect pattern.
  • devctl discover / discover-labels — selector-driven discovery
    on the CLI. Historical devctl discover (mDNS scan) renamed to
    devctl mdns-scan with scan alias.
  • statectl invoke / invoke-many / broadcast / subscribe /
    await
    — operator-facing wrappers for the new tools. JSON-shaped
    --param k=v, structured exit codes for pipelines.
  • docs/discovery.md extended with operations, edge-side where,
    synchronized fan-out, worked examples, and CLI reference.

Commits

fix(broadcast):  robustness pass on edge handler, subscribe, and CLI
docs:            extend discovery guide for operations, where, and CLI
feat(cli):       selector-driven verbs in devctl and statectl
feat(broadcast): async fan-out with correlation, fire_at, and subscribe
feat(predicate): add CEL where evaluator with optional [predicate] extra
feat(invoke):    selector-based invoke and invoke_many with sync fan-out

The final commit is the result of running the same three reviews used
on PR #28 (code-review-and-quality, code-simplification,
api-and-interface-design) on this branch and addressing 11 findings:
race fix in Subscription.read, broadcast handler no longer blocks the
subscription callback on fire_at, wire-format rename from
target_device_ids to targets, __iter__ protocol on Subscription,
CLI SIGINT handling, structured exit codes, identity.device_id in
predicate context, ASCII compliance.

Backwards compatibility

  • invoke_device(device_id, function, ...) still works, now emits a
    DeprecationWarning pointing at invoke('device(<id>).function(<name>)', params).
  • All first-party adapters (Claude Agent SDK, Strands, LangChain, the
    in-tree StrandsOpenAIDeviceConnectAgent) migrated to invoke /
    invoke_many; invoke_device is removed from their tool surfaces.
  • invoke_device_with_fallback is kept unchanged — no selector
    equivalent exists for "try a list of devices in order".
  • The historical devctl discover mDNS-scan verb is renamed to
    devctl mdns-scan (with scan alias). Scripts that called
    devctl discover for the mDNS path should switch to devctl scan.

Test plan

  • 1031 unit tests pass (511 edge + 206 agent-tools + 314 server)
  • 105 integration tests pass on NATS, zero failures. New
    tests/tests/test_tools_invoke.py (9 tests) and
    tests/tests/test_tools_broadcast.py (9 tests) exercise:
    • single-target invoke against camera / sensor / robot
    • invoke ambiguous-match, no-match, invalid-scope, parse-error
    • invoke_many full fan-out, partial failure, zero candidates,
      function-only selector
    • end-to-end broadcast + reply on correlation_id
    • CEL where filters at the edge (location, bindings allowlist)
    • fire_at synchronization (spread under 0.5s with on_late=skip),
      late-arrival drop with on_late=skip
    • await_replies(until=K) early return
    • subscribe("correlation:<id>") streaming
    • subscribe(event(<name>)) live event capture
    • for msg in sub: iter protocol end-to-end
  • Zenoh integration tests skipped locally; CI will exercise them

soupat added 8 commits May 10, 2026 11:08
Add two selector-driven invocation tools that replace the legacy
invoke_device(device_id, function, params) shape:

- invoke(selector, params, llm_reasoning) resolves a function-scoped
  selector to exactly one (device, function) tuple and calls it. Returns
  {success, device_id, function, result|error}. Returns no_match,
  ambiguous_match, invalid_invoke_scope, or invalid_selector errors as
  structured envelopes when the selector does not resolve cleanly.
- invoke_many(selector, params, timeout, max_concurrency, llm_reasoning)
  resolves to N (device, function) tuples and fans out the calls in
  parallel via a thread pool. Partial-failure semantics: a single
  target's failure does not abort siblings. Returns {candidates, matched,
  succeeded, failed, results, errors} with per-target structured errors.
  Per-target timeout defaults to 30s.

invoke_device gains a DeprecationWarning pointing to invoke(); the
function still works for one release while callers migrate. Adapters
(Claude Agent SDK, Strands, LangChain, the in-tree
StrandsOpenAIDeviceConnectAgent, and the operator-facing AGENT_SCRIPT
template) drop invoke_device and expose invoke / invoke_many instead.
invoke_device_with_fallback stays unchanged -- it covers a different
ergonomic case (try a list of device ids in order) with no selector
equivalent.

22 unit tests cover scope rejection, ambiguous and zero matches,
JSON-RPC error mapping, partial failure, per-target timeout
propagation, and llm_reasoning stripping. 9 integration tests cover
single-target invoke, robot dispatch through to event emission,
fan-out across multiple cameras, partial failure, and zero-candidate
empty envelopes.
Add device_connect_edge.predicate, a thin wrapper around cel-python that
compiles where expressions into reusable WherePredicate objects and
evaluates them against device-local context (identity, labels, status,
shared bindings).

CEL was chosen over JSONLogic because the v4 design's mask-indexing
pattern (mask[seat_row][seat_col] == 1) needs computed array indices,
which JSONLogic's literal-path var operator cannot express without
flattening the mask to 1D and indexing arithmetically. CEL handles it
natively.

cel-python is an optional dependency. Importing the module without it
installed succeeds; compiling or evaluating a predicate raises a clear
PredicateCompileError pointing at the [predicate] extra:

    pip install device-connect-edge[predicate]
    pip install device-connect-agent-tools[predicate]

The evaluator is shared by the dispatcher (validates expressions before
sending them out) and the device runtime (evaluates per-call to decide
whether to execute a fan-out). 16 unit tests cover compilation,
evaluation, the mask-indexing regression case, missing-variable and
type-mismatch error surfaces, and evaluator reusability.
Add the async selector-driven fan-out path so callers do not have to
block on the slowest device:

- broadcast(selector, params, where=, bindings=, fire_at=, on_late=)
  publishes a single envelope to a fanout subject keyed by tenant.
  Returns immediately with a correlation_id and the candidate count.
  Compile-validates the optional CEL where predicate at the dispatcher
  so syntax errors short-circuit before reaching the wire.

- DeviceRuntime._broadcast_subscription receives envelopes on
  ``device-connect.<tenant>.broadcast``. Each candidate self-elects via
  the target_device_ids gate (pre-resolved by the dispatcher from the
  selector), then evaluates the optional where predicate against its
  own context (identity, labels, status, shared bindings). On match the
  device executes the function and emits a reply on
  ``device-connect.<tenant>.<device_id>.event.async_reply.<correlation_id>``
  carrying {success, result|error, actually_fired_at}.

- fire_at + on_late synchronized fan-out: the edge holds the message
  until the wall-clock deadline and fires from its own clock.
  on_late=skip drops late arrivals (preserves coherence for
  card-stunt / light-show style workloads); on_late=fire executes
  immediately. The achieved spread depends on NTP residual (~5-10 ms
  typical) rather than network jitter (~50-150 ms).

- subscribe(selector) returns a Subscription handle. Two selector
  forms: ``correlation:<id>`` for broadcast replies, and event-scoped
  selectors (``event(<name>)`` or ``device(...).event(<name>)``) for
  live event streams. The handle exposes sync read() and a yielding
  iter() with idle-timeout reset.

- await_replies(correlation_id, timeout, until) sync helper for the
  common broadcast-then-collect pattern; subscribes, drains, returns
  the list of reply payloads.

The edge predicate context mirrors DeviceStatus.location into
labels["location"] when the driver did not declare a labels.location
itself, matching the dispatcher-side flatten_device contract so the
same selector and predicate strings work on both sides.

Test coverage: 38 unit tests across broadcast (12), subscribe (12),
and existing modules; 5 NATS integration tests cover end-to-end
broadcast + reply, where filter at the edge, fire_at synchronization
spread, on_late=skip late-arrival drop, and subscribe(correlation:<id>)
streaming.
Add the operator-facing shell surface for selector-driven discovery and
operations:

devctl verbs (read-side):
  - devctl discover "<selector>" [--offset N] [--limit M]
  - devctl discover-labels [--key K] [--offset N] [--limit M]

statectl verbs (write-side):
  - statectl invoke "<selector>" [--param k=v ...]
  - statectl invoke-many "<selector>" [--param k=v ...] [--timeout T]
                                      [--max-concurrency N]
  - statectl broadcast "<selector>" [--param k=v ...] [--where E]
                                    [--bindings JSON] [--fire-at T]
                                    [--on-late skip|fire]
  - statectl subscribe "<selector>" [--timeout T] [--until N]
  - statectl await <correlation_id> [--timeout T] [--until N]

Each verb is a thin wrapper over the Python tool of the same name and
exits non-zero on tool-side errors so they compose into shell pipelines
naturally. Parameter values are decoded as JSON when they look like
JSON (numbers, booleans, arrays, objects, quoted strings) and pass
through as strings otherwise, so common shapes (--param resolution=4k,
--param zones='[1,2,3]') work without quoting heroics.

The historical ``devctl discover`` verb (mDNS scan for uncommissioned
devices) is renamed to ``mdns-scan`` with ``scan`` as an alias, so
``discover`` is free for the selector-driven sense. Existing scripts
should switch from ``devctl discover`` to ``devctl scan`` if they were
exercising the mDNS path.

22 parser-shape unit tests guard against argument drift; the underlying
tools already have full unit and integration coverage from earlier
phases.
Add the operations layer (invoke / invoke_many / broadcast / subscribe /
await_replies) to docs/discovery.md, with the edge-side ``where``
predicate, synchronized fan-out via ``fire_at`` / ``on_late``, worked
examples that exercise each tool, and the corresponding
devctl / statectl CLI verbs.

The guide now covers everything the discovery API ships: labels schema,
selector grammar, the five scope shapes, response envelope, error codes,
all seven tools, and the CLI surface.
Applies findings from the pre-merge review of the operations stack:

Edge runtime (device.py):
  - Hand the broadcast envelope off to a tracked task so the subscription
    callback returns immediately. A long fire_at hold or slow driver
    function no longer blocks subsequent broadcasts from being received.
  - Extract _handle_broadcast_envelope and _evaluate_where so the where
    self-election step is isolated, unit-testable, and the callback body
    stays flat.
  - Splice device_id into the predicate's identity context so the natural
    ``identity.device_id == "..."`` form works (DeviceIdentity itself
    does not carry device_id; that lives on the runtime).

Wire format (tools.py + device.py):
  - Rename the broadcast envelope's ``target_device_ids`` field to
    ``targets`` before any edge ships. Shorter, less prescriptive, and
    matches the dispatcher-side ``candidates`` naming.

Subscription handle (tools.py):
  - Fix a race in Subscription.read(): truncate by the snapshot length
    captured BEFORE iteration, not by clearing post-iteration. A message
    appended by the messaging callback during draining now survives to
    the next read instead of being silently dropped.
  - Add __iter__ so ``for msg in sub:`` works with a sensible 30s idle
    timeout, matching the standard Python iteration protocol.

CLI (statectl/operations_cli.py):
  - statectl subscribe now catches KeyboardInterrupt cleanly (exit 130),
    distinguishes "got messages" (exit 0) from "idle timeout with no
    messages" (exit 4), so shell pipelines can branch on either outcome.
  - statectl invoke-many exits 3 when any target failed (alongside the
    existing 1 for top-level errors), so partial failure is visible to
    callers without parsing JSON.

ASCII compliance (predicate.py, tools.py):
  - Drop a banned-vocabulary token from a docstring.
  - Replace an em-dash in invoke_device's docstring with ASCII text.

New tests:
  - Unit: __iter__ protocol + race-safety guard for Subscription.read.
  - Integration: broadcast where=identity.device_id in bindings.allow
    (exercises the new identity context + bindings path),
    await_replies(until=) early-return timing, ``for msg in sub:``
    iteration end-to-end, and subscribe(event(...)) live-event capture.
…ters

Phases 4-5 added broadcast() and await_replies() to the agent-tools
surface but the adapter migration in feat(invoke) only carried invoke /
invoke_many across. The flashlight-auditorium demo needs the LLM to
issue selector-driven broadcasts with where + bindings + fire_at, so
broadcast and await_replies both need to be Strands/LangChain/Claude
tools as well.

Tool descriptions for the Claude adapter spell out the broadcast +
await_replies pairing (caller fires broadcast, then awaits replies by
correlation_id) so agents discover the workflow from the tool docs.

subscribe() is intentionally NOT exposed via the adapters: it returns
a Subscription handle that does not serialise cleanly as a tool result
and is more natural to call from operator code or the CLI than from an
LLM. Agents needing the same shape use broadcast + await_replies.
The broadcast handler built the where-predicate context from
``caps.identity`` -- but DeviceCapabilities does not carry an
``identity`` field; that lives on the driver as a separate
DeviceIdentity model. The ``getattr(caps, "identity", None)`` fallback
masked the bug: identity_dict was always just ``{"device_id": ...}``
with none of the driver's extra fields (seat_row, seat_col, x-mhp slot
metadata, ...) reaching the predicate.

Symptom: a where predicate like
``bindings.mask[identity.seat_row][identity.seat_col] == 1`` failed at
every candidate (CEL surfaces undefined field access as CELEvalError,
fail-closed fires, nobody self-elects).

Fix: read identity from ``self._driver.identity`` and splice in
``device_id`` from the runtime. Backwards-compatible with drivers that
don't expose an identity property (driver_identity is None -> only
device_id is present, same as before for those drivers).

Surfaced while building the flashlight-auditorium demo, where each
phone exposes its seat coordinates as extra fields on DeviceIdentity
and the spell-CMU broadcast indexes a 2D mask by those coordinates.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant