From 68a527c0e6a6134aea54a354762685cc619b50e4 Mon Sep 17 00:00:00 2001 From: Morgan Wowk Date: Fri, 22 May 2026 15:29:42 -0700 Subject: [PATCH] Add pipeline/parent context to root execution span execution.parent_id and execution.task_id link individual execution traces to their parent pipeline run, enabling correlation with future pipeline-level business logic traces. --- .../instrumentation/execution_tracing.py | 11 ++++++++ .../instrumentation/test_execution_tracing.py | 28 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/cloud_pipelines_backend/instrumentation/execution_tracing.py b/cloud_pipelines_backend/instrumentation/execution_tracing.py index 50bfe0f..0da7756 100644 --- a/cloud_pipelines_backend/instrumentation/execution_tracing.py +++ b/cloud_pipelines_backend/instrumentation/execution_tracing.py @@ -140,6 +140,16 @@ def _cache_attrs(*, execution: bts.ExecutionNode) -> dict[str, object]: return attrs +def _pipeline_attrs(*, execution: bts.ExecutionNode) -> dict[str, object]: + """Parent execution and task identity for the root execution span.""" + attrs: dict[str, object] = {} + if execution.parent_execution_id is not None: + attrs["execution.parent_id"] = execution.parent_execution_id + if execution.task_id_in_parent_execution is not None: + attrs["execution.task_id"] = execution.task_id_in_parent_execution + return attrs + + def _ns(*, dt: datetime.datetime) -> int: """Return *dt* as nanoseconds since the Unix epoch (required by OTel SDK). @@ -174,6 +184,7 @@ def emit_execution_trace(*, execution: bts.ExecutionNode) -> None: **_launcher_type_attrs(execution=execution), **_cloud_provider_attrs(execution=execution), **_cache_attrs(execution=execution), + **_pipeline_attrs(execution=execution), }, start_time=_ns(dt=first_time), ) diff --git a/tests/instrumentation/test_execution_tracing.py b/tests/instrumentation/test_execution_tracing.py index 1c859d5..4128712 100644 --- a/tests/instrumentation/test_execution_tracing.py +++ b/tests/instrumentation/test_execution_tracing.py @@ -318,3 +318,31 @@ def test_cache_hit_sets_hit_true_and_reused_from_id( assert ( root.attributes["execution.cache.reused_from_id"] == "source-execution-id" ) + + +class TestPipelineAttrs: + def test_root_span_carries_parent_and_task_id( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) + execution.parent_execution_id = "parent-exec-id" + execution.task_id_in_parent_execution = "my-task" + execution_tracing.try_emit_execution_trace(execution=execution) + + root = next( + s for s in span_exporter.get_finished_spans() if s.name == "execution" + ) + assert root.attributes["execution.parent_id"] == "parent-exec-id" + assert root.attributes["execution.task_id"] == "my-task" + + def test_root_execution_omits_parent_attrs_when_absent( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) + execution_tracing.try_emit_execution_trace(execution=execution) + + root = next( + s for s in span_exporter.get_finished_spans() if s.name == "execution" + ) + assert "execution.parent_id" not in (root.attributes or {}) + assert "execution.task_id" not in (root.attributes or {})