Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions examples/room_manager/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ def get_room_query():

@app.post("/api/rooms/webhook")
def webhook():
notification = receive_binary(request.data)
message = receive_binary(request.data)

if notification:
room_service.handle_notification(notification)
if message:
notifications = message if isinstance(message, list) else [message]

for notification in notifications:
room_service.handle_notification(notification)

return "Webhook Notification Received", 200
52 changes: 44 additions & 8 deletions fishjam/_webhook_notifier.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,48 @@
"""Module for decoding received webhook notifications from Fishjam."""

from typing import Union
from typing import List, Union

import betterproto

from fishjam.events._protos.fishjam import ServerMessage
from fishjam.events._protos.fishjam import (
ServerMessage,
ServerMessageNotificationBatch,
)
from fishjam.events.allowed_notifications import (
ALLOWED_NOTIFICATIONS,
AllowedNotification,
)


def receive_binary(binary: bytes) -> Union[AllowedNotification, None]:
def _content_of(message: ServerMessage) -> Union[AllowedNotification, None]:
"""Return the message's `content` oneof if it is a supported notification."""
_which, content = betterproto.which_one_of(message, "content")
if isinstance(content, ALLOWED_NOTIFICATIONS):
return content
return None


def _unpack_batch(
batch: ServerMessageNotificationBatch,
) -> List[AllowedNotification]:
"""Flatten a notification batch into its supported notifications, in order.

Members whose content is not a supported notification are skipped.

Returns:
list[AllowedNotification]: The supported notifications from the batch.
"""
notifications = []
for message in batch.notifications:
notification = _content_of(message)
if notification is not None:
notifications.append(notification)
return notifications


def receive_binary(
binary: bytes,
) -> Union[AllowedNotification, List[AllowedNotification], None]:
"""Transforms a received protobuf notification into a notification instance.
Comment on lines +43 to 46

The available notifications are listed in `fishjam.events` module.
Expand All @@ -20,13 +51,18 @@ def receive_binary(binary: bytes) -> Union[AllowedNotification, None]:
binary: The raw binary data received from the webhook.

Returns:
AllowedNotification | None: The parsed notification object, or None if
the message type is not supported.
AllowedNotification: A single notification when the payload carries one.
list[AllowedNotification]: The unpacked notifications, in order, when the
payload is a batch (webhook batching enabled).
None: When the payload is not a supported notification.
"""
message = ServerMessage().parse(binary)
_which, message = betterproto.which_one_of(message, "content")
_which, content = betterproto.which_one_of(message, "content")

if isinstance(content, ServerMessageNotificationBatch):
return _unpack_batch(content)

if isinstance(message, ALLOWED_NOTIFICATIONS):
return message
if isinstance(content, ALLOWED_NOTIFICATIONS):
return content

return None
6 changes: 6 additions & 0 deletions fishjam/api/_fishjam_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ class RoomOptions:
room_type: The use-case of the room. If not provided, this defaults
to conference.
public: True if livestream viewers can omit specifying a token.
batch_webhook_notifications: If true, webhook notifications for this room
are coalesced into a single NotificationBatch per HTTP send instead
of one request per notification.
Comment thread
roznawsk marked this conversation as resolved.
"""

max_peers: int | None = None
Expand All @@ -107,6 +110,8 @@ class RoomOptions:
"""The use-case of the room. If not provided, this defaults to conference."""
public: bool = False
"""True if livestream viewers can omit specifying a token."""
batch_webhook_notifications: bool = False
"""Coalesce webhook notifications into a single NotificationBatch per send."""
Comment thread
roznawsk marked this conversation as resolved.


@dataclass
Expand Down Expand Up @@ -309,6 +314,7 @@ def create_room(self, options: RoomOptions | None = None) -> Room:
webhook_url=options.webhook_url,
room_type=RoomType(options.room_type),
public=options.public,
batch_webhook_notifications=options.batch_webhook_notifications,
)

room = cast(
Expand Down
6 changes: 4 additions & 2 deletions tests/support/webhook_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ def respond_root():
data = request.get_data()
msg = receive_binary(data)
if msg is not None:
for q in QUEUES.values():
q.put(msg)
notifications = msg if isinstance(msg, list) else [msg]
for notification in notifications:
for q in QUEUES.values():
q.put(notification)

return Response(status=200)

Expand Down
3 changes: 2 additions & 1 deletion tests/test_allowed_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
"ServerMessageTrackForwarding",
"ServerMessageTrackForwardingRemoved",
"ServerMessageVadNotification",
# Webhook-only transport wrapper; the WebSocket notifier never receives it.
# Transport wrapper, not a user-facing event: `receive_binary` unpacks it
# into the individual notifications it carries.
"ServerMessageNotificationBatch",
# Deprecated in the proto.
"ServerMessageStreamConnected",
Expand Down
30 changes: 30 additions & 0 deletions tests/test_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,36 @@ async def test_peer_connected_room_deleted(

self.assert_webhook_events(event_checks, event_queue, room.id)

@pytest.mark.asyncio
async def test_batched_webhook_notifications(
self, room_api: FishjamClient, event_queue
):
event_checks = [
ServerMessageRoomCreated,
ServerMessagePeerAdded,
ServerMessagePeerConnected,
ServerMessagePeerDisconnected,
ServerMessagePeerDeleted,
ServerMessageRoomDeleted,
]

options = RoomOptions(webhook_url=WEBHOOK_URL, batch_webhook_notifications=True)
room = room_api.create_room(options=options)
peer, token = room_api.create_peer(room.id)
Comment thread
roznawsk marked this conversation as resolved.

peer_socket = PeerSocket(fishjam_url=FISHJAM_ID)
peer_socket_task = asyncio.ensure_future(peer_socket.connect(token))
try:
await peer_socket.wait_ready()

room_api.delete_peer(room.id, peer.id)
room_api.delete_room(room.id)
finally:
peer_socket_task.cancel()
await asyncio.gather(peer_socket_task, return_exceptions=True)

self.assert_webhook_events(event_checks, event_queue, room.id)

def assert_webhook_events(self, event_checks, event_queue, room_id, timeout=60):
deadline = time.monotonic() + timeout
received = []
Expand Down
89 changes: 89 additions & 0 deletions tests/test_webhook_notifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from fishjam import receive_binary
from fishjam.events import (
ServerMessagePeerConnected,
ServerMessageRoomCreated,
ServerMessageRoomDeleted,
)
from fishjam.events._protos.fishjam import (
ServerMessage,
ServerMessageAuthRequest,
ServerMessageNotificationBatch,
)


def test_single_notification_is_returned_unwrapped():
binary = bytes(ServerMessage(room_created=ServerMessageRoomCreated(room_id="r1")))

result = receive_binary(binary)

assert isinstance(result, ServerMessageRoomCreated)
assert result.room_id == "r1"


def test_unsupported_single_message_returns_none():
binary = bytes(ServerMessage(auth_request=ServerMessageAuthRequest(token="t")))

assert receive_binary(binary) is None


def test_batch_is_unpacked_into_ordered_list():
binary = bytes(
ServerMessage(
notification_batch=ServerMessageNotificationBatch(
notifications=[
ServerMessage(room_created=ServerMessageRoomCreated(room_id="r1")),
ServerMessage(
peer_connected=ServerMessagePeerConnected(
room_id="r1", peer_id="p1"
)
),
ServerMessage(room_deleted=ServerMessageRoomDeleted(room_id="r1")),
]
)
)
)

result = receive_binary(binary)

assert isinstance(result, list)
assert [type(n) for n in result] == [
ServerMessageRoomCreated,
ServerMessagePeerConnected,
ServerMessageRoomDeleted,
]
assert result[0].room_id == "r1"
assert result[1].peer_id == "p1"


def test_batch_filters_out_unsupported_members():
binary = bytes(
ServerMessage(
notification_batch=ServerMessageNotificationBatch(
notifications=[
ServerMessage(room_created=ServerMessageRoomCreated(room_id="r1")),
ServerMessage(auth_request=ServerMessageAuthRequest(token="t")),
ServerMessage(room_deleted=ServerMessageRoomDeleted(room_id="r1")),
]
)
)
)

result = receive_binary(binary)

assert isinstance(result, list)
assert [type(n) for n in result] == [
ServerMessageRoomCreated,
ServerMessageRoomDeleted,
]


def test_empty_batch_returns_empty_list():
binary = bytes(
ServerMessage(
notification_batch=ServerMessageNotificationBatch(notifications=[])
)
)

result = receive_binary(binary)

assert result == []
Loading
Loading