feat(trigger-service): add pipeline dispatcher (TS-IMPL-014)#667
Merged
Conversation
Add the dispatcher that turns start/resume matches into Workflow Service StartRun / RaiseExternalEvent calls. Transient failures retry with exponential backoff up to TRIGGER_DISPATCH_MAX_RETRIES; permanent failures and exhausted budgets dead-letter with a trigger.dispatch.failed audit. The dispatch runs inside the dedup guard so the dedup key commits only after a confirmed dispatch and rolls back on failure. A per-tenant fan-out depth limit (TRIGGER_FANOUT_MAX_DEPTH) rejects looping chains and audits trigger.loop.detected. Every RPC carries the deterministic dedup key as its idempotencyKey. Adds an AuditSink protocol with a Noop default until TS-IMPL-019. Closes #644
Contributor
There was a problem hiding this comment.
Pull request overview
Implements the Trigger Service “Dispatcher” (TS-IMPL-014), the final stage of the trigger pipeline that turns matched subscriptions into Workflow Service internal RPC calls with dedup-commit ordering, bounded retries, dead-lettering, and fan-out loop protection.
Changes:
- Adds
Dispatcherwith start/resume dispatch arms, exponential backoff retry, dead-letter outcomes, and depth-limit loop rejection + audit sink surface. - Introduces
TRIGGER_FANOUT_MAX_DEPTHsetting (default 16) and exports dispatcher symbols fromcustos_trigger.pipeline. - Adds comprehensive dispatcher unit tests plus settings coverage for the new knob; marks TS-IMPL-014 complete in the trigger-service design TODOs.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| src/services/trigger-service/src/custos_trigger/pipeline/dispatch.py | New dispatcher implementation with retry/dead-letter/dedup-guard ordering and loop-depth guard + auditing. |
| src/services/trigger-service/src/custos_trigger/settings.py | Adds TRIGGER_FANOUT_MAX_DEPTH configuration wiring and default. |
| src/services/trigger-service/src/custos_trigger/pipeline/init.py | Exports dispatcher types/surface from the pipeline package and updates module docstring. |
| src/services/trigger-service/tests/test_dispatch.py | New test suite covering start/resume dispatch, retry/dead-letter behavior, dedup rollback, and loop guard. |
| src/services/trigger-service/tests/test_settings.py | Extends settings tests to include the new fanout depth knob. |
| design/components/trigger-service/todos.md | Marks TS-IMPL-014 as completed. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
… + docs Wire the new fan-out depth limit into the trigger-service Helm chart (values.fanoutMaxDepth, ConfigMap TRIGGER_FANOUT_MAX_DEPTH), document it in design.md Configuration + Failure Modes, and add it to the chart README env table so operators can tune the loop guard in deployments. Extends the helm render test to assert the rendered default.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Implements TS-IMPL-014 — the Dispatcher, the tail of the linear pipeline
(
Classify → Match → Dedup → Dispatch). It turns matches into Workflow ServiceInternal RPCs with retry, dead-letter, dedup-commit ordering, and fan-out loop
protection.
Closes #644What's included
custos_trigger/pipeline/dispatch.py—Dispatcher:StartRun(workflowVersionId, inputs)via theWorkflowServiceClient; resume match →RaiseExternalEvent(runId, stepId, eventName, payload).408/429/5xx,transport) are retried with exponential backoff up to
TRIGGER_DISPATCH_MAX_RETRIES; permanent failures and exhausted budgetsdead-letter with a
trigger.dispatch.failedaudit event.Deduplicator.guard, so the dedup key is rolled back on failure and theredelivery can re-attempt (design § Failure Modes row 1).
depthexceeds the per-tenant
TRIGGER_FANOUT_MAX_DEPTHlimit is rejected andtrigger.loop.detectedis audited.idempotencyKeyforthe hard idempotency guarantee.
AuditSinkprotocol +NoopAuditSinkdefault (real OTel/audit lands inTS-IMPL-019);
DispatchOutcome/DispatchStatusresult types.settings.py— adds theTRIGGER_FANOUT_MAX_DEPTHknob (default16).pipeline/__init__.py— exports the dispatcher surface.tests/test_dispatch.pycovering both arms, duplicate skip,retry→success, retry-exhaustion→dead-letter, non-retryable→immediate
dead-letter, dedup rollback on failure, loop rejection + boundary, and the
Noop audit/default-sink paths (100% on
dispatch.py); plus settings coveragefor the new knob.
Notes
inputMapping${{ … }}placeholder resolution is performed by the receiverbefore handing the subscription to the dispatcher; the dispatcher forwards the
resolved
input_mappingas the start inputs.no list surface), matching the TS-IMPL-012 matcher design.
Acceptance
ruff/ruff format/mypyclean;pytest≥90% (99.9% total, dispatch.py 100%).