Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 1 addition & 14 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ npm run web
```bash
# ASR Services
cd extras/asr-services
docker compose up parakeet # Offline ASR with Parakeet
docker compose up parakeet-asr # Offline ASR with Parakeet

# Speaker Recognition (with tests)
cd extras/speaker-recognition
Expand All @@ -136,13 +136,6 @@ docker compose up --build

## Architecture Overview

### Core Structure
- **backends/advanced-backend/**: Primary FastAPI backend with real-time audio processing
- `src/main.py`: Central FastAPI application with WebSocket audio streaming
- `src/auth.py`: Email-based authentication with JWT tokens
- `src/memory/`: LLM-powered conversation memory system using mem0
- `webui/`: React-based web dashboard for conversation and user management

### Key Components
- **Audio Pipeline**: Real-time Opus/PCM → Application-level processing → Deepgram/Mistral transcription → memory extraction
- **Wyoming Protocol**: WebSocket communication uses Wyoming protocol (JSONL + binary) for structured audio sessions
Expand Down Expand Up @@ -1214,12 +1207,6 @@ curl http://[gpu-machine-ip]:8085/health # Speaker recognition

### Troubleshooting Distributed Setup

**Common Issues:**
- **CORS errors**: Tailscale IPs are automatically supported, but verify CORS_ORIGINS if using custom IPs
- **Service discovery**: Use `tailscale ip` to find machine IPs
- **Port conflicts**: Ensure services use different ports on shared machines
- **Authentication**: Services must be accessible without authentication for inter-service communication

**Debugging Commands:**
```bash
# Check Tailscale connectivity
Expand Down
67 changes: 67 additions & 0 deletions backends/advanced/src/advanced_omi_backend/audio_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
import logging
import os
import time
import wave
import io
import numpy as np
from pathlib import Path

# Type import to avoid circular imports
from typing import TYPE_CHECKING, Optional
Expand Down Expand Up @@ -88,6 +92,69 @@ async def process_audio_chunk(
client_state.update_audio_received(chunk)


async def load_audio_file_as_chunk(audio_path: Path) -> AudioChunk:
    """Load a WAV file from disk into an AudioChunk for reprocessing.

    The file is read fully into memory and parsed with the ``wave`` module.
    16-bit stereo input is downmixed to mono by averaging the two channels.
    After any downmix the audio must be 16 kHz, mono, 16-bit; anything else
    is rejected.

    Args:
        audio_path: Path to the audio file on disk.

    Returns:
        AudioChunk holding the file's complete PCM payload, stamped with the
        current wall-clock time in milliseconds.

    Raises:
        FileNotFoundError: If the audio file doesn't exist.
        ValueError: If the file cannot be read or parsed, or its format is
            not supported. The underlying exception is chained as the cause.
    """
    try:
        # Read the whole file up front so the WAV parser works on an
        # in-memory buffer.
        with open(audio_path, "rb") as f:
            file_content = f.read()

        with wave.open(io.BytesIO(file_content), "rb") as wav_file:
            sample_rate = wav_file.getframerate()
            sample_width = wav_file.getsampwidth()
            channels = wav_file.getnchannels()
            audio_data = wav_file.readframes(wav_file.getnframes())

        # Convert to mono if stereo.
        if channels == 2:
            if sample_width != 2:
                raise ValueError(f"Unsupported sample width for stereo: {sample_width}")

            # WAV PCM samples are little-endian; spell that out rather than
            # relying on the host's native byte order.
            stereo = np.frombuffer(audio_data, dtype="<i2").reshape(-1, 2)

            # Average in a wider integer type: summing two int16 samples can
            # exceed the int16 range, and accumulating in int16 would wrap
            # around and corrupt loud passages. The final cast truncates
            # toward zero, and the mean of two int16 values always fits.
            mono = stereo.astype(np.int32).mean(axis=1).astype("<i2")
            audio_data = mono.tobytes()
            channels = 1

        # Validate format matches expected (16kHz, mono, 16-bit).
        if sample_rate != 16000:
            raise ValueError(f"Audio file has sample rate {sample_rate}Hz, expected 16kHz")
        if channels != 1:
            raise ValueError(f"Audio file has {channels} channels, expected mono")
        if sample_width != 2:
            raise ValueError(f"Audio file has {sample_width}-byte samples, expected 2 bytes")

        # Create AudioChunk with the current timestamp (milliseconds).
        chunk = AudioChunk(
            audio=audio_data,
            rate=sample_rate,
            width=sample_width,
            channels=channels,
            timestamp=int(time.time() * 1000),
        )

        logger.info(f"Loaded audio file {audio_path} as AudioChunk ({len(audio_data)} bytes)")
        return chunk

    except FileNotFoundError:
        # Propagate unchanged so callers can distinguish a missing file from
        # a malformed one.
        logger.error(f"Audio file not found: {audio_path}")
        raise
    except Exception as e:
        logger.error(f"Error loading audio file {audio_path}: {e}")
        # Chain the original exception so the root cause stays visible in
        # tracebacks.
        raise ValueError(f"Invalid audio file format: {e}") from e


async def _process_audio_cropping_with_relative_timestamps(
original_path: str,
speech_segments: list[tuple[float, float]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,23 @@
import asyncio
import hashlib
import logging
import os
import time
from pathlib import Path
from typing import Optional

from advanced_omi_backend.audio_utils import (
_process_audio_cropping_with_relative_timestamps,
load_audio_file_as_chunk,
)
from advanced_omi_backend.client_manager import (
ClientManager,
client_belongs_to_user,
get_user_clients_all,
)
from advanced_omi_backend.database import AudioChunksRepository, ProcessingRunsRepository, chunks_col, processing_runs_col, conversations_col, ConversationsRepository
from advanced_omi_backend.users import User
from advanced_omi_backend.processors import get_processor_manager, TranscriptionItem, MemoryProcessingItem
from advanced_omi_backend.users import User, get_user_by_id
from fastapi.responses import JSONResponse

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -585,9 +588,10 @@ async def reprocess_transcript(conversation_id: str, user: User):
)

# Generate configuration hash for duplicate detection
transcription_provider = os.getenv("TRANSCRIPTION_PROVIDER", "deepgram")
config_data = {
"audio_path": str(full_audio_path),
"transcription_provider": "deepgram", # This would come from settings
"transcription_provider": transcription_provider,
"trigger": "manual_reprocess"
}
config_hash = hashlib.sha256(str(config_data).encode()).hexdigest()[:16]
Expand All @@ -613,18 +617,37 @@ async def reprocess_transcript(conversation_id: str, user: User):
status_code=500, content={"error": "Failed to create transcript version"}
)

# TODO: Queue audio for reprocessing with ProcessorManager
# This is where we would integrate with the existing processor
# For now, we'll return the version ID for the caller to handle
# NEW: Load audio file and queue for transcription processing
try:
# Load audio file as AudioChunk
audio_chunk = await load_audio_file_as_chunk(full_audio_path)

# Create TranscriptionItem for reprocessing
transcription_item = TranscriptionItem(
client_id=f"reprocess-{conversation_id}",
user_id=str(user.user_id),
audio_uuid=audio_uuid,
audio_chunk=audio_chunk
)

# Queue for transcription processing
processor_manager = get_processor_manager()
await processor_manager.queue_transcription(transcription_item)

logger.info(f"Queued transcript reprocessing job {run_id} (version {version_id}) for conversation {conversation_id}")

logger.info(f"Created transcript reprocessing job {run_id} (version {version_id}) for conversation {conversation_id}")
except Exception as e:
logger.error(f"Error queuing transcript reprocessing: {e}")
return JSONResponse(
status_code=500, content={"error": f"Failed to queue reprocessing: {str(e)}"}
)

return JSONResponse(content={
"message": f"Transcript reprocessing started for conversation {conversation_id}",
"run_id": run_id,
"version_id": version_id,
"config_hash": config_hash,
"status": "PENDING"
"status": "QUEUED"
})

except Exception as e:
Expand Down Expand Up @@ -673,9 +696,10 @@ async def reprocess_memory(conversation_id: str, transcript_version_id: str, use
)

# Generate configuration hash for duplicate detection
memory_provider = os.getenv("MEMORY_PROVIDER", "friend_lite")
config_data = {
"transcript_version_id": transcript_version_id,
"memory_provider": "friend_lite", # This would come from settings
"memory_provider": memory_provider,
"trigger": "manual_reprocess"
}
config_hash = hashlib.sha256(str(config_data).encode()).hexdigest()[:16]
Expand All @@ -702,18 +726,42 @@ async def reprocess_memory(conversation_id: str, transcript_version_id: str, use
status_code=500, content={"error": "Failed to create memory version"}
)

# TODO: Queue memory extraction for processing
# This is where we would integrate with the existing memory processor
# NEW: Queue memory processing
try:
# Get user email for memory processing
user_obj = await get_user_by_id(str(user.user_id))
if not user_obj:
return JSONResponse(
status_code=500, content={"error": "User not found for memory processing"}
)

# Create MemoryProcessingItem for reprocessing
memory_item = MemoryProcessingItem(
client_id=f"reprocess-{conversation_id}",
user_id=str(user.user_id),
user_email=user_obj.email,
conversation_id=conversation_id
)

# Queue for memory processing
processor_manager = get_processor_manager()
await processor_manager.queue_memory(memory_item)

logger.info(f"Created memory reprocessing job {run_id} (version {version_id}) for conversation {conversation_id}")
logger.info(f"Queued memory reprocessing job {run_id} (version {version_id}) for conversation {conversation_id}")

except Exception as e:
logger.error(f"Error queuing memory reprocessing: {e}")
return JSONResponse(
status_code=500, content={"error": f"Failed to queue memory reprocessing: {str(e)}"}
)

return JSONResponse(content={
"message": f"Memory reprocessing started for conversation {conversation_id}",
"run_id": run_id,
"version_id": version_id,
"transcript_version_id": transcript_version_id,
"config_hash": config_hash,
"status": "PENDING"
"status": "QUEUED"
})

except Exception as e:
Expand Down
Loading