From e4f261ecff0e20d0f2a5b028c397a73168f8d239 Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Sat, 15 Nov 2025 12:37:03 +0000 Subject: [PATCH 01/12] Fixed issue where the timestamps for segments were incorrect due to cropping. --- .../controllers/queue_controller.py | 57 +++++++++---------- .../workers/audio_jobs.py | 21 ++++++- .../workers/conversation_jobs.py | 8 +++ .../webui/src/pages/Conversations.tsx | 30 +++++----- 4 files changed, 71 insertions(+), 45 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py index a6a406c8..fdc9177e 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py @@ -321,8 +321,8 @@ def start_post_conversation_jobs( Start post-conversation processing jobs after conversation is created. This creates the standard processing chain after a conversation is created: - 1. Audio cropping job - Removes silence from audio - 2. [Optional] Transcription job - Batch transcription (if post_transcription=True) + 1. [Optional] Transcription job - Batch transcription (if post_transcription=True) + 2. Audio cropping job - Removes silence from audio 3. Speaker recognition job - Identifies speakers in audio 4. Memory extraction job - Extracts memories from conversation (parallel) 5. 
Title/summary generation job - Generates title and summary (parallel) @@ -348,7 +348,29 @@ def start_post_conversation_jobs( version_id = transcript_version_id or str(uuid.uuid4()) - # Step 1: Audio cropping job + # Step 1: Batch transcription job (ALWAYS run to get correct conversation-relative timestamps) + # Even for streaming, we need batch transcription before cropping to fix cumulative timestamps + transcribe_job_id = f"transcribe_{conversation_id[:12]}" + logger.info(f"๐Ÿ” DEBUG: Creating transcribe job with job_id={transcribe_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}") + + transcription_job = transcription_queue.enqueue( + transcribe_full_audio_job, + conversation_id, + audio_uuid, + audio_file_path, + version_id, + "batch", # trigger + job_timeout=1800, # 30 minutes + result_ttl=JOB_RESULT_TTL, + depends_on=depends_on_job, + job_id=transcribe_job_id, + description=f"Transcribe conversation {conversation_id[:8]}", + meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id} + ) + logger.info(f"๐Ÿ“ฅ RQ: Enqueued transcription job {transcription_job.id}, meta={transcription_job.meta}") + crop_depends_on = transcription_job + + # Step 2: Audio cropping job (depends on transcription if it ran, otherwise depends_on_job) crop_job_id = f"crop_{conversation_id[:12]}" logger.info(f"๐Ÿ” DEBUG: Creating crop job with job_id={crop_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}") @@ -358,38 +380,15 @@ def start_post_conversation_jobs( audio_file_path, job_timeout=300, # 5 minutes result_ttl=JOB_RESULT_TTL, - depends_on=depends_on_job, + depends_on=crop_depends_on, job_id=crop_job_id, description=f"Crop audio for conversation {conversation_id[:8]}", meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id} ) logger.info(f"๐Ÿ“ฅ RQ: Enqueued cropping job {cropping_job.id}, meta={cropping_job.meta}") - # Step 2: Transcription job (conditional) - transcription_job = None - if 
post_transcription: - transcribe_job_id = f"transcribe_{conversation_id[:12]}" - logger.info(f"๐Ÿ” DEBUG: Creating transcribe job with job_id={transcribe_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}") - - transcription_job = transcription_queue.enqueue( - transcribe_full_audio_job, - conversation_id, - audio_uuid, - audio_file_path, - version_id, - "batch", # trigger - job_timeout=1800, # 30 minutes - result_ttl=JOB_RESULT_TTL, - depends_on=cropping_job, - job_id=transcribe_job_id, - description=f"Transcribe conversation {conversation_id[:8]}", - meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id} - ) - logger.info(f"๐Ÿ“ฅ RQ: Enqueued transcription job {transcription_job.id}, meta={transcription_job.meta} (depends on {cropping_job.id})") - speaker_depends_on = transcription_job - else: - logger.info(f"โญ๏ธ RQ: Skipping transcription (streaming already has transcript)") - speaker_depends_on = cropping_job + # Speaker recognition depends on cropping + speaker_depends_on = cropping_job # Step 3: Speaker recognition job speaker_job_id = f"speaker_{conversation_id[:12]}" diff --git a/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py index 1c7b227a..0a74b8f7 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py @@ -102,10 +102,27 @@ async def process_cropping_job( # Calculate cropped duration cropped_duration_seconds = sum(end - start for start, end in speech_segments) - # Update conversation with cropped audio path + # Update segment timestamps to match cropped audio + # After cropping, segments are concatenated, so they need new timestamps + updated_segments = [] + current_time = 0.0 + for i, seg in enumerate(segments): + original_duration = seg.end - seg.start + updated_seg = seg.model_copy() + updated_seg.start = current_time + 
updated_seg.end = current_time + original_duration + updated_segments.append(updated_seg) + current_time += original_duration + logger.debug(f"Updated segment {i}: {seg.start:.2f}-{seg.end:.2f}s โ†’ {updated_seg.start:.2f}-{updated_seg.end:.2f}s") + + # Update conversation with cropped audio path and adjusted segments conversation.cropped_audio_path = cropped_filename + conversation.segments = updated_segments + # Also update the active transcript version segments + if conversation.active_transcript: + conversation.active_transcript.segments = updated_segments await conversation.save() - logger.info(f"๐Ÿ’พ Updated conversation {conversation_id[:12]} with cropped_audio_path: {cropped_filename}") + logger.info(f"๐Ÿ’พ Updated conversation {conversation_id[:12]} with cropped_audio_path and adjusted {len(updated_segments)} segment timestamps") logger.info(f"โœ… RQ: Completed audio cropping for conversation {conversation_id} ({cropped_duration_seconds:.1f}s)") diff --git a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py index 1d3400c3..edffa65f 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py @@ -64,6 +64,14 @@ async def open_conversation_job( conversation_id = conversation.conversation_id # Get the auto-generated ID logger.info(f"โœ… Created streaming conversation {conversation_id} for session {session_id}") + # Update THIS job's metadata with conversation_id (for Queue page grouping) + if current_job: + if not current_job.meta: + current_job.meta = {} + current_job.meta['conversation_id'] = conversation_id + current_job.save_meta() + logger.info(f"๐Ÿ”— Updated open_conversation_job {current_job.id[:12]} metadata with conversation_id") + # Update speech detection job metadata with conversation_id if speech_job_id: try: diff --git 
a/backends/advanced/webui/src/pages/Conversations.tsx b/backends/advanced/webui/src/pages/Conversations.tsx index 370117f1..269a9606 100644 --- a/backends/advanced/webui/src/pages/Conversations.tsx +++ b/backends/advanced/webui/src/pages/Conversations.tsx @@ -254,12 +254,12 @@ export default function Conversations() { } } - const handleSegmentPlayPause = (audioUuid: string, segmentIndex: number, segment: any, audioPath: string) => { - const segmentId = `${audioUuid}-${segmentIndex}`; - + const handleSegmentPlayPause = (conversationId: string, segmentIndex: number, segment: any, audioPath: string) => { + const segmentId = `${conversationId}-${segmentIndex}`; + // If this segment is already playing, pause it if (playingSegment === segmentId) { - const audio = audioRefs.current[audioUuid]; + const audio = audioRefs.current[conversationId]; if (audio) { audio.pause(); } @@ -270,11 +270,11 @@ export default function Conversations() { setPlayingSegment(null); return; } - + // Stop any currently playing segment if (playingSegment) { - const [currentAudioUuid] = playingSegment.split('-'); - const currentAudio = audioRefs.current[currentAudioUuid]; + const [currentConversationId] = playingSegment.split('-'); + const currentAudio = audioRefs.current[currentConversationId]; if (currentAudio) { currentAudio.pause(); } @@ -283,12 +283,12 @@ export default function Conversations() { segmentTimerRef.current = null; } } - + // Get or create audio element for this conversation - let audio = audioRefs.current[audioUuid]; + let audio = audioRefs.current[conversationId]; if (!audio) { audio = new Audio(`${BACKEND_URL}/audio/${audioPath}`); - audioRefs.current[audioUuid] = audio; + audioRefs.current[conversationId] = audio; // Add event listener to handle when audio ends naturally audio.addEventListener('ended', () => { @@ -603,10 +603,12 @@ export default function Conversations() { return conversation.segments.map((segment, index) => { const speaker = segment.speaker || 'Unknown'; 
const speakerColor = speakerColorMap[speaker]; - const segmentId = `${conversation.audio_uuid}-${index}`; + // Use conversation_id for unique segment IDs (falls back to audio_uuid for backward compatibility) + const conversationKey = conversation.conversation_id || conversation.audio_uuid; + const segmentId = `${conversationKey}-${index}`; const isPlaying = playingSegment === segmentId; - const audioPath = debugMode - ? conversation.audio_path + const audioPath = debugMode + ? conversation.audio_path : conversation.cropped_audio_path || conversation.audio_path; return ( @@ -619,7 +621,7 @@ export default function Conversations() { {/* Play/Pause Button */} {audioPath && ( {/* Dropdown Menu */} - {openDropdown === conversation.audio_uuid && ( + {openDropdown === (conversation.conversation_id || conversation.audio_uuid) && (
)} - +
{debugMode && ( diff --git a/backends/advanced/webui/src/pages/Queue.tsx b/backends/advanced/webui/src/pages/Queue.tsx index e5b27dca..ea783ed9 100644 --- a/backends/advanced/webui/src/pages/Queue.tsx +++ b/backends/advanced/webui/src/pages/Queue.tsx @@ -289,8 +289,14 @@ const Queue: React.FC = () => { console.log(`๐Ÿ“‚ Auto-expanded ${expandedJobsCount} job card(s) in active conversations`); setExpandedJobs(newExpandedJobs); } - } catch (error) { + } catch (error: any) { console.error('โŒ Error fetching dashboard data:', error); + + // If it's a 401 error, stop auto-refresh to prevent repeated failed requests + if (error?.response?.status === 401) { + console.warn('๐Ÿ” Authentication error detected - disabling auto-refresh'); + setAutoRefreshEnabled(false); + } } finally { setLoading(false); setRefreshing(false); From 2036b9c252755ceefb462b08dce49f98b912c097 Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Mon, 17 Nov 2025 09:17:18 +0000 Subject: [PATCH 03/12] better summary and delete convo + cleanup Added detailed summary, fixed delete conversation, added checking to mark erroneous conversations as deleted --- .../controllers/conversation_controller.py | 87 +++---- .../models/conversation.py | 8 +- .../utils/conversation_utils.py | 237 +++++++++++------- .../workers/conversation_jobs.py | 132 ++++++++-- .../webui/src/pages/Conversations.tsx | 67 ++++- 5 files changed, 346 insertions(+), 185 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py index 0a82efdf..c4070910 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py @@ -165,74 +165,52 @@ async def get_conversations(user: User): return JSONResponse(status_code=500, content={"error": "Error fetching conversations"}) -async def 
delete_conversation(audio_uuid: str, user: User): - """Delete a conversation and its associated audio file. Users can only delete their own conversations.""" +async def delete_conversation(conversation_id: str, user: User): + """Delete a conversation and its associated audio files. Users can only delete their own conversations.""" try: # Create masked identifier for logging - masked_uuid = f"{audio_uuid[:8]}...{audio_uuid[-4:]}" if len(audio_uuid) > 12 else "***" - logger.info(f"Attempting to delete conversation: {masked_uuid}") + masked_id = f"{conversation_id[:8]}...{conversation_id[-4:]}" if len(conversation_id) > 12 else "***" + logger.info(f"Attempting to delete conversation: {masked_id}") - # Detailed debugging only when debug level is enabled - if logger.isEnabledFor(logging.DEBUG): - total_count = await AudioFile.count() - logger.debug(f"Total audio files in collection: {total_count}") - logger.debug(f"UUID length: {len(audio_uuid)}, type: {type(audio_uuid)}") - - # First, get the audio file record to check ownership and get conversation_id - audio_file = await AudioFile.find_one(AudioFile.audio_uuid == audio_uuid) - - if logger.isEnabledFor(logging.DEBUG): - logger.debug(f"Audio file lookup result: {'found' if audio_file else 'not found'}") - if audio_file: - logger.debug(f"Found audio file with client_id: {audio_file.client_id}") - logger.debug(f"Audio file has conversation_id: {audio_file.conversation_id}") + # Find the conversation using Beanie + conversation = await Conversation.find_one(Conversation.conversation_id == conversation_id) - if not audio_file: + if not conversation: return JSONResponse( status_code=404, - content={"error": f"Audio file with audio_uuid '{audio_uuid}' not found"} + content={"error": f"Conversation '{conversation_id}' not found"} ) - # Check if user has permission to delete this conversation - client_id = audio_file.client_id - if not user.is_superuser and not client_belongs_to_user(client_id, user.user_id): + # Check 
ownership for non-admin users + if not user.is_superuser and conversation.user_id != str(user.user_id): logger.warning( - f"User {user.user_id} attempted to delete conversation {audio_uuid} without permission" + f"User {user.user_id} attempted to delete conversation {conversation_id} without permission" ) return JSONResponse( status_code=403, content={ "error": "Access forbidden. You can only delete your own conversations.", - "details": f"Conversation '{audio_uuid}' does not belong to your account." + "details": f"Conversation '{conversation_id}' does not belong to your account." } ) - # Get audio file paths for deletion - audio_path = audio_file.audio_path - cropped_audio_path = audio_file.cropped_audio_path - - # Get conversation_id if this audio file has an associated conversation - conversation_id = audio_file.conversation_id - conversation_deleted = False + # Get file paths before deletion + audio_path = conversation.audio_path + cropped_audio_path = conversation.cropped_audio_path + audio_uuid = conversation.audio_uuid + client_id = conversation.client_id - # Delete the audio file from database first - await audio_file.delete() - logger.info(f"Deleted audio file {audio_uuid}") + # Delete the conversation from database + await conversation.delete() + logger.info(f"Deleted conversation {conversation_id}") - # If this audio file has an associated conversation, delete it using Beanie - if conversation_id: - try: - conversation_model = await Conversation.find_one(Conversation.conversation_id == conversation_id) - if conversation_model: - await conversation_model.delete() - conversation_deleted = True - logger.info(f"Deleted conversation {conversation_id} associated with audio chunk {audio_uuid}") - else: - logger.warning(f"Conversation {conversation_id} not found in conversations collection, but audio chunk was deleted") - except Exception as e: - logger.warning(f"Failed to delete conversation {conversation_id}: {e}") + # Also delete from legacy AudioFile 
collection if it exists (backward compatibility) + audio_file = await AudioFile.find_one(AudioFile.audio_uuid == audio_uuid) + if audio_file: + await audio_file.delete() + logger.info(f"Deleted legacy audio file record for {audio_uuid}") - # Delete associated audio files + # Delete associated audio files from disk deleted_files = [] if audio_path: try: @@ -256,29 +234,26 @@ async def delete_conversation(audio_uuid: str, user: User): except Exception as e: logger.warning(f"Failed to delete cropped audio file {cropped_audio_path}: {e}") - logger.info(f"Successfully deleted conversation {audio_uuid} for user {user.user_id}") + logger.info(f"Successfully deleted conversation {conversation_id} for user {user.user_id}") # Prepare response message - delete_summary = [] - delete_summary.append("audio chunk") - if conversation_deleted: - delete_summary.append("conversation record") + delete_summary = ["conversation"] if deleted_files: delete_summary.append(f"{len(deleted_files)} audio file(s)") return JSONResponse( status_code=200, content={ - "message": f"Successfully deleted {', '.join(delete_summary)} for '{audio_uuid}'", + "message": f"Successfully deleted {', '.join(delete_summary)} '{conversation_id}'", "deleted_files": deleted_files, "client_id": client_id, "conversation_id": conversation_id, - "conversation_deleted": conversation_deleted + "audio_uuid": audio_uuid } ) except Exception as e: - logger.error(f"Error deleting conversation {audio_uuid}: {e}") + logger.error(f"Error deleting conversation {conversation_id}: {e}") return JSONResponse( status_code=500, content={"error": f"Failed to delete conversation: {str(e)}"} diff --git a/backends/advanced/src/advanced_omi_backend/models/conversation.py b/backends/advanced/src/advanced_omi_backend/models/conversation.py index 2415337d..a0480bcb 100644 --- a/backends/advanced/src/advanced_omi_backend/models/conversation.py +++ b/backends/advanced/src/advanced_omi_backend/models/conversation.py @@ -81,9 +81,15 @@ class 
MemoryVersion(BaseModel): # Creation metadata created_at: Indexed(datetime) = Field(default_factory=datetime.utcnow, description="When the conversation was created") + # Processing status tracking + deleted: bool = Field(False, description="Whether this conversation was deleted due to processing failure") + deletion_reason: Optional[str] = Field(None, description="Reason for deletion (no_meaningful_speech, audio_file_not_ready, etc.)") + deleted_at: Optional[datetime] = Field(None, description="When the conversation was marked as deleted") + # Summary fields (auto-generated from transcript) title: Optional[str] = Field(None, description="Auto-generated conversation title") - summary: Optional[str] = Field(None, description="Auto-generated conversation summary") + summary: Optional[str] = Field(None, description="Auto-generated short summary (1-2 sentences)") + detailed_summary: Optional[str] = Field(None, description="Auto-generated detailed summary (comprehensive, corrected content)") # Versioned processing transcript_versions: List["Conversation.TranscriptVersion"] = Field( diff --git a/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py b/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py index ef83f3ba..47e8ed83 100644 --- a/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py +++ b/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py @@ -134,16 +134,31 @@ def analyze_speech(transcript_data: dict) -> dict: } -async def generate_title(text: str) -> str: +async def generate_title(text: str, segments: Optional[list] = None) -> str: """ Generate an LLM-powered title from conversation text. Args: - text: Conversation transcript + text: Conversation transcript (used if segments not provided) + segments: Optional list of speaker segments with structure: + [{"speaker": str, "text": str, "start": float, "end": float}, ...] 
+ If provided, uses speaker-aware conversation formatting Returns: str: Generated title (3-6 words) or fallback + + Note: + Title intentionally does NOT include speaker names - focuses on topic/theme only. """ + # Format conversation text from segments if provided + if segments: + conversation_text = "" + for segment in segments[:10]: # Use first 10 segments for title generation + segment_text = segment.get("text", "").strip() + if segment_text: + conversation_text += f"{segment_text}\n" + text = conversation_text if conversation_text.strip() else text + if not text or len(text.strip()) < 10: return "Conversation" @@ -155,6 +170,7 @@ async def generate_title(text: str) -> str: Rules: - Maximum 6 words - Capture the main topic or theme +- Do NOT include speaker names or participants - No quotes or special characters - Examples: "Planning Weekend Trip", "Work Project Discussion", "Medical Appointment" @@ -171,28 +187,54 @@ async def generate_title(text: str) -> str: return title[:40] + "..." if len(title) > 40 else title or "Conversation" -async def generate_summary(text: str) -> str: +async def generate_short_summary(text: str, segments: Optional[list] = None) -> str: """ - Generate an LLM-powered summary from conversation text. + Generate a brief LLM-powered summary from conversation text. Args: - text: Conversation transcript + text: Conversation transcript (used if segments not provided) + segments: Optional list of speaker segments with structure: + [{"speaker": str, "text": str, "start": float, "end": float}, ...] 
+ If provided, includes speaker context in summary Returns: - str: Generated summary (1-2 sentences, max 120 chars) or fallback + str: Generated short summary (1-2 sentences, max 120 chars) or fallback """ - if not text or len(text.strip()) < 10: + # Format conversation text from segments if provided + conversation_text = text + include_speakers = False + + if segments: + formatted_text = "" + speakers_in_conv = set() + for segment in segments: + speaker = segment.get("speaker", "") + segment_text = segment.get("text", "").strip() + if segment_text: + if speaker: + formatted_text += f"{speaker}: {segment_text}\n" + speakers_in_conv.add(speaker) + else: + formatted_text += f"{segment_text}\n" + + if formatted_text.strip(): + conversation_text = formatted_text + include_speakers = len(speakers_in_conv) > 0 + + if not conversation_text or len(conversation_text.strip()) < 10: return "No content" try: + speaker_instruction = "- Include speaker names when relevant (e.g., \"John discusses X with Sarah\")\n" if include_speakers else "" + prompt = f"""Generate a brief, informative summary (1-2 sentences, max 120 characters) for this conversation: -"{text[:1000]}" +"{conversation_text[:1000]}" Rules: - Maximum 120 characters - 1-2 complete sentences -- Capture key topics and outcomes +{speaker_instruction}- Capture key topics and outcomes - Use present tense - Be specific and informative @@ -202,113 +244,126 @@ async def generate_summary(text: str) -> str: return summary.strip().strip('"').strip("'") or "No content" except Exception as e: - logger.warning(f"Failed to generate LLM summary: {e}") + logger.warning(f"Failed to generate LLM short summary: {e}") # Fallback to simple summary generation - return text[:120] + "..." if len(text) > 120 else text or "No content" + return conversation_text[:120] + "..." 
if len(conversation_text) > 120 else conversation_text or "No content" -async def generate_title_with_speakers(segments: list) -> str: +# Backward compatibility alias +async def generate_summary(text: str) -> str: + """ + Backward compatibility wrapper for generate_short_summary. + + Deprecated: Use generate_short_summary instead. + """ + return await generate_short_summary(text) + + +async def generate_detailed_summary(text: str, segments: Optional[list] = None) -> str: """ - Generate an LLM-powered title from conversation segments with speaker information. + Generate a comprehensive, detailed summary of the conversation. + + This summary provides full information about what was discussed and said, + correcting transcript errors and removing filler words to create a higher + quality information set. Not word-for-word like the transcript, but captures + all key points, context, and meaningful content. Args: - segments: List of dicts with: + text: Conversation transcript (used if segments not provided) + segments: Optional list of speaker segments with structure: [{"speaker": str, "text": str, "start": float, "end": float}, ...] 
+ If provided, includes speaker attribution in detailed summary Returns: - str: Generated title (max 40 chars) or fallback + str: Comprehensive detailed summary (multiple paragraphs) or fallback """ - if not segments: - return "Conversation" - - # Format conversation with speaker names - conversation_text = "" - for segment in segments[:10]: # Use first 10 segments for title generation - speaker = segment.get("speaker", "") - text = segment.get("text", "").strip() - if text: - if speaker: - conversation_text += f"{speaker}: {text}\n" - else: - conversation_text += f"{text}\n" - - if not conversation_text.strip(): - return "Conversation" + # Format conversation text from segments if provided + conversation_text = text + include_speakers = False + + if segments: + formatted_text = "" + speakers_in_conv = set() + for segment in segments: + speaker = segment.get("speaker", "") + segment_text = segment.get("text", "").strip() + if segment_text: + if speaker: + formatted_text += f"{speaker}: {segment_text}\n" + speakers_in_conv.add(speaker) + else: + formatted_text += f"{segment_text}\n" + + if formatted_text.strip(): + conversation_text = formatted_text + include_speakers = len(speakers_in_conv) > 0 + + if not conversation_text or len(conversation_text.strip()) < 10: + return "No meaningful content to summarize" try: - prompt = f"""Generate a concise title (max 40 characters) for this conversation: + speaker_instruction = """- Attribute key points and statements to specific speakers when relevant +- Capture the flow of conversation between participants +- Note any agreements, disagreements, or important exchanges +""" if include_speakers else "" + + prompt = f"""Generate a comprehensive, detailed summary of this conversation transcript. 
-"{conversation_text[:500]}" +TRANSCRIPT: +"{conversation_text}" + +INSTRUCTIONS: +Your task is to create a high-quality, detailed summary of a conversation transcription that captures the full information and context of what was discussed. This is NOT a brief summary - provide comprehensive coverage. Rules: -- Maximum 40 characters -- Include speaker names if relevant -- Capture the main topic -- Be specific and informative +- We know it's a conversation, so no need to say "This conversation involved..." +- Provide complete coverage of all topics, points, and important details discussed +- Correct obvious transcription errors and remove filler words (um, uh, like, you know) +- Organize information logically by topic or chronologically as appropriate +- Use clear, well-structured paragraphs or bullet points +- Maintain the meaning and intent of what was said, but improve clarity and coherence +- Include relevant context, decisions made, action items mentioned, and conclusions reached +{speaker_instruction}- Write in a natural, flowing narrative style +- Only include word-for-word quotes if it's more efficiency than rephrasing +- Focus on substantive content - what was actually discussed and decided -Title:""" +Think of this as creating a high-quality information set that someone could use to understand everything important that happened in this conversation without reading the full transcript. - title = await async_generate(prompt, temperature=0.3) - title = title.strip().strip('"').strip("'") - return title[:40] + "..." if len(title) > 40 else title or "Conversation" +DETAILED SUMMARY:""" + + summary = await async_generate(prompt, temperature=0.3) + return summary.strip().strip('"').strip("'") or "No meaningful content to summarize" except Exception as e: - logger.warning(f"Failed to generate LLM title with speakers: {e}") - # Fallback to simple title generation - words = conversation_text.split()[:6] - title = " ".join(words) - return title[:40] + "..." 
if len(title) > 40 else title or "Conversation" + logger.warning(f"Failed to generate detailed summary: {e}") + # Fallback to returning cleaned transcript + lines = conversation_text.split('\n') + cleaned = '\n'.join(line.strip() for line in lines if line.strip()) + return cleaned[:2000] + "..." if len(cleaned) > 2000 else cleaned or "No meaningful content to summarize" -async def generate_summary_with_speakers(segments: list) -> str: +# Backward compatibility aliases for deprecated speaker-specific methods +async def generate_title_with_speakers(segments: list) -> str: """ - Generate an LLM-powered summary from conversation segments with speaker information. - - Args: - segments: List of dicts with: - [{"speaker": str, "text": str, "start": float, "end": float}, ...] + Deprecated: Use generate_title(text, segments=segments) instead. - Returns: - str: Generated summary (1-2 sentences, max 120 chars) or fallback + Backward compatibility wrapper. """ if not segments: - return "No content" - - # Format conversation with speaker names - conversation_text = "" - speakers_in_conv = set() - for segment in segments: - speaker = segment.get("speaker", "") - text = segment.get("text", "").strip() - if text: - if speaker: - conversation_text += f"{speaker}: {text}\n" - speakers_in_conv.add(speaker) - else: - conversation_text += f"{text}\n" - - if not conversation_text.strip(): - return "No content" - - try: - prompt = f"""Generate a brief, informative summary (1-2 sentences, max 120 characters) for this conversation with speakers: - -"{conversation_text[:1000]}" - -Rules: -- Maximum 120 characters -- 1-2 complete sentences -- Include speaker names when relevant (e.g., "John discusses X with Sarah") -- Capture key topics and outcomes -- Use present tense -- Be specific and informative + return "Conversation" + # Extract text from segments for compatibility + text = "\n".join(s.get("text", "") for s in segments if s.get("text")) + return await generate_title(text, 
segments=segments) -Summary:""" - summary = await async_generate(prompt, temperature=0.3) - return summary.strip().strip('"').strip("'") or "No content" +async def generate_summary_with_speakers(segments: list) -> str: + """ + Deprecated: Use generate_short_summary(text, segments=segments) instead. - except Exception as e: - logger.warning(f"Failed to generate LLM summary with speakers: {e}") - # Fallback to simple summary generation - return conversation_text[:120] + "..." if len(conversation_text) > 120 else conversation_text or "No content" + Backward compatibility wrapper. + """ + if not segments: + return "No content" + # Extract text from segments for compatibility + text = "\n".join(s.get("text", "") for s in segments if s.get("text")) + return await generate_short_summary(text, segments=segments) diff --git a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py index db370628..a97db81b 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py @@ -264,7 +264,46 @@ async def open_conversation_job( await asyncio.sleep(1) # Check every second for responsiveness - logger.info(f"โœ… Conversation {conversation_id} updates complete, waiting for audio file to be ready...") + logger.info(f"โœ… Conversation {conversation_id} updates complete, checking for meaningful speech...") + + # FINAL VALIDATION: Check if conversation has meaningful speech before post-processing + # This prevents empty/noise-only conversations from being processed and saved + combined = await aggregator.get_combined_results(session_id) + from advanced_omi_backend.utils.conversation_utils import is_meaningful_speech + + if not is_meaningful_speech(combined): + logger.warning(f"โš ๏ธ Conversation {conversation_id} has no meaningful speech after finalization, marking as deleted...") + + # Mark the 
conversation as deleted instead of removing from database + from datetime import datetime + conversation = await Conversation.find_one(Conversation.conversation_id == conversation_id) + if conversation: + conversation.deleted = True + conversation.deletion_reason = "no_meaningful_speech" + conversation.deleted_at = datetime.utcnow() + await conversation.save() + logger.info(f"๐Ÿ—‘๏ธ Marked empty conversation {conversation_id} as deleted") + + # Clean up Redis tracking keys + conversation_key = f"conversation:session:{session_id}" + await redis_client.delete(conversation_key) + open_job_key = f"open_conversation:session:{session_id}" + await redis_client.delete(open_job_key) + current_conversation_key = f"conversation:current:{session_id}" + await redis_client.delete(current_conversation_key) + + logger.info(f"โœ… Marked conversation as deleted, session can continue") + + return { + "conversation_id": conversation_id, + "deleted": True, + "reason": "no_meaningful_speech", + "final_result_count": last_result_count, + "runtime_seconds": time.time() - start_time, + "timeout_triggered": timeout_triggered + } + + logger.info(f"โœ… Conversation has meaningful speech, proceeding with post-processing") # Wait for audio_streaming_persistence_job to complete and write the file path # Audio persistence now writes files per-conversation, so key uses conversation_id @@ -290,6 +329,34 @@ async def open_conversation_job( if not file_path_bytes: logger.error(f"โŒ Audio file path not found in Redis after {max_wait_audio}s (key: {audio_file_key})") logger.warning(f"โš ๏ธ Audio persistence job may not have rotated file yet - cannot enqueue batch transcription") + + # Mark the conversation as deleted - it has speech but no audio file to process + from datetime import datetime + logger.warning(f"๐Ÿ—‘๏ธ Marking incomplete conversation {conversation_id} as deleted - has speech but no audio file") + conversation = await Conversation.find_one(Conversation.conversation_id == 
conversation_id) + if conversation: + conversation.deleted = True + conversation.deletion_reason = "audio_file_not_ready" + conversation.deleted_at = datetime.utcnow() + await conversation.save() + logger.info(f"โœ… Marked incomplete conversation {conversation_id} as deleted") + + # Clean up Redis tracking keys + conversation_key = f"conversation:session:{session_id}" + await redis_client.delete(conversation_key) + open_job_key = f"open_conversation:session:{session_id}" + await redis_client.delete(open_job_key) + current_conversation_key = f"conversation:current:{session_id}" + await redis_client.delete(current_conversation_key) + + return { + "conversation_id": conversation_id, + "deleted": True, + "reason": "audio_file_not_ready", + "final_result_count": last_result_count, + "runtime_seconds": time.time() - start_time, + "timeout_triggered": timeout_triggered + } else: file_path = file_path_bytes.decode() logger.info(f"๐Ÿ“ Retrieved audio file path: {file_path}") @@ -416,7 +483,7 @@ async def generate_title_summary_job( redis_client=None ) -> Dict[str, Any]: """ - Generate title and summary for a conversation using LLM. + Generate title, short summary, and detailed summary for a conversation using LLM. 
This job runs independently of transcription and memory jobs to ensure conversations always get meaningful titles and summaries, even if other @@ -429,12 +496,13 @@ async def generate_title_summary_job( redis_client: Redis client (injected by decorator) Returns: - Dict with generated title and summary + Dict with generated title, summary, and detailed_summary """ from advanced_omi_backend.models.conversation import Conversation from advanced_omi_backend.utils.conversation_utils import ( - generate_title_with_speakers, - generate_summary_with_speakers + generate_title, + generate_short_summary, + generate_detailed_summary ) logger.info(f"๐Ÿ“ Starting title/summary generation for conversation {conversation_id}") @@ -447,40 +515,50 @@ async def generate_title_summary_job( logger.error(f"Conversation {conversation_id} not found") return {"success": False, "error": "Conversation not found"} - # Get segments from active transcript version + # Get transcript and segments from active transcript version + transcript_text = conversation.transcript or "" segments = conversation.segments or [] - if not segments or len(segments) == 0: - logger.warning(f"โš ๏ธ No segments available for conversation {conversation_id}") + if not transcript_text and (not segments or len(segments) == 0): + logger.warning(f"โš ๏ธ No transcript or segments available for conversation {conversation_id}") return { "success": False, - "error": "No segments available", + "error": "No transcript or segments available", "conversation_id": conversation_id } - # Generate title and summary using speaker-aware utilities + # Generate title, short summary, and detailed summary using unified utilities try: - logger.info(f"๐Ÿค– Generating title/summary using LLM for conversation {conversation_id}") + logger.info(f"๐Ÿค– Generating title/summary/detailed_summary using LLM for conversation {conversation_id}") # Convert segments to dict format expected by utils - segment_dicts = [ - { - "speaker": seg.speaker, - 
"text": seg.text, - "start": seg.start, - "end": seg.end - } - for seg in segments - ] - - # Generate title and summary with speaker awareness - title = await generate_title_with_speakers(segment_dicts) - summary = await generate_summary_with_speakers(segment_dicts) + segment_dicts = None + if segments and len(segments) > 0: + segment_dicts = [ + { + "speaker": seg.speaker, + "text": seg.text, + "start": seg.start, + "end": seg.end + } + for seg in segments + ] + + # Generate all three summaries in parallel for efficiency + import asyncio + title, short_summary, detailed_summary = await asyncio.gather( + generate_title(transcript_text, segments=segment_dicts), + generate_short_summary(transcript_text, segments=segment_dicts), + generate_detailed_summary(transcript_text, segments=segment_dicts) + ) conversation.title = title - conversation.summary = summary + conversation.summary = short_summary + conversation.detailed_summary = detailed_summary - logger.info(f"โœ… Generated title: '{conversation.title}', summary: '{conversation.summary}'") + logger.info(f"โœ… Generated title: '{conversation.title}'") + logger.info(f"โœ… Generated summary: '{conversation.summary}'") + logger.info(f"โœ… Generated detailed summary: {len(conversation.detailed_summary)} chars") except Exception as gen_error: logger.error(f"โŒ Title/summary generation failed: {gen_error}") @@ -506,6 +584,7 @@ async def generate_title_summary_job( "conversation_id": conversation_id, "title": conversation.title, "summary": conversation.summary, + "detailed_summary_length": len(conversation.detailed_summary) if conversation.detailed_summary else 0, "segment_count": len(segments), "processing_time": processing_time }) @@ -518,5 +597,6 @@ async def generate_title_summary_job( "conversation_id": conversation_id, "title": conversation.title, "summary": conversation.summary, + "detailed_summary": conversation.detailed_summary, "processing_time_seconds": processing_time } diff --git 
a/backends/advanced/webui/src/pages/Conversations.tsx b/backends/advanced/webui/src/pages/Conversations.tsx index 269a9606..ae99e7ce 100644 --- a/backends/advanced/webui/src/pages/Conversations.tsx +++ b/backends/advanced/webui/src/pages/Conversations.tsx @@ -8,6 +8,7 @@ interface Conversation { audio_uuid: string title?: string summary?: string + detailed_summary?: string // Comprehensive detailed summary timestamp: number created_at?: string client_id: string @@ -37,6 +38,9 @@ interface Conversation { active_transcript_version?: string active_memory_version?: string } + deleted?: boolean + deletion_reason?: string + deleted_at?: string } // Speaker color palette for consistent colors across conversations @@ -184,15 +188,15 @@ export default function Conversations() { } } - const handleDeleteConversation = async (audioUuid: string) => { + const handleDeleteConversation = async (conversationId: string) => { try { const confirmed = window.confirm('Are you sure you want to delete this conversation? This action cannot be undone.') if (!confirmed) return - setDeletingConversation(prev => new Set(prev).add(audioUuid)) + setDeletingConversation(prev => new Set(prev).add(conversationId)) setOpenDropdown(null) - const response = await conversationsApi.delete(audioUuid) + const response = await conversationsApi.delete(conversationId) if (response.status === 200) { // Refresh conversations to show updated data @@ -205,7 +209,7 @@ export default function Conversations() { } finally { setDeletingConversation(prev => { const newSet = new Set(prev) - newSet.delete(audioUuid) + newSet.delete(conversationId) return newSet }) } @@ -391,11 +395,39 @@ export default function Conversations() { ) : ( conversations.map((conversation) => (
+ {/* Deleted Conversation Warning */} + {conversation.deleted && ( +
+
+ +
+

Processing Failed

+

+ Reason: {conversation.deletion_reason === 'no_meaningful_speech' + ? 'No meaningful speech detected' + : conversation.deletion_reason === 'audio_file_not_ready' + ? 'Audio file not saved (possible Bluetooth disconnect)' + : conversation.deletion_reason || 'Unknown'} +

+ {conversation.deleted_at && ( +

+ Deleted at: {new Date(conversation.deleted_at).toLocaleString()} +

+ )} +
+
+
+ )} + {/* Version Selector Header - Only show for conversations with conversation_id */} - {conversation.conversation_id && ( + {conversation.conversation_id && !conversation.deleted && ( - {/* Summary */} + {/* Short Summary */} {conversation.summary && (

{conversation.summary}

)} + {/* Detailed Summary */} + {conversation.detailed_summary && ( +
+

Detailed Summary

+

+ {conversation.detailed_summary} +

+
+ )} + {/* Metadata */}
@@ -501,16 +543,19 @@ export default function Conversations() {
)} From bb1682a57ddaa09b4a3c38ba9e0b2e8a938360e8 Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Mon, 17 Nov 2025 09:58:27 +0000 Subject: [PATCH 04/12] Changed timeouts of jobs to be 24h --- .../controllers/queue_controller.py | 12 ++++++------ .../src/advanced_omi_backend/workers/audio_jobs.py | 4 ++-- .../workers/conversation_jobs.py | 8 ++++---- .../workers/transcription_jobs.py | 2 +- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py index fdc9177e..ced15fc7 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py @@ -39,12 +39,12 @@ QUEUE_NAMES = [DEFAULT_QUEUE, TRANSCRIPTION_QUEUE, MEMORY_QUEUE, AUDIO_QUEUE] # Job retention configuration -JOB_RESULT_TTL = int(os.getenv("RQ_RESULT_TTL", 3600)) # 1 hour default +JOB_RESULT_TTL = int(os.getenv("RQ_RESULT_TTL", 86400)) # 24 hour default # Create queues with custom result TTL -transcription_queue = Queue(TRANSCRIPTION_QUEUE, connection=redis_conn, default_timeout=300) +transcription_queue = Queue(TRANSCRIPTION_QUEUE, connection=redis_conn, default_timeout=86400) # 24 hours for streaming jobs memory_queue = Queue(MEMORY_QUEUE, connection=redis_conn, default_timeout=300) -audio_queue = Queue(AUDIO_QUEUE, connection=redis_conn, default_timeout=3600) # 1 hour timeout for long sessions +audio_queue = Queue(AUDIO_QUEUE, connection=redis_conn, default_timeout=86400) # 24 hours for all-day sessions default_queue = Queue(DEFAULT_QUEUE, connection=redis_conn, default_timeout=300) @@ -271,7 +271,7 @@ def start_streaming_jobs( session_id, user_id, client_id, - job_timeout=3600, # 1 hour for long recordings + job_timeout=86400, # 24 hours for all-day sessions result_ttl=JOB_RESULT_TTL, job_id=f"speech-detect_{session_id[:12]}", 
description=f"Listening for speech...", @@ -281,7 +281,7 @@ def start_streaming_jobs( # Store job ID for cleanup (keyed by client_id for easy WebSocket cleanup) try: - redis_conn.set(f"speech_detection_job:{client_id}", speech_job.id, ex=3600) # 1 hour TTL + redis_conn.set(f"speech_detection_job:{client_id}", speech_job.id, ex=86400) # 24 hour TTL logger.info(f"๐Ÿ“Œ Stored speech detection job ID for client {client_id}") except Exception as e: logger.warning(f"โš ๏ธ Failed to store job ID for {client_id}: {e}") @@ -294,7 +294,7 @@ def start_streaming_jobs( session_id, user_id, client_id, - job_timeout=3600, # 1 hour for long recordings + job_timeout=86400, # 24 hours for all-day sessions result_ttl=JOB_RESULT_TTL, job_id=f"audio-persist_{session_id[:12]}", description=f"Audio persistence for session {session_id[:12]}", diff --git a/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py index 376a2f4a..a38af1a9 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py @@ -197,7 +197,7 @@ async def audio_streaming_persistence_job( # Job control session_key = f"audio:session:{session_id}" - max_runtime = 3540 # 59 minutes + max_runtime = 86340 # 24 hours - 60 seconds (graceful exit before RQ timeout) start_time = time.time() from advanced_omi_backend.config import CHUNK_DIR @@ -316,7 +316,7 @@ async def audio_streaming_persistence_job( # Store file path in Redis (keyed by conversation_id, not session_id) audio_file_key = f"audio:file:{current_conversation_id}" - await redis_client.set(audio_file_key, str(file_path), ex=3600) + await redis_client.set(audio_file_key, str(file_path), ex=86400) # 24 hour TTL logger.info(f"๐Ÿ’พ Stored audio file path in Redis: {audio_file_key}") else: # Key deleted - conversation ended, close current file diff --git 
a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py index a97db81b..476c3430 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py @@ -112,12 +112,12 @@ async def open_conversation_job( # Store conversation_id in Redis for finalize job to find conversation_key = f"conversation:session:{session_id}" - await redis_client.set(conversation_key, conversation_id, ex=3600) + await redis_client.set(conversation_key, conversation_id, ex=86400) # 24 hour TTL logger.info(f"๐Ÿ’พ Stored conversation ID in Redis: {conversation_key}") # Signal audio persistence job to rotate to this conversation's file current_conversation_key = f"conversation:current:{session_id}" - await redis_client.set(current_conversation_key, conversation_id, ex=3600) + await redis_client.set(current_conversation_key, conversation_id, ex=86400) # 24 hour TTL logger.info(f"๐Ÿ”„ Signaled audio persistence to rotate file for conversation {conversation_id[:12]}") # Use redis_client parameter @@ -125,7 +125,7 @@ async def open_conversation_job( # Job control session_key = f"audio:session:{session_id}" - max_runtime = 3540 # 59 minutes (graceful exit before RQ timeout at 60 min) + max_runtime = 10740 # 3 hours - 60 seconds (single conversations shouldn't exceed 3 hours) start_time = time.time() last_result_count = 0 @@ -188,7 +188,7 @@ async def open_conversation_job( await redis_client.set( f"conversation:last_speech:{conversation_id}", last_meaningful_speech_time, - ex=3600 # 1 hour TTL + ex=86400 # 24 hour TTL ) logger.debug(f"๐Ÿ—ฃ๏ธ New speech detected (word count: {current_word_count}), updated last_speech timestamp") diff --git a/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py index 60f6cb05..57496f61 
100644 --- a/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py @@ -371,7 +371,7 @@ async def stream_speech_detection_job( current_job = get_current_job() session_key = f"audio:session:{session_id}" start_time = time.time() - max_runtime = 3540 # 59 minutes + max_runtime = 86340 # 24 hours - 60 seconds (graceful exit before RQ timeout) # Get conversation count conversation_count_key = f"session:conversation_count:{session_id}" From 7269bb72ffd5bc52ad887e91479183c214d66964 Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Mon, 17 Nov 2025 09:59:27 +0000 Subject: [PATCH 05/12] Filter out empty segments --- .../advanced_omi_backend/workers/speaker_jobs.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py index b9420d0a..cbc5e3c3 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py @@ -221,19 +221,31 @@ async def recognise_speakers_job( logger.info(f"๐ŸŽค Speaker recognition returned {len(speaker_segments)} segments") # Update the transcript version segments with identified speakers + # Filter out empty segments (diarization sometimes creates segments with no text) updated_segments = [] + empty_segment_count = 0 for seg in speaker_segments: + segment_text = seg.get("text", "").strip() + + # Skip segments with no text + if not segment_text: + empty_segment_count += 1 + continue + speaker_name = seg.get("identified_as") or seg.get("speaker", "Unknown") updated_segments.append( Conversation.SpeakerSegment( start=seg.get("start", 0), end=seg.get("end", 0), - text=seg.get("text", ""), + text=segment_text, speaker=speaker_name, confidence=seg.get("confidence") ) ) + if empty_segment_count > 0: + 
logger.info(f"๐Ÿ”‡ Filtered out {empty_segment_count} empty segments from speaker recognition") + # Update the transcript version transcript_version.segments = updated_segments From 5a9f74bc054843a8b672a5cb6d5613e1eb744a1c Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Mon, 17 Nov 2025 09:59:55 +0000 Subject: [PATCH 06/12] improve prompt --- .../src/advanced_omi_backend/utils/conversation_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py b/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py index 47e8ed83..67df1e9d 100644 --- a/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py +++ b/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py @@ -320,7 +320,7 @@ async def generate_detailed_summary(text: str, segments: Optional[list] = None) - Provide complete coverage of all topics, points, and important details discussed - Correct obvious transcription errors and remove filler words (um, uh, like, you know) - Organize information logically by topic or chronologically as appropriate -- Use clear, well-structured paragraphs or bullet points +- Use clear, well-structured paragraphs or bullet points, but make the length relative to the amound of content. - Maintain the meaning and intent of what was said, but improve clarity and coherence - Include relevant context, decisions made, action items mentioned, and conclusions reached {speaker_instruction}- Write in a natural, flowing narrative style From 59a613b4fe6bc3f2fd5ce3a08bcae5b47156d801 Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Mon, 17 Nov 2025 12:47:35 +0000 Subject: [PATCH 07/12] removed duplicate segments from the conversation model we were returning 2 sets of segments and transcripts from the conversation model. 
changed to draw from the active transcript --- .../controllers/conversation_controller.py | 57 ++-- .../models/conversation.py | 60 +--- .../workers/audio_jobs.py | 5 +- .../workers/conversation_jobs.py | 23 +- .../workers/speaker_jobs.py | 4 - .../webui/src/pages/Conversations.tsx | 275 ++++++++++-------- 6 files changed, 209 insertions(+), 215 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py index c4070910..74d6aa4a 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py @@ -96,23 +96,26 @@ async def get_conversation(conversation_id: str, user: User): if not user.is_superuser and conversation.user_id != str(user.user_id): return JSONResponse(status_code=403, content={"error": "Access forbidden"}) - # Format conversation for API response - use model_dump and add computed fields + # Format conversation for API response - clean, no duplication formatted_conversation = conversation.model_dump( - mode='json', # Automatically converts datetime to ISO strings, handles nested models + mode='json', exclude={'id'} # Exclude MongoDB internal _id ) - # Add computed fields not in the model - formatted_conversation.update({ - "timestamp": 0, # Legacy field - using created_at instead - "has_memory": bool(conversation.memories), - "version_info": { - "transcript_count": len(conversation.transcript_versions), - "memory_count": len(conversation.memory_versions), - "active_transcript_version": conversation.active_transcript_version, - "active_memory_version": conversation.active_memory_version - } - }) + # Clean up transcript versions - remove heavy metadata.words + if 'transcript_versions' in formatted_conversation: + for version in formatted_conversation['transcript_versions']: + if 'metadata' in version and 
version['metadata']: + # Remove words array - not needed by frontend + version['metadata'].pop('words', None) + # Remove redundant speaker_recognition counts (derivable from segments) + if 'speaker_recognition' in version['metadata']: + sr = version['metadata']['speaker_recognition'] + sr.pop('total_segments', None) # Derivable from len(segments) + sr.pop('speaker_count', None) # Derivable from identified_speakers + + # Add minimal computed fields + formatted_conversation['has_memory'] = len(conversation.memory_versions) > 0 return {"conversation": formatted_conversation} @@ -134,26 +137,26 @@ async def get_conversations(user: User): # Admins see all conversations user_conversations = await Conversation.find_all().sort(-Conversation.created_at).to_list() - # Convert conversations to API format + # Convert conversations to API format - minimal for list view conversations = [] for conv in user_conversations: - # Format conversation for list - use model_dump with exclusions + # Format conversation for list - exclude heavy version data conv_dict = conv.model_dump( - mode='json', # Automatically converts datetime to ISO strings - exclude={'id', 'transcript', 'segments', 'transcript_versions', 'memory_versions'} # Exclude large fields for list view + mode='json', + exclude={'id', 'transcript_versions', 'memory_versions'} # Exclude large version arrays ) - # Add computed/external fields + # Add computed fields + # segment_count - count from active transcript version + segment_count = 0 + if conv.active_transcript: + segment_count = len(conv.active_transcript.segments) if conv.active_transcript.segments else 0 + conv_dict.update({ - "timestamp": 0, # Legacy field - using created_at instead - "segment_count": len(conv.segments) if conv.segments else 0, - "has_memory": bool(conv.memories), - "version_info": { - "transcript_count": len(conv.transcript_versions), - "memory_count": len(conv.memory_versions), - "active_transcript_version": conv.active_transcript_version, - 
"active_memory_version": conv.active_memory_version - } + "segment_count": segment_count, + "has_memory": len(conv.memory_versions) > 0, + "transcript_version_count": len(conv.transcript_versions), + "memory_version_count": len(conv.memory_versions) }) conversations.append(conv_dict) diff --git a/backends/advanced/src/advanced_omi_backend/models/conversation.py b/backends/advanced/src/advanced_omi_backend/models/conversation.py index a0480bcb..56f55ede 100644 --- a/backends/advanced/src/advanced_omi_backend/models/conversation.py +++ b/backends/advanced/src/advanced_omi_backend/models/conversation.py @@ -111,30 +111,18 @@ class MemoryVersion(BaseModel): description="Version ID of currently active memory extraction" ) - # Legacy fields (auto-populated from active versions) - transcript: Union[str, List[Dict[str, Any]], None] = Field(None, description="Current transcript text") - segments: List["Conversation.SpeakerSegment"] = Field(default_factory=list, description="Current transcript segments") - memories: List[Dict[str, Any]] = Field(default_factory=list, description="Current extracted memories") - memory_count: int = Field(default=0, description="Current memory count") + # Legacy fields removed - use transcript_versions[active_transcript_version] and memory_versions[active_memory_version] + # Frontend should access: conversation.active_transcript.segments, conversation.active_transcript.transcript @model_validator(mode='before') @classmethod def clean_legacy_data(cls, data: Any) -> Any: """Clean up legacy/malformed data before Pydantic validation.""" - - #TODO Unsure that we need this, likely best to migrate database on startup, or mimic the old structure better + if not isinstance(data, dict): return data - # Fix legacy transcript field if it's a dict (should be string or None) - if isinstance(data.get('transcript'), dict): - data['transcript'] = None - - # Fix legacy segments field if it's a dict (should be list) - if isinstance(data.get('segments'), dict): - 
data['segments'] = [] - - # Fix malformed transcript_versions + # Fix malformed transcript_versions (from old schema versions) if 'transcript_versions' in data and isinstance(data['transcript_versions'], list): for version in data['transcript_versions']: if isinstance(version, dict): @@ -156,15 +144,6 @@ def clean_legacy_data(cls, data: Any) -> Any: elif not isinstance(segment['speaker'], str): segment['speaker'] = "unknown" - # Also fix legacy segments field - if 'segments' in data and isinstance(data['segments'], list): - for segment in data['segments']: - if isinstance(segment, dict) and 'speaker' in segment: - if isinstance(segment['speaker'], int): - segment['speaker'] = f"Speaker {segment['speaker']}" - elif not isinstance(segment['speaker'], str): - segment['speaker'] = "unknown" - return data @property @@ -189,6 +168,17 @@ def active_memory(self) -> Optional["Conversation.MemoryVersion"]: return version return None + # Convenience properties that return data from active transcript version + @property + def transcript(self) -> Optional[str]: + """Get transcript text from active transcript version.""" + return self.active_transcript.transcript if self.active_transcript else None + + @property + def segments(self) -> List["Conversation.SpeakerSegment"]: + """Get segments from active transcript version.""" + return self.active_transcript.segments if self.active_transcript else [] + def add_transcript_version( self, version_id: str, @@ -216,7 +206,6 @@ def add_transcript_version( if set_as_active: self.active_transcript_version = version_id - self._update_legacy_transcript_fields() return new_version @@ -247,7 +236,6 @@ def add_memory_version( if set_as_active: self.active_memory_version = version_id - self._update_legacy_memory_fields(memory_count) return new_version @@ -256,7 +244,6 @@ def set_active_transcript_version(self, version_id: str) -> bool: for version in self.transcript_versions: if version.version_id == version_id: self.active_transcript_version = 
version_id - self._update_legacy_transcript_fields() return True return False @@ -265,26 +252,9 @@ def set_active_memory_version(self, version_id: str) -> bool: for version in self.memory_versions: if version.version_id == version_id: self.active_memory_version = version_id - self._update_legacy_memory_fields(version.memory_count) return True return False - def _update_legacy_transcript_fields(self): - """Update legacy transcript fields from active version.""" - active = self.active_transcript - if active: - self.transcript = active.transcript - self.segments = active.segments - else: - self.transcript = None - self.segments = [] - - def _update_legacy_memory_fields(self, memory_count: int): - """Update legacy memory fields from active version.""" - self.memory_count = memory_count - # Note: actual memories list would need to be fetched from memory storage - # This is just the count for now - class Settings: name = "conversations" indexes = [ diff --git a/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py index a38af1a9..7c6ebfda 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py @@ -58,7 +58,7 @@ async def process_cropping_job( if not conversation: raise ValueError(f"Conversation {conversation_id} not found") - # Extract speech segments from transcript + # Extract speech segments from transcript (property returns data from active version) segments = conversation.segments if not segments or len(segments) == 0: logger.warning(f"โš ๏ธ No segments found for conversation {conversation_id}, skipping cropping") @@ -113,8 +113,7 @@ async def process_cropping_job( # Update conversation with cropped audio path and adjusted segments conversation.cropped_audio_path = cropped_filename - conversation.segments = updated_segments - # Also update the active transcript version segments + # Update the 
active transcript version segments if conversation.active_transcript: conversation.active_transcript.segments = updated_segments await conversation.save() diff --git a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py index 476c3430..ba3700d0 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py @@ -241,25 +241,12 @@ async def open_conversation_job( finalize_received = True break - # Update conversation if new results arrived + # Track results progress (conversation will get transcript from transcription job) if current_count > last_result_count: - # Update conversation in MongoDB - conversation = await Conversation.find_one( - Conversation.conversation_id == conversation_id + logger.info( + f"๐Ÿ“Š Conversation {conversation_id} progress: " + f"{current_count} results, {len(combined['text'])} chars, {len(combined['segments'])} segments" ) - - if conversation: - conversation.transcript = combined["text"] - conversation.segments = combined["segments"] - await conversation.save() - - logger.info( - f"๐Ÿ“Š Updated conversation {conversation_id}: " - f"{current_count} results, {len(combined['text'])} chars, {len(combined['segments'])} segments" - ) - else: - logger.warning(f"โš ๏ธ Conversation {conversation_id} not found") - last_result_count = current_count await asyncio.sleep(1) # Check every second for responsiveness @@ -515,7 +502,7 @@ async def generate_title_summary_job( logger.error(f"Conversation {conversation_id} not found") return {"success": False, "error": "Conversation not found"} - # Get transcript and segments from active transcript version + # Get transcript and segments (properties return data from active transcript version) transcript_text = conversation.transcript or "" segments = conversation.segments or [] diff --git 
a/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py index cbc5e3c3..d9165b2d 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py @@ -268,10 +268,6 @@ async def recognise_speakers_job( "processing_time_seconds": time.time() - start_time } - # Update legacy fields if this is the active version - if conversation.active_transcript_version == version_id: - conversation.segments = updated_segments - await conversation.save() processing_time = time.time() - start_time diff --git a/backends/advanced/webui/src/pages/Conversations.tsx b/backends/advanced/webui/src/pages/Conversations.tsx index ae99e7ce..0e5f2a1f 100644 --- a/backends/advanced/webui/src/pages/Conversations.tsx +++ b/backends/advanced/webui/src/pages/Conversations.tsx @@ -3,41 +3,50 @@ import { MessageSquare, RefreshCw, Calendar, User, Play, Pause, MoreVertical, Ro import { conversationsApi, BACKEND_URL } from '../services/api' import ConversationVersionHeader from '../components/ConversationVersionHeader' +interface TranscriptVersion { + version_id: string + transcript: string + segments: Array<{ + text: string + speaker: string + start: number + end: number + confidence?: number + }> + provider: string + model?: string + created_at: string + processing_time_seconds?: number + metadata?: { + segment_count?: number + word_count?: number + speaker_recognition?: { + enabled: boolean + identified_speakers: string[] + processing_time_seconds?: number + } + } +} + interface Conversation { conversation_id?: string audio_uuid: string title?: string summary?: string - detailed_summary?: string // Comprehensive detailed summary - timestamp: number + detailed_summary?: string created_at?: string client_id: string segment_count?: number // From list endpoint - transcript?: string // Full text transcript (for LLM parsing) - segments?: 
Array<{ // Optional - only populated after fetching details - text: string - speaker: string - start: number - end: number - speaker_id?: string - confidence?: number - }> audio_path?: string cropped_audio_path?: string - speakers_identified?: string[] - speaker_names?: { [key: string]: string } duration_seconds?: number - memories?: any[] has_memory?: boolean - memory_processing_status?: string - transcription_status?: string - action_items?: any[] - version_info?: { - transcript_count: number - memory_count: number - active_transcript_version?: string - active_memory_version?: string - } + transcript_versions?: TranscriptVersion[] + memory_versions?: any[] + active_transcript_version?: string + active_memory_version?: string + transcript_version_count?: number + memory_version_count?: number deleted?: boolean deletion_reason?: string deleted_at?: string @@ -233,20 +242,25 @@ export default function Conversations() { return } - // If segments are already loaded, just expand - if (conversation.segments && conversation.segments.length > 0) { + // Get active transcript + const activeTranscript = conversation.transcript_versions?.find( + v => v.version_id === conversation.active_transcript_version + ) + + // If segments are already loaded in active transcript, just expand + if (activeTranscript && activeTranscript.segments && activeTranscript.segments.length > 0) { setExpandedTranscripts(prev => new Set(prev).add(conversationId)) return } - // Fetch full conversation details including segments + // Fetch full conversation details including transcript versions try { const response = await conversationsApi.getById(conversation.conversation_id) if (response.status === 200 && response.data.conversation) { - // Update the conversation in state with full segments and transcript + // Update the conversation in state with full transcript versions setConversations(prev => prev.map(c => c.conversation_id === conversationId - ? 
{ ...c, segments: response.data.conversation.segments, transcript: response.data.conversation.transcript } + ? { ...c, transcript_versions: response.data.conversation.transcript_versions } : c )) // Expand the transcript @@ -427,10 +441,15 @@ export default function Conversations() { )} {/* Version Selector Header - Only show for conversations with conversation_id */} - {conversation.conversation_id && !conversation.deleted && ( + {conversation.conversation_id && !conversation.deleted && conversation.transcript_versions && ( { // Update only this specific conversation without reloading all conversations // This prevents page scroll jump @@ -481,7 +500,7 @@ export default function Conversations() {
- {formatDate(conversation.created_at || conversation.timestamp)} + {formatDate(conversation.created_at || '')}
@@ -500,7 +519,8 @@ export default function Conversations() { {/* Dropdown Menu */} - {openDropdown === conversation.audio_uuid && ( + {openDropdown === (conversation.conversation_id || conversation.audio_uuid) && (
)} - +
{debugMode && ( @@ -696,37 +725,46 @@ export default function Conversations() {
- ); - }); - })()} + ) + }) + })()} +
+
+ ) : ( +
+ No transcript available +
+ )}
-
- ) : ( -
- No transcript available -
- )} - -
- )} + )} + + ) + })()}
- {/* Speaker Information */} - {conversation.speakers_identified && conversation.speakers_identified.length > 0 && ( -
-

๐ŸŽค Identified Speakers:

-
- {conversation.speakers_identified.map((speaker, index) => ( - - {speaker} - - ))} + {/* Speaker Information - from active transcript metadata */} + {(() => { + const activeTranscript = conversation.transcript_versions?.find( + v => v.version_id === conversation.active_transcript_version + ) + const identifiedSpeakers = activeTranscript?.metadata?.speaker_recognition?.identified_speakers || [] + + return identifiedSpeakers.length > 0 ? ( +
+

๐ŸŽค Identified Speakers:

+
+ {identifiedSpeakers.map((speaker: string, index: number) => ( + + {speaker} + + ))} +
-
- )} + ) : null + })()} {/* Debug info */} {debugMode && ( @@ -737,9 +775,10 @@ export default function Conversations() {
Audio UUID: {conversation.audio_uuid}
Original Audio: {conversation.audio_path || 'N/A'}
Cropped Audio: {conversation.cropped_audio_path || 'N/A'}
-
Transcription Status: {conversation.transcription_status || 'N/A'}
-
Memory Processing Status: {conversation.memory_processing_status || 'N/A'}
-
Transcript Segments: {conversation.segments?.length || 0}
+
Active Transcript Version: {conversation.active_transcript_version || 'N/A'}
+
Transcript Versions: {conversation.transcript_versions?.length || 0}
+
Active Memory Version: {conversation.active_memory_version || 'N/A'}
+
Memory Versions: {conversation.memory_versions?.length || 0}
Client ID: {conversation.client_id}
                  From 6736a5d2a44ca7ad4f61248b1ca63ce1f332b0d9 Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Tue, 18 Nov 2025 09:47:31 +0000 Subject: [PATCH 08/12] added segment timing tracking for padding and dropped segments --- .../advanced_omi_backend/utils/audio_utils.py | 78 +++++++++++++++---- .../workers/audio_jobs.py | 43 ++++++---- 2 files changed, 92 insertions(+), 29 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/utils/audio_utils.py b/backends/advanced/src/advanced_omi_backend/utils/audio_utils.py index 20d8de7c..111c7abc 100644 --- a/backends/advanced/src/advanced_omi_backend/utils/audio_utils.py +++ b/backends/advanced/src/advanced_omi_backend/utils/audio_utils.py @@ -264,7 +264,7 @@ async def _process_audio_cropping_with_relative_timestamps( output_path: str, audio_uuid: str, _deprecated_chunk_repo=None, # Deprecated - kept for backward compatibility -) -> bool: +) -> tuple[bool, list[dict]]: """ Process audio cropping with speech segments already in relative format. @@ -272,6 +272,9 @@ async def _process_audio_cropping_with_relative_timestamps( as provided by Deepgram transcription. No timestamp conversion is needed. Note: Database updates are now handled by the caller (audio_jobs.py). 
+ + Returns: + Tuple of (success: bool, segment_mapping: list[dict]) """ try: # Validate input segments @@ -307,19 +310,19 @@ async def _process_audio_cropping_with_relative_timestamps( logger.warning( f"No valid segments for cropping {audio_uuid}" ) - return False + return False, [] - success = await _crop_audio_with_ffmpeg(original_path, validated_segments, output_path) + success, segment_mapping = await _crop_audio_with_ffmpeg(original_path, validated_segments, output_path) if success: cropped_filename = output_path.split("/")[-1] logger.info(f"Successfully processed cropped audio: {cropped_filename}") - return True + return True, segment_mapping else: logger.error(f"Failed to crop audio for {audio_uuid}") - return False + return False, segment_mapping except Exception as e: logger.error(f"Error in audio cropping task for {audio_uuid}: {e}", exc_info=True) - return False + return False, [] def write_pcm_to_wav( @@ -367,29 +370,72 @@ def write_pcm_to_wav( async def _crop_audio_with_ffmpeg( original_path: str, speech_segments: list[tuple[float, float]], output_path: str -) -> bool: - """Use ffmpeg to crop audio - runs as async subprocess, no GIL issues""" +) -> tuple[bool, list[dict]]: + """ + Use ffmpeg to crop audio - runs as async subprocess, no GIL issues. 
+ + Returns: + Tuple of (success: bool, segment_mapping: list[dict]) + + segment_mapping contains one entry per input segment with: + - original_index: Index in input speech_segments + - original_start/end: Original timestamps in source audio + - cropped_start/end: Where the speech starts/ends in cropped file (None if filtered) + - kept: Whether segment was kept (True) or filtered out (False) + """ logger.info(f"Cropping audio {original_path} with {len(speech_segments)} speech segments") if not speech_segments: logger.warning(f"No speech segments to crop for {original_path}") - return False + return False, [] # Check if the original file exists if not os.path.exists(original_path): logger.error(f"Original audio file does not exist: {original_path}") - return False + return False, [] - # Filter out segments that are too short + # Filter out segments that are too short and build mapping filtered_segments = [] - for start, end in speech_segments: + segment_mapping = [] + current_cropped_offset = 0.0 + + for idx, (start, end) in enumerate(speech_segments): duration = end - start if duration >= MIN_SPEECH_SEGMENT_DURATION: # Add padding around speech segments padded_start = max(0, start - CROPPING_CONTEXT_PADDING) padded_end = end + CROPPING_CONTEXT_PADDING + padded_duration = padded_end - padded_start + filtered_segments.append((padded_start, padded_end)) + + # Calculate where the speech (not padding) appears in cropped file + # The cropped file will have: [padding_before][speech][padding_after] + padding_before = start - padded_start + speech_start_in_cropped = current_cropped_offset + padding_before + speech_end_in_cropped = speech_start_in_cropped + duration + + segment_mapping.append({ + "original_index": idx, + "original_start": start, + "original_end": end, + "cropped_start": speech_start_in_cropped, + "cropped_end": speech_end_in_cropped, + "kept": True + }) + + # Move offset by the full padded duration + current_cropped_offset += padded_duration else: + # 
Segment filtered out + segment_mapping.append({ + "original_index": idx, + "original_start": start, + "original_end": end, + "cropped_start": None, + "cropped_end": None, + "kept": False + }) logger.debug( f"Skipping short segment: {start}-{end} ({duration:.2f}s < {MIN_SPEECH_SEGMENT_DURATION}s)" ) @@ -398,7 +444,7 @@ async def _crop_audio_with_ffmpeg( logger.warning( f"No segments meet minimum duration ({MIN_SPEECH_SEGMENT_DURATION}s) for {original_path}" ) - return False + return False, segment_mapping logger.info( f"Cropping audio {original_path} with {len(filtered_segments)} speech segments (filtered from {len(speech_segments)})" @@ -450,12 +496,12 @@ async def _crop_audio_with_ffmpeg( logger.info( f"Successfully cropped {original_path} -> {output_path} ({cropped_duration:.1f}s from {len(filtered_segments)} segments)" ) - return True + return True, segment_mapping else: error_msg = stderr.decode() if stderr else "Unknown ffmpeg error" logger.error(f"ffmpeg failed for {original_path}: {error_msg}") - return False + return False, segment_mapping except Exception as e: logger.error(f"Error running ffmpeg on {original_path}: {e}", exc_info=True) - return False + return False, segment_mapping diff --git a/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py index 7c6ebfda..92e4676a 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py @@ -79,7 +79,7 @@ async def process_cropping_job( output_path = CHUNK_DIR / cropped_filename # Process cropping (no repository needed - we update conversation directly) - success = await _process_audio_cropping_with_relative_timestamps( + success, segment_mapping = await _process_audio_cropping_with_relative_timestamps( str(original_path), speech_segments, str(output_path), @@ -95,21 +95,38 @@ async def process_cropping_job( "reason": "cropping_failed" } - # 
Calculate cropped duration - cropped_duration_seconds = sum(end - start for start, end in speech_segments) + # Calculate actual cropped duration from kept segments + kept_segments = [m for m in segment_mapping if m["kept"]] + if kept_segments: + # Duration is end of last kept segment + cropped_duration_seconds = kept_segments[-1]["cropped_end"] + else: + cropped_duration_seconds = 0.0 - # Update segment timestamps to match cropped audio - # After cropping, segments are concatenated, so they need new timestamps + # Update segment timestamps using the mapping + # Only keep segments that weren't filtered out updated_segments = [] - current_time = 0.0 for i, seg in enumerate(segments): - original_duration = seg.end - seg.start - updated_seg = seg.model_copy() - updated_seg.start = current_time - updated_seg.end = current_time + original_duration - updated_segments.append(updated_seg) - current_time += original_duration - logger.debug(f"Updated segment {i}: {seg.start:.2f}-{seg.end:.2f}s โ†’ {updated_seg.start:.2f}-{updated_seg.end:.2f}s") + if i >= len(segment_mapping): + logger.warning(f"โš ๏ธ Segment {i} not in mapping, skipping") + continue + + mapping = segment_mapping[i] + if mapping["kept"]: + # Segment was kept - use the cropped timestamps + updated_seg = seg.model_copy() + updated_seg.start = mapping["cropped_start"] + updated_seg.end = mapping["cropped_end"] + updated_segments.append(updated_seg) + logger.debug( + f"Segment {i}: {seg.start:.2f}-{seg.end:.2f}s โ†’ " + f"{updated_seg.start:.2f}-{updated_seg.end:.2f}s (in cropped audio)" + ) + else: + # Segment was filtered out (too short) + logger.debug( + f"Segment {i} filtered out (duration {seg.end - seg.start:.2f}s < MIN_SPEECH_SEGMENT_DURATION)" + ) # Update conversation with cropped audio path and adjusted segments conversation.cropped_audio_path = cropped_filename From a64f4fe97006590a3256f0c01c6c0a277e46f27e Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Tue, 18 Nov 2025 10:13:28 +0000 Subject: 
[PATCH 09/12] removed deprecated function and invalid import --- .../controllers/audio_controller.py | 108 ------------------ .../routers/modules/conversation_routes.py | 9 -- 2 files changed, 117 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py index 483958a6..4ca72ca0 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py @@ -18,7 +18,6 @@ from advanced_omi_backend.utils.audio_utils import ( AudioValidationError, write_audio_file, - _process_audio_cropping_with_relative_timestamps, ) from advanced_omi_backend.models.job import JobPriority from advanced_omi_backend.models.user import User @@ -227,110 +226,3 @@ async def get_cropped_audio_info(audio_uuid: str, user: User): # Database or unexpected errors when fetching audio metadata audio_logger.exception("Error fetching cropped audio info") return JSONResponse(status_code=500, content={"error": "Error fetching cropped audio info"}) - - -async def reprocess_audio_cropping(audio_uuid: str, user: User): - """ - Re-process audio cropping operation for an audio file. - - This is an audio service operation that re-runs the audio cropping process - to extract only speech segments from the full audio file. - - Used for: Re-processing audio when cropping failed or needs updating. - Works with: Conversation model and audio_utils cropping functions. 
- """ - try: - # Find the conversation - conversation = await Conversation.find_one(Conversation.audio_uuid == audio_uuid) - if not conversation: - return JSONResponse(status_code=404, content={"error": "Conversation not found"}) - - # Check ownership for non-admin users - if not user.is_superuser: - if conversation.user_id != str(user.user_id): - return JSONResponse(status_code=404, content={"error": "Conversation not found"}) - - audio_path = conversation.audio_path - if not audio_path: - return JSONResponse( - status_code=400, content={"error": "No audio file found for this conversation"} - ) - - # Check if file exists - try multiple possible locations - possible_paths = [ - Path("/app/audio_chunks") / audio_path, - Path(audio_path), # fallback to relative path - ] - - full_audio_path = None - for path in possible_paths: - if path.exists(): - full_audio_path = path - break - - if not full_audio_path: - return JSONResponse( - status_code=422, - content={ - "error": "Audio file not found on disk", - "details": f"Conversation exists but audio file '{audio_path}' is missing from expected locations", - "searched_paths": [str(p) for p in possible_paths] - } - ) - - # Get speech segments from the conversation - speech_segments = conversation.speech_segments if hasattr(conversation, 'speech_segments') else [] - if not speech_segments: - return JSONResponse( - status_code=400, - content={"error": "No speech segments found for this conversation"} - ) - - # Generate output path for cropped audio - cropped_filename = f"cropped_{audio_uuid}.wav" - output_path = Path("/app/audio_chunks") / cropped_filename - - # Reprocess the audio cropping (simplified without repository) - try: - from advanced_omi_backend.utils.audio_utils import extract_speech_segments - - success = await extract_speech_segments( - str(full_audio_path), - speech_segments, - str(output_path) - ) - - if success: - # Update conversation with cropped audio path - conversation.cropped_audio_path = 
cropped_filename - await conversation.save() - - audio_logger.info(f"Successfully reprocessed audio cropping for {audio_uuid}") - return JSONResponse( - content={"message": f"Audio cropping reprocessed for {audio_uuid}"} - ) - else: - audio_logger.error(f"Failed to reprocess audio cropping for {audio_uuid}") - return JSONResponse( - status_code=500, content={"error": "Failed to reprocess audio cropping"} - ) - - except (OSError, IOError) as processing_error: - # File I/O errors during audio cropping - audio_logger.exception("File I/O error during audio cropping reprocessing") - return JSONResponse( - status_code=500, - content={"error": f"Audio processing failed: {str(processing_error)}"}, - ) - except Exception as processing_error: - # Unexpected errors during cropping operation - audio_logger.exception("Unexpected error during audio cropping reprocessing") - return JSONResponse( - status_code=500, - content={"error": f"Audio processing failed: {str(processing_error)}"}, - ) - - except Exception as e: - # Database or unexpected errors in reprocessing handler - audio_logger.exception("Error reprocessing audio cropping") - return JSONResponse(status_code=500, content={"error": "Error reprocessing audio cropping"}) diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py index 54e40d36..e2b76f7d 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py @@ -50,15 +50,6 @@ async def get_cropped_audio_info( return await audio_controller.get_cropped_audio_info(audio_uuid, current_user) -# Deprecated -@router.post("/{audio_uuid}/reprocess") -async def reprocess_audio_cropping( - audio_uuid: str, current_user: User = Depends(current_active_user) -): - """Reprocess audio cropping for a conversation. 
Users can only reprocess their own conversations.""" - return await audio_controller.reprocess_audio_cropping(audio_uuid, current_user) - - # New reprocessing endpoints @router.post("/{conversation_id}/reprocess-transcript") async def reprocess_transcript( From bcd2a2b42fc7439ee5f49c54aa22990e2f39a2b0 Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Tue, 18 Nov 2025 10:23:52 +0000 Subject: [PATCH 10/12] removed old processing routes that are deprecated --- .../controllers/system_controller.py | 20 ------------------- .../routers/modules/system_routes.py | 14 ------------- 2 files changed, 34 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py index 19852e68..b75f22c8 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py @@ -55,26 +55,6 @@ async def get_auth_config(): } -async def get_all_processing_tasks(): - """Get all active processing tasks. - - NOTE: This function is deprecated - old processor architecture has been removed. - Kept for backward compatibility but always returns empty list. - """ - logger.warning("get_all_processing_tasks called - deprecated function") - return [] - - -async def get_processing_task_status(client_id: str): - """Get processing task status for a specific client. - - NOTE: This function is deprecated - old processor architecture has been removed. - Kept for backward compatibility but always returns None. 
- """ - logger.warning(f"get_processing_task_status called for {client_id} - deprecated function") - return None - - async def get_processor_status(): """Get RQ worker and queue status.""" try: diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py index c03a7802..95569afa 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py @@ -30,20 +30,6 @@ async def get_auth_config(): return await system_controller.get_auth_config() -@router.get("/processor/tasks") -async def get_all_processing_tasks(current_user: User = Depends(current_superuser)): - """Get all active processing tasks. Admin only.""" - return await system_controller.get_all_processing_tasks() - - -@router.get("/processor/tasks/{client_id}") -async def get_processing_task_status( - client_id: str, current_user: User = Depends(current_superuser) -): - """Get processing task status for a specific client. Admin only.""" - return await system_controller.get_processing_task_status(client_id) - - @router.get("/processor/status") async def get_processor_status(current_user: User = Depends(current_superuser)): """Get processor queue status and health. 
Admin only.""" From c26c75a3399095acf84cc087d5826e361a8615f7 Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Tue, 18 Nov 2025 10:28:29 +0000 Subject: [PATCH 11/12] fixed incorrect param --- backends/advanced/src/advanced_omi_backend/client_manager.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/client_manager.py b/backends/advanced/src/advanced_omi_backend/client_manager.py index 7a74f036..5a3131b5 100644 --- a/backends/advanced/src/advanced_omi_backend/client_manager.py +++ b/backends/advanced/src/advanced_omi_backend/client_manager.py @@ -104,7 +104,7 @@ def get_client_count(self) -> int: """ return len(self._active_clients) - def create_client(self, client_id: str, ac_repository, chunk_dir, user_id: str, user_email: Optional[str] = None) -> "ClientState": + def create_client(self, client_id: str, chunk_dir, user_id: str, user_email: Optional[str] = None) -> "ClientState": """ Atomically create and register a new client. 
@@ -113,7 +113,6 @@ def create_client(self, client_id: str, ac_repository, chunk_dir, user_id: str, Args: client_id: Unique client identifier - ac_repository: Audio chunks repository chunk_dir: Directory for audio chunks user_id: User ID who owns this client user_email: Optional user email From 62d0951ae3552cd88a6e43dfe1e4c3bc6fc8347e Mon Sep 17 00:00:00 2001 From: Stu Alexandere Date: Tue, 18 Nov 2025 16:35:23 +0000 Subject: [PATCH 12/12] ensure cleanup happens on convo fail and moved some methods to convo_utils --- .../utils/conversation_utils.py | 327 ++++++++- .../workers/conversation_jobs.py | 631 +++++++++--------- 2 files changed, 616 insertions(+), 342 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py b/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py index 67df1e9d..363b2efd 100644 --- a/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py +++ b/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py @@ -4,8 +4,12 @@ Extracted from legacy TranscriptionService to be reusable across V2 architecture. 
""" +import asyncio import logging -from typing import Optional +import time +from datetime import datetime +from pathlib import Path +from typing import Optional, Dict, Any, List from advanced_omi_backend.config import get_speech_detection_settings from advanced_omi_backend.llm_client import async_generate @@ -37,10 +41,7 @@ def is_meaningful_speech(combined_results: dict) -> bool: if not combined_results.get("text"): return False - transcript_data = { - "text": combined_results["text"], - "words": combined_results.get("words", []) - } + transcript_data = {"text": combined_results["text"], "words": combined_results.get("words", [])} speech_analysis = analyze_speech(transcript_data) return speech_analysis["has_speech"] @@ -82,17 +83,14 @@ def analyze_speech(transcript_data: dict) -> dict: # Method 1: Word-level analysis (preferred - has confidence scores and timing) if words: # Filter by confidence threshold - valid_words = [ - w for w in words - if w.get("confidence", 0) >= settings["min_confidence"] - ] + valid_words = [w for w in words if w.get("confidence", 0) >= settings["min_confidence"]] if len(valid_words) < settings["min_words"]: return { "has_speech": False, "reason": f"Not enough valid words ({len(valid_words)} < {settings['min_words']})", "word_count": len(valid_words), - "duration": 0.0 + "duration": 0.0, } # Calculate speech duration from word timing @@ -107,7 +105,7 @@ def analyze_speech(transcript_data: dict) -> dict: "speech_start": speech_start, "speech_end": speech_end, "duration": speech_duration, - "reason": f"Valid speech detected ({len(valid_words)} words, {speech_duration:.1f}s)" + "reason": f"Valid speech detected ({len(valid_words)} words, {speech_duration:.1f}s)", } # Method 2: Text-only fallback (when no word-level data available) @@ -122,7 +120,7 @@ def analyze_speech(transcript_data: dict) -> dict: "speech_end": 0.0, "duration": 0.0, "reason": f"Valid speech detected ({word_count} words, no timing data)", - "fallback": True + 
"fallback": True, } # No speech detected @@ -130,7 +128,7 @@ def analyze_speech(transcript_data: dict) -> dict: "has_speech": False, "reason": "No meaningful speech content detected", "word_count": 0, - "duration": 0.0 + "duration": 0.0, } @@ -225,7 +223,11 @@ async def generate_short_summary(text: str, segments: Optional[list] = None) -> return "No content" try: - speaker_instruction = "- Include speaker names when relevant (e.g., \"John discusses X with Sarah\")\n" if include_speakers else "" + speaker_instruction = ( + '- Include speaker names when relevant (e.g., "John discusses X with Sarah")\n' + if include_speakers + else "" + ) prompt = f"""Generate a brief, informative summary (1-2 sentences, max 120 characters) for this conversation: @@ -246,7 +248,11 @@ async def generate_short_summary(text: str, segments: Optional[list] = None) -> except Exception as e: logger.warning(f"Failed to generate LLM short summary: {e}") # Fallback to simple summary generation - return conversation_text[:120] + "..." if len(conversation_text) > 120 else conversation_text or "No content" + return ( + conversation_text[:120] + "..." + if len(conversation_text) > 120 + else conversation_text or "No content" + ) # Backward compatibility alias @@ -302,10 +308,14 @@ async def generate_detailed_summary(text: str, segments: Optional[list] = None) return "No meaningful content to summarize" try: - speaker_instruction = """- Attribute key points and statements to specific speakers when relevant + speaker_instruction = ( + """- Attribute key points and statements to specific speakers when relevant - Capture the flow of conversation between participants - Note any agreements, disagreements, or important exchanges -""" if include_speakers else "" +""" + if include_speakers + else "" + ) prompt = f"""Generate a comprehensive, detailed summary of this conversation transcript. 
@@ -337,9 +347,13 @@ async def generate_detailed_summary(text: str, segments: Optional[list] = None) except Exception as e: logger.warning(f"Failed to generate detailed summary: {e}") # Fallback to returning cleaned transcript - lines = conversation_text.split('\n') - cleaned = '\n'.join(line.strip() for line in lines if line.strip()) - return cleaned[:2000] + "..." if len(cleaned) > 2000 else cleaned or "No meaningful content to summarize" + lines = conversation_text.split("\n") + cleaned = "\n".join(line.strip() for line in lines if line.strip()) + return ( + cleaned[:2000] + "..." + if len(cleaned) > 2000 + else cleaned or "No meaningful content to summarize" + ) # Backward compatibility aliases for deprecated speaker-specific methods @@ -367,3 +381,276 @@ async def generate_summary_with_speakers(segments: list) -> str: # Extract text from segments for compatibility text = "\n".join(s.get("text", "") for s in segments if s.get("text")) return await generate_short_summary(text, segments=segments) + + +# ============================================================================ +# Conversation Job Helpers +# ============================================================================ + + +async def link_job_metadata_to_conversation( + conversation_id: str, speech_job_id: Optional[str], current_job +) -> None: + """ + Link job metadata to conversation for Queue UI tracking. + + Updates metadata for current job and optionally parent jobs (speech detection, speaker check). + Uses "first conversation wins" pattern - only links if conversation_id not already set. 
+ + Args: + conversation_id: Conversation ID to link + speech_job_id: Optional parent speech detection job ID + current_job: Current RQ job instance (must not be None) + """ + from advanced_omi_backend.controllers.queue_controller import redis_conn + + # Update current job metadata + if not current_job.meta: + current_job.meta = {} + current_job.meta["conversation_id"] = conversation_id + current_job.save_meta() + logger.info(f"๐Ÿ”— Linked job {current_job.id[:12]} to conversation {conversation_id[:12]}") + + # Update parent jobs if provided + if not speech_job_id: + return + + _link_parent_job(speech_job_id, conversation_id, redis_conn) + + +def _link_parent_job(job_id: str, conversation_id: str, redis_conn) -> None: + """ + Link a parent job to conversation, with cascading to speaker check job. + + Helper function to reduce nesting in link_job_metadata_to_conversation. + """ + from rq.job import Job + + try: + job = Job.fetch(job_id, connection=redis_conn) + except Exception as e: + logger.warning(f"โš ๏ธ Failed to fetch job {job_id[:12]}: {e}") + return + + if not job.meta: + logger.debug(f"Job {job_id[:12]} has no metadata, skipping") + return + + # Check if already linked (first conversation wins) + existing_conv_id = job.meta.get("conversation_id") + if existing_conv_id: + logger.info(f"โญ๏ธ Job {job_id[:12]} already linked to {existing_conv_id[:12]}, skipping") + return + + # Link this job + job.meta["conversation_id"] = conversation_id + job.meta.pop("session_level", None) # Remove session-level flag + job.save_meta() + logger.info(f"๐Ÿ”— Linked job {job_id[:12]} to conversation {conversation_id[:12]}") + + # Cascade to speaker check job if present + speaker_check_job_id = job.meta.get("speaker_check_job_id") + if speaker_check_job_id: + _link_parent_job(speaker_check_job_id, conversation_id, redis_conn) + + +async def signal_conversation_file_rotation( + session_id: str, conversation_id: str, redis_client +) -> None: + """ + Signal audio persistence job 
to rotate to new conversation file. + + Sets Redis key that audio_streaming_persistence_job watches to detect + when a new conversation starts and rotate the output file accordingly. + + Args: + session_id: Session ID + conversation_id: Conversation ID to rotate to + redis_client: Redis client instance + """ + # Signal audio persistence job to rotate to this conversation's file + rotation_signal_key = f"conversation:current:{session_id}" + await redis_client.set(rotation_signal_key, conversation_id, ex=86400) # 24 hour TTL + logger.info( + f"๐Ÿ”„ Signaled audio persistence to rotate file for conversation {conversation_id[:12]}" + ) + + +def extract_speakers_from_segments(segments: List[Dict[str, Any]]) -> List[str]: + """ + Extract unique speaker names from segments. + + Args: + segments: List of segments with speaker information + + Returns: + List of unique speaker names (excluding "Unknown") + """ + speakers = [] + if segments: + for seg in segments: + speaker = seg.get("speaker", "Unknown") + if speaker and speaker != "Unknown" and speaker not in speakers: + speakers.append(speaker) + return speakers + + +async def track_speech_activity( + speech_analysis: Dict[str, Any], last_word_count: int, conversation_id: str, redis_client +) -> tuple[float, int]: + """ + Track new speech activity and update last speech timestamp. + + Uses word count instead of chunk count to avoid false positives from noise/silence. 
+ + Args: + speech_analysis: Speech analysis results from analyze_speech() + last_word_count: Previous word count + conversation_id: Conversation ID for Redis key + redis_client: Redis client instance + + Returns: + Tuple of (last_meaningful_speech_time, new_word_count) + """ + current_word_count = speech_analysis.get("word_count", 0) + + if current_word_count > last_word_count: + last_meaningful_speech_time = time.time() + + # Store timestamp in Redis for visibility/debugging + await redis_client.set( + f"conversation:last_speech:{conversation_id}", + last_meaningful_speech_time, + ex=86400, # 24 hour TTL + ) + logger.debug( + f"๐Ÿ—ฃ๏ธ New speech detected (word count: {current_word_count}), updated last_speech timestamp" + ) + + return last_meaningful_speech_time, current_word_count + + # No new speech - return None to indicate no update + return None, last_word_count + + +async def update_job_progress_metadata( + current_job, + conversation_id: str, + session_id: str, + client_id: str, + combined: Dict[str, Any], + speech_analysis: Dict[str, Any], + speakers: List[str], + last_meaningful_speech_time: float, +) -> None: + """ + Update job metadata with current conversation progress. 
+ + Args: + current_job: Current RQ job instance + conversation_id: Conversation ID + session_id: Session ID + client_id: Client ID + combined: Combined transcription results + speech_analysis: Speech analysis results + speakers: List of speakers + last_meaningful_speech_time: Timestamp of last speech + """ + if not current_job: + return + + if not current_job.meta: + current_job.meta = {} + + # Set created_at only once (first time we update metadata) + if "created_at" not in current_job.meta: + current_job.meta["created_at"] = datetime.now().isoformat() + + current_job.meta.update( + { + "conversation_id": conversation_id, + "audio_uuid": session_id, # Link to session for job grouping + "client_id": client_id, # Ensure client_id is always present + "transcript": ( + combined["text"][:500] + "..." if len(combined["text"]) > 500 else combined["text"] + ), # First 500 chars + "transcript_length": len(combined["text"]), + "speakers": speakers, + "word_count": speech_analysis.get("word_count", 0), + "duration_seconds": speech_analysis.get("duration", 0), + "has_speech": speech_analysis.get("has_speech", False), + "last_update": datetime.now().isoformat(), + "inactivity_seconds": time.time() - last_meaningful_speech_time, + "chunks_processed": combined["chunk_count"], + } + ) + current_job.save_meta() + + +async def wait_for_audio_file( + conversation_id: str, redis_client, max_wait_seconds: int = 30 +) -> Optional[str]: + """ + Wait for audio persistence job to write audio file path to Redis. + + Polls Redis for audio file path with configurable timeout. 
+ + Args: + conversation_id: Conversation ID + redis_client: Redis client instance + max_wait_seconds: Maximum wait time in seconds (default: 30) + + Returns: + Audio file path (str) if ready, None if timeout + """ + audio_file_key = f"audio:file:{conversation_id}" + wait_start = time.time() + + while time.time() - wait_start < max_wait_seconds: + file_path_bytes = await redis_client.get(audio_file_key) + if file_path_bytes: + wait_duration = time.time() - wait_start + logger.info(f"โœ… Audio file ready after {wait_duration:.1f}s") + return file_path_bytes.decode() + + # Log progress every 5 seconds + elapsed = time.time() - wait_start + if elapsed % 5 == 0: + logger.info( + f"โณ Waiting for audio file (conversation {conversation_id[:12]})... ({elapsed:.0f}s elapsed)" + ) + + await asyncio.sleep(0.5) # Check every 500ms + + logger.error( + f"โŒ Audio file path not found in Redis after {max_wait_seconds}s (key: {audio_file_key})" + ) + logger.warning( + "โš ๏ธ Audio persistence job may not have rotated file yet - cannot enqueue batch transcription" + ) + return None + + +async def mark_conversation_deleted(conversation_id: str, deletion_reason: str) -> None: + """ + Mark a conversation as deleted with a specific reason. + + Uses soft delete pattern - conversation remains in database but marked as deleted. 
+ + Args: + conversation_id: Conversation ID to mark as deleted + deletion_reason: Reason for deletion (e.g., "no_meaningful_speech", "audio_file_not_ready") + """ + from advanced_omi_backend.models.conversation import Conversation + + logger.warning( + f"๐Ÿ—‘๏ธ Marking conversation {conversation_id} as deleted - reason: {deletion_reason}" + ) + + conversation = await Conversation.find_one(Conversation.conversation_id == conversation_id) + if conversation: + conversation.deleted = True + conversation.deletion_reason = deletion_reason + conversation.deleted_at = datetime.utcnow() + await conversation.save() + logger.info(f"โœ… Marked conversation {conversation_id} as deleted") diff --git a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py index ba3700d0..67e4d935 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py @@ -6,14 +6,154 @@ import asyncio import logging -import time +import time, os +from datetime import datetime from typing import Dict, Any - +from rq.job import Job from advanced_omi_backend.models.job import async_job +from advanced_omi_backend.controllers.queue_controller import redis_conn + +from advanced_omi_backend.utils.conversation_utils import ( + analyze_speech, + extract_speakers_from_segments, + track_speech_activity, + update_job_progress_metadata, +) +from advanced_omi_backend.utils.conversation_utils import ( + is_meaningful_speech, + mark_conversation_deleted, +) + +from advanced_omi_backend.controllers.queue_controller import start_post_conversation_jobs logger = logging.getLogger(__name__) +async def handle_end_of_conversation( + session_id: str, + conversation_id: str, + client_id: str, + user_id: str, + start_time: float, + last_result_count: int, + timeout_triggered: bool, + redis_client, +) -> Dict[str, Any]: + """ + Handle 
end-of-conversation cleanup and session restart logic. + + This function is called at the end of open_conversation_job to: + 1. Clean up Redis streams and tracking keys + 2. Increment conversation count for the session + 3. Re-enqueue speech detection job if session is still active + + Args: + session_id: Stream session ID + conversation_id: Conversation ID that just completed + client_id: Client ID + user_id: User ID + start_time: Job start time (for runtime calculation) + last_result_count: Number of transcription results processed + timeout_triggered: Whether closure was due to inactivity timeout + redis_client: Redis client instance + + Returns: + Dict with conversation_id, conversation_count, final_result_count, runtime_seconds, timeout_triggered + """ + # Clean up Redis streams to prevent memory leaks + try: + # NOTE: Do NOT delete audio:stream:{client_id} here! + # The audio stream is per-client (WebSocket connection), not per-conversation. + # It's still actively receiving audio and will be reused by the next conversation. 
+ # Only delete it on WebSocket disconnect (handled in websocket_controller.py) + + # Delete the transcription results stream (per-session/conversation) + results_stream_key = f"transcription:results:{session_id}" + await redis_client.delete(results_stream_key) + logger.info(f"๐Ÿงน Deleted results stream: {results_stream_key}") + + # Set TTL on session key (expire after 1 hour) + session_key = f"audio:session:{session_id}" + await redis_client.expire(session_key, 3600) + logger.info(f"โฐ Set TTL on session key: {session_key}") + except Exception as cleanup_error: + logger.warning(f"โš ๏ธ Error during stream cleanup: {cleanup_error}") + + # Clean up Redis tracking keys so speech detection job knows conversation is complete + open_job_key = f"open_conversation:session:{session_id}" + await redis_client.delete(open_job_key) + logger.info(f"๐Ÿงน Cleaned up tracking key {open_job_key}") + + # Delete the conversation:current signal so audio persistence knows conversation ended + current_conversation_key = f"conversation:current:{session_id}" + await redis_client.delete(current_conversation_key) + logger.info(f"๐Ÿงน Deleted conversation:current signal for session {session_id[:12]}") + + # Increment conversation count for this session + conversation_count_key = f"session:conversation_count:{session_id}" + conversation_count = await redis_client.incr(conversation_count_key) + await redis_client.expire(conversation_count_key, 3600) # 1 hour TTL + logger.info(f"๐Ÿ“Š Conversation count for session {session_id}: {conversation_count}") + + # Check if session is still active (user still recording) and restart listening jobs + session_key = f"audio:session:{session_id}" + session_status = await redis_client.hget(session_key, "status") + if session_status: + status_str = ( + session_status.decode() if isinstance(session_status, bytes) else session_status + ) + + if status_str == "active": + # Session still active - enqueue new speech detection for next conversation + logger.info( 
+ f"๐Ÿ”„ Enqueueing new speech detection (conversation #{conversation_count + 1})" + ) + + from advanced_omi_backend.controllers.queue_controller import ( + transcription_queue, + redis_conn, + JOB_RESULT_TTL, + ) + from advanced_omi_backend.workers.transcription_jobs import stream_speech_detection_job + + # Enqueue speech detection job for next conversation (audio persistence keeps running) + speech_job = transcription_queue.enqueue( + stream_speech_detection_job, + session_id, + user_id, + client_id, + job_timeout=3600, + result_ttl=JOB_RESULT_TTL, + job_id=f"speech-detect_{session_id[:12]}_{conversation_count}", + description=f"Listening for speech (conversation #{conversation_count + 1})", + meta={"audio_uuid": session_id, "client_id": client_id, "session_level": True}, + ) + + # Store job ID for cleanup (keyed by client_id for WebSocket cleanup) + try: + redis_conn.set(f"speech_detection_job:{client_id}", speech_job.id, ex=3600) + logger.info(f"๐Ÿ“Œ Stored speech detection job ID for client {client_id}") + except Exception as e: + logger.warning(f"โš ๏ธ Failed to store job ID for {client_id}: {e}") + + logger.info(f"โœ… Enqueued speech detection job {speech_job.id}") + else: + logger.info( + f"Session {session_id} status={status_str}, not restarting (user stopped recording)" + ) + else: + logger.info(f"Session {session_id} not found, not restarting (session ended)") + + return { + "conversation_id": conversation_id, + "conversation_count": conversation_count, + "deleted": False, # This conversation was not deleted (normal completion) + "final_result_count": last_result_count, + "runtime_seconds": time.time() - start_time, + "timeout_triggered": timeout_triggered, + } + + @async_job(redis=True, beanie=True) async def open_conversation_job( session_id: str, @@ -22,7 +162,7 @@ async def open_conversation_job( speech_detected_at: float, speech_job_id: str = None, *, - redis_client=None + redis_client=None, ) -> Dict[str, Any]: """ Long-running RQ job that creates 
and continuously updates conversation with transcription results. @@ -46,18 +186,22 @@ async def open_conversation_job( from advanced_omi_backend.models.conversation import Conversation, create_conversation from rq import get_current_job - logger.info(f"๐Ÿ“ Creating and opening conversation for session {session_id} (speech detected at {speech_detected_at})") + logger.info( + f"๐Ÿ“ Creating and opening conversation for session {session_id} (speech detected at {speech_detected_at})" + ) # Get current job for meta storage current_job = get_current_job() - + current_job.meta = {} + current_job.save_meta() + # Create minimal streaming conversation (conversation_id auto-generated) conversation = create_conversation( audio_uuid=session_id, user_id=user_id, client_id=client_id, title="Recording...", - summary="Transcribing audio..." + summary="Transcribing audio...", ) # Save to database @@ -65,60 +209,24 @@ async def open_conversation_job( conversation_id = conversation.conversation_id # Get the auto-generated ID logger.info(f"โœ… Created streaming conversation {conversation_id} for session {session_id}") - # Update THIS job's metadata with conversation_id (for Queue page grouping) - if current_job: - if not current_job.meta: - current_job.meta = {} - current_job.meta['conversation_id'] = conversation_id - current_job.save_meta() - logger.info(f"๐Ÿ”— Updated open_conversation_job {current_job.id[:12]} metadata with conversation_id") - - # Update speech detection job metadata with conversation_id - if speech_job_id: - try: - from rq.job import Job - from advanced_omi_backend.controllers.queue_controller import redis_conn - - speech_job = Job.fetch(speech_job_id, connection=redis_conn) - if speech_job and speech_job.meta: - # Only update if conversation_id not already set (first conversation wins) - if not speech_job.meta.get('conversation_id'): - speech_job.meta['conversation_id'] = conversation_id - # Remove session_level flag - now linked to conversation - 
speech_job.meta.pop('session_level', None) - speech_job.save_meta() - logger.info(f"๐Ÿ”— Updated speech job {speech_job_id[:12]} with conversation_id") - else: - logger.info(f"โญ๏ธ Speech job {speech_job_id[:12]} already linked to conversation {speech_job.meta.get('conversation_id')[:12]}") - - # Also update the speaker check job if referenced in speech job metadata - # Only update if it doesn't already have a conversation_id (first conversation wins) - speaker_check_job_id = speech_job.meta.get('speaker_check_job_id') - if speaker_check_job_id: - try: - speaker_check_job = Job.fetch(speaker_check_job_id, connection=redis_conn) - if speaker_check_job and speaker_check_job.meta: - # Only update if conversation_id not already set - if not speaker_check_job.meta.get('conversation_id'): - speaker_check_job.meta['conversation_id'] = conversation_id - speaker_check_job.save_meta() - logger.info(f"๐Ÿ”— Updated speaker check job {speaker_check_job_id} with conversation_id") - else: - logger.info(f"โญ๏ธ Speaker check job {speaker_check_job_id} already linked to conversation {speaker_check_job.meta.get('conversation_id')[:12]}") - except Exception as speaker_err: - logger.warning(f"โš ๏ธ Failed to update speaker check job metadata: {speaker_err}") - except Exception as e: - logger.warning(f"โš ๏ธ Failed to update speech job metadata: {e}") - - # Store conversation_id in Redis for finalize job to find - conversation_key = f"conversation:session:{session_id}" - await redis_client.set(conversation_key, conversation_id, ex=86400) # 24 hour TTL - logger.info(f"๐Ÿ’พ Stored conversation ID in Redis: {conversation_key}") - + # Link job metadata to conversation (cascading updates) + current_job.meta["conversation_id"] = conversation_id + current_job.save_meta() + speech_job = Job.fetch(speech_job_id, connection=redis_conn) + speech_job.meta["conversation_id"] = conversation_id + speech_job.save_meta() + speaker_check_job_id = speech_job.meta.get("speaker_check_job_id") + if 
speaker_check_job_id: + speaker_check_job = Job.fetch(speaker_check_job_id, connection=redis_conn) + speaker_check_job.meta["conversation_id"] = conversation_id + speaker_check_job.save_meta() + # Signal audio persistence job to rotate to this conversation's file - current_conversation_key = f"conversation:current:{session_id}" - await redis_client.set(current_conversation_key, conversation_id, ex=86400) # 24 hour TTL - logger.info(f"๐Ÿ”„ Signaled audio persistence to rotate file for conversation {conversation_id[:12]}") + rotation_signal_key = f"conversation:current:{session_id}" + await redis_client.set(rotation_signal_key, conversation_id, ex=86400) # 24 hour TTL + logger.info( + f"๐Ÿ”„ Signaled audio persistence to rotate file for conversation {conversation_id[:12]}" + ) # Use redis_client parameter aggregator = TranscriptionResultsAggregator(redis_client) @@ -132,7 +240,6 @@ async def open_conversation_job( finalize_received = False # Inactivity timeout configuration - import os inactivity_timeout_seconds = float(os.getenv("SPEECH_INACTIVITY_THRESHOLD_SECONDS", "60")) inactivity_timeout_minutes = inactivity_timeout_seconds / 60 last_meaningful_speech_time = time.time() # Initialize with conversation start @@ -140,7 +247,9 @@ async def open_conversation_job( last_inactivity_log_time = time.time() # Track when we last logged inactivity last_word_count = 0 # Track word count to detect actual new speech - logger.info(f"๐Ÿ“Š Conversation timeout configured: {inactivity_timeout_minutes} minutes ({inactivity_timeout_seconds}s)") + logger.info( + f"๐Ÿ“Š Conversation timeout configured: {inactivity_timeout_minutes} minutes ({inactivity_timeout_seconds}s)" + ) while True: # Check if session is finalizing (set by producer when recording stops) @@ -148,7 +257,9 @@ async def open_conversation_job( status = await redis_client.hget(session_key, "status") if status and status.decode() in ["finalizing", "complete"]: finalize_received = True - logger.info(f"๐Ÿ›‘ Session 
finalizing, waiting for audio persistence job to complete...") + logger.info( + f"๐Ÿ›‘ Session finalizing, waiting for audio persistence job to complete..." + ) break # Exit immediately when finalize signal received # Check max runtime timeout @@ -161,63 +272,36 @@ async def open_conversation_job( current_count = combined["chunk_count"] # Analyze speech content using detailed analysis - from advanced_omi_backend.utils.conversation_utils import analyze_speech - transcript_data = { - "text": combined["text"], - "words": combined.get("words", []) - } + + transcript_data = {"text": combined["text"], "words": combined.get("words", [])} speech_analysis = analyze_speech(transcript_data) # Extract speaker information from segments - speakers = [] segments = combined.get("segments", []) - if segments: - for seg in segments: - speaker = seg.get("speaker", "Unknown") - if speaker and speaker != "Unknown" and speaker not in speakers: - speakers.append(speaker) - - # Check if NEW speech arrived (word count increased) - # Track word count instead of chunk count to avoid resetting on noise/silence chunks - current_word_count = speech_analysis.get("word_count", 0) - if current_word_count > last_word_count: - last_meaningful_speech_time = time.time() - last_word_count = current_word_count - # Store timestamp in Redis for visibility/debugging - await redis_client.set( - f"conversation:last_speech:{conversation_id}", - last_meaningful_speech_time, - ex=86400 # 24 hour TTL - ) - logger.debug(f"๐Ÿ—ฃ๏ธ New speech detected (word count: {current_word_count}), updated last_speech timestamp") - - # Update job meta with current state - if current_job: - if not current_job.meta: - current_job.meta = {} - - from datetime import datetime + speakers = extract_speakers_from_segments(segments) - # Set created_at only once (first time we update metadata) - if 'created_at' not in current_job.meta: - current_job.meta['created_at'] = datetime.now().isoformat() + # Track new speech activity (word 
count based) + new_speech_time, last_word_count = await track_speech_activity( + speech_analysis=speech_analysis, + last_word_count=last_word_count, + conversation_id=conversation_id, + redis_client=redis_client, + ) + if new_speech_time: + last_meaningful_speech_time = new_speech_time - current_job.meta.update({ - "conversation_id": conversation_id, - "audio_uuid": session_id, # Link to session for job grouping - "client_id": client_id, # Ensure client_id is always present - "transcript": combined["text"][:500] + "..." if len(combined["text"]) > 500 else combined["text"], # First 500 chars - "transcript_length": len(combined["text"]), - "speakers": speakers, - "word_count": speech_analysis.get("word_count", 0), - "duration_seconds": speech_analysis.get("duration", 0), - "has_speech": speech_analysis.get("has_speech", False), - "last_update": datetime.now().isoformat(), - "inactivity_seconds": time.time() - last_meaningful_speech_time, - "chunks_processed": current_count - }) - current_job.save_meta() + # Update job metadata with current progress + await update_job_progress_metadata( + current_job=current_job, + conversation_id=conversation_id, + session_id=session_id, + client_id=client_id, + combined=combined, + speech_analysis=speech_analysis, + speakers=speakers, + last_meaningful_speech_time=last_meaningful_speech_time, + ) # Check inactivity timeout and log every 10 seconds inactivity_duration = time.time() - last_meaningful_speech_time @@ -225,7 +309,9 @@ async def open_conversation_job( # Log inactivity every 10 seconds if current_time - last_inactivity_log_time >= 10: - logger.info(f"โฑ๏ธ Time since last speech: {inactivity_duration:.1f}s (timeout: {inactivity_timeout_seconds:.0f}s)") + logger.info( + f"โฑ๏ธ Time since last speech: {inactivity_duration:.1f}s (timeout: {inactivity_timeout_seconds:.0f}s)" + ) last_inactivity_log_time = current_time if inactivity_duration > inactivity_timeout_seconds: @@ -251,224 +337,120 @@ async def 
open_conversation_job( await asyncio.sleep(1) # Check every second for responsiveness - logger.info(f"โœ… Conversation {conversation_id} updates complete, checking for meaningful speech...") + logger.info( + f"โœ… Conversation {conversation_id} updates complete, checking for meaningful speech..." + ) # FINAL VALIDATION: Check if conversation has meaningful speech before post-processing # This prevents empty/noise-only conversations from being processed and saved combined = await aggregator.get_combined_results(session_id) - from advanced_omi_backend.utils.conversation_utils import is_meaningful_speech - - if not is_meaningful_speech(combined): - logger.warning(f"โš ๏ธ Conversation {conversation_id} has no meaningful speech after finalization, marking as deleted...") - - # Mark the conversation as deleted instead of removing from database - from datetime import datetime - conversation = await Conversation.find_one(Conversation.conversation_id == conversation_id) - if conversation: - conversation.deleted = True - conversation.deletion_reason = "no_meaningful_speech" - conversation.deleted_at = datetime.utcnow() - await conversation.save() - logger.info(f"๐Ÿ—‘๏ธ Marked empty conversation {conversation_id} as deleted") - - # Clean up Redis tracking keys - conversation_key = f"conversation:session:{session_id}" - await redis_client.delete(conversation_key) - open_job_key = f"open_conversation:session:{session_id}" - await redis_client.delete(open_job_key) - current_conversation_key = f"conversation:current:{session_id}" - await redis_client.delete(current_conversation_key) - - logger.info(f"โœ… Marked conversation as deleted, session can continue") - - return { - "conversation_id": conversation_id, - "deleted": True, - "reason": "no_meaningful_speech", - "final_result_count": last_result_count, - "runtime_seconds": time.time() - start_time, - "timeout_triggered": timeout_triggered - } - - logger.info(f"โœ… Conversation has meaningful speech, proceeding with 
post-processing") - - # Wait for audio_streaming_persistence_job to complete and write the file path - # Audio persistence now writes files per-conversation, so key uses conversation_id - audio_file_key = f"audio:file:{conversation_id}" - file_path_bytes = None - max_wait_audio = 30 # Maximum 30 seconds to wait for audio file - wait_start = time.time() - - while time.time() - wait_start < max_wait_audio: - file_path_bytes = await redis_client.get(audio_file_key) - if file_path_bytes: - wait_duration = time.time() - wait_start - logger.info(f"โœ… Audio file ready after {wait_duration:.1f}s") - break - # Check if still within reasonable time - elapsed = time.time() - wait_start - if elapsed % 5 == 0: # Log every 5 seconds - logger.info(f"โณ Waiting for audio file (conversation {conversation_id[:12]})... ({elapsed:.0f}s elapsed)") - - await asyncio.sleep(0.5) # Check every 500ms - - if not file_path_bytes: - logger.error(f"โŒ Audio file path not found in Redis after {max_wait_audio}s (key: {audio_file_key})") - logger.warning(f"โš ๏ธ Audio persistence job may not have rotated file yet - cannot enqueue batch transcription") - - # Mark the conversation as deleted - it has speech but no audio file to process - from datetime import datetime - logger.warning(f"๐Ÿ—‘๏ธ Marking incomplete conversation {conversation_id} as deleted - has speech but no audio file") - conversation = await Conversation.find_one(Conversation.conversation_id == conversation_id) - if conversation: - conversation.deleted = True - conversation.deletion_reason = "audio_file_not_ready" - conversation.deleted_at = datetime.utcnow() - await conversation.save() - logger.info(f"โœ… Marked incomplete conversation {conversation_id} as deleted") - - # Clean up Redis tracking keys - conversation_key = f"conversation:session:{session_id}" - await redis_client.delete(conversation_key) - open_job_key = f"open_conversation:session:{session_id}" - await redis_client.delete(open_job_key) - current_conversation_key 
= f"conversation:current:{session_id}" - await redis_client.delete(current_conversation_key) - return { - "conversation_id": conversation_id, - "deleted": True, - "reason": "audio_file_not_ready", - "final_result_count": last_result_count, - "runtime_seconds": time.time() - start_time, - "timeout_triggered": timeout_triggered - } - else: - file_path = file_path_bytes.decode() - logger.info(f"๐Ÿ“ Retrieved audio file path: {file_path}") + if not is_meaningful_speech(combined): + logger.warning( + f"โš ๏ธ Conversation {conversation_id} has no meaningful speech after finalization" + ) - # Update conversation with audio file path - conversation = await Conversation.find_one( - Conversation.conversation_id == conversation_id + # Mark conversation as deleted (soft delete) + await mark_conversation_deleted( + conversation_id=conversation_id, + deletion_reason="no_meaningful_speech", ) - if conversation: - # Store just the filename (relative to CHUNK_DIR) - from pathlib import Path - audio_filename = Path(file_path).name - conversation.audio_path = audio_filename - await conversation.save() - logger.info(f"๐Ÿ’พ Updated conversation {conversation_id[:12]} with audio_path: {audio_filename}") - else: - logger.warning(f"โš ๏ธ Conversation {conversation_id} not found for audio_path update") - # Enqueue post-conversation processing pipeline - from advanced_omi_backend.controllers.queue_controller import start_post_conversation_jobs + logger.info("โœ… Marked conversation as deleted, session can continue") - job_ids = start_post_conversation_jobs( + # Call shared cleanup/restart logic before returning + return await handle_end_of_conversation( + session_id=session_id, conversation_id=conversation_id, - audio_uuid=session_id, - audio_file_path=file_path, + client_id=client_id, user_id=user_id, - post_transcription=True # Run batch transcription for streaming audio + start_time=start_time, + last_result_count=last_result_count, + timeout_triggered=timeout_triggered, + 
redis_client=redis_client, ) - logger.info( - f"๐Ÿ“ฅ Pipeline: transcribe({job_ids['transcription']}) โ†’ " - f"speaker({job_ids['speaker_recognition']}) โ†’ " - f"[memory({job_ids['memory']}) + title({job_ids['title_summary']})]" - ) + logger.info("โœ… Conversation has meaningful speech, proceeding with post-processing") - # Wait a moment to ensure jobs are registered in RQ - await asyncio.sleep(0.5) - - # Clean up Redis streams to prevent memory leaks - try: - # NOTE: Do NOT delete audio:stream:{client_id} here! - # The audio stream is per-client (WebSocket connection), not per-conversation. - # It's still actively receiving audio and will be reused by the next conversation. - # Only delete it on WebSocket disconnect (handled in websocket_controller.py) + # Wait for audio_streaming_persistence_job to complete and write the file path + from advanced_omi_backend.utils.conversation_utils import wait_for_audio_file - # Delete the transcription results stream (per-session/conversation) - results_stream_key = f"transcription:results:{session_id}" - await redis_client.delete(results_stream_key) - logger.info(f"๐Ÿงน Deleted results stream: {results_stream_key}") + file_path = await wait_for_audio_file( + conversation_id=conversation_id, redis_client=redis_client, max_wait_seconds=30 + ) - # Set TTL on session key (expire after 1 hour) - await redis_client.expire(session_key, 3600) - logger.info(f"โฐ Set TTL on session key: {session_key}") - except Exception as cleanup_error: - logger.warning(f"โš ๏ธ Error during stream cleanup: {cleanup_error}") + if not file_path: + # Mark conversation as deleted - has speech but no audio file to process + await mark_conversation_deleted( + conversation_id=conversation_id, + deletion_reason="audio_file_not_ready", + ) - # Clean up Redis tracking keys so speech detection job knows conversation is complete - open_job_key = f"open_conversation:session:{session_id}" - await redis_client.delete(open_job_key) - logger.info(f"๐Ÿงน Cleaned up 
tracking key {open_job_key}") + # Call shared cleanup/restart logic before returning + return await handle_end_of_conversation( + session_id=session_id, + conversation_id=conversation_id, + client_id=client_id, + user_id=user_id, + start_time=start_time, + last_result_count=last_result_count, + timeout_triggered=timeout_triggered, + redis_client=redis_client, + ) - # Delete the conversation:current signal so audio persistence knows conversation ended - current_conversation_key = f"conversation:current:{session_id}" - await redis_client.delete(current_conversation_key) - logger.info(f"๐Ÿงน Deleted conversation:current signal for session {session_id[:12]}") + logger.info(f"๐Ÿ“ Retrieved audio file path: {file_path}") - # Increment conversation count for this session - conversation_count_key = f"session:conversation_count:{session_id}" - conversation_count = await redis_client.incr(conversation_count_key) - await redis_client.expire(conversation_count_key, 3600) # 1 hour TTL - logger.info(f"๐Ÿ“Š Conversation count for session {session_id}: {conversation_count}") + # Update conversation with audio file path + conversation = await Conversation.find_one(Conversation.conversation_id == conversation_id) + if conversation: + # Store just the filename (relative to CHUNK_DIR) + from pathlib import Path - # Check if session is still active (user still recording) and restart listening jobs - session_status = await redis_client.hget(session_key, "status") - if session_status: - status_str = session_status.decode() if isinstance(session_status, bytes) else session_status + audio_filename = Path(file_path).name + conversation.audio_path = audio_filename + await conversation.save() + logger.info( + f"๐Ÿ’พ Updated conversation {conversation_id[:12]} with audio_path: {audio_filename}" + ) + else: + logger.warning(f"โš ๏ธ Conversation {conversation_id} not found for audio_path update") - if status_str == "active": - # Session still active - enqueue new speech detection for next 
conversation - logger.info(f"๐Ÿ”„ Enqueueing new speech detection (conversation #{conversation_count + 1})") + # Enqueue post-conversation processing pipeline - from advanced_omi_backend.controllers.queue_controller import transcription_queue, redis_conn, JOB_RESULT_TTL - from advanced_omi_backend.workers.transcription_jobs import stream_speech_detection_job - # Enqueue speech detection job for next conversation (audio persistence keeps running) - speech_job = transcription_queue.enqueue( - stream_speech_detection_job, - session_id, - user_id, - client_id, - job_timeout=3600, - result_ttl=JOB_RESULT_TTL, - job_id=f"speech-detect_{session_id[:12]}_{conversation_count}", - description=f"Listening for speech (conversation #{conversation_count + 1})", - meta={'audio_uuid': session_id, 'client_id': client_id, 'session_level': True} - ) + job_ids = start_post_conversation_jobs( + conversation_id=conversation_id, + audio_uuid=session_id, + audio_file_path=file_path, + user_id=user_id, + post_transcription=True, # Run batch transcription for streaming audio + ) - # Store job ID for cleanup (keyed by client_id for WebSocket cleanup) - try: - redis_conn.set(f"speech_detection_job:{client_id}", speech_job.id, ex=3600) - logger.info(f"๐Ÿ“Œ Stored speech detection job ID for client {client_id}") - except Exception as e: - logger.warning(f"โš ๏ธ Failed to store job ID for {client_id}: {e}") + logger.info( + f"๐Ÿ“ฅ Pipeline: transcribe({job_ids['transcription']}) โ†’ " + f"speaker({job_ids['speaker_recognition']}) โ†’ " + f"[memory({job_ids['memory']}) + title({job_ids['title_summary']})]" + ) - logger.info(f"โœ… Enqueued speech detection job {speech_job.id}") - else: - logger.info(f"Session {session_id} status={status_str}, not restarting (user stopped recording)") - else: - logger.info(f"Session {session_id} not found, not restarting (session ended)") + # Wait a moment to ensure jobs are registered in RQ + await asyncio.sleep(0.5) - return { - "conversation_id": 
conversation_id, - "conversation_count": conversation_count, - "final_result_count": last_result_count, - "runtime_seconds": time.time() - start_time, - "timeout_triggered": timeout_triggered - } + # Call shared cleanup/restart logic + return await handle_end_of_conversation( + session_id=session_id, + conversation_id=conversation_id, + client_id=client_id, + user_id=user_id, + start_time=start_time, + last_result_count=last_result_count, + timeout_triggered=timeout_triggered, + redis_client=redis_client, + ) @async_job(redis=True, beanie=True) -async def generate_title_summary_job( - conversation_id: str, - *, - redis_client=None -) -> Dict[str, Any]: +async def generate_title_summary_job(conversation_id: str, *, redis_client=None) -> Dict[str, Any]: """ Generate title, short summary, and detailed summary for a conversation using LLM. @@ -489,7 +471,7 @@ async def generate_title_summary_job( from advanced_omi_backend.utils.conversation_utils import ( generate_title, generate_short_summary, - generate_detailed_summary + generate_detailed_summary, ) logger.info(f"๐Ÿ“ Starting title/summary generation for conversation {conversation_id}") @@ -511,32 +493,30 @@ async def generate_title_summary_job( return { "success": False, "error": "No transcript or segments available", - "conversation_id": conversation_id + "conversation_id": conversation_id, } # Generate title, short summary, and detailed summary using unified utilities try: - logger.info(f"๐Ÿค– Generating title/summary/detailed_summary using LLM for conversation {conversation_id}") + logger.info( + f"๐Ÿค– Generating title/summary/detailed_summary using LLM for conversation {conversation_id}" + ) # Convert segments to dict format expected by utils segment_dicts = None if segments and len(segments) > 0: segment_dicts = [ - { - "speaker": seg.speaker, - "text": seg.text, - "start": seg.start, - "end": seg.end - } + {"speaker": seg.speaker, "text": seg.text, "start": seg.start, "end": seg.end} for seg in segments ] # 
Generate all three summaries in parallel for efficiency import asyncio + title, short_summary, detailed_summary = await asyncio.gather( generate_title(transcript_text, segments=segment_dicts), generate_short_summary(transcript_text, segments=segment_dicts), - generate_detailed_summary(transcript_text, segments=segment_dicts) + generate_detailed_summary(transcript_text, segments=segment_dicts), ) conversation.title = title @@ -553,7 +533,7 @@ async def generate_title_summary_job( "success": False, "error": str(gen_error), "conversation_id": conversation_id, - "processing_time_seconds": time.time() - start_time + "processing_time_seconds": time.time() - start_time, } # Save the updated conversation @@ -563,21 +543,28 @@ async def generate_title_summary_job( # Update job metadata from rq import get_current_job + current_job = get_current_job() if current_job: if not current_job.meta: current_job.meta = {} - current_job.meta.update({ - "conversation_id": conversation_id, - "title": conversation.title, - "summary": conversation.summary, - "detailed_summary_length": len(conversation.detailed_summary) if conversation.detailed_summary else 0, - "segment_count": len(segments), - "processing_time": processing_time - }) + current_job.meta.update( + { + "conversation_id": conversation_id, + "title": conversation.title, + "summary": conversation.summary, + "detailed_summary_length": ( + len(conversation.detailed_summary) if conversation.detailed_summary else 0 + ), + "segment_count": len(segments), + "processing_time": processing_time, + } + ) current_job.save_meta() - logger.info(f"โœ… Title/summary generation completed for {conversation_id} in {processing_time:.2f}s") + logger.info( + f"โœ… Title/summary generation completed for {conversation_id} in {processing_time:.2f}s" + ) return { "success": True, @@ -585,5 +572,5 @@ async def generate_title_summary_job( "title": conversation.title, "summary": conversation.summary, "detailed_summary": conversation.detailed_summary, - 
"processing_time_seconds": processing_time + "processing_time_seconds": processing_time, }