Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion app/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "daydream-scope-desktop",
"productName": "Daydream Scope",
"version": "0.2.4",
"version": "0.2.5",
"description": "Daydream Scope",
"main": ".vite/build/main.js",
"scripts": {
Expand Down
4 changes: 2 additions & 2 deletions frontend/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion frontend/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "daydream-scope-frontend",
"private": true,
"version": "0.2.4",
"version": "0.2.5",
"type": "module",
"scripts": {
"dev": "vite",
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "daydream-scope"
version = "0.2.4"
version = "0.2.5"
description = "A tool for running and customizing real-time, interactive generative AI pipelines and models"
readme = "README.md"
requires-python = ">=3.12"
Expand Down
72 changes: 70 additions & 2 deletions src/scope/cloud/livepeer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import queue
import shutil
import threading
import time
import uuid
from contextlib import asynccontextmanager, suppress
from dataclasses import dataclass, field
Expand All @@ -41,11 +42,12 @@
from pydantic import BaseModel

import scope.server.app as scope_app_module
from scope.cloud.trickle_events_sink import TrickleEventsSink
from scope.core.outputs import HARDWARE_SINK_MODES
from scope.server.app import app as scope_app
from scope.server.app import lifespan as scope_lifespan
from scope.server.frame_processor import FrameProcessor
from scope.server.kafka_publisher import publish_event
from scope.server.kafka_publisher import publish_event, set_telemetry_sink
from scope.server.logs_config import (
LOG_FORMAT,
ScopeLogContextFilter,
Expand All @@ -57,6 +59,8 @@
scope_client: httpx.AsyncClient | None = None
_connection_active = False
_connection_count = 0
# Monotonic anchor for measuring runner cold-start (boot -> runner_ready).
_runner_start_monotonic = time.monotonic()

STREAM_TASK_SHUTDOWN_GRACE_S = 1.0
STREAM_TASK_CANCEL_TIMEOUT_S = 1.0
Expand All @@ -65,7 +69,7 @@
# embedded assets (base64-encoded images/video) into a single request and
# easily blow past the 1 MiB library default, which would tear down the
# control-channel reader mid-import.
MAX_CONTROL_EVENT_BYTES = 128 * 1024 * 1024
MAX_CONTROL_EVENT_BYTES = 5 * 1024 * 1024
REMOTE_VIDEO_CLOCK_RATE = 90_000
REMOTE_VIDEO_TIME_BASE = fractions.Fraction(1, REMOTE_VIDEO_CLOCK_RATE)
ASSETS_DIR_PATH = os.getenv("DAYDREAM_SCOPE_ASSETS_DIR", "/tmp/.daydream-scope/assets")
Expand Down Expand Up @@ -153,6 +157,35 @@ def _build_connection_info() -> dict[str, Any]:
}


def _publish_media_loop_error(
session: LivepeerSession, error_type: str, exc: Exception
) -> None:
"""Emit a structured error telemetry event for a failed media loop.

These loops previously only logged on failure; surfacing them as telemetry
lets the client (the egress point) see runner-side media failures that have
no reachable Kafka of their own.

NOTE: This only fires in cloud mode when the user has opted in by providing
their SCOPE_CLOUD_API_KEY. In local/standalone mode, no telemetry leaves
the machine.
"""
publish_event(
event_type="error",
session_id=session.session_id,
connection_id=session.manifest_id,
user_id=session.user_id,
error={
"error_type": error_type,
"message": str(exc),
"exception_type": type(exc).__name__,
"recoverable": False,
},
connection_info=session.connection_info,
metadata={"mode": "livepeer"},
)


async def _shutdown_task(
task: asyncio.Task | None,
*,
Expand Down Expand Up @@ -423,6 +456,7 @@ async def _media_input_loop(
raise
except Exception as exc:
logger.error("Media input loop failed: %s", exc)
_publish_media_loop_error(session, "media_input_loop_failed", exc)
finally:
try:
await media_output.close()
Expand Down Expand Up @@ -551,6 +585,7 @@ async def _media_output_loop(
raise
except Exception as exc:
logger.error("Media output loop failed: %s", exc)
_publish_media_loop_error(session, "media_output_loop_failed", exc)
finally:
try:
await publisher.close()
Expand Down Expand Up @@ -670,6 +705,7 @@ async def _media_audio_output_loop(
raise
except Exception as exc:
logger.error("Media audio output loop failed: %s", exc)
_publish_media_loop_error(session, "media_audio_output_loop_failed", exc)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we published these metrics only for users who opted in for sending analytics?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Today egress is gated on SCOPE_CLOUD_API_KEY being set at process startup (operator/cloud config), so ordinary local desktop users emit nothing — it's effectively opt-in-by-config rather than always-on. What it does not yet do is honor a per-user "share analytics" preference if the product expects one. @leszko — could you confirm the intended consent model? If there's meant to be a dedicated analytics-consent setting, I'll gate telemetry behind it in a follow-up before this reaches main.

finally:
try:
await publisher.close()
Expand Down Expand Up @@ -1277,13 +1313,36 @@ async def _forward_notifications_to_events() -> None:

notif_task = asyncio.create_task(_forward_notifications_to_events())

# Route runner-side telemetry (publish_event calls from the embedded Scope
# app) onto the events channel instead of a Kafka broker the runner cannot
# reach. The client re-publishes whatever arrives.
runner_loop = asyncio.get_running_loop()
telemetry_sink = TrickleEventsSink(runner_loop)
set_telemetry_sink(telemetry_sink)
telemetry_task = asyncio.create_task(
telemetry_sink.drain_to(events_writer, stop_event)
)

try:
await events_writer.write(
{
"type": "runner_ready",
"runner_job_id": os.getenv("FAL_JOB_ID") or os.getenv("FAL_RUNNER_ID"),
}
)
# Emit cold-start timing as telemetry (distinct from the control-protocol
# runner_ready above, which the client consumes to unblock startup).
publish_event(
event_type="runner_ready",
session_id=session.session_id,
connection_id=session.manifest_id,
user_id=session.user_id,
connection_info=session.connection_info,
metadata={
"mode": "livepeer",
"startup_ms": int((time.monotonic() - _runner_start_monotonic) * 1000),
},
)
async for message in JSONLReader(control_url)(
max_event_bytes=MAX_CONTROL_EVENT_BYTES
):
Expand Down Expand Up @@ -1318,6 +1377,15 @@ async def _forward_notifications_to_events() -> None:
except asyncio.CancelledError:
pass
session.notification_queue = None
# Stop accepting telemetry before draining so late publish_event calls
# during shutdown no-op instead of enqueueing onto a closing queue.
set_telemetry_sink(None)
if not telemetry_sink.request_stop():
telemetry_task.cancel()
try:
await telemetry_task
except asyncio.CancelledError:
pass
await _stop_stream(session)
try:
await events_writer.close()
Expand Down
104 changes: 104 additions & 0 deletions src/scope/cloud/trickle_events_sink.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""Telemetry sink that forwards events over the trickle events channel.

On the cloud runner there is normally no reachable Kafka broker, so telemetry
emitted via ``scope.server.kafka_publisher.publish_event`` must instead be
forwarded to the local SDK client. This sink enqueues each (already-built) event
envelope and a drain task writes it onto the per-session events channel as a
``{"type": "telemetry", "event": {...}}`` message. The client receives it and
re-publishes verbatim to its own egress sink.

This mirrors the notification-forwarding pattern in ``livepeer_app.py``
(``_enqueue_notification`` / ``_forward_notifications_to_events``): a bounded
queue with drop-oldest semantics so telemetry is best-effort and never blocks
the media or control paths. It structurally satisfies the
``scope.server.kafka_publisher.TelemetrySink`` protocol.
"""

import asyncio
import logging
from typing import Any

logger = logging.getLogger(__name__)

# Bounded so a stalled/slow events channel can never grow memory without limit.
DEFAULT_QUEUE_MAXSIZE = 256


class TrickleEventsSink:
"""Forward telemetry events onto the trickle events channel.

``emit`` is safe to call from worker threads (FrameProcessor). It marshals
onto the runner event loop and drops the oldest queued event on overflow.
"""

def __init__(
self,
loop: asyncio.AbstractEventLoop,
maxsize: int = DEFAULT_QUEUE_MAXSIZE,
):
self._loop = loop
self._queue: asyncio.Queue[dict[str, Any] | None] = asyncio.Queue(
maxsize=maxsize
)
self._drop_warned = False

def emit(self, event: dict[str, Any]) -> None:
"""Thread-safe, non-blocking enqueue (drop-oldest on overflow)."""

def _put_with_drop() -> None:
try:
self._queue.put_nowait(event)
except asyncio.QueueFull:
# Drop the oldest event to make room for the newest.
try:
self._queue.get_nowait()
except asyncio.QueueEmpty:
pass
try:
self._queue.put_nowait(event)
except asyncio.QueueFull:
if not self._drop_warned:
logger.warning("Dropped telemetry event under queue pressure")
self._drop_warned = True

try:
self._loop.call_soon_threadsafe(_put_with_drop)
except RuntimeError:
# Loop is closing; drop silently.
pass

async def drain_to(self, events_writer: Any, stop_event: asyncio.Event) -> None:
"""Drain queued events onto the events channel until stopped.

Exits on the ``stop_event``, on a ``None`` sentinel (see
``request_stop``), or on cancellation.
"""
try:
while not stop_event.is_set():
try:
event = await asyncio.wait_for(self._queue.get(), timeout=1.0)
except TimeoutError:
continue
if event is None:
break
try:
await events_writer.write({"type": "telemetry", "event": event})
except Exception:
logger.debug(
"Failed to forward telemetry to events channel",
exc_info=True,
)
except asyncio.CancelledError:
pass

def request_stop(self) -> bool:
"""Enqueue the sentinel to wake the drain loop so it exits.

Must be called from the runner event loop thread. Returns ``False`` if
the queue is saturated and the caller should cancel the drain task.
"""
try:
self._queue.put_nowait(None)
return True
except asyncio.QueueFull:
return False
39 changes: 39 additions & 0 deletions src/scope/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
)
from .kafka_publisher import (
KafkaPublisher,
install_default_egress_sink,
is_kafka_enabled,
set_kafka_publisher,
)
Expand All @@ -87,6 +88,11 @@
)
from .lora_downloader import LoRADownloadRequest, LoRADownloadResult
from .mcp_router import router as mcp_router
from .metrics_reporter import (
MetricsReporter,
is_metrics_reporter_enabled,
set_metrics_reporter,
)
from .models_config import (
ensure_models_dir,
get_assets_dir,
Expand Down Expand Up @@ -344,6 +350,8 @@ def configure_static_files():
livepeer = None
# Global Kafka publisher instance (optional, initialized if credentials are present)
kafka_publisher = None
# Global metrics reporter instance (optional, forwards telemetry to Daydream /v1/metrics)
metrics_reporter_instance = None
# Global tempo sync manager instance
tempo_sync = None
# Global OSC server instance
Expand Down Expand Up @@ -392,6 +400,7 @@ async def lifespan(app: FastAPI):
webrtc_manager, \
pipeline_manager, \
kafka_publisher, \
metrics_reporter_instance, \
livepeer, \
tempo_sync, \
osc_server, \
Expand Down Expand Up @@ -454,6 +463,30 @@ async def lifespan(app: FastAPI):
kafka_publisher = None
logger.warning("Kafka publisher failed to start")

# Initialize metrics reporter (forwards telemetry to Daydream /v1/metrics).
# Only active when the user has opted in to cloud features by setting
# SCOPE_CLOUD_API_KEY (their Daydream platform API key). In local/standalone
# mode this block is skipped entirely — no telemetry leaves the machine.
if is_metrics_reporter_enabled():
api_key = os.getenv("SCOPE_CLOUD_API_KEY", "")

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's that SCOPE_CLOUD_API_KEY env variable? Is it set only in the cloud?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SCOPE_CLOUD_API_KEY is the user's Daydream platform API key (the same key used for cloud connect/billing), supplied via --cloud-api-key or the env var. The metrics reporter only starts when it's present at startup (is_metrics_reporter_enabled()), so an ordinary local run without it emits nothing — egress is effectively cloud-only.

if api_key:
metrics_reporter_instance = MetricsReporter(api_key=api_key)
if await metrics_reporter_instance.start():
set_metrics_reporter(metrics_reporter_instance)
logger.info("Metrics reporter initialized")
else:
metrics_reporter_instance = None
logger.warning("Metrics reporter failed to start")
else:
logger.debug("Metrics reporter skipped: no SCOPE_CLOUD_API_KEY")

# Install the default telemetry egress sink for local mode so that
# publish_event() calls route through MetricsReporter (or Kafka) even
# without a cloud connection. In cloud mode livepeer.py re-calls this
# after connecting, which is fine (idempotent selection).
if install_default_egress_sink():
logger.info("Telemetry egress sink installed at startup")

# Start OSC UDP server on the same port as the HTTP API
from .osc_server import OSCServer

Expand Down Expand Up @@ -527,6 +560,12 @@ async def lifespan(app: FastAPI):
await livepeer.disconnect()
logger.info("Livepeer connection shutdown complete")

if metrics_reporter_instance:
logger.info("Shutting down metrics reporter...")
await metrics_reporter_instance.stop()
set_metrics_reporter(None)
logger.info("Metrics reporter shutdown complete")

if kafka_publisher:
logger.info("Shutting down Kafka publisher...")
await kafka_publisher.stop()
Expand Down
Loading
Loading