Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
958 changes: 62 additions & 896 deletions CLAUDE.md

Large diffs are not rendered by default.

4,644 changes: 4,644 additions & 0 deletions backends/advanced/Docs/architecture.excalidraw

Large diffs are not rendered by default.

123 changes: 123 additions & 0 deletions backends/advanced/src/advanced_omi_backend/audio_processing_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""
Audio processing data types for unified pipeline.

Provides common data structures for both WebSocket and file upload processing.
"""

import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, List, Optional

from .job_tracker import AudioSource


@dataclass
class AudioProcessingItem:
    """Common data structure for all audio processing (WebSocket and file upload).

    Instances are normally created through :meth:`from_websocket` or
    :meth:`from_file_upload`, which generate a fresh ``audio_uuid`` and fill
    in the fields that belong to the respective audio source.
    """

    # Identifiers
    audio_uuid: str
    user_id: str
    user_email: str

    # Audio source information
    audio_source: AudioSource  # WEBSOCKET or FILE_UPLOAD
    client_id: Optional[str] = None  # Set by both factory methods
    device_name: Optional[str] = None  # Only set by from_file_upload

    # Audio data (one of these will be set)
    audio_chunks: Optional[List[bytes]] = None  # For websocket (buffered chunks)
    audio_file_path: Optional[str] = None  # For file upload

    # Audio format information
    sample_rate: int = 16000
    channels: int = 1
    sample_width: int = 2  # 2 bytes = 16-bit

    # Processing metadata
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    duration_seconds: Optional[float] = None
    file_size_bytes: Optional[int] = None

    @classmethod
    def from_websocket(
        cls,
        audio_chunks: List[bytes],
        client_id: str,
        user_id: str,
        user_email: str,
        sample_rate: int = 16000,
    ) -> "AudioProcessingItem":
        """Create an item from buffered WebSocket audio chunks.

        Args:
            audio_chunks: Buffered raw audio chunks, stored as given (not copied).
            client_id: Identifier of the streaming client.
            user_id: Owner of the audio.
            user_email: Email address of the owner.
            sample_rate: Sample rate of the chunks in Hz.

        Returns:
            A new item with a random ``audio_uuid`` and source ``WEBSOCKET``.
        """
        return cls(
            audio_uuid=str(uuid.uuid4()),
            user_id=user_id,
            user_email=user_email,
            audio_source=AudioSource.WEBSOCKET,
            client_id=client_id,
            audio_chunks=audio_chunks,
            sample_rate=sample_rate,
        )

    @classmethod
    def from_file_upload(
        cls,
        audio_file_path: str,
        client_id: str,
        device_name: str,
        user_id: str,
        user_email: str,
    ) -> "AudioProcessingItem":
        """Create an item from an uploaded file.

        Args:
            audio_file_path: Location of the uploaded file on disk.
            client_id: Identifier of the uploading client.
            device_name: Name of the device the upload is attributed to.
            user_id: Owner of the audio.
            user_email: Email address of the owner.

        Returns:
            A new item with a random ``audio_uuid`` and source ``FILE_UPLOAD``.
            ``file_size_bytes`` is best-effort: it is ``None`` when the file
            cannot be stat'ed (missing, unreadable, or malformed path).
        """
        # Query the size with a single stat() call instead of exists()-then-
        # stat(): the file may disappear between the two calls, which would
        # turn an optional piece of metadata into an unexpected crash.
        try:
            file_size_bytes: Optional[int] = Path(audio_file_path).stat().st_size
        except (OSError, ValueError):
            file_size_bytes = None

        return cls(
            audio_uuid=str(uuid.uuid4()),
            user_id=user_id,
            user_email=user_email,
            audio_source=AudioSource.FILE_UPLOAD,
            client_id=client_id,
            device_name=device_name,
            audio_file_path=audio_file_path,
            file_size_bytes=file_size_bytes,
        )

    def get_identifier(self) -> str:
        """Get the appropriate identifier for this processing item.

        Returns:
            ``client_id`` for WebSocket items; otherwise ``device_name``,
            falling back to the literal ``"file_upload"`` when it is unset.
        """
        if self.audio_source == AudioSource.WEBSOCKET:
            # NOTE(review): client_id is Optional, so a WEBSOCKET item built
            # directly (not via from_websocket) without a client_id yields
            # None here despite the ``str`` annotation.
            return self.client_id
        return self.device_name or "file_upload"


@dataclass
class TranscriptionItem:
    """Data for transcription processing.

    The first five fields are required; ``job_id`` and ``audio_chunk`` are
    optional and default to ``None``.
    """

    audio_uuid: str
    audio_file_path: str
    client_id: str
    user_id: str
    user_email: str
    job_id: Optional[str] = None
    # ``typing.Any`` rather than the builtin function ``any``, which is not a
    # type and is rejected by static type checkers.
    audio_chunk: Optional[Any] = None  # For legacy transcription flow


@dataclass
class MemoryProcessingItem:
    """Payload handed to the memory-processing stage."""

    # Mandatory identifiers, in positional order
    conversation_id: str
    user_id: str
    user_email: str
    client_id: str  # the memory service needs this, so it has no default

    # Optional extras
    transcript_version_id: Optional[str] = field(default=None)  # None = active version
    job_id: Optional[str] = field(default=None)


@dataclass
class CroppingItem:
    """Payload handed to the audio cropping/optimization stage."""

    # Which recording to crop and where it lives on disk
    audio_uuid: str
    audio_file_path: str

    # Segments to keep; element type is not constrained by this class
    segments: List

    # Pipeline job this work belongs to, if any
    job_id: Optional[str] = field(default=None)
202 changes: 166 additions & 36 deletions backends/advanced/src/advanced_omi_backend/audio_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,20 @@
###############################################################################

import asyncio
import io
import logging
import os
import time
import uuid
import wave
from pathlib import Path

# Type import to avoid circular imports
from typing import TYPE_CHECKING, Optional

import numpy as np
from advanced_omi_backend.audio_processing_types import AudioSource
from advanced_omi_backend.job_tracker import StageEvent
from wyoming.audio import AudioChunk

if TYPE_CHECKING:
Expand All @@ -29,63 +36,186 @@ async def process_audio_chunk(
user_id: str,
user_email: str,
audio_format: dict,
client_state: Optional["ClientState"] = None
) -> None:
"""Process a single audio chunk through the standard pipeline.
"""Process a single audio chunk by sending it to TranscriptionManager.

This function encapsulates the common pattern used across all audio input sources:
1. Create AudioChunk with format details
2. Queue AudioProcessingItem to processor
3. Update client state if provided
For WebSocket streaming:
- Creates ONE job per session (on first chunk)
- Accumulates chunks in TranscriptionManager
- Job completes when session ends (audio-stop)

Args:
audio_data: Raw audio bytes
client_id: Client identifier
user_id: User identifier
user_email: User email
audio_format: Dict containing {rate, width, channels, timestamp}
client_state: Optional ClientState for state updates
"""

from advanced_omi_backend.processors import (
AudioProcessingItem,
get_processor_manager,
)
# Import here to avoid circular import (processors imports from audio_utils)
from advanced_omi_backend.processors import get_processor_manager

# Extract format details
rate = audio_format.get("rate", 16000)
width = audio_format.get("width", 2)
channels = audio_format.get("channels", 1)
timestamp = audio_format.get("timestamp")

# Use current time if no timestamp provided
if timestamp is None:
timestamp = int(time.time() * 1000)
# Get processor manager and ensure transcription manager exists
processor_manager = get_processor_manager()
await processor_manager.ensure_transcription_manager(client_id)

# Get the transcription manager for this client
transcription_manager = processor_manager.transcription_managers.get(client_id)
if not transcription_manager:
logger.warning(f"No transcription manager found for {client_id} after ensure")
return

# Check if this is a new streaming session
is_new_session = client_id not in processor_manager.active_audio_uuids

if is_new_session:
# Generate audio_uuid for new session
audio_uuid = str(uuid.uuid4())
processor_manager.active_audio_uuids[client_id] = audio_uuid
processor_manager.chunk_counters[client_id] = 0 # Initialize chunk counter
logger.info(f"New streaming session for {client_id}: {audio_uuid}")

# Create audio file and file sink (matching old _audio_processor pattern)
timestamp = int(time.time())
wav_filename = f"{timestamp}_{client_id}_{audio_uuid}.wav"
wav_file_path = str(processor_manager.chunk_dir / wav_filename)

# Create file sink with sample rate from first chunk
sink = processor_manager._new_local_file_sink(wav_file_path, rate)
await sink.open()
processor_manager.active_file_sinks[client_id] = sink

# Create database entry for audio session
await processor_manager.repository.create_chunk(
audio_uuid=audio_uuid,
audio_path=wav_filename,
client_id=client_id,
timestamp=timestamp,
user_id=user_id,
user_email=user_email,
)

logger.info(f"Created audio file and database entry: {wav_filename}")

# Build conditional stages list based on configuration
stages = ["audio", "transcription"]
if processor_manager.speaker_recognition_enabled:
stages.append("speaker_recognition")
stages.append("memory")
if processor_manager.cropping_enabled:
stages.append("cropping")

# Create ONE pipeline job for this entire streaming session
job_id = await processor_manager.job_tracker.create_pipeline_job(
audio_source=AudioSource.WEBSOCKET,
user_id=user_id,
identifier=client_id,
audio_uuid=audio_uuid,
stages=stages
)

# Store job_id by audio_uuid (survives client disconnect)
processor_manager.audio_uuid_to_job[audio_uuid] = job_id

# Create AudioChunk with format details
# Mark audio stage as in-progress (will complete when session ends)
await processor_manager.job_tracker.track_stage_event(
job_id, "audio", StageEvent.ENQUEUE
)

logger.info(f"Created streaming job {job_id} for session {audio_uuid} with stages: {stages}")
else:
audio_uuid = processor_manager.active_audio_uuids[client_id]

# Create AudioChunk for this data
chunk = AudioChunk(
audio=audio_data,
rate=rate,
width=width,
channels=channels,
timestamp=timestamp
channels=channels
)

# Create AudioProcessingItem and queue for processing
processor_manager = get_processor_manager()
processing_item = AudioProcessingItem(
client_id=client_id,
user_id=user_id,
user_email=user_email,
audio_chunk=chunk,
timestamp=timestamp
)
# Increment chunk counter for this client
processor_manager.chunk_counters[client_id] = processor_manager.chunk_counters.get(client_id, 0) + 1
chunk_count = processor_manager.chunk_counters[client_id]

# Write chunk to file sink (for all chunks)
if client_id in processor_manager.active_file_sinks:
sink = processor_manager.active_file_sinks[client_id]
await sink.write(chunk)

await processor_manager.queue_audio(processing_item)
# Log first 5 chunks and every 1000th chunk
if chunk_count <= 5 or chunk_count % 1000 == 0:
logger.info(f"Wrote chunk #{chunk_count} to file for {client_id} ({len(audio_data)} bytes)")

# Update client state if provided
if client_state is not None:
client_state.update_audio_received(chunk)
# Send chunk to transcription manager (it will accumulate them for batch processing)
await transcription_manager.transcribe_chunk(audio_uuid, chunk, client_id)


async def load_audio_file_as_chunk(audio_path: Path) -> AudioChunk:
    """Load existing audio file into Wyoming AudioChunk format for reprocessing.

    The file is decoded with the standard-library ``wave`` module. 16-bit
    stereo input is down-mixed to mono by averaging the two channels; after
    that the audio must be 16 kHz, mono, 16-bit or the file is rejected.

    Args:
        audio_path: Path to the audio file on disk

    Returns:
        AudioChunk holding all frames of the file, stamped with the current
        time in milliseconds

    Raises:
        FileNotFoundError: If audio file doesn't exist
        ValueError: If audio file format is invalid or unsupported; the
            underlying exception is chained as ``__cause__``
    """
    try:
        # Let wave read straight from disk; the previous read-everything-then-
        # BytesIO approach held two copies of the file in memory.
        with wave.open(str(audio_path), "rb") as wav_file:
            sample_rate = wav_file.getframerate()
            sample_width = wav_file.getsampwidth()
            channels = wav_file.getnchannels()
            audio_data = wav_file.readframes(wav_file.getnframes())

        # Convert to mono if stereo
        if channels == 2:
            if sample_width != 2:
                raise ValueError(f"Unsupported sample width for stereo: {sample_width}")
            # WAV PCM is little-endian, hence the explicit "<i2".
            stereo = np.frombuffer(audio_data, dtype="<i2").reshape(-1, 2)
            # Average with numpy's default float64 accumulator. Forcing
            # dtype=int16 sums left+right in int16, which wraps around for
            # loud samples (e.g. 20000 + 20000) and corrupts the audio.
            # astype() truncates toward zero, matching the old rounding for
            # inputs that did not overflow.
            audio_data = stereo.mean(axis=1).astype("<i2").tobytes()
            channels = 1

        # Validate format matches expected (16kHz, mono, 16-bit)
        if sample_rate != 16000:
            raise ValueError(f"Audio file has sample rate {sample_rate}Hz, expected 16kHz")
        if channels != 1:
            raise ValueError(f"Audio file has {channels} channels, expected mono")
        if sample_width != 2:
            raise ValueError(f"Audio file has {sample_width}-byte samples, expected 2 bytes")

        # Create AudioChunk with current timestamp
        chunk = AudioChunk(
            audio=audio_data,
            rate=sample_rate,
            width=sample_width,
            channels=channels,
            timestamp=int(time.time() * 1000),
        )

        logger.info(f"Loaded audio file {audio_path} as AudioChunk ({len(audio_data)} bytes)")
        return chunk

    except FileNotFoundError:
        logger.error(f"Audio file not found: {audio_path}")
        raise
    except Exception as e:
        # Everything else (wave.Error, the ValueErrors above, reshape
        # failures, ...) is reported uniformly as ValueError; chain the cause
        # so the original traceback is not lost.
        logger.error(f"Error loading audio file {audio_path}: {e}")
        raise ValueError(f"Invalid audio file format: {e}") from e


async def _process_audio_cropping_with_relative_timestamps(
Expand All @@ -103,7 +233,7 @@ async def _process_audio_cropping_with_relative_timestamps(
# Convert absolute timestamps to relative timestamps
# Extract file start time from filename: timestamp_client_uuid.wav
filename = original_path.split("/")[-1]
logger.info(f"🕐 Parsing filename: {filename}")
logger.info(f"Parsing filename: {filename}")
filename_parts = filename.split("_")
if len(filename_parts) < 3:
logger.error(
Expand Down Expand Up @@ -144,9 +274,9 @@ async def _process_audio_cropping_with_relative_timestamps(

relative_segments.append((start_rel, end_rel))

logger.info(f"🕐 Converting timestamps for {audio_uuid}: file_start={file_start_timestamp}")
logger.info(f"🕐 Absolute segments: {speech_segments}")
logger.info(f"🕐 Relative segments: {relative_segments}")
logger.info(f"Converting timestamps for {audio_uuid}: file_start={file_start_timestamp}")
logger.info(f"Absolute segments: {speech_segments}")
logger.info(f"Relative segments: {relative_segments}")

# Validate that we have valid relative segments after conversion
if not relative_segments:
Expand Down
Loading