diff --git a/app/package-lock.json b/app/package-lock.json index 1e4614037..179e2463c 100644 --- a/app/package-lock.json +++ b/app/package-lock.json @@ -1,12 +1,12 @@ { "name": "daydream-scope-desktop", - "version": "0.2.4", + "version": "0.2.5", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "daydream-scope-desktop", - "version": "0.2.4", + "version": "0.2.5", "hasInstallScript": true, "license": "MIT", "dependencies": { diff --git a/app/package.json b/app/package.json index e6846df20..5ff622cd1 100644 --- a/app/package.json +++ b/app/package.json @@ -1,7 +1,7 @@ { "name": "daydream-scope-desktop", "productName": "Daydream Scope", - "version": "0.2.4", + "version": "0.2.5", "description": "Daydream Scope", "main": ".vite/build/main.js", "scripts": { diff --git a/frontend/package-lock.json b/frontend/package-lock.json index 4f743a29b..8c12cf94b 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -1,12 +1,12 @@ { "name": "daydream-scope-frontend", - "version": "0.2.4", + "version": "0.2.5", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "daydream-scope-frontend", - "version": "0.2.4", + "version": "0.2.5", "dependencies": { "@radix-ui/react-alert-dialog": "^1.1.15", "@radix-ui/react-dialog": "^1.1.15", diff --git a/frontend/package.json b/frontend/package.json index f9a1ad7a7..c36e5cd76 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -1,7 +1,7 @@ { "name": "daydream-scope-frontend", "private": true, - "version": "0.2.4", + "version": "0.2.5", "type": "module", "scripts": { "dev": "vite", diff --git a/pyproject.toml b/pyproject.toml index d097bc0be..2e93052aa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "daydream-scope" -version = "0.2.4" +version = "0.2.5" description = "A tool for running and customizing real-time, interactive generative AI pipelines and models" readme = "README.md" requires-python = ">=3.12" diff --git a/src/scope/cloud/livepeer_app.py b/src/scope/cloud/livepeer_app.py index 3ee034a63..3dbe4102f 100644 --- a/src/scope/cloud/livepeer_app.py +++ b/src/scope/cloud/livepeer_app.py @@ -18,6 +18,7 @@ import queue import shutil import threading +import time import uuid from contextlib import asynccontextmanager, suppress from dataclasses import dataclass, field @@ -41,11 +42,12 @@ from pydantic import BaseModel import scope.server.app as scope_app_module +from scope.cloud.trickle_events_sink import TrickleEventsSink from scope.core.outputs import HARDWARE_SINK_MODES from scope.server.app import app as scope_app from scope.server.app import lifespan as scope_lifespan from scope.server.frame_processor import FrameProcessor -from scope.server.kafka_publisher import publish_event +from scope.server.kafka_publisher import publish_event, set_telemetry_sink from scope.server.logs_config import ( LOG_FORMAT, ScopeLogContextFilter, @@ -57,6 +59,8 @@ scope_client: httpx.AsyncClient | None = None _connection_active = False _connection_count = 0 +# Monotonic anchor for measuring runner cold-start (boot -> runner_ready). +_runner_start_monotonic = time.monotonic() STREAM_TASK_SHUTDOWN_GRACE_S = 1.0 STREAM_TASK_CANCEL_TIMEOUT_S = 1.0 @@ -65,7 +69,7 @@ # embedded assets (base64-encoded images/video) into a single request and # easily blow past the 1 MiB library default, which would tear down the # control-channel reader mid-import. -MAX_CONTROL_EVENT_BYTES = 128 * 1024 * 1024 +MAX_CONTROL_EVENT_BYTES = 5 * 1024 * 1024 REMOTE_VIDEO_CLOCK_RATE = 90_000 REMOTE_VIDEO_TIME_BASE = fractions.Fraction(1, REMOTE_VIDEO_CLOCK_RATE) ASSETS_DIR_PATH = os.getenv("DAYDREAM_SCOPE_ASSETS_DIR", "/tmp/.daydream-scope/assets") @@ -153,6 +157,35 @@ def _build_connection_info() -> dict[str, Any]: } +def _publish_media_loop_error( + session: LivepeerSession, error_type: str, exc: Exception +) -> None: + """Emit a structured error telemetry event for a failed media loop. + + These loops previously only logged on failure; surfacing them as telemetry + lets the client (the egress point) see runner-side media failures that have + no reachable Kafka of their own. + + NOTE: This only fires in cloud mode when the user has opted in by providing + their SCOPE_CLOUD_API_KEY. In local/standalone mode, no telemetry leaves + the machine. + """ + publish_event( + event_type="error", + session_id=session.session_id, + connection_id=session.manifest_id, + user_id=session.user_id, + error={ + "error_type": error_type, + "message": str(exc), + "exception_type": type(exc).__name__, + "recoverable": False, + }, + connection_info=session.connection_info, + metadata={"mode": "livepeer"}, + ) + + async def _shutdown_task( task: asyncio.Task | None, *, @@ -423,6 +456,7 @@ async def _media_input_loop( raise except Exception as exc: logger.error("Media input loop failed: %s", exc) + _publish_media_loop_error(session, "media_input_loop_failed", exc) finally: try: await media_output.close() @@ -551,6 +585,7 @@ async def _media_output_loop( raise except Exception as exc: logger.error("Media output loop failed: %s", exc) + _publish_media_loop_error(session, "media_output_loop_failed", exc) finally: try: await publisher.close() @@ -670,6 +705,7 @@ async def _media_audio_output_loop( raise except Exception as exc: logger.error("Media audio output loop failed: %s", exc) + _publish_media_loop_error(session, "media_audio_output_loop_failed", exc) finally: try: await publisher.close() @@ -1277,6 +1313,16 @@ async def _forward_notifications_to_events() -> None: notif_task = asyncio.create_task(_forward_notifications_to_events()) + # Route runner-side telemetry (publish_event calls from the embedded Scope + # app) onto the events channel instead of a Kafka broker the runner cannot + # reach. The client re-publishes whatever arrives. + runner_loop = asyncio.get_running_loop() + telemetry_sink = TrickleEventsSink(runner_loop) + set_telemetry_sink(telemetry_sink) + telemetry_task = asyncio.create_task( + telemetry_sink.drain_to(events_writer, stop_event) + ) + try: await events_writer.write( { @@ -1284,6 +1330,19 @@ async def _forward_notifications_to_events() -> None: "runner_job_id": os.getenv("FAL_JOB_ID") or os.getenv("FAL_RUNNER_ID"), } ) + # Emit cold-start timing as telemetry (distinct from the control-protocol + # runner_ready above, which the client consumes to unblock startup). + publish_event( + event_type="runner_ready", + session_id=session.session_id, + connection_id=session.manifest_id, + user_id=session.user_id, + connection_info=session.connection_info, + metadata={ + "mode": "livepeer", + "startup_ms": int((time.monotonic() - _runner_start_monotonic) * 1000), + }, + ) async for message in JSONLReader(control_url)( max_event_bytes=MAX_CONTROL_EVENT_BYTES ): @@ -1318,6 +1377,15 @@ async def _forward_notifications_to_events() -> None: except asyncio.CancelledError: pass session.notification_queue = None + # Stop accepting telemetry before draining so late publish_event calls + # during shutdown no-op instead of enqueueing onto a closing queue. + set_telemetry_sink(None) + if not telemetry_sink.request_stop(): + telemetry_task.cancel() + try: + await telemetry_task + except asyncio.CancelledError: + pass await _stop_stream(session) try: await events_writer.close() diff --git a/src/scope/cloud/trickle_events_sink.py b/src/scope/cloud/trickle_events_sink.py new file mode 100644 index 000000000..409d15c4d --- /dev/null +++ b/src/scope/cloud/trickle_events_sink.py @@ -0,0 +1,104 @@ +"""Telemetry sink that forwards events over the trickle events channel. + +On the cloud runner there is normally no reachable Kafka broker, so telemetry +emitted via ``scope.server.kafka_publisher.publish_event`` must instead be +forwarded to the local SDK client. This sink enqueues each (already-built) event +envelope and a drain task writes it onto the per-session events channel as a +``{"type": "telemetry", "event": {...}}`` message. The client receives it and +re-publishes verbatim to its own egress sink. + +This mirrors the notification-forwarding pattern in ``livepeer_app.py`` +(``_enqueue_notification`` / ``_forward_notifications_to_events``): a bounded +queue with drop-oldest semantics so telemetry is best-effort and never blocks +the media or control paths. It structurally satisfies the +``scope.server.kafka_publisher.TelemetrySink`` protocol. +""" + +import asyncio +import logging +from typing import Any + +logger = logging.getLogger(__name__) + +# Bounded so a stalled/slow events channel can never grow memory without limit. +DEFAULT_QUEUE_MAXSIZE = 256 + + +class TrickleEventsSink: + """Forward telemetry events onto the trickle events channel. + + ``emit`` is safe to call from worker threads (FrameProcessor). It marshals + onto the runner event loop and drops the oldest queued event on overflow. + """ + + def __init__( + self, + loop: asyncio.AbstractEventLoop, + maxsize: int = DEFAULT_QUEUE_MAXSIZE, + ): + self._loop = loop + self._queue: asyncio.Queue[dict[str, Any] | None] = asyncio.Queue( + maxsize=maxsize + ) + self._drop_warned = False + + def emit(self, event: dict[str, Any]) -> None: + """Thread-safe, non-blocking enqueue (drop-oldest on overflow).""" + + def _put_with_drop() -> None: + try: + self._queue.put_nowait(event) + except asyncio.QueueFull: + # Drop the oldest event to make room for the newest. + try: + self._queue.get_nowait() + except asyncio.QueueEmpty: + pass + try: + self._queue.put_nowait(event) + except asyncio.QueueFull: + if not self._drop_warned: + logger.warning("Dropped telemetry event under queue pressure") + self._drop_warned = True + + try: + self._loop.call_soon_threadsafe(_put_with_drop) + except RuntimeError: + # Loop is closing; drop silently. + pass + + async def drain_to(self, events_writer: Any, stop_event: asyncio.Event) -> None: + """Drain queued events onto the events channel until stopped. + + Exits on the ``stop_event``, on a ``None`` sentinel (see + ``request_stop``), or on cancellation. + """ + try: + while not stop_event.is_set(): + try: + event = await asyncio.wait_for(self._queue.get(), timeout=1.0) + except TimeoutError: + continue + if event is None: + break + try: + await events_writer.write({"type": "telemetry", "event": event}) + except Exception: + logger.debug( + "Failed to forward telemetry to events channel", + exc_info=True, + ) + except asyncio.CancelledError: + pass + + def request_stop(self) -> bool: + """Enqueue the sentinel to wake the drain loop so it exits. + + Must be called from the runner event loop thread. Returns ``False`` if + the queue is saturated and the caller should cancel the drain task. + """ + try: + self._queue.put_nowait(None) + return True + except asyncio.QueueFull: + return False diff --git a/src/scope/server/app.py b/src/scope/server/app.py index 7a6ec2f7c..60ddbbe10 100644 --- a/src/scope/server/app.py +++ b/src/scope/server/app.py @@ -73,6 +73,7 @@ ) from .kafka_publisher import ( KafkaPublisher, + install_default_egress_sink, is_kafka_enabled, set_kafka_publisher, ) @@ -87,6 +88,11 @@ ) from .lora_downloader import LoRADownloadRequest, LoRADownloadResult from .mcp_router import router as mcp_router +from .metrics_reporter import ( + MetricsReporter, + is_metrics_reporter_enabled, + set_metrics_reporter, +) from .models_config import ( ensure_models_dir, get_assets_dir, @@ -344,6 +350,8 @@ def configure_static_files(): livepeer = None # Global Kafka publisher instance (optional, initialized if credentials are present) kafka_publisher = None +# Global metrics reporter instance (optional, forwards telemetry to Daydream /v1/metrics) +metrics_reporter_instance = None # Global tempo sync manager instance tempo_sync = None # Global OSC server instance @@ -392,6 +400,7 @@ async def lifespan(app: FastAPI): webrtc_manager, \ pipeline_manager, \ kafka_publisher, \ + metrics_reporter_instance, \ livepeer, \ tempo_sync, \ osc_server, \ @@ -454,6 +463,30 @@ async def lifespan(app: FastAPI): kafka_publisher = None logger.warning("Kafka publisher failed to start") + # Initialize metrics reporter (forwards telemetry to Daydream /v1/metrics). + # Only active when the user has opted in to cloud features by setting + # SCOPE_CLOUD_API_KEY (their Daydream platform API key). In local/standalone + # mode this block is skipped entirely — no telemetry leaves the machine. + if is_metrics_reporter_enabled(): + api_key = os.getenv("SCOPE_CLOUD_API_KEY", "") + if api_key: + metrics_reporter_instance = MetricsReporter(api_key=api_key) + if await metrics_reporter_instance.start(): + set_metrics_reporter(metrics_reporter_instance) + logger.info("Metrics reporter initialized") + else: + metrics_reporter_instance = None + logger.warning("Metrics reporter failed to start") + else: + logger.debug("Metrics reporter skipped: no SCOPE_CLOUD_API_KEY") + + # Install the default telemetry egress sink for local mode so that + # publish_event() calls route through MetricsReporter (or Kafka) even + # without a cloud connection. In cloud mode livepeer.py re-calls this + # after connecting, which is fine (idempotent selection). + if install_default_egress_sink(): + logger.info("Telemetry egress sink installed at startup") + # Start OSC UDP server on the same port as the HTTP API from .osc_server import OSCServer @@ -527,6 +560,12 @@ async def lifespan(app: FastAPI): await livepeer.disconnect() logger.info("Livepeer connection shutdown complete") + if metrics_reporter_instance: + logger.info("Shutting down metrics reporter...") + await metrics_reporter_instance.stop() + set_metrics_reporter(None) + logger.info("Metrics reporter shutdown complete") + if kafka_publisher: logger.info("Shutting down Kafka publisher...") await kafka_publisher.stop() diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py index 3fc57d040..0cd5fdcb5 100644 --- a/src/scope/server/frame_processor.py +++ b/src/scope/server/frame_processor.py @@ -847,6 +847,16 @@ def _log_frame_stats(self): round(pipeline_fps, 1) if pipeline_fps else None ) + # Fold GPU memory into the heartbeat (most useful on the cloud runner + # where the pipeline executes) rather than adding a separate event. + if torch.cuda.is_available(): + heartbeat_metadata["vram_allocated_mb"] = round( + torch.cuda.memory_allocated() / (1024 * 1024), 1 + ) + heartbeat_metadata["vram_reserved_mb"] = round( + torch.cuda.memory_reserved() / (1024 * 1024), 1 + ) + publish_event( event_type="stream_heartbeat", session_id=self.session_id, diff --git a/src/scope/server/kafka_publisher.py b/src/scope/server/kafka_publisher.py index c874da525..a1273299e 100644 --- a/src/scope/server/kafka_publisher.py +++ b/src/scope/server/kafka_publisher.py @@ -15,7 +15,9 @@ import logging import os import threading -from typing import Any +import time +import uuid +from typing import Any, Protocol, runtime_checkable logger = logging.getLogger(__name__) @@ -31,6 +33,76 @@ def is_kafka_enabled() -> bool: return KAFKA_BOOTSTRAP_SERVERS is not None +@runtime_checkable +class TelemetrySink(Protocol): + """A destination for telemetry events. + + Implementations must be safe to call from worker threads, must never raise + to the caller, and must never block (telemetry is best-effort and must not + stall the media or control paths). + + Two implementations exist: + - ``KafkaSink`` (below): publishes events to Kafka. Used by the local SDK + client as the egress point. + - ``TrickleEventsSink`` (``scope.cloud.trickle_events_sink``): forwards + events over the trickle events channel. Installed on the cloud runner so + runner-side ``publish_event`` calls reach the client instead of a Kafka + broker the runner cannot see. + """ + + def emit(self, event: dict[str, Any]) -> None: ... + + +def build_event_envelope( + event_type: str, + session_id: str | None = None, + connection_id: str | None = None, + pipeline_ids: list[str] | None = None, + user_id: str | None = None, + error: dict[str, Any] | None = None, + metadata: dict[str, Any] | None = None, + connection_info: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Build the structured event envelope. + + Stamping happens here (once) so the same fully-formed envelope can be sent + to Kafka directly or forwarded verbatim over the trickle events channel and + re-published downstream without re-stamping its id/timestamp/origin fields. + + The shape matches the Go ``kafka.go`` ``stream_trace`` format. + """ + event_id = str(uuid.uuid4()) + # Timestamp as milliseconds string (matching Go format) + timestamp_ms = str(int(time.time() * 1000)) + + data: dict[str, Any] = { + "type": event_type, + "client_source": "scope", + "timestamp": timestamp_ms, + } + if session_id: + data["session_id"] = session_id + if connection_id: + data["connection_id"] = connection_id + if pipeline_ids: + data["pipeline_ids"] = pipeline_ids + if user_id: + data["user_id"] = user_id + if error: + data["error"] = error + if connection_info: + data["connection_info"] = connection_info + if metadata: + data.update(metadata) + + return { + "id": event_id, + "type": "stream_trace", + "timestamp": timestamp_ms, + "data": data, + } + + class KafkaPublisher: """Async Kafka event publisher with thread-safe wrapper for sync contexts. @@ -147,50 +219,37 @@ async def publish_async( if not self._started or not self._producer: return False - import time - import uuid - - # Generate a unique ID for this event (used as Kafka key) - event_id = str(uuid.uuid4()) - # Timestamp as milliseconds string (matching Go format) - timestamp_ms = str(int(time.time() * 1000)) - - # Build data payload - data: dict[str, Any] = { - "type": event_type, - "client_source": "scope", - "timestamp": timestamp_ms, - } - if session_id: - data["session_id"] = session_id - if connection_id: - data["connection_id"] = connection_id - if pipeline_ids: - data["pipeline_ids"] = pipeline_ids - if user_id: - data["user_id"] = user_id - if error: - data["error"] = error - if connection_info: - data["connection_info"] = connection_info - if metadata: - data.update(metadata) - - # Event structure matching Go kafka.go format - event = { - "id": event_id, - "type": "stream_trace", - "timestamp": timestamp_ms, - "data": data, - } + event = build_event_envelope( + event_type=event_type, + session_id=session_id, + connection_id=connection_id, + pipeline_ids=pipeline_ids, + user_id=user_id, + error=error, + metadata=metadata, + connection_info=connection_info, + ) + return await self._send_prebuilt(event) + + async def _send_prebuilt(self, event: dict[str, Any]) -> bool: + """Send an already-built event envelope to Kafka verbatim. + + Does not re-stamp the id/timestamp so a runner-built envelope relayed + over trickle keeps its origin identity when published by the client. + """ + if not self._started or not self._producer: + return False + + event_id = event.get("id") + if event_id is not None and not isinstance(event_id, str): + event_id = str(event_id) + data = event.get("data") + event_type = data.get("type") if isinstance(data, dict) else None try: # Use event ID as key (matching Go format) - key = event_id - await self._producer.send_and_wait(KAFKA_TOPIC, value=event, key=key) - logger.info( - f"Published Kafka event: {event_type} (id={event_id}, session={session_id})" - ) + await self._producer.send_and_wait(KAFKA_TOPIC, value=event, key=event_id) + logger.info(f"Published Kafka event: {event_type} (id={event_id})") return True except Exception as e: logger.error(f"Failed to publish Kafka event {event_type}: {e}") @@ -248,6 +307,28 @@ def publish( except Exception as e: logger.error(f"Failed to schedule Kafka event publish: {e}") + def publish_prebuilt(self, event: dict[str, Any]) -> None: + """Publish an already-built event envelope from a sync context. + + Thread-safe; schedules the verbatim send on the producer's event loop. + Used by the client to re-publish telemetry relayed over trickle without + re-stamping the runner's original id/timestamp. + """ + if not self._started or not self._event_loop: + return + + with self._lock: + if not self._event_loop or not self._event_loop.is_running(): + return + + try: + asyncio.run_coroutine_threadsafe( + self._send_prebuilt(event), + self._event_loop, + ) + except Exception as e: + logger.error(f"Failed to schedule Kafka event publish: {e}") + @property def is_running(self) -> bool: """Check if the publisher is running.""" @@ -277,6 +358,86 @@ def set_kafka_publisher(publisher: KafkaPublisher | None): _publisher = publisher +class KafkaSink: + """Telemetry sink that publishes events to Kafka. + + Used as the egress sink on the local SDK client (and any deployment with a + reachable Kafka broker). Wraps a started ``KafkaPublisher``. + """ + + def __init__(self, publisher: KafkaPublisher): + self._publisher = publisher + + def emit(self, event: dict[str, Any]) -> None: + self._publisher.publish_prebuilt(event) + + +class LogSink: + """Telemetry sink that logs events instead of shipping them anywhere. + + For local verification of the relay path without a Kafka broker. Enabled on + the client by setting ``SCOPE_TELEMETRY_LOG_SINK``. + """ + + def emit(self, event: dict[str, Any]) -> None: + data = event.get("data", {}) if isinstance(event, dict) else {} + logger.info( + "TELEMETRY type=%s id=%s session=%s connection=%s relayed_via=%s", + data.get("type"), + event.get("id") if isinstance(event, dict) else None, + data.get("session_id"), + data.get("connection_id"), + data.get("relayed_via"), + ) + + +# Active telemetry sink. When set, publish_event / publish_raw_event route here +# instead of the direct Kafka path. The cloud runner installs a TrickleEventsSink +# so its events flow over the trickle events channel; the client installs a +# KafkaSink (or another egress sink) as the publishing endpoint. When unset, +# behavior falls back to the legacy direct-Kafka global publisher. +_telemetry_sink: "TelemetrySink | None" = None + + +def get_telemetry_sink() -> "TelemetrySink | None": + """Get the active telemetry sink, if any.""" + return _telemetry_sink + + +def set_telemetry_sink(sink: "TelemetrySink | None") -> None: + """Set (or clear) the active telemetry sink.""" + global _telemetry_sink + _telemetry_sink = sink + + +def install_default_egress_sink() -> bool: + """Install the default telemetry egress sink for the local SDK client. + + This is the single seam where the egress backend is chosen. Priority order: + 1. LogSink (explicit debug override via SCOPE_TELEMETRY_LOG_SINK) + 2. MetricsReporter (HTTP -> Daydream /v1/metrics, when SCOPE_CLOUD_API_KEY set) + 3. KafkaSink (direct Kafka, when KAFKA_BOOTSTRAP_SERVERS set) + + Returns True if a sink was installed. + """ + if os.getenv("SCOPE_TELEMETRY_LOG_SINK"): + set_telemetry_sink(LogSink()) + logger.info("Telemetry LogSink installed (SCOPE_TELEMETRY_LOG_SINK)") + return True + from .metrics_reporter import get_metrics_reporter + + reporter = get_metrics_reporter() + if reporter and reporter.is_running: + set_telemetry_sink(reporter) + logger.info("Telemetry MetricsReporter sink installed (HTTP egress)") + return True + publisher = get_kafka_publisher() + if publisher and publisher.is_running: + set_telemetry_sink(KafkaSink(publisher)) + return True + return False + + def publish_event( event_type: str, session_id: str | None = None, @@ -302,6 +463,28 @@ def publish_event( metadata: Optional additional metadata connection_info: Optional connection metadata (e.g., gpu_type, region) """ + sink = get_telemetry_sink() + if sink is not None: + # Stamp the envelope once, then hand it to the active sink (trickle on + # the runner, Kafka/other on the client). + try: + sink.emit( + build_event_envelope( + event_type=event_type, + session_id=session_id, + connection_id=connection_id, + pipeline_ids=pipeline_ids, + user_id=user_id, + error=error, + metadata=metadata, + connection_info=connection_info, + ) + ) + except Exception: + logger.debug("Telemetry sink emit failed", exc_info=True) + return + + # Legacy path: publish directly to the global Kafka producer. publisher = get_kafka_publisher() if publisher and publisher.is_running: publisher.publish( @@ -314,3 +497,23 @@ def publish_event( metadata=metadata, connection_info=connection_info, ) + + +def publish_raw_event(event: dict[str, Any]) -> None: + """Publish an already-built event envelope through the active sink. + + Used by the client to re-publish telemetry relayed over trickle verbatim, + preserving the runner's original id/timestamp/session_id/connection_id. + Safe to call even if nothing is configured (no-op). + """ + sink = get_telemetry_sink() + if sink is not None: + try: + sink.emit(event) + except Exception: + logger.debug("Telemetry sink emit failed", exc_info=True) + return + + publisher = get_kafka_publisher() + if publisher and publisher.is_running: + publisher.publish_prebuilt(event) diff --git a/src/scope/server/livepeer.py b/src/scope/server/livepeer.py index 625734147..213d61772 100644 --- a/src/scope/server/livepeer.py +++ b/src/scope/server/livepeer.py @@ -119,6 +119,12 @@ async def connect( await client.connect(initial_parameters=connect_params) self._client = client self._stats["connected_at"] = time.time() + # The client is the telemetry egress point: install the sink that + # publishes telemetry relayed from the cloud runner over trickle. + from .kafka_publisher import install_default_egress_sink + + if install_default_egress_sink(): + logger.info("Telemetry egress sink installed") logger.info("Livepeer connected") except Exception as e: self._connect_error = str(e) diff --git a/src/scope/server/livepeer_client.py b/src/scope/server/livepeer_client.py index e7ed98808..3441d9757 100644 --- a/src/scope/server/livepeer_client.py +++ b/src/scope/server/livepeer_client.py @@ -63,7 +63,7 @@ # embedded assets (base64-encoded images/video) into a single response and # easily blow past the 1 MiB library default, which would tear down the # events loop mid-import. -MAX_EVENT_BYTES = 128 * 1024 * 1024 +MAX_EVENT_BYTES = 5 * 1024 * 1024 @dataclass(slots=True) @@ -854,6 +854,10 @@ async def _events_loop(self) -> None: _forward_runner_notification(event.get("payload")) continue + if msg_type == "telemetry": + _handle_cloud_telemetry(event.get("event")) + continue + logger.debug(f"Event: {event}") except asyncio.CancelledError: pass @@ -1273,3 +1277,28 @@ def _forward_runner_notification(payload: Any) -> None: manager.broadcast_notification(payload) except Exception: logger.debug("Failed to re-broadcast runner notification", exc_info=True) + + +def _handle_cloud_telemetry(event: Any) -> None: + """Re-publish a runner-side telemetry event through the local egress sink. + + The runner forwards each already-built event envelope over the trickle + events channel; the client is the egress point that publishes it (to Kafka + or another configured backend). The envelope is published verbatim so the + runner's original id/timestamp/session_id/connection_id are preserved; we + only annotate that it was relayed and when it arrived. + """ + if not isinstance(event, dict): + return + + data = event.get("data") + if isinstance(data, dict): + data["relayed_via"] = "trickle" + data["relay_received_timestamp"] = str(int(time.time() * 1000)) + + try: + from .kafka_publisher import publish_raw_event + + publish_raw_event(event) + except Exception: + logger.debug("Failed to relay cloud telemetry event", exc_info=True) diff --git a/src/scope/server/metrics_reporter.py b/src/scope/server/metrics_reporter.py new file mode 100644 index 000000000..bcb349173 --- /dev/null +++ b/src/scope/server/metrics_reporter.py @@ -0,0 +1,367 @@ +"""Reliable metrics reporter that forwards telemetry to Daydream /v1/metrics. + +Implements the fail-loud / no-loss client contract defined in the Daydream +metrics API: events are buffered locally and flushed in batches via HTTP POST. +Reliability is achieved entirely through client-side buffering with exponential +backoff, disk-backed resume, and response-aware retry logic. + +The class structurally satisfies the ``TelemetrySink`` protocol defined in +``kafka_publisher.py`` so it can be installed as the egress sink alongside +``KafkaSink`` and ``LogSink``. + +Environment Variables: + DAYDREAM_METRICS_URL: Endpoint URL (default: https://api.daydream.monster/v1/metrics) + SCOPE_CLOUD_API_KEY: Bearer token for authentication (required to enable) + SCOPE_METRICS_ENABLED: Explicit enable/disable ("true"/"false"; default: on when key present) + SCOPE_METRICS_FLUSH_INTERVAL_MS: Flush interval in milliseconds (default: 2000) + SCOPE_METRICS_MAX_BATCH: Max events per batch (default: 500, API limit) + SCOPE_METRICS_MAX_BUFFER: Max buffered events before dropping oldest (default: 50000) + SCOPE_METRICS_BUFFER_PATH: Path for disk-backed resume buffer (default: ~/.daydream-scope/metrics_buffer.jsonl) +""" + +import asyncio +import json +import logging +import os +import platform +import threading +from collections import deque +from pathlib import Path +from typing import Any + +logger = logging.getLogger(__name__) + +DAYDREAM_METRICS_URL = os.getenv( + "DAYDREAM_METRICS_URL", "https://api.daydream.monster/v1/metrics" +) +MAX_BATCH_SIZE = int(os.getenv("SCOPE_METRICS_MAX_BATCH", "500")) +MAX_BUFFER_SIZE = int(os.getenv("SCOPE_METRICS_MAX_BUFFER", "50000")) +FLUSH_INTERVAL_S = int(os.getenv("SCOPE_METRICS_FLUSH_INTERVAL_MS", "2000")) / 1000.0 +MAX_BODY_BYTES = 1_000_000 # 1 MiB API limit +INITIAL_BACKOFF_S = 1.0 +MAX_BACKOFF_S = 60.0 +RATE_LIMIT_WINDOW_S = 60.0 + +_HOSTNAME = platform.node() + + +def _default_buffer_path() -> Path | None: + """Return the default disk buffer path, or None if explicitly disabled.""" + env = os.getenv("SCOPE_METRICS_BUFFER_PATH") + if env == "": + return None + if env: + return Path(env) + base = os.getenv("DAYDREAM_SCOPE_MODELS_DIR") + if base: + return Path(base).parent / "metrics_buffer.jsonl" + return Path.home() / ".daydream-scope" / "metrics_buffer.jsonl" + + +def is_metrics_reporter_enabled() -> bool: + """Check if the metrics reporter should be started.""" + explicit = os.getenv("SCOPE_METRICS_ENABLED", "").lower() + if explicit == "false": + return False + if explicit == "true": + return True + return os.getenv("SCOPE_CLOUD_API_KEY") is not None + + +def map_envelope_to_event(envelope: dict[str, Any]) -> dict[str, Any]: + """Map a Scope stream_trace Kafka envelope to a /v1/metrics Event object. + + Scope envelope: {id, type:"stream_trace", timestamp, data:{type, client_source, ...}} + API Event: {type, data, id?, timestamp?} + """ + data = envelope.get("data", {}) + event: dict[str, Any] = { + "type": data.get("type", envelope.get("type", "unknown")), + "data": data, + } + if envelope.get("id"): + event["id"] = envelope["id"] + if envelope.get("timestamp"): + event["timestamp"] = str(envelope["timestamp"]) + return event + + +class MetricsReporter: + """Reliable buffered metrics forwarder to Daydream /v1/metrics. + + Thread-safe: ``emit()`` / ``enqueue()`` can be called from any thread. + The async flush loop runs on the event loop captured at ``start()``. + + Satisfies the ``TelemetrySink`` protocol so it can be plugged directly + into ``set_telemetry_sink()``. + """ + + def __init__(self, api_key: str, url: str = DAYDREAM_METRICS_URL): + self._api_key = api_key + self._url = url + self._buffer: deque[dict[str, Any]] = deque(maxlen=MAX_BUFFER_SIZE) + self._lock = threading.Lock() + self._event_loop: asyncio.AbstractEventLoop | None = None + self._flush_task: asyncio.Task | None = None + self._started = False + self._paused = False # Set on 401, cleared on key refresh + self._backoff_s = INITIAL_BACKOFF_S + self._client = None + self._buffer_path = _default_buffer_path() + + # -- TelemetrySink protocol ------------------------------------------------- + + def emit(self, event: dict[str, Any]) -> None: + """TelemetrySink protocol: thread-safe, non-blocking enqueue.""" + self.enqueue(event) + + # -- public API ------------------------------------------------------------- + + async def start(self) -> bool: + """Start the reporter and its background flush loop.""" + try: + import httpx + + self._client = httpx.AsyncClient(timeout=30.0) + except ImportError: + logger.warning("httpx not installed, metrics reporting disabled") + return False + + self._event_loop = asyncio.get_running_loop() + self._started = True + + self._load_disk_buffer() + + self._flush_task = asyncio.create_task(self._flush_loop()) + logger.info( + "Metrics reporter started (url=%s, flush=%.1fs, max_batch=%d)", + self._url, + FLUSH_INTERVAL_S, + MAX_BATCH_SIZE, + ) + return True + + async def stop(self): + """Flush remaining events and shut down.""" + self._started = False + if self._flush_task: + self._flush_task.cancel() + try: + await self._flush_task + except asyncio.CancelledError: + pass + self._flush_task = None + + # Final flush attempt + await self._flush_once() + + self._persist_disk_buffer() + + if self._client: + await self._client.aclose() + self._client = None + + logger.info( + "Metrics reporter stopped (remaining_buffered=%d)", len(self._buffer) + ) + + def enqueue(self, envelope: dict[str, Any]) -> None: + """Thread-safe: add a stream_trace envelope to the buffer.""" + if not self._started or self._paused: + return + event = map_envelope_to_event(envelope) + with self._lock: + self._buffer.append(event) + + @property + def is_running(self) -> bool: + return self._started + + @property + def buffered_count(self) -> int: + return len(self._buffer) + + # -- internal --------------------------------------------------------------- + + def _take_batch(self, max_size: int = MAX_BATCH_SIZE) -> list[dict[str, Any]]: + with self._lock: + count = min(max_size, len(self._buffer)) + batch = [self._buffer.popleft() for _ in range(count)] + return batch + + def _requeue_batch(self, batch: list[dict[str, Any]]) -> None: + with self._lock: + self._buffer.extendleft(reversed(batch)) + + async def _flush_loop(self) -> None: + try: + while self._started: + await asyncio.sleep(FLUSH_INTERVAL_S) + if self._paused or not self._buffer: + continue + await self._flush_once() + except asyncio.CancelledError: + pass + + async def _flush_once(self) -> None: + if not self._buffer or not self._client: + return + + batch = self._take_batch() + if not batch: + return + + body = self._build_body(batch) + body_bytes = json.dumps(body).encode("utf-8") + + # If body exceeds 1 MiB, split the batch + if len(body_bytes) > MAX_BODY_BYTES and len(batch) > 1: + half = len(batch) // 2 + self._requeue_batch(batch[half:]) + batch = batch[:half] + body = self._build_body(batch) + + try: + response = await self._client.post( + self._url, + json=body, + headers={ + "Authorization": f"Bearer {self._api_key}", + "Content-Type": "application/json", + }, + ) + await self._handle_response(response, batch) + except Exception as e: + logger.warning("Metrics POST failed (network): %s", e) + self._requeue_batch(batch) + await self._wait_backoff() + + async def _handle_response( + self, response: Any, batch: list[dict[str, Any]] + ) -> None: + status = response.status_code + + if status == 200: + self._backoff_s = INITIAL_BACKOFF_S + logger.debug("Metrics batch accepted (%d events)", len(batch)) + return + + if status == 400: + logger.error( + "Metrics batch rejected (400 malformed), dropping %d events: %s", + len(batch), + response.text[:200], + ) + return + + if status == 401: + logger.error( + "Metrics reporter unauthorized (401), pausing until key refresh" + ) + self._paused = True + self._requeue_batch(batch) + return + + if status == 413: + if len(batch) <= 1: + event_id = batch[0].get("id") if batch else None + logger.error( + "Metrics event too large to send (413) even as a single event; dropping id=%s", + event_id, + ) + return + logger.warning("Metrics batch too large (413), halving batch size") + self._requeue_batch(batch) + half_batch = self._take_batch(max(1, len(batch) // 2)) + if half_batch: + body = self._build_body(half_batch) + try: + retry_resp = await self._client.post( + self._url, + json=body, + headers={ + "Authorization": f"Bearer {self._api_key}", + "Content-Type": "application/json", + }, + ) + await self._handle_response(retry_resp, half_batch) + except Exception: + self._requeue_batch(half_batch) + return + + if status == 429: + logger.warning( + "Metrics rate limited (429), waiting %.0fs", RATE_LIMIT_WINDOW_S + ) + self._requeue_batch(batch) + await asyncio.sleep(RATE_LIMIT_WINDOW_S) + return + + # 5xx or unexpected status + logger.warning("Metrics POST failed (HTTP %d), retrying with backoff", status) + self._requeue_batch(batch) + await self._wait_backoff() + + async def _wait_backoff(self) -> None: + await asyncio.sleep(self._backoff_s) + self._backoff_s = min(self._backoff_s * 2, MAX_BACKOFF_S) + + def _build_body(self, batch: list[dict[str, Any]]) -> dict[str, Any]: + return { + "app": "scope", + "host": _HOSTNAME, + "events": batch, + } + + # -- disk persistence ------------------------------------------------------- + + def _load_disk_buffer(self) -> None: + if not self._buffer_path or not self._buffer_path.exists(): + return + loaded = 0 + try: + with open(self._buffer_path) as f: + for line in f: + line = line.strip() + if not line: + continue + try: + event = json.loads(line) + self._buffer.append(event) + loaded += 1 + except json.JSONDecodeError: + continue + self._buffer_path.write_text("") + if loaded: + logger.info("Loaded %d events from disk buffer", loaded) + except Exception as e: + logger.warning("Failed to load disk buffer: %s", e) + + def _persist_disk_buffer(self) -> None: + if not self._buffer_path or not self._buffer: + return + try: + self._buffer_path.parent.mkdir(parents=True, exist_ok=True) + with open(self._buffer_path, "a") as f: + with self._lock: + for event in self._buffer: + f.write(json.dumps(event) + "\n") + logger.info("Persisted %d events to disk buffer", len(self._buffer)) + except Exception as e: + logger.warning("Failed to persist disk buffer: %s", e) + + +# -- Global accessor pattern (mirrors kafka_publisher.py) ----------------------- + +_reporter: MetricsReporter | None = None + + +def get_metrics_reporter() -> MetricsReporter | None: + """Get the global MetricsReporter instance.""" + return _reporter + + +def set_metrics_reporter(reporter: MetricsReporter | None) -> None: + """Set the global MetricsReporter instance.""" + global _reporter + _reporter = reporter diff --git a/tests/test_metrics_reporter.py b/tests/test_metrics_reporter.py new file mode 100644 index 000000000..0a656bf62 --- /dev/null +++ b/tests/test_metrics_reporter.py @@ -0,0 +1,488 @@ +"""Tests for the MetricsReporter: reliability contract, batching, disk resume, mapping, +and TelemetrySink protocol conformance.""" + +import time +from collections import deque +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from scope.server.metrics_reporter import ( + INITIAL_BACKOFF_S, + MAX_BATCH_SIZE, + MetricsReporter, + get_metrics_reporter, + map_envelope_to_event, + set_metrics_reporter, +) + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_envelope( + event_type: str = "stream_heartbeat", + event_id: str | None = None, + **extra_data, +) -> dict: + ts = str(int(time.time() * 1000)) + return { + "id": event_id or f"test-{time.monotonic_ns()}", + "type": "stream_trace", + "timestamp": ts, + "data": { + "type": event_type, + "client_source": "scope", + "timestamp": ts, + **extra_data, + }, + } + + +def _make_response(status_code: int, text: str = "") -> MagicMock: + resp = MagicMock() + resp.status_code = status_code + resp.text = text + return resp + + +# --------------------------------------------------------------------------- +# Event mapping +# --------------------------------------------------------------------------- + + +class TestMapEnvelopeToEvent: + def test_basic_mapping(self): + envelope = _make_envelope("stream_started", event_id="evt-1", session_id="s1") + event = map_envelope_to_event(envelope) + + assert event["type"] == "stream_started" + assert event["id"] == "evt-1" + assert "timestamp" in event + assert event["data"]["session_id"] == "s1" + assert event["data"]["client_source"] == "scope" + + def test_missing_data_type_falls_back_to_outer(self): + envelope = {"id": "x", "type": "stream_trace", "timestamp": "123", "data": {}} + event = map_envelope_to_event(envelope) + assert event["type"] == "stream_trace" + + def test_missing_data_and_outer_type(self): + envelope = {"id": "x", "data": {}} + event = map_envelope_to_event(envelope) + assert event["type"] == "unknown" + + def test_missing_fields_omitted(self): + envelope = {"data": {"type": "error"}} + event = map_envelope_to_event(envelope) + assert "id" not in event + assert "timestamp" not in event + assert event["type"] == "error" + + +# --------------------------------------------------------------------------- +# Global accessors +# --------------------------------------------------------------------------- + + +class TestGlobalAccessors: + def test_get_set_reporter(self): + original = get_metrics_reporter() + try: + reporter = MetricsReporter(api_key="test") + set_metrics_reporter(reporter) + assert get_metrics_reporter() is reporter + finally: + set_metrics_reporter(original) + + def test_is_metrics_reporter_enabled_with_key(self): + with patch.dict("os.environ", {"SCOPE_CLOUD_API_KEY": "test-key"}, clear=False): + from scope.server import metrics_reporter as mr + + assert mr.is_metrics_reporter_enabled() + + def test_is_metrics_reporter_disabled_explicit(self): + with patch.dict( + "os.environ", + {"SCOPE_METRICS_ENABLED": "false", "SCOPE_CLOUD_API_KEY": "key"}, + clear=False, + ): + from scope.server import metrics_reporter as mr + + assert not mr.is_metrics_reporter_enabled() + + +# --------------------------------------------------------------------------- +# MetricsReporter: batching & buffer +# --------------------------------------------------------------------------- + + +class TestEnqueueAndBatching: + def test_enqueue_adds_mapped_events(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + + for i in range(10): + reporter.enqueue(_make_envelope("stream_heartbeat", event_id=f"e-{i}")) + + assert reporter.buffered_count == 10 + + def test_enqueue_noop_when_stopped(self): + reporter = MetricsReporter(api_key="key") + reporter._started = False + reporter.enqueue(_make_envelope()) + assert reporter.buffered_count == 0 + + def test_enqueue_noop_when_paused(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + reporter._paused = True + reporter.enqueue(_make_envelope()) + assert reporter.buffered_count == 0 + + def test_buffer_max_size_drops_oldest(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + reporter._buffer = deque(maxlen=5) + + for i in range(10): + reporter.enqueue(_make_envelope(event_id=f"e-{i}")) + + assert reporter.buffered_count == 5 + batch = reporter._take_batch(5) + ids = [e["id"] for e in batch] + assert ids[0] == "e-5" + + def test_take_batch_respects_max_size(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + + for i in range(1000): + reporter.enqueue(_make_envelope(event_id=f"e-{i}")) + + batch = reporter._take_batch(MAX_BATCH_SIZE) + assert len(batch) == MAX_BATCH_SIZE + assert reporter.buffered_count == 500 + + def test_requeue_batch_preserves_order(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + + for i in range(3): + reporter.enqueue(_make_envelope(event_id=f"e-{i}")) + + batch = reporter._take_batch(3) + reporter._requeue_batch(batch) + + recovered = reporter._take_batch(3) + assert [e["id"] for e in recovered] == [e["id"] for e in batch] + + def test_build_body_shape(self): + reporter = MetricsReporter(api_key="key") + events = [map_envelope_to_event(_make_envelope())] + body = reporter._build_body(events) + + assert body["app"] == "scope" + assert "host" in body + assert body["events"] is events + + +# --------------------------------------------------------------------------- +# Reliability contract (response handling) +# --------------------------------------------------------------------------- + + +class TestResponseHandling: + @pytest.mark.anyio + async def test_200_drops_batch_resets_backoff(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + reporter._backoff_s = 16.0 + + batch = [map_envelope_to_event(_make_envelope())] + await reporter._handle_response(_make_response(200), batch) + + assert reporter._backoff_s == INITIAL_BACKOFF_S + assert reporter.buffered_count == 0 + + @pytest.mark.anyio + async def test_400_drops_batch(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + + batch = [map_envelope_to_event(_make_envelope())] + await reporter._handle_response(_make_response(400, "bad request"), batch) + + assert reporter.buffered_count == 0 + + @pytest.mark.anyio + async def test_401_pauses_and_requeues(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + + batch = [map_envelope_to_event(_make_envelope())] + await reporter._handle_response(_make_response(401), batch) + + assert reporter._paused is True + assert reporter.buffered_count == 1 + + @pytest.mark.anyio + async def test_413_single_event_drops(self): + """A single oversized event on 413 is dropped (not retried infinitely).""" + reporter = MetricsReporter(api_key="key") + reporter._started = True + + batch = [map_envelope_to_event(_make_envelope(event_id="oversized"))] + await reporter._handle_response(_make_response(413), batch) + + assert reporter.buffered_count == 0 + + @pytest.mark.anyio + async def test_413_multi_event_halves_and_retries(self): + """A multi-event 413 requeues and retries with half batch.""" + reporter = MetricsReporter(api_key="key") + reporter._started = True + + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=_make_response(200)) + reporter._client = mock_client + + batch = [ + map_envelope_to_event(_make_envelope(event_id=f"e-{i}")) for i in range(4) + ] + await reporter._handle_response(_make_response(413), batch) + + # Half batch was retried and accepted (200), rest requeued + assert reporter.buffered_count == 2 + + @pytest.mark.anyio + async def test_429_requeues_batch(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + + batch = [map_envelope_to_event(_make_envelope())] + + with patch( + "scope.server.metrics_reporter.asyncio.sleep", new_callable=AsyncMock + ): + await reporter._handle_response(_make_response(429), batch) + + assert reporter.buffered_count == 1 + + @pytest.mark.anyio + async def test_5xx_requeues_with_backoff(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + reporter._backoff_s = INITIAL_BACKOFF_S + + batch = [map_envelope_to_event(_make_envelope())] + + with patch( + "scope.server.metrics_reporter.asyncio.sleep", new_callable=AsyncMock + ) as mock_sleep: + await reporter._handle_response(_make_response(502), batch) + + assert reporter.buffered_count == 1 + mock_sleep.assert_awaited_once_with(INITIAL_BACKOFF_S) + assert reporter._backoff_s == INITIAL_BACKOFF_S * 2 + + @pytest.mark.anyio + async def test_backoff_capped_at_max(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + reporter._backoff_s = 32.0 + + batch = [map_envelope_to_event(_make_envelope())] + + with patch( + "scope.server.metrics_reporter.asyncio.sleep", new_callable=AsyncMock + ): + await reporter._handle_response(_make_response(500), batch) + + assert reporter._backoff_s == 60.0 + + +# --------------------------------------------------------------------------- +# Flush cycle +# --------------------------------------------------------------------------- + + +class TestFlushOnce: + @pytest.mark.anyio + async def test_flush_once_sends_batch(self): + reporter = MetricsReporter(api_key="test-key") + reporter._started = True + + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=_make_response(200)) + reporter._client = mock_client + + for i in range(3): + reporter.enqueue(_make_envelope(event_id=f"e-{i}")) + + await reporter._flush_once() + + assert reporter.buffered_count == 0 + mock_client.post.assert_awaited_once() + call_kwargs = mock_client.post.call_args + assert call_kwargs.kwargs["headers"]["Authorization"] == "Bearer test-key" + + @pytest.mark.anyio + async def test_flush_once_noop_on_empty_buffer(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + reporter._client = AsyncMock() + + await reporter._flush_once() + + reporter._client.post.assert_not_awaited() + + @pytest.mark.anyio + async def test_flush_network_error_requeues(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + + mock_client = AsyncMock() + mock_client.post = AsyncMock(side_effect=Exception("connection refused")) + reporter._client = mock_client + + reporter.enqueue(_make_envelope(event_id="net-err")) + + with patch( + "scope.server.metrics_reporter.asyncio.sleep", new_callable=AsyncMock + ): + await reporter._flush_once() + + assert reporter.buffered_count == 1 + + +# --------------------------------------------------------------------------- +# Disk persistence +# --------------------------------------------------------------------------- + + +class TestDiskBuffer: + def test_persist_and_load(self, tmp_path: Path): + buf_path = tmp_path / "buffer.jsonl" + + reporter = MetricsReporter(api_key="key") + reporter._started = True + reporter._buffer_path = buf_path + + reporter.enqueue(_make_envelope(event_id="disk-1")) + reporter.enqueue(_make_envelope(event_id="disk-2")) + + reporter._persist_disk_buffer() + assert buf_path.exists() + lines = buf_path.read_text().strip().splitlines() + assert len(lines) == 2 + + reporter._buffer.clear() + assert reporter.buffered_count == 0 + + reporter._load_disk_buffer() + assert reporter.buffered_count == 2 + assert buf_path.read_text() == "" + + def test_load_handles_missing_file(self, tmp_path: Path): + reporter = MetricsReporter(api_key="key") + reporter._buffer_path = tmp_path / "nonexistent.jsonl" + reporter._load_disk_buffer() + assert reporter.buffered_count == 0 + + def test_load_handles_corrupt_lines(self, tmp_path: Path): + buf_path = tmp_path / "buffer.jsonl" + buf_path.write_text( + '{"type":"ok","data":{}}\nnot-json\n{"type":"ok2","data":{}}\n' + ) + + reporter = MetricsReporter(api_key="key") + reporter._buffer_path = buf_path + reporter._load_disk_buffer() + assert reporter.buffered_count == 2 + + +# --------------------------------------------------------------------------- +# TelemetrySink protocol conformance +# --------------------------------------------------------------------------- + + +class TestTelemetrySinkConformance: + def test_isinstance_check(self): + """MetricsReporter satisfies the TelemetrySink runtime_checkable protocol.""" + from scope.server.kafka_publisher import TelemetrySink + + reporter = MetricsReporter(api_key="key") + assert isinstance(reporter, TelemetrySink) + + def test_emit_delegates_to_enqueue(self): + """emit() is the TelemetrySink entry point and buffers via enqueue().""" + reporter = MetricsReporter(api_key="key") + reporter._started = True + + envelope = _make_envelope("stream_started", event_id="emit-1") + reporter.emit(envelope) + + assert reporter.buffered_count == 1 + batch = reporter._take_batch(1) + assert batch[0]["id"] == "emit-1" + assert batch[0]["type"] == "stream_started" + + def test_publish_raw_event_routes_through_emit(self): + """When MetricsReporter is the active sink, publish_raw_event() routes to emit().""" + from scope.server.kafka_publisher import ( + get_telemetry_sink, + publish_raw_event, + set_telemetry_sink, + ) + + reporter = MetricsReporter(api_key="key") + reporter._started = True + + prev_sink = get_telemetry_sink() + try: + set_telemetry_sink(reporter) + + envelope = _make_envelope("stream_stopped", event_id="raw-1") + publish_raw_event(envelope) + + assert reporter.buffered_count == 1 + finally: + set_telemetry_sink(prev_sink) + + def test_emit_noop_when_stopped(self): + reporter = MetricsReporter(api_key="key") + reporter._started = False + reporter.emit(_make_envelope()) + assert reporter.buffered_count == 0 + + +# --------------------------------------------------------------------------- +# Start / stop lifecycle +# --------------------------------------------------------------------------- + + +class TestLifecycle: + @pytest.mark.anyio + async def test_start_and_stop(self, tmp_path: Path): + reporter = MetricsReporter(api_key="key") + reporter._buffer_path = tmp_path / "empty.jsonl" + started = await reporter.start() + assert started is True + assert reporter.is_running is True + + reporter.enqueue(_make_envelope()) + assert reporter.buffered_count == 1 + + await reporter.stop() + assert reporter.is_running is False + + @pytest.mark.anyio + async def test_start_fails_without_httpx(self): + reporter = MetricsReporter(api_key="key") + with patch.dict("sys.modules", {"httpx": None}): + with patch("builtins.__import__", side_effect=ImportError("no httpx")): + started = await reporter.start() + assert isinstance(started, bool) diff --git a/uv.lock b/uv.lock index b56e6761e..72fed792a 100644 --- a/uv.lock +++ b/uv.lock @@ -560,7 +560,7 @@ wheels = [ [[package]] name = "daydream-scope" -version = "0.2.4" +version = "0.2.5" source = { editable = "." } dependencies = [ { name = "accelerate" },