Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion src/scope/cloud/livepeer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from scope.server.app import app as scope_app
from scope.server.app import lifespan as scope_lifespan
from scope.server.frame_processor import FrameProcessor
from scope.server.kafka_publisher import publish_event
from scope.server.kafka_publisher import publish_event, set_event_sink
from scope.server.logs_config import (
LOG_FORMAT,
ScopeLogContextFilter,
Expand Down Expand Up @@ -1277,6 +1277,28 @@ async def _forward_notifications_to_events() -> None:

notif_task = asyncio.create_task(_forward_notifications_to_events())

# Forward stream_trace envelopes over the trickle events channel so the
# Scope client can relay them to the Daydream /v1/metrics endpoint.
runner_loop = asyncio.get_running_loop()

def _trickle_event_sink(envelope: dict) -> None:
"""Sync callback: schedule an async write on the runner loop."""

async def _write() -> None:
try:
await events_writer.write({"type": "network_event", "event": envelope})
except Exception:
logger.debug(
"Failed to forward network_event to events channel", exc_info=True
)

try:
runner_loop.call_soon_threadsafe(lambda: runner_loop.create_task(_write()))
except RuntimeError:
pass

set_event_sink(_trickle_event_sink)

try:
await events_writer.write(
{
Expand Down Expand Up @@ -1318,6 +1340,7 @@ async def _forward_notifications_to_events() -> None:
except asyncio.CancelledError:
pass
session.notification_queue = None
set_event_sink(None)
await _stop_stream(session)
try:
await events_writer.close()
Expand Down
32 changes: 32 additions & 0 deletions src/scope/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
from .kafka_publisher import (
KafkaPublisher,
is_kafka_enabled,
set_event_sink,
set_kafka_publisher,
)
from .logs_config import (
Expand All @@ -87,6 +88,12 @@
)
from .lora_downloader import LoRADownloadRequest, LoRADownloadResult
from .mcp_router import router as mcp_router
from .metrics_reporter import (
MetricsReporter,
is_metrics_reporter_enabled,
report_event,
set_metrics_reporter,
)
from .models_config import (
ensure_models_dir,
get_assets_dir,
Expand Down Expand Up @@ -344,6 +351,8 @@ def configure_static_files():
livepeer = None
# Global Kafka publisher instance (optional, initialized if credentials are present)
kafka_publisher = None
# Global metrics reporter instance (optional, forwards network_events to Daydream /v1/metrics)
metrics_reporter_instance = None
# Global tempo sync manager instance
tempo_sync = None
# Global OSC server instance
Expand Down Expand Up @@ -392,6 +401,7 @@ async def lifespan(app: FastAPI):
webrtc_manager, \
pipeline_manager, \
kafka_publisher, \
metrics_reporter_instance, \
livepeer, \
tempo_sync, \
osc_server, \
Expand Down Expand Up @@ -454,6 +464,21 @@ async def lifespan(app: FastAPI):
kafka_publisher = None
logger.warning("Kafka publisher failed to start")

# Initialize metrics reporter (forwards network_events to Daydream /v1/metrics)
if is_metrics_reporter_enabled():
api_key = os.getenv("SCOPE_CLOUD_API_KEY", "")
if api_key:
metrics_reporter_instance = MetricsReporter(api_key=api_key)
if await metrics_reporter_instance.start():
set_metrics_reporter(metrics_reporter_instance)
set_event_sink(lambda envelope: report_event(envelope))
logger.info("Metrics reporter initialized")
else:
metrics_reporter_instance = None
logger.warning("Metrics reporter failed to start")
else:
logger.debug("Metrics reporter skipped: no SCOPE_CLOUD_API_KEY")

# Start OSC UDP server on the same port as the HTTP API
from .osc_server import OSCServer

Expand Down Expand Up @@ -527,6 +552,13 @@ async def lifespan(app: FastAPI):
await livepeer.disconnect()
logger.info("Livepeer connection shutdown complete")

if metrics_reporter_instance:
logger.info("Shutting down metrics reporter...")
set_event_sink(None)
await metrics_reporter_instance.stop()
set_metrics_reporter(None)
logger.info("Metrics reporter shutdown complete")

if kafka_publisher:
logger.info("Shutting down Kafka publisher...")
await kafka_publisher.stop()
Expand Down
36 changes: 36 additions & 0 deletions src/scope/server/kafka_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging
import os
import threading
from collections.abc import Callable
from typing import Any

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -184,6 +185,9 @@ async def publish_async(
"data": data,
}

# Forward to metrics reporter / trickle sink (parallel to Kafka)
_dispatch_to_sink(event)

try:
# Use event ID as key (matching Go format)
key = event_id
Expand Down Expand Up @@ -277,6 +281,38 @@ def set_kafka_publisher(publisher: KafkaPublisher | None):
_publisher = publisher


# Event sink: receives built envelopes for forwarding to metrics reporter
# or trickle channel. Set via set_event_sink() so the runner can redirect
# envelopes without importing server code.
_event_sink: Callable[[dict[str, Any]], None] | None = None


def get_event_sink() -> Callable[[dict[str, Any]], None] | None:
"""Get the registered event sink callback."""
return _event_sink


def set_event_sink(sink: Callable[[dict[str, Any]], None] | None) -> None:
"""Register a callback that receives every built stream_trace envelope.

In local mode this feeds the MetricsReporter directly. On the runner it
writes ``{type: "network_event", event: <envelope>}`` to the trickle
events channel.
"""
global _event_sink
_event_sink = sink


def _dispatch_to_sink(envelope: dict[str, Any]) -> None:
"""Forward envelope to the registered sink, if any. Never raises."""
sink = _event_sink
if sink is not None:
try:
sink(envelope)
except Exception as exc:
logger.debug("Event sink dispatch failed: %s", exc)


def publish_event(
event_type: str,
session_id: str | None = None,
Expand Down
18 changes: 18 additions & 0 deletions src/scope/server/livepeer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,10 @@ async def _events_loop(self) -> None:
_forward_runner_notification(event.get("payload"))
continue

if msg_type == "network_event":
_forward_network_event(event.get("event"))
continue

logger.debug(f"Event: {event}")
except asyncio.CancelledError:
pass
Expand Down Expand Up @@ -1279,3 +1283,17 @@ def _forward_runner_notification(payload: Any) -> None:
manager.broadcast_notification(payload)
except Exception:
logger.debug("Failed to re-broadcast runner notification", exc_info=True)


def _forward_network_event(envelope: Any) -> None:
"""Forward a runner-originated network_event to the metrics reporter."""
if not isinstance(envelope, dict):
return
from .metrics_reporter import report_event

try:
report_event(envelope)
except Exception:
logger.debug(
"Failed to forward network_event to metrics reporter", exc_info=True
)
Loading
Loading