Skip to content

feat(trigger-service): resume-subscription RPCs (TS-IMPL-016)#669

Merged
toddysm merged 2 commits into
mainfrom
ts-impl-016-resume-rpcs
Jun 5, 2026
Merged

feat(trigger-service): resume-subscription RPCs (TS-IMPL-016)#669
toddysm merged 2 commits into
mainfrom
ts-impl-016-resume-rpcs

Conversation

@toddysm
Copy link
Copy Markdown
Owner

@toddysm toddysm commented Jun 5, 2026

TS-IMPL-016: Resume RPCs RegisterResumeSubscription / CancelResumeSubscription (REQ-081)

Implements the internal Dapr method routes the Workflow Service calls across each waitFor: step's lifecycle.

What

  • POST /RegisterResumeSubscription — register (or idempotently re-register) a one-shot resume wait keyed on the (runId, stepId, eventKey) triple:
    • Replay of a still-live wait returns the existing subscriptionId with no write.
    • Divergent selector → original wins + a resume.subscription.divergent audit event.
    • A wait whose prior row has lapsed past its TTL is treated as a fresh registration.
    • The CEL selector is compiled at register time (malformed → 422 Problem+JSON).
    • TTL parsed from the request ISO-8601 duration (e.g. PT24H), falling back to TRIGGER_RESUME_DEFAULT_TTL_SECONDS.
  • POST /CancelResumeSubscription — idempotent cancel; a clean 204 no-op for unknown/expired keys.

How

  • Routes match the exact wire shapes the WF DaprTriggerServiceClient sends ({runId, stepId, eventKey, selector, ttl}{subscriptionId}; cancel {runId, stepId, eventKey} → 2xx).
  • These are internal service-to-service calls authenticated at the Dapr mesh layer (mTLS + app-id), not via the call-context envelope — both method paths are added to the call-context middleware bypass set (the WF caller propagates no x-custos-callctx header).
  • A resume registration carries no workspace; the globally-unique (runId, stepId, eventKey) triple keys it, so rows partition under a reserved workspace sentinel. resume_id is a deterministic NUL-joined SHA-256 of the triple.
  • New ResumeReadable capability Protocol + ResumeSubscriptionStore.get (mirrors the SubscriptionReadable pattern) for the idempotency lookup.
  • New RPC wire models (RegisterResumeRequest / RegisterResumeResponse / CancelResumeRequest).
  • create_app gains an audit_sink param wiring the resume RPCs and the internally-built dispatcher to one shared sink (defaults to NoopAuditSink until TS-IMPL-019); resume-default-TTL is bound on app.state.

Tests

tests/test_rpc_routes.py drives the routes through a real TestClient over the in-process store: response shape, idempotency, divergence-audits-original-wins, TTL-expiry refresh, TTL parsing/fallback, invalid-selector 422, missing-field 400, cancel (known/unknown/then-reregister), middleware-bypass assertion, ISO-8601 duration parser table, and the read-capability guard.

Gates: ruff check / ruff format --check / mypy src tests clean; 417 passed, total coverage 98.98% (rpc.py 100%).

Closes #646

Implement the internal Dapr method routes the Workflow Service calls across
each `waitFor:` step's lifecycle:

- `POST /RegisterResumeSubscription` — register (or idempotently re-register)
  a one-shot resume wait keyed on `(runId, stepId, eventKey)`. A replay returns
  the existing `subscriptionId`; a divergent selector keeps the original and
  emits a `resume.subscription.divergent` audit (original wins). A wait whose
  prior row has lapsed past its TTL is treated as a fresh registration. The CEL
  selector is compiled at register time (malformed -> 422). TTL is parsed from
  the request ISO-8601 duration, falling back to
  `TRIGGER_RESUME_DEFAULT_TTL_SECONDS`.
- `POST /CancelResumeSubscription` — idempotent cancel; a clean 204 no-op for
  unknown/expired keys.

The routes match the exact wire shapes the WF `DaprTriggerServiceClient` sends
and are authenticated at the Dapr mesh layer, so they bypass the call-context
middleware. Resume rows partition under a reserved workspace sentinel since the
`(runId, stepId, eventKey)` triple is globally unique.

Adds a `ResumeReadable` capability Protocol + `ResumeSubscriptionStore.get`,
the RPC wire models, an `audit_sink` create_app param wiring the resume RPCs
and dispatcher to a shared sink, and a resume-default-TTL app-state binding.

Closes #646
Copilot AI review requested due to automatic review settings June 5, 2026 05:56
@toddysm toddysm added type:implementation Implementation work item phase:implementation Implementation phase component:trigger-service Trigger Service component labels Jun 5, 2026
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds the Trigger Service’s internal Dapr service-invocation RPC surface that the Workflow Service uses to register/cancel one-shot “resume waits” for waitFor: steps, keyed idempotently by (runId, stepId, eventKey). It also wires the needed app dependencies (audit sink, default TTL, call-context bypass) and introduces tests that exercise the RPC contract end-to-end against the in-memory metadata store.

Changes:

  • Add internal RPC routes POST /RegisterResumeSubscription and POST /CancelResumeSubscription with idempotency/divergence-audit semantics and ISO-8601 TTL handling.
  • Introduce a ResumeReadable capability + ResumeSubscriptionStore.get() for idempotency lookups, and expose new RPC wire models.
  • Wire app state/dependencies for audit_sink and default resume TTL; add call-context middleware bypass paths and comprehensive route tests.

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
src/services/trigger-service/src/custos_trigger/api/routes/rpc.py Implements the two internal resume RPC endpoints, ID derivation, TTL parsing/resolution, and audit emission.
src/services/trigger-service/src/custos_trigger/app.py Mounts the new router and wires audit_sink + resume_default_ttl_seconds into app.state; passes audit sink into the internally-built dispatcher.
src/services/trigger-service/src/custos_trigger/dependencies.py Adds FastAPI dependencies for audit sink, resume store, and default TTL.
src/services/trigger-service/src/custos_trigger/middleware/callctx.py Adds the resume RPC method paths to the call-context bypass set.
src/services/trigger-service/src/custos_trigger/models.py Adds wire models for register/cancel resume RPC request/response bodies.
src/services/trigger-service/src/custos_trigger/stores/base.py Adds the ResumeReadable protocol capability for optional resume-row read-back.
src/services/trigger-service/src/custos_trigger/stores/resume.py Adds ResumeSubscriptionStore.get() and supporting types/errors for idempotency reads.
src/services/trigger-service/src/custos_trigger/stores/init.py Re-exports new resume store capability/error/types.
src/services/trigger-service/src/custos_trigger/api/routes/init.py Exposes the new resume RPC router for app mounting.
src/services/trigger-service/tests/test_rpc_routes.py New end-to-end tests for route shape, idempotency, divergence auditing, TTL handling, selector validation, cancel behavior, and bypass set.
design/components/trigger-service/todos.md Marks TS-IMPL-016 as complete.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/services/trigger-service/src/custos_trigger/api/routes/rpc.py
Comment thread src/services/trigger-service/src/custos_trigger/stores/resume.py
…e read-unsupported

Address Copilot review on #669:
- Compile the CEL selector up front on every RegisterResumeSubscription path
  (including an idempotent replay) so a malformed selector yields a 422 rather
  than being accepted when an already-live wait exists.
- Add a route-local trigger.api.resume_read_unsupported kind (501) + handler so
  a backend with no resume read surface fails as a stable Problem+JSON rather
  than an opaque 500, mirroring SubscriptionReadUnsupportedError.
@toddysm toddysm merged commit ff5e2e2 into main Jun 5, 2026
30 checks passed
@toddysm toddysm deleted the ts-impl-016-resume-rpcs branch June 5, 2026 06:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

component:trigger-service Trigger Service component phase:implementation Implementation phase type:implementation Implementation work item

Projects

None yet

Development

Successfully merging this pull request may close these issues.

TS-IMPL-016: Resume RPCs Register/Cancel ResumeSubscription (REQ-081)

2 participants