From f3688aac81dc614c15d9015f239f257c041b5c1f Mon Sep 17 00:00:00 2001 From: Patrick Laing Date: Thu, 14 May 2026 10:17:17 -0700 Subject: [PATCH] small tweak to frame processor to fix NDI output in perform mode Signed-off-by: Patrick Laing --- src/scope/server/frame_processor.py | 12 +++++- tests/test_frame_processor_sinks.py | 57 +++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) create mode 100644 tests/test_frame_processor_sinks.py diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py index ebfc0a002..94300b4d3 100644 --- a/src/scope/server/frame_processor.py +++ b/src/scope/server/frame_processor.py @@ -130,6 +130,10 @@ def __init__( self._pending_node_params: list[tuple[str, dict[str, Any]]] = [] # The processor whose output we read in graph mode (legacy get() path) self._sink_processor: PipelineProcessor | None = None + # First graph sink node. Its WebRTC output is the canonical stream used + # for generic perform-mode output_sinks when perform mode runs as a + # generated linear graph. + self._primary_sink_node_id: str | None = None # Source manager (sources, source queues, hardware input) self._source_manager = SourceManager() @@ -543,7 +547,10 @@ def get_packet_from_sink(self, sink_node_id: str) -> VideoPacket | None: return None packet = self.sink_manager.get_packet_from_sink(sink_node_id) if packet is not None: - self._frames_out += 1 + if sink_node_id == self._primary_sink_node_id: + self._on_frame_output(packet) + else: + self._frames_out += 1 return packet def get_from_sink(self, sink_node_id: str) -> torch.Tensor | None: @@ -1007,6 +1014,9 @@ def _setup_graph(self, graph): ) self._sink_processor = graph_run.sink_processor + self._primary_sink_node_id = ( + graph_run.output_node_ids[0] if graph_run.output_node_ids else None + ) self.pipeline_processors = graph_run.processors self.pipeline_ids = graph_run.pipeline_ids diff --git a/tests/test_frame_processor_sinks.py b/tests/test_frame_processor_sinks.py new file mode 100644 index 000000000..c2e5f8b26 --- /dev/null +++ b/tests/test_frame_processor_sinks.py @@ -0,0 +1,57 @@ +import numpy as np +import torch + +from scope.server.frame_processor import FrameProcessor +from scope.server.media_packets import VideoPacket + + +class _FakeSinkManager: + def __init__(self, packets: dict[str, VideoPacket]): + self._packets = dict(packets) + self.fanned_frames: list[np.ndarray] = [] + + @property + def has_generic_sinks(self) -> bool: + return True + + def get_packet_from_sink(self, sink_node_id: str) -> VideoPacket | None: + return self._packets.pop(sink_node_id, None) + + def fan_out_frame(self, frame_np: np.ndarray) -> None: + self.fanned_frames.append(frame_np.copy()) + + +def _make_frame_processor(primary_sink_node_id: str) -> FrameProcessor: + processor = object.__new__(FrameProcessor) + processor.running = True + processor._primary_sink_node_id = primary_sink_node_id + processor._frames_out = 0 + processor._playback_ready_emitted = True + return processor + + +def test_primary_graph_sink_feeds_generic_output_sinks(): + frame = torch.full((2, 3, 3), 127, dtype=torch.uint8) + sink_manager = _FakeSinkManager({"output": VideoPacket(tensor=frame)}) + processor = _make_frame_processor("output") + processor.sink_manager = sink_manager + + packet = processor.get_packet_from_sink("output") + + assert packet is not None + assert processor._frames_out == 1 + assert len(sink_manager.fanned_frames) == 1 + np.testing.assert_array_equal(sink_manager.fanned_frames[0], frame.numpy()) + + +def test_secondary_graph_sink_does_not_duplicate_generic_output_sinks(): + frame = torch.full((2, 3, 3), 255, dtype=torch.uint8) + sink_manager = _FakeSinkManager({"secondary": VideoPacket(tensor=frame)}) + processor = _make_frame_processor("output") + processor.sink_manager = sink_manager + + packet = processor.get_packet_from_sink("secondary") + + assert packet is not None + assert processor._frames_out == 1 + assert sink_manager.fanned_frames == []