
Memory leak: FfiQueue broadcasts all events to all subscribers without filtering #563

@Hormold

Summary

When using multiple AudioStream instances (e.g., for multi-participant transcription), memory grows continuously because FfiQueue broadcasts ALL FFI events to ALL subscribers. Objects are allocated for every subscriber on every event before any filtering happens, so memory accumulates even for events a subscriber will discard.

Environment

  • livekit-rtc: latest
  • Python: 3.13
  • Use case: Real-time transcription agent processing multiple participants

Reproduction

  1. Create an agent that subscribes to audio from multiple participants
  2. For each participant, create rtc.AudioStream(track)
  3. Run for 30+ minutes with active audio

Each AudioStream.__init__ calls:

  self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)

Observed Behavior

Memory grows ~12-15 MB/min per active AudioStream. In a 30-min session with 4 participants:

Start: RSS=312 MB, FFI events=0
End: RSS=1291 MB, FFI events=903,154

Tracemalloc shows accumulation at:

  • _ffi_client.py:123 — loop.call_soon_threadsafe(queue.put_nowait, item)
  • _ffi_client.py:151 — proto_ffi.FfiEvent()
  • asyncio/base_events.py:853 — events.Handle()
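
For anyone trying to reproduce the measurement, a minimal sketch of a tracemalloc snapshot comparison follows. It is not the exact script used for the numbers above; `top_growth` is a hypothetical helper, and the `retained` list only stands in for the agent's real workload.

```python
import tracemalloc


def top_growth(before, after, limit=5):
    """Return the source lines whose allocated size changed the most."""
    # compare_to() sorts by absolute size difference, largest first.
    return after.compare_to(before, "lineno")[:limit]


tracemalloc.start(25)  # keep up to 25 frames per allocation
before = tracemalloc.take_snapshot()

# Placeholder for "let the agent run for a while": retain about 1 MB.
retained = [bytes(1024) for _ in range(1000)]

after = tracemalloc.take_snapshot()
tracemalloc.stop()

for stat in top_growth(before, after):
    print(stat)
```

In a real session the two snapshots would be taken several minutes apart while the AudioStream instances are active.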

Root Cause

In _ffi_client.py:

  class FfiQueue(Generic[T]):
      def put(self, item: T) -> None:
          with self._lock:
              for queue, loop in self._subscribers:
                  loop.call_soon_threadsafe(queue.put_nowait, item)  # ALL events to ALL subscribers

Each call_soon_threadsafe() allocates:

  • asyncio.Handle object
  • contextvars.copy_context()
  • Queue item

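With N subscribers, every FFI event triggers N sets of these allocations, one per subscriber, whether or not the subscriber cares about the event. AudioStream filters afterwards with wait_for(predicate), but by then the objects have already been allocated.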
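
The fan-out can be illustrated with a self-contained sketch. `BroadcastQueue` below is a simplified stand-in that mirrors the `put` loop quoted above; it is not the livekit implementation, and the event dictionaries are made up for the demo.

```python
import asyncio
import threading


class BroadcastQueue:
    """Simplified stand-in for FfiQueue: every item goes to every subscriber."""

    def __init__(self):
        self._lock = threading.Lock()
        self._subscribers = []

    def subscribe(self, loop):
        queue = asyncio.Queue()
        with self._lock:
            self._subscribers.append((queue, loop))
        return queue

    def put(self, item):
        with self._lock:
            for queue, loop in self._subscribers:
                # One scheduled callback (and one Handle) per subscriber.
                loop.call_soon_threadsafe(queue.put_nowait, item)


async def count_deliveries(n_subscribers, n_events):
    loop = asyncio.get_running_loop()
    source = BroadcastQueue()
    queues = [source.subscribe(loop) for _ in range(n_subscribers)]
    for seq in range(n_events):
        # An event kind that no audio subscriber would be interested in.
        source.put({"kind": "room_event", "seq": seq})
    await asyncio.sleep(0)  # yield once so the scheduled callbacks run
    return sum(q.qsize() for q in queues)


delivered = asyncio.run(count_deliveries(n_subscribers=4, n_events=100))
print(delivered)  # 4 subscribers x 100 events = 400 queued items
```

Every one of those queued items is unwanted by its subscriber, yet each still costs a scheduled callback and a queue slot.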

Proposed Solution

Add event-type filtering to subscribe():

  def subscribe(
      self,
      loop: Optional[asyncio.AbstractEventLoop] = None,
      event_types: Optional[set[str]] = None,  # e.g., {"audio_stream_event"}; None = receive everything
  ) -> Queue[T]:
      ...

  def put(self, item: T) -> None:
      which = item.WhichOneof("message")  # resolve the event type once per event
      with self._lock:
          for queue, loop, event_types in self._subscribers:
              # Schedule a callback only for subscribers that want this event type
              if event_types is None or which in event_types:
                  loop.call_soon_threadsafe(queue.put_nowait, item)

Then AudioStream subscribes with:

  self._ffi_queue = FfiClient.instance.queue.subscribe(
      self._loop,
      event_types={"audio_stream_event"}
  )
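
A runnable sketch of the proposed filtering, under stated assumptions: `FilteredQueue` and `FakeEvent` are hypothetical stand-ins (the real event is a protobuf `FfiEvent`), and only the `event_types` check is taken from the proposal above.

```python
import asyncio
import threading
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeEvent:
    """Hypothetical stand-in for proto_ffi.FfiEvent."""

    kind: str

    def WhichOneof(self, group: str) -> str:
        # Mimics the protobuf oneof lookup used in the proposal.
        return self.kind


class FilteredQueue:
    """Stand-in for FfiQueue with per-subscriber event-type filtering."""

    def __init__(self):
        self._lock = threading.Lock()
        self._subscribers = []

    def subscribe(self, loop, event_types: Optional[set] = None):
        queue = asyncio.Queue()
        with self._lock:
            self._subscribers.append((queue, loop, event_types))
        return queue

    def put(self, item):
        which = item.WhichOneof("message")
        with self._lock:
            for queue, loop, event_types in self._subscribers:
                # Skip the callback entirely for uninterested subscribers.
                if event_types is None or which in event_types:
                    loop.call_soon_threadsafe(queue.put_nowait, item)


async def demo():
    loop = asyncio.get_running_loop()
    source = FilteredQueue()
    audio_only = source.subscribe(loop, event_types={"audio_stream_event"})
    everything = source.subscribe(loop)  # unfiltered, as today
    for kind in ["room_event", "audio_stream_event", "track_event"]:
        source.put(FakeEvent(kind))
    await asyncio.sleep(0)  # yield once so the scheduled callbacks run
    return audio_only.qsize(), everything.qsize()


audio_count, all_count = asyncio.run(demo())
print(audio_count, all_count)  # 1 3
```

Keeping `event_types=None` as "receive everything" leaves existing subscribers unaffected, so the change could be adopted one stream class at a time.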

Impact

  • Long-running agents hit OOM (with a 6 GB limit, after ~3.5 hours)
  • Event loop lag grows with memory (a 60 s timer takes 80 s to fire by the end of the session)
  • User-visible latency in real-time features
