From a8e9b03c32d5f38f0d27a2a47d6b8a2fd7d39d01 Mon Sep 17 00:00:00 2001 From: Morgan Wowk Date: Fri, 22 May 2026 15:25:40 -0700 Subject: [PATCH] Add launcher type and cluster URL to root execution span execution.launcher = top-level key from launcher_data (e.g. 'kubernetes', 'kubernetes_job', 'skypilot') distinguishes launcher mechanism. k8s.cluster.url on root span allows GKE vs Nebius cluster identification by URL pattern in oasis-backend's multi-cloud setup. --- .../instrumentation/execution_tracing.py | 47 ++++++++++++++++- .../launchers/common_annotations.py | 2 + .../instrumentation/test_execution_tracing.py | 51 +++++++++++++++++++ 3 files changed, 99 insertions(+), 1 deletion(-) diff --git a/cloud_pipelines_backend/instrumentation/execution_tracing.py b/cloud_pipelines_backend/instrumentation/execution_tracing.py index efda67e..6b749ac 100644 --- a/cloud_pipelines_backend/instrumentation/execution_tracing.py +++ b/cloud_pipelines_backend/instrumentation/execution_tracing.py @@ -14,6 +14,7 @@ from opentelemetry.trace import StatusCode from .. import backend_types_sql as bts +from ..launchers import common_annotations _logger = logging.getLogger(__name__) _tracer = trace.get_tracer("tangle.orchestrator") @@ -87,6 +88,46 @@ def _launcher_pod_attrs( return attrs +def _launcher_type_attrs(*, execution: bts.ExecutionNode) -> dict[str, object]: + """Launcher type and cluster identity for the root execution span. + + Uses the top-level key of launcher_data as the launcher type — forward-compatible + with any launcher implementation. For k8s-family launchers adds cluster_server + so GKE and Nebius clusters (which share the same launcher class in oasis-backend) + can be distinguished by URL pattern. + """ + if execution.container_execution_id is None: + return {} + ce = execution.container_execution + if ce is None or not ce.launcher_data: + return {} + launcher_key = next(iter(ce.launcher_data)) + attrs: dict[str, object] = {"execution.launcher": launcher_key} + inner = ce.launcher_data[launcher_key] + if isinstance(inner, dict) and (cluster_url := inner.get("cluster_server")): + attrs["k8s.cluster.url"] = cluster_url + return attrs + + +def _cloud_provider_attrs(*, execution: bts.ExecutionNode) -> dict[str, object]: + """Cloud provider for the root execution span, read from task_spec annotations. + + Returns ``{"execution.cloud_provider": value}`` when the + ``cloud-pipelines.net/orchestration/cloud_provider`` annotation is present, + otherwise an empty dict. Callers (e.g. oasis-backend's MultiLauncherContainerLauncher) + set this annotation at routing time; tangle launchers with a fixed cloud affinity + can set it too. + """ + provider = ( + (execution.task_spec or {}) + .get("annotations", {}) + .get(common_annotations.CLOUD_PROVIDER_ANNOTATION_KEY) + ) + if provider is None: + return {} + return {"execution.cloud_provider": provider} + + def _ns(*, dt: datetime.datetime) -> int: """Return *dt* as nanoseconds since the Unix epoch (required by OTel SDK). @@ -116,7 +157,11 @@ def emit_execution_trace(*, execution: bts.ExecutionNode) -> None: root = _tracer.start_span( "execution", - attributes={"execution.id": execution.id}, + attributes={ + "execution.id": execution.id, + **_launcher_type_attrs(execution=execution), + **_cloud_provider_attrs(execution=execution), + }, start_time=_ns(dt=first_time), ) root_ctx = trace.set_span_in_context(root) diff --git a/cloud_pipelines_backend/launchers/common_annotations.py b/cloud_pipelines_backend/launchers/common_annotations.py index 01ecdd3..0c865e0 100644 --- a/cloud_pipelines_backend/launchers/common_annotations.py +++ b/cloud_pipelines_backend/launchers/common_annotations.py @@ -9,3 +9,5 @@ CONTAINER_EXECUTION_ID_ANNOTATION_KEY = ( "cloud-pipelines.net/orchestration/container_execution.id" ) + +CLOUD_PROVIDER_ANNOTATION_KEY = "cloud-pipelines.net/orchestration/cloud_provider" diff --git a/tests/instrumentation/test_execution_tracing.py b/tests/instrumentation/test_execution_tracing.py index 15761cc..fbd8f9c 100644 --- a/tests/instrumentation/test_execution_tracing.py +++ b/tests/instrumentation/test_execution_tracing.py @@ -10,6 +10,7 @@ from cloud_pipelines_backend import backend_types_sql as bts from cloud_pipelines_backend.instrumentation import execution_tracing +from cloud_pipelines_backend.launchers import common_annotations @pytest.fixture() @@ -237,3 +238,53 @@ def test_non_pending_span_has_no_k8s_attributes( for span in span_exporter.get_finished_spans(): assert "k8s.pod.name" not in (span.attributes or {}) + + +class TestLauncherTypeAttrs: + def test_root_span_carries_launcher_type( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) + # No container_execution on this stub — attribute should be absent. + execution_tracing.try_emit_execution_trace(execution=execution) + + root = next( + s for s in span_exporter.get_finished_spans() if s.name == "execution" + ) + assert "execution.launcher" not in (root.attributes or {}) + + def test_root_span_no_launcher_attrs_without_container_execution( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "SKIPPED"]) + execution_tracing.try_emit_execution_trace(execution=execution) + + root = next( + s for s in span_exporter.get_finished_spans() if s.name == "execution" + ) + assert "execution.launcher" not in (root.attributes or {}) + + def test_root_span_carries_cloud_provider_when_annotation_set( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) + execution.task_spec = { + "annotations": {common_annotations.CLOUD_PROVIDER_ANNOTATION_KEY: "nebius"} + } + execution_tracing.try_emit_execution_trace(execution=execution) + + root = next( + s for s in span_exporter.get_finished_spans() if s.name == "execution" + ) + assert root.attributes["execution.cloud_provider"] == "nebius" + + def test_root_span_omits_cloud_provider_when_annotation_absent( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) + execution_tracing.try_emit_execution_trace(execution=execution) + + root = next( + s for s in span_exporter.get_finished_spans() if s.name == "execution" + ) + assert "execution.cloud_provider" not in (root.attributes or {})