Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion app/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "daydream-scope-desktop",
"productName": "Daydream Scope",
"version": "0.2.4",
"version": "0.2.5",
"description": "Daydream Scope",
"main": ".vite/build/main.js",
"scripts": {
Expand Down
4 changes: 2 additions & 2 deletions frontend/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion frontend/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "daydream-scope-frontend",
"private": true,
"version": "0.2.4",
"version": "0.2.5",
"type": "module",
"scripts": {
"dev": "vite",
Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "daydream-scope"
version = "0.2.4"
version = "0.2.5"
description = "A tool for running and customizing real-time, interactive generative AI pipelines and models"
readme = "README.md"
requires-python = ">=3.12"
Expand Down Expand Up @@ -83,7 +83,9 @@ midi = [
daydream-scope = "scope.server.app:main"
build = "scope.server.build:main"
download_models = "scope.server.download_models:main"
download_loras = "scope.server.download_loras:main"
livepeer-runner = "scope.cloud.livepeer_app:main"
livepeer-scope = "scope.cloud.livepeer_scope_app:main"

[project.urls]
Homepage = "https://github.com/daydreamlive/scope"
Expand Down
139 changes: 121 additions & 18 deletions src/scope/cloud/livepeer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
import queue
import shutil
import threading
import time
import uuid
from contextlib import asynccontextmanager
from contextlib import asynccontextmanager, suppress
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
Expand All @@ -41,11 +42,12 @@
from pydantic import BaseModel

import scope.server.app as scope_app_module
from scope.cloud.trickle_events_sink import TrickleEventsSink
from scope.core.outputs import HARDWARE_SINK_MODES
from scope.server.app import app as scope_app
from scope.server.app import lifespan as scope_lifespan
from scope.server.frame_processor import FrameProcessor
from scope.server.kafka_publisher import publish_event
from scope.server.kafka_publisher import publish_event, set_telemetry_sink
from scope.server.logs_config import (
LOG_FORMAT,
ScopeLogContextFilter,
Expand All @@ -57,6 +59,8 @@
scope_client: httpx.AsyncClient | None = None
_connection_active = False
_connection_count = 0
# Monotonic anchor for measuring runner cold-start (boot -> runner_ready).
_runner_start_monotonic = time.monotonic()

STREAM_TASK_SHUTDOWN_GRACE_S = 1.0
STREAM_TASK_CANCEL_TIMEOUT_S = 1.0
Expand All @@ -65,7 +69,7 @@
# embedded assets (base64-encoded images/video) into a single request and
# easily blow past the 1 MiB library default, which would tear down the
# control-channel reader mid-import.
MAX_CONTROL_EVENT_BYTES = 128 * 1024 * 1024
MAX_CONTROL_EVENT_BYTES = 5 * 1024 * 1024
REMOTE_VIDEO_CLOCK_RATE = 90_000
REMOTE_VIDEO_TIME_BASE = fractions.Fraction(1, REMOTE_VIDEO_CLOCK_RATE)
ASSETS_DIR_PATH = os.getenv("DAYDREAM_SCOPE_ASSETS_DIR", "/tmp/.daydream-scope/assets")
Expand Down Expand Up @@ -153,6 +157,35 @@ def _build_connection_info() -> dict[str, Any]:
}


def _publish_media_loop_error(
session: LivepeerSession, error_type: str, exc: Exception
) -> None:
"""Emit a structured error telemetry event for a failed media loop.

These loops previously only logged on failure; surfacing them as telemetry
lets the client (the egress point) see runner-side media failures that have
no reachable Kafka of their own.

NOTE: This only fires in cloud mode when the user has opted in by providing
their SCOPE_CLOUD_API_KEY. In local/standalone mode, no telemetry leaves
the machine.
"""
publish_event(
event_type="error",
session_id=session.session_id,
connection_id=session.manifest_id,
user_id=session.user_id,
error={
"error_type": error_type,
"message": str(exc),
"exception_type": type(exc).__name__,
"recoverable": False,
},
connection_info=session.connection_info,
metadata={"mode": "livepeer"},
)


async def _shutdown_task(
task: asyncio.Task | None,
*,
Expand Down Expand Up @@ -257,6 +290,14 @@ def _resolve_produces_video(
return bool(status_info.get("produces_video", True))


def _public_stream_channel(channel: dict[str, Any]) -> dict[str, Any]:
return {
"url": channel["url"],
"direction": channel["direction"],
"mime_type": channel.get("mime_type", ""),
}


async def _request_stream_channels(
session: LivepeerSession,
*,
Expand Down Expand Up @@ -294,13 +335,17 @@ async def _request_stream_channels(
mime_type = channel.get("mime_type")
if not isinstance(url, str) or not isinstance(ch_direction, str):
continue
normalized.append(
{
"url": url,
"direction": ch_direction,
"mime_type": mime_type if isinstance(mime_type, str) else "",
}
)
internal_url = channel.get("internal_url")
internal_url = internal_url if isinstance(internal_url, str) else ""
normalized_channel = {
"url": url,
"direction": ch_direction,
"mime_type": mime_type if isinstance(mime_type, str) else "",
"io_url": internal_url or url,
}
if internal_url:
normalized_channel["internal_url"] = internal_url
normalized.append(normalized_channel)
return normalized


Expand Down Expand Up @@ -423,6 +468,7 @@ async def _media_input_loop(
raise
except Exception as exc:
logger.error("Media input loop failed: %s", exc)
_publish_media_loop_error(session, "media_input_loop_failed", exc)
finally:
try:
await media_output.close()
Expand Down Expand Up @@ -551,6 +597,7 @@ async def _media_output_loop(
raise
except Exception as exc:
logger.error("Media output loop failed: %s", exc)
_publish_media_loop_error(session, "media_output_loop_failed", exc)
finally:
try:
await publisher.close()
Expand Down Expand Up @@ -670,6 +717,7 @@ async def _media_audio_output_loop(
raise
except Exception as exc:
logger.error("Media audio output loop failed: %s", exc)
_publish_media_loop_error(session, "media_audio_output_loop_failed", exc)
finally:
try:
await publisher.close()
Expand Down Expand Up @@ -987,14 +1035,14 @@ async def _handle_control_message(
inbound_url: str | None = None
for channel in channels:
ch = {
**channel,
**_public_stream_channel(channel),
"role": "input",
"input_track_index": input_idx,
"source_node_id": source_node_id,
}
active_channels.append(ch)
if channel["direction"] == "in":
inbound_url = channel["url"]
inbound_url = channel["io_url"]
if inbound_url is None:
raise RuntimeError("response did not include input track URL")
input_subscribe_urls[input_idx] = inbound_url
Expand All @@ -1009,15 +1057,15 @@ async def _handle_control_message(
)
for channel in channels:
ch = {
**channel,
**_public_stream_channel(channel),
"role": "output",
"output_track_index": output_idx,
"sink_node_id": sink_node_id,
"record_node_id": record_node_id,
}
active_channels.append(ch)
if channel["direction"] == "out":
outbound_url = channel["url"]
outbound_url = channel["io_url"]
if outbound_url is None:
raise RuntimeError("response did not include output track URL")
output_publish_urls[output_idx] = outbound_url
Expand All @@ -1029,13 +1077,13 @@ async def _handle_control_message(
)
for channel in channels:
ch = {
**channel,
**_public_stream_channel(channel),
"role": "output_audio",
"output_media_kind": "audio",
}
active_channels.append(ch)
if channel["direction"] == "out":
audio_publish_url = channel["url"]
audio_publish_url = channel["io_url"]
if audio_publish_url is None:
raise RuntimeError(
"response did not include audio output track URL"
Expand Down Expand Up @@ -1277,13 +1325,36 @@ async def _forward_notifications_to_events() -> None:

notif_task = asyncio.create_task(_forward_notifications_to_events())

# Route runner-side telemetry (publish_event calls from the embedded Scope
# app) onto the events channel instead of a Kafka broker the runner cannot
# reach. The client re-publishes whatever arrives.
runner_loop = asyncio.get_running_loop()
telemetry_sink = TrickleEventsSink(runner_loop)
set_telemetry_sink(telemetry_sink)
telemetry_task = asyncio.create_task(
telemetry_sink.drain_to(events_writer, stop_event)
)

try:
await events_writer.write(
{
"type": "runner_ready",
"runner_job_id": os.getenv("FAL_JOB_ID") or os.getenv("FAL_RUNNER_ID"),
}
)
# Emit cold-start timing as telemetry (distinct from the control-protocol
# runner_ready above, which the client consumes to unblock startup).
publish_event(
event_type="runner_ready",
session_id=session.session_id,
connection_id=session.manifest_id,
user_id=session.user_id,
connection_info=session.connection_info,
metadata={
"mode": "livepeer",
"startup_ms": int((time.monotonic() - _runner_start_monotonic) * 1000),
},
)
async for message in JSONLReader(control_url)(
max_event_bytes=MAX_CONTROL_EVENT_BYTES
):
Expand Down Expand Up @@ -1318,6 +1389,15 @@ async def _forward_notifications_to_events() -> None:
except asyncio.CancelledError:
pass
session.notification_queue = None
# Stop accepting telemetry before draining so late publish_event calls
# during shutdown no-op instead of enqueueing onto a closing queue.
set_telemetry_sink(None)
if not telemetry_sink.request_stop():
telemetry_task.cancel()
try:
await telemetry_task
except asyncio.CancelledError:
pass
await _stop_stream(session)
try:
await events_writer.close()
Expand Down Expand Up @@ -1501,9 +1581,32 @@ async def websocket_endpoint(ws: WebSocket) -> None:
# Complete the handshake with the orchestrator
await ws.send_json({"type": "started"})

# Keep the WebSocket open and route orchestrator responses used by control handlers.
# Keep the WebSocket open and route orchestrator responses used by control
# handlers. Also watch the control-channel task: if the trickle
# control/events channels are removed, the task exits and this websocket
# session should unwind instead of waiting forever for websocket traffic.
while True:
raw_message = await ws.receive_text()
receive_task = asyncio.create_task(ws.receive_text())
done, pending = await asyncio.wait(
{receive_task, control_task},
return_when=asyncio.FIRST_COMPLETED,
)
if control_task in done:
if receive_task in pending:
receive_task.cancel()
try:
await receive_task
except asyncio.CancelledError:
pass
logger.info("Control channel ended; closing websocket session")
with suppress(Exception):
await ws.close(
code=4000,
reason="control channel ended",
)
break

raw_message = receive_task.result()
try:
message = json.loads(raw_message)
except json.JSONDecodeError:
Expand Down
Loading
Loading