diff --git a/cloud_pipelines_backend/instrumentation/execution_tracing.py b/cloud_pipelines_backend/instrumentation/execution_tracing.py index 0da7756..90d6396 100644 --- a/cloud_pipelines_backend/instrumentation/execution_tracing.py +++ b/cloud_pipelines_backend/instrumentation/execution_tracing.py @@ -15,6 +15,7 @@ from .. import backend_types_sql as bts from ..launchers import common_annotations +from ..launchers import kubernetes_launchers _logger = logging.getLogger(__name__) _tracer = trace.get_tracer("tangle.orchestrator") @@ -150,6 +151,23 @@ def _pipeline_attrs(*, execution: bts.ExecutionNode) -> dict[str, object]: return attrs +def _resource_attrs(*, execution: bts.ExecutionNode, status: str) -> dict[str, object]: + """CPU, memory, and accelerator requests for the PENDING span.""" + if status != bts.ContainerExecutionStatus.PENDING: + return {} + annotations: dict = (execution.task_spec or {}).get("annotations", {}) + attrs: dict[str, object] = {} + if cpu := annotations.get(kubernetes_launchers.RESOURCES_CPU_ANNOTATION_KEY): + attrs["execution.resources.cpu"] = cpu + if memory := annotations.get(kubernetes_launchers.RESOURCES_MEMORY_ANNOTATION_KEY): + attrs["execution.resources.memory"] = memory + if accelerators := annotations.get( + kubernetes_launchers.RESOURCES_ACCELERATORS_ANNOTATION_KEY + ): + attrs["execution.resources.accelerators"] = accelerators + return attrs + + def _ns(*, dt: datetime.datetime) -> int: """Return *dt* as nanoseconds since the Unix epoch (required by OTel SDK). @@ -202,13 +220,18 @@ def emit_execution_trace(*, execution: bts.ExecutionNode) -> None: "execution.status": entry["status"], **_error_attrs(execution=execution, status=entry["status"]), **_launcher_pod_attrs(execution=execution, status=entry["status"]), + **_resource_attrs(execution=execution, status=entry["status"]), } + start_ns = _ns(dt=t_start) + end_ns = _ns(dt=t_end) + if end_ns <= start_ns: + end_ns = start_ns + 1 _tracer.start_span( f"execution.status {entry['status']}", context=root_ctx, attributes=attrs, - start_time=_ns(dt=t_start), - ).end(end_time=_ns(dt=t_end)) + start_time=start_ns, + ).end(end_time=end_ns) if history[-1]["status"] in _ERROR_TERMINAL_STATUSES: root.set_status(status=StatusCode.ERROR) diff --git a/cloud_pipelines_backend/instrumentation/metrics.py b/cloud_pipelines_backend/instrumentation/metrics.py index 2d7ca4b..6e949e8 100644 --- a/cloud_pipelines_backend/instrumentation/metrics.py +++ b/cloud_pipelines_backend/instrumentation/metrics.py @@ -103,4 +103,4 @@ def _handle_before_commit(session: orm.Session) -> None: exc_info=True, ) obj._status_changed = False - execution_tracing.try_emit_execution_trace(execution=obj) + execution_tracing.emit_execution_trace(execution=obj) diff --git a/tests/instrumentation/test_execution_tracing.py b/tests/instrumentation/test_execution_tracing.py index 4128712..df3bdfd 100644 --- a/tests/instrumentation/test_execution_tracing.py +++ b/tests/instrumentation/test_execution_tracing.py @@ -62,21 +62,21 @@ def test_no_spans_for_non_terminal_execution( self, span_exporter: InMemorySpanExporter ) -> None: execution = _make_execution(statuses=["QUEUED", "RUNNING"]) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) assert span_exporter.get_finished_spans() == () def test_no_spans_for_empty_history( self, span_exporter: InMemorySpanExporter ) -> None: execution = _make_execution(statuses=[]) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) assert span_exporter.get_finished_spans() == () def test_emits_root_and_child_spans_on_terminal( self, span_exporter: InMemorySpanExporter ) -> None: execution = _make_execution(statuses=["QUEUED", "RUNNING", "SUCCEEDED"]) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) names = {s.name for s in span_exporter.get_finished_spans()} assert "execution" in names @@ -86,7 +86,7 @@ def test_child_span_count_matches_history( self, span_exporter: InMemorySpanExporter ) -> None: execution = _make_execution(statuses=["QUEUED", "RUNNING", "SUCCEEDED"]) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) status_spans = [ s @@ -99,7 +99,7 @@ def test_root_span_has_execution_id_attribute( self, span_exporter: InMemorySpanExporter ) -> None: execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) root = next( s for s in span_exporter.get_finished_spans() if s.name == "execution" @@ -110,7 +110,7 @@ def test_child_spans_share_trace_id_with_root( self, span_exporter: InMemorySpanExporter ) -> None: execution = _make_execution(statuses=["QUEUED", "RUNNING", "SUCCEEDED"]) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) finished = span_exporter.get_finished_spans() trace_ids = {s.context.trace_id for s in finished} @@ -120,7 +120,7 @@ def test_root_span_duration_matches_history( self, span_exporter: InMemorySpanExporter ) -> None: execution = _make_execution(statuses=["QUEUED", "RUNNING", "SUCCEEDED"]) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) root = next( s for s in span_exporter.get_finished_spans() if s.name == "execution" @@ -134,7 +134,7 @@ def test_child_span_status_attribute( self, span_exporter: InMemorySpanExporter ) -> None: execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) status_spans = [ s @@ -157,7 +157,7 @@ def test_failed_span_carries_orchestration_error_message( bts.EXECUTION_NODE_EXTRA_DATA_ORCHESTRATION_ERROR_MESSAGE_KEY: "missing outputs" }, ) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) failed_span = next( s @@ -176,7 +176,7 @@ def test_system_error_span_carries_exception_message_and_stacktrace( bts.EXECUTION_NODE_EXTRA_DATA_SYSTEM_ERROR_EXCEPTION_FULL_KEY: "Traceback...", }, ) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) err_span = next( s @@ -192,7 +192,7 @@ def test_root_span_marked_error_on_failed( from opentelemetry.trace import StatusCode execution = _make_execution(statuses=["QUEUED", "FAILED"]) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) root = next( s for s in span_exporter.get_finished_spans() if s.name == "execution" @@ -205,7 +205,7 @@ def test_root_span_not_marked_error_on_succeeded( from opentelemetry.trace import StatusCode execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) root = next( s for s in span_exporter.get_finished_spans() if s.name == "execution" @@ -220,7 +220,7 @@ def test_pending_span_carries_k8s_attributes( execution = _make_execution( statuses=["QUEUED", "PENDING", "RUNNING", "SUCCEEDED"] ) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) pending_span = next( s @@ -234,7 +234,7 @@ def test_non_pending_span_has_no_k8s_attributes( self, span_exporter: InMemorySpanExporter ) -> None: execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) for span in span_exporter.get_finished_spans(): assert "k8s.pod.name" not in (span.attributes or {}) @@ -246,7 +246,7 @@ def test_root_span_carries_launcher_type( ) -> None: execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) # No container_execution on this stub — attribute should be absent. - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) root = next( s for s in span_exporter.get_finished_spans() if s.name == "execution" @@ -257,7 +257,7 @@ def test_root_span_no_launcher_attrs_without_container_execution( self, span_exporter: InMemorySpanExporter ) -> None: execution = _make_execution(statuses=["QUEUED", "SKIPPED"]) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) root = next( s for s in span_exporter.get_finished_spans() if s.name == "execution" @@ -271,7 +271,7 @@ def test_root_span_carries_cloud_provider_when_annotation_set( execution.task_spec = { "annotations": {common_annotations.CLOUD_PROVIDER_ANNOTATION_KEY: "nebius"} } - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) root = next( s for s in span_exporter.get_finished_spans() if s.name == "execution" @@ -282,7 +282,7 @@ def test_root_span_omits_cloud_provider_when_annotation_absent( self, span_exporter: InMemorySpanExporter ) -> None: execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) root = next( s for s in span_exporter.get_finished_spans() if s.name == "execution" @@ -295,7 +295,7 @@ def test_cache_miss_sets_hit_false( self, span_exporter: InMemorySpanExporter ) -> None: execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) root = next( s for s in span_exporter.get_finished_spans() if s.name == "execution" @@ -309,7 +309,7 @@ def test_cache_hit_sets_hit_true_and_reused_from_id( statuses=["QUEUED", "SUCCEEDED"], extra={"reused_from_execution_node_id": "source-execution-id"}, ) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) root = next( s for s in span_exporter.get_finished_spans() if s.name == "execution" @@ -327,7 +327,7 @@ def test_root_span_carries_parent_and_task_id( execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) execution.parent_execution_id = "parent-exec-id" execution.task_id_in_parent_execution = "my-task" - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) root = next( s for s in span_exporter.get_finished_spans() if s.name == "execution" @@ -339,10 +339,67 @@ def test_root_execution_omits_parent_attrs_when_absent( self, span_exporter: InMemorySpanExporter ) -> None: execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) root = next( s for s in span_exporter.get_finished_spans() if s.name == "execution" ) assert "execution.parent_id" not in (root.attributes or {}) assert "execution.task_id" not in (root.attributes or {}) + + +class TestResourceAttrs: + def test_pending_span_carries_cpu_and_memory( + self, span_exporter: InMemorySpanExporter + ) -> None: + from cloud_pipelines_backend.launchers import kubernetes_launchers + + execution = _make_execution( + statuses=["QUEUED", "PENDING", "RUNNING", "SUCCEEDED"] + ) + execution.task_spec = { + "annotations": { + kubernetes_launchers.RESOURCES_CPU_ANNOTATION_KEY: "4", + kubernetes_launchers.RESOURCES_MEMORY_ANNOTATION_KEY: "16Gi", + } + } + execution_tracing.emit_execution_trace(execution=execution) + + pending_span = next( + s + for s in span_exporter.get_finished_spans() + if s.attributes.get("execution.status") == "PENDING" + ) + assert pending_span.attributes["execution.resources.cpu"] == "4" + assert pending_span.attributes["execution.resources.memory"] == "16Gi" + + def test_pending_span_carries_accelerators_when_present( + self, span_exporter: InMemorySpanExporter + ) -> None: + from cloud_pipelines_backend.launchers import kubernetes_launchers + + execution = _make_execution(statuses=["QUEUED", "PENDING", "SUCCEEDED"]) + execution.task_spec = { + "annotations": { + kubernetes_launchers.RESOURCES_ACCELERATORS_ANNOTATION_KEY: '{"H100": 1}', + } + } + execution_tracing.emit_execution_trace(execution=execution) + + pending_span = next( + s + for s in span_exporter.get_finished_spans() + if s.attributes.get("execution.status") == "PENDING" + ) + assert ( + pending_span.attributes["execution.resources.accelerators"] == '{"H100": 1}' + ) + + def test_non_pending_spans_have_no_resource_attrs( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) + execution_tracing.emit_execution_trace(execution=execution) + + for span in span_exporter.get_finished_spans(): + assert "execution.resources.cpu" not in (span.attributes or {})