diff --git a/videodb/collection.py b/videodb/collection.py index b7ed62e..fdf56e9 100644 --- a/videodb/collection.py +++ b/videodb/collection.py @@ -180,6 +180,8 @@ def connect_rtstream( ws_connection_id: str = None, ingest_mode: str = "pull", protocol: str = None, + wait: bool = True, + timeout: int = 180, ) -> RTStream: """Connect to an rtstream. @@ -201,6 +203,9 @@ def connect_rtstream( :param str ws_connection_id: WebSocket connection ID for receiving events (optional) :param str ingest_mode: ``"pull"`` (default) or ``"push"`` :param str protocol: Push protocol when ``ingest_mode="push"`` (e.g. ``"rtmp"``) + :param bool wait: For ``ingest_mode="push"``, block until the destination is + provisioned and ``push_url`` is available (default: True). Ignored for pull. + :param int timeout: Max seconds to wait for push provisioning (default: 180) :return: :class:`RTStream ` object """ if not name: @@ -246,7 +251,14 @@ def connect_rtstream( path=f"{ApiPath.rtstream}", data=data, ) - return RTStream(self._connection, **rtstream_data) + rtstream = RTStream(self._connection, **rtstream_data) + + # Push destinations are provisioned asynchronously server-side; poll until + # the push URL is available (or the stream fails) before returning. + if ingest_mode == "push" and wait: + rtstream.wait_until_ready(timeout=timeout) + + return rtstream def get_rtstream(self, id: str) -> RTStream: """Get an rtstream by its ID. diff --git a/videodb/rtstream.py b/videodb/rtstream.py index 0fbeea7..0765260 100644 --- a/videodb/rtstream.py +++ b/videodb/rtstream.py @@ -1,3 +1,5 @@ +import time + from typing import Optional, List, Dict, Any from videodb._constants import ( @@ -412,6 +414,41 @@ def __repr__(self) -> str: f"push_url={self.push_url})" ) + def refresh(self) -> "RTStream": + """Re-fetch this rtstream's state from the server (in place). + + :return: self + :rtype: :class:`RTStream ` + """ + data = self._connection.get(path=f"{ApiPath.rtstream}/{self.id}") + self.__init__(self._connection, **data) + return self + + def wait_until_ready(self, timeout: int = 180, poll_interval: int = 3) -> "RTStream": + """Poll until a push destination is provisioned (``push_url`` available). + + Used for ``ingest_mode="push"`` streams, which are provisioned asynchronously. + + :param int timeout: Max seconds to wait (default: 180) + :param int poll_interval: Seconds between polls (default: 3) + :return: self + :rtype: :class:`RTStream ` + :raises Exception: if provisioning fails + :raises TimeoutError: if not ready within ``timeout`` + """ + elapsed = 0 + while elapsed < timeout: + if self.push_url: + return self + if self.status == "failed": + raise Exception(f"RTStream {self.id} provisioning failed") + time.sleep(poll_interval) + elapsed += poll_interval + self.refresh() + raise TimeoutError( + f"Timed out waiting for push destination after {timeout}s (rtstream {self.id})" + ) + def start(self): """Connect to the rtstream.