Skip to content

feat: Implement ProcessorChain for composite metric reporting#2569

Open
drewrelmas wants to merge 15 commits intoopen-telemetry:mainfrom
drewrelmas:drewrelmas/processor_chain_2
Open

feat: Implement ProcessorChain for composite metric reporting#2569
drewrelmas wants to merge 15 commits intoopen-telemetry:mainfrom
drewrelmas:drewrelmas/processor_chain_2

Conversation

@drewrelmas
Copy link
Copy Markdown
Contributor

@drewrelmas drewrelmas commented Apr 7, 2026

Change Summary

Implements ProcessorChain — a composite node that runs multiple sub-processors sequentially within a single task, eliminating inter-processor channels. The chain reports a single compute.success.duration (orcompute.failed.duration) metric covering the total compute cost of a record batch passing through all sub-processors, while each sub-processor still reports its own individual duration.

Motivation

When a single logical operation involves multiple internal processors for performance optimization (e.g., attributes -> condense -> recordset KQL), operators need a single duration metric representing the total cost of that logical operation per batch. Without this, the only option is statistical aggregation at the metrics layer, which produces incorrect min/max values (min(A) + min(B) != min(A+B)).

The ProcessorChain approach gives per-batch composite timing with correct Mmsc distribution (min/max/sum/count).

Config syntax

version: otel_dataflow/v1
engine: {}
groups:
  default:
    pipelines:
      main:
        policies:
          telemetry:
            runtime_metrics: normal
          channel_capacity:
            control:
              node: 100
              pipeline: 100
            pdata: 128

        nodes:
          receiver:
            type: receiver:traffic_generator
            config:
              traffic_config:
                max_batch_size: 1
                signals_per_second: 1
                log_weight: 100
              registry_path: https://github.com/open-telemetry/semantic-conventions.git[model]
          insert_A:
            type: processor:attribute
            config:
              actions:
                - action: insert
                  key: pre_chain
                  value: A
          chain:
            type: processor_chain:composite
            config:
              processors:
                insert_B:
                  type: processor:attribute
                  config:
                    actions:
                      - action: insert
                        key: chain_step_1
                        value: B
                insert_C:
                  type: processor:attribute
                  config:
                    actions:
                      - action: insert
                        key: chain_step_2
                        value: C
          insert_D:
            type: processor:attribute
            config:
              actions:
                - action: insert
                  key: post_chain
                  value: D
          debug:
            type: processor:debug
            config:
              verbosity: detailed
              mode: signal
          noop:
            type: exporter:noop
            config:

        connections:
          - from: receiver
            to: insert_A
          - from: insert_A
            to: chain
          - from: chain
            to: insert_D
          - from: insert_D
            to: debug
          - from: debug
            to: noop

Telemetry output

With runtime_metrics: normal, this pipeline produces compute.success.duration for all 5 of the following:

node.id node.type node.urn
insert_A processor urn:otel:processor:attribute
chain processor_chain urn:otel:processor_chain:composite
chain/insert_B processor urn:otel:processor:attribute
chain/insert_C processor urn:otel:processor:attribute
insert_D processor urn:otel:processor:attribute

The composite duration is always >= the sum of sub-processor durations (it includes inter-stage overhead).

Querying metrics locally shows the following result:

curl -s "http://127.0.0.1:8080/api/v1/telemetry/metrics?format=json" | jq '[.metric_sets[] | select(.name == "processor.compute") | {node_id: .attributes["node.id"].String, node_type: .attributes["node.type"].String, node_urn: .attributes["node.urn"].String, success: (.metrics[] | select(.name == "compute.success.duration") | .value | {avg_ms: (if .count > 0 then .sum / .count / 1e6 else 0 end), count})}]'

[
  {
    "node_id": "insert_A",
    "node_type": "processor",
    "node_urn": "urn:otel:processor:attribute",
    "success": {
      "avg_ms": 1.2843131327433628,
      "count": 226
    }
  },
  {
    "node_id": "insert_D",
    "node_type": "processor",
    "node_urn": "urn:otel:processor:attribute",
    "success": {
      "avg_ms": 0.36642488938053097,
      "count": 226
    }
  },
  {
    "node_id": "chain",
    "node_type": "processor_chain",
    "node_urn": "urn:otel:processor_chain:composite",
    "success": {
      "avg_ms": 0.9586159513274336,
      "count": 226
    }
  },
  {
    "node_id": "chain/insert_B",
    "node_type": "processor",
    "node_urn": "urn:otel:processor:attribute",
    "success": {
      "avg_ms": 0.4922827168141593,
      "count": 226
    }
  },
  {
    "node_id": "chain/insert_C",
    "node_type": "processor",
    "node_urn": "urn:otel:processor:attribute",
    "success": {
      "avg_ms": 0.37540364159292033,
      "count": 226
    }
  }
]

Design decisions

  • Vec-backed buffers: Each intermediate sub-processor (all except the last) gets a buffer EffectHandler wired to a shared Rc<RefCell<Vec<PData>>> via a VecSender. When the sub-processor calls send_message(), the output is pushed directly into the Vec — no channel send/recv, waker management, or async polling overhead. These buffer handlers are created once at construction and reused for every batch.
  • Single-item fast path: For chains of length 1, the chain delegates directly to the sole sub-processor with zero staging overhead. For length ≥ 2, an optimistic fast path threads a single PData value through each intermediate stage without any Vec operations. Only when a stage produces 0 or 2+ outputs does it fall back to staging vecs (stage_a/stage_b) which swap roles via std::mem::swap and retain heap capacity across calls.
  • Last sub-processor uses real EffectHandler: The last sub-processor in the chain doesn't need a buffer — it sends directly through the real downstream channel, like any normal processor would.
  • Sub-processor entity registration: Each sub-processor gets a distinct telemetry entity with its own node.id (e.g., chain/insert_B) and node.urn via with_node_telemetry_handle scoping during factory creation. This ensures sub-processor metrics are clearly separated from the chain's composite metrics in telemetry output.
  • CollectTelemetry forwarding: The real MetricsReporter from the CollectTelemetry control message is forwarded to sub-processors so their metric snapshots reach the telemetry registry (not an orphaned channel).
  • NodeKind::ProcessorChain: Reuses the existing (previously unused) config variant. Maps to NodeType::Processor in the engine so it participates in normal processor wiring.

Performance

~300µs simulated work per processor (1,000 batches, single-threaded LocalSet):

Chain len Chained (ms) Separate (ms) Overhead
1 301.6 301.4 +0.1%
2 603.4 604.0 -0.1%
3 904.9 904.9 0.0%

With realistic per-processor compute (~300µs, approximating processor work), the chain overhead is within noise — effectively zero.

~100ns simulated work per processor (1,000 batches, single-threaded LocalSet):

Chain len Chained (µs) Separate (µs) Overhead
1 305 320 -5%
2 713 661 +8%
3 1,033 990 +4%

At trivially low work (100ns per processor), the chain matches or beats separate tasks for len=1 thanks to a single-processor fast path. For len>=2, the remaining ~5-8% gap is the cost of per-stage Vec/RefCell bookkeeping, which is significant only at these artificially low work levels.

The chain's value is not raw throughput — it's the ability to produce a correct composite duration metric (min/max/sum/count) across all sub-processors, which is impossible with separate processors.

Future work

What issue does this PR close?

How are these changes tested?

Unit tests, benchmarks, and running fake config locally

Are there any user-facing changes?

Yes, users can now configure processor_chain elements in their configuration.

@github-actions github-actions bot added the rust Pull requests that update Rust code label Apr 7, 2026
@codecov
Copy link
Copy Markdown

codecov bot commented Apr 7, 2026

Codecov Report

❌ Patch coverage is 92.86872% with 44 lines in your changes missing coverage. Please review.
✅ Project coverage is 88.44%. Comparing base (9b4b8dc) to head (692f119).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2569      +/-   ##
==========================================
+ Coverage   88.42%   88.44%   +0.01%     
==========================================
  Files         631      632       +1     
  Lines      232112   232715     +603     
==========================================
+ Hits       205243   205816     +573     
- Misses      26345    26375      +30     
  Partials      524      524              
Components Coverage Δ
otap-dataflow 90.25% <92.86%> (+0.01%) ⬆️
query_abstraction 80.61% <ø> (ø)
query_engine 90.74% <ø> (ø)
otel-arrow-go 52.45% <ø> (ø)
quiver 92.27% <ø> (ø)
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@drewrelmas drewrelmas changed the title Implement ProcessorChain for composite metric reporting feat: Implement ProcessorChain for composite metric reporting Apr 7, 2026
Copy link
Copy Markdown
Contributor

@jmacd jmacd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good

@drewrelmas drewrelmas marked this pull request as ready for review April 7, 2026 21:19
@drewrelmas drewrelmas requested a review from a team as a code owner April 7, 2026 21:19
@drewrelmas
Copy link
Copy Markdown
Contributor Author

drewrelmas commented Apr 7, 2026

Marking as draft again, upon further review of the benchmarks I don't think this is a strictly valid comparison at this iteration. Theoretically the chain benchmark should be equivalent or faster than the regular path.

@drewrelmas drewrelmas marked this pull request as draft April 7, 2026 22:08
@drewrelmas drewrelmas marked this pull request as ready for review April 8, 2026 18:48
@gouslu
Copy link
Copy Markdown
Contributor

gouslu commented Apr 8, 2026

Putting here what we discussed offline:

I was wondering if ProcessorChain in this PR can be a ChainProcessor instead :D

1. ChainProcessor (user-facing composition)

This is what the PR implements today, but reframed: it's just another processor that allows you to compose multiple processors inside it. It implements the Processor trait, it's registered via a normal ProcessorFactory, and ideally requires minimal or no engine changes. The engine doesn't need to know it's special — it's a processor of processors and can have more logic in it.

2. ProcessorChain (engine-level optimization)

NOTE: This section is exploratory to share an idea, rather than a specific implementation.

This is a separate concept. A processor chain at the engine level would be about optimization, not about adding functionality. The idea would be to separate the pure data processing from message handling in processors, something like:

// Full processor — owns its message loop
trait Processor {
    async fn process(&mut self, channel, effect_handler);
}

// Pure data transform — no message loop, no channels
trait PDataProcessor {
    fn process_pdata(&mut self, pdata: PData) -> Vec<PData>;
}

A processor that implements PDataProcessor advertises that its data processing is pure and stateless with respect to the message loop. The engine can then automatically fuse linear sequences of PDataProcessors into a single task — no user configuration needed, no new YAML syntax. The engine handles the message loop once at the outer level and just calls a.process_pdata()b.process_pdata()c.process_pdata() internally.

This keeps the two concerns separate:

  • ChainProcessor: user wants to group processors explicitly for composite metrics or organizational reasons — it's just a processor, no engine changes needed.
  • ProcessorChain: engine detects fusable processors and optimizes them transparently — this is an engine concern, not a user concern.

@lquerel
Copy link
Copy Markdown
Contributor

lquerel commented Apr 8, 2026

Two very brief comments on this PR:

  • The benefit of chaining processors cannot be measured only in terms of CPU usage. The main benefit is in memory consumption. By eliminating channels, we reduce the amount of pdata accumulated in transit through those channels, and therefore reduce memory usage across the entire pipeline.
  • Originally, the idea behind the NodeKind::ProcessorChain variant was to allow the engine to automatically infer processor chains in order to transparently eliminate channel overhead, especially memory overhead. What I like about this PR is that, as a first step, we can make this concept explicit and then add support for this optimization in a future PR.
    • The difference I see between the explicit version and the implicit version is the transparency of this optimization at the level of the generated entities and signals. In the current PR, the sub-processors, and therefore the corresponding entities, are scoped by the chain, which seems like the logical behavior a user would expect.
    • For the future implicit version, ideally we should preserve transparency, meaning that the processor entities would remain the original ones, with however one additional entity corresponding to the chain, as in the explicit version.

@jmacd
Copy link
Copy Markdown
Contributor

jmacd commented Apr 9, 2026

I like the memory-reduction aspect of the proposal, support on those grounds.

This is a comment on the instrumentation question, not on the processor chain technique. This is a "what if" question--what if we placed a logical stopwatch into the Context; using the helper API we have for measuring compute time, we'll just increment the stopwatch and return it to the context. The pipeline already instruments the point where contexts enter and exit a node. When the context exits we record a process-compute-time measurement. This is a meaningful measurement for individual nodes as well as across any pair of connected nodes, accumulating compute time in the direction of travel.

@lalitb
Copy link
Copy Markdown
Member

lalitb commented Apr 9, 2026

Nice work, the PR description made the design easy to follow.

One thing I’m still curious about is whether chained processors are expected to get the full effect-handler/runtime behavior. The intermediate stages look lighter-weight than a top-level processor (Vec-backed, default-port-only, no obvious timer/wakeup support), which seems totally fine for simple transforms/filters, but I’m less sure about processors like retry_processor or fanout_processor that depend on richer runtime behavior. Since the config still accepts a full sub-processor node config, that mismatch would likely only show up at runtime.

If the intent is to support a simpler subset for now, it might be worth either documenting that clearly or rejecting incompatible processors up front in create_processor_chain.

@gouslu
Copy link
Copy Markdown
Contributor

gouslu commented Apr 9, 2026

Nice work, the PR description made the design easy to follow.

One thing I’m still curious about is whether chained processors are expected to get the full effect-handler/runtime behavior. The intermediate stages look lighter-weight than a top-level processor (Vec-backed, default-port-only, no obvious timer/wakeup support), which seems totally fine for simple transforms/filters, but I’m less sure about processors like retry_processor or fanout_processor that depend on richer runtime behavior. Since the config still accepts a full sub-processor node config, that mismatch would likely only show up at runtime.

If the intent is to support a simpler subset for now, it might be worth either documenting that clearly or rejecting incompatible processors up front in create_processor_chain.

I agree with the distinction of whether a processor supports it or not. In this comment the PDataProcessor idea (or sth like that) was meant to address that need. It would be good to have processors distinguish themselves.

@utpilla
Copy link
Copy Markdown
Contributor

utpilla commented Apr 9, 2026

Is the primary motivation composite metrics? If so, @jmacd's context-propagated stopwatch suggestion seems better suited and worth exploring. It could achieve correct composite duration metrics (min/max/sum/count across a group of processors) without architectural changes. Processors would stay as separate tasks with bounded channels, no engine changes needed, and it could work across any pair of connected nodes rather than only explicitly configured chains. There are implementation details to work out (attaching timing to PData, defining composite groups, handling fan-out), but the core idea avoids the trade-offs below.

I bring this up because the chaining approach introduces a couple of trade-offs that don't exist in the current model:

Scheduling fairness regression. The chain converts N separate tasks (each yielding between batches) into a single task that runs all intermediate stages without yielding. A single processor could already starve others with a long-running implementation, but that's a localized bug. Chaining systematically increases uninterrupted CPU time per task. The total compute time is the sum of all sub-processors in the chain, during which the executor cannot schedule receiver accepts, exporter flushes, or timer ticks. The intermediate Vec-backed sends are plain Rust operations, not async channel sends, so they never give the executor a scheduling opportunity between stages. This could be mitigated with yield_now().await between stages.

Unbounded intermediate buffer. The intermediate Vec buffer is unbounded. push() always succeeds with no capacity check. In practice this is fine for well-behaved 1:1 processors (one output per input), which is the common case. However, the existing model's bounded mpsc channels put a ceiling on how much memory one stage can consume at a time. If a processor calls send_message() multiple times per input, the channel's capacity limit applies backpressure by yielding when full. The chain removes that ceiling.

Burden on the user. Both issues above are regressions from the existing pipeline model. As @lalitb and @gouslu noted, we'd want to restrict which processors can be chained. But validating compatibility isn't just about checking for compute-only processors. Even a chain of purely compute/transform processors has the scheduling fairness issue described above. The safety constraints are architectural (chain length, per-processor compute cost, output cardinality), not just a property of the processor type.

Chaining has its own merits

The chain's benefit of memory reduction from eliminating intermediate channels is a valid motivator (as @lquerel noted), separate from composite metrics. But that benefit needs to be weighed against the regressions above, which would need guardrails (yield points, buffer bounds, processor compatibility validation) to reach the safety level of the existing model.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

rust Pull requests that update Rust code

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

Initial implementation of processor_chain Use NodeUserConfig for ProcessorChain processor configs

6 participants