diff --git a/backends/advanced/src/advanced_omi_backend/__init__.py b/backends/advanced/src/advanced_omi_backend/__init__.py index 30235e86..8eb09ac9 100644 --- a/backends/advanced/src/advanced_omi_backend/__init__.py +++ b/backends/advanced/src/advanced_omi_backend/__init__.py @@ -2,6 +2,4 @@ __version__ = "0.1.0" -from .database import AudioChunksRepository - -__all__ = ["AudioChunksRepository"] +__all__ = [] diff --git a/backends/advanced/src/advanced_omi_backend/app_config.py b/backends/advanced/src/advanced_omi_backend/app_config.py index 36a6ae02..4caa70c5 100644 --- a/backends/advanced/src/advanced_omi_backend/app_config.py +++ b/backends/advanced/src/advanced_omi_backend/app_config.py @@ -30,7 +30,6 @@ def __init__(self): self.mongodb_uri = os.getenv("MONGODB_URI", "mongodb://mongo:27017") self.mongo_client = AsyncIOMotorClient(self.mongodb_uri) self.db = self.mongo_client.get_default_database("friend-lite") - self.chunks_col = self.db["audio_chunks"] self.users_col = self.db["users"] self.speakers_col = self.db["speakers"] @@ -104,7 +103,6 @@ def get_audio_chunk_dir() -> Path: def get_mongo_collections(): """Get MongoDB collections.""" return { - 'chunks': app_config.chunks_col, 'users': app_config.users_col, 'speakers': app_config.speakers_col, } diff --git a/backends/advanced/src/advanced_omi_backend/client.py b/backends/advanced/src/advanced_omi_backend/client.py index 30b3cc62..0cf6a1e2 100644 --- a/backends/advanced/src/advanced_omi_backend/client.py +++ b/backends/advanced/src/advanced_omi_backend/client.py @@ -12,7 +12,6 @@ from pathlib import Path from typing import Dict, List, Optional, Tuple -from advanced_omi_backend.database import AudioChunksRepository from advanced_omi_backend.task_manager import get_task_manager from wyoming.audio import AudioChunk @@ -29,14 +28,12 @@ class ClientState: def __init__( self, client_id: str, - ac_db_collection_helper: AudioChunksRepository, chunk_dir: Path, user_id: str, user_email: Optional[str] = None, ): self.client_id = client_id self.connected = True - self.db_helper = ac_db_collection_helper self.chunk_dir = chunk_dir # Store user data for memory processing diff --git a/backends/advanced/src/advanced_omi_backend/client_manager.py b/backends/advanced/src/advanced_omi_backend/client_manager.py index b48cd51c..7a74f036 100644 --- a/backends/advanced/src/advanced_omi_backend/client_manager.py +++ b/backends/advanced/src/advanced_omi_backend/client_manager.py @@ -131,7 +131,7 @@ def create_client(self, client_id: str, ac_repository, chunk_dir, user_id: str, from advanced_omi_backend.client import ClientState # Create client state - client_state = ClientState(client_id, ac_repository, chunk_dir, user_id, user_email) + client_state = ClientState(client_id, chunk_dir, user_id, user_email) # Atomically add to internal storage and register mapping self._active_clients[client_id] = client_state diff --git a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py index 51d0a2a1..483958a6 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py @@ -4,7 +4,7 @@ Handles audio file uploads and processes them directly. Simplified to write files immediately and enqueue transcription. -Also includes audio cropping operations that work with the audio_chunks collection. +Also includes audio cropping operations that work with the Conversation model. """ import logging @@ -23,8 +23,7 @@ from advanced_omi_backend.models.job import JobPriority from advanced_omi_backend.models.user import User from advanced_omi_backend.models.conversation import create_conversation -from advanced_omi_backend.database import AudioChunksRepository, chunks_col -from advanced_omi_backend.client_manager import client_belongs_to_user +from advanced_omi_backend.models.conversation import Conversation logger = logging.getLogger(__name__) audio_logger = logging.getLogger("audio_processing") @@ -115,6 +114,8 @@ async def upload_and_process_audio_files( title=title, summary="Processing uploaded audio file..." ) + # Set audio_path so the frontend can play the audio + conversation.audio_path = wav_filename await conversation.insert() conversation_id = conversation.conversation_id # Get the auto-generated ID @@ -194,32 +195,32 @@ async def upload_and_process_audio_files( async def get_cropped_audio_info(audio_uuid: str, user: User): """ - Get audio cropping metadata from the audio_chunks collection. + Get audio cropping metadata from the conversation. This is an audio service operation that retrieves cropping-related metadata such as speech segments, cropped audio path, and cropping timestamps. Used for: Checking cropping status and retrieving audio processing details. - Works with: audio_chunks collection (audio service operations). + Works with: Conversation model. """ try: - # Find the audio chunk - chunk = await chunks_col.find_one({"audio_uuid": audio_uuid}) - if not chunk: + # Find the conversation + conversation = await Conversation.find_one(Conversation.audio_uuid == audio_uuid) + if not conversation: return JSONResponse(status_code=404, content={"error": "Conversation not found"}) # Check ownership for non-admin users if not user.is_superuser: - if not client_belongs_to_user(chunk["client_id"], user.user_id): + if conversation.user_id != str(user.user_id): return JSONResponse(status_code=404, content={"error": "Conversation not found"}) return { "audio_uuid": audio_uuid, - "cropped_audio_path": chunk.get("cropped_audio_path"), - "speech_segments": chunk.get("speech_segments", []), - "cropped_duration": chunk.get("cropped_duration"), - "cropped_at": chunk.get("cropped_at"), - "original_audio_path": chunk.get("audio_path"), + "cropped_audio_path": conversation.cropped_audio_path, + "speech_segments": conversation.speech_segments if hasattr(conversation, 'speech_segments') else [], + "cropped_duration": conversation.cropped_duration if hasattr(conversation, 'cropped_duration') else None, + "cropped_at": conversation.cropped_at if hasattr(conversation, 'cropped_at') else None, + "original_audio_path": conversation.audio_path, } except Exception as e: @@ -236,20 +237,20 @@ async def reprocess_audio_cropping(audio_uuid: str, user: User): to extract only speech segments from the full audio file. Used for: Re-processing audio when cropping failed or needs updating. - Works with: audio_chunks collection and audio_utils cropping functions. + Works with: Conversation model and audio_utils cropping functions. """ try: - # Find the audio chunk - chunk = await chunks_col.find_one({"audio_uuid": audio_uuid}) - if not chunk: + # Find the conversation + conversation = await Conversation.find_one(Conversation.audio_uuid == audio_uuid) + if not conversation: return JSONResponse(status_code=404, content={"error": "Conversation not found"}) # Check ownership for non-admin users if not user.is_superuser: - if not client_belongs_to_user(chunk["client_id"], user.user_id): + if conversation.user_id != str(user.user_id): return JSONResponse(status_code=404, content={"error": "Conversation not found"}) - audio_path = chunk.get("audio_path") + audio_path = conversation.audio_path if not audio_path: return JSONResponse( status_code=400, content={"error": "No audio file found for this conversation"} @@ -277,8 +278,8 @@ async def reprocess_audio_cropping(audio_uuid: str, user: User): } ) - # Get speech segments from the chunk - speech_segments = chunk.get("speech_segments", []) + # Get speech segments from the conversation + speech_segments = conversation.speech_segments if hasattr(conversation, 'speech_segments') else [] if not speech_segments: return JSONResponse( status_code=400, @@ -289,20 +290,21 @@ async def reprocess_audio_cropping(audio_uuid: str, user: User): cropped_filename = f"cropped_{audio_uuid}.wav" output_path = Path("/app/audio_chunks") / cropped_filename - # Get repository for database updates - chunk_repo = AudioChunksRepository(chunks_col) - - # Reprocess the audio cropping + # Reprocess the audio cropping (simplified without repository) try: - result = await _process_audio_cropping_with_relative_timestamps( + from advanced_omi_backend.utils.audio_utils import extract_speech_segments + + success = await extract_speech_segments( str(full_audio_path), speech_segments, - str(output_path), - audio_uuid, - chunk_repo + str(output_path) ) - if result: + if success: + # Update conversation with cropped audio path + conversation.cropped_audio_path = cropped_filename + await conversation.save() + audio_logger.info(f"Successfully reprocessed audio cropping for {audio_uuid}") return JSONResponse( content={"message": f"Audio cropping reprocessed for {audio_uuid}"} diff --git a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py index 86e00ad3..0a82efdf 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py @@ -11,7 +11,7 @@ ClientManager, client_belongs_to_user, ) -from advanced_omi_backend.database import chunks_col +from advanced_omi_backend.models.audio_file import AudioFile from advanced_omi_backend.models.conversation import Conversation from advanced_omi_backend.users import User from fastapi.responses import JSONResponse @@ -174,32 +174,27 @@ async def delete_conversation(audio_uuid: str, user: User): # Detailed debugging only when debug level is enabled if logger.isEnabledFor(logging.DEBUG): - total_count = await chunks_col.count_documents({}) - logger.debug(f"Total conversations in collection: {total_count}") + total_count = await AudioFile.count() + logger.debug(f"Total audio files in collection: {total_count}") logger.debug(f"UUID length: {len(audio_uuid)}, type: {type(audio_uuid)}") - # First, get the audio chunk record to check ownership and get conversation_id - audio_chunk = await chunks_col.find_one({"audio_uuid": audio_uuid}) + # First, get the audio file record to check ownership and get conversation_id + audio_file = await AudioFile.find_one(AudioFile.audio_uuid == audio_uuid) if logger.isEnabledFor(logging.DEBUG): - logger.debug(f"Audio chunk lookup result: {'found' if audio_chunk else 'not found'}") - if audio_chunk: - logger.debug(f"Found audio chunk with client_id: {audio_chunk.get('client_id')}") - logger.debug(f"Audio chunk has conversation_id: {audio_chunk.get('conversation_id')}") - else: - # Try alternative queries for debugging - regex_result = await chunks_col.find_one({"audio_uuid": {"$regex": f"^{audio_uuid}$", "$options": "i"}}) - contains_result = await chunks_col.find_one({"audio_uuid": {"$regex": audio_uuid}}) - logger.debug(f"Alternative query attempts - case insensitive: {'found' if regex_result else 'not found'}, substring: {'found' if contains_result else 'not found'}") - - if not audio_chunk: + logger.debug(f"Audio file lookup result: {'found' if audio_file else 'not found'}") + if audio_file: + logger.debug(f"Found audio file with client_id: {audio_file.client_id}") + logger.debug(f"Audio file has conversation_id: {audio_file.conversation_id}") + + if not audio_file: return JSONResponse( status_code=404, - content={"error": f"Audio chunk with audio_uuid '{audio_uuid}' not found"} + content={"error": f"Audio file with audio_uuid '{audio_uuid}' not found"} ) # Check if user has permission to delete this conversation - client_id = audio_chunk.get("client_id") + client_id = audio_file.client_id if not user.is_superuser and not client_belongs_to_user(client_id, user.user_id): logger.warning( f"User {user.user_id} attempted to delete conversation {audio_uuid} without permission" @@ -213,25 +208,18 @@ async def delete_conversation(audio_uuid: str, user: User): ) # Get audio file paths for deletion - audio_path = audio_chunk.get("audio_path") - cropped_audio_path = audio_chunk.get("cropped_audio_path") + audio_path = audio_file.audio_path + cropped_audio_path = audio_file.cropped_audio_path - # Get conversation_id if this audio chunk has an associated conversation - conversation_id = audio_chunk.get("conversation_id") + # Get conversation_id if this audio file has an associated conversation + conversation_id = audio_file.conversation_id conversation_deleted = False - # Delete from audio_chunks collection first - audio_result = await chunks_col.delete_one({"audio_uuid": audio_uuid}) - - if audio_result.deleted_count == 0: - return JSONResponse( - status_code=404, - content={"error": f"Failed to delete audio chunk with audio_uuid '{audio_uuid}'"} - ) - - logger.info(f"Deleted audio chunk {audio_uuid}") + # Delete the audio file from database first + await audio_file.delete() + logger.info(f"Deleted audio file {audio_uuid}") - # If this audio chunk has an associated conversation, delete it using Beanie + # If this audio file has an associated conversation, delete it using Beanie if conversation_id: try: conversation_model = await Conversation.find_one(Conversation.conversation_id == conversation_id) diff --git a/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py index 3a4e5163..19852e68 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py @@ -13,11 +13,6 @@ save_diarization_settings_to_file, ) from advanced_omi_backend.models.user import User -# TODO: Remove old processor architecture -# from advanced_omi_backend.processors import get_processor_manager -def get_processor_manager(): - """Stub - processors being removed.""" - return None from advanced_omi_backend.task_manager import get_task_manager from fastapi.responses import JSONResponse @@ -61,52 +56,23 @@ async def get_auth_config(): async def get_all_processing_tasks(): - """Get all active processing tasks.""" - try: - processor_manager = get_processor_manager() - return processor_manager.get_all_processing_status() - except Exception as e: - logger.error(f"Error getting processing tasks: {e}") - return JSONResponse( - status_code=500, content={"error": f"Failed to get processing tasks: {str(e)}"} - ) + """Get all active processing tasks. + + NOTE: This function is deprecated - old processor architecture has been removed. + Kept for backward compatibility but always returns empty list. + """ + logger.warning("get_all_processing_tasks called - deprecated function") + return [] async def get_processing_task_status(client_id: str): - """Get processing task status for a specific client.""" - try: - processor_manager = get_processor_manager() - processing_status = processor_manager.get_processing_status(client_id) - - # Check if transcription is marked as started but not completed, and verify with database - stages = processing_status.get("stages", {}) - transcription_stage = stages.get("transcription", {}) - - """This is a hack to update it the DB INCASE a process failed - if transcription_stage.get("status") == "started" and not transcription_stage.get("completed", False): - # Check if transcription is actually complete by checking the database - try: - chunk = await chunks_col.find_one({"client_id": client_id}) - if chunk and chunk.get("transcript") and len(chunk.get("transcript", [])) > 0: - # Transcription is complete! Update the processor state - processor_manager.track_processing_stage( - client_id, - "transcription", - "completed", - {"audio_uuid": chunk.get("audio_uuid"), "segments": len(chunk.get("transcript", []))} - ) - logger.info(f"Detected transcription completion for client {client_id} ({len(chunk.get('transcript', []))} segments)") - # Get updated status - processing_status = processor_manager.get_processing_status(client_id) - except Exception as e: - logger.debug(f"Error checking transcription completion: {e}") - """ - return processing_status - except Exception as e: - logger.error(f"Error getting processing task status for {client_id}: {e}") - return JSONResponse( - status_code=500, content={"error": f"Failed to get processing task status: {str(e)}"} - ) + """Get processing task status for a specific client. + + NOTE: This function is deprecated - old processor architecture has been removed. + Kept for backward compatibility but always returns None. + """ + logger.warning(f"get_processing_task_status called for {client_id} - deprecated function") + return None async def get_processor_status(): diff --git a/backends/advanced/src/advanced_omi_backend/controllers/user_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/user_controller.py index dfb4b752..224695d0 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/user_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/user_controller.py @@ -15,8 +15,9 @@ UserManager, ) from advanced_omi_backend.client_manager import get_user_clients_all -from advanced_omi_backend.database import chunks_col, db, users_col +from advanced_omi_backend.database import db, users_col from advanced_omi_backend.memory import get_memory_service +from advanced_omi_backend.models.conversation import Conversation from advanced_omi_backend.users import User, UserCreate, UserUpdate logger = logging.getLogger(__name__) @@ -174,8 +175,8 @@ async def delete_user( deleted_data["user_deleted"] = user_result.deleted_count > 0 if delete_conversations: - # Delete all conversations (audio chunks) for this user - conversations_result = await chunks_col.delete_many({"client_id": user_id}) + # Delete all conversations for this user + conversations_result = await Conversation.find(Conversation.user_id == user_id).delete() deleted_data["conversations_deleted"] = conversations_result.deleted_count if delete_memories: diff --git a/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py index 98e96734..fd3ec304 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py @@ -93,27 +93,16 @@ async def parse_wyoming_protocol(ws: WebSocket) -> tuple[dict, Optional[bytes]]: async def create_client_state(client_id: str, user, device_name: Optional[str] = None): """Create and register a new client state.""" - # Get client manager and repository + # Get client manager client_manager = get_client_manager() - from advanced_omi_backend.database import AudioChunksRepository - from motor.motor_asyncio import AsyncIOMotorClient - - # MongoDB Configuration - MONGODB_URI = os.getenv("MONGODB_URI", "mongodb://mongo:27017") - mongo_client = AsyncIOMotorClient(MONGODB_URI) - db = mongo_client.get_default_database("friend-lite") - chunks_col = db["audio_chunks"] - - # Initialize repository - ac_repository = AudioChunksRepository(chunks_col) - + # Directory where WAV chunks are written from pathlib import Path CHUNK_DIR = Path("./audio_chunks") # This will be mounted to ./data/audio_chunks by Docker - + # Use ClientManager for atomic client creation and registration client_state = client_manager.create_client( - client_id, ac_repository, CHUNK_DIR, user.user_id, user.email + client_id, CHUNK_DIR, user.user_id, user.email ) # Also track in persistent mapping (for database queries) diff --git a/backends/advanced/src/advanced_omi_backend/database.py b/backends/advanced/src/advanced_omi_backend/database.py index 36d17ebb..cca103ea 100644 --- a/backends/advanced/src/advanced_omi_backend/database.py +++ b/backends/advanced/src/advanced_omi_backend/database.py @@ -7,10 +7,6 @@ import logging import os -import time -from datetime import UTC, datetime -from typing import Optional -import uuid from motor.motor_asyncio import AsyncIOMotorClient @@ -30,9 +26,8 @@ # Collection references (for non-Beanie collections) users_col = db["users"] -chunks_col = db["audio_chunks"] # Still used by AudioChunksRepository -# Note: conversations collection managed by Beanie +# Note: conversations collection managed by Beanie (Document model) # Note: processing_runs replaced by RQ job tracking # Beanie initialization happens in main.py during application startup @@ -46,506 +41,6 @@ def get_collections(): """Get commonly used collection references.""" return { "users_col": users_col, - "chunks_col": chunks_col, } -class AudioChunksRepository: - """Async helpers for the audio_chunks collection.""" - - def __init__(self, collection): - self.col = collection - - async def create_chunk( - self, - *, - audio_uuid, - audio_path, - client_id, - timestamp, - user_id=None, - user_email=None, - transcript=None, - memories=None, - transcription_status="PENDING", - memory_processing_status="PENDING", - ): - # Create initial transcript version if provided - transcript_versions = [] - active_transcript_version = None - - if transcript: - version_id = str(uuid.uuid4()) - transcript_versions.append({ - "version_id": version_id, - "segments": transcript, - "status": transcription_status, - "provider": None, - "created_at": datetime.now(UTC).isoformat(), - "processing_run_id": None, - "raw_data": {} - }) - active_transcript_version = version_id - - # Create initial memory version if provided - memory_versions = [] - active_memory_version = None - - if memories: - version_id = str(uuid.uuid4()) - memory_versions.append({ - "version_id": version_id, - "memories": memories, - "status": memory_processing_status, - "created_at": datetime.now(UTC).isoformat(), - "processing_run_id": None, - "transcript_version_id": active_transcript_version - }) - active_memory_version = version_id - - doc = { - "audio_uuid": audio_uuid, - "audio_path": audio_path, - "client_id": client_id, - "timestamp": timestamp, - "user_id": user_id, - "user_email": user_email, - - # Versioned transcript data - "transcript_versions": transcript_versions, - "active_transcript_version": active_transcript_version, - - # Versioned memory data - "memory_versions": memory_versions, - "active_memory_version": active_memory_version, - - # Compatibility fields (computed from active versions) - "transcript": transcript or [], - "memories": memories or [], - "transcription_status": transcription_status, - "memory_processing_status": memory_processing_status, - "raw_transcript_data": {} - } - await self.col.insert_one(doc) - - async def add_transcript_segment(self, audio_uuid, transcript_segment): - """Add a single transcript segment to the conversation. - - Interface compatibility method - adds to active transcript version. - Creates first transcript version if none exists. - """ - chunk = await self.get_chunk(audio_uuid) - if not chunk: - return False - - active_version = chunk.get("active_transcript_version") - if not active_version: - # Create initial version if none exists - version_id = str(uuid.uuid4()) - version_data = { - "version_id": version_id, - "segments": [transcript_segment], - "status": "PENDING", - "provider": None, - "created_at": datetime.now(UTC).isoformat(), - "processing_run_id": None, - "raw_data": {} - } - - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - { - "$push": {"transcript_versions": version_data}, - "$set": { - "active_transcript_version": version_id, - # Update compatibility field too - "transcript": [transcript_segment] - } - } - ) - else: - # Add to existing active version - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - { - "$push": { - f"transcript_versions.$[version].segments": transcript_segment, - # Update compatibility field too - "transcript": transcript_segment - } - }, - array_filters=[{"version.version_id": active_version}] - ) - - return result.modified_count > 0 - - - async def store_raw_transcript_data(self, audio_uuid, raw_data, provider): - """Store raw transcript data from transcription provider.""" - await self.col.update_one( - {"audio_uuid": audio_uuid}, - { - "$set": { - "raw_transcript_data": { - "provider": provider, - "data": raw_data, - "stored_at": datetime.now(UTC).isoformat(), - } - } - }, - ) - - async def get_chunk(self, audio_uuid): - """Get a chunk by audio_uuid.""" - return await self.col.find_one({"audio_uuid": audio_uuid}) - - async def add_memory_reference(self, audio_uuid: str, memory_id: str, status: str = "created"): - """Add memory reference to audio chunk.""" - memory_ref = { - "memory_id": memory_id, - "created_at": datetime.now(UTC).isoformat(), - "status": status, - "updated_at": datetime.now(UTC).isoformat(), - } - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, {"$push": {"memories": memory_ref}} - ) - if result.modified_count > 0: - logger.info(f"Added memory reference {memory_id} to audio {audio_uuid}") - return result.modified_count > 0 - - async def update_memory_status(self, audio_uuid: str, memory_id: str, status: str): - """Update memory status in audio chunk.""" - result = await self.col.update_one( - {"audio_uuid": audio_uuid, "memories.memory_id": memory_id}, - { - "$set": { - "memories.$.status": status, - "memories.$.updated_at": datetime.now(UTC).isoformat(), - } - }, - ) - if result.modified_count > 0: - logger.info(f"Updated memory {memory_id} status to {status} for audio {audio_uuid}") - return result.modified_count > 0 - - async def remove_memory_reference(self, audio_uuid: str, memory_id: str): - """Remove memory reference from audio chunk.""" - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, {"$pull": {"memories": {"memory_id": memory_id}}} - ) - if result.modified_count > 0: - logger.info(f"Removed memory reference {memory_id} from audio {audio_uuid}") - return result.modified_count > 0 - - async def get_chunk_by_audio_uuid(self, audio_uuid: str): - """Get a chunk document by audio_uuid.""" - return await self.col.find_one({"audio_uuid": audio_uuid}) - - async def get_transcript_segments(self, audio_uuid: str): - """Get transcript segments for a specific audio UUID from active version.""" - document = await self.col.find_one( - {"audio_uuid": audio_uuid}, - {"transcript_versions": 1, "active_transcript_version": 1, "transcript": 1} - ) - - if not document: - return [] - - # Try to get from active version first (new versioned approach) - active_version_id = document.get("active_transcript_version") - if active_version_id and "transcript_versions" in document: - for version in document["transcript_versions"]: - if version.get("version_id") == active_version_id: - return version.get("segments", []) - - # Fallback to legacy transcript field for backward compatibility - if "transcript" in document: - return document["transcript"] - - return [] - - async def update_transcript(self, audio_uuid, full_transcript): - """Update the entire transcript list (for compatibility).""" - await self.col.update_one( - {"audio_uuid": audio_uuid}, {"$set": {"transcript": full_transcript}} - ) - - async def update_segment_timing(self, audio_uuid, segment_index, start_time, end_time): - """Update timing information for a specific transcript segment.""" - await self.col.update_one( - {"audio_uuid": audio_uuid}, - { - "$set": { - f"transcript.{segment_index}.start": start_time, - f"transcript.{segment_index}.end": end_time, - } - }, - ) - - async def update_segment_speaker(self, audio_uuid, segment_index, speaker_id): - """Update the speaker for a specific transcript segment.""" - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - {"$set": {f"transcript.{segment_index}.speaker": speaker_id}}, - ) - if result.modified_count > 0: - logger.info(f"Updated segment {segment_index} speaker to {speaker_id} for {audio_uuid}") - return result.modified_count > 0 - - async def update_cropped_audio( - self, - audio_uuid: str, - cropped_path: str, - speech_segments: list[tuple[float, float]], - ): - """Update the chunk with cropped audio information.""" - cropped_duration = sum(end - start for start, end in speech_segments) - - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - { - "$set": { - "cropped_audio_path": cropped_path, - "speech_segments": [ - {"start": start, "end": end} for start, end in speech_segments - ], - "cropped_duration": cropped_duration, - "cropped_at": datetime.now(UTC), - } - }, - ) - if result.modified_count > 0: - logger.info(f"Updated cropped audio info for {audio_uuid}: {cropped_path}") - return result.modified_count > 0 - - - async def update_memory_processing_status( - self, audio_uuid: str, status: str, error_message: str = None - ): - """Update memory processing status and completion timestamp. - - Interface compatibility method - updates active memory version. - """ - chunk = await self.get_chunk(audio_uuid) - if not chunk: - return False - - active_version = chunk.get("active_memory_version") - if not active_version: - # Create initial memory version if none exists - version_id = str(uuid.uuid4()) - version_data = { - "version_id": version_id, - "memories": [], - "status": status, - "created_at": datetime.now(UTC).isoformat(), - "processing_run_id": None, - "transcript_version_id": chunk.get("active_transcript_version") - } - if error_message: - version_data["error_message"] = error_message - - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - { - "$push": {"memory_versions": version_data}, - "$set": { - "active_memory_version": version_id, - "memory_processing_status": status, - "memory_processing_updated_at": datetime.now(UTC).isoformat(), - } - } - ) - else: - # Update existing active version - update_doc = { - f"memory_versions.$[version].status": status, - f"memory_versions.$[version].updated_at": datetime.now(UTC), - "memory_processing_status": status, - "memory_processing_updated_at": datetime.now(UTC).isoformat(), - } - if status == "COMPLETED": - update_doc["memory_processing_completed_at"] = datetime.now(UTC).isoformat() - if error_message: - update_doc[f"memory_versions.$[version].error_message"] = error_message - update_doc["memory_processing_error"] = error_message - - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - {"$set": update_doc}, - array_filters=[{"version.version_id": active_version}] - ) - - if result.modified_count > 0: - logger.info(f"Updated memory processing status to {status} for {audio_uuid}") - return result.modified_count > 0 - - async def update_transcription_status( - self, audio_uuid: str, status: str, error_message: Optional[str] = None, provider: Optional[str] = None - ): - """Update transcription processing status and completion timestamp. - - Interface compatibility method - updates active transcript version. - """ - chunk = await self.get_chunk(audio_uuid) - if not chunk: - return False - - active_version = chunk.get("active_transcript_version") - if not active_version: - # Create initial transcript version if none exists - version_id = str(uuid.uuid4()) - version_data = { - "version_id": version_id, - "transcript": "", - "segments": [], - "status": status, - "provider": provider, - "created_at": datetime.now(UTC).isoformat(), - "processing_run_id": None, - "raw_data": {} - } - if error_message: - version_data["error_message"] = error_message - - update_doc = { - "active_transcript_version": version_id, - "transcription_status": status, - "transcription_updated_at": datetime.now(UTC).isoformat(), - } - if status == "COMPLETED": - update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat() - - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - { - "$push": {"transcript_versions": version_data}, - "$set": update_doc - } - ) - else: - # Update existing active version - update_doc = { - "transcript_versions.$[version].status": status, - "transcript_versions.$[version].updated_at": datetime.now(UTC).isoformat(), - "transcription_status": status, - "transcription_updated_at": datetime.now(UTC).isoformat(), - } - if status == "COMPLETED": - update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat() - if error_message: - update_doc["transcript_versions.$[version].error_message"] = error_message - update_doc["transcription_error"] = error_message - if provider: - update_doc["transcript_versions.$[version].provider"] = provider - update_doc["transcript_provider"] = provider - - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - {"$set": update_doc}, - array_filters=[{"version.version_id": active_version}] - ) - - if result.modified_count > 0: - logger.info(f"Updated transcription status to {status} for {audio_uuid}") - return result.modified_count > 0 - - # ======================================== - # SPEECH-DRIVEN CONVERSATIONS METHODS - # ======================================== - - async def add_audio_file_path(self, audio_uuid: str, file_path: str): - """Add new audio file path to existing session.""" - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - { - "$push": {"audio_file_paths": file_path}, - "$set": {"updated_at": datetime.now(UTC).isoformat()} - } - ) - if result.modified_count > 0: - logger.info(f"Added audio file path {file_path} to session {audio_uuid}") - return result.modified_count > 0 - - async def update_speech_detection(self, audio_uuid: str, **speech_data): - """Update speech detection results.""" - update_doc = { - "updated_at": datetime.now(UTC).isoformat() - } - - # Add speech detection fields - for key, value in speech_data.items(): - if key in ["has_speech", "conversation_created", "conversation_id", - "speech_start_time", "speech_end_time", "status"]: - update_doc[key] = value - - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - {"$set": update_doc} - ) - if result.modified_count > 0: - logger.info(f"Updated speech detection for {audio_uuid}: {speech_data}") - return result.modified_count > 0 - - async def mark_conversation_created(self, audio_uuid: str, conversation_id: str): - """Mark that conversation was created for this audio.""" - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - {"$set": { - "conversation_created": True, - "conversation_id": conversation_id, - "has_speech": True, - "status": "speech_detected", - "updated_at": datetime.now(UTC).isoformat() - }} - ) - if result.modified_count > 0: - logger.info(f"Marked conversation created for {audio_uuid} with ID {conversation_id}") - return result.modified_count > 0 - - async def get_sessions_with_speech(self, user_id: str, limit: int = 100): - """Get audio sessions that have detected speech.""" - cursor = self.col.find({ - "user_id": user_id, - "has_speech": True, - "conversation_created": True - }).sort("timestamp", -1).limit(limit) - - return await cursor.to_list(length=None) - - async def update_transcription_status( - self, audio_uuid: str, status: str, error_message: str = None, provider: str = None - ): - """Update transcription status and completion timestamp. - - Args: - audio_uuid: UUID of the audio chunk - status: New status ('PENDING', 'PROCESSING', 'COMPLETED', 'FAILED', 'EMPTY') - error_message: Optional error message if status is 'FAILED' - provider: Optional provider name for successful transcriptions - """ - update_doc = { - "transcription_status": status, - "updated_at": datetime.now(UTC).isoformat() - } - - if status == "COMPLETED": - update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat() - if provider: - update_doc["transcription_provider"] = provider - elif status == "FAILED" and error_message: - update_doc["transcription_error"] = error_message - elif status == "EMPTY": - update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat() - if provider: - update_doc["transcription_provider"] = provider - - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, {"$set": update_doc} - ) - return result.modified_count > 0 - - -# ConversationsRepository removed - use Beanie Conversation model directly -# ProcessingRunsRepository removed - use RQ job tracking instead diff --git a/backends/advanced/src/advanced_omi_backend/memory/compat_service.py b/backends/advanced/src/advanced_omi_backend/memory/compat_service.py index e3cb9827..3814f29e 100644 --- a/backends/advanced/src/advanced_omi_backend/memory/compat_service.py +++ b/backends/advanced/src/advanced_omi_backend/memory/compat_service.py @@ -281,13 +281,13 @@ async def get_memories_with_transcripts(self, user_id: str, limit: int = 100) -> # Get memories first memories = await self.get_all_memories(user_id, limit) - # Import database connection + # Import Conversation model try: - from advanced_omi_backend.database import chunks_col + from advanced_omi_backend.models.conversation import Conversation except ImportError: - memory_logger.error("Cannot import database connection") + memory_logger.error("Cannot import Conversation model") return memories # Return memories without transcript enrichment - + # Extract source IDs for bulk query source_ids = [] for memory in memories: @@ -295,12 +295,15 @@ async def get_memories_with_transcripts(self, user_id: str, limit: int = 100) -> source_id = metadata.get("source_id") or metadata.get("audio_uuid") # Backward compatibility if source_id: source_ids.append(source_id) - - # Bulk query for chunks (support both old audio_uuid and new source_id) - chunks_cursor = chunks_col.find({"audio_uuid": {"$in": source_ids}}) - chunks_by_id = {} - async for chunk in chunks_cursor: - chunks_by_id[chunk["audio_uuid"]] = chunk + + # Bulk query for conversations (support both old audio_uuid and new source_id) + conversations_list = await Conversation.find( + Conversation.audio_uuid.in_(source_ids) + ).to_list() + + conversations_by_id = {} + for conv in conversations_list: + conversations_by_id[conv.audio_uuid] = conv enriched_memories = [] @@ -328,15 +331,15 @@ async def get_memories_with_transcripts(self, user_id: str, limit: int = 100) -> enriched_memory["client_id"] = metadata.get("client_id") enriched_memory["user_email"] = metadata.get("user_email") - # Get transcript from bulk-loaded chunks - chunk = chunks_by_id.get(source_id) - if chunk: - transcript_segments = chunk.get("transcript", []) + # Get transcript from bulk-loaded conversations + conversation = conversations_by_id.get(source_id) + if conversation: + transcript_segments = conversation.segments if transcript_segments: full_transcript = " ".join( - segment.get("text", "") + segment.text for segment in transcript_segments - if isinstance(segment, dict) and segment.get("text") + if segment.text ) if full_transcript.strip(): diff --git a/backends/advanced/src/advanced_omi_backend/models/conversation.py b/backends/advanced/src/advanced_omi_backend/models/conversation.py index cba23c41..2415337d 100644 --- a/backends/advanced/src/advanced_omi_backend/models/conversation.py +++ b/backends/advanced/src/advanced_omi_backend/models/conversation.py @@ -70,7 +70,7 @@ class MemoryVersion(BaseModel): # Core identifiers conversation_id: Indexed(str, unique=True) = Field(default_factory=lambda: str(uuid.uuid4()), description="Unique conversation identifier") - audio_uuid: Indexed(str) = Field(description="Link to audio_chunks collection") + audio_uuid: Indexed(str) = Field(description="Session/audio identifier (for tracking audio files)") user_id: Indexed(str) = Field(description="User who owns this conversation") client_id: Indexed(str) = Field(description="Client device identifier") @@ -304,7 +304,7 @@ def create_conversation( Factory function to create a new conversation. Args: - audio_uuid: Link to audio_chunks collection + audio_uuid: Unique identifier for the audio session user_id: User who owns this conversation client_id: Client device identifier conversation_id: Optional unique conversation identifier (auto-generated if not provided) diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py index 2eb67607..4c0f756b 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py @@ -1,14 +1,17 @@ """ -Audio file upload routes. +Audio file upload and serving routes. -Handles audio file uploads and processing job management. +Handles audio file uploads, processing job management, and audio file serving. """ -from fastapi import APIRouter, Depends, File, Query, UploadFile +from pathlib import Path +from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile +from fastapi.responses import FileResponse -from advanced_omi_backend.auth import current_superuser +from advanced_omi_backend.auth import current_superuser, current_active_user from advanced_omi_backend.controllers import audio_controller from advanced_omi_backend.models.user import User +from advanced_omi_backend.app_config import get_audio_chunk_dir router = APIRouter(prefix="/audio", tags=["audio"]) @@ -33,3 +36,58 @@ async def upload_audio_files( return await audio_controller.upload_and_process_audio_files( current_user, files, device_name, auto_generate_client ) + + +@router.get("/{filename}") +async def serve_audio_file( + filename: str, + current_user: User = Depends(current_active_user), +): + """ + Serve audio files for playback. + + This endpoint allows authenticated users to access audio files stored on disk. + Users can only access their own audio files (ownership verified via conversation lookup). + + Args: + filename: Name of the audio file (e.g., "audio_uuid.wav" or "cropped_audio_uuid.wav") + current_user: Authenticated user + + Returns: + FileResponse with the audio file + + Raises: + 404: If file not found + 403: If user doesn't own the conversation associated with this audio + """ + # Get audio chunk directory + audio_dir = get_audio_chunk_dir() + file_path = audio_dir / filename + + # Check if file exists + if not file_path.exists() or not file_path.is_file(): + raise HTTPException(status_code=404, detail="Audio file not found") + + # Security check: Verify user owns the conversation associated with this audio + # Extract audio_uuid from filename (handle both "uuid.wav" and "cropped_uuid.wav") + from advanced_omi_backend.models.conversation import Conversation + + # Remove "cropped_" prefix if present + audio_uuid = filename.replace("cropped_", "").replace(".wav", "") + + # Find conversation by audio_uuid + conversation = await Conversation.find_one(Conversation.audio_uuid == audio_uuid) + + if not conversation: + raise HTTPException(status_code=404, detail="Conversation not found for this audio file") + + # Check ownership (admins can access all files) + if not current_user.is_superuser and conversation.user_id != str(current_user.user_id): + raise HTTPException(status_code=403, detail="Access denied") + + # Serve the file + return FileResponse( + path=str(file_path), + media_type="audio/wav", + filename=filename + ) diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py index ac426ee8..54e40d36 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py @@ -10,10 +10,6 @@ from fastapi import APIRouter, Depends, Query from advanced_omi_backend.auth import current_active_user -from advanced_omi_backend.client_manager import ( - ClientManager, - get_client_manager_dependency, -) from advanced_omi_backend.controllers import conversation_controller, audio_controller from advanced_omi_backend.users import User @@ -26,12 +22,9 @@ async def close_current_conversation( client_id: str, current_user: User = Depends(current_active_user), - client_manager: ClientManager = Depends(get_client_manager_dependency), ): - """Close the current conversation for a specific client. Users can only close their own conversations.""" - return await conversation_controller.close_current_conversation( - client_id, current_user, client_manager - ) + """Close the current active conversation for a client. Works for both connected and disconnected clients.""" + return await conversation_controller.close_current_conversation(client_id, current_user) @router.get("") @@ -113,9 +106,9 @@ async def get_conversation_version_history( return await conversation_controller.get_conversation_version_history(conversation_id, current_user) -@router.delete("/{audio_uuid}") +@router.delete("/{conversation_id}") async def delete_conversation( - audio_uuid: str, current_user: User = Depends(current_active_user) + conversation_id: str, current_user: User = Depends(current_active_user) ): """Delete a conversation and its associated audio file. Users can only delete their own conversations.""" - return await conversation_controller.delete_conversation(audio_uuid, current_user) + return await conversation_controller.delete_conversation(conversation_id, current_user) diff --git a/backends/advanced/src/advanced_omi_backend/utils/audio_utils.py b/backends/advanced/src/advanced_omi_backend/utils/audio_utils.py index 2a4aeaf9..20d8de7c 100644 --- a/backends/advanced/src/advanced_omi_backend/utils/audio_utils.py +++ b/backends/advanced/src/advanced_omi_backend/utils/audio_utils.py @@ -16,7 +16,6 @@ if TYPE_CHECKING: from advanced_omi_backend.client import ClientState - from advanced_omi_backend.database import AudioChunksRepository logger = logging.getLogger(__name__) audio_logger = logging.getLogger("audio_processing") @@ -264,13 +263,15 @@ async def _process_audio_cropping_with_relative_timestamps( speech_segments: list[tuple[float, float]], output_path: str, audio_uuid: str, - chunk_repo: Optional['AudioChunksRepository'] = None, + _deprecated_chunk_repo=None, # Deprecated - kept for backward compatibility ) -> bool: """ Process audio cropping with speech segments already in relative format. The segments are expected to be in relative format (seconds from audio start), as provided by Deepgram transcription. No timestamp conversion is needed. + + Note: Database updates are now handled by the caller (audio_jobs.py). """ try: # Validate input segments @@ -310,10 +311,7 @@ async def _process_audio_cropping_with_relative_timestamps( success = await _crop_audio_with_ffmpeg(original_path, validated_segments, output_path) if success: - # Update database with cropped file info cropped_filename = output_path.split("/")[-1] - if chunk_repo is not None: - await chunk_repo.update_cropped_audio(audio_uuid, cropped_filename, speech_segments) logger.info(f"Successfully processed cropped audio: {cropped_filename}") return True else: diff --git a/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py index 1c7b227a..e9742a85 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py @@ -25,6 +25,7 @@ async def process_cropping_job( conversation_id: str, audio_path: str, + *, redis_client=None ) -> Dict[str, Any]: """ @@ -34,7 +35,7 @@ async def process_cropping_job( 1. Reads transcript segments from conversation 2. Extracts speech timestamps 3. Creates cropped audio file with only speech segments - 4. Updates audio_chunks collection with cropped file path + 4. Updates conversation with cropped file path Args: conversation_id: Conversation ID @@ -46,7 +47,6 @@ async def process_cropping_job( """ from pathlib import Path from advanced_omi_backend.utils.audio_utils import _process_audio_cropping_with_relative_timestamps - from advanced_omi_backend.database import get_collections, AudioChunksRepository from advanced_omi_backend.models.conversation import Conversation from advanced_omi_backend.config import CHUNK_DIR @@ -78,17 +78,13 @@ async def process_cropping_job( cropped_filename = f"cropped_{original_path.name}" output_path = CHUNK_DIR / cropped_filename - # Get repository for database updates - collections = get_collections() - repository = AudioChunksRepository(collections["chunks_col"]) - - # Process cropping + # Process cropping (no repository needed - we update conversation directly) success = await _process_audio_cropping_with_relative_timestamps( str(original_path), speech_segments, str(output_path), audio_uuid, - repository + None # No repository - we update conversation model directly ) if not success: @@ -140,6 +136,7 @@ async def audio_streaming_persistence_job( session_id: str, user_id: str, client_id: str, + *, redis_client=None ) -> Dict[str, Any]: """ diff --git a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py index 1d3400c3..58288b4a 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py @@ -21,6 +21,7 @@ async def open_conversation_job( client_id: str, speech_detected_at: float, speech_job_id: str = None, + *, redis_client=None ) -> Dict[str, Any]: """ @@ -403,6 +404,7 @@ async def open_conversation_job( @async_job(redis=True, beanie=True) async def generate_title_summary_job( conversation_id: str, + *, redis_client=None ) -> Dict[str, Any]: """ diff --git a/backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py index c1a6dfc0..91518047 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py @@ -21,6 +21,7 @@ @async_job(redis=True, beanie=True) async def process_memory_job( conversation_id: str, + *, redis_client=None ) -> Dict[str, Any]: """ @@ -208,10 +209,7 @@ def enqueue_memory_processing( job = memory_queue.enqueue( process_memory_job, - client_id, - user_id, - user_email, - conversation_id, + conversation_id, # Only argument needed - job fetches conversation data internally job_timeout=timeout_mapping.get(priority, 1800), result_ttl=JOB_RESULT_TTL, job_id=f"memory_{conversation_id[:8]}", diff --git a/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py index 80434232..b9420d0a 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py @@ -20,6 +20,7 @@ async def check_enrolled_speakers_job( session_id: str, user_id: str, client_id: str, + *, redis_client=None ) -> Dict[str, Any]: """ @@ -106,6 +107,7 @@ async def recognise_speakers_job( audio_path: str, transcript_text: str, words: list, + *, redis_client=None ) -> Dict[str, Any]: """ diff --git a/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py index e081786a..60f6cb05 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py @@ -120,6 +120,7 @@ async def transcribe_full_audio_job( audio_path: str, version_id: str, trigger: str = "reprocess", + *, redis_client=None ) -> Dict[str, Any]: """ @@ -335,6 +336,7 @@ async def stream_speech_detection_job( session_id: str, user_id: str, client_id: str, + *, redis_client=None ) -> Dict[str, Any]: """ diff --git a/backends/advanced/webui/src/pages/Conversations.tsx b/backends/advanced/webui/src/pages/Conversations.tsx index 370117f1..cd783ae6 100644 --- a/backends/advanced/webui/src/pages/Conversations.tsx +++ b/backends/advanced/webui/src/pages/Conversations.tsx @@ -62,11 +62,11 @@ export default function Conversations() { // Transcript expand/collapse state const [expandedTranscripts, setExpandedTranscripts] = useState>(new Set()) // Audio playback state - const [playingSegment, setPlayingSegment] = useState(null) // Format: "audioUuid-segmentIndex" + const [playingSegment, setPlayingSegment] = useState(null) // Format: "conversationId-segmentIndex" const audioRefs = useRef<{ [key: string]: HTMLAudioElement }>({}) const segmentTimerRef = useRef(null) - // Reprocessing state + // Reprocessing state - keyed by conversation_id to avoid conflicts with shared audio_uuid const [openDropdown, setOpenDropdown] = useState(null) const [reprocessingTranscript, setReprocessingTranscript] = useState>(new Set()) const [reprocessingMemory, setReprocessingMemory] = useState>(new Set()) @@ -184,7 +184,7 @@ export default function Conversations() { } } - const handleDeleteConversation = async (audioUuid: string) => { + const handleDeleteConversation = async (conversationId: string | undefined, audioUuid: string) => { try { const confirmed = window.confirm('Are you sure you want to delete this conversation? This action cannot be undone.') if (!confirmed) return @@ -192,10 +192,13 @@ export default function Conversations() { setDeletingConversation(prev => new Set(prev).add(audioUuid)) setOpenDropdown(null) - const response = await conversationsApi.delete(audioUuid) + const response = await conversationsApi.delete(conversationId!) if (response.status === 200) { - // Refresh conversations to show updated data + // Immediately remove from UI for instant feedback + setConversations(prev => prev.filter(c => c.conversation_id !== conversationId)) + + // Then refresh to ensure consistency with server await loadConversations() } else { setError(`Failed to delete conversation: ${response.data?.error || 'Unknown error'}`) @@ -254,12 +257,12 @@ export default function Conversations() { } } - const handleSegmentPlayPause = (audioUuid: string, segmentIndex: number, segment: any, audioPath: string) => { - const segmentId = `${audioUuid}-${segmentIndex}`; - + const handleSegmentPlayPause = (conversationId: string, segmentIndex: number, segment: any, audioPath: string) => { + const segmentId = `${conversationId}-${segmentIndex}`; + // If this segment is already playing, pause it if (playingSegment === segmentId) { - const audio = audioRefs.current[audioUuid]; + const audio = audioRefs.current[conversationId]; if (audio) { audio.pause(); } @@ -270,11 +273,11 @@ export default function Conversations() { setPlayingSegment(null); return; } - + // Stop any currently playing segment if (playingSegment) { - const [currentAudioUuid] = playingSegment.split('-'); - const currentAudio = audioRefs.current[currentAudioUuid]; + const [currentConversationId] = playingSegment.split('-'); + const currentAudio = audioRefs.current[currentConversationId]; if (currentAudio) { currentAudio.pause(); } @@ -283,24 +286,24 @@ export default function Conversations() { segmentTimerRef.current = null; } } - + // Get or create audio element for this conversation - let audio = audioRefs.current[audioUuid]; + let audio = audioRefs.current[conversationId]; if (!audio) { audio = new Audio(`${BACKEND_URL}/audio/${audioPath}`); - audioRefs.current[audioUuid] = audio; - + audioRefs.current[conversationId] = audio; + // Add event listener to handle when audio ends naturally audio.addEventListener('ended', () => { setPlayingSegment(null); }); } - + // Set the start time and play audio.currentTime = segment.start; audio.play().then(() => { setPlayingSegment(segmentId); - + // Set a timer to stop at the segment end time const duration = (segment.end - segment.start) * 1000; // Convert to milliseconds segmentTimerRef.current = window.setTimeout(() => { @@ -458,7 +461,8 @@ export default function Conversations() { {/* Dropdown Menu */} - {openDropdown === conversation.audio_uuid && ( + {openDropdown === (conversation.conversation_id || conversation.audio_uuid) && (
)} - +
{debugMode && ( diff --git a/backends/advanced/webui/src/pages/Queue.tsx b/backends/advanced/webui/src/pages/Queue.tsx index e5b27dca..ea783ed9 100644 --- a/backends/advanced/webui/src/pages/Queue.tsx +++ b/backends/advanced/webui/src/pages/Queue.tsx @@ -289,8 +289,14 @@ const Queue: React.FC = () => { console.log(`📂 Auto-expanded ${expandedJobsCount} job card(s) in active conversations`); setExpandedJobs(newExpandedJobs); } - } catch (error) { + } catch (error: any) { console.error('❌ Error fetching dashboard data:', error); + + // If it's a 401 error, stop auto-refresh to prevent repeated failed requests + if (error?.response?.status === 401) { + console.warn('🔐 Authentication error detected - disabling auto-refresh'); + setAutoRefreshEnabled(false); + } } finally { setLoading(false); setRefreshing(false);