-
Notifications
You must be signed in to change notification settings - Fork 59
feat(cloud): relay telemetry over the trickle events channel #1040
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
37f556b
8a5d97d
5d6b6c4
8e28c24
a847b5c
7612e69
05f7bb1
4c92368
a0a061d
c0ff154
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,104 @@ | ||
| """Telemetry sink that forwards events over the trickle events channel. | ||
|
|
||
| On the cloud runner there is normally no reachable Kafka broker, so telemetry | ||
| emitted via ``scope.server.kafka_publisher.publish_event`` must instead be | ||
| forwarded to the local SDK client. This sink enqueues each (already-built) event | ||
| envelope and a drain task writes it onto the per-session events channel as a | ||
| ``{"type": "telemetry", "event": {...}}`` message. The client receives it and | ||
| re-publishes verbatim to its own egress sink. | ||
|
|
||
| This mirrors the notification-forwarding pattern in ``livepeer_app.py`` | ||
| (``_enqueue_notification`` / ``_forward_notifications_to_events``): a bounded | ||
| queue with drop-oldest semantics so telemetry is best-effort and never blocks | ||
| the media or control paths. It structurally satisfies the | ||
| ``scope.server.kafka_publisher.TelemetrySink`` protocol. | ||
| """ | ||
|
|
||
| import asyncio | ||
| import logging | ||
| from typing import Any | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| # Bounded so a stalled/slow events channel can never grow memory without limit. | ||
| DEFAULT_QUEUE_MAXSIZE = 256 | ||
|
|
||
|
|
||
| class TrickleEventsSink: | ||
| """Forward telemetry events onto the trickle events channel. | ||
|
|
||
| ``emit`` is safe to call from worker threads (FrameProcessor). It marshals | ||
| onto the runner event loop and drops the oldest queued event on overflow. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| loop: asyncio.AbstractEventLoop, | ||
| maxsize: int = DEFAULT_QUEUE_MAXSIZE, | ||
| ): | ||
| self._loop = loop | ||
| self._queue: asyncio.Queue[dict[str, Any] | None] = asyncio.Queue( | ||
| maxsize=maxsize | ||
| ) | ||
| self._drop_warned = False | ||
|
|
||
| def emit(self, event: dict[str, Any]) -> None: | ||
| """Thread-safe, non-blocking enqueue (drop-oldest on overflow).""" | ||
|
|
||
| def _put_with_drop() -> None: | ||
| try: | ||
| self._queue.put_nowait(event) | ||
| except asyncio.QueueFull: | ||
| # Drop the oldest event to make room for the newest. | ||
| try: | ||
| self._queue.get_nowait() | ||
| except asyncio.QueueEmpty: | ||
| pass | ||
| try: | ||
| self._queue.put_nowait(event) | ||
| except asyncio.QueueFull: | ||
| if not self._drop_warned: | ||
| logger.warning("Dropped telemetry event under queue pressure") | ||
| self._drop_warned = True | ||
|
|
||
| try: | ||
| self._loop.call_soon_threadsafe(_put_with_drop) | ||
| except RuntimeError: | ||
| # Loop is closing; drop silently. | ||
| pass | ||
|
|
||
| async def drain_to(self, events_writer: Any, stop_event: asyncio.Event) -> None: | ||
| """Drain queued events onto the events channel until stopped. | ||
|
|
||
| Exits on the ``stop_event``, on a ``None`` sentinel (see | ||
| ``request_stop``), or on cancellation. | ||
| """ | ||
| try: | ||
| while not stop_event.is_set(): | ||
| try: | ||
| event = await asyncio.wait_for(self._queue.get(), timeout=1.0) | ||
| except TimeoutError: | ||
| continue | ||
| if event is None: | ||
| break | ||
| try: | ||
| await events_writer.write({"type": "telemetry", "event": event}) | ||
| except Exception: | ||
| logger.debug( | ||
| "Failed to forward telemetry to events channel", | ||
| exc_info=True, | ||
| ) | ||
| except asyncio.CancelledError: | ||
| pass | ||
|
|
||
| def request_stop(self) -> bool: | ||
| """Enqueue the sentinel to wake the drain loop so it exits. | ||
|
|
||
| Must be called from the runner event loop thread. Returns ``False`` if | ||
| the queue is saturated and the caller should cancel the drain task. | ||
| """ | ||
| try: | ||
| self._queue.put_nowait(None) | ||
| return True | ||
| except asyncio.QueueFull: | ||
| return False |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,6 +73,7 @@ | |
| ) | ||
| from .kafka_publisher import ( | ||
| KafkaPublisher, | ||
| install_default_egress_sink, | ||
| is_kafka_enabled, | ||
| set_kafka_publisher, | ||
| ) | ||
|
|
@@ -87,6 +88,11 @@ | |
| ) | ||
| from .lora_downloader import LoRADownloadRequest, LoRADownloadResult | ||
| from .mcp_router import router as mcp_router | ||
| from .metrics_reporter import ( | ||
| MetricsReporter, | ||
| is_metrics_reporter_enabled, | ||
| set_metrics_reporter, | ||
| ) | ||
| from .models_config import ( | ||
| ensure_models_dir, | ||
| get_assets_dir, | ||
|
|
@@ -344,6 +350,8 @@ def configure_static_files(): | |
| livepeer = None | ||
| # Global Kafka publisher instance (optional, initialized if credentials are present) | ||
| kafka_publisher = None | ||
| # Global metrics reporter instance (optional, forwards telemetry to Daydream /v1/metrics) | ||
| metrics_reporter_instance = None | ||
| # Global tempo sync manager instance | ||
| tempo_sync = None | ||
| # Global OSC server instance | ||
|
|
@@ -392,6 +400,7 @@ async def lifespan(app: FastAPI): | |
| webrtc_manager, \ | ||
| pipeline_manager, \ | ||
| kafka_publisher, \ | ||
| metrics_reporter_instance, \ | ||
| livepeer, \ | ||
| tempo_sync, \ | ||
| osc_server, \ | ||
|
|
@@ -454,6 +463,30 @@ async def lifespan(app: FastAPI): | |
| kafka_publisher = None | ||
| logger.warning("Kafka publisher failed to start") | ||
|
|
||
| # Initialize metrics reporter (forwards telemetry to Daydream /v1/metrics). | ||
| # Only active when the user has opted in to cloud features by setting | ||
| # SCOPE_CLOUD_API_KEY (their Daydream platform API key). In local/standalone | ||
| # mode this block is skipped entirely — no telemetry leaves the machine. | ||
| if is_metrics_reporter_enabled(): | ||
| api_key = os.getenv("SCOPE_CLOUD_API_KEY", "") | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's that
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| if api_key: | ||
| metrics_reporter_instance = MetricsReporter(api_key=api_key) | ||
| if await metrics_reporter_instance.start(): | ||
| set_metrics_reporter(metrics_reporter_instance) | ||
| logger.info("Metrics reporter initialized") | ||
| else: | ||
| metrics_reporter_instance = None | ||
| logger.warning("Metrics reporter failed to start") | ||
| else: | ||
| logger.debug("Metrics reporter skipped: no SCOPE_CLOUD_API_KEY") | ||
|
|
||
| # Install the default telemetry egress sink for local mode so that | ||
| # publish_event() calls route through MetricsReporter (or Kafka) even | ||
| # without a cloud connection. In cloud mode livepeer.py re-calls this | ||
| # after connecting, which is fine (idempotent selection). | ||
| if install_default_egress_sink(): | ||
| logger.info("Telemetry egress sink installed at startup") | ||
|
|
||
| # Start OSC UDP server on the same port as the HTTP API | ||
| from .osc_server import OSCServer | ||
|
|
||
|
|
@@ -527,6 +560,12 @@ async def lifespan(app: FastAPI): | |
| await livepeer.disconnect() | ||
| logger.info("Livepeer connection shutdown complete") | ||
|
|
||
| if metrics_reporter_instance: | ||
| logger.info("Shutting down metrics reporter...") | ||
| await metrics_reporter_instance.stop() | ||
| set_metrics_reporter(None) | ||
| logger.info("Metrics reporter shutdown complete") | ||
|
|
||
| if kafka_publisher: | ||
| logger.info("Shutting down Kafka publisher...") | ||
| await kafka_publisher.stop() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we published these metrics only for users who opted in for sending analytics?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Today egress is gated on
SCOPE_CLOUD_API_KEYbeing set at process startup (operator/cloud config), so ordinary local desktop users emit nothing — it's effectively opt-in-by-config rather than always-on. What it does not yet do is honor a per-user "share analytics" preference if the product expects one. @leszko — could you confirm the intended consent model? If there's meant to be a dedicated analytics-consent setting, I'll gate telemetry behind it in a follow-up before this reachesmain.