Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion cloud_pipelines_backend/instrumentation/execution_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from opentelemetry.trace import StatusCode

from .. import backend_types_sql as bts
from ..launchers import common_annotations

_logger = logging.getLogger(__name__)
_tracer = trace.get_tracer("tangle.orchestrator")
Expand Down Expand Up @@ -87,6 +88,46 @@ def _launcher_pod_attrs(
return attrs


def _launcher_type_attrs(*, execution: bts.ExecutionNode) -> dict[str, object]:
    """Build the launcher-identity attributes for the root execution span.

    The first top-level key of the container execution's ``launcher_data`` is
    reported as ``execution.launcher``, so no launcher names are hard-coded
    here. When the value stored under that key is a dict with a truthy
    ``cluster_server`` entry, it is also reported as ``k8s.cluster.url``; per
    the original author this lets clusters that share one launcher class
    (e.g. GKE vs. Nebius) be told apart by URL pattern.

    Args:
        execution: Execution node whose container execution is inspected.

    Returns:
        An empty dict when the node has no container execution or no launcher
        data; otherwise a dict with ``execution.launcher`` and, when
        available, ``k8s.cluster.url``.
    """
    if execution.container_execution_id is None:
        return {}

    container_execution = execution.container_execution
    if container_execution is None:
        return {}

    launcher_data = container_execution.launcher_data
    if not launcher_data:
        return {}

    # The launcher type is whatever key comes first in launcher_data.
    launcher_name = next(iter(launcher_data))
    launcher_payload = launcher_data[launcher_name]

    result: dict[str, object] = {"execution.launcher": launcher_name}
    if isinstance(launcher_payload, dict):
        cluster_server = launcher_payload.get("cluster_server")
        if cluster_server:
            result["k8s.cluster.url"] = cluster_server
    return result


def _cloud_provider_attrs(*, execution: bts.ExecutionNode) -> dict[str, object]:
    """Cloud provider for the root execution span, read from task_spec annotations.

    Returns ``{"execution.cloud_provider": value}`` when the
    ``cloud-pipelines.net/orchestration/cloud_provider`` annotation is present,
    otherwise an empty dict. Callers (e.g. oasis-backend's
    MultiLauncherContainerLauncher) set this annotation at routing time; tangle
    launchers with a fixed cloud affinity can set it too.

    Args:
        execution: Execution node whose ``task_spec`` is inspected. A missing
            or empty ``task_spec``, and a missing or null ``annotations``
            entry, are all treated as "no annotations".

    Returns:
        A single-entry attribute dict, or ``{}`` when no provider is recorded.
    """
    task_spec = execution.task_spec or {}
    # ``dict.get(key, default)`` only falls back when the key is *missing*.
    # A task spec serialized with an explicit ``"annotations": null`` would
    # yield None and make the next lookup raise, so normalize with ``or``.
    annotations = task_spec.get("annotations") or {}
    provider = annotations.get(common_annotations.CLOUD_PROVIDER_ANNOTATION_KEY)
    if provider is None:
        return {}
    return {"execution.cloud_provider": provider}


def _ns(*, dt: datetime.datetime) -> int:
"""Return *dt* as nanoseconds since the Unix epoch (required by OTel SDK).

Expand Down Expand Up @@ -116,7 +157,11 @@ def emit_execution_trace(*, execution: bts.ExecutionNode) -> None:

root = _tracer.start_span(
"execution",
attributes={"execution.id": execution.id},
attributes={
"execution.id": execution.id,
**_launcher_type_attrs(execution=execution),
**_cloud_provider_attrs(execution=execution),
},
start_time=_ns(dt=first_time),
)
root_ctx = trace.set_span_in_context(root)
Expand Down
2 changes: 2 additions & 0 deletions cloud_pipelines_backend/launchers/common_annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@
# Annotation key for a container execution id.
# NOTE(review): presumably set by launchers to link a launched workload back
# to its container execution record — confirm against the code that writes it.
CONTAINER_EXECUTION_ID_ANNOTATION_KEY = (
    "cloud-pipelines.net/orchestration/container_execution.id"
)

# Annotation key naming the cloud provider for an execution. Execution tracing
# looks this key up in the task spec's "annotations" mapping and, when present,
# reports the value as the "execution.cloud_provider" root-span attribute.
CLOUD_PROVIDER_ANNOTATION_KEY = "cloud-pipelines.net/orchestration/cloud_provider"
51 changes: 51 additions & 0 deletions tests/instrumentation/test_execution_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from cloud_pipelines_backend import backend_types_sql as bts
from cloud_pipelines_backend.instrumentation import execution_tracing
from cloud_pipelines_backend.launchers import common_annotations


@pytest.fixture()
Expand Down Expand Up @@ -237,3 +238,53 @@ def test_non_pending_span_has_no_k8s_attributes(

for span in span_exporter.get_finished_spans():
assert "k8s.pod.name" not in (span.attributes or {})


class TestLauncherTypeAttrs:
    """Launcher-type and cloud-provider attributes on the root execution span."""

    def test_launcher_type_attrs_report_launcher_key_and_cluster_url(self) -> None:
        """Positive path: the first launcher_data key and its cluster URL are reported.

        The helper only reads ``container_execution_id`` and
        ``container_execution.launcher_data``, so lightweight stubs suffice.
        """
        import types  # Local import: used only by the stub-based tests.

        execution = types.SimpleNamespace(
            container_execution_id="ce-1",
            container_execution=types.SimpleNamespace(
                launcher_data={
                    "kubernetes": {"cluster_server": "https://10.0.0.1"},
                },
            ),
        )

        attrs = execution_tracing._launcher_type_attrs(execution=execution)

        assert attrs == {
            "execution.launcher": "kubernetes",
            "k8s.cluster.url": "https://10.0.0.1",
        }

    def test_launcher_type_attrs_omit_cluster_url_when_not_recorded(self) -> None:
        """A launcher without ``cluster_server`` yields only the launcher name."""
        import types  # Local import: used only by the stub-based tests.

        execution = types.SimpleNamespace(
            container_execution_id="ce-1",
            container_execution=types.SimpleNamespace(
                launcher_data={"docker": {"container_id": "abc"}},
            ),
        )

        attrs = execution_tracing._launcher_type_attrs(execution=execution)

        assert attrs == {"execution.launcher": "docker"}

    def test_root_span_omits_launcher_type_on_succeeded_stub(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        """Previously misnamed ``..._carries_launcher_type``; it asserts absence."""
        execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"])
        # No container_execution on this stub — attribute should be absent.
        execution_tracing.try_emit_execution_trace(execution=execution)

        root = next(
            s for s in span_exporter.get_finished_spans() if s.name == "execution"
        )
        assert "execution.launcher" not in (root.attributes or {})

    def test_root_span_no_launcher_attrs_without_container_execution(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        """A skipped execution with no container execution has no launcher attribute."""
        execution = _make_execution(statuses=["QUEUED", "SKIPPED"])
        execution_tracing.try_emit_execution_trace(execution=execution)

        root = next(
            s for s in span_exporter.get_finished_spans() if s.name == "execution"
        )
        assert "execution.launcher" not in (root.attributes or {})

    def test_root_span_carries_cloud_provider_when_annotation_set(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        """The cloud-provider annotation is copied onto the root span."""
        execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"])
        execution.task_spec = {
            "annotations": {common_annotations.CLOUD_PROVIDER_ANNOTATION_KEY: "nebius"}
        }
        execution_tracing.try_emit_execution_trace(execution=execution)

        root = next(
            s for s in span_exporter.get_finished_spans() if s.name == "execution"
        )
        assert root.attributes["execution.cloud_provider"] == "nebius"

    def test_root_span_omits_cloud_provider_when_annotation_absent(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        """Without the annotation, no cloud-provider attribute is emitted."""
        execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"])
        execution_tracing.try_emit_execution_trace(execution=execution)

        root = next(
            s for s in span_exporter.get_finished_spans() if s.name == "execution"
        )
        assert "execution.cloud_provider" not in (root.attributes or {})