From 059c47b252cf119a3c4c58accc5982947793de09 Mon Sep 17 00:00:00 2001 From: Morgan Wowk Date: Fri, 22 May 2026 15:19:48 -0700 Subject: [PATCH] Add retroactive OTel execution lifecycle tracing Emits a root 'execution' span and one 'execution.status' child span per status history entry when an ExecutionNode reaches a terminal state. All span timestamps are derived from the existing status history so durations reflect actual time spent, not when this code ran. - New module: cloud_pipelines_backend/instrumentation/execution_tracing.py - Hook: metrics._handle_before_commit calls try_emit_execution_trace - Orchestrator: otel.setup_providers() so the exporter is active - Tests: InMemorySpanExporter-backed suite in tests/instrumentation/ --- .../instrumentation/execution_tracing.py | 83 ++++++++++ .../instrumentation/metrics.py | 2 + orchestrator_main.py | 4 + .../instrumentation/test_execution_tracing.py | 146 ++++++++++++++++++ 4 files changed, 235 insertions(+) create mode 100644 cloud_pipelines_backend/instrumentation/execution_tracing.py create mode 100644 tests/instrumentation/test_execution_tracing.py diff --git a/cloud_pipelines_backend/instrumentation/execution_tracing.py b/cloud_pipelines_backend/instrumentation/execution_tracing.py new file mode 100644 index 0000000..a6512eb --- /dev/null +++ b/cloud_pipelines_backend/instrumentation/execution_tracing.py @@ -0,0 +1,83 @@ +"""Retroactive OTel trace emission for execution lifecycle. + +When an ExecutionNode reaches a terminal status, emits a root ``execution`` +span covering the full lifetime plus one ``execution.status`` child span per +status entry recorded in the status history. All timestamps are derived from +the history so span durations reflect actual time spent, not when this code +ran. +""" + +import datetime +import logging + +from opentelemetry import trace + +from .. 
import backend_types_sql as bts + +_logger = logging.getLogger(__name__) +_tracer = trace.get_tracer("tangle.orchestrator") + +_HISTORY_KEY = bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY +_TERMINAL_STATUSES = frozenset(s.value for s in bts.CONTAINER_STATUSES_ENDED) + + +_EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + + +def _ns(*, dt: datetime.datetime) -> int: + """Return *dt* as nanoseconds since the Unix epoch (required by OTel SDK). + + Uses integer arithmetic on timedelta components to avoid float64 precision + loss at current Unix timestamps (~1.75e18 ns), where the ULP is 256 ns. + """ + if dt.tzinfo is None: + dt = dt.replace(tzinfo=datetime.timezone.utc) + delta = dt - _EPOCH + return ( + delta.days * 86_400 + delta.seconds + ) * 1_000_000_000 + delta.microseconds * 1_000 + + +def try_emit_execution_trace(*, execution: bts.ExecutionNode) -> None: + """Emit a complete execution trace when *execution* reaches a terminal status. + + No-op for non-terminal executions. All exceptions are caught and logged so + that tracing failures never affect the surrounding SQLAlchemy commit. 
+ """ + history: list = (execution.extra_data or {}).get(_HISTORY_KEY, []) + if not history or history[-1]["status"] not in _TERMINAL_STATUSES: + return + try: + first_time = datetime.datetime.fromisoformat(history[0]["first_observed_at"]) + last_time = datetime.datetime.fromisoformat(history[-1]["first_observed_at"]) + + root = _tracer.start_span( + "execution", + attributes={"execution.id": execution.id}, + start_time=_ns(dt=first_time), + ) + root_ctx = trace.set_span_in_context(root) + + for i, entry in enumerate(history): + t_start = datetime.datetime.fromisoformat(entry["first_observed_at"]) + t_end = ( + datetime.datetime.fromisoformat(history[i + 1]["first_observed_at"]) + if i + 1 < len(history) + else last_time + ) + attrs: dict[str, object] = { + "execution.id": execution.id, + "execution.status": entry["status"], + } + _tracer.start_span( + f"execution.status {entry['status']}", + context=root_ctx, + attributes=attrs, + start_time=_ns(dt=t_start), + ).end(end_time=_ns(dt=t_end)) + + root.end(end_time=_ns(dt=last_time)) + except Exception: + _logger.warning( + f"Failed to emit execution trace for {execution.id!r}", exc_info=True + ) diff --git a/cloud_pipelines_backend/instrumentation/metrics.py b/cloud_pipelines_backend/instrumentation/metrics.py index abba410..2d7ca4b 100644 --- a/cloud_pipelines_backend/instrumentation/metrics.py +++ b/cloud_pipelines_backend/instrumentation/metrics.py @@ -31,6 +31,7 @@ from sqlalchemy import orm from .. import backend_types_sql +from . 
import execution_tracing _logger = logging.getLogger(__name__) @@ -102,3 +103,4 @@ def _handle_before_commit(session: orm.Session) -> None: exc_info=True, ) obj._status_changed = False + execution_tracing.try_emit_execution_trace(execution=obj) diff --git a/orchestrator_main.py b/orchestrator_main.py index badf961..59be9a9 100644 --- a/orchestrator_main.py +++ b/orchestrator_main.py @@ -7,6 +7,7 @@ from cloud_pipelines_backend import orchestrator_sql from cloud_pipelines_backend.instrumentation import bugsnag_instrumentation +from cloud_pipelines_backend.instrumentation import opentelemetry as otel from cloud_pipelines_backend.launchers import kubernetes_launchers from cloud_pipelines.orchestration.storage_providers import local_storage @@ -26,6 +27,7 @@ def _build_launcher(): from cloud_pipelines_backend.launchers.skypilot_launchers import ( SkyPilotKubernetesLauncher, ) + return SkyPilotKubernetesLauncher( infra=os.environ.get("SKYPILOT_INFRA", "kubernetes"), pool=os.environ.get("SKYPILOT_POOL"), @@ -36,6 +38,7 @@ def _build_launcher(): from kubernetes import config as k8s_config_lib from kubernetes import client as k8s_client_lib + try: k8s_config_lib.load_incluster_config() except Exception: @@ -75,6 +78,7 @@ def main(): logger.info("Starting the orchestrator") bugsnag_instrumentation.setup(service_name="tangle-orchestrator") + otel.setup_providers() DEFAULT_DATABASE_URI = "sqlite:///db.sqlite" database_uri = os.environ.get("DATABASE_URI", DEFAULT_DATABASE_URI) diff --git a/tests/instrumentation/test_execution_tracing.py b/tests/instrumentation/test_execution_tracing.py new file mode 100644 index 0000000..41cb39b --- /dev/null +++ b/tests/instrumentation/test_execution_tracing.py @@ -0,0 +1,146 @@ +"""Tests for execution lifecycle OTel trace emission.""" + +import datetime + +import pytest +from opentelemetry import trace +from opentelemetry.sdk import trace as otel_sdk_trace +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from 
opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + +from cloud_pipelines_backend import backend_types_sql as bts +from cloud_pipelines_backend.instrumentation import execution_tracing + + +@pytest.fixture() +def span_exporter(monkeypatch: pytest.MonkeyPatch) -> InMemorySpanExporter: + """Isolated in-memory span exporter for each test. + + Patches ``execution_tracing._tracer`` directly so tests are independent of + global OTel provider state (the module-level ProxyTracer would otherwise + remain bound to the provider from the first test run). + """ + exporter = InMemorySpanExporter() + provider = otel_sdk_trace.TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + monkeypatch.setattr( + execution_tracing, "_tracer", provider.get_tracer("tangle.orchestrator") + ) + return exporter + + +def _make_execution( + *, statuses: list[str], extra: dict | None = None +) -> bts.ExecutionNode: + """Build an ExecutionNode stub with a pre-populated status history. + + Assigns a deterministic ID because OTel drops None-valued attributes and + execution.id is only set by the DB insert_default in production. 
+ """ + history = [ + { + "status": s, + "first_observed_at": ( + datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc) + + datetime.timedelta(minutes=i * 5) + ).isoformat(), + } + for i, s in enumerate(statuses) + ] + node = bts.ExecutionNode(task_spec={}) + node.id = "test-execution-id" + node.extra_data = { + bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY: history, + **(extra or {}), + } + return node + + +class TestTryEmitExecutionTrace: + def test_no_spans_for_non_terminal_execution( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "RUNNING"]) + execution_tracing.try_emit_execution_trace(execution=execution) + assert span_exporter.get_finished_spans() == () + + def test_no_spans_for_empty_history( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=[]) + execution_tracing.try_emit_execution_trace(execution=execution) + assert span_exporter.get_finished_spans() == () + + def test_emits_root_and_child_spans_on_terminal( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "RUNNING", "SUCCEEDED"]) + execution_tracing.try_emit_execution_trace(execution=execution) + + names = {s.name for s in span_exporter.get_finished_spans()} + assert "execution" in names + assert any(n.startswith("execution.status ") for n in names) + + def test_child_span_count_matches_history( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "RUNNING", "SUCCEEDED"]) + execution_tracing.try_emit_execution_trace(execution=execution) + + status_spans = [ + s + for s in span_exporter.get_finished_spans() + if s.name.startswith("execution.status ") + ] + assert len(status_spans) == 3 + + def test_root_span_has_execution_id_attribute( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) + 
execution_tracing.try_emit_execution_trace(execution=execution) + + root = next( + s for s in span_exporter.get_finished_spans() if s.name == "execution" + ) + assert root.attributes["execution.id"] == execution.id + + def test_child_spans_share_trace_id_with_root( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "RUNNING", "SUCCEEDED"]) + execution_tracing.try_emit_execution_trace(execution=execution) + + finished = span_exporter.get_finished_spans() + trace_ids = {s.context.trace_id for s in finished} + assert len(trace_ids) == 1 + + def test_root_span_duration_matches_history( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "RUNNING", "SUCCEEDED"]) + execution_tracing.try_emit_execution_trace(execution=execution) + + root = next( + s for s in span_exporter.get_finished_spans() if s.name == "execution" + ) + duration_ns = root.end_time - root.start_time + assert duration_ns == int( + datetime.timedelta(minutes=10).total_seconds() * 1_000_000_000 + ) + + def test_child_span_status_attribute( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) + execution_tracing.try_emit_execution_trace(execution=execution) + + status_spans = [ + s + for s in span_exporter.get_finished_spans() + if s.name.startswith("execution.status ") + ] + assert {s.name for s in status_spans} == { + "execution.status QUEUED", + "execution.status SUCCEEDED", + }