diff --git a/app/package-lock.json b/app/package-lock.json index 1e4614037..179e2463c 100644 --- a/app/package-lock.json +++ b/app/package-lock.json @@ -1,12 +1,12 @@ { "name": "daydream-scope-desktop", - "version": "0.2.4", + "version": "0.2.5", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "daydream-scope-desktop", - "version": "0.2.4", + "version": "0.2.5", "hasInstallScript": true, "license": "MIT", "dependencies": { diff --git a/app/package.json b/app/package.json index e6846df20..5ff622cd1 100644 --- a/app/package.json +++ b/app/package.json @@ -1,7 +1,7 @@ { "name": "daydream-scope-desktop", "productName": "Daydream Scope", - "version": "0.2.4", + "version": "0.2.5", "description": "Daydream Scope", "main": ".vite/build/main.js", "scripts": { diff --git a/frontend/package-lock.json b/frontend/package-lock.json index 4f743a29b..8c12cf94b 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -1,12 +1,12 @@ { "name": "daydream-scope-frontend", - "version": "0.2.4", + "version": "0.2.5", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "daydream-scope-frontend", - "version": "0.2.4", + "version": "0.2.5", "dependencies": { "@radix-ui/react-alert-dialog": "^1.1.15", "@radix-ui/react-dialog": "^1.1.15", diff --git a/frontend/package.json b/frontend/package.json index f9a1ad7a7..c36e5cd76 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -1,7 +1,7 @@ { "name": "daydream-scope-frontend", "private": true, - "version": "0.2.4", + "version": "0.2.5", "type": "module", "scripts": { "dev": "vite", diff --git a/pyproject.toml b/pyproject.toml index 0caf56723..2e93052aa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "daydream-scope" -version = "0.2.4" +version = "0.2.5" description = "A tool for running and customizing real-time, interactive generative AI pipelines and models" readme = "README.md" requires-python = ">=3.12" @@ -83,7 +83,9 @@ midi = [ 
daydream-scope = "scope.server.app:main" build = "scope.server.build:main" download_models = "scope.server.download_models:main" +download_loras = "scope.server.download_loras:main" livepeer-runner = "scope.cloud.livepeer_app:main" +livepeer-scope = "scope.cloud.livepeer_scope_app:main" [project.urls] Homepage = "https://github.com/daydreamlive/scope" diff --git a/src/scope/cloud/livepeer_app.py b/src/scope/cloud/livepeer_app.py index 99c4276a6..0e228ad52 100644 --- a/src/scope/cloud/livepeer_app.py +++ b/src/scope/cloud/livepeer_app.py @@ -18,8 +18,9 @@ import queue import shutil import threading +import time import uuid -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager, suppress from dataclasses import dataclass, field from pathlib import Path from typing import Any @@ -41,11 +42,12 @@ from pydantic import BaseModel import scope.server.app as scope_app_module +from scope.cloud.trickle_events_sink import TrickleEventsSink from scope.core.outputs import HARDWARE_SINK_MODES from scope.server.app import app as scope_app from scope.server.app import lifespan as scope_lifespan from scope.server.frame_processor import FrameProcessor -from scope.server.kafka_publisher import publish_event +from scope.server.kafka_publisher import publish_event, set_telemetry_sink from scope.server.logs_config import ( LOG_FORMAT, ScopeLogContextFilter, @@ -57,6 +59,8 @@ scope_client: httpx.AsyncClient | None = None _connection_active = False _connection_count = 0 +# Monotonic anchor for measuring runner cold-start (boot -> runner_ready). +_runner_start_monotonic = time.monotonic() STREAM_TASK_SHUTDOWN_GRACE_S = 1.0 STREAM_TASK_CANCEL_TIMEOUT_S = 1.0 @@ -65,7 +69,7 @@ # embedded assets (base64-encoded images/video) into a single request and # easily blow past the 1 MiB library default, which would tear down the # control-channel reader mid-import. 
-MAX_CONTROL_EVENT_BYTES = 128 * 1024 * 1024 +MAX_CONTROL_EVENT_BYTES = 5 * 1024 * 1024 REMOTE_VIDEO_CLOCK_RATE = 90_000 REMOTE_VIDEO_TIME_BASE = fractions.Fraction(1, REMOTE_VIDEO_CLOCK_RATE) ASSETS_DIR_PATH = os.getenv("DAYDREAM_SCOPE_ASSETS_DIR", "/tmp/.daydream-scope/assets") @@ -153,6 +157,35 @@ def _build_connection_info() -> dict[str, Any]: } +def _publish_media_loop_error( + session: LivepeerSession, error_type: str, exc: Exception +) -> None: + """Emit a structured error telemetry event for a failed media loop. + + These loops previously only logged on failure; surfacing them as telemetry + lets the client (the egress point) see runner-side media failures that have + no reachable Kafka of their own. + + NOTE: This only fires in cloud mode when the user has opted in by providing + their SCOPE_CLOUD_API_KEY. In local/standalone mode, no telemetry leaves + the machine. + """ + publish_event( + event_type="error", + session_id=session.session_id, + connection_id=session.manifest_id, + user_id=session.user_id, + error={ + "error_type": error_type, + "message": str(exc), + "exception_type": type(exc).__name__, + "recoverable": False, + }, + connection_info=session.connection_info, + metadata={"mode": "livepeer"}, + ) + + async def _shutdown_task( task: asyncio.Task | None, *, @@ -257,6 +290,14 @@ def _resolve_produces_video( return bool(status_info.get("produces_video", True)) +def _public_stream_channel(channel: dict[str, Any]) -> dict[str, Any]: + return { + "url": channel["url"], + "direction": channel["direction"], + "mime_type": channel.get("mime_type", ""), + } + + async def _request_stream_channels( session: LivepeerSession, *, @@ -294,13 +335,17 @@ async def _request_stream_channels( mime_type = channel.get("mime_type") if not isinstance(url, str) or not isinstance(ch_direction, str): continue - normalized.append( - { - "url": url, - "direction": ch_direction, - "mime_type": mime_type if isinstance(mime_type, str) else "", - } - ) + internal_url = 
channel.get("internal_url") + internal_url = internal_url if isinstance(internal_url, str) else "" + normalized_channel = { + "url": url, + "direction": ch_direction, + "mime_type": mime_type if isinstance(mime_type, str) else "", + "io_url": internal_url or url, + } + if internal_url: + normalized_channel["internal_url"] = internal_url + normalized.append(normalized_channel) return normalized @@ -423,6 +468,7 @@ async def _media_input_loop( raise except Exception as exc: logger.error("Media input loop failed: %s", exc) + _publish_media_loop_error(session, "media_input_loop_failed", exc) finally: try: await media_output.close() @@ -551,6 +597,7 @@ async def _media_output_loop( raise except Exception as exc: logger.error("Media output loop failed: %s", exc) + _publish_media_loop_error(session, "media_output_loop_failed", exc) finally: try: await publisher.close() @@ -670,6 +717,7 @@ async def _media_audio_output_loop( raise except Exception as exc: logger.error("Media audio output loop failed: %s", exc) + _publish_media_loop_error(session, "media_audio_output_loop_failed", exc) finally: try: await publisher.close() @@ -987,14 +1035,14 @@ async def _handle_control_message( inbound_url: str | None = None for channel in channels: ch = { - **channel, + **_public_stream_channel(channel), "role": "input", "input_track_index": input_idx, "source_node_id": source_node_id, } active_channels.append(ch) if channel["direction"] == "in": - inbound_url = channel["url"] + inbound_url = channel["io_url"] if inbound_url is None: raise RuntimeError("response did not include input track URL") input_subscribe_urls[input_idx] = inbound_url @@ -1009,7 +1057,7 @@ async def _handle_control_message( ) for channel in channels: ch = { - **channel, + **_public_stream_channel(channel), "role": "output", "output_track_index": output_idx, "sink_node_id": sink_node_id, @@ -1017,7 +1065,7 @@ async def _handle_control_message( } active_channels.append(ch) if channel["direction"] == "out": - 
outbound_url = channel["url"] + outbound_url = channel["io_url"] if outbound_url is None: raise RuntimeError("response did not include output track URL") output_publish_urls[output_idx] = outbound_url @@ -1029,13 +1077,13 @@ async def _handle_control_message( ) for channel in channels: ch = { - **channel, + **_public_stream_channel(channel), "role": "output_audio", "output_media_kind": "audio", } active_channels.append(ch) if channel["direction"] == "out": - audio_publish_url = channel["url"] + audio_publish_url = channel["io_url"] if audio_publish_url is None: raise RuntimeError( "response did not include audio output track URL" @@ -1277,6 +1325,16 @@ async def _forward_notifications_to_events() -> None: notif_task = asyncio.create_task(_forward_notifications_to_events()) + # Route runner-side telemetry (publish_event calls from the embedded Scope + # app) onto the events channel instead of a Kafka broker the runner cannot + # reach. The client re-publishes whatever arrives. + runner_loop = asyncio.get_running_loop() + telemetry_sink = TrickleEventsSink(runner_loop) + set_telemetry_sink(telemetry_sink) + telemetry_task = asyncio.create_task( + telemetry_sink.drain_to(events_writer, stop_event) + ) + try: await events_writer.write( { @@ -1284,6 +1342,19 @@ async def _forward_notifications_to_events() -> None: "runner_job_id": os.getenv("FAL_JOB_ID") or os.getenv("FAL_RUNNER_ID"), } ) + # Emit cold-start timing as telemetry (distinct from the control-protocol + # runner_ready above, which the client consumes to unblock startup). 
+ publish_event( + event_type="runner_ready", + session_id=session.session_id, + connection_id=session.manifest_id, + user_id=session.user_id, + connection_info=session.connection_info, + metadata={ + "mode": "livepeer", + "startup_ms": int((time.monotonic() - _runner_start_monotonic) * 1000), + }, + ) async for message in JSONLReader(control_url)( max_event_bytes=MAX_CONTROL_EVENT_BYTES ): @@ -1318,6 +1389,15 @@ async def _forward_notifications_to_events() -> None: except asyncio.CancelledError: pass session.notification_queue = None + # Stop accepting telemetry before draining so late publish_event calls + # during shutdown no-op instead of enqueueing onto a closing queue. + set_telemetry_sink(None) + if not telemetry_sink.request_stop(): + telemetry_task.cancel() + try: + await telemetry_task + except asyncio.CancelledError: + pass await _stop_stream(session) try: await events_writer.close() @@ -1501,9 +1581,32 @@ async def websocket_endpoint(ws: WebSocket) -> None: # Complete the handshake with the orchestrator await ws.send_json({"type": "started"}) - # Keep the WebSocket open and route orchestrator responses used by control handlers. + # Keep the WebSocket open and route orchestrator responses used by control + # handlers. Also watch the control-channel task: if the trickle + # control/events channels are removed, the task exits and this websocket + # session should unwind instead of waiting forever for websocket traffic. 
while True: - raw_message = await ws.receive_text() + receive_task = asyncio.create_task(ws.receive_text()) + done, pending = await asyncio.wait( + {receive_task, control_task}, + return_when=asyncio.FIRST_COMPLETED, + ) + if control_task in done: + if receive_task in pending: + receive_task.cancel() + try: + await receive_task + except asyncio.CancelledError: + pass + logger.info("Control channel ended; closing websocket session") + with suppress(Exception): + await ws.close( + code=4000, + reason="control channel ended", + ) + break + + raw_message = receive_task.result() try: message = json.loads(raw_message) except json.JSONDecodeError: diff --git a/src/scope/cloud/livepeer_fal_app.py b/src/scope/cloud/livepeer_fal_app.py index 1d234a0b6..580074125 100644 --- a/src/scope/cloud/livepeer_fal_app.py +++ b/src/scope/cloud/livepeer_fal_app.py @@ -32,6 +32,10 @@ ASSETS_DIR_PATH = "/tmp/.daydream-scope/assets" +class RunnerSessionEnded(Exception): + """Raised when the inner runner intentionally ends the Livepeer session.""" + + # --------------------------------------------------------------------------- # Kafka publisher — matches fal_app.py KafkaPublisher for event parity # --------------------------------------------------------------------------- @@ -364,12 +368,22 @@ async def runner_to_client() -> None: # the client is gone (normal shutdown) so prioritize that. # Otherwise, re-raise for other types of unexpected errors. disconnect_exc: WebSocketDisconnect | None = None + session_ended_exc: RunnerSessionEnded | None = None unexpected_exc: Exception | None = None for task in (*done, *pending): try: await task - except (asyncio.CancelledError, ConnectionClosed): + except asyncio.CancelledError: pass + except ConnectionClosed as exc: + # Convert the runner's intentional control-channel shutdown into + # a local terminal signal so the fal proxy loop does not reconnect. 
+ for close in (getattr(exc, "rcvd", None), getattr(exc, "sent", None)): + if getattr(close, "code", None) == 4000: + session_ended_exc = RunnerSessionEnded( + "runner control channel ended" + ) + break except WebSocketDisconnect as exc: disconnect_exc = disconnect_exc or exc except Exception as exc: @@ -377,6 +391,8 @@ async def runner_to_client() -> None: if disconnect_exc is not None: raise disconnect_exc + if session_ended_exc is not None: + raise session_ended_exc if unexpected_exc is not None: raise unexpected_exc @@ -546,6 +562,8 @@ async def websocket_handler(self, client_ws: WebSocket) -> None: print(f"Connecting proxy to runner websocket at {RUNNER_LOCAL_WS_URL}") try: await _proxy_ws(client_ws) + except RunnerSessionEnded: + break except ( ConnectionClosed, InvalidStatus, diff --git a/src/scope/cloud/livepeer_scope_app.py b/src/scope/cloud/livepeer_scope_app.py new file mode 100644 index 000000000..0b8d14eed --- /dev/null +++ b/src/scope/cloud/livepeer_scope_app.py @@ -0,0 +1,937 @@ +"""Livepeer live-runner wrapper for Scope. + +This app is the outer Livepeer runner registered with the orchestrator. It +accepts the SDK ``/scope`` live-runner request, creates the control/events +trickle channels, then bridges the current Scope websocket runner protocol to +the new live-runner session model. 
+""" + +from __future__ import annotations + +import asyncio +import json +import logging +import os +import re +import signal +import subprocess +from contextlib import asynccontextmanager, suppress +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any +from urllib.parse import urlparse, urlunparse + +import aiohttp +import click +import uvicorn +from fastapi import FastAPI, HTTPException, Request +from livepeer_gateway.live_runner import ( + LiveRunnerRegistration, + create_trickle_channels, + register_runner, + remove_trickle_channels, + stop_runner_session, +) + +logger = logging.getLogger(__name__) + +LIVEPEER_APP_NAME = "live-video-to-video/scope" +DEFAULT_HOST = "127.0.0.1" +DEFAULT_PORT = 8989 +DEFAULT_INNER_PORT = 8001 +DEFAULT_ORCHESTRATOR_URL = "http://localhost:8935" +DEFAULT_INNER_STARTUP_TIMEOUT_SECONDS = 90.0 +INNER_WS_HANDSHAKE_TIMEOUT_SECONDS = 20.0 +DEFAULT_RETRY_DELAY_SECONDS = 2.5 +DEFAULT_MAX_FAILURES_PER_WINDOW = 20 +DEFAULT_FAILURE_WINDOW_SECONDS = 60.0 +DEFAULT_PRICE_PER_UNIT = 0 +DEFAULT_PIXELS_PER_UNIT = 1 +DEFAULT_PRICE_UNIT = "USD" +CHANNEL_MIME_JSONL = "application/jsonl" +DEFAULT_SCOPE_HOME_DIRNAME = ".daydream-scope" +DEFAULT_SESSION_ASSETS_DIR = Path("/tmp/.daydream-scope/assets") + +_registration: LiveRunnerRegistration | None = None +_inner_process: subprocess.Popen | None = None +_sessions: dict[str, ScopeBridgeSession] = {} +_cleanup_event: asyncio.Event | None = None + + +@dataclass(slots=True) +class ScopeRunnerSettings: + host: str = DEFAULT_HOST + port: int = DEFAULT_PORT + orchestrator_url: str = DEFAULT_ORCHESTRATOR_URL + orch_secret: str = "" + runner_url: str = "" + inner_ws_url: str = f"ws://127.0.0.1:{DEFAULT_INNER_PORT}/ws" + inner_host: str = DEFAULT_HOST + inner_port: int = DEFAULT_INNER_PORT + inner_startup_timeout_seconds: float = DEFAULT_INNER_STARTUP_TIMEOUT_SECONDS + retry_delay_seconds: float = DEFAULT_RETRY_DELAY_SECONDS + max_failures_per_window: int = 
DEFAULT_MAX_FAILURES_PER_WINDOW + failure_window_seconds: float = DEFAULT_FAILURE_WINDOW_SECONDS + price_per_unit: int = DEFAULT_PRICE_PER_UNIT + pixels_per_unit: int = DEFAULT_PIXELS_PER_UNIT + price_unit: str = DEFAULT_PRICE_UNIT + + def resolved_runner_url(self) -> str: + if self.runner_url: + return self.runner_url + return f"http://{self.host}:{self.port}" + + +@dataclass(slots=True) +class ScopeBridgeSession: + session_id: str + headers: dict[str, str] + job_info: dict[str, Any] + settings: ScopeRunnerSettings + task: asyncio.Task | None = None + websocket: Any | None = None + channel_url_to_name: dict[str, str] = field(default_factory=dict) + stop_requested: bool = False + cleanup_started: bool = False + channel_counter: int = 0 + + +settings = ScopeRunnerSettings() + + +def _env_int(name: str, default: int) -> int: + raw = os.getenv(name) + if raw is None: + return default + try: + return int(raw) + except ValueError as exc: + raise ValueError(f"{name} must be an integer") from exc + + +def _validate_price_settings(price_per_unit: int, pixels_per_unit: int) -> None: + if pixels_per_unit <= 0: + raise ValueError("LIVEPEER_RUNNER_PIXELS_PER_UNIT must be positive") + + +def _settings_from_env() -> ScopeRunnerSettings: + port = int(os.getenv("LIVEPEER_RUNNER_PORT", str(DEFAULT_PORT))) + inner_port = int(os.getenv("LIVEPEER_INNER_PORT", str(DEFAULT_INNER_PORT))) + host = os.getenv("LIVEPEER_RUNNER_HOST", DEFAULT_HOST) + runner_url = os.getenv("LIVEPEER_RUNNER_URL", "") + price_per_unit = _env_int( + "LIVEPEER_RUNNER_PRICE_PER_UNIT", + DEFAULT_PRICE_PER_UNIT, + ) + pixels_per_unit = _env_int( + "LIVEPEER_RUNNER_PIXELS_PER_UNIT", + DEFAULT_PIXELS_PER_UNIT, + ) + _validate_price_settings(price_per_unit, pixels_per_unit) + inner_ws_url = os.getenv( + "LIVEPEER_INNER_WS_URL", + f"ws://127.0.0.1:{inner_port}/ws", + ) + inner_host, parsed_inner_port = _inner_bind_from_ws_url(inner_ws_url) + return ScopeRunnerSettings( + host=host, + port=port, + 
orchestrator_url=os.getenv("LIVEPEER_ORCH_URL", DEFAULT_ORCHESTRATOR_URL), + orch_secret=os.getenv("LIVEPEER_ORCH_SECRET", ""), + runner_url=runner_url, + inner_ws_url=inner_ws_url, + inner_host=os.getenv("LIVEPEER_INNER_HOST", inner_host), + inner_port=inner_port + if "LIVEPEER_INNER_PORT" in os.environ + else parsed_inner_port, + price_per_unit=price_per_unit, + pixels_per_unit=pixels_per_unit, + price_unit=os.getenv("LIVEPEER_RUNNER_PRICE_UNIT", DEFAULT_PRICE_UNIT), + ) + + +def _inner_bind_from_ws_url(inner_ws_url: str) -> tuple[str, int]: + parsed = urlparse(inner_ws_url) + return parsed.hostname or DEFAULT_HOST, parsed.port or DEFAULT_INNER_PORT + + +def _header_snapshot(request: Request) -> dict[str, str]: + return { + "Livepeer-Runner-Route": request.headers.get("Livepeer-Runner-Route", ""), + "Livepeer-Session-Id": request.headers.get("Livepeer-Session-Id", ""), + "Livepeer-Session-Token": request.headers.get("Livepeer-Session-Token", ""), + "Livepeer-Session-Control": request.headers.get("Livepeer-Session-Control", ""), + } + + +def _session_id(headers: dict[str, str]) -> str: + session_id = headers.get("Livepeer-Session-Id", "").strip() + if not session_id: + raise HTTPException(status_code=400, detail="missing Livepeer-Session-Id") + return session_id + + +async def _register_runner( + runner_settings: ScopeRunnerSettings, +) -> LiveRunnerRegistration: + if not runner_settings.orch_secret: + raise RuntimeError("Livepeer orchestrator secret is required") + return await register_runner( + runner_settings.orchestrator_url, + secret=runner_settings.orch_secret, + runner_url=runner_settings.resolved_runner_url(), + app=LIVEPEER_APP_NAME, + price_per_unit=runner_settings.price_per_unit, + pixels_per_unit=runner_settings.pixels_per_unit, + price_unit=runner_settings.price_unit, + mode="persistent", + capacity=1, + on_session_release=_on_session_release, + ) + + +def _build_inner_scope_env() -> dict[str, str]: + env = os.environ.copy() + + shared_dir = 
Path.home() / DEFAULT_SCOPE_HOME_DIRNAME / "models" + session_dir = DEFAULT_SESSION_ASSETS_DIR + + # Shared / persistent data. + env.setdefault("DAYDREAM_SCOPE_MODELS_DIR", str(shared_dir)) + env.setdefault("DAYDREAM_SCOPE_LORA_SHARED_DIR", str(shared_dir / "lora")) + + # Session local data, removed after each session. + env.setdefault("DAYDREAM_SCOPE_ASSETS_DIR", str(session_dir)) + env.setdefault("DAYDREAM_SCOPE_LORA_DIR", str(session_dir / "lora")) + env.setdefault("DAYDREAM_SCOPE_LOGS_DIR", str(session_dir / "logs")) + env.setdefault("DAYDREAM_SCOPE_PLUGINS_DIR", str(session_dir / "plugins")) + + return env + + +def _start_inner_scope(runner_settings: ScopeRunnerSettings) -> subprocess.Popen: + command = [ + "livepeer-runner", + "--host", + runner_settings.inner_host, + "--port", + str(runner_settings.inner_port), + ] + logger.info("Starting inner Scope runner: %s", " ".join(command)) + env = _build_inner_scope_env() + env.setdefault("SCOPE_HOST", runner_settings.inner_host) + env.setdefault("SCOPE_PORT", str(runner_settings.inner_port)) + popen_kwargs: dict[str, Any] = {"env": env} + if os.name != "nt": + # Give the inner runner its own process group so Ctrl-C can clean up + # anything it spawns instead of leaving orphaned model/media workers. + popen_kwargs["start_new_session"] = True + return subprocess.Popen(command, **popen_kwargs) + + +async def _wait_for_inner_scope( + runner_settings: ScopeRunnerSettings, + process: subprocess.Popen | None = None, +) -> None: + deadline = ( + asyncio.get_running_loop().time() + + runner_settings.inner_startup_timeout_seconds + ) + status_url = _inner_status_url(runner_settings.inner_ws_url) + while True: + # If the child died, the status endpoint will never come up. Fail now + # instead of logging connection failures until the startup timeout. 
+ if process is not None and process.poll() is not None: + raise RuntimeError( + "Inner Scope runner process exited during startup " + f"(pid={getattr(process, 'pid', None)} code={process.returncode})" + ) + + try: + async with aiohttp.ClientSession() as client: + async with client.get( + status_url, + timeout=aiohttp.ClientTimeout(total=2.0), + ) as response: + if response.status == 200: + status = await response.json() + if ( + status.get("listener_ready") is True + and status.get("connection_active") is False + ): + return + logger.info( + "Inner Scope runner not ready for new websocket: %s", + status, + ) + else: + text = await response.text() + logger.info( + "Inner Scope status endpoint returned HTTP %s: %s", + response.status, + text[:500], + ) + except Exception as exc: + logger.debug("Inner Scope status check failed: %s", exc) + + if asyncio.get_running_loop().time() >= deadline: + raise RuntimeError( + f"Timed out waiting for inner Scope runner at {status_url}" + ) + await asyncio.sleep(1.0) + + +def _inner_status_url(inner_ws_url: str) -> str: + parsed = urlparse(inner_ws_url) + scheme = "https" if parsed.scheme == "wss" else "http" + path = "/internal/status" + return urlunparse((scheme, parsed.netloc, path, "", "", "")) + + +def _inner_cleanup_url(inner_ws_url: str) -> str: + parsed = urlparse(inner_ws_url) + scheme = "https" if parsed.scheme == "wss" else "http" + path = "/internal/cleanup-session" + return urlunparse((scheme, parsed.netloc, path, "", "", "")) + + +def _get_cleanup_event() -> asyncio.Event: + global _cleanup_event + if _cleanup_event is None: + _cleanup_event = asyncio.Event() + _cleanup_event.set() + return _cleanup_event + + +async def _cleanup_inner_scope_session( + runner_settings: ScopeRunnerSettings, +) -> None: + cleanup_url = _inner_cleanup_url(runner_settings.inner_ws_url) + try: + async with aiohttp.ClientSession() as client: + async with client.post( + cleanup_url, + timeout=aiohttp.ClientTimeout(total=180.0), + ) as 
response: + text = await response.text() + if response.status != 200: + logger.warning( + "Inner runner cleanup endpoint failed: HTTP %s: %s", + response.status, + text[:200], + ) + return + + try: + payload = json.loads(text) + except json.JSONDecodeError: + logger.warning( + "Inner runner cleanup endpoint returned invalid JSON: %s", + text[:200], + ) + return + + if not isinstance(payload, dict) or not payload.get("ok", False): + logger.warning( + "Inner runner cleanup completed with issues: %s", + payload, + ) + else: + logger.info("Inner runner cleanup completed successfully") + except Exception as exc: + logger.warning("Inner runner cleanup request failed: %s", exc) + + +async def _run_inner_cleanup(runner_settings: ScopeRunnerSettings) -> None: + event = _get_cleanup_event() + event.clear() + try: + await _cleanup_inner_scope_session(runner_settings) + except Exception as exc: + logger.warning("Inner runner cleanup request failed: %s", exc) + finally: + event.set() + + +async def _stop_inner_scope() -> None: + global _inner_process + process = _inner_process + _inner_process = None + if process is None or process.poll() is not None: + return + + # Prefer stopping the process group created in _start_inner_scope; fall + # back to the direct child on Windows or if the process group is gone. + pgid = None + if os.name != "nt" and hasattr(os, "getpgid") and hasattr(os, "killpg"): + with suppress(ProcessLookupError): + pgid = os.getpgid(process.pid) + + if pgid is None: + process.terminate() + else: + os.killpg(pgid, signal.SIGTERM) + try: + await asyncio.wait_for( + asyncio.to_thread(process.wait), + timeout=10.0, + ) + except TimeoutError: + # Some media/model stacks ignore SIGTERM during shutdown; do not leave + # the terminal stuck forever waiting for the inner runner. 
+ if pgid is None: + process.kill() + else: + os.killpg(pgid, signal.SIGKILL) + await asyncio.to_thread(process.wait) + + +@asynccontextmanager +async def lifespan(_app: FastAPI): + await _start_livepeer_scope() + try: + yield + finally: + await _shutdown_livepeer_scope() + + +async def _start_livepeer_scope() -> None: + global _registration + global _inner_process + + _inner_process = _start_inner_scope(settings) + try: + await _wait_for_inner_scope(settings, _inner_process) + _registration = await _register_runner(settings) + except BaseException: + # Lifespan startup can be cancelled by Ctrl-C before normal shutdown + # runs, so clean up the child here too. + await _stop_inner_scope() + raise + logger.info( + "Registered Scope live runner app=%s runner_url=%s", + LIVEPEER_APP_NAME, + settings.resolved_runner_url(), + ) + + +async def _shutdown_livepeer_scope() -> None: + global _registration + + await _cleanup_all_sessions(notify_orchestrator=True) + if _registration is not None: + await _registration.close() + _registration = None + await _stop_inner_scope() + + +async def _run_livepeer_scope_server(host: str, port: int) -> int: + config = uvicorn.Config(app, host=host, port=port, log_config=None, lifespan="off") + server = uvicorn.Server(config) + server_task = asyncio.create_task(server.serve()) + + try: + while not server.started and not server.should_exit: + await asyncio.sleep(0.05) + + if server.should_exit: + await server_task + return 1 + + try: + await _start_livepeer_scope() + except Exception as exc: + logger.error("Livepeer Scope startup failed: %s", exc) + server.should_exit = True + await server_task + return 1 + + try: + await server_task + finally: + await _shutdown_livepeer_scope() + return 0 + except BaseException: + server.should_exit = True + with suppress(Exception): + await _shutdown_livepeer_scope() + raise + + +app = FastAPI( + lifespan=lifespan, + title="Livepeer Scope Runner App", + description="Registers Scope as a Livepeer live 
runner and bridges to the inner Scope websocket runner.", +) + + +@app.post("/scope") +async def scope_endpoint(request: Request) -> dict[str, Any]: + # Ensure previous sessions are fully torn down before accepting a new one. + await _get_cleanup_event().wait() + + headers = _header_snapshot(request) + session_id = _session_id(headers) + if session_id in _sessions: + raise HTTPException(status_code=409, detail="session already active") + + body = await request.json() + if not isinstance(body, dict): + raise HTTPException(status_code=400, detail="request body must be an object") + + bridge_session = ScopeBridgeSession( + session_id=session_id, + headers=headers, + job_info={}, + settings=settings, + ) + + channels = await create_trickle_channels( + bridge_session, + [ + {"name": "control", "mime_type": CHANNEL_MIME_JSONL}, + {"name": "events", "mime_type": CHANNEL_MIME_JSONL}, + ], + ) + by_name = {channel["name"]: channel for channel in channels} + control = by_name.get("control") + events = by_name.get("events") + if control is None or events is None: + raise HTTPException( + status_code=502, + detail="orchestrator did not return control/events channels", + ) + + bridge_session.job_info = { + "manifest_id": session_id, + "control_url": control.get("internal_url") or control["url"], + "events_url": events.get("internal_url") or events["url"], + "params": body.get("params") if isinstance(body.get("params"), dict) else {}, + } + bridge_session.channel_url_to_name[control["url"]] = control["name"] + bridge_session.channel_url_to_name[events["url"]] = events["name"] + _sessions[session_id] = bridge_session + + try: + first_ws = await _connect_inner_once(bridge_session) + except Exception: + _sessions.pop(session_id, None) + with suppress(Exception): + await remove_trickle_channels(bridge_session, ["control", "events"]) + raise + + bridge_session.task = asyncio.create_task( + _bridge_loop(bridge_session, first_ws=first_ws) + ) + + return { + "manifest_id": session_id, + 
"session_id": session_id, + "control_url": control["url"], + "events_url": events["url"], + } + + +async def _bridge_loop( + session: ScopeBridgeSession, + *, + first_ws: Any | None = None, +) -> None: + reconnectable = _reconnectable_exceptions() + failure_timestamps: list[float] = [] + ws = first_ws + + try: + while not session.stop_requested: + try: + if ws is None: + ws = await _connect_inner_once(session) + await _read_ws(session, ws) + session.stop_requested = True + break + except reconnectable as exc: + # The inner runner uses close code 4000 when the Livepeer control + # channel has ended. That is an intentional session shutdown, not + # a transport failure to heal with another websocket connection. + if _is_control_channel_close(exc): + session.stop_requested = True + break + logger.warning( + "Inner Scope websocket disconnected, reconnecting: %s", exc + ) + except asyncio.CancelledError: + raise + except Exception: + logger.exception("Inner Scope bridge failed") + session.stop_requested = True + + with suppress(Exception): + if ws is not None: + await ws.close() + ws = None + + if session.stop_requested: + break + + now = asyncio.get_running_loop().time() + cutoff = now - session.settings.failure_window_seconds + failure_timestamps.append(now) + failure_timestamps = [ts for ts in failure_timestamps if ts > cutoff] + if len(failure_timestamps) > session.settings.max_failures_per_window: + logger.error( + "Inner Scope websocket failed too often; stopping session %s", + session.session_id, + ) + break + await asyncio.sleep(session.settings.retry_delay_seconds) + finally: + await _cleanup_session(session, notify_orchestrator=True) + + +async def _connect_inner_once(session: ScopeBridgeSession) -> Any: + import websockets + + await _wait_for_inner_scope(session.settings) + ws = await websockets.connect(session.settings.inner_ws_url) + session.websocket = ws + + ready = parse_json( + await asyncio.wait_for(ws.recv(), timeout=INNER_WS_HANDSHAKE_TIMEOUT_SECONDS) 
+ ) + if ready.get("type") != "ready": + await ws.close() + raise RuntimeError(f"inner runner did not send ready: {ready!r}") + + await ws.send(json.dumps(session.job_info)) + started = parse_json( + await asyncio.wait_for(ws.recv(), timeout=INNER_WS_HANDSHAKE_TIMEOUT_SECONDS) + ) + if started.get("type") == "error": + await ws.close() + _raise_inner_start_error(started) + if started.get("type") != "started": + await ws.close() + raise RuntimeError(f"inner runner did not start: {started!r}") + return ws + + +async def _read_ws(session: ScopeBridgeSession, ws: Any) -> None: + async for raw in ws: + await _handle_ws_message(session, ws, parse_json(raw)) + + +def parse_json(raw: str) -> dict[str, Any]: + message = json.loads(raw) + if not isinstance(message, dict): + raise RuntimeError(f"expected websocket JSON object, got {message!r}") + return message + + +def _raise_inner_start_error(message: dict[str, Any]) -> None: + error = message.get("error") + detail = error if isinstance(error, str) and error else "inner runner did not start" + if message.get("code") == "ACCESS_DENIED": + raise HTTPException(status_code=401, detail=detail) + raise RuntimeError(f"inner runner did not start: {message!r}") + + +async def _handle_ws_message( + session: ScopeBridgeSession, + ws: Any, + message: dict[str, Any], +) -> None: + msg_type = message.get("type") + if msg_type == "create_channels": + await _handle_create_channels(session, ws, message) + return + if msg_type == "close_channels": + await _handle_close_channels(session, message) + return + if msg_type == "ping": + await ws.send( + json.dumps( + { + "type": "pong", + "request_id": message.get("request_id"), + "timestamp": message.get("timestamp"), + } + ) + ) + return + if msg_type == "restarting": + request_id = message.get("request_id") + if not request_id: + logger.warning( + "Ignoring inner restarting message without request_id for session %s", + session.session_id, + ) + return + await ws.send( + json.dumps( + { + 
async def _handle_close_channels(
    session: ScopeBridgeSession,
    message: dict[str, Any],
) -> None:
    """Remove the trickle channels named by a ``close_channels`` message.

    ``message["channels"]`` is expected to be a list of channel URLs. Each
    URL that this session knows about is removed from
    ``session.channel_url_to_name`` and the matching channel names are passed
    to ``remove_trickle_channels``. Malformed messages, non-string entries,
    and unknown URLs are ignored.

    Removal is best-effort: a failure is logged and never propagates to the
    websocket read loop.
    """
    urls = message.get("channels")
    if not isinstance(urls, list):
        return

    names = []
    for url in urls:
        if not isinstance(url, str):
            continue
        name = session.channel_url_to_name.pop(url, None)
        if name is not None:
            names.append(name)
    if not names:
        return

    try:
        await remove_trickle_channels(session, names)
    except Exception:
        # The URL mapping is already gone, so nothing retries this removal;
        # leave a trace instead of failing silently.
        logger.warning(
            "Failed to remove trickle channels %s for session %s",
            names,
            session.session_id,
            exc_info=True,
        )
def _next_channel_name(
    session: ScopeBridgeSession,
    direction: str,
    request_id: Any,
) -> str:
    """Allocate a new channel name for ``session``.

    Increments ``session.channel_counter`` and returns
    ``"<direction>-<label>-<counter>"``. The label is the request id with every
    run of characters outside ``[A-Za-z0-9_.-]`` replaced by ``-`` and
    leading/trailing dashes stripped; when that leaves nothing (or the request
    id is falsy) the counter value is used instead.
    """
    session.channel_counter += 1
    counter = session.channel_counter
    label = re.sub(r"[^A-Za-z0-9_.-]+", "-", str(request_id or counter)).strip("-")
    return f"{direction}-{label or counter}-{counter}"
# Strong references to in-flight release cleanups. The event loop only keeps
# weak references to tasks, so a task whose handle is discarded can be garbage
# collected before it finishes.
_release_cleanup_tasks: set[asyncio.Task[None]] = set()


def _on_session_release(event: Any) -> None:
    """Start cleanup for the session named by a release event.

    Looks up ``event.session_id`` in ``_sessions`` and, if the session is still
    registered, schedules ``_cleanup_session`` for it with
    ``notify_orchestrator=False``. Unknown session ids are ignored.

    Must be called while an event loop is running, because it uses
    ``asyncio.create_task``.
    """
    session = _sessions.get(event.session_id)
    if session is None:
        return
    task = asyncio.create_task(
        _cleanup_session(session, notify_orchestrator=False)
    )
    # Hold the task until it completes, then let it go.
    _release_cleanup_tasks.add(task)
    task.add_done_callback(_release_cleanup_tasks.discard)
app.""" + global settings + logging.basicConfig(level=logging.INFO) + _validate_price_settings(price_per_unit, pixels_per_unit) + inner_host, inner_port = _inner_bind_from_ws_url(inner_ws_url) + settings = ScopeRunnerSettings( + host=host, + port=port, + orchestrator_url=orchestrator, + orch_secret=orch_secret, + runner_url=runner_url, + inner_ws_url=inner_ws_url, + inner_host=inner_host, + inner_port=inner_port, + price_per_unit=price_per_unit, + pixels_per_unit=pixels_per_unit, + price_unit=price_unit, + ) + try: + exit_code = asyncio.run(_run_livepeer_scope_server(host, port)) + except KeyboardInterrupt: + raise SystemExit(130) from None + if exit_code: + raise SystemExit(exit_code) + + +settings = _settings_from_env() + + +if __name__ == "__main__": + main() diff --git a/src/scope/cloud/trickle_events_sink.py b/src/scope/cloud/trickle_events_sink.py new file mode 100644 index 000000000..409d15c4d --- /dev/null +++ b/src/scope/cloud/trickle_events_sink.py @@ -0,0 +1,104 @@ +"""Telemetry sink that forwards events over the trickle events channel. + +On the cloud runner there is normally no reachable Kafka broker, so telemetry +emitted via ``scope.server.kafka_publisher.publish_event`` must instead be +forwarded to the local SDK client. This sink enqueues each (already-built) event +envelope and a drain task writes it onto the per-session events channel as a +``{"type": "telemetry", "event": {...}}`` message. The client receives it and +re-publishes verbatim to its own egress sink. + +This mirrors the notification-forwarding pattern in ``livepeer_app.py`` +(``_enqueue_notification`` / ``_forward_notifications_to_events``): a bounded +queue with drop-oldest semantics so telemetry is best-effort and never blocks +the media or control paths. It structurally satisfies the +``scope.server.kafka_publisher.TelemetrySink`` protocol. 
import asyncio
import logging
from typing import Any

logger = logging.getLogger(__name__)

# Bounded so a stalled/slow events channel can never grow memory without limit.
DEFAULT_QUEUE_MAXSIZE = 256


class TrickleEventsSink:
    """Forward telemetry events onto the trickle events channel.

    ``emit`` is safe to call from worker threads (FrameProcessor). It marshals
    onto the runner event loop and drops the oldest queued event on overflow.
    """

    def __init__(
        self,
        loop: asyncio.AbstractEventLoop,
        maxsize: int = DEFAULT_QUEUE_MAXSIZE,
    ):
        """Bind the sink to ``loop`` and create its bounded queue.

        Args:
            loop: Event loop on which queue operations are performed.
            maxsize: Maximum number of queued entries before the oldest is
                dropped.
        """
        self._loop = loop
        # ``None`` is the stop sentinel enqueued by ``request_stop``.
        self._queue: asyncio.Queue[dict[str, Any] | None] = asyncio.Queue(
            maxsize=maxsize
        )
        # Set once the first drop has been logged, to avoid log spam.
        self._drop_warned = False

    def emit(self, event: dict[str, Any]) -> None:
        """Thread-safe, non-blocking enqueue (drop-oldest on overflow).

        The first time anything is dropped a single warning is logged; later
        drops are silent.
        """

        def _put_with_drop() -> None:
            try:
                self._queue.put_nowait(event)
                return
            except asyncio.QueueFull:
                pass

            # Queue is full: evict the oldest entry to make room for the newest.
            dropped = True
            try:
                self._queue.get_nowait()
            except asyncio.QueueEmpty:
                dropped = False
            try:
                self._queue.put_nowait(event)
            except asyncio.QueueFull:
                # Could not make room after all; the new event is the one lost.
                dropped = True

            if dropped and not self._drop_warned:
                logger.warning("Dropped telemetry event under queue pressure")
                self._drop_warned = True

        try:
            self._loop.call_soon_threadsafe(_put_with_drop)
        except RuntimeError:
            # Loop is closing; drop silently.
            pass

    async def drain_to(self, events_writer: Any, stop_event: asyncio.Event) -> None:
        """Drain queued events onto the events channel until stopped.

        Each event is written as ``{"type": "telemetry", "event": event}`` via
        ``events_writer.write``. A failed write is logged at debug level and
        the event is discarded.

        Exits on the ``stop_event``, on a ``None`` sentinel (see
        ``request_stop``), or on cancellation.
        """
        try:
            while not stop_event.is_set():
                try:
                    # Wake at least once a second so ``stop_event`` is noticed
                    # even when no telemetry is flowing.
                    event = await asyncio.wait_for(self._queue.get(), timeout=1.0)
                except TimeoutError:
                    continue
                if event is None:
                    break
                try:
                    await events_writer.write({"type": "telemetry", "event": event})
                except Exception:
                    logger.debug(
                        "Failed to forward telemetry to events channel",
                        exc_info=True,
                    )
        except asyncio.CancelledError:
            # Cancellation is a documented way to stop the drain; return
            # normally instead of re-raising.
            pass

    def request_stop(self) -> bool:
        """Enqueue the sentinel to wake the drain loop so it exits.

        Must be called from the runner event loop thread. Returns ``False`` if
        the queue is saturated and the caller should cancel the drain task.
        """
        try:
            self._queue.put_nowait(None)
            return True
        except asyncio.QueueFull:
            return False
logger.warning("Kafka publisher failed to start") + # Initialize metrics reporter (forwards telemetry to Daydream /v1/metrics). + # Only active when the user has opted in to cloud features by setting + # SCOPE_CLOUD_API_KEY (their Daydream platform API key). In local/standalone + # mode this block is skipped entirely — no telemetry leaves the machine. + if is_metrics_reporter_enabled(): + api_key = os.getenv("SCOPE_CLOUD_API_KEY", "") + if api_key: + metrics_reporter_instance = MetricsReporter(api_key=api_key) + if await metrics_reporter_instance.start(): + set_metrics_reporter(metrics_reporter_instance) + logger.info("Metrics reporter initialized") + else: + metrics_reporter_instance = None + logger.warning("Metrics reporter failed to start") + else: + logger.debug("Metrics reporter skipped: no SCOPE_CLOUD_API_KEY") + + # Install the default telemetry egress sink for local mode so that + # publish_event() calls route through MetricsReporter (or Kafka) even + # without a cloud connection. In cloud mode livepeer.py re-calls this + # after connecting, which is fine (idempotent selection). + if install_default_egress_sink(): + logger.info("Telemetry egress sink installed at startup") + # Start OSC UDP server on the same port as the HTTP API from .osc_server import OSCServer @@ -527,6 +560,12 @@ async def lifespan(app: FastAPI): await livepeer.disconnect() logger.info("Livepeer connection shutdown complete") + if metrics_reporter_instance: + logger.info("Shutting down metrics reporter...") + await metrics_reporter_instance.stop() + set_metrics_reporter(None) + logger.info("Metrics reporter shutdown complete") + if kafka_publisher: logger.info("Shutting down Kafka publisher...") await kafka_publisher.stop() diff --git a/src/scope/server/download_loras.py b/src/scope/server/download_loras.py new file mode 100644 index 000000000..9f67bb352 --- /dev/null +++ b/src/scope/server/download_loras.py @@ -0,0 +1,94 @@ +""" +Cross-platform LoRA downloader CLI. 
def main() -> None:
    """Main entry point for the ``download_loras`` console script.

    Parses ``--url`` (required), ``--expected-sha256`` and ``--filename`` from
    the command line and runs ``download_loras`` to completion.

    Exits with status 130 on Ctrl-C and status 1 on any other failure, after
    printing the error to stderr.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    # This module uses package-relative imports, so it cannot be executed as
    # ``python download_loras.py``; the examples use the installed console
    # script instead.
    parser = argparse.ArgumentParser(
        description="Download LoRAs for Daydream Scope from direct URLs",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  download_loras --url https://example.com/style.safetensors
  download_loras -u https://example.com/style.safetensors --expected-sha256 abc123
  download_loras -u https://example.com/download/123 --filename style.safetensors
        """,
    )
    parser.add_argument(
        "--url",
        "-u",
        type=str,
        required=True,
        help="Direct URL for the LoRA file to download.",
    )
    parser.add_argument(
        "--expected-sha256",
        type=str,
        default=None,
        help="Expected SHA-256 hash for integrity verification.",
    )
    parser.add_argument(
        "--filename",
        type=str,
        default=None,
        help="Filename to save the LoRA as. Useful for URLs that do not include a stable filename.",
    )

    args = parser.parse_args()

    try:
        asyncio.run(
            download_loras(
                args.url,
                expected_sha256=args.expected_sha256,
                filename=args.filename,
            )
        )
    except KeyboardInterrupt:
        print("\nCancelled.")
        sys.exit(130)
    except Exception as e:
        print(f"\nERROR: {e}", file=sys.stderr)
        sys.exit(1)
+ + Two implementations exist: + - ``KafkaSink`` (below): publishes events to Kafka. Used by the local SDK + client as the egress point. + - ``TrickleEventsSink`` (``scope.cloud.trickle_events_sink``): forwards + events over the trickle events channel. Installed on the cloud runner so + runner-side ``publish_event`` calls reach the client instead of a Kafka + broker the runner cannot see. + """ + + def emit(self, event: dict[str, Any]) -> None: ... + + +def build_event_envelope( + event_type: str, + session_id: str | None = None, + connection_id: str | None = None, + pipeline_ids: list[str] | None = None, + user_id: str | None = None, + error: dict[str, Any] | None = None, + metadata: dict[str, Any] | None = None, + connection_info: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Build the structured event envelope. + + Stamping happens here (once) so the same fully-formed envelope can be sent + to Kafka directly or forwarded verbatim over the trickle events channel and + re-published downstream without re-stamping its id/timestamp/origin fields. + + The shape matches the Go ``kafka.go`` ``stream_trace`` format. + """ + event_id = str(uuid.uuid4()) + # Timestamp as milliseconds string (matching Go format) + timestamp_ms = str(int(time.time() * 1000)) + + data: dict[str, Any] = { + "type": event_type, + "client_source": "scope", + "timestamp": timestamp_ms, + } + if session_id: + data["session_id"] = session_id + if connection_id: + data["connection_id"] = connection_id + if pipeline_ids: + data["pipeline_ids"] = pipeline_ids + if user_id: + data["user_id"] = user_id + if error: + data["error"] = error + if connection_info: + data["connection_info"] = connection_info + if metadata: + data.update(metadata) + + return { + "id": event_id, + "type": "stream_trace", + "timestamp": timestamp_ms, + "data": data, + } + + class KafkaPublisher: """Async Kafka event publisher with thread-safe wrapper for sync contexts. 
@@ -147,50 +219,37 @@ async def publish_async( if not self._started or not self._producer: return False - import time - import uuid - - # Generate a unique ID for this event (used as Kafka key) - event_id = str(uuid.uuid4()) - # Timestamp as milliseconds string (matching Go format) - timestamp_ms = str(int(time.time() * 1000)) - - # Build data payload - data: dict[str, Any] = { - "type": event_type, - "client_source": "scope", - "timestamp": timestamp_ms, - } - if session_id: - data["session_id"] = session_id - if connection_id: - data["connection_id"] = connection_id - if pipeline_ids: - data["pipeline_ids"] = pipeline_ids - if user_id: - data["user_id"] = user_id - if error: - data["error"] = error - if connection_info: - data["connection_info"] = connection_info - if metadata: - data.update(metadata) - - # Event structure matching Go kafka.go format - event = { - "id": event_id, - "type": "stream_trace", - "timestamp": timestamp_ms, - "data": data, - } + event = build_event_envelope( + event_type=event_type, + session_id=session_id, + connection_id=connection_id, + pipeline_ids=pipeline_ids, + user_id=user_id, + error=error, + metadata=metadata, + connection_info=connection_info, + ) + return await self._send_prebuilt(event) + + async def _send_prebuilt(self, event: dict[str, Any]) -> bool: + """Send an already-built event envelope to Kafka verbatim. + + Does not re-stamp the id/timestamp so a runner-built envelope relayed + over trickle keeps its origin identity when published by the client. 
+ """ + if not self._started or not self._producer: + return False + + event_id = event.get("id") + if event_id is not None and not isinstance(event_id, str): + event_id = str(event_id) + data = event.get("data") + event_type = data.get("type") if isinstance(data, dict) else None try: # Use event ID as key (matching Go format) - key = event_id - await self._producer.send_and_wait(KAFKA_TOPIC, value=event, key=key) - logger.info( - f"Published Kafka event: {event_type} (id={event_id}, session={session_id})" - ) + await self._producer.send_and_wait(KAFKA_TOPIC, value=event, key=event_id) + logger.info(f"Published Kafka event: {event_type} (id={event_id})") return True except Exception as e: logger.error(f"Failed to publish Kafka event {event_type}: {e}") @@ -248,6 +307,28 @@ def publish( except Exception as e: logger.error(f"Failed to schedule Kafka event publish: {e}") + def publish_prebuilt(self, event: dict[str, Any]) -> None: + """Publish an already-built event envelope from a sync context. + + Thread-safe; schedules the verbatim send on the producer's event loop. + Used by the client to re-publish telemetry relayed over trickle without + re-stamping the runner's original id/timestamp. + """ + if not self._started or not self._event_loop: + return + + with self._lock: + if not self._event_loop or not self._event_loop.is_running(): + return + + try: + asyncio.run_coroutine_threadsafe( + self._send_prebuilt(event), + self._event_loop, + ) + except Exception as e: + logger.error(f"Failed to schedule Kafka event publish: {e}") + @property def is_running(self) -> bool: """Check if the publisher is running.""" @@ -277,6 +358,86 @@ def set_kafka_publisher(publisher: KafkaPublisher | None): _publisher = publisher +class KafkaSink: + """Telemetry sink that publishes events to Kafka. + + Used as the egress sink on the local SDK client (and any deployment with a + reachable Kafka broker). Wraps a started ``KafkaPublisher``. 
class LogSink:
    """Telemetry sink that logs events instead of shipping them anywhere.

    For local verification of the relay path without a Kafka broker. Enabled on
    the client by setting ``SCOPE_TELEMETRY_LOG_SINK``.
    """

    def emit(self, event: dict[str, Any]) -> None:
        """Log a one-line summary of ``event`` at INFO level.

        Malformed input is tolerated: a non-dict ``event`` or a non-dict
        ``event["data"]`` is treated as empty, so every field logs as ``None``
        instead of raising.
        """
        envelope = event if isinstance(event, dict) else {}
        data = envelope.get("data")
        if not isinstance(data, dict):
            # ``data`` may be present but null or malformed on a relayed
            # envelope; a sink must never raise to its caller.
            data = {}
        logger.info(
            "TELEMETRY type=%s id=%s session=%s connection=%s relayed_via=%s",
            data.get("type"),
            envelope.get("id"),
            data.get("session_id"),
            data.get("connection_id"),
            data.get("relayed_via"),
        )
def publish_raw_event(event: dict[str, Any]) -> None:
    """Send an already-built envelope through whichever egress is active.

    When a telemetry sink is installed the envelope goes to that sink, and a
    failure inside the sink is logged at debug level and swallowed. With no
    sink installed, the envelope is handed to the global Kafka publisher if one
    is running. If neither is configured this is a no-op.

    The envelope is passed on as received; it is not re-stamped here.
    """
    active_sink = get_telemetry_sink()
    if active_sink is None:
        # No sink installed: fall back to the direct Kafka publisher.
        legacy_publisher = get_kafka_publisher()
        if legacy_publisher and legacy_publisher.is_running:
            legacy_publisher.publish_prebuilt(event)
        return

    try:
        active_sink.emit(event)
    except Exception:
        logger.debug("Telemetry sink emit failed", exc_info=True)
def _handle_cloud_telemetry(event: Any) -> None:
    """Relay one runner-built telemetry envelope to the local egress path.

    Anything that is not a dict is ignored. When the envelope carries a dict
    under ``"data"``, two annotations are added to it in place:
    ``relayed_via="trickle"`` and ``relay_received_timestamp`` (current time in
    milliseconds, as a string). The rest of the envelope is left untouched and
    passed to ``publish_raw_event``.

    Any failure while importing or publishing is logged at debug level and
    swallowed.
    """
    if not isinstance(event, dict):
        return

    payload = event.get("data")
    if isinstance(payload, dict):
        payload.update(
            relayed_via="trickle",
            relay_received_timestamp=str(int(time.time() * 1000)),
        )

    try:
        from .kafka_publisher import publish_raw_event

        publish_raw_event(event)
    except Exception:
        logger.debug("Failed to relay cloud telemetry event", exc_info=True)
+ +The class structurally satisfies the ``TelemetrySink`` protocol defined in +``kafka_publisher.py`` so it can be installed as the egress sink alongside +``KafkaSink`` and ``LogSink``. + +Environment Variables: + DAYDREAM_METRICS_URL: Endpoint URL (default: https://api.daydream.monster/v1/metrics) + SCOPE_CLOUD_API_KEY: Bearer token for authentication (required to enable) + SCOPE_METRICS_ENABLED: Explicit enable/disable ("true"/"false"; default: on when key present) + SCOPE_METRICS_FLUSH_INTERVAL_MS: Flush interval in milliseconds (default: 2000) + SCOPE_METRICS_MAX_BATCH: Max events per batch (default: 500, API limit) + SCOPE_METRICS_MAX_BUFFER: Max buffered events before dropping oldest (default: 50000) + SCOPE_METRICS_BUFFER_PATH: Path for disk-backed resume buffer (default: ~/.daydream-scope/metrics_buffer.jsonl) +""" + +import asyncio +import json +import logging +import os +import platform +import threading +from collections import deque +from pathlib import Path +from typing import Any + +logger = logging.getLogger(__name__) + +DAYDREAM_METRICS_URL = os.getenv( + "DAYDREAM_METRICS_URL", "https://api.daydream.monster/v1/metrics" +) +MAX_BATCH_SIZE = int(os.getenv("SCOPE_METRICS_MAX_BATCH", "500")) +MAX_BUFFER_SIZE = int(os.getenv("SCOPE_METRICS_MAX_BUFFER", "50000")) +FLUSH_INTERVAL_S = int(os.getenv("SCOPE_METRICS_FLUSH_INTERVAL_MS", "2000")) / 1000.0 +MAX_BODY_BYTES = 1_000_000 # 1 MiB API limit +INITIAL_BACKOFF_S = 1.0 +MAX_BACKOFF_S = 60.0 +RATE_LIMIT_WINDOW_S = 60.0 + +_HOSTNAME = platform.node() + + +def _default_buffer_path() -> Path | None: + """Return the default disk buffer path, or None if explicitly disabled.""" + env = os.getenv("SCOPE_METRICS_BUFFER_PATH") + if env == "": + return None + if env: + return Path(env) + base = os.getenv("DAYDREAM_SCOPE_MODELS_DIR") + if base: + return Path(base).parent / "metrics_buffer.jsonl" + return Path.home() / ".daydream-scope" / "metrics_buffer.jsonl" + + +def is_metrics_reporter_enabled() -> bool: + 
"""Check if the metrics reporter should be started.""" + explicit = os.getenv("SCOPE_METRICS_ENABLED", "").lower() + if explicit == "false": + return False + if explicit == "true": + return True + return os.getenv("SCOPE_CLOUD_API_KEY") is not None + + +def map_envelope_to_event(envelope: dict[str, Any]) -> dict[str, Any]: + """Map a Scope stream_trace Kafka envelope to a /v1/metrics Event object. + + Scope envelope: {id, type:"stream_trace", timestamp, data:{type, client_source, ...}} + API Event: {type, data, id?, timestamp?} + """ + data = envelope.get("data", {}) + event: dict[str, Any] = { + "type": data.get("type", envelope.get("type", "unknown")), + "data": data, + } + if envelope.get("id"): + event["id"] = envelope["id"] + if envelope.get("timestamp"): + event["timestamp"] = str(envelope["timestamp"]) + return event + + +class MetricsReporter: + """Reliable buffered metrics forwarder to Daydream /v1/metrics. + + Thread-safe: ``emit()`` / ``enqueue()`` can be called from any thread. + The async flush loop runs on the event loop captured at ``start()``. + + Satisfies the ``TelemetrySink`` protocol so it can be plugged directly + into ``set_telemetry_sink()``. 
+ """ + + def __init__(self, api_key: str, url: str = DAYDREAM_METRICS_URL): + self._api_key = api_key + self._url = url + self._buffer: deque[dict[str, Any]] = deque(maxlen=MAX_BUFFER_SIZE) + self._lock = threading.Lock() + self._event_loop: asyncio.AbstractEventLoop | None = None + self._flush_task: asyncio.Task | None = None + self._started = False + self._paused = False # Set on 401, cleared on key refresh + self._backoff_s = INITIAL_BACKOFF_S + self._client = None + self._buffer_path = _default_buffer_path() + + # -- TelemetrySink protocol ------------------------------------------------- + + def emit(self, event: dict[str, Any]) -> None: + """TelemetrySink protocol: thread-safe, non-blocking enqueue.""" + self.enqueue(event) + + # -- public API ------------------------------------------------------------- + + async def start(self) -> bool: + """Start the reporter and its background flush loop.""" + try: + import httpx + + self._client = httpx.AsyncClient(timeout=30.0) + except ImportError: + logger.warning("httpx not installed, metrics reporting disabled") + return False + + self._event_loop = asyncio.get_running_loop() + self._started = True + + self._load_disk_buffer() + + self._flush_task = asyncio.create_task(self._flush_loop()) + logger.info( + "Metrics reporter started (url=%s, flush=%.1fs, max_batch=%d)", + self._url, + FLUSH_INTERVAL_S, + MAX_BATCH_SIZE, + ) + return True + + async def stop(self): + """Flush remaining events and shut down.""" + self._started = False + if self._flush_task: + self._flush_task.cancel() + try: + await self._flush_task + except asyncio.CancelledError: + pass + self._flush_task = None + + # Final flush attempt + await self._flush_once() + + self._persist_disk_buffer() + + if self._client: + await self._client.aclose() + self._client = None + + logger.info( + "Metrics reporter stopped (remaining_buffered=%d)", len(self._buffer) + ) + + def enqueue(self, envelope: dict[str, Any]) -> None: + """Thread-safe: add a stream_trace 
envelope to the buffer.""" + if not self._started or self._paused: + return + event = map_envelope_to_event(envelope) + with self._lock: + self._buffer.append(event) + + @property + def is_running(self) -> bool: + return self._started + + @property + def buffered_count(self) -> int: + return len(self._buffer) + + # -- internal --------------------------------------------------------------- + + def _take_batch(self, max_size: int = MAX_BATCH_SIZE) -> list[dict[str, Any]]: + with self._lock: + count = min(max_size, len(self._buffer)) + batch = [self._buffer.popleft() for _ in range(count)] + return batch + + def _requeue_batch(self, batch: list[dict[str, Any]]) -> None: + with self._lock: + self._buffer.extendleft(reversed(batch)) + + async def _flush_loop(self) -> None: + try: + while self._started: + await asyncio.sleep(FLUSH_INTERVAL_S) + if self._paused or not self._buffer: + continue + await self._flush_once() + except asyncio.CancelledError: + pass + + async def _flush_once(self) -> None: + if not self._buffer or not self._client: + return + + batch = self._take_batch() + if not batch: + return + + body = self._build_body(batch) + body_bytes = json.dumps(body).encode("utf-8") + + # If body exceeds 1 MiB, split the batch + if len(body_bytes) > MAX_BODY_BYTES and len(batch) > 1: + half = len(batch) // 2 + self._requeue_batch(batch[half:]) + batch = batch[:half] + body = self._build_body(batch) + + try: + response = await self._client.post( + self._url, + json=body, + headers={ + "Authorization": f"Bearer {self._api_key}", + "Content-Type": "application/json", + }, + ) + await self._handle_response(response, batch) + except Exception as e: + logger.warning("Metrics POST failed (network): %s", e) + self._requeue_batch(batch) + await self._wait_backoff() + + async def _handle_response( + self, response: Any, batch: list[dict[str, Any]] + ) -> None: + status = response.status_code + + if status == 200: + self._backoff_s = INITIAL_BACKOFF_S + logger.debug("Metrics 
batch accepted (%d events)", len(batch)) + return + + if status == 400: + logger.error( + "Metrics batch rejected (400 malformed), dropping %d events: %s", + len(batch), + response.text[:200], + ) + return + + if status == 401: + logger.error( + "Metrics reporter unauthorized (401), pausing until key refresh" + ) + self._paused = True + self._requeue_batch(batch) + return + + if status == 413: + if len(batch) <= 1: + event_id = batch[0].get("id") if batch else None + logger.error( + "Metrics event too large to send (413) even as a single event; dropping id=%s", + event_id, + ) + return + logger.warning("Metrics batch too large (413), halving batch size") + self._requeue_batch(batch) + half_batch = self._take_batch(max(1, len(batch) // 2)) + if half_batch: + body = self._build_body(half_batch) + try: + retry_resp = await self._client.post( + self._url, + json=body, + headers={ + "Authorization": f"Bearer {self._api_key}", + "Content-Type": "application/json", + }, + ) + await self._handle_response(retry_resp, half_batch) + except Exception: + self._requeue_batch(half_batch) + return + + if status == 429: + logger.warning( + "Metrics rate limited (429), waiting %.0fs", RATE_LIMIT_WINDOW_S + ) + self._requeue_batch(batch) + await asyncio.sleep(RATE_LIMIT_WINDOW_S) + return + + # 5xx or unexpected status + logger.warning("Metrics POST failed (HTTP %d), retrying with backoff", status) + self._requeue_batch(batch) + await self._wait_backoff() + + async def _wait_backoff(self) -> None: + await asyncio.sleep(self._backoff_s) + self._backoff_s = min(self._backoff_s * 2, MAX_BACKOFF_S) + + def _build_body(self, batch: list[dict[str, Any]]) -> dict[str, Any]: + return { + "app": "scope", + "host": _HOSTNAME, + "events": batch, + } + + # -- disk persistence ------------------------------------------------------- + + def _load_disk_buffer(self) -> None: + if not self._buffer_path or not self._buffer_path.exists(): + return + loaded = 0 + try: + with open(self._buffer_path) as 
f: + for line in f: + line = line.strip() + if not line: + continue + try: + event = json.loads(line) + self._buffer.append(event) + loaded += 1 + except json.JSONDecodeError: + continue + self._buffer_path.write_text("") + if loaded: + logger.info("Loaded %d events from disk buffer", loaded) + except Exception as e: + logger.warning("Failed to load disk buffer: %s", e) + + def _persist_disk_buffer(self) -> None: + if not self._buffer_path or not self._buffer: + return + try: + self._buffer_path.parent.mkdir(parents=True, exist_ok=True) + with open(self._buffer_path, "a") as f: + with self._lock: + for event in self._buffer: + f.write(json.dumps(event) + "\n") + logger.info("Persisted %d events to disk buffer", len(self._buffer)) + except Exception as e: + logger.warning("Failed to persist disk buffer: %s", e) + + +# -- Global accessor pattern (mirrors kafka_publisher.py) ----------------------- + +_reporter: MetricsReporter | None = None + + +def get_metrics_reporter() -> MetricsReporter | None: + """Get the global MetricsReporter instance.""" + return _reporter + + +def set_metrics_reporter(reporter: MetricsReporter | None) -> None: + """Set the global MetricsReporter instance.""" + global _reporter + _reporter = reporter diff --git a/tests/test_lora_downloader.py b/tests/test_lora_downloader.py index ca236a62a..b08ce0497 100644 --- a/tests/test_lora_downloader.py +++ b/tests/test_lora_downloader.py @@ -1,4 +1,5 @@ import asyncio +import sys from pathlib import Path from unittest.mock import patch @@ -14,6 +15,143 @@ ) +def test_download_loras_cli_url(capsys): + from scope.server import download_loras as cli + from scope.server.lora_downloader import LoRADownloadResult + + async def fake_download_lora(request, lora_dir, civitai_token=None): + assert request.source == "url" + assert request.url == "https://example.com/lora.safetensors" + assert request.expected_sha256 is None + assert lora_dir == Path("/tmp/loras") + assert civitai_token is None + return 
LoRADownloadResult( + filename="lora.safetensors", + path="/tmp/loras/lora.safetensors", + sha256="abc123", + size_bytes=42, + ) + + with ( + patch.object( + sys, + "argv", + ["download_loras", "--url", "https://example.com/lora.safetensors"], + ), + patch( + "scope.server.download_loras.get_lora_dir", return_value=Path("/tmp/loras") + ), + patch( + "scope.server.download_loras.download_lora", side_effect=fake_download_lora + ), + ): + cli.main() + + captured = capsys.readouterr() + assert "Downloaded LoRA:" in captured.out + assert "Filename: lora.safetensors" in captured.out + assert "Path: /tmp/loras/lora.safetensors" in captured.out + assert "SHA256: abc123" in captured.out + assert "Size: 42 bytes" in captured.out + + +def test_download_loras_cli_expected_sha256(): + from scope.server import download_loras as cli + from scope.server.lora_downloader import LoRADownloadResult + + async def fake_download_lora(request, lora_dir, civitai_token=None): + assert request.expected_sha256 == "expected" + return LoRADownloadResult( + filename="lora.safetensors", + path="/tmp/loras/lora.safetensors", + sha256="expected", + size_bytes=42, + ) + + with ( + patch.object( + sys, + "argv", + [ + "download_loras", + "--url", + "https://example.com/lora.safetensors", + "--expected-sha256", + "expected", + ], + ), + patch( + "scope.server.download_loras.get_lora_dir", return_value=Path("/tmp/loras") + ), + patch( + "scope.server.download_loras.download_lora", side_effect=fake_download_lora + ), + ): + cli.main() + + +def test_download_loras_cli_filename(): + from scope.server import download_loras as cli + from scope.server.lora_downloader import LoRADownloadResult + + async def fake_download_lora(request, lora_dir, civitai_token=None): + assert request.filename == "expected-name.safetensors" + return LoRADownloadResult( + filename="expected-name.safetensors", + path="/tmp/loras/expected-name.safetensors", + sha256="abc123", + size_bytes=42, + ) + + with ( + patch.object( + sys, + 
"argv", + [ + "download_loras", + "--url", + "https://example.com/download/123", + "--filename", + "expected-name.safetensors", + ], + ), + patch( + "scope.server.download_loras.get_lora_dir", return_value=Path("/tmp/loras") + ), + patch( + "scope.server.download_loras.download_lora", side_effect=fake_download_lora + ), + ): + cli.main() + + +def test_download_loras_cli_error(capsys): + from scope.server import download_loras as cli + + async def fake_download_lora(request, lora_dir, civitai_token=None): + raise ValueError("download failed") + + with ( + patch.object( + sys, + "argv", + ["download_loras", "--url", "https://example.com/lora.safetensors"], + ), + patch( + "scope.server.download_loras.get_lora_dir", return_value=Path("/tmp/loras") + ), + patch( + "scope.server.download_loras.download_lora", side_effect=fake_download_lora + ), + pytest.raises(SystemExit) as exc_info, + ): + cli.main() + + assert exc_info.value.code == 1 + captured = capsys.readouterr() + assert "ERROR: download failed" in captured.err + + def test_filename_from_url(): assert ( _filename_from_url("https://example.com/path/model.safetensors") @@ -155,6 +293,24 @@ def fake_http_get(url, dest_path, **kwargs): assert Path(result.filename) == Path("anime/model.safetensors") +def test_download_lora_filename_override(tmp_path: Path): + request = LoRADownloadRequest( + source="url", + url="https://example.com/download/123", + filename="expected-name.safetensors", + ) + + def fake_http_get(url, dest_path, **kwargs): + dest_path.parent.mkdir(parents=True, exist_ok=True) + dest_path.write_bytes(b"url data") + + with patch("scope.server.lora_downloader.http_get", side_effect=fake_http_get): + result = asyncio.run(download_lora(request, tmp_path)) + + assert result.filename == "expected-name.safetensors" + assert (tmp_path / "expected-name.safetensors").exists() + + def test_resolve_civitai_metadata_451_region_blocked(): """HTTP 451 with JSON error body surfaces the API error message.""" 
mock_response = type("Response", (), {})() diff --git a/tests/test_metrics_reporter.py b/tests/test_metrics_reporter.py new file mode 100644 index 000000000..0a656bf62 --- /dev/null +++ b/tests/test_metrics_reporter.py @@ -0,0 +1,488 @@ +"""Tests for the MetricsReporter: reliability contract, batching, disk resume, mapping, +and TelemetrySink protocol conformance.""" + +import time +from collections import deque +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from scope.server.metrics_reporter import ( + INITIAL_BACKOFF_S, + MAX_BATCH_SIZE, + MetricsReporter, + get_metrics_reporter, + map_envelope_to_event, + set_metrics_reporter, +) + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_envelope( + event_type: str = "stream_heartbeat", + event_id: str | None = None, + **extra_data, +) -> dict: + ts = str(int(time.time() * 1000)) + return { + "id": event_id or f"test-{time.monotonic_ns()}", + "type": "stream_trace", + "timestamp": ts, + "data": { + "type": event_type, + "client_source": "scope", + "timestamp": ts, + **extra_data, + }, + } + + +def _make_response(status_code: int, text: str = "") -> MagicMock: + resp = MagicMock() + resp.status_code = status_code + resp.text = text + return resp + + +# --------------------------------------------------------------------------- +# Event mapping +# --------------------------------------------------------------------------- + + +class TestMapEnvelopeToEvent: + def test_basic_mapping(self): + envelope = _make_envelope("stream_started", event_id="evt-1", session_id="s1") + event = map_envelope_to_event(envelope) + + assert event["type"] == "stream_started" + assert event["id"] == "evt-1" + assert "timestamp" in event + assert event["data"]["session_id"] == "s1" + assert event["data"]["client_source"] == "scope" + + def 
test_missing_data_type_falls_back_to_outer(self): + envelope = {"id": "x", "type": "stream_trace", "timestamp": "123", "data": {}} + event = map_envelope_to_event(envelope) + assert event["type"] == "stream_trace" + + def test_missing_data_and_outer_type(self): + envelope = {"id": "x", "data": {}} + event = map_envelope_to_event(envelope) + assert event["type"] == "unknown" + + def test_missing_fields_omitted(self): + envelope = {"data": {"type": "error"}} + event = map_envelope_to_event(envelope) + assert "id" not in event + assert "timestamp" not in event + assert event["type"] == "error" + + +# --------------------------------------------------------------------------- +# Global accessors +# --------------------------------------------------------------------------- + + +class TestGlobalAccessors: + def test_get_set_reporter(self): + original = get_metrics_reporter() + try: + reporter = MetricsReporter(api_key="test") + set_metrics_reporter(reporter) + assert get_metrics_reporter() is reporter + finally: + set_metrics_reporter(original) + + def test_is_metrics_reporter_enabled_with_key(self): + with patch.dict("os.environ", {"SCOPE_CLOUD_API_KEY": "test-key"}, clear=False): + from scope.server import metrics_reporter as mr + + assert mr.is_metrics_reporter_enabled() + + def test_is_metrics_reporter_disabled_explicit(self): + with patch.dict( + "os.environ", + {"SCOPE_METRICS_ENABLED": "false", "SCOPE_CLOUD_API_KEY": "key"}, + clear=False, + ): + from scope.server import metrics_reporter as mr + + assert not mr.is_metrics_reporter_enabled() + + +# --------------------------------------------------------------------------- +# MetricsReporter: batching & buffer +# --------------------------------------------------------------------------- + + +class TestEnqueueAndBatching: + def test_enqueue_adds_mapped_events(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + + for i in range(10): + reporter.enqueue(_make_envelope("stream_heartbeat", 
event_id=f"e-{i}")) + + assert reporter.buffered_count == 10 + + def test_enqueue_noop_when_stopped(self): + reporter = MetricsReporter(api_key="key") + reporter._started = False + reporter.enqueue(_make_envelope()) + assert reporter.buffered_count == 0 + + def test_enqueue_noop_when_paused(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + reporter._paused = True + reporter.enqueue(_make_envelope()) + assert reporter.buffered_count == 0 + + def test_buffer_max_size_drops_oldest(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + reporter._buffer = deque(maxlen=5) + + for i in range(10): + reporter.enqueue(_make_envelope(event_id=f"e-{i}")) + + assert reporter.buffered_count == 5 + batch = reporter._take_batch(5) + ids = [e["id"] for e in batch] + assert ids[0] == "e-5" + + def test_take_batch_respects_max_size(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + + for i in range(1000): + reporter.enqueue(_make_envelope(event_id=f"e-{i}")) + + batch = reporter._take_batch(MAX_BATCH_SIZE) + assert len(batch) == MAX_BATCH_SIZE + assert reporter.buffered_count == 500 + + def test_requeue_batch_preserves_order(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + + for i in range(3): + reporter.enqueue(_make_envelope(event_id=f"e-{i}")) + + batch = reporter._take_batch(3) + reporter._requeue_batch(batch) + + recovered = reporter._take_batch(3) + assert [e["id"] for e in recovered] == [e["id"] for e in batch] + + def test_build_body_shape(self): + reporter = MetricsReporter(api_key="key") + events = [map_envelope_to_event(_make_envelope())] + body = reporter._build_body(events) + + assert body["app"] == "scope" + assert "host" in body + assert body["events"] is events + + +# --------------------------------------------------------------------------- +# Reliability contract (response handling) +# 
--------------------------------------------------------------------------- + + +class TestResponseHandling: + @pytest.mark.anyio + async def test_200_drops_batch_resets_backoff(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + reporter._backoff_s = 16.0 + + batch = [map_envelope_to_event(_make_envelope())] + await reporter._handle_response(_make_response(200), batch) + + assert reporter._backoff_s == INITIAL_BACKOFF_S + assert reporter.buffered_count == 0 + + @pytest.mark.anyio + async def test_400_drops_batch(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + + batch = [map_envelope_to_event(_make_envelope())] + await reporter._handle_response(_make_response(400, "bad request"), batch) + + assert reporter.buffered_count == 0 + + @pytest.mark.anyio + async def test_401_pauses_and_requeues(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + + batch = [map_envelope_to_event(_make_envelope())] + await reporter._handle_response(_make_response(401), batch) + + assert reporter._paused is True + assert reporter.buffered_count == 1 + + @pytest.mark.anyio + async def test_413_single_event_drops(self): + """A single oversized event on 413 is dropped (not retried infinitely).""" + reporter = MetricsReporter(api_key="key") + reporter._started = True + + batch = [map_envelope_to_event(_make_envelope(event_id="oversized"))] + await reporter._handle_response(_make_response(413), batch) + + assert reporter.buffered_count == 0 + + @pytest.mark.anyio + async def test_413_multi_event_halves_and_retries(self): + """A multi-event 413 requeues and retries with half batch.""" + reporter = MetricsReporter(api_key="key") + reporter._started = True + + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=_make_response(200)) + reporter._client = mock_client + + batch = [ + map_envelope_to_event(_make_envelope(event_id=f"e-{i}")) for i in range(4) + ] + await 
reporter._handle_response(_make_response(413), batch) + + # Half batch was retried and accepted (200), rest requeued + assert reporter.buffered_count == 2 + + @pytest.mark.anyio + async def test_429_requeues_batch(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + + batch = [map_envelope_to_event(_make_envelope())] + + with patch( + "scope.server.metrics_reporter.asyncio.sleep", new_callable=AsyncMock + ): + await reporter._handle_response(_make_response(429), batch) + + assert reporter.buffered_count == 1 + + @pytest.mark.anyio + async def test_5xx_requeues_with_backoff(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + reporter._backoff_s = INITIAL_BACKOFF_S + + batch = [map_envelope_to_event(_make_envelope())] + + with patch( + "scope.server.metrics_reporter.asyncio.sleep", new_callable=AsyncMock + ) as mock_sleep: + await reporter._handle_response(_make_response(502), batch) + + assert reporter.buffered_count == 1 + mock_sleep.assert_awaited_once_with(INITIAL_BACKOFF_S) + assert reporter._backoff_s == INITIAL_BACKOFF_S * 2 + + @pytest.mark.anyio + async def test_backoff_capped_at_max(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + reporter._backoff_s = 32.0 + + batch = [map_envelope_to_event(_make_envelope())] + + with patch( + "scope.server.metrics_reporter.asyncio.sleep", new_callable=AsyncMock + ): + await reporter._handle_response(_make_response(500), batch) + + assert reporter._backoff_s == 60.0 + + +# --------------------------------------------------------------------------- +# Flush cycle +# --------------------------------------------------------------------------- + + +class TestFlushOnce: + @pytest.mark.anyio + async def test_flush_once_sends_batch(self): + reporter = MetricsReporter(api_key="test-key") + reporter._started = True + + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=_make_response(200)) + reporter._client = mock_client + + 
for i in range(3): + reporter.enqueue(_make_envelope(event_id=f"e-{i}")) + + await reporter._flush_once() + + assert reporter.buffered_count == 0 + mock_client.post.assert_awaited_once() + call_kwargs = mock_client.post.call_args + assert call_kwargs.kwargs["headers"]["Authorization"] == "Bearer test-key" + + @pytest.mark.anyio + async def test_flush_once_noop_on_empty_buffer(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + reporter._client = AsyncMock() + + await reporter._flush_once() + + reporter._client.post.assert_not_awaited() + + @pytest.mark.anyio + async def test_flush_network_error_requeues(self): + reporter = MetricsReporter(api_key="key") + reporter._started = True + + mock_client = AsyncMock() + mock_client.post = AsyncMock(side_effect=Exception("connection refused")) + reporter._client = mock_client + + reporter.enqueue(_make_envelope(event_id="net-err")) + + with patch( + "scope.server.metrics_reporter.asyncio.sleep", new_callable=AsyncMock + ): + await reporter._flush_once() + + assert reporter.buffered_count == 1 + + +# --------------------------------------------------------------------------- +# Disk persistence +# --------------------------------------------------------------------------- + + +class TestDiskBuffer: + def test_persist_and_load(self, tmp_path: Path): + buf_path = tmp_path / "buffer.jsonl" + + reporter = MetricsReporter(api_key="key") + reporter._started = True + reporter._buffer_path = buf_path + + reporter.enqueue(_make_envelope(event_id="disk-1")) + reporter.enqueue(_make_envelope(event_id="disk-2")) + + reporter._persist_disk_buffer() + assert buf_path.exists() + lines = buf_path.read_text().strip().splitlines() + assert len(lines) == 2 + + reporter._buffer.clear() + assert reporter.buffered_count == 0 + + reporter._load_disk_buffer() + assert reporter.buffered_count == 2 + assert buf_path.read_text() == "" + + def test_load_handles_missing_file(self, tmp_path: Path): + reporter = 
MetricsReporter(api_key="key") + reporter._buffer_path = tmp_path / "nonexistent.jsonl" + reporter._load_disk_buffer() + assert reporter.buffered_count == 0 + + def test_load_handles_corrupt_lines(self, tmp_path: Path): + buf_path = tmp_path / "buffer.jsonl" + buf_path.write_text( + '{"type":"ok","data":{}}\nnot-json\n{"type":"ok2","data":{}}\n' + ) + + reporter = MetricsReporter(api_key="key") + reporter._buffer_path = buf_path + reporter._load_disk_buffer() + assert reporter.buffered_count == 2 + + +# --------------------------------------------------------------------------- +# TelemetrySink protocol conformance +# --------------------------------------------------------------------------- + + +class TestTelemetrySinkConformance: + def test_isinstance_check(self): + """MetricsReporter satisfies the TelemetrySink runtime_checkable protocol.""" + from scope.server.kafka_publisher import TelemetrySink + + reporter = MetricsReporter(api_key="key") + assert isinstance(reporter, TelemetrySink) + + def test_emit_delegates_to_enqueue(self): + """emit() is the TelemetrySink entry point and buffers via enqueue().""" + reporter = MetricsReporter(api_key="key") + reporter._started = True + + envelope = _make_envelope("stream_started", event_id="emit-1") + reporter.emit(envelope) + + assert reporter.buffered_count == 1 + batch = reporter._take_batch(1) + assert batch[0]["id"] == "emit-1" + assert batch[0]["type"] == "stream_started" + + def test_publish_raw_event_routes_through_emit(self): + """When MetricsReporter is the active sink, publish_raw_event() routes to emit().""" + from scope.server.kafka_publisher import ( + get_telemetry_sink, + publish_raw_event, + set_telemetry_sink, + ) + + reporter = MetricsReporter(api_key="key") + reporter._started = True + + prev_sink = get_telemetry_sink() + try: + set_telemetry_sink(reporter) + + envelope = _make_envelope("stream_stopped", event_id="raw-1") + publish_raw_event(envelope) + + assert reporter.buffered_count == 1 + 
finally: + set_telemetry_sink(prev_sink) + + def test_emit_noop_when_stopped(self): + reporter = MetricsReporter(api_key="key") + reporter._started = False + reporter.emit(_make_envelope()) + assert reporter.buffered_count == 0 + + +# --------------------------------------------------------------------------- +# Start / stop lifecycle +# --------------------------------------------------------------------------- + + +class TestLifecycle: + @pytest.mark.anyio + async def test_start_and_stop(self, tmp_path: Path): + reporter = MetricsReporter(api_key="key") + reporter._buffer_path = tmp_path / "empty.jsonl" + started = await reporter.start() + assert started is True + assert reporter.is_running is True + + reporter.enqueue(_make_envelope()) + assert reporter.buffered_count == 1 + + await reporter.stop() + assert reporter.is_running is False + + @pytest.mark.anyio + async def test_start_fails_without_httpx(self): + reporter = MetricsReporter(api_key="key") + with patch.dict("sys.modules", {"httpx": None}): + with patch("builtins.__import__", side_effect=ImportError("no httpx")): + started = await reporter.start() + assert isinstance(started, bool) diff --git a/uv.lock b/uv.lock index b21bcf89f..93f5a7a6d 100644 --- a/uv.lock +++ b/uv.lock @@ -560,7 +560,7 @@ wheels = [ [[package]] name = "daydream-scope" -version = "0.2.4" +version = "0.2.5" source = { editable = "." } dependencies = [ { name = "accelerate" }, @@ -1332,7 +1332,7 @@ wheels = [ [[package]] name = "livepeer-gateway" version = "0.1.0" -source = { git = "https://github.com/livepeer/livepeer-python-gateway#4bf51ae52bd3863174089bcbffbe4ac9b98738c3" } +source = { git = "https://github.com/livepeer/livepeer-python-gateway#03f13ad651582ddca4f94fc868eea3c0a66daa06" } dependencies = [ { name = "aiohttp" }, { name = "av" },