Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions backends/advanced/src/advanced_omi_backend/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,4 @@

__version__ = "0.1.0"

from .database import AudioChunksRepository

__all__ = ["AudioChunksRepository"]
__all__ = []
2 changes: 0 additions & 2 deletions backends/advanced/src/advanced_omi_backend/app_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def __init__(self):
self.mongodb_uri = os.getenv("MONGODB_URI", "mongodb://mongo:27017")
self.mongo_client = AsyncIOMotorClient(self.mongodb_uri)
self.db = self.mongo_client.get_default_database("friend-lite")
self.chunks_col = self.db["audio_chunks"]
self.users_col = self.db["users"]
self.speakers_col = self.db["speakers"]

Expand Down Expand Up @@ -104,7 +103,6 @@ def get_audio_chunk_dir() -> Path:
def get_mongo_collections():
"""Get MongoDB collections."""
return {
'chunks': app_config.chunks_col,
'users': app_config.users_col,
'speakers': app_config.speakers_col,
}
Expand Down
3 changes: 0 additions & 3 deletions backends/advanced/src/advanced_omi_backend/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from pathlib import Path
from typing import Dict, List, Optional, Tuple

from advanced_omi_backend.database import AudioChunksRepository
from advanced_omi_backend.task_manager import get_task_manager
from wyoming.audio import AudioChunk

Expand All @@ -29,14 +28,12 @@ class ClientState:
def __init__(
self,
client_id: str,
ac_db_collection_helper: AudioChunksRepository,
chunk_dir: Path,
user_id: str,
user_email: Optional[str] = None,
):
self.client_id = client_id
self.connected = True
self.db_helper = ac_db_collection_helper
self.chunk_dir = chunk_dir

# Store user data for memory processing
Expand Down
5 changes: 2 additions & 3 deletions backends/advanced/src/advanced_omi_backend/client_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def get_client_count(self) -> int:
"""
return len(self._active_clients)

def create_client(self, client_id: str, ac_repository, chunk_dir, user_id: str, user_email: Optional[str] = None) -> "ClientState":
def create_client(self, client_id: str, chunk_dir, user_id: str, user_email: Optional[str] = None) -> "ClientState":
"""
Atomically create and register a new client.

Expand All @@ -113,7 +113,6 @@ def create_client(self, client_id: str, ac_repository, chunk_dir, user_id: str,

Args:
client_id: Unique client identifier
ac_repository: Audio chunks repository
chunk_dir: Directory for audio chunks
user_id: User ID who owns this client
user_email: Optional user email
Expand All @@ -131,7 +130,7 @@ def create_client(self, client_id: str, ac_repository, chunk_dir, user_id: str,
from advanced_omi_backend.client import ClientState

# Create client state
client_state = ClientState(client_id, ac_repository, chunk_dir, user_id, user_email)
client_state = ClientState(client_id, chunk_dir, user_id, user_email)

# Atomically add to internal storage and register mapping
self._active_clients[client_id] = client_state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
Handles audio file uploads and processes them directly.
Simplified to write files immediately and enqueue transcription.

Also includes audio cropping operations that work with the audio_chunks collection.
Also includes audio cropping operations that work with the Conversation model.
"""

import logging
Expand All @@ -18,13 +18,11 @@
from advanced_omi_backend.utils.audio_utils import (
AudioValidationError,
write_audio_file,
_process_audio_cropping_with_relative_timestamps,
)
from advanced_omi_backend.models.job import JobPriority
from advanced_omi_backend.models.user import User
from advanced_omi_backend.models.conversation import create_conversation
from advanced_omi_backend.database import AudioChunksRepository, chunks_col
from advanced_omi_backend.client_manager import client_belongs_to_user
from advanced_omi_backend.models.conversation import Conversation

logger = logging.getLogger(__name__)
audio_logger = logging.getLogger("audio_processing")
Expand Down Expand Up @@ -115,6 +113,8 @@ async def upload_and_process_audio_files(
title=title,
summary="Processing uploaded audio file..."
)
# Set audio_path so the frontend can play the audio
conversation.audio_path = wav_filename
await conversation.insert()
conversation_id = conversation.conversation_id # Get the auto-generated ID

Expand Down Expand Up @@ -194,141 +194,35 @@ async def upload_and_process_audio_files(

async def get_cropped_audio_info(audio_uuid: str, user: User):
    """
    Get audio cropping metadata from the conversation.

    This is an audio service operation that retrieves cropping-related metadata
    such as speech segments, cropped audio path, and cropping timestamps.

    Used for: Checking cropping status and retrieving audio processing details.
    Works with: Conversation model.

    Args:
        audio_uuid: Value matched against ``Conversation.audio_uuid``.
        user: Requesting user. Superusers may read any conversation; other
            users only those whose ``user_id`` equals ``str(user.user_id)``.

    Returns:
        A dict with ``audio_uuid``, ``cropped_audio_path``, ``speech_segments``,
        ``cropped_duration``, ``cropped_at`` and ``original_audio_path`` on
        success; otherwise a ``JSONResponse`` with status 404 (no matching
        conversation, or not owned by ``user``) or 500 (any unexpected error).
    """
    try:
        # Find the conversation
        conversation = await Conversation.find_one(Conversation.audio_uuid == audio_uuid)
        if not conversation:
            return JSONResponse(status_code=404, content={"error": "Conversation not found"})

        # Check ownership for non-admin users. The response is the same 404 as
        # the not-found case (presumably so the endpoint does not reveal that
        # another user's conversation exists).
        if not user.is_superuser and conversation.user_id != str(user.user_id):
            return JSONResponse(status_code=404, content={"error": "Conversation not found"})

        return {
            "audio_uuid": audio_uuid,
            "cropped_audio_path": conversation.cropped_audio_path,
            # These fields may be absent on the model; fall back to defaults
            # instead of raising AttributeError.
            "speech_segments": getattr(conversation, "speech_segments", []),
            "cropped_duration": getattr(conversation, "cropped_duration", None),
            "cropped_at": getattr(conversation, "cropped_at", None),
            "original_audio_path": conversation.audio_path,
        }

    except Exception:
        # Database or unexpected errors when fetching audio metadata
        audio_logger.exception("Error fetching cropped audio info")
        return JSONResponse(status_code=500, content={"error": "Error fetching cropped audio info"})


async def reprocess_audio_cropping(audio_uuid: str, user: User):
    """
    Re-run the audio cropping step for a stored audio file.

    Looks up the audio chunk record for ``audio_uuid``, locates its audio file
    on disk, and passes that file, the record's speech segments and an output
    path of ``cropped_<audio_uuid>.wav`` to the cropping helper.

    Used for: Re-processing audio when cropping failed or needs updating.
    Works with: audio_chunks collection and audio_utils cropping functions.

    Args:
        audio_uuid: Value matched against the record's ``audio_uuid`` field.
        user: Requesting user. Superusers bypass the ownership check.

    Returns:
        A ``JSONResponse``: a success message when the helper returns a truthy
        result; 404 when the record is missing or not owned by ``user``; 400
        when the record has no audio path or no speech segments; 422 when the
        audio file is not found on disk; 500 on any processing or unexpected
        error.
    """
    try:
        record = await chunks_col.find_one({"audio_uuid": audio_uuid})
        if not record:
            return JSONResponse(status_code=404, content={"error": "Conversation not found"})

        # Admins may act on any record; everyone else must own the client.
        is_allowed = user.is_superuser or client_belongs_to_user(
            record["client_id"], user.user_id
        )
        if not is_allowed:
            return JSONResponse(status_code=404, content={"error": "Conversation not found"})

        audio_path = record.get("audio_path")
        if not audio_path:
            return JSONResponse(
                status_code=400, content={"error": "No audio file found for this conversation"}
            )

        chunk_root = Path("/app/audio_chunks")

        # Candidate locations, tried in order: the chunk directory first, then
        # the stored path taken as-is.
        candidates = [chunk_root / audio_path, Path(audio_path)]
        source_file = next((p for p in candidates if p.exists()), None)
        if not source_file:
            return JSONResponse(
                status_code=422,
                content={
                    "error": "Audio file not found on disk",
                    "details": f"Conversation exists but audio file '{audio_path}' is missing from expected locations",
                    "searched_paths": [str(p) for p in candidates],
                },
            )

        segments = record.get("speech_segments", [])
        if not segments:
            return JSONResponse(
                status_code=400,
                content={"error": "No speech segments found for this conversation"},
            )

        destination = chunk_root / f"cropped_{audio_uuid}.wav"

        # Repository handed to the cropping helper for database updates.
        repository = AudioChunksRepository(chunks_col)

        try:
            succeeded = await _process_audio_cropping_with_relative_timestamps(
                str(source_file),
                segments,
                str(destination),
                audio_uuid,
                repository,
            )
        except OSError as processing_error:
            # File I/O errors during audio cropping (IOError is an alias of OSError)
            audio_logger.exception("File I/O error during audio cropping reprocessing")
            return JSONResponse(
                status_code=500,
                content={"error": f"Audio processing failed: {str(processing_error)}"},
            )
        except Exception as processing_error:
            # Unexpected errors during cropping operation
            audio_logger.exception("Unexpected error during audio cropping reprocessing")
            return JSONResponse(
                status_code=500,
                content={"error": f"Audio processing failed: {str(processing_error)}"},
            )

        if not succeeded:
            audio_logger.error(f"Failed to reprocess audio cropping for {audio_uuid}")
            return JSONResponse(
                status_code=500, content={"error": "Failed to reprocess audio cropping"}
            )

        audio_logger.info(f"Successfully reprocessed audio cropping for {audio_uuid}")
        return JSONResponse(
            content={"message": f"Audio cropping reprocessed for {audio_uuid}"}
        )

    except Exception:
        # Database or unexpected errors in reprocessing handler
        audio_logger.exception("Error reprocessing audio cropping")
        return JSONResponse(status_code=500, content={"error": "Error reprocessing audio cropping"})
Loading
Loading