From 86c1f7a5b012c27c387c3a5c3f7da7f9e8ffe706 Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Sat, 22 Nov 2025 14:37:25 +0000 Subject: [PATCH 01/74] Added AudioStream library for robot test to be able to simulate stream --- .../advanced_omi_backend/clients/__init__.py | 11 + .../clients/audio_stream_client.py | 556 ++++++++++++++++++ tests/libs/__init__.py | 1 + tests/libs/audio_stream_library.py | 126 ++++ 4 files changed, 694 insertions(+) create mode 100644 backends/advanced/src/advanced_omi_backend/clients/__init__.py create mode 100644 backends/advanced/src/advanced_omi_backend/clients/audio_stream_client.py create mode 100644 tests/libs/__init__.py create mode 100644 tests/libs/audio_stream_library.py diff --git a/backends/advanced/src/advanced_omi_backend/clients/__init__.py b/backends/advanced/src/advanced_omi_backend/clients/__init__.py new file mode 100644 index 00000000..099f3c45 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/clients/__init__.py @@ -0,0 +1,11 @@ +"""Client implementations for Friend-Lite backend. + +This module provides reusable client implementations that can be used for: +- Integration testing +- CLI tools +- External integrations +""" + +from advanced_omi_backend.clients.audio_stream_client import AudioStreamClient + +__all__ = ["AudioStreamClient"] diff --git a/backends/advanced/src/advanced_omi_backend/clients/audio_stream_client.py b/backends/advanced/src/advanced_omi_backend/clients/audio_stream_client.py new file mode 100644 index 00000000..af89fd51 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/clients/audio_stream_client.py @@ -0,0 +1,556 @@ +"""WebSocket client for audio streaming using Wyoming protocol. + +This client mirrors the protocol implementation in websocket_controller.py +and can be used for integration testing and external integrations. + +Protocol flow: +1. Connect to WebSocket with token and device_name +2. Receive "ready" message from server (PCM endpoint only) +3. Send "audio-start" with format and mode +4. Send audio chunks (Wyoming protocol or raw binary) +5. Send "audio-stop" to finalize session + +Example usage (blocking): + ```python + import asyncio + from advanced_omi_backend.clients import AudioStreamClient + + async def main(): + client = AudioStreamClient("http://localhost:8000", "your-jwt-token") + await client.connect() + await client.stream_wav_file("/path/to/audio.wav") + await client.close() + + asyncio.run(main()) + ``` + +Example usage (non-blocking for testing): + ```python + from advanced_omi_backend.clients.audio_stream_client import StreamManager + + manager = StreamManager() + stream_id = manager.start_stream("http://localhost:8000", "token", "device") + manager.send_chunks_from_file(stream_id, "/path/to/audio.wav", num_chunks=10) + # ... do other things while stream is open ... + manager.stop_stream(stream_id) + ``` +""" + +import asyncio +import json +import logging +import threading +import uuid +import wave +from pathlib import Path +from typing import Dict, Optional, Union + +import websockets +from websockets.client import WebSocketClientProtocol + +from advanced_omi_backend.constants import OMI_CHANNELS, OMI_SAMPLE_RATE, OMI_SAMPLE_WIDTH + +logger = logging.getLogger(__name__) + + +class AudioStreamClient: + """WebSocket client for streaming audio using Wyoming protocol. + + This client implements the same protocol as the server expects in + websocket_controller.py, ensuring consistency between client and server. + """ + + def __init__( + self, + base_url: str, + token: str, + device_name: str = "python-client", + endpoint: str = "ws_pcm", + ): + """Initialize the audio stream client. + + Args: + base_url: Base URL of the backend (e.g., "http://localhost:8000") + token: JWT authentication token + device_name: Device name for client identification + endpoint: WebSocket endpoint ("ws_pcm" or "ws_omi") + """ + self.base_url = base_url + self.token = token + self.device_name = device_name + self.endpoint = endpoint + self.ws: Optional[WebSocketClientProtocol] = None + self.chunk_count = 0 + self.total_bytes = 0 + + @property + def ws_url(self) -> str: + """Build WebSocket URL from base URL.""" + url = self.base_url.replace("http://", "ws://").replace("https://", "wss://") + return f"{url}/{self.endpoint}?token={self.token}&device_name={self.device_name}" + + async def connect(self, wait_for_ready: bool = True) -> WebSocketClientProtocol: + """Connect to the WebSocket endpoint. + + Args: + wait_for_ready: If True, wait for "ready" message from server (PCM endpoint) + + Returns: + The WebSocket connection + + Raises: + RuntimeError: If connection fails or ready message not received + """ + logger.info(f"Connecting to {self.ws_url}") + self.ws = await websockets.connect(self.ws_url) + logger.info("WebSocket connected") + + if wait_for_ready and self.endpoint == "ws_pcm": + # PCM endpoint sends "ready" message after auth (line 261-268 in websocket_controller.py) + ready_msg = await self.ws.recv() + ready = json.loads(ready_msg.strip() if isinstance(ready_msg, str) else ready_msg.decode().strip()) + if ready.get("type") != "ready": + raise RuntimeError(f"Expected 'ready' message, got: {ready}") + logger.info("Received ready message from server") + + return self.ws + + async def send_audio_start( + self, + recording_mode: str = "streaming", + sample_rate: int = OMI_SAMPLE_RATE, + sample_width: int = OMI_SAMPLE_WIDTH, + channels: int = OMI_CHANNELS, + ) -> None: + """Send Wyoming audio-start event. + + Args: + recording_mode: "streaming" or "batch" + sample_rate: Audio sample rate in Hz (default: 16000) + sample_width: Bytes per sample (default: 2 for 16-bit) + channels: Number of audio channels (default: 1) + + Note: + The mode is inside the "data" dict, matching _handle_audio_session_start + in websocket_controller.py (line 618). + """ + if not self.ws: + raise RuntimeError("Not connected. Call connect() first.") + + header = { + "type": "audio-start", + "data": { + "rate": sample_rate, + "width": sample_width, + "channels": channels, + "mode": recording_mode, + }, + "payload_length": None, + } + await self.ws.send(json.dumps(header) + "\n") + logger.info(f"Sent audio-start with mode={recording_mode}") + + async def send_audio_chunk_wyoming( + self, + audio_data: bytes, + sample_rate: int = OMI_SAMPLE_RATE, + sample_width: int = OMI_SAMPLE_WIDTH, + channels: int = OMI_CHANNELS, + ) -> None: + """Send audio chunk using Wyoming protocol (JSON header + binary payload). + + This matches the handler at lines 979-1007 in websocket_controller.py. + + Args: + audio_data: Raw PCM audio bytes + sample_rate: Audio sample rate in Hz + sample_width: Bytes per sample + channels: Number of audio channels + """ + if not self.ws: + raise RuntimeError("Not connected. Call connect() first.") + + header = { + "type": "audio-chunk", + "payload_length": len(audio_data), + "data": { + "rate": sample_rate, + "width": sample_width, + "channels": channels, + }, + } + # Send JSON header followed by binary payload + await self.ws.send(json.dumps(header) + "\n") + await self.ws.send(audio_data) + + self.chunk_count += 1 + self.total_bytes += len(audio_data) + + if self.chunk_count <= 3 or self.chunk_count % 100 == 0: + logger.debug(f"Sent audio chunk #{self.chunk_count}: {len(audio_data)} bytes") + + async def send_audio_chunk_raw(self, audio_data: bytes) -> None: + """Send raw binary audio without Wyoming header (legacy mode). + + This matches the handler at lines 1016-1035 in websocket_controller.py. + + Args: + audio_data: Raw PCM audio bytes + """ + if not self.ws: + raise RuntimeError("Not connected. Call connect() first.") + + await self.ws.send(audio_data) + + self.chunk_count += 1 + self.total_bytes += len(audio_data) + + async def send_audio_stop(self) -> None: + """Send Wyoming audio-stop event to finalize the session.""" + if not self.ws: + raise RuntimeError("Not connected. Call connect() first.") + + header = {"type": "audio-stop"} + await self.ws.send(json.dumps(header) + "\n") + logger.info(f"Sent audio-stop (total: {self.chunk_count} chunks, {self.total_bytes} bytes)") + + async def send_ping(self) -> None: + """Send keepalive ping.""" + if not self.ws: + raise RuntimeError("Not connected. Call connect() first.") + + header = {"type": "ping", "payload_length": None} + await self.ws.send(json.dumps(header) + "\n") + logger.debug("Sent ping") + + async def stream_wav_file( + self, + wav_path: Union[str, Path], + chunk_duration_ms: int = 100, + use_wyoming: bool = True, + recording_mode: str = "streaming", + realtime_factor: float = 0.1, + ) -> int: + """Stream a WAV file in chunks, simulating real-time audio. + + Args: + wav_path: Path to the WAV file + chunk_duration_ms: Duration of each chunk in milliseconds + use_wyoming: If True, use Wyoming protocol; if False, send raw binary + recording_mode: "streaming" or "batch" + realtime_factor: Fraction of real-time to simulate (0.1 = 10x speed) + + Returns: + Number of chunks sent + """ + wav_path = Path(wav_path) + if not wav_path.exists(): + raise FileNotFoundError(f"WAV file not found: {wav_path}") + + with wave.open(str(wav_path), "rb") as wav: + sample_rate = wav.getframerate() + channels = wav.getnchannels() + sample_width = wav.getsampwidth() + + logger.info( + f"Streaming {wav_path.name}: {sample_rate}Hz, {channels}ch, {sample_width * 8}-bit" + ) + + # Calculate chunk size + bytes_per_sample = sample_width * channels + samples_per_chunk = int(sample_rate * chunk_duration_ms / 1000) + + # Send audio-start + await self.send_audio_start( + recording_mode=recording_mode, + sample_rate=sample_rate, + sample_width=sample_width, + channels=channels, + ) + + # Reset counters + self.chunk_count = 0 + self.total_bytes = 0 + + # Stream chunks + while True: + chunk = wav.readframes(samples_per_chunk) + if not chunk: + break + + if use_wyoming: + await self.send_audio_chunk_wyoming( + chunk, + sample_rate=sample_rate, + sample_width=sample_width, + channels=channels, + ) + else: + await self.send_audio_chunk_raw(chunk) + + # Simulate real-time delay + if realtime_factor > 0: + await asyncio.sleep(chunk_duration_ms / 1000 * realtime_factor) + + # Send audio-stop + await self.send_audio_stop() + + logger.info(f"Finished streaming: {self.chunk_count} chunks, {self.total_bytes} bytes") + return self.chunk_count + + async def close(self) -> None: + """Close the WebSocket connection.""" + if self.ws: + await self.ws.close() + self.ws = None + logger.info("WebSocket connection closed") + + async def __aenter__(self) -> "AudioStreamClient": + """Async context manager entry.""" + await self.connect() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + """Async context manager exit.""" + await self.close() + + +# Synchronous wrapper for Robot Framework and other sync contexts +def stream_audio_file( + base_url: str, + token: str, + wav_path: str, + device_name: str = "robot-test", + recording_mode: str = "streaming", + use_wyoming: bool = True, +) -> int: + """Synchronous wrapper for streaming audio file. + + This function is designed for use with Robot Framework or other + synchronous test frameworks. + + Args: + base_url: Base URL of the backend + token: JWT authentication token + wav_path: Path to WAV file + device_name: Device name for client identification + recording_mode: "streaming" or "batch" + use_wyoming: If True, use Wyoming protocol + + Returns: + Number of chunks sent + """ + + async def _run() -> int: + async with AudioStreamClient(base_url, token, device_name) as client: + return await client.stream_wav_file( + wav_path, + use_wyoming=use_wyoming, + recording_mode=recording_mode, + ) + + return asyncio.run(_run()) + + +class StreamSession: + """Holds state for an active streaming session.""" + + def __init__( + self, + stream_id: str, + client: AudioStreamClient, + loop: asyncio.AbstractEventLoop, + thread: threading.Thread, + ): + self.stream_id = stream_id + self.client = client + self.loop = loop + self.thread = thread + self.connected = False + self.audio_started = False + self.chunk_count = 0 + self.error: Optional[str] = None + + +class StreamManager: + """Manages multiple non-blocking audio streams for testing. + + This allows tests to start a stream, perform checks while streaming, + and then stop the stream - mimicking real client behavior. + + Example: + manager = StreamManager() + stream_id = manager.start_stream(base_url, token, "test-device") + manager.send_chunks_from_file(stream_id, "audio.wav", num_chunks=10) + # ... check jobs, verify state ... + manager.stop_stream(stream_id) + """ + + def __init__(self): + self._sessions: Dict[str, StreamSession] = {} + + def start_stream( + self, + base_url: str, + token: str, + device_name: str = "robot-test", + recording_mode: str = "streaming", + ) -> str: + """Start a new audio stream (non-blocking). + + Args: + base_url: Backend URL + token: JWT token + device_name: Device name for client ID + recording_mode: "streaming" or "batch" + + Returns: + stream_id: Unique ID for this stream session + """ + stream_id = str(uuid.uuid4())[:8] + + # Create event loop for this stream's thread + loop = asyncio.new_event_loop() + + def run_loop(): + asyncio.set_event_loop(loop) + loop.run_forever() + + thread = threading.Thread(target=run_loop, daemon=True) + thread.start() + + # Create client + client = AudioStreamClient(base_url, token, device_name) + + session = StreamSession(stream_id, client, loop, thread) + self._sessions[stream_id] = session + + # Connect and send audio-start + async def _connect_and_start(): + try: + await client.connect() + session.connected = True + await client.send_audio_start(recording_mode=recording_mode) + session.audio_started = True + logger.info(f"Stream {stream_id} started for {device_name}") + except Exception as e: + session.error = str(e) + logger.error(f"Stream {stream_id} failed to start: {e}") + + future = asyncio.run_coroutine_threadsafe(_connect_and_start(), loop) + future.result(timeout=10) # Wait for connection + + if session.error: + raise RuntimeError(f"Failed to start stream: {session.error}") + + return stream_id + + def send_chunks_from_file( + self, + stream_id: str, + wav_path: str, + num_chunks: Optional[int] = None, + chunk_duration_ms: int = 100, + realtime_pacing: bool = False, + ) -> int: + """Send audio chunks from a WAV file. + + Args: + stream_id: Stream session ID + wav_path: Path to WAV file + num_chunks: Number of chunks to send (None = all) + chunk_duration_ms: Duration per chunk in ms + realtime_pacing: If True, sleep between chunks to simulate real-time streaming + + Returns: + Number of chunks sent + """ + session = self._sessions.get(stream_id) + if not session: + raise ValueError(f"Unknown stream_id: {stream_id}") + + if not session.audio_started: + raise RuntimeError("Stream not started") + + wav_path = Path(wav_path) + if not wav_path.exists(): + raise FileNotFoundError(f"WAV file not found: {wav_path}") + + async def _send_chunks() -> int: + with wave.open(str(wav_path), "rb") as wav: + sample_rate = wav.getframerate() + channels = wav.getnchannels() + sample_width = wav.getsampwidth() + + samples_per_chunk = int(sample_rate * chunk_duration_ms / 1000) + chunks_sent = 0 + chunk_duration_seconds = chunk_duration_ms / 1000.0 + + while True: + if num_chunks is not None and chunks_sent >= num_chunks: + break + + chunk = wav.readframes(samples_per_chunk) + if not chunk: + break + + await session.client.send_audio_chunk_wyoming( + chunk, + sample_rate=sample_rate, + sample_width=sample_width, + channels=channels, + ) + chunks_sent += 1 + session.chunk_count += 1 + + # Optional: Sleep to maintain real-time pacing + if realtime_pacing: + await asyncio.sleep(chunk_duration_seconds) + + return chunks_sent + + future = asyncio.run_coroutine_threadsafe(_send_chunks(), session.loop) + return future.result(timeout=60) + + def stop_stream(self, stream_id: str) -> int: + """Stop a stream and close the connection. + + Args: + stream_id: Stream session ID + + Returns: + Total chunks sent during this session + """ + session = self._sessions.get(stream_id) + if not session: + raise ValueError(f"Unknown stream_id: {stream_id}") + + async def _stop(): + if session.audio_started: + await session.client.send_audio_stop() + await session.client.close() + + future = asyncio.run_coroutine_threadsafe(_stop(), session.loop) + future.result(timeout=10) + + # Stop the event loop + session.loop.call_soon_threadsafe(session.loop.stop) + session.thread.join(timeout=5) + + total_chunks = session.chunk_count + del self._sessions[stream_id] + + logger.info(f"Stream {stream_id} stopped, sent {total_chunks} chunks") + return total_chunks + + def get_session(self, stream_id: str) -> Optional[StreamSession]: + """Get session info for a stream.""" + return self._sessions.get(stream_id) + + def cleanup_all(self): + """Stop all active streams.""" + for stream_id in list(self._sessions.keys()): + try: + self.stop_stream(stream_id) + except Exception as e: + logger.warning(f"Error stopping stream {stream_id}: {e}") diff --git a/tests/libs/__init__.py b/tests/libs/__init__.py new file mode 100644 index 00000000..6a9fd0f3 --- /dev/null +++ b/tests/libs/__init__.py @@ -0,0 +1 @@ +"""Robot Framework test libraries.""" diff --git a/tests/libs/audio_stream_library.py b/tests/libs/audio_stream_library.py new file mode 100644 index 00000000..7c2ddcee --- /dev/null +++ b/tests/libs/audio_stream_library.py @@ -0,0 +1,126 @@ +"""Robot Framework library for audio streaming via WebSocket. + +This library wraps the AudioStreamClient from advanced_omi_backend.clients +for use in Robot Framework tests. + +Usage in Robot Framework: + Library ../libs/audio_stream_library.py + + # Blocking mode (streams entire file) + Stream Audio File base_url=http://localhost:8000 token=${TOKEN} + ... wav_path=/path/to/audio.wav device_name=robot-test + + # Non-blocking mode (for testing during stream) + ${stream_id}= Start Audio Stream http://localhost:8000 ${TOKEN} device-name + Send Audio Chunks ${stream_id} /path/to/audio.wav num_chunks=10 + # ... perform checks while stream is open ... + Stop Audio Stream ${stream_id} +""" + +import sys +from pathlib import Path +from typing import Optional + +# Add the backend src to path so we can import the client +backend_src = Path(__file__).parent.parent.parent / "backends" / "advanced" / "src" +if str(backend_src) not in sys.path: + sys.path.insert(0, str(backend_src)) + +from advanced_omi_backend.clients import AudioStreamClient +from advanced_omi_backend.clients.audio_stream_client import StreamManager, stream_audio_file as _stream_audio_file + +# Module-level manager for non-blocking streams +_manager = StreamManager() + + +# ============================================================================= +# Blocking Mode (simple, streams entire file) +# ============================================================================= + +def stream_audio_file( + base_url: str, + token: str, + wav_path: str, + device_name: str = "robot-test", + recording_mode: str = "streaming", + use_wyoming: bool = True, +) -> int: + """Stream a WAV file via WebSocket (blocking).""" + return _stream_audio_file( + base_url=base_url, + token=token, + wav_path=wav_path, + device_name=device_name, + recording_mode=recording_mode, + use_wyoming=use_wyoming, + ) + + +# ============================================================================= +# Non-blocking Mode (for testing during stream) +# ============================================================================= + +def start_audio_stream( + base_url: str, + token: str, + device_name: str = "robot-test", + recording_mode: str = "streaming", +) -> str: + """Start a new audio stream (non-blocking).""" + return _manager.start_stream( + base_url=base_url, + token=token, + device_name=device_name, + recording_mode=recording_mode, + ) + + +def send_audio_chunks( + stream_id: str, + wav_path: str, + num_chunks: Optional[int] = None, + chunk_duration_ms: int = 100, + realtime_pacing: bool = False, +) -> int: + """Send audio chunks from a WAV file to an open stream. + + Args: + stream_id: Stream session ID + wav_path: Path to WAV file + num_chunks: Number of chunks to send (None = all) + chunk_duration_ms: Duration per chunk in ms + realtime_pacing: If True, sleep between chunks to simulate real-time streaming + + Returns: + Number of chunks sent + """ + return _manager.send_chunks_from_file( + stream_id=stream_id, + wav_path=wav_path, + num_chunks=num_chunks, + chunk_duration_ms=chunk_duration_ms, + realtime_pacing=realtime_pacing, + ) + + +def stop_audio_stream(stream_id: str) -> int: + """Stop an audio stream and close the connection.""" + return _manager.stop_stream(stream_id) + + +def cleanup_all_streams(): + """Stop all active streams.""" + _manager.cleanup_all() + + +# ============================================================================= +# Advanced Usage +# ============================================================================= + +def get_audio_stream_client( + base_url: str, + token: str, + device_name: str = "robot-test", +) -> AudioStreamClient: + """Get an AudioStreamClient instance for advanced usage.""" + return AudioStreamClient(base_url, token, device_name) From b11729118ab994311e14c403a7fc6076bb0809da Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Sat, 22 Nov 2025 14:38:02 +0000 Subject: [PATCH 02/74] removed * which breaks windows --- ...e_tests.robot => client_queue_tests.robot} | 80 +++++++------------ 1 file changed, 30 insertions(+), 50 deletions(-) rename tests/endpoints/{*client_queue_tests.robot => client_queue_tests.robot} (69%) diff --git a/tests/endpoints/*client_queue_tests.robot b/tests/endpoints/client_queue_tests.robot similarity index 69% rename from tests/endpoints/*client_queue_tests.robot rename to tests/endpoints/client_queue_tests.robot index ace3588a..3ea5bac4 100644 --- a/tests/endpoints/*client_queue_tests.robot +++ b/tests/endpoints/client_queue_tests.robot @@ -2,8 +2,8 @@ Documentation Client and Queue Management API Tests Library RequestsLibrary Library Collections -Resource ../resources/setup_resources.robot -Resource ../resources/session_resources.robot +Resource ../setup/setup_keywords.robot +Resource ../setup/teardown_keywords.robot Resource ../resources/user_resources.robot Suite Setup Suite Setup Suite Teardown Delete All Sessions @@ -12,7 +12,7 @@ Suite Teardown Delete All Sessions Get Active Clients Test [Documentation] Test getting active client information - [Tags] client active positive + [Tags] client active positive speed-fast Create API Session admin_session ${response}= GET On Session admin_session /api/clients/active @@ -30,7 +30,7 @@ Get Active Clients Test Get Queue Jobs Test [Documentation] Test getting queue jobs with pagination - [Tags] queue jobs positive + [Tags] queue jobs positive speed-fast Create API Session admin_session &{params}= Create Dictionary limit=20 offset=0 @@ -52,7 +52,7 @@ Get Queue Jobs Test Get Queue Jobs With Different Limits Test [Documentation] Test queue jobs pagination with different limits - [Tags] queue jobs pagination positive + [Tags] queue jobs pagination positive speed-fast Get Anonymous Session anon_session Create API Session admin_session @@ -77,53 +77,45 @@ Get Queue Jobs With Different Limits Test Get Queue Statistics Test [Documentation] Test getting queue statistics - [Tags] queue statistics positive - Get Anonymous Session anon_session - - Create API Session admin_session + [Tags] queue statistics positive speed-fast - ${response}= GET On Session admin_session /api/queue/stats + ${response}= GET On Session api /api/queue/stats Should Be Equal As Integers ${response.status_code} 200 ${stats}= Set Variable ${response.json()} - Dictionary Should Contain Key ${stats} queued - Dictionary Should Contain Key ${stats} processing - Dictionary Should Contain Key ${stats} completed - Dictionary Should Contain Key ${stats} failed + Dictionary Should Contain Key ${stats} queued_jobs + Dictionary Should Contain Key ${stats} processing_jobs + Dictionary Should Contain Key ${stats} completed_jobs + Dictionary Should Contain Key ${stats} failed_jobs + Dictionary Should Contain Key ${stats} total_jobs + Dictionary Should Contain Key ${stats} cancelled_jobs + Dictionary Should Contain Key ${stats} deferred_jobs - # All counts should be non-negative - Should Be True ${stats}[queued] >= 0 - Should Be True ${stats}[processing] >= 0 - Should Be True ${stats}[completed] >= 0 - Should Be True ${stats}[failed] >= 0 Get Queue Health Test [Documentation] Test getting queue health status - [Tags] queue health positive - Get Anonymous Session anon_session + [Tags] queue health positive speed-fast - Create API Session admin_session - ${response}= GET On Session admin_session /api/queue/health + ${response}= GET On Session api /api/queue/worker-details Should Be Equal As Integers ${response.status_code} 200 ${health}= Set Variable ${response.json()} - Dictionary Should Contain Key ${health} status - Dictionary Should Contain Key ${health} worker_running - Dictionary Should Contain Key ${health} message + Dictionary Should Contain Key ${health} workers + Dictionary Should Contain Key ${health} redis_connection # Status should be one of expected values - Should Be True '${health}[status]' in ['healthy', 'stopped', 'unhealthy'] + Should Be True '${health}[redis_connection]' in ['healthy', 'stopped', 'unhealthy'] Queue Jobs User Isolation Test [Documentation] Test that regular users only see their own queue jobs - [Tags] queue security isolation + [Tags] queue security isolation speed-fast Get Anonymous Session anon_session Create API Session admin_session # Create a test user - ${test_user}= Create Test User admin_session test-user-${RANDOM_ID}@example.com test-password-123 - Create API Session user_session email=test-user-${RANDOM_ID}@example.com password=test-password-123 + ${test_user}= Create Test User admin_session + Create API Session user_session email=${test_user}[email] password=${TEST_USER_PASSWORD} # Get user's jobs (should be filtered to their user_id) ${response}= GET On Session user_session /api/queue/jobs @@ -140,11 +132,11 @@ Queue Jobs User Isolation Test END # Cleanup - Delete Test User ${test_user}[user_id] + Delete User admin_session ${test_user}[id] Invalid Queue Parameters Test [Documentation] Test queue endpoints with invalid parameters - [Tags] queue negative validation + [Tags] queue negative validation speed-fast Get Anonymous Session anon_session Create API Session admin_session @@ -166,41 +158,29 @@ Invalid Queue Parameters Test Unauthorized Client Access Test [Documentation] Test that client endpoints require authentication - [Tags] client security negative + [Tags] client security negative speed-fast Get Anonymous Session session # Try to access active clients without token - ${response}= GET On Session ${session} /api/clients/active expected_status=401 + ${response}= GET On Session session /api/clients/active expected_status=401 Should Be Equal As Integers ${response.status_code} 401 Unauthorized Queue Access Test [Documentation] Test that queue endpoints require authentication - [Tags] queue security negative + [Tags] queue security negative speed-fast Get Anonymous Session session # Try to access queue jobs without token - ${response}= GET On Session ${session} /api/queue/jobs expected_status=401 + ${response}= GET On Session session /api/queue/jobs expected_status=401 Should Be Equal As Integers ${response.status_code} 401 # Try to access queue stats without token - ${response}= GET On Session ${session} /api/queue/stats expected_status=401 + ${response}= GET On Session session /api/queue/stats expected_status=401 Should Be Equal As Integers ${response.status_code} 401 -Queue Health Public Access Test - [Documentation] Test that queue health endpoint is publicly accessible - [Tags] queue health public - Get Anonymous Session session - - # Queue health should be accessible without authentication - ${response}= GET On Session ${session} /api/queue/health - Should Be Equal As Integers ${response.status_code} 200 - - ${health}= Set Variable ${response.json()} - Dictionary Should Contain Key ${health} status - Client Manager Integration Test [Documentation] Test client manager functionality - [Tags] client manager integration + [Tags] client manager integration speed-fast Get Anonymous Session anon_session Create API Session admin_session From 2128ea9e67e008d52c67db3ea0fbab6ba631b3e3 Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Sat, 22 Nov 2025 14:39:28 +0000 Subject: [PATCH 03/74] Added middleware so server shows API responses in logs and masks the health checks --- .../advanced/src/advanced_omi_backend/main.py | 2 +- .../middleware/app_middleware.py | 151 +++++++++++++++++- 2 files changed, 151 insertions(+), 2 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/main.py b/backends/advanced/src/advanced_omi_backend/main.py index a9d9d47c..df51e1cc 100644 --- a/backends/advanced/src/advanced_omi_backend/main.py +++ b/backends/advanced/src/advanced_omi_backend/main.py @@ -44,6 +44,6 @@ host=host, port=port, reload=False, # Set to True for development - access_log=True, + access_log=False, # Disabled - using custom RequestLoggingMiddleware instead log_level="info" ) diff --git a/backends/advanced/src/advanced_omi_backend/middleware/app_middleware.py b/backends/advanced/src/advanced_omi_backend/middleware/app_middleware.py index f886d31f..be2f2705 100644 --- a/backends/advanced/src/advanced_omi_backend/middleware/app_middleware.py +++ b/backends/advanced/src/advanced_omi_backend/middleware/app_middleware.py @@ -4,17 +4,21 @@ Centralizes CORS configuration and global exception handlers. """ +import json import logging +import time from typing import Optional from fastapi import FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse +from fastapi.responses import JSONResponse, StreamingResponse from pymongo.errors import ConnectionFailure, PyMongoError +from starlette.middleware.base import BaseHTTPMiddleware from advanced_omi_backend.app_config import get_app_config logger = logging.getLogger(__name__) +request_logger = logging.getLogger("api.requests") def setup_cors_middleware(app: FastAPI) -> None: @@ -34,6 +38,147 @@ def setup_cors_middleware(app: FastAPI) -> None: ) +class RequestLoggingMiddleware(BaseHTTPMiddleware): + """ + Middleware to log API requests and JSON responses. + + Excludes: + - Authentication endpoints (login, logout) + - WebSocket connections + - Binary file responses (audio, images) + - Streaming responses + """ + + # Paths to exclude from logging + EXCLUDED_PATHS = { + "/auth/jwt/login", + "/auth/cookie/login", + "/auth/jwt/logout", + "/auth/cookie/logout", + "/ws", + "/ws_omi", + "/ws_pcm", + "/mcp", + "/health", + "/auth/health", + "/readiness", + } + + # Binary content types to exclude + BINARY_CONTENT_TYPES = { + "audio/", + "image/", + "video/", + "application/octet-stream", + } + + def should_log_request(self, path: str) -> bool: + """Determine if request should be logged.""" + # Exclude exact path matches + if path in self.EXCLUDED_PATHS: + return False + + # Exclude paths starting with excluded prefixes + for excluded in self.EXCLUDED_PATHS: + if path.startswith(excluded): + return False + + # Exclude audio file serving + if path.startswith("/audio/"): + return False + + return True + + def should_log_response_body(self, content_type: str) -> bool: + """Determine if response body should be logged.""" + if not content_type: + return True + + # Exclude binary content types + for binary_type in self.BINARY_CONTENT_TYPES: + if content_type.startswith(binary_type): + return False + + return True + + async def dispatch(self, request: Request, call_next): + """Process request and log request/response information.""" + path = request.url.path + + # Skip logging for excluded paths + if not self.should_log_request(path): + return await call_next(request) + + # Start timing + start_time = time.time() + + # Log request + request_logger.info(f"→ {request.method} {path}") + + # Process request + response = await call_next(request) + + # Calculate duration + duration_ms = (time.time() - start_time) * 1000 + + # Check if we should log response body + content_type = response.headers.get("content-type", "") + should_log_body = self.should_log_response_body(content_type) + + # Skip body logging for streaming responses + if isinstance(response, StreamingResponse): + request_logger.info( + f"← {request.method} {path} - {response.status_code} " + f"(streaming response) - {duration_ms:.2f}ms" + ) + return response + + # For non-streaming responses, try to extract and log JSON body + if should_log_body and response.status_code != 204: # No content + try: + # Read response body + response_body = b"" + async for chunk in response.body_iterator: + response_body += chunk + + # Try to parse as JSON for pretty printing + try: + json_body = json.loads(response_body) + formatted_json = json.dumps(json_body, indent=2) + request_logger.info( + f"← {request.method} {path} - {response.status_code} - {duration_ms:.2f}ms\n" + f"Response body:\n{formatted_json}" + ) + except (json.JSONDecodeError, UnicodeDecodeError): + # Not JSON or not UTF-8, just log the status + request_logger.info( + f"← {request.method} {path} - {response.status_code} - {duration_ms:.2f}ms " + f"(non-JSON response)" + ) + + # Recreate response with the body we consumed + from starlette.responses import Response + return Response( + content=response_body, + status_code=response.status_code, + headers=dict(response.headers), + media_type=response.media_type, + ) + except Exception as e: + # If anything goes wrong, just log basic info + request_logger.warning( + f"← {request.method} {path} - {response.status_code} - {duration_ms:.2f}ms " + f"(error reading response: {e})" + ) + return response + else: + # Just log status for responses without body + request_logger.info( + f"← {request.method} {path} - {response.status_code} - {duration_ms:.2f}ms" + ) + return response + + def setup_exception_handlers(app: FastAPI) -> None: """Configure global exception handlers for the FastAPI application.""" @@ -87,5 +232,9 @@ async def http_exception_handler(request: Request, exc: HTTPException): def setup_middleware(app: FastAPI) -> None: """Set up all middleware for the FastAPI application.""" + # Add request logging middleware + app.add_middleware(RequestLoggingMiddleware) + logger.info("📝 Request logging middleware enabled") + setup_cors_middleware(app) setup_exception_handlers(app) \ No newline at end of file From 1515d35ffa0feac929b27cce9d11c8d6576fecb0 Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Sat, 22 Nov 2025 14:40:47 +0000 Subject: [PATCH 04/74] removed old task manager --- .../advanced/src/advanced_omi_backend/app_factory.py | 9 --------- backends/advanced/src/advanced_omi_backend/client.py | 4 ---- .../routers/modules/system_routes.py | 12 +++--------- 3 files changed, 3 insertions(+), 22 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/app_factory.py b/backends/advanced/src/advanced_omi_backend/app_factory.py index 4d879301..52a48093 100644 --- a/backends/advanced/src/advanced_omi_backend/app_factory.py +++ b/backends/advanced/src/advanced_omi_backend/app_factory.py @@ -73,10 +73,6 @@ async def lifespan(app: FastAPI): application_logger.error(f"Failed to create admin user: {e}") # Don't raise here as this is not critical for startup - # Initialize task manager - task_manager = init_task_manager() - await task_manager.start() - application_logger.info("Task manager started") # Initialize Redis connection for RQ try: @@ -156,11 +152,6 @@ async def lifespan(app: FastAPI): except Exception as e: application_logger.error(f"Error closing Redis audio streaming client: {e}") - # Shutdown task manager - task_manager = get_task_manager() - await task_manager.shutdown() - application_logger.info("Task manager shut down") - # Stop metrics collection and save final report application_logger.info("Metrics collection stopped") diff --git a/backends/advanced/src/advanced_omi_backend/client.py b/backends/advanced/src/advanced_omi_backend/client.py index 0cf6a1e2..be92716e 100644 --- a/backends/advanced/src/advanced_omi_backend/client.py +++ b/backends/advanced/src/advanced_omi_backend/client.py @@ -169,10 +169,6 @@ async def disconnect(self): # Close current conversation await self.close_current_conversation() - # Cancel any tasks for this client - task_manager = get_task_manager() - await task_manager.cancel_tasks_for_client(self.client_id) - # Clean up state self.speech_segments.clear() self.current_speech_start.clear() diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py index 95569afa..3c97bd55 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py @@ -7,7 +7,7 @@ import logging from typing import Optional -from fastapi import APIRouter, Depends, Request +from fastapi import APIRouter, Body, Depends, Request from advanced_omi_backend.auth import current_active_user, current_superuser from advanced_omi_backend.controllers import system_controller, session_controller, queue_controller @@ -30,12 +30,6 @@ async def get_auth_config(): return await system_controller.get_auth_config() -@router.get("/processor/status") -async def get_processor_status(current_user: User = Depends(current_superuser)): - """Get processor queue status and health. Admin only.""" - return await system_controller.get_processor_status() - - @router.get("/diarization-settings") async def get_diarization_settings(current_user: User = Depends(current_superuser)): """Get current diarization settings. Admin only.""" @@ -88,7 +82,7 @@ async def get_memory_config_raw(current_user: User = Depends(current_superuser)) @router.post("/admin/memory/config/raw") async def update_memory_config_raw( - config_yaml: str, + config_yaml: str = Body(..., embed=True), current_user: User = Depends(current_superuser) ): """Update memory configuration YAML and hot reload. Admin only.""" @@ -97,7 +91,7 @@ async def update_memory_config_raw( @router.post("/admin/memory/config/validate") async def validate_memory_config( - config_yaml: str, + config_yaml: str = Body(..., embed=True), current_user: User = Depends(current_superuser) ): """Validate memory configuration YAML syntax. Admin only.""" From c9740845c3453d647041caadf64c96c976c50269 Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Sat, 22 Nov 2025 14:42:05 +0000 Subject: [PATCH 05/74] enable job filtering by type --- .../controllers/queue_controller.py | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py index ced15fc7..2071181d 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py @@ -93,14 +93,22 @@ def get_job_stats() -> Dict[str, Any]: } -def get_jobs(limit: int = 20, offset: int = 0, queue_name: str = None) -> Dict[str, Any]: +def get_jobs( + limit: int = 20, + offset: int = 0, + queue_name: str = None, + job_type: str = None, + client_id: str = None +) -> Dict[str, Any]: """ - Get jobs from a specific queue or all queues. + Get jobs from a specific queue or all queues with optional filtering. Args: limit: Maximum number of jobs to return offset: Number of jobs to skip queue_name: Specific queue name or None for all queues + job_type: Filter by job type (matches func_name, e.g., "speech_detection") + client_id: Filter by client_id in job meta (partial match) Returns: Dict with jobs list and pagination metadata matching frontend expectations @@ -130,11 +138,21 @@ def get_jobs(limit: int = 20, offset: int = 0, queue_name: str = None) -> Dict[s user_id = job.kwargs.get("user_id", "") if job.kwargs else "" # Extract just the function name (e.g., "listen_for_speech_job" from "module.listen_for_speech_job") - job_type = job.func_name.split('.')[-1] if job.func_name else "unknown" + func_name = job.func_name.split('.')[-1] if job.func_name else "unknown" + + # Apply job_type filter + if job_type and job_type not in func_name: + continue + + # Apply client_id filter (partial match in meta) + if client_id: + job_client_id = job.meta.get("client_id", "") if job.meta else "" + if client_id not in job_client_id: + continue all_jobs.append({ "job_id": job.id, - "job_type": job_type, + "job_type": func_name, "user_id": user_id, "status": status, "priority": "normal", # RQ doesn't track priority in metadata From 87f4ff5afcb395683b565374843a029481dd0032 Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Sat, 22 Nov 2025 14:42:34 +0000 Subject: [PATCH 06/74] removed deprecvated processors --- .../controllers/system_controller.py | 41 ------------------- 1 file changed, 41 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py index b75f22c8..5bc0b35d 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py @@ -55,47 +55,6 @@ async def get_auth_config(): } -async def get_processor_status(): - """Get RQ worker and queue status.""" - try: - # Get RQ queue health (new architecture) - from advanced_omi_backend.controllers.queue_controller import get_queue_health - queue_health = get_queue_health() - - status = { - "architecture": "rq_workers", # New RQ-based architecture - "timestamp": int(time.time()), - "workers": { - "total": queue_health.get("total_workers", 0), - "active": queue_health.get("active_workers", 0), - "idle": queue_health.get("idle_workers", 0), - "details": queue_health.get("workers", []) - }, - "queues": { - "transcription": queue_health.get("queues", {}).get("transcription", {}), - "memory": queue_health.get("queues", {}).get("memory", {}), - "default": queue_health.get("queues", {}).get("default", {}) - } - } - - # Get task manager status if available - try: - task_manager = get_task_manager() - if task_manager: - task_status = task_manager.get_health_status() - status["task_manager"] = task_status - except Exception as e: - status["task_manager"] = {"error": str(e)} - - return status - - except Exception as e: - logger.error(f"Error getting processor status: {e}", exc_info=True) - return JSONResponse( - status_code=500, content={"error": f"Failed to get processor status: {str(e)}"} - ) - - # Audio file processing functions moved to audio_controller.py From 16181e92bf3763593853125507299fe993258e4f Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Sat, 22 Nov 2025 14:43:33 +0000 Subject: [PATCH 07/74] Enabled creation and update of users to get tests to pass --- .../controllers/user_controller.py | 26 +++++++++---------- .../src/advanced_omi_backend/models/user.py | 19 ++++++-------- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/controllers/user_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/user_controller.py index 224695d0..ba7dd753 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/user_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/user_controller.py @@ -59,13 +59,13 @@ async def create_user(user_data: UserCreate): # Create the user through the user manager user = await user_manager.create(user_data) + # Return the full user object (serialized via UserRead schema) + from advanced_omi_backend.models.user import UserRead + user_read = UserRead.model_validate(user) + return JSONResponse( status_code=201, - content={ - "message": f"User {user.email} created successfully", - "user_id": str(user.id), - "user_email": user.email, - }, + content=user_read.model_dump(mode='json'), ) except Exception as e: @@ -81,7 +81,6 @@ async def create_user(user_data: UserCreate): async def update_user(user_id: str, user_data: UserUpdate): """Update an existing user.""" - print("DEBUG: New update_user function is being called!") try: # Validate ObjectId format try: @@ -109,16 +108,17 @@ async def update_user(user_id: str, user_data: UserUpdate): # Convert to User object for the manager user_obj = User(**existing_user) - # Update the user using the fastapi-users manager (now with fix for missing method) - updated_user = await user_manager.update(user_obj, user_data) + # Update the user using the fastapi-users manager + # Note: signature is update(user_update, user) - update data first, then user object + updated_user = await user_manager.update(user_data, user_obj) + + # Return the full user object (serialized via UserRead schema) + from advanced_omi_backend.models.user import UserRead + user_read = UserRead.model_validate(updated_user) return JSONResponse( status_code=200, - content={ - "message": f"User {updated_user.email} updated successfully", - "user_id": str(updated_user.id), - "user_email": updated_user.email, - }, + content=user_read.model_dump(mode='json'), ) except Exception as e: diff --git a/backends/advanced/src/advanced_omi_backend/models/user.py b/backends/advanced/src/advanced_omi_backend/models/user.py index a3779021..b0ced195 100644 --- a/backends/advanced/src/advanced_omi_backend/models/user.py +++ b/backends/advanced/src/advanced_omi_backend/models/user.py @@ -33,19 +33,16 @@ class UserUpdate(BaseUserUpdate): display_name: Optional[str] = None is_superuser: Optional[bool] = None + def create_update_dict(self): + """Create update dictionary for regular user operations.""" + update_dict = super().create_update_dict() + if self.display_name is not None: + update_dict["display_name"] = self.display_name + return update_dict + def create_update_dict_superuser(self): """Create update dictionary for superuser operations.""" - update_dict = {} - if self.email is not None: - update_dict["email"] = self.email - if self.password is not None: - update_dict["password"] = self.password - if self.is_active is not None: - update_dict["is_active"] = self.is_active - if self.is_verified is not None: - update_dict["is_verified"] = self.is_verified - if self.is_superuser is not None: - update_dict["is_superuser"] = self.is_superuser + update_dict = super().create_update_dict_superuser() if self.display_name is not None: update_dict["display_name"] = self.display_name return update_dict From 13f890dd39a5e947bb6db9d8c8122761cbc27444 Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Sat, 22 Nov 2025 14:44:30 +0000 Subject: [PATCH 08/74] added computed properties for conversation model to get active transcript version --- .../models/conversation.py | 36 ++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/backends/advanced/src/advanced_omi_backend/models/conversation.py b/backends/advanced/src/advanced_omi_backend/models/conversation.py index 56f55ede..714ef6ea 100644 --- a/backends/advanced/src/advanced_omi_backend/models/conversation.py +++ b/backends/advanced/src/advanced_omi_backend/models/conversation.py @@ -7,7 +7,7 @@ from datetime import datetime from typing import Dict, List, Optional, Any, Union -from pydantic import BaseModel, Field, model_validator +from pydantic import BaseModel, Field, model_validator, computed_field from enum import Enum import uuid @@ -146,6 +146,7 @@ def clean_legacy_data(cls, data: Any) -> Any: return data + @computed_field @property def active_transcript(self) -> Optional["Conversation.TranscriptVersion"]: """Get the currently active transcript version.""" @@ -157,6 +158,7 @@ def active_transcript(self) -> Optional["Conversation.TranscriptVersion"]: return version return None + @computed_field @property def active_memory(self) -> Optional["Conversation.MemoryVersion"]: """Get the currently active memory version.""" @@ -169,16 +171,48 @@ def active_memory(self) -> Optional["Conversation.MemoryVersion"]: return None # Convenience properties that return data from active transcript version + @computed_field @property def transcript(self) -> Optional[str]: """Get transcript text from active transcript version.""" return self.active_transcript.transcript if self.active_transcript else None + @computed_field @property def segments(self) -> List["Conversation.SpeakerSegment"]: """Get segments from active transcript version.""" return self.active_transcript.segments if self.active_transcript else [] + @computed_field + @property + def segment_count(self) -> int: + """Get segment count from active transcript version.""" + return len(self.segments) if self.segments else 0 + + @computed_field + @property + def memory_count(self) -> int: + """Get memory count from active memory version.""" + return self.active_memory.memory_count if self.active_memory else 0 + + @computed_field + @property + def has_memory(self) -> bool: + """Check if conversation has any memory versions.""" + return len(self.memory_versions) > 0 + + @computed_field + @property + def transcript_version_count(self) -> int: + """Get count of transcript versions.""" + return len(self.transcript_versions) + + @computed_field + @property + def memory_version_count(self) -> int: + """Get count of memory versions.""" + return len(self.memory_versions) + def add_transcript_version( self, version_id: str, From 2cc3972f51ff1a3c8dc3fc34fede82f7de719873 Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Sat, 22 Nov 2025 14:45:19 +0000 Subject: [PATCH 09/74] moved queue health endpoint, and added better job query functions --- .../routers/modules/health_routes.py | 46 +++++++---- .../routers/modules/queue_routes.py | 82 ++++++++++++++++--- 2 files changed, 100 insertions(+), 28 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/health_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/health_routes.py index 4981ca39..37913c48 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/health_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/health_routes.py @@ -149,25 +149,39 @@ async def health_check(): # Check Redis and RQ Workers (critical for queue processing) try: - from rq import Worker + from advanced_omi_backend.controllers.queue_controller import get_queue_health - # Test Redis connection - await asyncio.wait_for(asyncio.to_thread(redis_conn.ping), timeout=5.0) + # Get queue health (includes Redis connection test and worker count) + queue_health = await asyncio.wait_for( + asyncio.to_thread(get_queue_health), timeout=5.0 + ) - # Count active workers - workers = Worker.all(connection=redis_conn) - worker_count = len(workers) - active_workers = len([w for w in workers if w.state == 'busy']) - idle_workers = worker_count - active_workers + # Check if Redis is healthy + redis_healthy = queue_health.get("redis_connection") == "healthy" + worker_count = queue_health.get("total_workers", 0) + active_workers = queue_health.get("active_workers", 0) + idle_workers = queue_health.get("idle_workers", 0) + + if redis_healthy: + health_status["services"]["redis"] = { + "status": "✅ Connected", + "healthy": True, + "critical": True, + "worker_count": worker_count, + "active_workers": active_workers, + "idle_workers": idle_workers, + "queues": queue_health.get("queues", {}) + } + else: + health_status["services"]["redis"] = { + "status": f"❌ Connection Failed: {queue_health.get('redis_connection')}", + "healthy": False, + "critical": True, + "worker_count": 0 + } + overall_healthy = False + critical_services_healthy = False - health_status["services"]["redis"] = { - "status": "✅ Connected", - "healthy": True, - "critical": True, - "worker_count": worker_count, - "active_workers": active_workers, - "idle_workers": idle_workers - } except asyncio.TimeoutError: health_status["services"]["redis"] = { "status": "❌ Connection Timeout (5s)", diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py index 3e540b19..2cfe1955 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py @@ -9,7 +9,7 @@ from typing import List, Optional from advanced_omi_backend.auth import current_active_user -from advanced_omi_backend.controllers.queue_controller import get_jobs, get_job_stats, get_queue_health, redis_conn, QUEUE_NAMES +from advanced_omi_backend.controllers.queue_controller import get_jobs, get_job_stats, redis_conn, QUEUE_NAMES from advanced_omi_backend.users import User from rq.job import Job import redis.asyncio as aioredis @@ -23,11 +23,13 @@ async def list_jobs( limit: int = Query(20, ge=1, le=100, description="Number of jobs to return"), offset: int = Query(0, ge=0, description="Number of jobs to skip"), queue_name: str = Query(None, description="Filter by queue name"), + job_type: str = Query(None, description="Filter by job type (matches func_name)"), + client_id: str = Query(None, description="Filter by client_id in meta"), current_user: User = Depends(current_active_user) ): """List jobs with pagination and filtering.""" try: - result = get_jobs(limit=limit, offset=offset, queue_name=queue_name) + result = get_jobs(limit=limit, offset=offset, queue_name=queue_name, job_type=job_type, client_id=client_id) # Filter jobs by user if not admin if not current_user.is_superuser: @@ -48,6 +50,44 @@ async def list_jobs( return {"error": "Failed to list jobs", "jobs": [], "pagination": {"total": 0, "limit": limit, "offset": offset, "has_more": False}} +@router.get("/jobs/{job_id}/status") +async def get_job_status( + job_id: str, + current_user: User = Depends(current_active_user) +): + """Get just the status of a specific job (lightweight endpoint).""" + try: + job = Job.fetch(job_id, connection=redis_conn) + + # Check user permission (non-admins can only see their own jobs) + if not current_user.is_superuser: + job_user_id = job.kwargs.get("user_id") if job.kwargs else None + if job_user_id != str(current_user.user_id): + raise HTTPException(status_code=403, detail="Access forbidden") + + # Determine status from registries + status = "unknown" + if job.is_queued: + status = "queued" + elif job.is_started: + status = "processing" + elif job.is_finished: + status = "completed" + elif job.is_failed: + status = "failed" + elif job.is_deferred: + status = "deferred" + + return { + "job_id": job.id, + "status": status + } + + except Exception as e: + logger.error(f"Failed to get job status {job_id}: {e}") + raise HTTPException(status_code=404, detail="Job not found") + + @router.get("/jobs/{job_id}") async def get_job( job_id: str, @@ -299,20 +339,38 @@ async def get_queue_stats_endpoint( return {"total_jobs": 0, "queued_jobs": 0, "processing_jobs": 0, "completed_jobs": 0, "failed_jobs": 0, "cancelled_jobs": 0, "deferred_jobs": 0} -@router.get("/health") -async def get_queue_health_endpoint(): - """Get queue system health status.""" +@router.get("/worker-details") +async def get_queue_worker_details( + current_user: User = Depends(current_active_user) +): + """Get detailed queue and worker status including task manager health.""" try: - health = get_queue_health() - return health + from advanced_omi_backend.controllers.queue_controller import get_queue_health + from advanced_omi_backend.task_manager import get_task_manager + import time - except Exception as e: - logger.error(f"Failed to get queue health: {e}") - return { - "status": "unhealthy", - "message": f"Health check failed: {str(e)}" + # Get queue health directly + queue_health = get_queue_health() + + status = { + "architecture": "rq_workers", + "timestamp": int(time.time()), + "workers": { + "total": queue_health.get("total_workers", 0), + "active": queue_health.get("active_workers", 0), + "idle": queue_health.get("idle_workers", 0), + "details": queue_health.get("workers", []) + }, + "queues": queue_health.get("queues", {}), + "redis_connection": queue_health.get("redis_connection", "unknown") } + return status + + except Exception as e: + logger.error(f"Failed to get queue worker details: {e}") + raise HTTPException(status_code=500, detail=f"Failed to get worker details: {str(e)}") + @router.get("/streams") async def get_stream_stats( From 8782c1ae00040cb0960545747a01243586e22d8e Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Sat, 22 Nov 2025 14:57:46 +0000 Subject: [PATCH 10/74] added params to force waiting for chunk queue to be empty test framework streams faster than realtime and so we need this to wait --- backends/advanced/docker-compose-test.yml | 8 ++++++++ .../workers/conversation_jobs.py | 20 +++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/backends/advanced/docker-compose-test.yml b/backends/advanced/docker-compose-test.yml index 7f2bb942..029d0238 100644 --- a/backends/advanced/docker-compose-test.yml +++ b/backends/advanced/docker-compose-test.yml @@ -42,6 +42,10 @@ services: - DISABLE_SPEAKER_RECOGNITION=false - SPEAKER_SERVICE_URL=https://localhost:8085 - CORS_ORIGINS=http://localhost:3001,http://localhost:8001,https://localhost:3001,https://localhost:8001 + # Set low inactivity timeout for tests (2 seconds instead of 60) + - SPEECH_INACTIVITY_THRESHOLD_SECONDS=2 + # Wait for audio queue to drain before timing out (test mode) + - WAIT_FOR_AUDIO_QUEUE_DRAIN=true depends_on: qdrant-test: condition: service_started @@ -144,6 +148,10 @@ services: - OPENMEMORY_USER_ID=${OPENMEMORY_USER_ID:-openmemory} - DISABLE_SPEAKER_RECOGNITION=false - SPEAKER_SERVICE_URL=https://localhost:8085 + # Set low inactivity timeout for tests (2 seconds instead of 60) + - SPEECH_INACTIVITY_THRESHOLD_SECONDS=2 + # Wait for audio queue to drain before timing out (test mode) + - WAIT_FOR_AUDIO_QUEUE_DRAIN=true depends_on: friend-backend-test: condition: service_healthy diff --git a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py index 67e4d935..cf486909 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py @@ -247,9 +247,15 @@ async def open_conversation_job( last_inactivity_log_time = time.time() # Track when we last logged inactivity last_word_count = 0 # Track word count to detect actual new speech + # Test mode: wait for audio queue to drain before timing out + # In real usage, ambient noise keeps connection alive. In tests, chunks arrive in bursts. + wait_for_queue_drain = os.getenv("WAIT_FOR_AUDIO_QUEUE_DRAIN", "false").lower() == "true" + logger.info( f"📊 Conversation timeout configured: {inactivity_timeout_minutes} minutes ({inactivity_timeout_seconds}s)" ) + if wait_for_queue_drain: + logger.info("🧪 Test mode: Waiting for audio queue to drain before timeout") while True: # Check if session is finalizing (set by producer when recording stops) @@ -315,6 +321,20 @@ async def open_conversation_job( last_inactivity_log_time = current_time if inactivity_duration > inactivity_timeout_seconds: + # In test mode, check if there are pending chunks before timing out + if wait_for_queue_drain: + # Check audio persistence queue length + persist_queue_key = f"audio:queue:{session_id}" + queue_length = await redis_client.llen(persist_queue_key) + + if queue_length > 0: + logger.info( + f"🧪 Test mode: Inactivity timeout reached but {queue_length} chunks still in queue, " + f"waiting for processing..." + ) + await asyncio.sleep(1) + continue + logger.info( f"🕐 Conversation {conversation_id} inactive for " f"{inactivity_duration/60:.1f} minutes (threshold: {inactivity_timeout_minutes} min), " From a766026d37a3690d810d1462b285b444046e4fec Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Sat, 22 Nov 2025 15:00:29 +0000 Subject: [PATCH 11/74] ensure segments are updated on transcript versions --- .../src/advanced_omi_backend/workers/audio_jobs.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py index 92e4676a..ba535796 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py @@ -130,9 +130,16 @@ async def process_cropping_job( # Update conversation with cropped audio path and adjusted segments conversation.cropped_audio_path = cropped_filename + # Update the active transcript version segments - if conversation.active_transcript: - conversation.active_transcript.segments = updated_segments + # Find and update the version directly in the list to ensure Beanie detects the change + if conversation.active_transcript_version: + for i, version in enumerate(conversation.transcript_versions): + if version.version_id == conversation.active_transcript_version: + conversation.transcript_versions[i].segments = updated_segments + logger.info(f"📝 Updated segments in transcript version {version.version_id[:12]}") + break + await conversation.save() logger.info(f"💾 Updated conversation {conversation_id[:12]} with cropped_audio_path and adjusted {len(updated_segments)} segment timestamps") From 83b9753c23718ce64b798719d35fe890d833c41f Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Sat, 22 Nov 2025 15:05:23 +0000 Subject: [PATCH 12/74] added self healing function if redis loses the worker connection --- backends/advanced/start-k8s.sh | 193 ++++++++++++++++++++--------- backends/advanced/start-workers.sh | 164 ++++++++++++++---------- 2 files changed, 236 insertions(+), 121 deletions(-) diff --git a/backends/advanced/start-k8s.sh b/backends/advanced/start-k8s.sh index 24a3e6e7..487b82c8 100755 --- a/backends/advanced/start-k8s.sh +++ b/backends/advanced/start-k8s.sh @@ -15,6 +15,7 @@ echo " MONGODB_URI: ${MONGODB_URI:-NOT_SET}" # Function to handle shutdown shutdown() { echo "🛑 Shutting down services..." + kill $MONITOR_PID 2>/dev/null || true kill $AUDIO_WORKER_1_PID 2>/dev/null || true kill $RQ_WORKER_1_PID 2>/dev/null || true kill $RQ_WORKER_2_PID 2>/dev/null || true @@ -58,20 +59,141 @@ python3 -c " from rq import Worker from redis import Redis import os +import socket redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379/0') redis_conn = Redis.from_url(redis_url) +hostname = socket.gethostname() -# Get all workers and clean up dead ones +# Only clean up workers from THIS hostname (pod) workers = Worker.all(connection=redis_conn) +cleaned = 0 for worker in workers: - # Force cleanup of all registered workers from previous runs - worker.register_death() -print(f'Cleaned up {len(workers)} stale workers') + if hostname in worker.name: + worker.register_death() + cleaned += 1 +print(f'Cleaned up {cleaned} stale workers from {hostname}') " 2>/dev/null || echo "No stale workers to clean" sleep 1 +# Function to start all workers +start_workers() { + # NEW WORKERS - Redis Streams multi-provider architecture + # Single worker ensures sequential processing of audio chunks (matching start-workers.sh) + echo "🎵 Starting audio stream Deepgram worker (1 worker for sequential processing)..." + if python3 -m advanced_omi_backend.workers.audio_stream_deepgram_worker & + then + AUDIO_WORKER_1_PID=$! + echo " ✅ Deepgram stream worker started with PID: $AUDIO_WORKER_1_PID" + else + echo " ❌ Failed to start Deepgram stream worker" + exit 1 + fi + + # Start 3 RQ workers listening to ALL queues (matching start-workers.sh) + echo "🔧 Starting RQ workers (3 workers, all queues: transcription, memory, default)..." + if python3 -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & + then + RQ_WORKER_1_PID=$! + echo " ✅ RQ worker 1 started with PID: $RQ_WORKER_1_PID" + else + echo " ❌ Failed to start RQ worker 1" + kill $AUDIO_WORKER_1_PID 2>/dev/null || true + exit 1 + fi + + if python3 -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & + then + RQ_WORKER_2_PID=$! + echo " ✅ RQ worker 2 started with PID: $RQ_WORKER_2_PID" + else + echo " ❌ Failed to start RQ worker 2" + kill $AUDIO_WORKER_1_PID $RQ_WORKER_1_PID 2>/dev/null || true + exit 1 + fi + + if python3 -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & + then + RQ_WORKER_3_PID=$! + echo " ✅ RQ worker 3 started with PID: $RQ_WORKER_3_PID" + else + echo " ❌ Failed to start RQ worker 3" + kill $AUDIO_WORKER_1_PID $RQ_WORKER_1_PID $RQ_WORKER_2_PID 2>/dev/null || true + exit 1 + fi + + # Start 1 dedicated audio persistence worker (matching start-workers.sh) + echo "💾 Starting audio persistence worker (1 worker for audio queue)..." + if python3 -m advanced_omi_backend.workers.rq_worker_entry audio & + then + AUDIO_PERSISTENCE_WORKER_PID=$! + echo " ✅ Audio persistence worker started with PID: $AUDIO_PERSISTENCE_WORKER_PID" + else + echo " ❌ Failed to start audio persistence worker" + kill $AUDIO_WORKER_1_PID $RQ_WORKER_1_PID $RQ_WORKER_2_PID $RQ_WORKER_3_PID 2>/dev/null || true + exit 1 + fi + + echo "✅ All workers started:" + echo " - Audio stream worker: $AUDIO_WORKER_1_PID (Redis Streams consumer - sequential processing)" + echo " - RQ worker 1: $RQ_WORKER_1_PID (transcription, memory, default)" + echo " - RQ worker 2: $RQ_WORKER_2_PID (transcription, memory, default)" + echo " - RQ worker 3: $RQ_WORKER_3_PID (transcription, memory, default)" + echo " - Audio persistence worker: $AUDIO_PERSISTENCE_WORKER_PID (audio queue - file rotation)" +} + +# Function to check worker registration health +check_worker_health() { + WORKER_COUNT=$(python3 -c " +from rq import Worker +from redis import Redis +import os +import sys + +try: + redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379/0') + r = Redis.from_url(redis_url) + workers = Worker.all(connection=r) + print(len(workers)) +except Exception as e: + print('0', file=sys.stderr) + sys.exit(1) +" 2>/dev/null || echo "0") + echo "$WORKER_COUNT" +} + +# Self-healing monitoring function +monitor_worker_health() { + local CHECK_INTERVAL=10 # Check every 10 seconds + local MIN_WORKERS=3 # Expect at least 3 RQ workers + + echo "🩺 Starting self-healing monitor (check interval: ${CHECK_INTERVAL}s, min workers: ${MIN_WORKERS})" + + while true; do + sleep $CHECK_INTERVAL + + WORKER_COUNT=$(check_worker_health) + + if [ "$WORKER_COUNT" -lt "$MIN_WORKERS" ]; then + echo "⚠️ Self-healing: Only $WORKER_COUNT workers registered (expected >= $MIN_WORKERS)" + echo "🔧 Self-healing: Restarting all workers to restore registration..." + + # Kill all workers + kill $AUDIO_WORKER_1_PID $RQ_WORKER_1_PID $RQ_WORKER_2_PID $RQ_WORKER_3_PID $AUDIO_PERSISTENCE_WORKER_PID 2>/dev/null || true + wait 2>/dev/null || true + + # Restart workers + start_workers + + # Verify recovery + sleep 3 + NEW_WORKER_COUNT=$(check_worker_health) + echo "✅ Self-healing: Workers restarted - new count: $NEW_WORKER_COUNT" + fi + done +} + # OLD WORKERS - Disabled for testing new Redis Streams architecture # These have been renamed to old_audio_stream_worker.py and old_transcription_stream_worker.py # echo "🎵 Starting Redis Streams audio workers (2 workers)..." @@ -115,61 +237,16 @@ sleep 1 # exit 1 # fi -# NEW WORKERS - Redis Streams multi-provider architecture -# Single worker ensures sequential processing of audio chunks (matching start-workers.sh) -echo "🎵 Starting audio stream Deepgram worker (1 worker for sequential processing)..." -if python3 -m advanced_omi_backend.workers.audio_stream_deepgram_worker & -then - AUDIO_WORKER_1_PID=$! - echo " ✅ Deepgram stream worker started with PID: $AUDIO_WORKER_1_PID" -else - echo " ❌ Failed to start Deepgram stream worker" - exit 1 -fi - -# Start 3 RQ workers listening to ALL queues (matching start-workers.sh) -echo "🔧 Starting RQ workers (3 workers, all queues: transcription, memory, default)..." -if python3 -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & -then - RQ_WORKER_1_PID=$! - echo " ✅ RQ worker 1 started with PID: $RQ_WORKER_1_PID" -else - echo " ❌ Failed to start RQ worker 1" - kill $AUDIO_WORKER_1_PID 2>/dev/null || true - exit 1 -fi - -if python3 -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & -then - RQ_WORKER_2_PID=$! - echo " ✅ RQ worker 2 started with PID: $RQ_WORKER_2_PID" -else - echo " ❌ Failed to start RQ worker 2" - kill $AUDIO_WORKER_1_PID $RQ_WORKER_1_PID 2>/dev/null || true - exit 1 -fi +# Configure Python logging for workers +export PYTHONUNBUFFERED=1 -if python3 -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & -then - RQ_WORKER_3_PID=$! - echo " ✅ RQ worker 3 started with PID: $RQ_WORKER_3_PID" -else - echo " ❌ Failed to start RQ worker 3" - kill $AUDIO_WORKER_1_PID $RQ_WORKER_1_PID $RQ_WORKER_2_PID 2>/dev/null || true - exit 1 -fi +# Start all workers +start_workers -# Start 1 dedicated audio persistence worker (matching start-workers.sh) -echo "💾 Starting audio persistence worker (1 worker for audio queue)..." -if python3 -m advanced_omi_backend.workers.rq_worker_entry audio & -then - AUDIO_PERSISTENCE_WORKER_PID=$! - echo " ✅ Audio persistence worker started with PID: $AUDIO_PERSISTENCE_WORKER_PID" -else - echo " ❌ Failed to start audio persistence worker" - kill $AUDIO_WORKER_1_PID $RQ_WORKER_1_PID $RQ_WORKER_2_PID $RQ_WORKER_3_PID 2>/dev/null || true - exit 1 -fi +# Start self-healing monitor in background +monitor_worker_health & +MONITOR_PID=$! +echo "🩺 Self-healing monitor started: PID $MONITOR_PID" # Give workers a moment to start sleep 3 @@ -192,6 +269,7 @@ echo " - RQ worker 1: $RQ_WORKER_1_PID (transcription, memory, default)" echo " - RQ worker 2: $RQ_WORKER_2_PID (transcription, memory, default)" echo " - RQ worker 3: $RQ_WORKER_3_PID (transcription, memory, default)" echo " - Audio persistence worker: $AUDIO_PERSISTENCE_WORKER_PID (audio queue - file rotation)" +echo " - Self-healing monitor: $MONITOR_PID" echo " - FastAPI Backend: $BACKEND_PID" # Wait for any process to exit @@ -200,6 +278,7 @@ wait -n # If we get here, one process has exited - kill the others echo "⚠️ One service exited, stopping all services..." # Kill only non-empty PIDs +[ -n "$MONITOR_PID" ] && kill $MONITOR_PID 2>/dev/null || true [ -n "$AUDIO_WORKER_1_PID" ] && kill $AUDIO_WORKER_1_PID 2>/dev/null || true [ -n "$RQ_WORKER_1_PID" ] && kill $RQ_WORKER_1_PID 2>/dev/null || true [ -n "$RQ_WORKER_2_PID" ] && kill $RQ_WORKER_2_PID 2>/dev/null || true diff --git a/backends/advanced/start-workers.sh b/backends/advanced/start-workers.sh index d9386d37..2500530c 100755 --- a/backends/advanced/start-workers.sh +++ b/backends/advanced/start-workers.sh @@ -31,9 +31,96 @@ print(f'Cleaned up {cleaned} stale workers from {hostname}') sleep 1 +# Function to start all workers +start_workers() { + echo "🔧 Starting RQ workers (6 workers, all queues: transcription, memory, default)..." + uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & + RQ_WORKER_1_PID=$! + uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & + RQ_WORKER_2_PID=$! + uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & + RQ_WORKER_3_PID=$! + uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & + RQ_WORKER_4_PID=$! + uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & + RQ_WORKER_5_PID=$! + uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & + RQ_WORKER_6_PID=$! + + echo "💾 Starting audio persistence worker (1 worker for audio queue)..." + uv run python -m advanced_omi_backend.workers.rq_worker_entry audio & + AUDIO_PERSISTENCE_WORKER_PID=$! + + echo "🎵 Starting audio stream Deepgram worker (1 worker for sequential processing)..." + uv run python -m advanced_omi_backend.workers.audio_stream_deepgram_worker & + AUDIO_STREAM_WORKER_PID=$! + + echo "✅ All workers started:" + echo " - RQ worker 1: PID $RQ_WORKER_1_PID (transcription, memory, default)" + echo " - RQ worker 2: PID $RQ_WORKER_2_PID (transcription, memory, default)" + echo " - RQ worker 3: PID $RQ_WORKER_3_PID (transcription, memory, default)" + echo " - RQ worker 4: PID $RQ_WORKER_4_PID (transcription, memory, default)" + echo " - RQ worker 5: PID $RQ_WORKER_5_PID (transcription, memory, default)" + echo " - RQ worker 6: PID $RQ_WORKER_6_PID (transcription, memory, default)" + echo " - Audio persistence worker: PID $AUDIO_PERSISTENCE_WORKER_PID (audio queue - file rotation)" + echo " - Audio stream worker: PID $AUDIO_STREAM_WORKER_PID (Redis Streams consumer - sequential processing)" +} + +# Function to check worker registration health +check_worker_health() { + WORKER_COUNT=$(uv run python -c " +from rq import Worker +from redis import Redis +import os +import sys + +try: + redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379/0') + r = Redis.from_url(redis_url) + workers = Worker.all(connection=r) + print(len(workers)) +except Exception as e: + print('0', file=sys.stderr) + sys.exit(1) +" 2>/dev/null || echo "0") + echo "$WORKER_COUNT" +} + +# Self-healing monitoring function +monitor_worker_health() { + local CHECK_INTERVAL=10 # Check every 10 seconds + local MIN_WORKERS=6 # Expect at least 6 RQ workers + + echo "🩺 Starting self-healing monitor (check interval: ${CHECK_INTERVAL}s, min workers: ${MIN_WORKERS})" + + while true; do + sleep $CHECK_INTERVAL + + WORKER_COUNT=$(check_worker_health) + + if [ "$WORKER_COUNT" -lt "$MIN_WORKERS" ]; then + echo "⚠️ Self-healing: Only $WORKER_COUNT workers registered (expected >= $MIN_WORKERS)" + echo "🔧 Self-healing: Restarting all workers to restore registration..." + + # Kill all workers + kill $RQ_WORKER_1_PID $RQ_WORKER_2_PID $RQ_WORKER_3_PID $RQ_WORKER_4_PID $RQ_WORKER_5_PID $RQ_WORKER_6_PID $AUDIO_PERSISTENCE_WORKER_PID $AUDIO_STREAM_WORKER_PID 2>/dev/null || true + wait 2>/dev/null || true + + # Restart workers + start_workers + + # Verify recovery + sleep 3 + NEW_WORKER_COUNT=$(check_worker_health) + echo "✅ Self-healing: Workers restarted - new count: $NEW_WORKER_COUNT" + fi + done +} + # Function to handle shutdown shutdown() { echo "🛑 Shutting down workers..." + kill $MONITOR_PID 2>/dev/null || true kill $RQ_WORKER_1_PID 2>/dev/null || true kill $RQ_WORKER_2_PID 2>/dev/null || true kill $RQ_WORKER_3_PID 2>/dev/null || true @@ -41,8 +128,7 @@ shutdown() { kill $RQ_WORKER_5_PID 2>/dev/null || true kill $RQ_WORKER_6_PID 2>/dev/null || true kill $AUDIO_PERSISTENCE_WORKER_PID 2>/dev/null || true - [ -n "$AUDIO_STREAM_WORKER_PID" ] && kill $AUDIO_STREAM_WORKER_PID 2>/dev/null || true - [ -n "$PARAKEET_STREAM_WORKER_PID" ] && kill $PARAKEET_STREAM_WORKER_PID 2>/dev/null || true + kill $AUDIO_STREAM_WORKER_PID 2>/dev/null || true wait echo "✅ All workers stopped" exit 0 @@ -54,70 +140,20 @@ trap shutdown SIGTERM SIGINT # Configure Python logging for RQ workers export PYTHONUNBUFFERED=1 -# Start 6 RQ workers listening to ALL queues -echo "🔧 Starting RQ workers (6 workers, all queues: transcription, memory, default)..." -uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & -RQ_WORKER_1_PID=$! -uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & -RQ_WORKER_2_PID=$! -uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & -RQ_WORKER_3_PID=$! -uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & -RQ_WORKER_4_PID=$! -uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & -RQ_WORKER_5_PID=$! -uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & -RQ_WORKER_6_PID=$! - -# Start 1 dedicated audio persistence worker -# Single worker for audio persistence jobs (file rotation) -echo "💾 Starting audio persistence worker (1 worker for audio queue)..." -uv run python -m advanced_omi_backend.workers.rq_worker_entry audio & -AUDIO_PERSISTENCE_WORKER_PID=$! - -# Start 1 audio stream worker for Deepgram (only if DEEPGRAM_API_KEY is set) -# Single worker ensures sequential processing of audio chunks -if [ -n "$DEEPGRAM_API_KEY" ]; then - echo "🎵 Starting audio stream Deepgram worker (1 worker for sequential processing)..." - uv run python -m advanced_omi_backend.workers.audio_stream_deepgram_worker & - AUDIO_STREAM_WORKER_PID=$! -else - echo "⏭️ Skipping Deepgram audio stream worker (DEEPGRAM_API_KEY not set)" - AUDIO_STREAM_WORKER_PID="" -fi - -# Start 1 audio stream worker for Parakeet (only if PARAKEET_ASR_URL or OFFLINE_ASR_TCP_URI is set) -# Single worker ensures sequential processing of audio chunks -PARAKEET_URL="${PARAKEET_ASR_URL:-${OFFLINE_ASR_TCP_URI:-}}" -if [ -n "$PARAKEET_URL" ]; then - echo "🎤 Starting audio stream Parakeet worker (1 worker for sequential processing)..." - uv run python -m advanced_omi_backend.workers.audio_stream_parakeet_worker & - PARAKEET_STREAM_WORKER_PID=$! -else - echo "⏭️ Skipping Parakeet audio stream worker (PARAKEET_ASR_URL or OFFLINE_ASR_TCP_URI not set)" - PARAKEET_STREAM_WORKER_PID="" -fi - -echo "✅ All workers started:" -echo " - RQ worker 1: PID $RQ_WORKER_1_PID (transcription, memory, default)" -echo " - RQ worker 2: PID $RQ_WORKER_2_PID (transcription, memory, default)" -echo " - RQ worker 3: PID $RQ_WORKER_3_PID (transcription, memory, default)" -echo " - RQ worker 4: PID $RQ_WORKER_4_PID (transcription, memory, default)" -echo " - RQ worker 5: PID $RQ_WORKER_5_PID (transcription, memory, default)" -echo " - RQ worker 6: PID $RQ_WORKER_6_PID (transcription, memory, default)" -echo " - Audio persistence worker: PID $AUDIO_PERSISTENCE_WORKER_PID (audio queue - file rotation)" -if [ -n "$AUDIO_STREAM_WORKER_PID" ]; then - echo " - Audio stream Deepgram worker: PID $AUDIO_STREAM_WORKER_PID (Redis Streams consumer - sequential processing)" -fi -if [ -n "$PARAKEET_STREAM_WORKER_PID" ]; then - echo " - Audio stream Parakeet worker: PID $PARAKEET_STREAM_WORKER_PID (Redis Streams consumer - sequential processing)" -fi - -# Wait for any process to exit +# Start all workers +start_workers + +# Start self-healing monitor in background +monitor_worker_health & +MONITOR_PID=$! +echo "🩺 Self-healing monitor started: PID $MONITOR_PID" + +# Wait for any worker process to exit wait -n -# If we get here, one process has exited - kill the others +# If we get here, one worker process has exited - kill everything echo "⚠️ One worker exited, stopping all workers..." +kill $MONITOR_PID 2>/dev/null || true kill $RQ_WORKER_1_PID 2>/dev/null || true kill $RQ_WORKER_2_PID 2>/dev/null || true kill $RQ_WORKER_3_PID 2>/dev/null || true @@ -125,7 +161,7 @@ kill $RQ_WORKER_4_PID 2>/dev/null || true kill $RQ_WORKER_5_PID 2>/dev/null || true kill $RQ_WORKER_6_PID 2>/dev/null || true kill $AUDIO_PERSISTENCE_WORKER_PID 2>/dev/null || true -[ -n "$AUDIO_STREAM_WORKER_PID" ] && kill $AUDIO_STREAM_WORKER_PID 2>/dev/null || true +kill $AUDIO_STREAM_WORKER_PID 2>/dev/null || true wait echo "🔄 All workers stopped" From 31adba6fc5cdb140f3f20fdb3627edc63bcea912 Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Sat, 22 Nov 2025 16:24:25 +0000 Subject: [PATCH 13/74] Changed function to get the conversation audio in order to properly auth --- .../advanced/src/advanced_omi_backend/auth.py | 29 +++++ .../controllers/audio_controller.py | 44 +++++++ .../routers/modules/audio_routes.py | 122 +++++++++--------- 3 files changed, 137 insertions(+), 58 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/auth.py b/backends/advanced/src/advanced_omi_backend/auth.py index e47a3b9e..a39637f1 100644 --- a/backends/advanced/src/advanced_omi_backend/auth.py +++ b/backends/advanced/src/advanced_omi_backend/auth.py @@ -119,9 +119,38 @@ def get_jwt_strategy() -> JWTStrategy: # User dependencies for protecting endpoints current_active_user = fastapi_users.current_user(active=True) +current_active_user_optional = fastapi_users.current_user(active=True, optional=True) current_superuser = fastapi_users.current_user(active=True, superuser=True) +async def get_user_from_token_param(token: str) -> Optional[User]: + """ + Get user from JWT token string (for query parameter authentication). + + This is useful for endpoints that need to support token-based auth via query params, + such as HTML audio elements that can't set custom headers. + + Args: + token: JWT token string + + Returns: + User object if token is valid and user is active, None otherwise + """ + if not token: + return None + try: + strategy = get_jwt_strategy() + user_db_gen = get_user_db() + user_db = await user_db_gen.__anext__() + user_manager = UserManager(user_db) + user = await strategy.read_token(token, user_manager) + if user and user.is_active: + return user + except Exception: + pass + return None + + def get_accessible_user_ids(user: User) -> list[str] | None: """ Get list of user IDs that the current user can access data for. diff --git a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py index 4ca72ca0..e1e41996 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py @@ -192,6 +192,50 @@ async def upload_and_process_audio_files( ) +async def get_conversation_audio_path(conversation_id: str, user: User, cropped: bool = False) -> Path: + """ + Get the file path for a conversation's audio file. + + Args: + conversation_id: The conversation ID + user: The authenticated user + cropped: If True, return cropped audio path; if False, return original audio path + + Returns: + Path object for the audio file + + Raises: + ValueError: If conversation not found, access denied, or audio file not available + """ + # Get conversation by conversation_id (UUID field, not _id) + conversation = await Conversation.find_one(Conversation.conversation_id == conversation_id) + + if not conversation: + raise ValueError("Conversation not found") + + # Check ownership (admins can access all files) + if not user.is_superuser and conversation.user_id != str(user.user_id): + raise ValueError("Access denied") + + # Get the appropriate audio path + audio_path = conversation.cropped_audio_path if cropped else conversation.audio_path + + if not audio_path: + audio_type = "cropped" if cropped else "original" + raise ValueError(f"No {audio_type} audio file available for this conversation") + + # Build full file path + from advanced_omi_backend.app_config import get_audio_chunk_dir + audio_dir = get_audio_chunk_dir() + file_path = audio_dir / audio_path + + # Check if file exists + if not file_path.exists() or not file_path.is_file(): + raise ValueError("Audio file not found on disk") + + return file_path + + async def get_cropped_audio_info(audio_uuid: str, user: User): """ Get audio cropping metadata from the conversation. diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py index 4c0f756b..3c9d606e 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py @@ -4,18 +4,79 @@ Handles audio file uploads, processing job management, and audio file serving. """ -from pathlib import Path +from typing import Optional from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile from fastapi.responses import FileResponse -from advanced_omi_backend.auth import current_superuser, current_active_user +from advanced_omi_backend.auth import current_superuser, current_active_user_optional, get_user_from_token_param from advanced_omi_backend.controllers import audio_controller from advanced_omi_backend.models.user import User -from advanced_omi_backend.app_config import get_audio_chunk_dir router = APIRouter(prefix="/audio", tags=["audio"]) +@router.get("/get_audio/{conversation_id}") +async def get_conversation_audio( + conversation_id: str, + cropped: bool = Query(default=False, description="Serve cropped (speech-only) audio instead of original"), + token: Optional[str] = Query(default=None, description="JWT token for audio element access"), + current_user: Optional[User] = Depends(current_active_user_optional), +): + """ + Serve audio file for a conversation. + + This endpoint uses conversation_id for direct lookup and ownership verification, + which is more efficient than querying by filename. + + Supports both header-based auth (Authorization: Bearer) and query param token + for