diff --git a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py index a6a406c8..fdc9177e 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py @@ -321,8 +321,8 @@ def start_post_conversation_jobs( Start post-conversation processing jobs after conversation is created. This creates the standard processing chain after a conversation is created: - 1. Audio cropping job - Removes silence from audio - 2. [Optional] Transcription job - Batch transcription (if post_transcription=True) + 1. [Optional] Transcription job - Batch transcription (if post_transcription=True) + 2. Audio cropping job - Removes silence from audio 3. Speaker recognition job - Identifies speakers in audio 4. Memory extraction job - Extracts memories from conversation (parallel) 5. Title/summary generation job - Generates title and summary (parallel) @@ -348,7 +348,29 @@ def start_post_conversation_jobs( version_id = transcript_version_id or str(uuid.uuid4()) - # Step 1: Audio cropping job + # Step 1: Batch transcription job (ALWAYS run to get correct conversation-relative timestamps) + # Even for streaming, we need batch transcription before cropping to fix cumulative timestamps + transcribe_job_id = f"transcribe_{conversation_id[:12]}" + logger.info(f"🔍 DEBUG: Creating transcribe job with job_id={transcribe_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}") + + transcription_job = transcription_queue.enqueue( + transcribe_full_audio_job, + conversation_id, + audio_uuid, + audio_file_path, + version_id, + "batch", # trigger + job_timeout=1800, # 30 minutes + result_ttl=JOB_RESULT_TTL, + depends_on=depends_on_job, + job_id=transcribe_job_id, + description=f"Transcribe conversation {conversation_id[:8]}", + meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id} + ) + logger.info(f"📥 RQ: Enqueued transcription job {transcription_job.id}, meta={transcription_job.meta}") + crop_depends_on = transcription_job + + # Step 2: Audio cropping job (depends on transcription if it ran, otherwise depends_on_job) crop_job_id = f"crop_{conversation_id[:12]}" logger.info(f"🔍 DEBUG: Creating crop job with job_id={crop_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}") @@ -358,38 +380,15 @@ def start_post_conversation_jobs( audio_file_path, job_timeout=300, # 5 minutes result_ttl=JOB_RESULT_TTL, - depends_on=depends_on_job, + depends_on=crop_depends_on, job_id=crop_job_id, description=f"Crop audio for conversation {conversation_id[:8]}", meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id} ) logger.info(f"📥 RQ: Enqueued cropping job {cropping_job.id}, meta={cropping_job.meta}") - # Step 2: Transcription job (conditional) - transcription_job = None - if post_transcription: - transcribe_job_id = f"transcribe_{conversation_id[:12]}" - logger.info(f"🔍 DEBUG: Creating transcribe job with job_id={transcribe_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}") - - transcription_job = transcription_queue.enqueue( - transcribe_full_audio_job, - conversation_id, - audio_uuid, - audio_file_path, - version_id, - "batch", # trigger - job_timeout=1800, # 30 minutes - result_ttl=JOB_RESULT_TTL, - depends_on=cropping_job, - job_id=transcribe_job_id, - description=f"Transcribe conversation {conversation_id[:8]}", - meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id} - ) - logger.info(f"📥 RQ: Enqueued transcription job {transcription_job.id}, meta={transcription_job.meta} (depends on {cropping_job.id})") - speaker_depends_on = transcription_job - else: - logger.info(f"⏭️ RQ: Skipping transcription (streaming already has transcript)") - speaker_depends_on = cropping_job + # Speaker recognition depends on cropping + speaker_depends_on = cropping_job # Step 3: Speaker recognition job speaker_job_id = f"speaker_{conversation_id[:12]}" diff --git a/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py index 1c7b227a..0a74b8f7 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py @@ -102,10 +102,27 @@ async def process_cropping_job( # Calculate cropped duration cropped_duration_seconds = sum(end - start for start, end in speech_segments) - # Update conversation with cropped audio path + # Update segment timestamps to match cropped audio + # After cropping, segments are concatenated, so they need new timestamps + updated_segments = [] + current_time = 0.0 + for i, seg in enumerate(segments): + original_duration = seg.end - seg.start + updated_seg = seg.model_copy() + updated_seg.start = current_time + updated_seg.end = current_time + original_duration + updated_segments.append(updated_seg) + current_time += original_duration + logger.debug(f"Updated segment {i}: {seg.start:.2f}-{seg.end:.2f}s → {updated_seg.start:.2f}-{updated_seg.end:.2f}s") + + # Update conversation with cropped audio path and adjusted segments conversation.cropped_audio_path = cropped_filename + conversation.segments = updated_segments + # Also update the active transcript version segments + if conversation.active_transcript: + conversation.active_transcript.segments = updated_segments await conversation.save() - logger.info(f"💾 Updated conversation {conversation_id[:12]} with cropped_audio_path: {cropped_filename}") + logger.info(f"💾 Updated conversation {conversation_id[:12]} with cropped_audio_path and adjusted {len(updated_segments)} segment timestamps") logger.info(f"✅ RQ: Completed audio cropping for conversation {conversation_id} ({cropped_duration_seconds:.1f}s)") diff --git a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py index 1d3400c3..edffa65f 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py @@ -64,6 +64,14 @@ async def open_conversation_job( conversation_id = conversation.conversation_id # Get the auto-generated ID logger.info(f"✅ Created streaming conversation {conversation_id} for session {session_id}") + # Update THIS job's metadata with conversation_id (for Queue page grouping) + if current_job: + if not current_job.meta: + current_job.meta = {} + current_job.meta['conversation_id'] = conversation_id + current_job.save_meta() + logger.info(f"🔗 Updated open_conversation_job {current_job.id[:12]} metadata with conversation_id") + # Update speech detection job metadata with conversation_id if speech_job_id: try: diff --git a/backends/advanced/webui/src/pages/Conversations.tsx b/backends/advanced/webui/src/pages/Conversations.tsx index 370117f1..269a9606 100644 --- a/backends/advanced/webui/src/pages/Conversations.tsx +++ b/backends/advanced/webui/src/pages/Conversations.tsx @@ -254,12 +254,12 @@ export default function Conversations() { } } - const handleSegmentPlayPause = (audioUuid: string, segmentIndex: number, segment: any, audioPath: string) => { - const segmentId = `${audioUuid}-${segmentIndex}`; - + const handleSegmentPlayPause = (conversationId: string, segmentIndex: number, segment: any, audioPath: string) => { + const segmentId = `${conversationId}-${segmentIndex}`; + // If this segment is already playing, pause it if (playingSegment === segmentId) { - const audio = audioRefs.current[audioUuid]; + const audio = audioRefs.current[conversationId]; if (audio) { audio.pause(); } @@ -270,11 +270,11 @@ export default function Conversations() { setPlayingSegment(null); return; } - + // Stop any currently playing segment if (playingSegment) { - const [currentAudioUuid] = playingSegment.split('-'); - const currentAudio = audioRefs.current[currentAudioUuid]; + const [currentConversationId] = playingSegment.split('-'); + const currentAudio = audioRefs.current[currentConversationId]; if (currentAudio) { currentAudio.pause(); } @@ -283,12 +283,12 @@ export default function Conversations() { segmentTimerRef.current = null; } } - + // Get or create audio element for this conversation - let audio = audioRefs.current[audioUuid]; + let audio = audioRefs.current[conversationId]; if (!audio) { audio = new Audio(`${BACKEND_URL}/audio/${audioPath}`); - audioRefs.current[audioUuid] = audio; + audioRefs.current[conversationId] = audio; // Add event listener to handle when audio ends naturally audio.addEventListener('ended', () => { @@ -603,10 +603,12 @@ export default function Conversations() { return conversation.segments.map((segment, index) => { const speaker = segment.speaker || 'Unknown'; const speakerColor = speakerColorMap[speaker]; - const segmentId = `${conversation.audio_uuid}-${index}`; + // Use conversation_id for unique segment IDs (falls back to audio_uuid for backward compatibility) + const conversationKey = conversation.conversation_id || conversation.audio_uuid; + const segmentId = `${conversationKey}-${index}`; const isPlaying = playingSegment === segmentId; - const audioPath = debugMode - ? conversation.audio_path + const audioPath = debugMode + ? conversation.audio_path : conversation.cropped_audio_path || conversation.audio_path; return ( @@ -619,7 +621,7 @@ export default function Conversations() { {/* Play/Pause Button */} {audioPath && (