feat: Implement ProcessorChain for composite metric reporting #2569
drewrelmas wants to merge 15 commits into open-telemetry:main
Conversation
Codecov Report

@@ Coverage Diff @@
## main #2569 +/- ##
==========================================
+ Coverage 88.42% 88.44% +0.01%
==========================================
Files 631 632 +1
Lines 232112 232715 +603
==========================================
+ Hits 205243 205816 +573
- Misses 26345 26375 +30
Partials 524 524
Marking as draft again; upon further review of the benchmarks, I don't think this is a strictly valid comparison at this iteration. Theoretically the chain benchmark should be equivalent to or faster than the regular path.
Putting here what we discussed offline: I was wondering if ProcessorChain in this PR can be a ChainProcessor instead :D

1. ChainProcessor (user-facing composition)

This is what the PR implements today, but reframed: it's just another processor that allows you to compose multiple processors inside it. It implements the Processor trait like any other node.

2. ProcessorChain (engine-level optimization)

NOTE: This section is exploratory to share an idea, rather than a specific implementation.

This is a separate concept. A processor chain at the engine level would be about optimization, not about adding functionality. The idea would be to separate the pure data processing from message handling in processors, something like:

```rust
// Full processor: owns its message loop.
// (Channel/handler parameter types are placeholders for this sketch.)
trait Processor {
    async fn process(&mut self, channel: MessageChannel, effect_handler: EffectHandler);
}

// Pure data transform: no message loop, no channels.
trait PDataProcessor {
    fn process_pdata(&mut self, pdata: PData) -> Vec<PData>;
}
```

A processor that implements PDataProcessor could then be composed by the engine with no channels between stages. This keeps the two concerns separate: message handling on one side, pure data transformation on the other.
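If the engine owned such a trait, one could imagine it fusing consecutive stages into a single node. A minimal sketch of that idea, with all names (FusedChain included) illustrative rather than taken from this PR:

```rust
// Hypothetical sketch: the engine fuses consecutive PDataProcessor stages
// into one node, threading each batch through the stages with plain
// function calls instead of channels.
struct FusedChain {
    stages: Vec<Box<dyn PDataProcessor>>,
}

impl PDataProcessor for FusedChain {
    fn process_pdata(&mut self, pdata: PData) -> Vec<PData> {
        self.stages.iter_mut().fold(vec![pdata], |batch, stage| {
            // Feed every output of the previous stage into the next stage.
            batch.into_iter().flat_map(|p| stage.process_pdata(p)).collect()
        })
    }
}
```

Because the fused chain itself implements PDataProcessor, such chains would compose like any other pure transform.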
Two very brief comments on this PR:
I like the memory-reduction aspect of the proposal and support it on those grounds. This is a comment on the instrumentation question, not on the processor chain technique. It's a "what if" question: what if we placed a logical stopwatch into the Context? Using the helper API we have for measuring compute time, we'd just increment the stopwatch and return it to the context. The pipeline already instruments the point where contexts enter and exit a node. When the context exits we record a process-compute-time measurement. This is a meaningful measurement for individual nodes as well as across any pair of connected nodes, accumulating compute time in the direction of travel.
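A minimal sketch of what that could look like, assuming a hypothetical Context type and helper method (none of these names are the existing pipeline API):

```rust
use std::time::Duration;

// Sketch only: the context carries an accumulated compute-time counter that
// each node increments as the context passes through.
#[derive(Default, Clone)]
struct Context {
    // Compute time accumulated across every node this context has traversed.
    compute_stopwatch: Duration,
}

impl Context {
    // Called by the node-exit instrumentation: fold this node's measured
    // compute time into the running total carried by the context.
    fn add_compute_time(&mut self, elapsed: Duration) {
        self.compute_stopwatch += elapsed;
    }
}
```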
Nice work, the PR description made the design easy to follow. One thing I'm still curious about is whether chained processors are expected to get the full effect-handler/runtime behavior. The intermediate stages look lighter-weight than a top-level processor (Vec-backed, default-port-only, no obvious timer/wakeup support), which seems totally fine for simple transforms/filters, but I'm less sure about processors that depend on those runtime features. If the intent is to support a simpler subset for now, it might be worth either documenting that clearly or rejecting incompatible processors up front, as sketched below.
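For illustration, up-front rejection could look something like this sketch; the capability flags and function names are assumptions, not the PR's actual API:

```rust
// Hypothetical capability flags a processor factory might expose; none of
// these names come from the PR.
struct ProcessorCapabilities {
    needs_timers: bool,
    uses_named_out_ports: bool,
}

// Reject incompatible processors when the chain is built, rather than letting
// them silently lose functionality at runtime.
fn validate_chain_member(name: &str, caps: &ProcessorCapabilities) -> Result<(), String> {
    if caps.needs_timers || caps.uses_named_out_ports {
        return Err(format!(
            "processor '{name}' requires runtime features not available inside a processor_chain"
        ));
    }
    Ok(())
}
```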
I agree with the distinction of whether a processor supports it or not.
Is the primary motivation composite metrics? If so, @jmacd's context-propagated stopwatch suggestion seems better suited and worth exploring. It could achieve correct composite duration metrics (min/max/sum/count).

I bring this up because the chaining approach introduces a couple of trade-offs that don't exist in the current model:

Scheduling fairness regression. The chain converts N separate tasks (each yielding between batches) into a single task that runs all intermediate stages without yielding. A single processor could already starve others with a long-running implementation, but that's a localized bug. Chaining systematically increases uninterrupted CPU time per task: the total compute time is the sum of all sub-processors in the chain, during which the executor cannot schedule receiver accepts, exporter flushes, or timer ticks. The intermediate Vec-backed sends are plain Rust operations, not async channel sends, so they never give the executor a scheduling opportunity between stages. This could be mitigated with explicit yield points between stages (see the sketch after this comment).

Unbounded intermediate buffer. The intermediate Vec buffer is unbounded, so there is no backpressure between stages within the chain.

Burden on the user. Both issues above are regressions from the existing pipeline model. As @lalitb and @gouslu noted, we'd want to restrict which processors can be chained. But validating compatibility isn't just about checking for compute-only processors. Even a chain of purely compute/transform processors has the scheduling fairness issue described above. The safety constraints are architectural (chain length, per-processor compute cost, output cardinality), not just a property of the processor type.

Chaining has its own merits. The chain's benefit of memory reduction from eliminating intermediate channels is a valid motivator (as @lquerel noted), separate from composite metrics. But that benefit needs to be weighed against the regressions above, which would need guardrails (yield points, buffer bounds, processor compatibility validation) to reach the safety level of the existing model.
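For concreteness, a yield-point guardrail could look roughly like the following sketch; it is not the PR's implementation, and PData plus the closure-based stage signature are placeholders:

```rust
// Sketch of a cooperative-yield guardrail between chain stages.
// PData stands in for the pipeline's batch type.
async fn run_chain(
    stages: &mut [Box<dyn FnMut(PData) -> Vec<PData>>],
    input: PData,
) -> Vec<PData> {
    let mut current = vec![input];
    for stage in stages.iter_mut() {
        let mut next = Vec::new();
        for item in current {
            next.extend(stage(item));
        }
        current = next;
        // Without this await point the whole chain runs as one uninterrupted
        // poll of a single task; yielding lets the executor schedule receiver
        // accepts, exporter flushes, and timer ticks between stages.
        tokio::task::yield_now().await;
    }
    current
}
```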
Change Summary
Implements ProcessorChain, a composite node that runs multiple sub-processors sequentially within a single task, eliminating inter-processor channels. The chain reports a single compute.success.duration (or compute.failed.duration) metric covering the total compute cost of a record batch passing through all sub-processors, while each sub-processor still reports its own individual duration.

Motivation
When a single logical operation involves multiple internal processors for performance optimization (e.g., attributes -> condense -> recordset KQL), operators need a single duration metric representing the total cost of that logical operation per batch. Without this, the only option is statistical aggregation at the metrics layer, which produces incorrect min/max values (min(A) + min(B) != min(A+B)).

The ProcessorChain approach gives per-batch composite timing with a correct MMSC distribution (min/max/sum/count).
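As a concrete illustration (numbers invented for the example): if processor A's per-batch durations are [1ms, 5ms] and processor B's are [5ms, 1ms], metrics-layer aggregation yields min(A) + min(B) = 2ms, while the true per-batch totals are [6ms, 6ms], so min(A+B) = 6ms. Per-batch composite timing records the 6ms directly.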
ProcessorChainapproach gives per-batch composite timing with correct Mmsc distribution (min/max/sum/count).Config syntax
Telemetry output
With runtime_metrics: normal, this pipeline produces compute.success.duration for all 5 of the following:

| node.id | node.type | node.urn |
| --- | --- | --- |
| insert_A | processor | urn:otel:processor:attribute |
| chain | processor_chain | urn:otel:processor_chain:composite |
| chain/insert_B | processor | urn:otel:processor:attribute |
| chain/insert_C | processor | urn:otel:processor:attribute |
| insert_D | processor | urn:otel:processor:attribute |

The composite duration is always >= the sum of sub-processor durations (it includes inter-stage overhead).
Querying metrics locally shows the following result:

```json
[
  { "node_id": "insert_A", "node_type": "processor", "node_urn": "urn:otel:processor:attribute", "success": { "avg_ms": 1.2843131327433628, "count": 226 } },
  { "node_id": "insert_D", "node_type": "processor", "node_urn": "urn:otel:processor:attribute", "success": { "avg_ms": 0.36642488938053097, "count": 226 } },
  { "node_id": "chain", "node_type": "processor_chain", "node_urn": "urn:otel:processor_chain:composite", "success": { "avg_ms": 0.9586159513274336, "count": 226 } },
  { "node_id": "chain/insert_B", "node_type": "processor", "node_urn": "urn:otel:processor:attribute", "success": { "avg_ms": 0.4922827168141593, "count": 226 } },
  { "node_id": "chain/insert_C", "node_type": "processor", "node_urn": "urn:otel:processor:attribute", "success": { "avg_ms": 0.37540364159292033, "count": 226 } }
]
```

Design decisions
- Each sub-processor runs with an EffectHandler wired to a shared Rc<RefCell<Vec<PData>>> via a VecSender. When the sub-processor calls send_message(), the output is pushed directly into the Vec, with no channel send/recv, waker management, or async polling overhead. These buffer handlers are created once at construction and reused for every batch (a minimal sketch follows this list).
- Single-output fast path: the chain passes a lone PData value through each intermediate stage without any Vec operations. Only when a stage produces 0 or 2+ outputs does it fall back to staging vecs (stage_a/stage_b), which swap roles via std::mem::swap and retain heap capacity across calls.
- Sub-processor telemetry: each sub-processor gets a prefixed node.id (e.g., chain/insert_B) and node.urn via with_node_telemetry_handle scoping during factory creation. This ensures sub-processor metrics are clearly separated from the chain's composite metrics in telemetry output.
- The MetricsReporter from the CollectTelemetry control message is forwarded to sub-processors so their metric snapshots reach the telemetry registry (not an orphaned channel).
- NodeKind::ProcessorChain: reuses the existing (previously unused) config variant. Maps to NodeType::Processor in the engine so it participates in normal processor wiring.
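A minimal sketch of the Vec-backed send idea; the types are illustrative and the PR's actual VecSender may differ:

```rust
use std::cell::RefCell;
use std::rc::Rc;

// A "send" is a synchronous push into a shared Vec, so no channel, waker, or
// polling machinery sits between chain stages.
struct VecSender<T> {
    buffer: Rc<RefCell<Vec<T>>>,
}

impl<T> VecSender<T> {
    fn send(&self, item: T) {
        self.buffer.borrow_mut().push(item);
    }
}

// After a stage runs, swap the shared buffer's contents into a reusable
// staging Vec, retaining heap capacity across calls (as the PR describes).
fn drain_into<T>(buffer: &Rc<RefCell<Vec<T>>>, staging: &mut Vec<T>) {
    staging.clear();
    std::mem::swap(staging, &mut *buffer.borrow_mut());
}
```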
Performance

~300µs simulated work per processor (1,000 batches, single-threaded LocalSet):
With realistic per-processor compute (~300µs, approximating processor work), the chain overhead is within noise — effectively zero.
~100ns simulated work per processor (1,000 batches, single-threaded LocalSet):
At trivially low work (100ns per processor), the chain matches or beats separate tasks for len=1 thanks to a single-processor fast path. For len>=2, the remaining ~5-8% gap is the cost of per-stage Vec/RefCell bookkeeping, which is significant only at these artificially low work levels.

The chain's value is not raw throughput; it's the ability to produce a correct composite duration metric (min/max/sum/count) across all sub-processors, which is impossible with separate processors.
Future work
- processor_chain config switch to opt internal nodes out of reporting their own duration metrics: Add processor_chain config switches to enable/disable sub-processor metric reporting #2577

What issue does this PR close?

- NodeKind::ProcessorChain #2556
- processor_chain #2579
- NodeUserConfig for ProcessorChain processor configs #2576
Unit tests, benchmarks, and running fake config locally
Are there any user-facing changes?
Yes, users can now configure processor_chain elements in their configuration.