feat(trigger-service): resume-subscription RPCs (TS-IMPL-016)#669
Merged
Conversation
Implement the internal Dapr method routes the Workflow Service calls across each `waitFor:` step's lifecycle: - `POST /RegisterResumeSubscription` — register (or idempotently re-register) a one-shot resume wait keyed on `(runId, stepId, eventKey)`. A replay returns the existing `subscriptionId`; a divergent selector keeps the original and emits a `resume.subscription.divergent` audit (original wins). A wait whose prior row has lapsed past its TTL is treated as a fresh registration. The CEL selector is compiled at register time (malformed -> 422). TTL is parsed from the request ISO-8601 duration, falling back to `TRIGGER_RESUME_DEFAULT_TTL_SECONDS`. - `POST /CancelResumeSubscription` — idempotent cancel; a clean 204 no-op for unknown/expired keys. The routes match the exact wire shapes the WF `DaprTriggerServiceClient` sends and are authenticated at the Dapr mesh layer, so they bypass the call-context middleware. Resume rows partition under a reserved workspace sentinel since the `(runId, stepId, eventKey)` triple is globally unique. Adds a `ResumeReadable` capability Protocol + `ResumeSubscriptionStore.get`, the RPC wire models, an `audit_sink` create_app param wiring the resume RPCs and dispatcher to a shared sink, and a resume-default-TTL app-state binding. Closes #646
Contributor
There was a problem hiding this comment.
Pull request overview
This PR adds the Trigger Service’s internal Dapr service-invocation RPC surface that the Workflow Service uses to register/cancel one-shot “resume waits” for waitFor: steps, keyed idempotently by (runId, stepId, eventKey). It also wires the needed app dependencies (audit sink, default TTL, call-context bypass) and introduces tests that exercise the RPC contract end-to-end against the in-memory metadata store.
Changes:
- Add internal RPC routes
POST /RegisterResumeSubscriptionandPOST /CancelResumeSubscriptionwith idempotency/divergence-audit semantics and ISO-8601 TTL handling. - Introduce a
ResumeReadablecapability +ResumeSubscriptionStore.get()for idempotency lookups, and expose new RPC wire models. - Wire app state/dependencies for
audit_sinkand default resume TTL; add call-context middleware bypass paths and comprehensive route tests.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| src/services/trigger-service/src/custos_trigger/api/routes/rpc.py | Implements the two internal resume RPC endpoints, ID derivation, TTL parsing/resolution, and audit emission. |
| src/services/trigger-service/src/custos_trigger/app.py | Mounts the new router and wires audit_sink + resume_default_ttl_seconds into app.state; passes audit sink into the internally-built dispatcher. |
| src/services/trigger-service/src/custos_trigger/dependencies.py | Adds FastAPI dependencies for audit sink, resume store, and default TTL. |
| src/services/trigger-service/src/custos_trigger/middleware/callctx.py | Adds the resume RPC method paths to the call-context bypass set. |
| src/services/trigger-service/src/custos_trigger/models.py | Adds wire models for register/cancel resume RPC request/response bodies. |
| src/services/trigger-service/src/custos_trigger/stores/base.py | Adds the ResumeReadable protocol capability for optional resume-row read-back. |
| src/services/trigger-service/src/custos_trigger/stores/resume.py | Adds ResumeSubscriptionStore.get() and supporting types/errors for idempotency reads. |
| src/services/trigger-service/src/custos_trigger/stores/init.py | Re-exports new resume store capability/error/types. |
| src/services/trigger-service/src/custos_trigger/api/routes/init.py | Exposes the new resume RPC router for app mounting. |
| src/services/trigger-service/tests/test_rpc_routes.py | New end-to-end tests for route shape, idempotency, divergence auditing, TTL handling, selector validation, cancel behavior, and bypass set. |
| design/components/trigger-service/todos.md | Marks TS-IMPL-016 as complete. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
…e read-unsupported Address Copilot review on #669: - Compile the CEL selector up front on every RegisterResumeSubscription path (including an idempotent replay) so a malformed selector yields a 422 rather than being accepted when an already-live wait exists. - Add a route-local trigger.api.resume_read_unsupported kind (501) + handler so a backend with no resume read surface fails as a stable Problem+JSON rather than an opaque 500, mirroring SubscriptionReadUnsupportedError.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
TS-IMPL-016: Resume RPCs
RegisterResumeSubscription/CancelResumeSubscription(REQ-081)Implements the internal Dapr method routes the Workflow Service calls across each
waitFor:step's lifecycle.What
POST /RegisterResumeSubscription— register (or idempotently re-register) a one-shot resume wait keyed on the(runId, stepId, eventKey)triple:subscriptionIdwith no write.resume.subscription.divergentaudit event.selectoris compiled at register time (malformed →422Problem+JSON).PT24H), falling back toTRIGGER_RESUME_DEFAULT_TTL_SECONDS.POST /CancelResumeSubscription— idempotent cancel; a clean204no-op for unknown/expired keys.How
DaprTriggerServiceClientsends ({runId, stepId, eventKey, selector, ttl}→{subscriptionId}; cancel{runId, stepId, eventKey}→ 2xx).x-custos-callctxheader).(runId, stepId, eventKey)triple keys it, so rows partition under a reserved workspace sentinel.resume_idis a deterministic NUL-joined SHA-256 of the triple.ResumeReadablecapability Protocol +ResumeSubscriptionStore.get(mirrors theSubscriptionReadablepattern) for the idempotency lookup.RegisterResumeRequest/RegisterResumeResponse/CancelResumeRequest).create_appgains anaudit_sinkparam wiring the resume RPCs and the internally-built dispatcher to one shared sink (defaults toNoopAuditSinkuntil TS-IMPL-019); resume-default-TTL is bound onapp.state.Tests
tests/test_rpc_routes.pydrives the routes through a realTestClientover the in-process store: response shape, idempotency, divergence-audits-original-wins, TTL-expiry refresh, TTL parsing/fallback, invalid-selector 422, missing-field 400, cancel (known/unknown/then-reregister), middleware-bypass assertion, ISO-8601 duration parser table, and the read-capability guard.Gates:
ruff check/ruff format --check/mypy src testsclean; 417 passed, total coverage 98.98% (rpc.py100%).Closes #646