From 660f08916e16a53f99f763f830890b3af546a25d Mon Sep 17 00:00:00 2001 From: David Igandan Date: Tue, 14 Apr 2026 16:53:41 +0100 Subject: [PATCH 1/2] fix broken chain of responsibility in otel tracing middleware --- .../transport/middleware/otel_tracing.py | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/src/workflows/transport/middleware/otel_tracing.py b/src/workflows/transport/middleware/otel_tracing.py index f27a583..b35d68e 100644 --- a/src/workflows/transport/middleware/otel_tracing.py +++ b/src/workflows/transport/middleware/otel_tracing.py @@ -107,6 +107,72 @@ def wrapped_callback(header, message): return call_next(channel_hint, wrapped_callback, **kwargs) + def raw_send(self, call_next: Callable, destination: str, message, **kwargs): + # Get current span context (may be None if this is the root span) + current_span = trace.get_current_span() + parent_context = ( + trace.set_span_in_context(current_span) if current_span else None + ) + + with self.tracer.start_as_current_span( + "transport.raw_send", + context=parent_context, + ) as span: + self._set_span_attributes(span, destination=destination) + + # Inject the current trace context into the message headers + headers = kwargs.get("headers", {}) + if headers is None: + headers = {} + inject(headers) # This modifies headers in-place + kwargs["headers"] = headers + + return call_next(destination, message, **kwargs) + + def broadcast(self, call_next: Callable, destination: str, message, **kwargs): + # Get current span context (may be None if this is the root span) + current_span = trace.get_current_span() + parent_context = ( + trace.set_span_in_context(current_span) if current_span else None + ) + + with self.tracer.start_as_current_span( + "transport.broadcast", + context=parent_context, + ) as span: + self._set_span_attributes(span, destination=destination) + + # Inject the current trace context into the message headers + headers = kwargs.get("headers", {}) + if headers is None: + headers = {} + inject(headers) # This modifies headers in-place + kwargs["headers"] = headers + + return call_next(destination, message, **kwargs) + + def raw_broadcast(self, call_next: Callable, destination: str, message, **kwargs): + # Get current span context (may be None if this is the root span) + current_span = trace.get_current_span() + parent_context = ( + trace.set_span_in_context(current_span) if current_span else None + ) + + with self.tracer.start_as_current_span( + "transport.raw_broadcast", + context=parent_context, + ) as span: + self._set_span_attributes(span, destination=destination) + + # Inject the current trace context into the message headers + headers = kwargs.get("headers", {}) + if headers is None: + headers = {} + inject(headers) # This modifies headers in-place + kwargs["headers"] = headers + + return call_next(destination, message, **kwargs) + def unsubscribe( self, call_next: Callable, From f7687d26308038a3e5e3f862ba44da78f8f6f62d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 14 Apr 2026 15:55:42 +0000 Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/workflows/transport/middleware/otel_tracing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/workflows/transport/middleware/otel_tracing.py b/src/workflows/transport/middleware/otel_tracing.py index b35d68e..aed057e 100644 --- a/src/workflows/transport/middleware/otel_tracing.py +++ b/src/workflows/transport/middleware/otel_tracing.py @@ -172,7 +172,7 @@ def raw_broadcast(self, call_next: Callable, destination: str, message, **kwargs kwargs["headers"] = headers return call_next(destination, message, **kwargs) - + def unsubscribe( self, call_next: Callable,