From 291c5c93c0ba60409f5f26b21494a1d34e4d11b4 Mon Sep 17 00:00:00 2001
From: emranemran
Date: Mon, 20 Apr 2026 12:37:34 -0700
Subject: [PATCH] feat: bring livepeer runner Kafka events to parity with cloud-relay
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

PR #956 started publishing websocket_connected / websocket_disconnected
from the livepeer fal wrapper using the orchestrator-provided
manifest_id. But the rest of the session lifecycle (pipeline_loaded,
session_created, stream_started, stream_heartbeat, stream_stopped,
playback_ready, and the error variants) continued to either not fire or
fire with null user_id / connection_id in livepeer mode because the
runner built FrameProcessor without those fields and never persisted
manifest_id.

- Add manifest_id / session_id / connection_info fields to
  LivepeerSession and populate them right after parsing the job_info
  (src/scope/cloud/livepeer_app.py).
- Thread user_id, session_id, manifest_id (as connection_id), and
  connection_info into FrameProcessor so every event it emits matches
  the wrapper's websocket_connected.
- Explicitly publish session_created after FrameProcessor.start() and
  session_closed after stop(), mirroring the shape of the existing
  webrtc.py emissions — livepeer mode doesn't hit the WebRTC offer
  handler so this has to happen here.
- Swap the pipeline/load body injection to use manifest_id instead of
  the runner's random internal UUID, so pipeline_loaded correlates too;
  pass connection_info along.
- Allow NOMAD_DC / FAL_JOB_ID / FAL_RUNNER_ID / FAL_LOG_LABELS /
  FAL_MACHINE_TYPE through the runner subprocess env_allowlist so
  _build_connection_info() can reconstruct the same dict the wrapper
  uses.

After this, ClickHouse queries filtered by user_id or connection_id
(= manifest_id) see the full session lifecycle for livepeer mode, not
just the two wrapper-layer events.

Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: emranemran --- src/scope/cloud/livepeer_app.py | 67 ++++++++++++++++++++++++++++- src/scope/cloud/livepeer_fal_app.py | 7 +++ 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/src/scope/cloud/livepeer_app.py b/src/scope/cloud/livepeer_app.py index d75f985a5..54d0b91c5 100644 --- a/src/scope/cloud/livepeer_app.py +++ b/src/scope/cloud/livepeer_app.py @@ -44,6 +44,7 @@ from scope.server.app import app as scope_app from scope.server.app import lifespan as scope_lifespan from scope.server.frame_processor import FrameProcessor +from scope.server.kafka_publisher import publish_event from scope.server.media_packets import ensure_video_packet logger = logging.getLogger(__name__) @@ -113,6 +114,29 @@ class LivepeerSession: media_publishes: list[MediaPublish | None] = field(default_factory=list) user_id: str | None = None connection_id: str | None = None + manifest_id: str | None = None + session_id: str | None = None + connection_info: dict[str, Any] | None = None + + +def _build_connection_info() -> dict[str, Any]: + """Mirror the fal-wrapper's connection_info shape from env vars. + + Keeps the runner's events (stream_*, pipeline_*) carrying the same + fal-region / runner-id / log-labels the wrapper's websocket_connected + uses, so downstream consumers can correlate them. 
+ """ + fal_log_labels_raw = os.getenv("FAL_LOG_LABELS", "unknown") + try: + fal_log_labels = json.loads(fal_log_labels_raw) + except (json.JSONDecodeError, TypeError): + fal_log_labels = fal_log_labels_raw + return { + "gpu_type": os.getenv("FAL_MACHINE_TYPE", "GPU-H100"), + "fal_region": os.getenv("NOMAD_DC", "unknown"), + "fal_runner_id": os.getenv("FAL_JOB_ID", os.getenv("FAL_RUNNER_ID", "unknown")), + "fal_log_labels": fal_log_labels, + } async def _shutdown_task( @@ -327,6 +351,16 @@ async def _stop_stream(session: LivepeerSession) -> None: if session.frame_processor is not None: session.frame_processor.stop() session.frame_processor = None + # Pair with the session_created emitted at stream start. + if session.session_id is not None: + publish_event( + event_type="session_closed", + session_id=session.session_id, + connection_id=session.manifest_id, + user_id=session.user_id, + connection_info=session.connection_info, + ) + session.session_id = None if session.active_channels: channel_urls = [ch["url"] for ch in session.active_channels] @@ -704,7 +738,10 @@ async def _handle_api_request( "error": f"Failed restart websocket handshake: {exc}", } - # Pass through validated user_id for pipeline load requests. + # Pass through validated user_id and the orchestrator-provided + # manifest_id for pipeline load requests. Using manifest_id (not the + # runner's internal connection_id) so pipeline_loaded events correlate + # with the fal wrapper's websocket_connected.connection_id. 
if ( method == "POST" and normalized_path == "/api/v1/pipeline/load" @@ -712,7 +749,9 @@ async def _handle_api_request( and session.user_id ): body["user_id"] = session.user_id - body["connection_id"] = session.connection_id + body["connection_id"] = session.manifest_id or session.connection_id + if session.connection_info is not None: + body["connection_info"] = session.connection_info client = scope_client if client is None: @@ -1004,8 +1043,25 @@ async def _handle_control_message( "produces_video": produces_video, "produces_audio": produces_audio, }, + session_id=session.session_id, + user_id=session.user_id, + connection_id=session.manifest_id, + connection_info=session.connection_info, ) session.frame_processor.start() + # Emit session_created so livepeer-mode event streams match the + # cloud-relay shape (webrtc.py:731 fires this in relay mode; we + # mirror it here because livepeer doesn't go through the WebRTC + # offer handler). + publish_event( + event_type="session_created", + session_id=session.session_id, + connection_id=session.manifest_id, + pipeline_ids=pipeline_ids if pipeline_ids else None, + user_id=session.user_id, + metadata={"mode": "livepeer"}, + connection_info=session.connection_info, + ) session.media_stop_event.clear() session.active_channels = active_channels session.input_subscribe_urls = input_subscribe_urls @@ -1303,6 +1359,13 @@ async def websocket_endpoint(ws: WebSocket) -> None: # TODO move this into the top level request params.pop("daydream_user_id", None) session.user_id = user_id + # Persist fields needed downstream so Kafka events emitted by the + # inner pipeline_manager / FrameProcessor correlate with the fal + # wrapper's websocket_connected (which uses manifest_id as + # connection_id). 
+ session.manifest_id = job_info.manifest_id + session.session_id = str(uuid.uuid4()) + session.connection_info = _build_connection_info() if not job_info.control_url: await ws.send_text( diff --git a/src/scope/cloud/livepeer_fal_app.py b/src/scope/cloud/livepeer_fal_app.py index d0a57ee4b..6134b0ec1 100644 --- a/src/scope/cloud/livepeer_fal_app.py +++ b/src/scope/cloud/livepeer_fal_app.py @@ -390,6 +390,13 @@ def setup(self): "KAFKA_TOPIC", "KAFKA_SASL_USERNAME", "KAFKA_SASL_PASSWORD", + # fal runtime metadata — used by the runner to build + # connection_info on Kafka events so they match the wrapper. + "NOMAD_DC", + "FAL_JOB_ID", + "FAL_RUNNER_ID", + "FAL_LOG_LABELS", + "FAL_MACHINE_TYPE", ] runner_env = {k: os.environ[k] for k in env_allowlist if k in os.environ} runner_env.setdefault("UV_CACHE_DIR", "/tmp/uv-cache")