diff --git a/backends/advanced/src/advanced_omi_backend/__init__.py b/backends/advanced/src/advanced_omi_backend/__init__.py index 30235e86..8eb09ac9 100644 --- a/backends/advanced/src/advanced_omi_backend/__init__.py +++ b/backends/advanced/src/advanced_omi_backend/__init__.py @@ -2,6 +2,4 @@ __version__ = "0.1.0" -from .database import AudioChunksRepository - -__all__ = ["AudioChunksRepository"] +__all__ = [] diff --git a/backends/advanced/src/advanced_omi_backend/app_config.py b/backends/advanced/src/advanced_omi_backend/app_config.py index 36a6ae02..4caa70c5 100644 --- a/backends/advanced/src/advanced_omi_backend/app_config.py +++ b/backends/advanced/src/advanced_omi_backend/app_config.py @@ -30,7 +30,6 @@ def __init__(self): self.mongodb_uri = os.getenv("MONGODB_URI", "mongodb://mongo:27017") self.mongo_client = AsyncIOMotorClient(self.mongodb_uri) self.db = self.mongo_client.get_default_database("friend-lite") - self.chunks_col = self.db["audio_chunks"] self.users_col = self.db["users"] self.speakers_col = self.db["speakers"] @@ -104,7 +103,6 @@ def get_audio_chunk_dir() -> Path: def get_mongo_collections(): """Get MongoDB collections.""" return { - 'chunks': app_config.chunks_col, 'users': app_config.users_col, 'speakers': app_config.speakers_col, } diff --git a/backends/advanced/src/advanced_omi_backend/client.py b/backends/advanced/src/advanced_omi_backend/client.py index 30b3cc62..0cf6a1e2 100644 --- a/backends/advanced/src/advanced_omi_backend/client.py +++ b/backends/advanced/src/advanced_omi_backend/client.py @@ -12,7 +12,6 @@ from pathlib import Path from typing import Dict, List, Optional, Tuple -from advanced_omi_backend.database import AudioChunksRepository from advanced_omi_backend.task_manager import get_task_manager from wyoming.audio import AudioChunk @@ -29,14 +28,12 @@ class ClientState: def __init__( self, client_id: str, - ac_db_collection_helper: AudioChunksRepository, chunk_dir: Path, user_id: str, user_email: Optional[str] = None, ): self.client_id = client_id self.connected = True - self.db_helper = ac_db_collection_helper self.chunk_dir = chunk_dir # Store user data for memory processing diff --git a/backends/advanced/src/advanced_omi_backend/client_manager.py b/backends/advanced/src/advanced_omi_backend/client_manager.py index b48cd51c..5a3131b5 100644 --- a/backends/advanced/src/advanced_omi_backend/client_manager.py +++ b/backends/advanced/src/advanced_omi_backend/client_manager.py @@ -104,7 +104,7 @@ def get_client_count(self) -> int: """ return len(self._active_clients) - def create_client(self, client_id: str, ac_repository, chunk_dir, user_id: str, user_email: Optional[str] = None) -> "ClientState": + def create_client(self, client_id: str, chunk_dir, user_id: str, user_email: Optional[str] = None) -> "ClientState": """ Atomically create and register a new client. @@ -113,7 +113,6 @@ def create_client(self, client_id: str, ac_repository, chunk_dir, user_id: str, Args: client_id: Unique client identifier - ac_repository: Audio chunks repository chunk_dir: Directory for audio chunks user_id: User ID who owns this client user_email: Optional user email @@ -131,7 +130,7 @@ def create_client(self, client_id: str, ac_repository, chunk_dir, user_id: str, from advanced_omi_backend.client import ClientState # Create client state - client_state = ClientState(client_id, ac_repository, chunk_dir, user_id, user_email) + client_state = ClientState(client_id, chunk_dir, user_id, user_email) # Atomically add to internal storage and register mapping self._active_clients[client_id] = client_state diff --git a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py index 51d0a2a1..4ca72ca0 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py @@ -4,7 +4,7 @@ Handles audio file uploads and processes them directly. Simplified to write files immediately and enqueue transcription. -Also includes audio cropping operations that work with the audio_chunks collection. +Also includes audio cropping operations that work with the Conversation model. """ import logging @@ -18,13 +18,11 @@ from advanced_omi_backend.utils.audio_utils import ( AudioValidationError, write_audio_file, - _process_audio_cropping_with_relative_timestamps, ) from advanced_omi_backend.models.job import JobPriority from advanced_omi_backend.models.user import User from advanced_omi_backend.models.conversation import create_conversation -from advanced_omi_backend.database import AudioChunksRepository, chunks_col -from advanced_omi_backend.client_manager import client_belongs_to_user +from advanced_omi_backend.models.conversation import Conversation logger = logging.getLogger(__name__) audio_logger = logging.getLogger("audio_processing") @@ -115,6 +113,8 @@ async def upload_and_process_audio_files( title=title, summary="Processing uploaded audio file..." ) + # Set audio_path so the frontend can play the audio + conversation.audio_path = wav_filename await conversation.insert() conversation_id = conversation.conversation_id # Get the auto-generated ID @@ -194,141 +194,35 @@ async def upload_and_process_audio_files( async def get_cropped_audio_info(audio_uuid: str, user: User): """ - Get audio cropping metadata from the audio_chunks collection. + Get audio cropping metadata from the conversation. This is an audio service operation that retrieves cropping-related metadata such as speech segments, cropped audio path, and cropping timestamps. Used for: Checking cropping status and retrieving audio processing details. - Works with: audio_chunks collection (audio service operations). + Works with: Conversation model. """ try: - # Find the audio chunk - chunk = await chunks_col.find_one({"audio_uuid": audio_uuid}) - if not chunk: + # Find the conversation + conversation = await Conversation.find_one(Conversation.audio_uuid == audio_uuid) + if not conversation: return JSONResponse(status_code=404, content={"error": "Conversation not found"}) # Check ownership for non-admin users if not user.is_superuser: - if not client_belongs_to_user(chunk["client_id"], user.user_id): + if conversation.user_id != str(user.user_id): return JSONResponse(status_code=404, content={"error": "Conversation not found"}) return { "audio_uuid": audio_uuid, - "cropped_audio_path": chunk.get("cropped_audio_path"), - "speech_segments": chunk.get("speech_segments", []), - "cropped_duration": chunk.get("cropped_duration"), - "cropped_at": chunk.get("cropped_at"), - "original_audio_path": chunk.get("audio_path"), + "cropped_audio_path": conversation.cropped_audio_path, + "speech_segments": conversation.speech_segments if hasattr(conversation, 'speech_segments') else [], + "cropped_duration": conversation.cropped_duration if hasattr(conversation, 'cropped_duration') else None, + "cropped_at": conversation.cropped_at if hasattr(conversation, 'cropped_at') else None, + "original_audio_path": conversation.audio_path, } except Exception as e: # Database or unexpected errors when fetching audio metadata audio_logger.exception("Error fetching cropped audio info") return JSONResponse(status_code=500, content={"error": "Error fetching cropped audio info"}) - - -async def reprocess_audio_cropping(audio_uuid: str, user: User): - """ - Re-process audio cropping operation for an audio file. - - This is an audio service operation that re-runs the audio cropping process - to extract only speech segments from the full audio file. - - Used for: Re-processing audio when cropping failed or needs updating. - Works with: audio_chunks collection and audio_utils cropping functions. - """ - try: - # Find the audio chunk - chunk = await chunks_col.find_one({"audio_uuid": audio_uuid}) - if not chunk: - return JSONResponse(status_code=404, content={"error": "Conversation not found"}) - - # Check ownership for non-admin users - if not user.is_superuser: - if not client_belongs_to_user(chunk["client_id"], user.user_id): - return JSONResponse(status_code=404, content={"error": "Conversation not found"}) - - audio_path = chunk.get("audio_path") - if not audio_path: - return JSONResponse( - status_code=400, content={"error": "No audio file found for this conversation"} - ) - - # Check if file exists - try multiple possible locations - possible_paths = [ - Path("/app/audio_chunks") / audio_path, - Path(audio_path), # fallback to relative path - ] - - full_audio_path = None - for path in possible_paths: - if path.exists(): - full_audio_path = path - break - - if not full_audio_path: - return JSONResponse( - status_code=422, - content={ - "error": "Audio file not found on disk", - "details": f"Conversation exists but audio file '{audio_path}' is missing from expected locations", - "searched_paths": [str(p) for p in possible_paths] - } - ) - - # Get speech segments from the chunk - speech_segments = chunk.get("speech_segments", []) - if not speech_segments: - return JSONResponse( - status_code=400, - content={"error": "No speech segments found for this conversation"} - ) - - # Generate output path for cropped audio - cropped_filename = f"cropped_{audio_uuid}.wav" - output_path = Path("/app/audio_chunks") / cropped_filename - - # Get repository for database updates - chunk_repo = AudioChunksRepository(chunks_col) - - # Reprocess the audio cropping - try: - result = await _process_audio_cropping_with_relative_timestamps( - str(full_audio_path), - speech_segments, - str(output_path), - audio_uuid, - chunk_repo - ) - - if result: - audio_logger.info(f"Successfully reprocessed audio cropping for {audio_uuid}") - return JSONResponse( - content={"message": f"Audio cropping reprocessed for {audio_uuid}"} - ) - else: - audio_logger.error(f"Failed to reprocess audio cropping for {audio_uuid}") - return JSONResponse( - status_code=500, content={"error": "Failed to reprocess audio cropping"} - ) - - except (OSError, IOError) as processing_error: - # File I/O errors during audio cropping - audio_logger.exception("File I/O error during audio cropping reprocessing") - return JSONResponse( - status_code=500, - content={"error": f"Audio processing failed: {str(processing_error)}"}, - ) - except Exception as processing_error: - # Unexpected errors during cropping operation - audio_logger.exception("Unexpected error during audio cropping reprocessing") - return JSONResponse( - status_code=500, - content={"error": f"Audio processing failed: {str(processing_error)}"}, - ) - - except Exception as e: - # Database or unexpected errors in reprocessing handler - audio_logger.exception("Error reprocessing audio cropping") - return JSONResponse(status_code=500, content={"error": "Error reprocessing audio cropping"}) diff --git a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py index 4b852dae..09befebb 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py @@ -11,7 +11,7 @@ ClientManager, client_belongs_to_user, ) -from advanced_omi_backend.database import chunks_col +from advanced_omi_backend.models.audio_file import AudioFile from advanced_omi_backend.models.conversation import Conversation from advanced_omi_backend.users import User from fastapi.responses import JSONResponse @@ -96,23 +96,26 @@ async def get_conversation(conversation_id: str, user: User): if not user.is_superuser and conversation.user_id != str(user.user_id): return JSONResponse(status_code=403, content={"error": "Access forbidden"}) - # Format conversation for API response - use model_dump and add computed fields + # Format conversation for API response - clean, no duplication formatted_conversation = conversation.model_dump( - mode='json', # Automatically converts datetime to ISO strings, handles nested models + mode='json', exclude={'id'} # Exclude MongoDB internal _id ) - # Add computed fields not in the model - formatted_conversation.update({ - "timestamp": 0, # Legacy field - using created_at instead - "has_memory": bool(conversation.memories), - "version_info": { - "transcript_count": len(conversation.transcript_versions), - "memory_count": len(conversation.memory_versions), - "active_transcript_version": conversation.active_transcript_version, - "active_memory_version": conversation.active_memory_version - } - }) + # Clean up transcript versions - remove heavy metadata.words + if 'transcript_versions' in formatted_conversation: + for version in formatted_conversation['transcript_versions']: + if 'metadata' in version and version['metadata']: + # Remove words array - not needed by frontend + version['metadata'].pop('words', None) + # Remove redundant speaker_recognition counts (derivable from segments) + if 'speaker_recognition' in version['metadata']: + sr = version['metadata']['speaker_recognition'] + sr.pop('total_segments', None) # Derivable from len(segments) + sr.pop('speaker_count', None) # Derivable from identified_speakers + + # Add minimal computed fields + formatted_conversation['has_memory'] = len(conversation.memory_versions) > 0 return {"conversation": formatted_conversation} @@ -134,29 +137,26 @@ async def get_conversations(user: User): # Admins see all conversations user_conversations = await Conversation.find_all().sort(-Conversation.created_at).to_list() - # Convert conversations to API format + # Convert conversations to API format - minimal for list view conversations = [] for conv in user_conversations: - # Ensure legacy fields are populated from active transcript version - conv._update_legacy_transcript_fields() - - # Format conversation for list - include segments but exclude large nested fields + # Format conversation for list - exclude heavy version data conv_dict = conv.model_dump( - mode='json', # Automatically converts datetime to ISO strings - exclude={'id', 'transcript_versions', 'memory_versions'} # Include segments for UI display + mode='json', + exclude={'id', 'transcript_versions', 'memory_versions'} # Exclude large version arrays ) - # Add computed/external fields + # Add computed fields + # segment_count - count from active transcript version + segment_count = 0 + if conv.active_transcript: + segment_count = len(conv.active_transcript.segments) if conv.active_transcript.segments else 0 + conv_dict.update({ - "timestamp": 0, # Legacy field - using created_at instead - "segment_count": len(conv.segments) if conv.segments else 0, - "has_memory": bool(conv.memories), - "version_info": { - "transcript_count": len(conv.transcript_versions), - "memory_count": len(conv.memory_versions), - "active_transcript_version": conv.active_transcript_version, - "active_memory_version": conv.active_memory_version - } + "segment_count": segment_count, + "has_memory": len(conv.memory_versions) > 0, + "transcript_version_count": len(conv.transcript_versions), + "memory_version_count": len(conv.memory_versions) }) conversations.append(conv_dict) @@ -168,86 +168,52 @@ async def get_conversations(user: User): return JSONResponse(status_code=500, content={"error": "Error fetching conversations"}) -async def delete_conversation(audio_uuid: str, user: User): - """Delete a conversation and its associated audio file. Users can only delete their own conversations.""" +async def delete_conversation(conversation_id: str, user: User): + """Delete a conversation and its associated audio files. Users can only delete their own conversations.""" try: # Create masked identifier for logging - masked_uuid = f"{audio_uuid[:8]}...{audio_uuid[-4:]}" if len(audio_uuid) > 12 else "***" - logger.info(f"Attempting to delete conversation: {masked_uuid}") - - # Detailed debugging only when debug level is enabled - if logger.isEnabledFor(logging.DEBUG): - total_count = await chunks_col.count_documents({}) - logger.debug(f"Total conversations in collection: {total_count}") - logger.debug(f"UUID length: {len(audio_uuid)}, type: {type(audio_uuid)}") - - # First, get the audio chunk record to check ownership and get conversation_id - audio_chunk = await chunks_col.find_one({"audio_uuid": audio_uuid}) - - if logger.isEnabledFor(logging.DEBUG): - logger.debug(f"Audio chunk lookup result: {'found' if audio_chunk else 'not found'}") - if audio_chunk: - logger.debug(f"Found audio chunk with client_id: {audio_chunk.get('client_id')}") - logger.debug(f"Audio chunk has conversation_id: {audio_chunk.get('conversation_id')}") - else: - # Try alternative queries for debugging - regex_result = await chunks_col.find_one({"audio_uuid": {"$regex": f"^{audio_uuid}$", "$options": "i"}}) - contains_result = await chunks_col.find_one({"audio_uuid": {"$regex": audio_uuid}}) - logger.debug(f"Alternative query attempts - case insensitive: {'found' if regex_result else 'not found'}, substring: {'found' if contains_result else 'not found'}") - - if not audio_chunk: + masked_id = f"{conversation_id[:8]}...{conversation_id[-4:]}" if len(conversation_id) > 12 else "***" + logger.info(f"Attempting to delete conversation: {masked_id}") + + # Find the conversation using Beanie + conversation = await Conversation.find_one(Conversation.conversation_id == conversation_id) + + if not conversation: return JSONResponse( status_code=404, - content={"error": f"Audio chunk with audio_uuid '{audio_uuid}' not found"} + content={"error": f"Conversation '{conversation_id}' not found"} ) - # Check if user has permission to delete this conversation - client_id = audio_chunk.get("client_id") - if not user.is_superuser and not client_belongs_to_user(client_id, user.user_id): + # Check ownership for non-admin users + if not user.is_superuser and conversation.user_id != str(user.user_id): logger.warning( - f"User {user.user_id} attempted to delete conversation {audio_uuid} without permission" + f"User {user.user_id} attempted to delete conversation {conversation_id} without permission" ) return JSONResponse( status_code=403, content={ "error": "Access forbidden. You can only delete your own conversations.", - "details": f"Conversation '{audio_uuid}' does not belong to your account." + "details": f"Conversation '{conversation_id}' does not belong to your account." } ) - # Get audio file paths for deletion - audio_path = audio_chunk.get("audio_path") - cropped_audio_path = audio_chunk.get("cropped_audio_path") - - # Get conversation_id if this audio chunk has an associated conversation - conversation_id = audio_chunk.get("conversation_id") - conversation_deleted = False + # Get file paths before deletion + audio_path = conversation.audio_path + cropped_audio_path = conversation.cropped_audio_path + audio_uuid = conversation.audio_uuid + client_id = conversation.client_id - # Delete from audio_chunks collection first - audio_result = await chunks_col.delete_one({"audio_uuid": audio_uuid}) + # Delete the conversation from database + await conversation.delete() + logger.info(f"Deleted conversation {conversation_id}") - if audio_result.deleted_count == 0: - return JSONResponse( - status_code=404, - content={"error": f"Failed to delete audio chunk with audio_uuid '{audio_uuid}'"} - ) - - logger.info(f"Deleted audio chunk {audio_uuid}") - - # If this audio chunk has an associated conversation, delete it using Beanie - if conversation_id: - try: - conversation_model = await Conversation.find_one(Conversation.conversation_id == conversation_id) - if conversation_model: - await conversation_model.delete() - conversation_deleted = True - logger.info(f"Deleted conversation {conversation_id} associated with audio chunk {audio_uuid}") - else: - logger.warning(f"Conversation {conversation_id} not found in conversations collection, but audio chunk was deleted") - except Exception as e: - logger.warning(f"Failed to delete conversation {conversation_id}: {e}") + # Also delete from legacy AudioFile collection if it exists (backward compatibility) + audio_file = await AudioFile.find_one(AudioFile.audio_uuid == audio_uuid) + if audio_file: + await audio_file.delete() + logger.info(f"Deleted legacy audio file record for {audio_uuid}") - # Delete associated audio files + # Delete associated audio files from disk deleted_files = [] if audio_path: try: @@ -271,29 +237,26 @@ async def delete_conversation(audio_uuid: str, user: User): except Exception as e: logger.warning(f"Failed to delete cropped audio file {cropped_audio_path}: {e}") - logger.info(f"Successfully deleted conversation {audio_uuid} for user {user.user_id}") + logger.info(f"Successfully deleted conversation {conversation_id} for user {user.user_id}") # Prepare response message - delete_summary = [] - delete_summary.append("audio chunk") - if conversation_deleted: - delete_summary.append("conversation record") + delete_summary = ["conversation"] if deleted_files: delete_summary.append(f"{len(deleted_files)} audio file(s)") return JSONResponse( status_code=200, content={ - "message": f"Successfully deleted {', '.join(delete_summary)} for '{audio_uuid}'", + "message": f"Successfully deleted {', '.join(delete_summary)} '{conversation_id}'", "deleted_files": deleted_files, "client_id": client_id, "conversation_id": conversation_id, - "conversation_deleted": conversation_deleted + "audio_uuid": audio_uuid } ) except Exception as e: - logger.error(f"Error deleting conversation {audio_uuid}: {e}") + logger.error(f"Error deleting conversation {conversation_id}: {e}") return JSONResponse( status_code=500, content={"error": f"Failed to delete conversation: {str(e)}"} diff --git a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py index a6a406c8..ced15fc7 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py @@ -39,12 +39,12 @@ QUEUE_NAMES = [DEFAULT_QUEUE, TRANSCRIPTION_QUEUE, MEMORY_QUEUE, AUDIO_QUEUE] # Job retention configuration -JOB_RESULT_TTL = int(os.getenv("RQ_RESULT_TTL", 3600)) # 1 hour default +JOB_RESULT_TTL = int(os.getenv("RQ_RESULT_TTL", 86400)) # 24 hour default # Create queues with custom result TTL -transcription_queue = Queue(TRANSCRIPTION_QUEUE, connection=redis_conn, default_timeout=300) +transcription_queue = Queue(TRANSCRIPTION_QUEUE, connection=redis_conn, default_timeout=86400) # 24 hours for streaming jobs memory_queue = Queue(MEMORY_QUEUE, connection=redis_conn, default_timeout=300) -audio_queue = Queue(AUDIO_QUEUE, connection=redis_conn, default_timeout=3600) # 1 hour timeout for long sessions +audio_queue = Queue(AUDIO_QUEUE, connection=redis_conn, default_timeout=86400) # 24 hours for all-day sessions default_queue = Queue(DEFAULT_QUEUE, connection=redis_conn, default_timeout=300) @@ -271,7 +271,7 @@ def start_streaming_jobs( session_id, user_id, client_id, - job_timeout=3600, # 1 hour for long recordings + job_timeout=86400, # 24 hours for all-day sessions result_ttl=JOB_RESULT_TTL, job_id=f"speech-detect_{session_id[:12]}", description=f"Listening for speech...", @@ -281,7 +281,7 @@ def start_streaming_jobs( # Store job ID for cleanup (keyed by client_id for easy WebSocket cleanup) try: - redis_conn.set(f"speech_detection_job:{client_id}", speech_job.id, ex=3600) # 1 hour TTL + redis_conn.set(f"speech_detection_job:{client_id}", speech_job.id, ex=86400) # 24 hour TTL logger.info(f"๐ Stored speech detection job ID for client {client_id}") except Exception as e: logger.warning(f"โ ๏ธ Failed to store job ID for {client_id}: {e}") @@ -294,7 +294,7 @@ def start_streaming_jobs( session_id, user_id, client_id, - job_timeout=3600, # 1 hour for long recordings + job_timeout=86400, # 24 hours for all-day sessions result_ttl=JOB_RESULT_TTL, job_id=f"audio-persist_{session_id[:12]}", description=f"Audio persistence for session {session_id[:12]}", @@ -321,8 +321,8 @@ def start_post_conversation_jobs( Start post-conversation processing jobs after conversation is created. This creates the standard processing chain after a conversation is created: - 1. Audio cropping job - Removes silence from audio - 2. [Optional] Transcription job - Batch transcription (if post_transcription=True) + 1. [Optional] Transcription job - Batch transcription (if post_transcription=True) + 2. Audio cropping job - Removes silence from audio 3. Speaker recognition job - Identifies speakers in audio 4. Memory extraction job - Extracts memories from conversation (parallel) 5. Title/summary generation job - Generates title and summary (parallel) @@ -348,7 +348,29 @@ def start_post_conversation_jobs( version_id = transcript_version_id or str(uuid.uuid4()) - # Step 1: Audio cropping job + # Step 1: Batch transcription job (ALWAYS run to get correct conversation-relative timestamps) + # Even for streaming, we need batch transcription before cropping to fix cumulative timestamps + transcribe_job_id = f"transcribe_{conversation_id[:12]}" + logger.info(f"๐ DEBUG: Creating transcribe job with job_id={transcribe_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}") + + transcription_job = transcription_queue.enqueue( + transcribe_full_audio_job, + conversation_id, + audio_uuid, + audio_file_path, + version_id, + "batch", # trigger + job_timeout=1800, # 30 minutes + result_ttl=JOB_RESULT_TTL, + depends_on=depends_on_job, + job_id=transcribe_job_id, + description=f"Transcribe conversation {conversation_id[:8]}", + meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id} + ) + logger.info(f"๐ฅ RQ: Enqueued transcription job {transcription_job.id}, meta={transcription_job.meta}") + crop_depends_on = transcription_job + + # Step 2: Audio cropping job (depends on transcription if it ran, otherwise depends_on_job) crop_job_id = f"crop_{conversation_id[:12]}" logger.info(f"๐ DEBUG: Creating crop job with job_id={crop_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}") @@ -358,38 +380,15 @@ def start_post_conversation_jobs( audio_file_path, job_timeout=300, # 5 minutes result_ttl=JOB_RESULT_TTL, - depends_on=depends_on_job, + depends_on=crop_depends_on, job_id=crop_job_id, description=f"Crop audio for conversation {conversation_id[:8]}", meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id} ) logger.info(f"๐ฅ RQ: Enqueued cropping job {cropping_job.id}, meta={cropping_job.meta}") - # Step 2: Transcription job (conditional) - transcription_job = None - if post_transcription: - transcribe_job_id = f"transcribe_{conversation_id[:12]}" - logger.info(f"๐ DEBUG: Creating transcribe job with job_id={transcribe_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}") - - transcription_job = transcription_queue.enqueue( - transcribe_full_audio_job, - conversation_id, - audio_uuid, - audio_file_path, - version_id, - "batch", # trigger - job_timeout=1800, # 30 minutes - result_ttl=JOB_RESULT_TTL, - depends_on=cropping_job, - job_id=transcribe_job_id, - description=f"Transcribe conversation {conversation_id[:8]}", - meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id} - ) - logger.info(f"๐ฅ RQ: Enqueued transcription job {transcription_job.id}, meta={transcription_job.meta} (depends on {cropping_job.id})") - speaker_depends_on = transcription_job - else: - logger.info(f"โญ๏ธ RQ: Skipping transcription (streaming already has transcript)") - speaker_depends_on = cropping_job + # Speaker recognition depends on cropping + speaker_depends_on = cropping_job # Step 3: Speaker recognition job speaker_job_id = f"speaker_{conversation_id[:12]}" diff --git a/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py index 3a4e5163..b75f22c8 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py @@ -13,11 +13,6 @@ save_diarization_settings_to_file, ) from advanced_omi_backend.models.user import User -# TODO: Remove old processor architecture -# from advanced_omi_backend.processors import get_processor_manager -def get_processor_manager(): - """Stub - processors being removed.""" - return None from advanced_omi_backend.task_manager import get_task_manager from fastapi.responses import JSONResponse @@ -60,55 +55,6 @@ async def get_auth_config(): } -async def get_all_processing_tasks(): - """Get all active processing tasks.""" - try: - processor_manager = get_processor_manager() - return processor_manager.get_all_processing_status() - except Exception as e: - logger.error(f"Error getting processing tasks: {e}") - return JSONResponse( - status_code=500, content={"error": f"Failed to get processing tasks: {str(e)}"} - ) - - -async def get_processing_task_status(client_id: str): - """Get processing task status for a specific client.""" - try: - processor_manager = get_processor_manager() - processing_status = processor_manager.get_processing_status(client_id) - - # Check if transcription is marked as started but not completed, and verify with database - stages = processing_status.get("stages", {}) - transcription_stage = stages.get("transcription", {}) - - """This is a hack to update it the DB INCASE a process failed - if transcription_stage.get("status") == "started" and not transcription_stage.get("completed", False): - # Check if transcription is actually complete by checking the database - try: - chunk = await chunks_col.find_one({"client_id": client_id}) - if chunk and chunk.get("transcript") and len(chunk.get("transcript", [])) > 0: - # Transcription is complete! Update the processor state - processor_manager.track_processing_stage( - client_id, - "transcription", - "completed", - {"audio_uuid": chunk.get("audio_uuid"), "segments": len(chunk.get("transcript", []))} - ) - logger.info(f"Detected transcription completion for client {client_id} ({len(chunk.get('transcript', []))} segments)") - # Get updated status - processing_status = processor_manager.get_processing_status(client_id) - except Exception as e: - logger.debug(f"Error checking transcription completion: {e}") - """ - return processing_status - except Exception as e: - logger.error(f"Error getting processing task status for {client_id}: {e}") - return JSONResponse( - status_code=500, content={"error": f"Failed to get processing task status: {str(e)}"} - ) - - async def get_processor_status(): """Get RQ worker and queue status.""" try: diff --git a/backends/advanced/src/advanced_omi_backend/controllers/user_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/user_controller.py index dfb4b752..224695d0 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/user_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/user_controller.py @@ -15,8 +15,9 @@ UserManager, ) from advanced_omi_backend.client_manager import get_user_clients_all -from advanced_omi_backend.database import chunks_col, db, users_col +from advanced_omi_backend.database import db, users_col from advanced_omi_backend.memory import get_memory_service +from advanced_omi_backend.models.conversation import Conversation from advanced_omi_backend.users import User, UserCreate, UserUpdate logger = logging.getLogger(__name__) @@ -174,8 +175,8 @@ async def delete_user( deleted_data["user_deleted"] = user_result.deleted_count > 0 if delete_conversations: - # Delete all conversations (audio chunks) for this user - conversations_result = await chunks_col.delete_many({"client_id": user_id}) + # Delete all conversations for this user + conversations_result = await Conversation.find(Conversation.user_id == user_id).delete() deleted_data["conversations_deleted"] = conversations_result.deleted_count if delete_memories: diff --git a/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py index 53a580a7..2448bfbf 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py @@ -93,27 +93,16 @@ async def parse_wyoming_protocol(ws: WebSocket) -> tuple[dict, Optional[bytes]]: async def create_client_state(client_id: str, user, device_name: Optional[str] = None): """Create and register a new client state.""" - # Get client manager and repository + # Get client manager client_manager = get_client_manager() - from advanced_omi_backend.database import AudioChunksRepository - from motor.motor_asyncio import AsyncIOMotorClient - - # MongoDB Configuration - MONGODB_URI = os.getenv("MONGODB_URI", "mongodb://mongo:27017") - mongo_client = AsyncIOMotorClient(MONGODB_URI) - db = mongo_client.get_default_database("friend-lite") - chunks_col = db["audio_chunks"] - - # Initialize repository - ac_repository = AudioChunksRepository(chunks_col) - + # Directory where WAV chunks are written from pathlib import Path CHUNK_DIR = Path("./audio_chunks") # This will be mounted to ./data/audio_chunks by Docker - + # Use ClientManager for atomic client creation and registration client_state = client_manager.create_client( - client_id, ac_repository, CHUNK_DIR, user.user_id, user.email + client_id, CHUNK_DIR, user.user_id, user.email ) # Also track in persistent mapping (for database queries) diff --git a/backends/advanced/src/advanced_omi_backend/database.py b/backends/advanced/src/advanced_omi_backend/database.py index 36d17ebb..cca103ea 100644 --- a/backends/advanced/src/advanced_omi_backend/database.py +++ b/backends/advanced/src/advanced_omi_backend/database.py @@ -7,10 +7,6 @@ import logging import os -import time -from datetime import UTC, datetime -from typing import Optional -import uuid from motor.motor_asyncio import AsyncIOMotorClient @@ -30,9 +26,8 @@ # Collection references (for non-Beanie collections) users_col = db["users"] -chunks_col = db["audio_chunks"] # Still used by AudioChunksRepository -# Note: conversations collection managed by Beanie +# Note: conversations collection managed by Beanie (Document model) # Note: processing_runs replaced by RQ job tracking # Beanie initialization happens in main.py during application startup @@ -46,506 +41,6 @@ def get_collections(): """Get commonly used collection references.""" return { "users_col": users_col, - "chunks_col": chunks_col, } -class AudioChunksRepository: - """Async helpers for the audio_chunks collection.""" - - def __init__(self, collection): - self.col = collection - - async def create_chunk( - self, - *, - audio_uuid, - audio_path, - client_id, - timestamp, - user_id=None, - user_email=None, - transcript=None, - memories=None, - transcription_status="PENDING", - memory_processing_status="PENDING", - ): - # Create initial transcript version if provided - transcript_versions = [] - active_transcript_version = None - - if transcript: - version_id = str(uuid.uuid4()) - transcript_versions.append({ - "version_id": version_id, - "segments": transcript, - "status": transcription_status, - "provider": None, - "created_at": datetime.now(UTC).isoformat(), - "processing_run_id": None, - "raw_data": {} - }) - active_transcript_version = version_id - - # Create initial memory version if provided - memory_versions = [] - active_memory_version = None - - if memories: - version_id = str(uuid.uuid4()) - memory_versions.append({ - "version_id": version_id, - "memories": memories, - "status": memory_processing_status, - "created_at": datetime.now(UTC).isoformat(), - "processing_run_id": None, - "transcript_version_id": active_transcript_version - }) - active_memory_version = version_id - - doc = { - "audio_uuid": audio_uuid, - "audio_path": audio_path, - "client_id": client_id, - "timestamp": timestamp, - "user_id": user_id, - "user_email": user_email, - - # Versioned transcript data - "transcript_versions": transcript_versions, - "active_transcript_version": active_transcript_version, - - # Versioned memory data - "memory_versions": memory_versions, - "active_memory_version": active_memory_version, - - # Compatibility fields (computed from active versions) - "transcript": transcript or [], - "memories": memories or [], - "transcription_status": transcription_status, - "memory_processing_status": memory_processing_status, - "raw_transcript_data": {} - } - await self.col.insert_one(doc) - - async def add_transcript_segment(self, audio_uuid, transcript_segment): - """Add a single transcript segment to the conversation. - - Interface compatibility method - adds to active transcript version. - Creates first transcript version if none exists. - """ - chunk = await self.get_chunk(audio_uuid) - if not chunk: - return False - - active_version = chunk.get("active_transcript_version") - if not active_version: - # Create initial version if none exists - version_id = str(uuid.uuid4()) - version_data = { - "version_id": version_id, - "segments": [transcript_segment], - "status": "PENDING", - "provider": None, - "created_at": datetime.now(UTC).isoformat(), - "processing_run_id": None, - "raw_data": {} - } - - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - { - "$push": {"transcript_versions": version_data}, - "$set": { - "active_transcript_version": version_id, - # Update compatibility field too - "transcript": [transcript_segment] - } - } - ) - else: - # Add to existing active version - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - { - "$push": { - f"transcript_versions.$[version].segments": transcript_segment, - # Update compatibility field too - "transcript": transcript_segment - } - }, - array_filters=[{"version.version_id": active_version}] - ) - - return result.modified_count > 0 - - - async def store_raw_transcript_data(self, audio_uuid, raw_data, provider): - """Store raw transcript data from transcription provider.""" - await self.col.update_one( - {"audio_uuid": audio_uuid}, - { - "$set": { - "raw_transcript_data": { - "provider": provider, - "data": raw_data, - "stored_at": datetime.now(UTC).isoformat(), - } - } - }, - ) - - async def get_chunk(self, audio_uuid): - """Get a chunk by audio_uuid.""" - return await self.col.find_one({"audio_uuid": audio_uuid}) - - async def add_memory_reference(self, audio_uuid: str, memory_id: str, status: str = "created"): - """Add memory reference to audio chunk.""" - memory_ref = { - "memory_id": memory_id, - "created_at": datetime.now(UTC).isoformat(), - "status": status, - "updated_at": datetime.now(UTC).isoformat(), - } - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, {"$push": {"memories": memory_ref}} - ) - if result.modified_count > 0: - logger.info(f"Added memory reference {memory_id} to audio {audio_uuid}") - return result.modified_count > 0 - - async def update_memory_status(self, audio_uuid: str, memory_id: str, status: str): - """Update memory status in audio chunk.""" - result = await self.col.update_one( - {"audio_uuid": audio_uuid, "memories.memory_id": memory_id}, - { - "$set": { - "memories.$.status": status, - "memories.$.updated_at": datetime.now(UTC).isoformat(), - } - }, - ) - if result.modified_count > 0: - logger.info(f"Updated memory {memory_id} status to {status} for audio {audio_uuid}") - return result.modified_count > 0 - - async def remove_memory_reference(self, audio_uuid: str, memory_id: str): - """Remove memory reference from audio chunk.""" - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, {"$pull": {"memories": {"memory_id": memory_id}}} - ) - if result.modified_count > 0: - logger.info(f"Removed memory reference {memory_id} from audio {audio_uuid}") - return result.modified_count > 0 - - async def get_chunk_by_audio_uuid(self, audio_uuid: str): - """Get a chunk document by audio_uuid.""" - return await self.col.find_one({"audio_uuid": audio_uuid}) - - async def get_transcript_segments(self, audio_uuid: str): - """Get transcript segments for a specific audio UUID from active version.""" - document = await self.col.find_one( - {"audio_uuid": audio_uuid}, - {"transcript_versions": 1, "active_transcript_version": 1, "transcript": 1} - ) - - if not document: - return [] - - # Try to get from active version first (new versioned approach) - active_version_id = document.get("active_transcript_version") - if active_version_id and "transcript_versions" in document: - for version in document["transcript_versions"]: - if version.get("version_id") == active_version_id: - return version.get("segments", []) - - # Fallback to legacy transcript field for backward compatibility - if "transcript" in document: - return document["transcript"] - - return [] - - async def update_transcript(self, audio_uuid, full_transcript): - """Update the entire transcript list (for compatibility).""" - await self.col.update_one( - {"audio_uuid": audio_uuid}, {"$set": {"transcript": full_transcript}} - ) - - async def update_segment_timing(self, audio_uuid, segment_index, start_time, end_time): - """Update timing information for a specific transcript segment.""" - await self.col.update_one( - {"audio_uuid": audio_uuid}, - { - "$set": { - f"transcript.{segment_index}.start": start_time, - f"transcript.{segment_index}.end": end_time, - } - }, - ) - - async def update_segment_speaker(self, audio_uuid, segment_index, speaker_id): - """Update the speaker for a specific transcript segment.""" - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - {"$set": {f"transcript.{segment_index}.speaker": speaker_id}}, - ) - if result.modified_count > 0: - logger.info(f"Updated segment {segment_index} speaker to {speaker_id} for {audio_uuid}") - return result.modified_count > 0 - - async def update_cropped_audio( - self, - audio_uuid: str, - cropped_path: str, - speech_segments: list[tuple[float, float]], - ): - """Update the chunk with cropped audio information.""" - cropped_duration = sum(end - start for start, end in speech_segments) - - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - { - "$set": { - "cropped_audio_path": cropped_path, - "speech_segments": [ - {"start": start, "end": end} for start, end in speech_segments - ], - "cropped_duration": cropped_duration, - "cropped_at": datetime.now(UTC), - } - }, - ) - if result.modified_count > 0: - logger.info(f"Updated cropped audio info for {audio_uuid}: {cropped_path}") - return result.modified_count > 0 - - - async def update_memory_processing_status( - self, audio_uuid: str, status: str, error_message: str = None - ): - """Update memory processing status and completion timestamp. - - Interface compatibility method - updates active memory version. - """ - chunk = await self.get_chunk(audio_uuid) - if not chunk: - return False - - active_version = chunk.get("active_memory_version") - if not active_version: - # Create initial memory version if none exists - version_id = str(uuid.uuid4()) - version_data = { - "version_id": version_id, - "memories": [], - "status": status, - "created_at": datetime.now(UTC).isoformat(), - "processing_run_id": None, - "transcript_version_id": chunk.get("active_transcript_version") - } - if error_message: - version_data["error_message"] = error_message - - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - { - "$push": {"memory_versions": version_data}, - "$set": { - "active_memory_version": version_id, - "memory_processing_status": status, - "memory_processing_updated_at": datetime.now(UTC).isoformat(), - } - } - ) - else: - # Update existing active version - update_doc = { - f"memory_versions.$[version].status": status, - f"memory_versions.$[version].updated_at": datetime.now(UTC), - "memory_processing_status": status, - "memory_processing_updated_at": datetime.now(UTC).isoformat(), - } - if status == "COMPLETED": - update_doc["memory_processing_completed_at"] = datetime.now(UTC).isoformat() - if error_message: - update_doc[f"memory_versions.$[version].error_message"] = error_message - update_doc["memory_processing_error"] = error_message - - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - {"$set": update_doc}, - array_filters=[{"version.version_id": active_version}] - ) - - if result.modified_count > 0: - logger.info(f"Updated memory processing status to {status} for {audio_uuid}") - return result.modified_count > 0 - - async def update_transcription_status( - self, audio_uuid: str, status: str, error_message: Optional[str] = None, provider: Optional[str] = None - ): - """Update transcription processing status and completion timestamp. - - Interface compatibility method - updates active transcript version. - """ - chunk = await self.get_chunk(audio_uuid) - if not chunk: - return False - - active_version = chunk.get("active_transcript_version") - if not active_version: - # Create initial transcript version if none exists - version_id = str(uuid.uuid4()) - version_data = { - "version_id": version_id, - "transcript": "", - "segments": [], - "status": status, - "provider": provider, - "created_at": datetime.now(UTC).isoformat(), - "processing_run_id": None, - "raw_data": {} - } - if error_message: - version_data["error_message"] = error_message - - update_doc = { - "active_transcript_version": version_id, - "transcription_status": status, - "transcription_updated_at": datetime.now(UTC).isoformat(), - } - if status == "COMPLETED": - update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat() - - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - { - "$push": {"transcript_versions": version_data}, - "$set": update_doc - } - ) - else: - # Update existing active version - update_doc = { - "transcript_versions.$[version].status": status, - "transcript_versions.$[version].updated_at": datetime.now(UTC).isoformat(), - "transcription_status": status, - "transcription_updated_at": datetime.now(UTC).isoformat(), - } - if status == "COMPLETED": - update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat() - if error_message: - update_doc["transcript_versions.$[version].error_message"] = error_message - update_doc["transcription_error"] = error_message - if provider: - update_doc["transcript_versions.$[version].provider"] = provider - update_doc["transcript_provider"] = provider - - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - {"$set": update_doc}, - array_filters=[{"version.version_id": active_version}] - ) - - if result.modified_count > 0: - logger.info(f"Updated transcription status to {status} for {audio_uuid}") - return result.modified_count > 0 - - # ======================================== - # SPEECH-DRIVEN CONVERSATIONS METHODS - # ======================================== - - async def add_audio_file_path(self, audio_uuid: str, file_path: str): - """Add new audio file path to existing session.""" - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - { - "$push": {"audio_file_paths": file_path}, - "$set": {"updated_at": datetime.now(UTC).isoformat()} - } - ) - if result.modified_count > 0: - logger.info(f"Added audio file path {file_path} to session {audio_uuid}") - return result.modified_count > 0 - - async def update_speech_detection(self, audio_uuid: str, **speech_data): - """Update speech detection results.""" - update_doc = { - "updated_at": datetime.now(UTC).isoformat() - } - - # Add speech detection fields - for key, value in speech_data.items(): - if key in ["has_speech", "conversation_created", "conversation_id", - "speech_start_time", "speech_end_time", "status"]: - update_doc[key] = value - - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - {"$set": update_doc} - ) - if result.modified_count > 0: - logger.info(f"Updated speech detection for {audio_uuid}: {speech_data}") - return result.modified_count > 0 - - async def mark_conversation_created(self, audio_uuid: str, conversation_id: str): - """Mark that conversation was created for this audio.""" - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - {"$set": { - "conversation_created": True, - "conversation_id": conversation_id, - "has_speech": True, - "status": "speech_detected", - "updated_at": datetime.now(UTC).isoformat() - }} - ) - if result.modified_count > 0: - logger.info(f"Marked conversation created for {audio_uuid} with ID {conversation_id}") - return result.modified_count > 0 - - async def get_sessions_with_speech(self, user_id: str, limit: int = 100): - """Get audio sessions that have detected speech.""" - cursor = self.col.find({ - "user_id": user_id, - "has_speech": True, - "conversation_created": True - }).sort("timestamp", -1).limit(limit) - - return await cursor.to_list(length=None) - - async def update_transcription_status( - self, audio_uuid: str, status: str, error_message: str = None, provider: str = None - ): - """Update transcription status and completion timestamp. - - Args: - audio_uuid: UUID of the audio chunk - status: New status ('PENDING', 'PROCESSING', 'COMPLETED', 'FAILED', 'EMPTY') - error_message: Optional error message if status is 'FAILED' - provider: Optional provider name for successful transcriptions - """ - update_doc = { - "transcription_status": status, - "updated_at": datetime.now(UTC).isoformat() - } - - if status == "COMPLETED": - update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat() - if provider: - update_doc["transcription_provider"] = provider - elif status == "FAILED" and error_message: - update_doc["transcription_error"] = error_message - elif status == "EMPTY": - update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat() - if provider: - update_doc["transcription_provider"] = provider - - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, {"$set": update_doc} - ) - return result.modified_count > 0 - - -# ConversationsRepository removed - use Beanie Conversation model directly -# ProcessingRunsRepository removed - use RQ job tracking instead diff --git a/backends/advanced/src/advanced_omi_backend/memory/compat_service.py b/backends/advanced/src/advanced_omi_backend/memory/compat_service.py index e3cb9827..3814f29e 100644 --- a/backends/advanced/src/advanced_omi_backend/memory/compat_service.py +++ b/backends/advanced/src/advanced_omi_backend/memory/compat_service.py @@ -281,13 +281,13 @@ async def get_memories_with_transcripts(self, user_id: str, limit: int = 100) -> # Get memories first memories = await self.get_all_memories(user_id, limit) - # Import database connection + # Import Conversation model try: - from advanced_omi_backend.database import chunks_col + from advanced_omi_backend.models.conversation import Conversation except ImportError: - memory_logger.error("Cannot import database connection") + memory_logger.error("Cannot import Conversation model") return memories # Return memories without transcript enrichment - + # Extract source IDs for bulk query source_ids = [] for memory in memories: @@ -295,12 +295,15 @@ async def get_memories_with_transcripts(self, user_id: str, limit: int = 100) -> source_id = metadata.get("source_id") or metadata.get("audio_uuid") # Backward compatibility if source_id: source_ids.append(source_id) - - # Bulk query for chunks (support both old audio_uuid and new source_id) - chunks_cursor = chunks_col.find({"audio_uuid": {"$in": source_ids}}) - chunks_by_id = {} - async for chunk in chunks_cursor: - chunks_by_id[chunk["audio_uuid"]] = chunk + + # Bulk query for conversations (support both old audio_uuid and new source_id) + conversations_list = await Conversation.find( + Conversation.audio_uuid.in_(source_ids) + ).to_list() + + conversations_by_id = {} + for conv in conversations_list: + conversations_by_id[conv.audio_uuid] = conv enriched_memories = [] @@ -328,15 +331,15 @@ async def get_memories_with_transcripts(self, user_id: str, limit: int = 100) -> enriched_memory["client_id"] = metadata.get("client_id") enriched_memory["user_email"] = metadata.get("user_email") - # Get transcript from bulk-loaded chunks - chunk = chunks_by_id.get(source_id) - if chunk: - transcript_segments = chunk.get("transcript", []) + # Get transcript from bulk-loaded conversations + conversation = conversations_by_id.get(source_id) + if conversation: + transcript_segments = conversation.segments if transcript_segments: full_transcript = " ".join( - segment.get("text", "") + segment.text for segment in transcript_segments - if isinstance(segment, dict) and segment.get("text") + if segment.text ) if full_transcript.strip(): diff --git a/backends/advanced/src/advanced_omi_backend/models/conversation.py b/backends/advanced/src/advanced_omi_backend/models/conversation.py index 367c0daf..56f55ede 100644 --- a/backends/advanced/src/advanced_omi_backend/models/conversation.py +++ b/backends/advanced/src/advanced_omi_backend/models/conversation.py @@ -70,7 +70,7 @@ class MemoryVersion(BaseModel): # Core identifiers conversation_id: Indexed(str, unique=True) = Field(default_factory=lambda: str(uuid.uuid4()), description="Unique conversation identifier") - audio_uuid: Indexed(str) = Field(description="Link to audio_chunks collection") + audio_uuid: Indexed(str) = Field(description="Session/audio identifier (for tracking audio files)") user_id: Indexed(str) = Field(description="User who owns this conversation") client_id: Indexed(str) = Field(description="Client device identifier") @@ -81,9 +81,15 @@ class MemoryVersion(BaseModel): # Creation metadata created_at: Indexed(datetime) = Field(default_factory=datetime.utcnow, description="When the conversation was created") + # Processing status tracking + deleted: bool = Field(False, description="Whether this conversation was deleted due to processing failure") + deletion_reason: Optional[str] = Field(None, description="Reason for deletion (no_meaningful_speech, audio_file_not_ready, etc.)") + deleted_at: Optional[datetime] = Field(None, description="When the conversation was marked as deleted") + # Summary fields (auto-generated from transcript) title: Optional[str] = Field(None, description="Auto-generated conversation title") - summary: Optional[str] = Field(None, description="Auto-generated conversation summary") + summary: Optional[str] = Field(None, description="Auto-generated short summary (1-2 sentences)") + detailed_summary: Optional[str] = Field(None, description="Auto-generated detailed summary (comprehensive, corrected content)") # Versioned processing transcript_versions: List["Conversation.TranscriptVersion"] = Field( @@ -105,30 +111,18 @@ class MemoryVersion(BaseModel): description="Version ID of currently active memory extraction" ) - # Legacy fields (auto-populated from active versions) - transcript: Union[str, List[Dict[str, Any]], None] = Field(None, description="Current transcript text") - segments: List["Conversation.SpeakerSegment"] = Field(default_factory=list, description="Current transcript segments") - memories: List[Dict[str, Any]] = Field(default_factory=list, description="Current extracted memories") - memory_count: int = Field(default=0, description="Current memory count") + # Legacy fields removed - use transcript_versions[active_transcript_version] and memory_versions[active_memory_version] + # Frontend should access: conversation.active_transcript.segments, conversation.active_transcript.transcript @model_validator(mode='before') @classmethod def clean_legacy_data(cls, data: Any) -> Any: """Clean up legacy/malformed data before Pydantic validation.""" - - #TODO Unsure that we need this, likely best to migrate database on startup, or mimic the old structure better + if not isinstance(data, dict): return data - # Fix legacy transcript field if it's a dict (should be string or None) - if isinstance(data.get('transcript'), dict): - data['transcript'] = None - - # Fix legacy segments field if it's a dict (should be list) - if isinstance(data.get('segments'), dict): - data['segments'] = [] - - # Fix malformed transcript_versions + # Fix malformed transcript_versions (from old schema versions) if 'transcript_versions' in data and isinstance(data['transcript_versions'], list): for version in data['transcript_versions']: if isinstance(version, dict): @@ -150,28 +144,6 @@ def clean_legacy_data(cls, data: Any) -> Any: elif not isinstance(segment['speaker'], str): segment['speaker'] = "unknown" - # Also fix legacy segments field - if 'segments' in data and isinstance(data['segments'], list): - for segment in data['segments']: - if isinstance(segment, dict) and 'speaker' in segment: - if isinstance(segment['speaker'], int): - segment['speaker'] = f"Speaker {segment['speaker']}" - elif not isinstance(segment['speaker'], str): - segment['speaker'] = "unknown" - - # Populate legacy fields from active transcript version if they're empty - active_version_id = data.get('active_transcript_version') - if active_version_id and 'transcript_versions' in data and isinstance(data['transcript_versions'], list): - for version in data['transcript_versions']: - if isinstance(version, dict) and version.get('version_id') == active_version_id: - # Populate transcript if missing - if not data.get('transcript') and version.get('transcript'): - data['transcript'] = version['transcript'] - # Populate segments if missing or empty - if (not data.get('segments') or len(data.get('segments', [])) == 0) and version.get('segments'): - data['segments'] = version['segments'] - break - return data @property @@ -196,6 +168,17 @@ def active_memory(self) -> Optional["Conversation.MemoryVersion"]: return version return None + # Convenience properties that return data from active transcript version + @property + def transcript(self) -> Optional[str]: + """Get transcript text from active transcript version.""" + return self.active_transcript.transcript if self.active_transcript else None + + @property + def segments(self) -> List["Conversation.SpeakerSegment"]: + """Get segments from active transcript version.""" + return self.active_transcript.segments if self.active_transcript else [] + def add_transcript_version( self, version_id: str, @@ -223,7 +206,6 @@ def add_transcript_version( if set_as_active: self.active_transcript_version = version_id - self._update_legacy_transcript_fields() return new_version @@ -254,7 +236,6 @@ def add_memory_version( if set_as_active: self.active_memory_version = version_id - self._update_legacy_memory_fields(memory_count) return new_version @@ -263,7 +244,6 @@ def set_active_transcript_version(self, version_id: str) -> bool: for version in self.transcript_versions: if version.version_id == version_id: self.active_transcript_version = version_id - self._update_legacy_transcript_fields() return True return False @@ -272,26 +252,9 @@ def set_active_memory_version(self, version_id: str) -> bool: for version in self.memory_versions: if version.version_id == version_id: self.active_memory_version = version_id - self._update_legacy_memory_fields(version.memory_count) return True return False - def _update_legacy_transcript_fields(self): - """Update legacy transcript fields from active version.""" - active = self.active_transcript - if active: - self.transcript = active.transcript - self.segments = active.segments - else: - self.transcript = None - self.segments = [] - - def _update_legacy_memory_fields(self, memory_count: int): - """Update legacy memory fields from active version.""" - self.memory_count = memory_count - # Note: actual memories list would need to be fetched from memory storage - # This is just the count for now - class Settings: name = "conversations" indexes = [ @@ -317,7 +280,7 @@ def create_conversation( Factory function to create a new conversation. Args: - audio_uuid: Link to audio_chunks collection + audio_uuid: Unique identifier for the audio session user_id: User who owns this conversation client_id: Client device identifier conversation_id: Optional unique conversation identifier (auto-generated if not provided) diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py index 2eb67607..4c0f756b 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py @@ -1,14 +1,17 @@ """ -Audio file upload routes. +Audio file upload and serving routes. -Handles audio file uploads and processing job management. +Handles audio file uploads, processing job management, and audio file serving. """ -from fastapi import APIRouter, Depends, File, Query, UploadFile +from pathlib import Path +from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile +from fastapi.responses import FileResponse -from advanced_omi_backend.auth import current_superuser +from advanced_omi_backend.auth import current_superuser, current_active_user from advanced_omi_backend.controllers import audio_controller from advanced_omi_backend.models.user import User +from advanced_omi_backend.app_config import get_audio_chunk_dir router = APIRouter(prefix="/audio", tags=["audio"]) @@ -33,3 +36,58 @@ async def upload_audio_files( return await audio_controller.upload_and_process_audio_files( current_user, files, device_name, auto_generate_client ) + + +@router.get("/{filename}") +async def serve_audio_file( + filename: str, + current_user: User = Depends(current_active_user), +): + """ + Serve audio files for playback. + + This endpoint allows authenticated users to access audio files stored on disk. + Users can only access their own audio files (ownership verified via conversation lookup). + + Args: + filename: Name of the audio file (e.g., "audio_uuid.wav" or "cropped_audio_uuid.wav") + current_user: Authenticated user + + Returns: + FileResponse with the audio file + + Raises: + 404: If file not found + 403: If user doesn't own the conversation associated with this audio + """ + # Get audio chunk directory + audio_dir = get_audio_chunk_dir() + file_path = audio_dir / filename + + # Check if file exists + if not file_path.exists() or not file_path.is_file(): + raise HTTPException(status_code=404, detail="Audio file not found") + + # Security check: Verify user owns the conversation associated with this audio + # Extract audio_uuid from filename (handle both "uuid.wav" and "cropped_uuid.wav") + from advanced_omi_backend.models.conversation import Conversation + + # Remove "cropped_" prefix if present + audio_uuid = filename.replace("cropped_", "").replace(".wav", "") + + # Find conversation by audio_uuid + conversation = await Conversation.find_one(Conversation.audio_uuid == audio_uuid) + + if not conversation: + raise HTTPException(status_code=404, detail="Conversation not found for this audio file") + + # Check ownership (admins can access all files) + if not current_user.is_superuser and conversation.user_id != str(current_user.user_id): + raise HTTPException(status_code=403, detail="Access denied") + + # Serve the file + return FileResponse( + path=str(file_path), + media_type="audio/wav", + filename=filename + ) diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py index ac426ee8..e2b76f7d 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py @@ -10,10 +10,6 @@ from fastapi import APIRouter, Depends, Query from advanced_omi_backend.auth import current_active_user -from advanced_omi_backend.client_manager import ( - ClientManager, - get_client_manager_dependency, -) from advanced_omi_backend.controllers import conversation_controller, audio_controller from advanced_omi_backend.users import User @@ -26,12 +22,9 @@ async def close_current_conversation( client_id: str, current_user: User = Depends(current_active_user), - client_manager: ClientManager = Depends(get_client_manager_dependency), ): - """Close the current conversation for a specific client. Users can only close their own conversations.""" - return await conversation_controller.close_current_conversation( - client_id, current_user, client_manager - ) + """Close the current active conversation for a client. Works for both connected and disconnected clients.""" + return await conversation_controller.close_current_conversation(client_id, current_user) @router.get("") @@ -57,15 +50,6 @@ async def get_cropped_audio_info( return await audio_controller.get_cropped_audio_info(audio_uuid, current_user) -# Deprecated -@router.post("/{audio_uuid}/reprocess") -async def reprocess_audio_cropping( - audio_uuid: str, current_user: User = Depends(current_active_user) -): - """Reprocess audio cropping for a conversation. Users can only reprocess their own conversations.""" - return await audio_controller.reprocess_audio_cropping(audio_uuid, current_user) - - # New reprocessing endpoints @router.post("/{conversation_id}/reprocess-transcript") async def reprocess_transcript( @@ -113,9 +97,9 @@ async def get_conversation_version_history( return await conversation_controller.get_conversation_version_history(conversation_id, current_user) -@router.delete("/{audio_uuid}") +@router.delete("/{conversation_id}") async def delete_conversation( - audio_uuid: str, current_user: User = Depends(current_active_user) + conversation_id: str, current_user: User = Depends(current_active_user) ): """Delete a conversation and its associated audio file. Users can only delete their own conversations.""" - return await conversation_controller.delete_conversation(audio_uuid, current_user) + return await conversation_controller.delete_conversation(conversation_id, current_user) diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py index c03a7802..95569afa 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py @@ -30,20 +30,6 @@ async def get_auth_config(): return await system_controller.get_auth_config() -@router.get("/processor/tasks") -async def get_all_processing_tasks(current_user: User = Depends(current_superuser)): - """Get all active processing tasks. Admin only.""" - return await system_controller.get_all_processing_tasks() - - -@router.get("/processor/tasks/{client_id}") -async def get_processing_task_status( - client_id: str, current_user: User = Depends(current_superuser) -): - """Get processing task status for a specific client. Admin only.""" - return await system_controller.get_processing_task_status(client_id) - - @router.get("/processor/status") async def get_processor_status(current_user: User = Depends(current_superuser)): """Get processor queue status and health. Admin only.""" diff --git a/backends/advanced/src/advanced_omi_backend/utils/audio_utils.py b/backends/advanced/src/advanced_omi_backend/utils/audio_utils.py index 2a4aeaf9..111c7abc 100644 --- a/backends/advanced/src/advanced_omi_backend/utils/audio_utils.py +++ b/backends/advanced/src/advanced_omi_backend/utils/audio_utils.py @@ -16,7 +16,6 @@ if TYPE_CHECKING: from advanced_omi_backend.client import ClientState - from advanced_omi_backend.database import AudioChunksRepository logger = logging.getLogger(__name__) audio_logger = logging.getLogger("audio_processing") @@ -264,13 +263,18 @@ async def _process_audio_cropping_with_relative_timestamps( speech_segments: list[tuple[float, float]], output_path: str, audio_uuid: str, - chunk_repo: Optional['AudioChunksRepository'] = None, -) -> bool: + _deprecated_chunk_repo=None, # Deprecated - kept for backward compatibility +) -> tuple[bool, list[dict]]: """ Process audio cropping with speech segments already in relative format. The segments are expected to be in relative format (seconds from audio start), as provided by Deepgram transcription. No timestamp conversion is needed. + + Note: Database updates are now handled by the caller (audio_jobs.py). + + Returns: + Tuple of (success: bool, segment_mapping: list[dict]) """ try: # Validate input segments @@ -306,22 +310,19 @@ async def _process_audio_cropping_with_relative_timestamps( logger.warning( f"No valid segments for cropping {audio_uuid}" ) - return False + return False, [] - success = await _crop_audio_with_ffmpeg(original_path, validated_segments, output_path) + success, segment_mapping = await _crop_audio_with_ffmpeg(original_path, validated_segments, output_path) if success: - # Update database with cropped file info cropped_filename = output_path.split("/")[-1] - if chunk_repo is not None: - await chunk_repo.update_cropped_audio(audio_uuid, cropped_filename, speech_segments) logger.info(f"Successfully processed cropped audio: {cropped_filename}") - return True + return True, segment_mapping else: logger.error(f"Failed to crop audio for {audio_uuid}") - return False + return False, segment_mapping except Exception as e: logger.error(f"Error in audio cropping task for {audio_uuid}: {e}", exc_info=True) - return False + return False, [] def write_pcm_to_wav( @@ -369,29 +370,72 @@ def write_pcm_to_wav( async def _crop_audio_with_ffmpeg( original_path: str, speech_segments: list[tuple[float, float]], output_path: str -) -> bool: - """Use ffmpeg to crop audio - runs as async subprocess, no GIL issues""" +) -> tuple[bool, list[dict]]: + """ + Use ffmpeg to crop audio - runs as async subprocess, no GIL issues. + + Returns: + Tuple of (success: bool, segment_mapping: list[dict]) + + segment_mapping contains one entry per input segment with: + - original_index: Index in input speech_segments + - original_start/end: Original timestamps in source audio + - cropped_start/end: Where the speech starts/ends in cropped file (None if filtered) + - kept: Whether segment was kept (True) or filtered out (False) + """ logger.info(f"Cropping audio {original_path} with {len(speech_segments)} speech segments") if not speech_segments: logger.warning(f"No speech segments to crop for {original_path}") - return False + return False, [] # Check if the original file exists if not os.path.exists(original_path): logger.error(f"Original audio file does not exist: {original_path}") - return False + return False, [] - # Filter out segments that are too short + # Filter out segments that are too short and build mapping filtered_segments = [] - for start, end in speech_segments: + segment_mapping = [] + current_cropped_offset = 0.0 + + for idx, (start, end) in enumerate(speech_segments): duration = end - start if duration >= MIN_SPEECH_SEGMENT_DURATION: # Add padding around speech segments padded_start = max(0, start - CROPPING_CONTEXT_PADDING) padded_end = end + CROPPING_CONTEXT_PADDING + padded_duration = padded_end - padded_start + filtered_segments.append((padded_start, padded_end)) + + # Calculate where the speech (not padding) appears in cropped file + # The cropped file will have: [padding_before][speech][padding_after] + padding_before = start - padded_start + speech_start_in_cropped = current_cropped_offset + padding_before + speech_end_in_cropped = speech_start_in_cropped + duration + + segment_mapping.append({ + "original_index": idx, + "original_start": start, + "original_end": end, + "cropped_start": speech_start_in_cropped, + "cropped_end": speech_end_in_cropped, + "kept": True + }) + + # Move offset by the full padded duration + current_cropped_offset += padded_duration else: + # Segment filtered out + segment_mapping.append({ + "original_index": idx, + "original_start": start, + "original_end": end, + "cropped_start": None, + "cropped_end": None, + "kept": False + }) logger.debug( f"Skipping short segment: {start}-{end} ({duration:.2f}s < {MIN_SPEECH_SEGMENT_DURATION}s)" ) @@ -400,7 +444,7 @@ async def _crop_audio_with_ffmpeg( logger.warning( f"No segments meet minimum duration ({MIN_SPEECH_SEGMENT_DURATION}s) for {original_path}" ) - return False + return False, segment_mapping logger.info( f"Cropping audio {original_path} with {len(filtered_segments)} speech segments (filtered from {len(speech_segments)})" @@ -452,12 +496,12 @@ async def _crop_audio_with_ffmpeg( logger.info( f"Successfully cropped {original_path} -> {output_path} ({cropped_duration:.1f}s from {len(filtered_segments)} segments)" ) - return True + return True, segment_mapping else: error_msg = stderr.decode() if stderr else "Unknown ffmpeg error" logger.error(f"ffmpeg failed for {original_path}: {error_msg}") - return False + return False, segment_mapping except Exception as e: logger.error(f"Error running ffmpeg on {original_path}: {e}", exc_info=True) - return False + return False, segment_mapping diff --git a/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py b/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py index ef83f3ba..363b2efd 100644 --- a/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py +++ b/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py @@ -4,8 +4,12 @@ Extracted from legacy TranscriptionService to be reusable across V2 architecture. """ +import asyncio import logging -from typing import Optional +import time +from datetime import datetime +from pathlib import Path +from typing import Optional, Dict, Any, List from advanced_omi_backend.config import get_speech_detection_settings from advanced_omi_backend.llm_client import async_generate @@ -37,10 +41,7 @@ def is_meaningful_speech(combined_results: dict) -> bool: if not combined_results.get("text"): return False - transcript_data = { - "text": combined_results["text"], - "words": combined_results.get("words", []) - } + transcript_data = {"text": combined_results["text"], "words": combined_results.get("words", [])} speech_analysis = analyze_speech(transcript_data) return speech_analysis["has_speech"] @@ -82,17 +83,14 @@ def analyze_speech(transcript_data: dict) -> dict: # Method 1: Word-level analysis (preferred - has confidence scores and timing) if words: # Filter by confidence threshold - valid_words = [ - w for w in words - if w.get("confidence", 0) >= settings["min_confidence"] - ] + valid_words = [w for w in words if w.get("confidence", 0) >= settings["min_confidence"]] if len(valid_words) < settings["min_words"]: return { "has_speech": False, "reason": f"Not enough valid words ({len(valid_words)} < {settings['min_words']})", "word_count": len(valid_words), - "duration": 0.0 + "duration": 0.0, } # Calculate speech duration from word timing @@ -107,7 +105,7 @@ def analyze_speech(transcript_data: dict) -> dict: "speech_start": speech_start, "speech_end": speech_end, "duration": speech_duration, - "reason": f"Valid speech detected ({len(valid_words)} words, {speech_duration:.1f}s)" + "reason": f"Valid speech detected ({len(valid_words)} words, {speech_duration:.1f}s)", } # Method 2: Text-only fallback (when no word-level data available) @@ -122,7 +120,7 @@ def analyze_speech(transcript_data: dict) -> dict: "speech_end": 0.0, "duration": 0.0, "reason": f"Valid speech detected ({word_count} words, no timing data)", - "fallback": True + "fallback": True, } # No speech detected @@ -130,20 +128,35 @@ def analyze_speech(transcript_data: dict) -> dict: "has_speech": False, "reason": "No meaningful speech content detected", "word_count": 0, - "duration": 0.0 + "duration": 0.0, } -async def generate_title(text: str) -> str: +async def generate_title(text: str, segments: Optional[list] = None) -> str: """ Generate an LLM-powered title from conversation text. Args: - text: Conversation transcript + text: Conversation transcript (used if segments not provided) + segments: Optional list of speaker segments with structure: + [{"speaker": str, "text": str, "start": float, "end": float}, ...] + If provided, uses speaker-aware conversation formatting Returns: str: Generated title (3-6 words) or fallback + + Note: + Title intentionally does NOT include speaker names - focuses on topic/theme only. """ + # Format conversation text from segments if provided + if segments: + conversation_text = "" + for segment in segments[:10]: # Use first 10 segments for title generation + segment_text = segment.get("text", "").strip() + if segment_text: + conversation_text += f"{segment_text}\n" + text = conversation_text if conversation_text.strip() else text + if not text or len(text.strip()) < 10: return "Conversation" @@ -155,6 +168,7 @@ async def generate_title(text: str) -> str: Rules: - Maximum 6 words - Capture the main topic or theme +- Do NOT include speaker names or participants - No quotes or special characters - Examples: "Planning Weekend Trip", "Work Project Discussion", "Medical Appointment" @@ -171,28 +185,58 @@ async def generate_title(text: str) -> str: return title[:40] + "..." if len(title) > 40 else title or "Conversation" -async def generate_summary(text: str) -> str: +async def generate_short_summary(text: str, segments: Optional[list] = None) -> str: """ - Generate an LLM-powered summary from conversation text. + Generate a brief LLM-powered summary from conversation text. Args: - text: Conversation transcript + text: Conversation transcript (used if segments not provided) + segments: Optional list of speaker segments with structure: + [{"speaker": str, "text": str, "start": float, "end": float}, ...] + If provided, includes speaker context in summary Returns: - str: Generated summary (1-2 sentences, max 120 chars) or fallback + str: Generated short summary (1-2 sentences, max 120 chars) or fallback """ - if not text or len(text.strip()) < 10: + # Format conversation text from segments if provided + conversation_text = text + include_speakers = False + + if segments: + formatted_text = "" + speakers_in_conv = set() + for segment in segments: + speaker = segment.get("speaker", "") + segment_text = segment.get("text", "").strip() + if segment_text: + if speaker: + formatted_text += f"{speaker}: {segment_text}\n" + speakers_in_conv.add(speaker) + else: + formatted_text += f"{segment_text}\n" + + if formatted_text.strip(): + conversation_text = formatted_text + include_speakers = len(speakers_in_conv) > 0 + + if not conversation_text or len(conversation_text.strip()) < 10: return "No content" try: + speaker_instruction = ( + '- Include speaker names when relevant (e.g., "John discusses X with Sarah")\n' + if include_speakers + else "" + ) + prompt = f"""Generate a brief, informative summary (1-2 sentences, max 120 characters) for this conversation: -"{text[:1000]}" +"{conversation_text[:1000]}" Rules: - Maximum 120 characters - 1-2 complete sentences -- Capture key topics and outcomes +{speaker_instruction}- Capture key topics and outcomes - Use present tense - Be specific and informative @@ -202,113 +246,411 @@ async def generate_summary(text: str) -> str: return summary.strip().strip('"').strip("'") or "No content" except Exception as e: - logger.warning(f"Failed to generate LLM summary: {e}") + logger.warning(f"Failed to generate LLM short summary: {e}") # Fallback to simple summary generation - return text[:120] + "..." if len(text) > 120 else text or "No content" + return ( + conversation_text[:120] + "..." + if len(conversation_text) > 120 + else conversation_text or "No content" + ) -async def generate_title_with_speakers(segments: list) -> str: +# Backward compatibility alias +async def generate_summary(text: str) -> str: + """ + Backward compatibility wrapper for generate_short_summary. + + Deprecated: Use generate_short_summary instead. """ - Generate an LLM-powered title from conversation segments with speaker information. + return await generate_short_summary(text) + + +async def generate_detailed_summary(text: str, segments: Optional[list] = None) -> str: + """ + Generate a comprehensive, detailed summary of the conversation. + + This summary provides full information about what was discussed and said, + correcting transcript errors and removing filler words to create a higher + quality information set. Not word-for-word like the transcript, but captures + all key points, context, and meaningful content. Args: - segments: List of dicts with: + text: Conversation transcript (used if segments not provided) + segments: Optional list of speaker segments with structure: [{"speaker": str, "text": str, "start": float, "end": float}, ...] + If provided, includes speaker attribution in detailed summary Returns: - str: Generated title (max 40 chars) or fallback + str: Comprehensive detailed summary (multiple paragraphs) or fallback """ - if not segments: - return "Conversation" - - # Format conversation with speaker names - conversation_text = "" - for segment in segments[:10]: # Use first 10 segments for title generation - speaker = segment.get("speaker", "") - text = segment.get("text", "").strip() - if text: - if speaker: - conversation_text += f"{speaker}: {text}\n" - else: - conversation_text += f"{text}\n" - - if not conversation_text.strip(): - return "Conversation" + # Format conversation text from segments if provided + conversation_text = text + include_speakers = False + + if segments: + formatted_text = "" + speakers_in_conv = set() + for segment in segments: + speaker = segment.get("speaker", "") + segment_text = segment.get("text", "").strip() + if segment_text: + if speaker: + formatted_text += f"{speaker}: {segment_text}\n" + speakers_in_conv.add(speaker) + else: + formatted_text += f"{segment_text}\n" + + if formatted_text.strip(): + conversation_text = formatted_text + include_speakers = len(speakers_in_conv) > 0 + + if not conversation_text or len(conversation_text.strip()) < 10: + return "No meaningful content to summarize" try: - prompt = f"""Generate a concise title (max 40 characters) for this conversation: + speaker_instruction = ( + """- Attribute key points and statements to specific speakers when relevant +- Capture the flow of conversation between participants +- Note any agreements, disagreements, or important exchanges +""" + if include_speakers + else "" + ) + + prompt = f"""Generate a comprehensive, detailed summary of this conversation transcript. -"{conversation_text[:500]}" +TRANSCRIPT: +"{conversation_text}" + +INSTRUCTIONS: +Your task is to create a high-quality, detailed summary of a conversation transcription that captures the full information and context of what was discussed. This is NOT a brief summary - provide comprehensive coverage. Rules: -- Maximum 40 characters -- Include speaker names if relevant -- Capture the main topic -- Be specific and informative +- We know it's a conversation, so no need to say "This conversation involved..." +- Provide complete coverage of all topics, points, and important details discussed +- Correct obvious transcription errors and remove filler words (um, uh, like, you know) +- Organize information logically by topic or chronologically as appropriate +- Use clear, well-structured paragraphs or bullet points, but make the length relative to the amound of content. +- Maintain the meaning and intent of what was said, but improve clarity and coherence +- Include relevant context, decisions made, action items mentioned, and conclusions reached +{speaker_instruction}- Write in a natural, flowing narrative style +- Only include word-for-word quotes if it's more efficiency than rephrasing +- Focus on substantive content - what was actually discussed and decided -Title:""" +Think of this as creating a high-quality information set that someone could use to understand everything important that happened in this conversation without reading the full transcript. - title = await async_generate(prompt, temperature=0.3) - title = title.strip().strip('"').strip("'") - return title[:40] + "..." if len(title) > 40 else title or "Conversation" +DETAILED SUMMARY:""" + + summary = await async_generate(prompt, temperature=0.3) + return summary.strip().strip('"').strip("'") or "No meaningful content to summarize" except Exception as e: - logger.warning(f"Failed to generate LLM title with speakers: {e}") - # Fallback to simple title generation - words = conversation_text.split()[:6] - title = " ".join(words) - return title[:40] + "..." if len(title) > 40 else title or "Conversation" + logger.warning(f"Failed to generate detailed summary: {e}") + # Fallback to returning cleaned transcript + lines = conversation_text.split("\n") + cleaned = "\n".join(line.strip() for line in lines if line.strip()) + return ( + cleaned[:2000] + "..." + if len(cleaned) > 2000 + else cleaned or "No meaningful content to summarize" + ) + + +# Backward compatibility aliases for deprecated speaker-specific methods +async def generate_title_with_speakers(segments: list) -> str: + """ + Deprecated: Use generate_title(text, segments=segments) instead. + + Backward compatibility wrapper. + """ + if not segments: + return "Conversation" + # Extract text from segments for compatibility + text = "\n".join(s.get("text", "") for s in segments if s.get("text")) + return await generate_title(text, segments=segments) async def generate_summary_with_speakers(segments: list) -> str: """ - Generate an LLM-powered summary from conversation segments with speaker information. + Deprecated: Use generate_short_summary(text, segments=segments) instead. - Args: - segments: List of dicts with: - [{"speaker": str, "text": str, "start": float, "end": float}, ...] - - Returns: - str: Generated summary (1-2 sentences, max 120 chars) or fallback + Backward compatibility wrapper. """ if not segments: return "No content" + # Extract text from segments for compatibility + text = "\n".join(s.get("text", "") for s in segments if s.get("text")) + return await generate_short_summary(text, segments=segments) - # Format conversation with speaker names - conversation_text = "" - speakers_in_conv = set() - for segment in segments: - speaker = segment.get("speaker", "") - text = segment.get("text", "").strip() - if text: - if speaker: - conversation_text += f"{speaker}: {text}\n" - speakers_in_conv.add(speaker) - else: - conversation_text += f"{text}\n" - - if not conversation_text.strip(): - return "No content" - try: - prompt = f"""Generate a brief, informative summary (1-2 sentences, max 120 characters) for this conversation with speakers: +# ============================================================================ +# Conversation Job Helpers +# ============================================================================ -"{conversation_text[:1000]}" -Rules: -- Maximum 120 characters -- 1-2 complete sentences -- Include speaker names when relevant (e.g., "John discusses X with Sarah") -- Capture key topics and outcomes -- Use present tense -- Be specific and informative +async def link_job_metadata_to_conversation( + conversation_id: str, speech_job_id: Optional[str], current_job +) -> None: + """ + Link job metadata to conversation for Queue UI tracking. -Summary:""" + Updates metadata for current job and optionally parent jobs (speech detection, speaker check). + Uses "first conversation wins" pattern - only links if conversation_id not already set. - summary = await async_generate(prompt, temperature=0.3) - return summary.strip().strip('"').strip("'") or "No content" + Args: + conversation_id: Conversation ID to link + speech_job_id: Optional parent speech detection job ID + current_job: Current RQ job instance (must not be None) + """ + from advanced_omi_backend.controllers.queue_controller import redis_conn + + # Update current job metadata + if not current_job.meta: + current_job.meta = {} + current_job.meta["conversation_id"] = conversation_id + current_job.save_meta() + logger.info(f"๐ Linked job {current_job.id[:12]} to conversation {conversation_id[:12]}") + + # Update parent jobs if provided + if not speech_job_id: + return + + _link_parent_job(speech_job_id, conversation_id, redis_conn) + +def _link_parent_job(job_id: str, conversation_id: str, redis_conn) -> None: + """ + Link a parent job to conversation, with cascading to speaker check job. + + Helper function to reduce nesting in link_job_metadata_to_conversation. + """ + from rq.job import Job + + try: + job = Job.fetch(job_id, connection=redis_conn) except Exception as e: - logger.warning(f"Failed to generate LLM summary with speakers: {e}") - # Fallback to simple summary generation - return conversation_text[:120] + "..." if len(conversation_text) > 120 else conversation_text or "No content" + logger.warning(f"โ ๏ธ Failed to fetch job {job_id[:12]}: {e}") + return + + if not job.meta: + logger.debug(f"Job {job_id[:12]} has no metadata, skipping") + return + + # Check if already linked (first conversation wins) + existing_conv_id = job.meta.get("conversation_id") + if existing_conv_id: + logger.info(f"โญ๏ธ Job {job_id[:12]} already linked to {existing_conv_id[:12]}, skipping") + return + + # Link this job + job.meta["conversation_id"] = conversation_id + job.meta.pop("session_level", None) # Remove session-level flag + job.save_meta() + logger.info(f"๐ Linked job {job_id[:12]} to conversation {conversation_id[:12]}") + + # Cascade to speaker check job if present + speaker_check_job_id = job.meta.get("speaker_check_job_id") + if speaker_check_job_id: + _link_parent_job(speaker_check_job_id, conversation_id, redis_conn) + + +async def signal_conversation_file_rotation( + session_id: str, conversation_id: str, redis_client +) -> None: + """ + Signal audio persistence job to rotate to new conversation file. + + Sets Redis key that audio_streaming_persistence_job watches to detect + when a new conversation starts and rotate the output file accordingly. + + Args: + session_id: Session ID + conversation_id: Conversation ID to rotate to + redis_client: Redis client instance + """ + # Signal audio persistence job to rotate to this conversation's file + rotation_signal_key = f"conversation:current:{session_id}" + await redis_client.set(rotation_signal_key, conversation_id, ex=86400) # 24 hour TTL + logger.info( + f"๐ Signaled audio persistence to rotate file for conversation {conversation_id[:12]}" + ) + + +def extract_speakers_from_segments(segments: List[Dict[str, Any]]) -> List[str]: + """ + Extract unique speaker names from segments. + + Args: + segments: List of segments with speaker information + + Returns: + List of unique speaker names (excluding "Unknown") + """ + speakers = [] + if segments: + for seg in segments: + speaker = seg.get("speaker", "Unknown") + if speaker and speaker != "Unknown" and speaker not in speakers: + speakers.append(speaker) + return speakers + + +async def track_speech_activity( + speech_analysis: Dict[str, Any], last_word_count: int, conversation_id: str, redis_client +) -> tuple[float, int]: + """ + Track new speech activity and update last speech timestamp. + + Uses word count instead of chunk count to avoid false positives from noise/silence. + + Args: + speech_analysis: Speech analysis results from analyze_speech() + last_word_count: Previous word count + conversation_id: Conversation ID for Redis key + redis_client: Redis client instance + + Returns: + Tuple of (last_meaningful_speech_time, new_word_count) + """ + current_word_count = speech_analysis.get("word_count", 0) + + if current_word_count > last_word_count: + last_meaningful_speech_time = time.time() + + # Store timestamp in Redis for visibility/debugging + await redis_client.set( + f"conversation:last_speech:{conversation_id}", + last_meaningful_speech_time, + ex=86400, # 24 hour TTL + ) + logger.debug( + f"๐ฃ๏ธ New speech detected (word count: {current_word_count}), updated last_speech timestamp" + ) + + return last_meaningful_speech_time, current_word_count + + # No new speech - return None to indicate no update + return None, last_word_count + + +async def update_job_progress_metadata( + current_job, + conversation_id: str, + session_id: str, + client_id: str, + combined: Dict[str, Any], + speech_analysis: Dict[str, Any], + speakers: List[str], + last_meaningful_speech_time: float, +) -> None: + """ + Update job metadata with current conversation progress. + + Args: + current_job: Current RQ job instance + conversation_id: Conversation ID + session_id: Session ID + client_id: Client ID + combined: Combined transcription results + speech_analysis: Speech analysis results + speakers: List of speakers + last_meaningful_speech_time: Timestamp of last speech + """ + if not current_job: + return + + if not current_job.meta: + current_job.meta = {} + + # Set created_at only once (first time we update metadata) + if "created_at" not in current_job.meta: + current_job.meta["created_at"] = datetime.now().isoformat() + + current_job.meta.update( + { + "conversation_id": conversation_id, + "audio_uuid": session_id, # Link to session for job grouping + "client_id": client_id, # Ensure client_id is always present + "transcript": ( + combined["text"][:500] + "..." if len(combined["text"]) > 500 else combined["text"] + ), # First 500 chars + "transcript_length": len(combined["text"]), + "speakers": speakers, + "word_count": speech_analysis.get("word_count", 0), + "duration_seconds": speech_analysis.get("duration", 0), + "has_speech": speech_analysis.get("has_speech", False), + "last_update": datetime.now().isoformat(), + "inactivity_seconds": time.time() - last_meaningful_speech_time, + "chunks_processed": combined["chunk_count"], + } + ) + current_job.save_meta() + + +async def wait_for_audio_file( + conversation_id: str, redis_client, max_wait_seconds: int = 30 +) -> Optional[str]: + """ + Wait for audio persistence job to write audio file path to Redis. + + Polls Redis for audio file path with configurable timeout. + + Args: + conversation_id: Conversation ID + redis_client: Redis client instance + max_wait_seconds: Maximum wait time in seconds (default: 30) + + Returns: + Audio file path (str) if ready, None if timeout + """ + audio_file_key = f"audio:file:{conversation_id}" + wait_start = time.time() + + while time.time() - wait_start < max_wait_seconds: + file_path_bytes = await redis_client.get(audio_file_key) + if file_path_bytes: + wait_duration = time.time() - wait_start + logger.info(f"โ Audio file ready after {wait_duration:.1f}s") + return file_path_bytes.decode() + + # Log progress every 5 seconds + elapsed = time.time() - wait_start + if elapsed % 5 == 0: + logger.info( + f"โณ Waiting for audio file (conversation {conversation_id[:12]})... ({elapsed:.0f}s elapsed)" + ) + + await asyncio.sleep(0.5) # Check every 500ms + + logger.error( + f"โ Audio file path not found in Redis after {max_wait_seconds}s (key: {audio_file_key})" + ) + logger.warning( + "โ ๏ธ Audio persistence job may not have rotated file yet - cannot enqueue batch transcription" + ) + return None + + +async def mark_conversation_deleted(conversation_id: str, deletion_reason: str) -> None: + """ + Mark a conversation as deleted with a specific reason. + + Uses soft delete pattern - conversation remains in database but marked as deleted. + + Args: + conversation_id: Conversation ID to mark as deleted + deletion_reason: Reason for deletion (e.g., "no_meaningful_speech", "audio_file_not_ready") + """ + from advanced_omi_backend.models.conversation import Conversation + + logger.warning( + f"๐๏ธ Marking conversation {conversation_id} as deleted - reason: {deletion_reason}" + ) + + conversation = await Conversation.find_one(Conversation.conversation_id == conversation_id) + if conversation: + conversation.deleted = True + conversation.deletion_reason = deletion_reason + conversation.deleted_at = datetime.utcnow() + await conversation.save() + logger.info(f"โ Marked conversation {conversation_id} as deleted") diff --git a/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py index 1c7b227a..92e4676a 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py @@ -25,6 +25,7 @@ async def process_cropping_job( conversation_id: str, audio_path: str, + *, redis_client=None ) -> Dict[str, Any]: """ @@ -34,7 +35,7 @@ async def process_cropping_job( 1. Reads transcript segments from conversation 2. Extracts speech timestamps 3. Creates cropped audio file with only speech segments - 4. Updates audio_chunks collection with cropped file path + 4. Updates conversation with cropped file path Args: conversation_id: Conversation ID @@ -46,7 +47,6 @@ async def process_cropping_job( """ from pathlib import Path from advanced_omi_backend.utils.audio_utils import _process_audio_cropping_with_relative_timestamps - from advanced_omi_backend.database import get_collections, AudioChunksRepository from advanced_omi_backend.models.conversation import Conversation from advanced_omi_backend.config import CHUNK_DIR @@ -58,7 +58,7 @@ async def process_cropping_job( if not conversation: raise ValueError(f"Conversation {conversation_id} not found") - # Extract speech segments from transcript + # Extract speech segments from transcript (property returns data from active version) segments = conversation.segments if not segments or len(segments) == 0: logger.warning(f"โ ๏ธ No segments found for conversation {conversation_id}, skipping cropping") @@ -78,17 +78,13 @@ async def process_cropping_job( cropped_filename = f"cropped_{original_path.name}" output_path = CHUNK_DIR / cropped_filename - # Get repository for database updates - collections = get_collections() - repository = AudioChunksRepository(collections["chunks_col"]) - - # Process cropping - success = await _process_audio_cropping_with_relative_timestamps( + # Process cropping (no repository needed - we update conversation directly) + success, segment_mapping = await _process_audio_cropping_with_relative_timestamps( str(original_path), speech_segments, str(output_path), audio_uuid, - repository + None # No repository - we update conversation model directly ) if not success: @@ -99,13 +95,46 @@ async def process_cropping_job( "reason": "cropping_failed" } - # Calculate cropped duration - cropped_duration_seconds = sum(end - start for start, end in speech_segments) + # Calculate actual cropped duration from kept segments + kept_segments = [m for m in segment_mapping if m["kept"]] + if kept_segments: + # Duration is end of last kept segment + cropped_duration_seconds = kept_segments[-1]["cropped_end"] + else: + cropped_duration_seconds = 0.0 + + # Update segment timestamps using the mapping + # Only keep segments that weren't filtered out + updated_segments = [] + for i, seg in enumerate(segments): + if i >= len(segment_mapping): + logger.warning(f"โ ๏ธ Segment {i} not in mapping, skipping") + continue + + mapping = segment_mapping[i] + if mapping["kept"]: + # Segment was kept - use the cropped timestamps + updated_seg = seg.model_copy() + updated_seg.start = mapping["cropped_start"] + updated_seg.end = mapping["cropped_end"] + updated_segments.append(updated_seg) + logger.debug( + f"Segment {i}: {seg.start:.2f}-{seg.end:.2f}s โ " + f"{updated_seg.start:.2f}-{updated_seg.end:.2f}s (in cropped audio)" + ) + else: + # Segment was filtered out (too short) + logger.debug( + f"Segment {i} filtered out (duration {seg.end - seg.start:.2f}s < MIN_SPEECH_SEGMENT_DURATION)" + ) - # Update conversation with cropped audio path + # Update conversation with cropped audio path and adjusted segments conversation.cropped_audio_path = cropped_filename + # Update the active transcript version segments + if conversation.active_transcript: + conversation.active_transcript.segments = updated_segments await conversation.save() - logger.info(f"๐พ Updated conversation {conversation_id[:12]} with cropped_audio_path: {cropped_filename}") + logger.info(f"๐พ Updated conversation {conversation_id[:12]} with cropped_audio_path and adjusted {len(updated_segments)} segment timestamps") logger.info(f"โ RQ: Completed audio cropping for conversation {conversation_id} ({cropped_duration_seconds:.1f}s)") @@ -140,6 +169,7 @@ async def audio_streaming_persistence_job( session_id: str, user_id: str, client_id: str, + *, redis_client=None ) -> Dict[str, Any]: """ @@ -183,7 +213,7 @@ async def audio_streaming_persistence_job( # Job control session_key = f"audio:session:{session_id}" - max_runtime = 3540 # 59 minutes + max_runtime = 86340 # 24 hours - 60 seconds (graceful exit before RQ timeout) start_time = time.time() from advanced_omi_backend.config import CHUNK_DIR @@ -302,7 +332,7 @@ async def audio_streaming_persistence_job( # Store file path in Redis (keyed by conversation_id, not session_id) audio_file_key = f"audio:file:{current_conversation_id}" - await redis_client.set(audio_file_key, str(file_path), ex=3600) + await redis_client.set(audio_file_key, str(file_path), ex=86400) # 24 hour TTL logger.info(f"๐พ Stored audio file path in Redis: {audio_file_key}") else: # Key deleted - conversation ended, close current file diff --git a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py index 1d3400c3..67e4d935 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py @@ -6,14 +6,154 @@ import asyncio import logging -import time +import time, os +from datetime import datetime from typing import Dict, Any - +from rq.job import Job from advanced_omi_backend.models.job import async_job +from advanced_omi_backend.controllers.queue_controller import redis_conn + +from advanced_omi_backend.utils.conversation_utils import ( + analyze_speech, + extract_speakers_from_segments, + track_speech_activity, + update_job_progress_metadata, +) +from advanced_omi_backend.utils.conversation_utils import ( + is_meaningful_speech, + mark_conversation_deleted, +) + +from advanced_omi_backend.controllers.queue_controller import start_post_conversation_jobs logger = logging.getLogger(__name__) +async def handle_end_of_conversation( + session_id: str, + conversation_id: str, + client_id: str, + user_id: str, + start_time: float, + last_result_count: int, + timeout_triggered: bool, + redis_client, +) -> Dict[str, Any]: + """ + Handle end-of-conversation cleanup and session restart logic. + + This function is called at the end of open_conversation_job to: + 1. Clean up Redis streams and tracking keys + 2. Increment conversation count for the session + 3. Re-enqueue speech detection job if session is still active + + Args: + session_id: Stream session ID + conversation_id: Conversation ID that just completed + client_id: Client ID + user_id: User ID + start_time: Job start time (for runtime calculation) + last_result_count: Number of transcription results processed + timeout_triggered: Whether closure was due to inactivity timeout + redis_client: Redis client instance + + Returns: + Dict with conversation_id, conversation_count, final_result_count, runtime_seconds, timeout_triggered + """ + # Clean up Redis streams to prevent memory leaks + try: + # NOTE: Do NOT delete audio:stream:{client_id} here! + # The audio stream is per-client (WebSocket connection), not per-conversation. + # It's still actively receiving audio and will be reused by the next conversation. + # Only delete it on WebSocket disconnect (handled in websocket_controller.py) + + # Delete the transcription results stream (per-session/conversation) + results_stream_key = f"transcription:results:{session_id}" + await redis_client.delete(results_stream_key) + logger.info(f"๐งน Deleted results stream: {results_stream_key}") + + # Set TTL on session key (expire after 1 hour) + session_key = f"audio:session:{session_id}" + await redis_client.expire(session_key, 3600) + logger.info(f"โฐ Set TTL on session key: {session_key}") + except Exception as cleanup_error: + logger.warning(f"โ ๏ธ Error during stream cleanup: {cleanup_error}") + + # Clean up Redis tracking keys so speech detection job knows conversation is complete + open_job_key = f"open_conversation:session:{session_id}" + await redis_client.delete(open_job_key) + logger.info(f"๐งน Cleaned up tracking key {open_job_key}") + + # Delete the conversation:current signal so audio persistence knows conversation ended + current_conversation_key = f"conversation:current:{session_id}" + await redis_client.delete(current_conversation_key) + logger.info(f"๐งน Deleted conversation:current signal for session {session_id[:12]}") + + # Increment conversation count for this session + conversation_count_key = f"session:conversation_count:{session_id}" + conversation_count = await redis_client.incr(conversation_count_key) + await redis_client.expire(conversation_count_key, 3600) # 1 hour TTL + logger.info(f"๐ Conversation count for session {session_id}: {conversation_count}") + + # Check if session is still active (user still recording) and restart listening jobs + session_key = f"audio:session:{session_id}" + session_status = await redis_client.hget(session_key, "status") + if session_status: + status_str = ( + session_status.decode() if isinstance(session_status, bytes) else session_status + ) + + if status_str == "active": + # Session still active - enqueue new speech detection for next conversation + logger.info( + f"๐ Enqueueing new speech detection (conversation #{conversation_count + 1})" + ) + + from advanced_omi_backend.controllers.queue_controller import ( + transcription_queue, + redis_conn, + JOB_RESULT_TTL, + ) + from advanced_omi_backend.workers.transcription_jobs import stream_speech_detection_job + + # Enqueue speech detection job for next conversation (audio persistence keeps running) + speech_job = transcription_queue.enqueue( + stream_speech_detection_job, + session_id, + user_id, + client_id, + job_timeout=3600, + result_ttl=JOB_RESULT_TTL, + job_id=f"speech-detect_{session_id[:12]}_{conversation_count}", + description=f"Listening for speech (conversation #{conversation_count + 1})", + meta={"audio_uuid": session_id, "client_id": client_id, "session_level": True}, + ) + + # Store job ID for cleanup (keyed by client_id for WebSocket cleanup) + try: + redis_conn.set(f"speech_detection_job:{client_id}", speech_job.id, ex=3600) + logger.info(f"๐ Stored speech detection job ID for client {client_id}") + except Exception as e: + logger.warning(f"โ ๏ธ Failed to store job ID for {client_id}: {e}") + + logger.info(f"โ Enqueued speech detection job {speech_job.id}") + else: + logger.info( + f"Session {session_id} status={status_str}, not restarting (user stopped recording)" + ) + else: + logger.info(f"Session {session_id} not found, not restarting (session ended)") + + return { + "conversation_id": conversation_id, + "conversation_count": conversation_count, + "deleted": False, # This conversation was not deleted (normal completion) + "final_result_count": last_result_count, + "runtime_seconds": time.time() - start_time, + "timeout_triggered": timeout_triggered, + } + + @async_job(redis=True, beanie=True) async def open_conversation_job( session_id: str, @@ -21,7 +161,8 @@ async def open_conversation_job( client_id: str, speech_detected_at: float, speech_job_id: str = None, - redis_client=None + *, + redis_client=None, ) -> Dict[str, Any]: """ Long-running RQ job that creates and continuously updates conversation with transcription results. @@ -45,18 +186,22 @@ async def open_conversation_job( from advanced_omi_backend.models.conversation import Conversation, create_conversation from rq import get_current_job - logger.info(f"๐ Creating and opening conversation for session {session_id} (speech detected at {speech_detected_at})") + logger.info( + f"๐ Creating and opening conversation for session {session_id} (speech detected at {speech_detected_at})" + ) # Get current job for meta storage current_job = get_current_job() - + current_job.meta = {} + current_job.save_meta() + # Create minimal streaming conversation (conversation_id auto-generated) conversation = create_conversation( audio_uuid=session_id, user_id=user_id, client_id=client_id, title="Recording...", - summary="Transcribing audio..." + summary="Transcribing audio...", ) # Save to database @@ -64,66 +209,37 @@ async def open_conversation_job( conversation_id = conversation.conversation_id # Get the auto-generated ID logger.info(f"โ Created streaming conversation {conversation_id} for session {session_id}") - # Update speech detection job metadata with conversation_id - if speech_job_id: - try: - from rq.job import Job - from advanced_omi_backend.controllers.queue_controller import redis_conn - - speech_job = Job.fetch(speech_job_id, connection=redis_conn) - if speech_job and speech_job.meta: - # Only update if conversation_id not already set (first conversation wins) - if not speech_job.meta.get('conversation_id'): - speech_job.meta['conversation_id'] = conversation_id - # Remove session_level flag - now linked to conversation - speech_job.meta.pop('session_level', None) - speech_job.save_meta() - logger.info(f"๐ Updated speech job {speech_job_id[:12]} with conversation_id") - else: - logger.info(f"โญ๏ธ Speech job {speech_job_id[:12]} already linked to conversation {speech_job.meta.get('conversation_id')[:12]}") - - # Also update the speaker check job if referenced in speech job metadata - # Only update if it doesn't already have a conversation_id (first conversation wins) - speaker_check_job_id = speech_job.meta.get('speaker_check_job_id') - if speaker_check_job_id: - try: - speaker_check_job = Job.fetch(speaker_check_job_id, connection=redis_conn) - if speaker_check_job and speaker_check_job.meta: - # Only update if conversation_id not already set - if not speaker_check_job.meta.get('conversation_id'): - speaker_check_job.meta['conversation_id'] = conversation_id - speaker_check_job.save_meta() - logger.info(f"๐ Updated speaker check job {speaker_check_job_id} with conversation_id") - else: - logger.info(f"โญ๏ธ Speaker check job {speaker_check_job_id} already linked to conversation {speaker_check_job.meta.get('conversation_id')[:12]}") - except Exception as speaker_err: - logger.warning(f"โ ๏ธ Failed to update speaker check job metadata: {speaker_err}") - except Exception as e: - logger.warning(f"โ ๏ธ Failed to update speech job metadata: {e}") - - # Store conversation_id in Redis for finalize job to find - conversation_key = f"conversation:session:{session_id}" - await redis_client.set(conversation_key, conversation_id, ex=3600) - logger.info(f"๐พ Stored conversation ID in Redis: {conversation_key}") - + # Link job metadata to conversation (cascading updates) + current_job.meta["conversation_id"] = conversation_id + current_job.save_meta() + speech_job = Job.fetch(speech_job_id, connection=redis_conn) + speech_job.meta["conversation_id"] = conversation_id + speech_job.save_meta() + speaker_check_job_id = speech_job.meta.get("speaker_check_job_id") + if speaker_check_job_id: + speaker_check_job = Job.fetch(speaker_check_job_id, connection=redis_conn) + speaker_check_job.meta["conversation_id"] = conversation_id + speaker_check_job.save_meta() + # Signal audio persistence job to rotate to this conversation's file - current_conversation_key = f"conversation:current:{session_id}" - await redis_client.set(current_conversation_key, conversation_id, ex=3600) - logger.info(f"๐ Signaled audio persistence to rotate file for conversation {conversation_id[:12]}") + rotation_signal_key = f"conversation:current:{session_id}" + await redis_client.set(rotation_signal_key, conversation_id, ex=86400) # 24 hour TTL + logger.info( + f"๐ Signaled audio persistence to rotate file for conversation {conversation_id[:12]}" + ) # Use redis_client parameter aggregator = TranscriptionResultsAggregator(redis_client) # Job control session_key = f"audio:session:{session_id}" - max_runtime = 3540 # 59 minutes (graceful exit before RQ timeout at 60 min) + max_runtime = 10740 # 3 hours - 60 seconds (single conversations shouldn't exceed 3 hours) start_time = time.time() last_result_count = 0 finalize_received = False # Inactivity timeout configuration - import os inactivity_timeout_seconds = float(os.getenv("SPEECH_INACTIVITY_THRESHOLD_SECONDS", "60")) inactivity_timeout_minutes = inactivity_timeout_seconds / 60 last_meaningful_speech_time = time.time() # Initialize with conversation start @@ -131,7 +247,9 @@ async def open_conversation_job( last_inactivity_log_time = time.time() # Track when we last logged inactivity last_word_count = 0 # Track word count to detect actual new speech - logger.info(f"๐ Conversation timeout configured: {inactivity_timeout_minutes} minutes ({inactivity_timeout_seconds}s)") + logger.info( + f"๐ Conversation timeout configured: {inactivity_timeout_minutes} minutes ({inactivity_timeout_seconds}s)" + ) while True: # Check if session is finalizing (set by producer when recording stops) @@ -139,7 +257,9 @@ async def open_conversation_job( status = await redis_client.hget(session_key, "status") if status and status.decode() in ["finalizing", "complete"]: finalize_received = True - logger.info(f"๐ Session finalizing, waiting for audio persistence job to complete...") + logger.info( + f"๐ Session finalizing, waiting for audio persistence job to complete..." + ) break # Exit immediately when finalize signal received # Check max runtime timeout @@ -152,63 +272,36 @@ async def open_conversation_job( current_count = combined["chunk_count"] # Analyze speech content using detailed analysis - from advanced_omi_backend.utils.conversation_utils import analyze_speech - transcript_data = { - "text": combined["text"], - "words": combined.get("words", []) - } + + transcript_data = {"text": combined["text"], "words": combined.get("words", [])} speech_analysis = analyze_speech(transcript_data) # Extract speaker information from segments - speakers = [] segments = combined.get("segments", []) - if segments: - for seg in segments: - speaker = seg.get("speaker", "Unknown") - if speaker and speaker != "Unknown" and speaker not in speakers: - speakers.append(speaker) - - # Check if NEW speech arrived (word count increased) - # Track word count instead of chunk count to avoid resetting on noise/silence chunks - current_word_count = speech_analysis.get("word_count", 0) - if current_word_count > last_word_count: - last_meaningful_speech_time = time.time() - last_word_count = current_word_count - # Store timestamp in Redis for visibility/debugging - await redis_client.set( - f"conversation:last_speech:{conversation_id}", - last_meaningful_speech_time, - ex=3600 # 1 hour TTL - ) - logger.debug(f"๐ฃ๏ธ New speech detected (word count: {current_word_count}), updated last_speech timestamp") - - # Update job meta with current state - if current_job: - if not current_job.meta: - current_job.meta = {} - - from datetime import datetime + speakers = extract_speakers_from_segments(segments) - # Set created_at only once (first time we update metadata) - if 'created_at' not in current_job.meta: - current_job.meta['created_at'] = datetime.now().isoformat() + # Track new speech activity (word count based) + new_speech_time, last_word_count = await track_speech_activity( + speech_analysis=speech_analysis, + last_word_count=last_word_count, + conversation_id=conversation_id, + redis_client=redis_client, + ) + if new_speech_time: + last_meaningful_speech_time = new_speech_time - current_job.meta.update({ - "conversation_id": conversation_id, - "audio_uuid": session_id, # Link to session for job grouping - "client_id": client_id, # Ensure client_id is always present - "transcript": combined["text"][:500] + "..." if len(combined["text"]) > 500 else combined["text"], # First 500 chars - "transcript_length": len(combined["text"]), - "speakers": speakers, - "word_count": speech_analysis.get("word_count", 0), - "duration_seconds": speech_analysis.get("duration", 0), - "has_speech": speech_analysis.get("has_speech", False), - "last_update": datetime.now().isoformat(), - "inactivity_seconds": time.time() - last_meaningful_speech_time, - "chunks_processed": current_count - }) - current_job.save_meta() + # Update job metadata with current progress + await update_job_progress_metadata( + current_job=current_job, + conversation_id=conversation_id, + session_id=session_id, + client_id=client_id, + combined=combined, + speech_analysis=speech_analysis, + speakers=speakers, + last_meaningful_speech_time=last_meaningful_speech_time, + ) # Check inactivity timeout and log every 10 seconds inactivity_duration = time.time() - last_meaningful_speech_time @@ -216,7 +309,9 @@ async def open_conversation_job( # Log inactivity every 10 seconds if current_time - last_inactivity_log_time >= 10: - logger.info(f"โฑ๏ธ Time since last speech: {inactivity_duration:.1f}s (timeout: {inactivity_timeout_seconds:.0f}s)") + logger.info( + f"โฑ๏ธ Time since last speech: {inactivity_duration:.1f}s (timeout: {inactivity_timeout_seconds:.0f}s)" + ) last_inactivity_log_time = current_time if inactivity_duration > inactivity_timeout_seconds: @@ -232,181 +327,132 @@ async def open_conversation_job( finalize_received = True break - # Update conversation if new results arrived + # Track results progress (conversation will get transcript from transcription job) if current_count > last_result_count: - # Update conversation in MongoDB - conversation = await Conversation.find_one( - Conversation.conversation_id == conversation_id + logger.info( + f"๐ Conversation {conversation_id} progress: " + f"{current_count} results, {len(combined['text'])} chars, {len(combined['segments'])} segments" ) - - if conversation: - conversation.transcript = combined["text"] - conversation.segments = combined["segments"] - await conversation.save() - - logger.info( - f"๐ Updated conversation {conversation_id}: " - f"{current_count} results, {len(combined['text'])} chars, {len(combined['segments'])} segments" - ) - else: - logger.warning(f"โ ๏ธ Conversation {conversation_id} not found") - last_result_count = current_count await asyncio.sleep(1) # Check every second for responsiveness - logger.info(f"โ Conversation {conversation_id} updates complete, waiting for audio file to be ready...") - - # Wait for audio_streaming_persistence_job to complete and write the file path - # Audio persistence now writes files per-conversation, so key uses conversation_id - audio_file_key = f"audio:file:{conversation_id}" - file_path_bytes = None - max_wait_audio = 30 # Maximum 30 seconds to wait for audio file - wait_start = time.time() - - while time.time() - wait_start < max_wait_audio: - file_path_bytes = await redis_client.get(audio_file_key) - if file_path_bytes: - wait_duration = time.time() - wait_start - logger.info(f"โ Audio file ready after {wait_duration:.1f}s") - break + logger.info( + f"โ Conversation {conversation_id} updates complete, checking for meaningful speech..." + ) - # Check if still within reasonable time - elapsed = time.time() - wait_start - if elapsed % 5 == 0: # Log every 5 seconds - logger.info(f"โณ Waiting for audio file (conversation {conversation_id[:12]})... ({elapsed:.0f}s elapsed)") + # FINAL VALIDATION: Check if conversation has meaningful speech before post-processing + # This prevents empty/noise-only conversations from being processed and saved + combined = await aggregator.get_combined_results(session_id) - await asyncio.sleep(0.5) # Check every 500ms - if not file_path_bytes: - logger.error(f"โ Audio file path not found in Redis after {max_wait_audio}s (key: {audio_file_key})") - logger.warning(f"โ ๏ธ Audio persistence job may not have rotated file yet - cannot enqueue batch transcription") - else: - file_path = file_path_bytes.decode() - logger.info(f"๐ Retrieved audio file path: {file_path}") + if not is_meaningful_speech(combined): + logger.warning( + f"โ ๏ธ Conversation {conversation_id} has no meaningful speech after finalization" + ) - # Update conversation with audio file path - conversation = await Conversation.find_one( - Conversation.conversation_id == conversation_id + # Mark conversation as deleted (soft delete) + await mark_conversation_deleted( + conversation_id=conversation_id, + deletion_reason="no_meaningful_speech", ) - if conversation: - # Store just the filename (relative to CHUNK_DIR) - from pathlib import Path - audio_filename = Path(file_path).name - conversation.audio_path = audio_filename - await conversation.save() - logger.info(f"๐พ Updated conversation {conversation_id[:12]} with audio_path: {audio_filename}") - else: - logger.warning(f"โ ๏ธ Conversation {conversation_id} not found for audio_path update") - # Enqueue post-conversation processing pipeline - from advanced_omi_backend.controllers.queue_controller import start_post_conversation_jobs + logger.info("โ Marked conversation as deleted, session can continue") - job_ids = start_post_conversation_jobs( + # Call shared cleanup/restart logic before returning + return await handle_end_of_conversation( + session_id=session_id, conversation_id=conversation_id, - audio_uuid=session_id, - audio_file_path=file_path, + client_id=client_id, user_id=user_id, - post_transcription=True # Run batch transcription for streaming audio + start_time=start_time, + last_result_count=last_result_count, + timeout_triggered=timeout_triggered, + redis_client=redis_client, ) - logger.info( - f"๐ฅ Pipeline: transcribe({job_ids['transcription']}) โ " - f"speaker({job_ids['speaker_recognition']}) โ " - f"[memory({job_ids['memory']}) + title({job_ids['title_summary']})]" - ) - - # Wait a moment to ensure jobs are registered in RQ - await asyncio.sleep(0.5) + logger.info("โ Conversation has meaningful speech, proceeding with post-processing") - # Clean up Redis streams to prevent memory leaks - try: - # NOTE: Do NOT delete audio:stream:{client_id} here! - # The audio stream is per-client (WebSocket connection), not per-conversation. - # It's still actively receiving audio and will be reused by the next conversation. - # Only delete it on WebSocket disconnect (handled in websocket_controller.py) + # Wait for audio_streaming_persistence_job to complete and write the file path + from advanced_omi_backend.utils.conversation_utils import wait_for_audio_file - # Delete the transcription results stream (per-session/conversation) - results_stream_key = f"transcription:results:{session_id}" - await redis_client.delete(results_stream_key) - logger.info(f"๐งน Deleted results stream: {results_stream_key}") + file_path = await wait_for_audio_file( + conversation_id=conversation_id, redis_client=redis_client, max_wait_seconds=30 + ) - # Set TTL on session key (expire after 1 hour) - await redis_client.expire(session_key, 3600) - logger.info(f"โฐ Set TTL on session key: {session_key}") - except Exception as cleanup_error: - logger.warning(f"โ ๏ธ Error during stream cleanup: {cleanup_error}") + if not file_path: + # Mark conversation as deleted - has speech but no audio file to process + await mark_conversation_deleted( + conversation_id=conversation_id, + deletion_reason="audio_file_not_ready", + ) - # Clean up Redis tracking keys so speech detection job knows conversation is complete - open_job_key = f"open_conversation:session:{session_id}" - await redis_client.delete(open_job_key) - logger.info(f"๐งน Cleaned up tracking key {open_job_key}") + # Call shared cleanup/restart logic before returning + return await handle_end_of_conversation( + session_id=session_id, + conversation_id=conversation_id, + client_id=client_id, + user_id=user_id, + start_time=start_time, + last_result_count=last_result_count, + timeout_triggered=timeout_triggered, + redis_client=redis_client, + ) - # Delete the conversation:current signal so audio persistence knows conversation ended - current_conversation_key = f"conversation:current:{session_id}" - await redis_client.delete(current_conversation_key) - logger.info(f"๐งน Deleted conversation:current signal for session {session_id[:12]}") + logger.info(f"๐ Retrieved audio file path: {file_path}") - # Increment conversation count for this session - conversation_count_key = f"session:conversation_count:{session_id}" - conversation_count = await redis_client.incr(conversation_count_key) - await redis_client.expire(conversation_count_key, 3600) # 1 hour TTL - logger.info(f"๐ Conversation count for session {session_id}: {conversation_count}") + # Update conversation with audio file path + conversation = await Conversation.find_one(Conversation.conversation_id == conversation_id) + if conversation: + # Store just the filename (relative to CHUNK_DIR) + from pathlib import Path - # Check if session is still active (user still recording) and restart listening jobs - session_status = await redis_client.hget(session_key, "status") - if session_status: - status_str = session_status.decode() if isinstance(session_status, bytes) else session_status + audio_filename = Path(file_path).name + conversation.audio_path = audio_filename + await conversation.save() + logger.info( + f"๐พ Updated conversation {conversation_id[:12]} with audio_path: {audio_filename}" + ) + else: + logger.warning(f"โ ๏ธ Conversation {conversation_id} not found for audio_path update") - if status_str == "active": - # Session still active - enqueue new speech detection for next conversation - logger.info(f"๐ Enqueueing new speech detection (conversation #{conversation_count + 1})") + # Enqueue post-conversation processing pipeline - from advanced_omi_backend.controllers.queue_controller import transcription_queue, redis_conn, JOB_RESULT_TTL - from advanced_omi_backend.workers.transcription_jobs import stream_speech_detection_job - # Enqueue speech detection job for next conversation (audio persistence keeps running) - speech_job = transcription_queue.enqueue( - stream_speech_detection_job, - session_id, - user_id, - client_id, - job_timeout=3600, - result_ttl=JOB_RESULT_TTL, - job_id=f"speech-detect_{session_id[:12]}_{conversation_count}", - description=f"Listening for speech (conversation #{conversation_count + 1})", - meta={'audio_uuid': session_id, 'client_id': client_id, 'session_level': True} - ) + job_ids = start_post_conversation_jobs( + conversation_id=conversation_id, + audio_uuid=session_id, + audio_file_path=file_path, + user_id=user_id, + post_transcription=True, # Run batch transcription for streaming audio + ) - # Store job ID for cleanup (keyed by client_id for WebSocket cleanup) - try: - redis_conn.set(f"speech_detection_job:{client_id}", speech_job.id, ex=3600) - logger.info(f"๐ Stored speech detection job ID for client {client_id}") - except Exception as e: - logger.warning(f"โ ๏ธ Failed to store job ID for {client_id}: {e}") + logger.info( + f"๐ฅ Pipeline: transcribe({job_ids['transcription']}) โ " + f"speaker({job_ids['speaker_recognition']}) โ " + f"[memory({job_ids['memory']}) + title({job_ids['title_summary']})]" + ) - logger.info(f"โ Enqueued speech detection job {speech_job.id}") - else: - logger.info(f"Session {session_id} status={status_str}, not restarting (user stopped recording)") - else: - logger.info(f"Session {session_id} not found, not restarting (session ended)") + # Wait a moment to ensure jobs are registered in RQ + await asyncio.sleep(0.5) - return { - "conversation_id": conversation_id, - "conversation_count": conversation_count, - "final_result_count": last_result_count, - "runtime_seconds": time.time() - start_time, - "timeout_triggered": timeout_triggered - } + # Call shared cleanup/restart logic + return await handle_end_of_conversation( + session_id=session_id, + conversation_id=conversation_id, + client_id=client_id, + user_id=user_id, + start_time=start_time, + last_result_count=last_result_count, + timeout_triggered=timeout_triggered, + redis_client=redis_client, + ) @async_job(redis=True, beanie=True) -async def generate_title_summary_job( - conversation_id: str, - redis_client=None -) -> Dict[str, Any]: +async def generate_title_summary_job(conversation_id: str, *, redis_client=None) -> Dict[str, Any]: """ - Generate title and summary for a conversation using LLM. + Generate title, short summary, and detailed summary for a conversation using LLM. This job runs independently of transcription and memory jobs to ensure conversations always get meaningful titles and summaries, even if other @@ -419,12 +465,13 @@ async def generate_title_summary_job( redis_client: Redis client (injected by decorator) Returns: - Dict with generated title and summary + Dict with generated title, summary, and detailed_summary """ from advanced_omi_backend.models.conversation import Conversation from advanced_omi_backend.utils.conversation_utils import ( - generate_title_with_speakers, - generate_summary_with_speakers + generate_title, + generate_short_summary, + generate_detailed_summary, ) logger.info(f"๐ Starting title/summary generation for conversation {conversation_id}") @@ -437,40 +484,48 @@ async def generate_title_summary_job( logger.error(f"Conversation {conversation_id} not found") return {"success": False, "error": "Conversation not found"} - # Get segments from active transcript version + # Get transcript and segments (properties return data from active transcript version) + transcript_text = conversation.transcript or "" segments = conversation.segments or [] - if not segments or len(segments) == 0: - logger.warning(f"โ ๏ธ No segments available for conversation {conversation_id}") + if not transcript_text and (not segments or len(segments) == 0): + logger.warning(f"โ ๏ธ No transcript or segments available for conversation {conversation_id}") return { "success": False, - "error": "No segments available", - "conversation_id": conversation_id + "error": "No transcript or segments available", + "conversation_id": conversation_id, } - # Generate title and summary using speaker-aware utilities + # Generate title, short summary, and detailed summary using unified utilities try: - logger.info(f"๐ค Generating title/summary using LLM for conversation {conversation_id}") + logger.info( + f"๐ค Generating title/summary/detailed_summary using LLM for conversation {conversation_id}" + ) # Convert segments to dict format expected by utils - segment_dicts = [ - { - "speaker": seg.speaker, - "text": seg.text, - "start": seg.start, - "end": seg.end - } - for seg in segments - ] - - # Generate title and summary with speaker awareness - title = await generate_title_with_speakers(segment_dicts) - summary = await generate_summary_with_speakers(segment_dicts) + segment_dicts = None + if segments and len(segments) > 0: + segment_dicts = [ + {"speaker": seg.speaker, "text": seg.text, "start": seg.start, "end": seg.end} + for seg in segments + ] + + # Generate all three summaries in parallel for efficiency + import asyncio + + title, short_summary, detailed_summary = await asyncio.gather( + generate_title(transcript_text, segments=segment_dicts), + generate_short_summary(transcript_text, segments=segment_dicts), + generate_detailed_summary(transcript_text, segments=segment_dicts), + ) conversation.title = title - conversation.summary = summary + conversation.summary = short_summary + conversation.detailed_summary = detailed_summary - logger.info(f"โ Generated title: '{conversation.title}', summary: '{conversation.summary}'") + logger.info(f"โ Generated title: '{conversation.title}'") + logger.info(f"โ Generated summary: '{conversation.summary}'") + logger.info(f"โ Generated detailed summary: {len(conversation.detailed_summary)} chars") except Exception as gen_error: logger.error(f"โ Title/summary generation failed: {gen_error}") @@ -478,7 +533,7 @@ async def generate_title_summary_job( "success": False, "error": str(gen_error), "conversation_id": conversation_id, - "processing_time_seconds": time.time() - start_time + "processing_time_seconds": time.time() - start_time, } # Save the updated conversation @@ -488,25 +543,34 @@ async def generate_title_summary_job( # Update job metadata from rq import get_current_job + current_job = get_current_job() if current_job: if not current_job.meta: current_job.meta = {} - current_job.meta.update({ - "conversation_id": conversation_id, - "title": conversation.title, - "summary": conversation.summary, - "segment_count": len(segments), - "processing_time": processing_time - }) + current_job.meta.update( + { + "conversation_id": conversation_id, + "title": conversation.title, + "summary": conversation.summary, + "detailed_summary_length": ( + len(conversation.detailed_summary) if conversation.detailed_summary else 0 + ), + "segment_count": len(segments), + "processing_time": processing_time, + } + ) current_job.save_meta() - logger.info(f"โ Title/summary generation completed for {conversation_id} in {processing_time:.2f}s") + logger.info( + f"โ Title/summary generation completed for {conversation_id} in {processing_time:.2f}s" + ) return { "success": True, "conversation_id": conversation_id, "title": conversation.title, "summary": conversation.summary, - "processing_time_seconds": processing_time + "detailed_summary": conversation.detailed_summary, + "processing_time_seconds": processing_time, } diff --git a/backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py index c1a6dfc0..91518047 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py @@ -21,6 +21,7 @@ @async_job(redis=True, beanie=True) async def process_memory_job( conversation_id: str, + *, redis_client=None ) -> Dict[str, Any]: """ @@ -208,10 +209,7 @@ def enqueue_memory_processing( job = memory_queue.enqueue( process_memory_job, - client_id, - user_id, - user_email, - conversation_id, + conversation_id, # Only argument needed - job fetches conversation data internally job_timeout=timeout_mapping.get(priority, 1800), result_ttl=JOB_RESULT_TTL, job_id=f"memory_{conversation_id[:8]}", diff --git a/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py index 80434232..d9165b2d 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py @@ -20,6 +20,7 @@ async def check_enrolled_speakers_job( session_id: str, user_id: str, client_id: str, + *, redis_client=None ) -> Dict[str, Any]: """ @@ -106,6 +107,7 @@ async def recognise_speakers_job( audio_path: str, transcript_text: str, words: list, + *, redis_client=None ) -> Dict[str, Any]: """ @@ -219,19 +221,31 @@ async def recognise_speakers_job( logger.info(f"๐ค Speaker recognition returned {len(speaker_segments)} segments") # Update the transcript version segments with identified speakers + # Filter out empty segments (diarization sometimes creates segments with no text) updated_segments = [] + empty_segment_count = 0 for seg in speaker_segments: + segment_text = seg.get("text", "").strip() + + # Skip segments with no text + if not segment_text: + empty_segment_count += 1 + continue + speaker_name = seg.get("identified_as") or seg.get("speaker", "Unknown") updated_segments.append( Conversation.SpeakerSegment( start=seg.get("start", 0), end=seg.get("end", 0), - text=seg.get("text", ""), + text=segment_text, speaker=speaker_name, confidence=seg.get("confidence") ) ) + if empty_segment_count > 0: + logger.info(f"๐ Filtered out {empty_segment_count} empty segments from speaker recognition") + # Update the transcript version transcript_version.segments = updated_segments @@ -254,10 +268,6 @@ async def recognise_speakers_job( "processing_time_seconds": time.time() - start_time } - # Update legacy fields if this is the active version - if conversation.active_transcript_version == version_id: - conversation.segments = updated_segments - await conversation.save() processing_time = time.time() - start_time diff --git a/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py index 664f621f..9690f286 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py @@ -120,6 +120,7 @@ async def transcribe_full_audio_job( audio_path: str, version_id: str, trigger: str = "reprocess", + *, redis_client=None ) -> Dict[str, Any]: """ @@ -368,6 +369,7 @@ async def stream_speech_detection_job( session_id: str, user_id: str, client_id: str, + *, redis_client=None ) -> Dict[str, Any]: """ @@ -402,7 +404,7 @@ async def stream_speech_detection_job( current_job = get_current_job() session_key = f"audio:session:{session_id}" start_time = time.time() - max_runtime = 3540 # 59 minutes + max_runtime = 86340 # 24 hours - 60 seconds (graceful exit before RQ timeout) # Get conversation count conversation_count_key = f"session:conversation_count:{session_id}" diff --git a/backends/advanced/webui/src/pages/Conversations.tsx b/backends/advanced/webui/src/pages/Conversations.tsx index 370117f1..0e5f2a1f 100644 --- a/backends/advanced/webui/src/pages/Conversations.tsx +++ b/backends/advanced/webui/src/pages/Conversations.tsx @@ -3,40 +3,53 @@ import { MessageSquare, RefreshCw, Calendar, User, Play, Pause, MoreVertical, Ro import { conversationsApi, BACKEND_URL } from '../services/api' import ConversationVersionHeader from '../components/ConversationVersionHeader' +interface TranscriptVersion { + version_id: string + transcript: string + segments: Array<{ + text: string + speaker: string + start: number + end: number + confidence?: number + }> + provider: string + model?: string + created_at: string + processing_time_seconds?: number + metadata?: { + segment_count?: number + word_count?: number + speaker_recognition?: { + enabled: boolean + identified_speakers: string[] + processing_time_seconds?: number + } + } +} + interface Conversation { conversation_id?: string audio_uuid: string title?: string summary?: string - timestamp: number + detailed_summary?: string created_at?: string client_id: string segment_count?: number // From list endpoint - transcript?: string // Full text transcript (for LLM parsing) - segments?: Array<{ // Optional - only populated after fetching details - text: string - speaker: string - start: number - end: number - speaker_id?: string - confidence?: number - }> audio_path?: string cropped_audio_path?: string - speakers_identified?: string[] - speaker_names?: { [key: string]: string } duration_seconds?: number - memories?: any[] has_memory?: boolean - memory_processing_status?: string - transcription_status?: string - action_items?: any[] - version_info?: { - transcript_count: number - memory_count: number - active_transcript_version?: string - active_memory_version?: string - } + transcript_versions?: TranscriptVersion[] + memory_versions?: any[] + active_transcript_version?: string + active_memory_version?: string + transcript_version_count?: number + memory_version_count?: number + deleted?: boolean + deletion_reason?: string + deleted_at?: string } // Speaker color palette for consistent colors across conversations @@ -184,15 +197,15 @@ export default function Conversations() { } } - const handleDeleteConversation = async (audioUuid: string) => { + const handleDeleteConversation = async (conversationId: string) => { try { const confirmed = window.confirm('Are you sure you want to delete this conversation? This action cannot be undone.') if (!confirmed) return - setDeletingConversation(prev => new Set(prev).add(audioUuid)) + setDeletingConversation(prev => new Set(prev).add(conversationId)) setOpenDropdown(null) - const response = await conversationsApi.delete(audioUuid) + const response = await conversationsApi.delete(conversationId) if (response.status === 200) { // Refresh conversations to show updated data @@ -205,7 +218,7 @@ export default function Conversations() { } finally { setDeletingConversation(prev => { const newSet = new Set(prev) - newSet.delete(audioUuid) + newSet.delete(conversationId) return newSet }) } @@ -229,20 +242,25 @@ export default function Conversations() { return } - // If segments are already loaded, just expand - if (conversation.segments && conversation.segments.length > 0) { + // Get active transcript + const activeTranscript = conversation.transcript_versions?.find( + v => v.version_id === conversation.active_transcript_version + ) + + // If segments are already loaded in active transcript, just expand + if (activeTranscript && activeTranscript.segments && activeTranscript.segments.length > 0) { setExpandedTranscripts(prev => new Set(prev).add(conversationId)) return } - // Fetch full conversation details including segments + // Fetch full conversation details including transcript versions try { const response = await conversationsApi.getById(conversation.conversation_id) if (response.status === 200 && response.data.conversation) { - // Update the conversation in state with full segments and transcript + // Update the conversation in state with full transcript versions setConversations(prev => prev.map(c => c.conversation_id === conversationId - ? { ...c, segments: response.data.conversation.segments, transcript: response.data.conversation.transcript } + ? { ...c, transcript_versions: response.data.conversation.transcript_versions } : c )) // Expand the transcript @@ -254,12 +272,12 @@ export default function Conversations() { } } - const handleSegmentPlayPause = (audioUuid: string, segmentIndex: number, segment: any, audioPath: string) => { - const segmentId = `${audioUuid}-${segmentIndex}`; - + const handleSegmentPlayPause = (conversationId: string, segmentIndex: number, segment: any, audioPath: string) => { + const segmentId = `${conversationId}-${segmentIndex}`; + // If this segment is already playing, pause it if (playingSegment === segmentId) { - const audio = audioRefs.current[audioUuid]; + const audio = audioRefs.current[conversationId]; if (audio) { audio.pause(); } @@ -270,11 +288,11 @@ export default function Conversations() { setPlayingSegment(null); return; } - + // Stop any currently playing segment if (playingSegment) { - const [currentAudioUuid] = playingSegment.split('-'); - const currentAudio = audioRefs.current[currentAudioUuid]; + const [currentConversationId] = playingSegment.split('-'); + const currentAudio = audioRefs.current[currentConversationId]; if (currentAudio) { currentAudio.pause(); } @@ -283,12 +301,12 @@ export default function Conversations() { segmentTimerRef.current = null; } } - + // Get or create audio element for this conversation - let audio = audioRefs.current[audioUuid]; + let audio = audioRefs.current[conversationId]; if (!audio) { audio = new Audio(`${BACKEND_URL}/audio/${audioPath}`); - audioRefs.current[audioUuid] = audio; + audioRefs.current[conversationId] = audio; // Add event listener to handle when audio ends naturally audio.addEventListener('ended', () => { @@ -391,14 +409,47 @@ export default function Conversations() { ) : ( conversations.map((conversation) => (
Processing Failed
++ Reason: {conversation.deletion_reason === 'no_meaningful_speech' + ? 'No meaningful speech detected' + : conversation.deletion_reason === 'audio_file_not_ready' + ? 'Audio file not saved (possible Bluetooth disconnect)' + : conversation.deletion_reason || 'Unknown'} +
+ {conversation.deleted_at && ( ++ Deleted at: {new Date(conversation.deleted_at).toLocaleString()} +
+ )} +{conversation.summary}
)} + {/* Detailed Summary */} + {conversation.detailed_summary && ( ++ {conversation.detailed_summary} +
+