Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions src/workflows/transport/middleware/otel_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,72 @@ def wrapped_callback(header, message):

return call_next(channel_hint, wrapped_callback, **kwargs)

def raw_send(self, call_next: Callable, destination: str, message, **kwargs):
# Get current span context (may be None if this is the root span)
current_span = trace.get_current_span()
parent_context = (
trace.set_span_in_context(current_span) if current_span else None
)

with self.tracer.start_as_current_span(
"transport.raw_send",
context=parent_context,
) as span:
self._set_span_attributes(span, destination=destination)

# Inject the current trace context into the message headers
headers = kwargs.get("headers", {})
if headers is None:
headers = {}
inject(headers) # This modifies headers in-place
kwargs["headers"] = headers

return call_next(destination, message, **kwargs)

def broadcast(self, call_next: Callable, destination: str, message, **kwargs):
# Get current span context (may be None if this is the root span)
current_span = trace.get_current_span()
parent_context = (
trace.set_span_in_context(current_span) if current_span else None
)

with self.tracer.start_as_current_span(
"transport.broadcast",
context=parent_context,
) as span:
self._set_span_attributes(span, destination=destination)

# Inject the current trace context into the message headers
headers = kwargs.get("headers", {})
if headers is None:
headers = {}
inject(headers) # This modifies headers in-place
kwargs["headers"] = headers

return call_next(destination, message, **kwargs)

def raw_broadcast(self, call_next: Callable, destination: str, message, **kwargs):
# Get current span context (may be None if this is the root span)
current_span = trace.get_current_span()
parent_context = (
trace.set_span_in_context(current_span) if current_span else None
)

with self.tracer.start_as_current_span(
"transport.raw_broadcast",
context=parent_context,
) as span:
self._set_span_attributes(span, destination=destination)

# Inject the current trace context into the message headers
headers = kwargs.get("headers", {})
if headers is None:
headers = {}
inject(headers) # This modifies headers in-place
kwargs["headers"] = headers

return call_next(destination, message, **kwargs)

def unsubscribe(
self,
call_next: Callable,
Expand Down
Loading