Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion videodb/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ def connect_rtstream(
ws_connection_id: str = None,
ingest_mode: str = "pull",
protocol: str = None,
wait: bool = True,
timeout: int = 180,
) -> RTStream:
"""Connect to an rtstream.

Expand All @@ -201,6 +203,9 @@ def connect_rtstream(
:param str ws_connection_id: WebSocket connection ID for receiving events (optional)
:param str ingest_mode: ``"pull"`` (default) or ``"push"``
:param str protocol: Push protocol when ``ingest_mode="push"`` (e.g. ``"rtmp"``)
:param bool wait: For ``ingest_mode="push"``, block until the destination is
provisioned and ``push_url`` is available (default: True). Ignored for pull.
:param int timeout: Max seconds to wait for push provisioning (default: 180)
:return: :class:`RTStream <RTStream>` object
"""
if not name:
Expand Down Expand Up @@ -246,7 +251,14 @@ def connect_rtstream(
path=f"{ApiPath.rtstream}",
data=data,
)
return RTStream(self._connection, **rtstream_data)
rtstream = RTStream(self._connection, **rtstream_data)

# Push destinations are provisioned asynchronously server-side; poll until
# the push URL is available (or the stream fails) before returning.
if ingest_mode == "push" and wait:
rtstream.wait_until_ready(timeout=timeout)

return rtstream

def get_rtstream(self, id: str) -> RTStream:
"""Get an rtstream by its ID.
Expand Down
37 changes: 37 additions & 0 deletions videodb/rtstream.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import time

from typing import Optional, List, Dict, Any

from videodb._constants import (
Expand Down Expand Up @@ -412,6 +414,41 @@ def __repr__(self) -> str:
f"push_url={self.push_url})"
)

def refresh(self) -> "RTStream":
"""Re-fetch this rtstream's state from the server (in place).

:return: self
:rtype: :class:`RTStream <RTStream>`
"""
data = self._connection.get(path=f"{ApiPath.rtstream}/{self.id}")
self.__init__(self._connection, **data)
return self

def wait_until_ready(self, timeout: int = 180, poll_interval: int = 3) -> "RTStream":
"""Poll until a push destination is provisioned (``push_url`` available).

Used for ``ingest_mode="push"`` streams, which are provisioned asynchronously.

:param int timeout: Max seconds to wait (default: 180)
:param int poll_interval: Seconds between polls (default: 3)
:return: self
:rtype: :class:`RTStream <RTStream>`
:raises Exception: if provisioning fails
:raises TimeoutError: if not ready within ``timeout``
"""
elapsed = 0
while elapsed < timeout:
if self.push_url:
return self
if self.status == "failed":
raise Exception(f"RTStream {self.id} provisioning failed")
time.sleep(poll_interval)
elapsed += poll_interval
self.refresh()
raise TimeoutError(
f"Timed out waiting for push destination after {timeout}s (rtstream {self.id})"
)

def start(self):
"""Connect to the rtstream.

Expand Down
Loading