Skip to content

edge: support lifecycle event subscriptions and glob device_id filters#27

Merged
kavya-chennoju merged 1 commit into
mainfrom
feat/peer-lifecycle-events
May 12, 2026
Merged

edge: support lifecycle event subscriptions and glob device_id filters#27
kavya-chennoju merged 1 commit into
mainfrom
feat/peer-lifecycle-events

Conversation

@kavya-chennoju
Copy link
Copy Markdown
Collaborator

Closes #26.

What changes

The registry already publishes device-connect.<tenant>.device.{online,offline} on a shared per-tenant subject. This PR teaches @on to subscribe to those — drivers can now react to peers appearing and disappearing through the same decorator they already use, without polling or invoke timeouts.

Also adds glob support to device_id= filters: NATS/Zenoh single-token wildcards can't express sub-token patterns like interlock-*, so glob patterns subscribe to the broker single-token wildcard and filter post-hoc with fnmatch.

# react when a specific peer drops
@on(device_id="interlock-01", event_name="peer_lost")
async def on_interlock_lost(self, device_id, event_name, payload):
    await self.set_power(0.0)

# react when ANY interlock-class peer drops
@on(device_id="interlock-*", event_name="peer_lost")
async def on_any_interlock_lost(self, device_id, event_name, payload):
    await self.set_power(0.0)

Per-device .event.<name> subscriptions are byte-identical; existing @on users see no behavior change.

Files

  • device_connect_edge/drivers/base.py — lifecycle subject routing, glob filter, _device_id_matches() helper, _LIFECYCLE_ALIAS_TO_CANONICAL map.
  • tests/test_drivers.pyTestLifecycleSubscriptions: 7 new tests covering subject routing, alias mapping, exact + glob device_id filter, per-device regression check, glob-on-per-device-events.
  • examples/peer-lifecycle/device_{a,b}.py — 2-device runnable demo.

Tests

30 passed, 0 failed   (23 existing + 7 new)

End-to-end verification

Path NATS Zenoh
Lifecycle subject routing
Exact device_id= filter
Glob device_id="device-*" (caught 2 peers) not run, same code path

NATS run, glob filter:

13:56:46  killed device-a AND device-c
13:57:01  device-b: peer LOST: device-a   ->  black_out
13:57:02  device-b: peer LOST: device-c   ->  black_out

Known follow-ups (separate PRs)

  • device_type= filter for lifecycle events — needs ~5 LOC server-side change to include device_type in the registry's lifecycle event payload. Skipped here because relying on the D2D peer cache mid-offline is racy.
  • tag= filter — needs a new DeviceIdentity.tags field; bigger API change.

Lets drivers react to peers coming and going via the existing @on
decorator. The registry already publishes device.online / device.offline
on a shared per-tenant subject; this patch teaches @on to recognize
those event names (plus peer_present / peer_lost aliases) and route them
to that subject instead of the per-device .event.<name> pattern.

Also adds glob support to device_id=... filters using fnmatch, since
NATS/Zenoh wildcards don't span sub-tokens (interlock-* can't be
expressed at the broker). Glob patterns subscribe to the broker
single-token wildcard and filter post-hoc.

  @on(device_id="interlock-*", event_name="peer_lost")
  async def on_any_interlock_lost(self, device_id, event_name, payload):
      await self.set_power(0.0)

Per-device .event.<name> path is unchanged; existing @on subscriptions
are byte-identical. Verified end-to-end on both NATS and Zenoh with the
new examples/peer-lifecycle/ demo.

Closes #26
@soupat
Copy link
Copy Markdown
Collaborator

soupat commented May 12, 2026

Looks great for registry mode! Quick gap I wanted to flag: the device.{online,offline} subjects are registry-published, so D2D drivers using @on(event_name="peer_present"|"peer_lost") would see nothing in D2D mode. Same decorator, different behavior depending on discovery mode -- worth closing the asymmetry.

I put up #31 stacked on this branch as a concrete proposal: PresenceCollector grows a symmetric on_peer_removed callback (fires from _on_presence on departing: true and from _prune_loop on timeout) and a small multi-listener add API; _setup_subscription detects a D2D collector and routes lifecycle handlers via collector callbacks instead of broker subjects. Same @on surface, same alias names, same device_id= filter (exact + glob). 9 new tests, 437 total passing.

Happy to fold #31's diff into this PR if you'd rather keep it one merge, or land it separately after this one -- whatever fits your review style.

@kavya-chennoju kavya-chennoju merged commit cb638b7 into main May 12, 2026
9 checks passed
kavya-chennoju pushed a commit that referenced this pull request May 12, 2026
Stacked on #27. Extends peer lifecycle subscriptions to D2D mode.

## What changes

#27 routes \`@on(event_name=\"peer_present\"|\"peer_lost\")\` to the
registry-published \`device-connect.<tenant>.device.{online,offline}\`
subject. D2D mode has no registry publishing those subjects, so D2D
drivers using the same decorator see nothing.

This PR delivers the same logical events via \`PresenceCollector\`
callbacks when a D2D collector is present on the device runtime. The
\`@on\` surface, alias names, and \`device_id=\` filter (exact + glob)
are unchanged -- mode-transparent behavior.

## How

\`discovery.py\`:
- \`PresenceCollector\` grows symmetric \`on_peer_removed\` support
(fires on \`departing: true\` graceful departure AND on prune-timeout in
\`_prune_loop\`).
- Multi-listener model: constructor-form \`on_new_peer\` /
\`on_peer_removed\` kwargs still work as single-listener seeds; new
\`add_on_new_peer\` / \`add_on_peer_removed\` methods append additional
listeners. The existing \`collector._on_new_peer = ...\` pattern in
\`device.py\` keeps working via a back-compat property.

\`drivers/base.py\`:
- \`_setup_subscription\` checks \`self._device._d2d_collector\` for
lifecycle aliases; if present, the handler is wired via the collector's
new listener methods (\`_setup_lifecycle_d2d\`) instead of subscribing
to the broker subject.
- Per-device events are unaffected; still subscribe through the broker.

Glob and exact \`device_id=\` filters apply post-hoc inside the delivery
wrapper (same \`_device_id_matches\` helper as #27).

## Tests

9 new tests, 437 total passing (30 from #27 + 7 prior lifecycle + new):

\`\`\`
TestLifecycleSubscriptionsD2D
  test_peer_lost_routes_through_collector
  test_peer_present_routes_through_collector
  test_d2d_lifecycle_device_id_filter_exact
  test_d2d_lifecycle_device_id_filter_glob
  test_d2d_per_device_event_still_uses_broker

TestPresenceCollectorRemovedCallback
  test_graceful_departure_fires_on_peer_removed
  test_prune_timeout_fires_on_peer_removed
  test_add_on_peer_removed_supports_multiple_listeners
  test_constructor_callback_coexists_with_listeners
\`\`\`

## Why stacked

This PR depends on #27's \`_LIFECYCLE_ALIAS_TO_CANONICAL\` and
\`_device_id_matches\` helpers; without #27, the D2D path has no
canonical lifecycle name to bind against. Once #27 lands the diff
against \`main\` is the 390 LOC here.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Emit event when a device goes offline

2 participants