Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@ def start_post_conversation_jobs(
Start post-conversation processing jobs after conversation is created.

This creates the standard processing chain after a conversation is created:
1. Audio cropping job - Removes silence from audio
2. [Optional] Transcription job - Batch transcription (if post_transcription=True)
1. [Optional] Transcription job - Batch transcription (if post_transcription=True)
2. Audio cropping job - Removes silence from audio
3. Speaker recognition job - Identifies speakers in audio
4. Memory extraction job - Extracts memories from conversation (parallel)
5. Title/summary generation job - Generates title and summary (parallel)
Expand All @@ -348,7 +348,29 @@ def start_post_conversation_jobs(

version_id = transcript_version_id or str(uuid.uuid4())

# Step 1: Audio cropping job
# Step 1: Batch transcription job (ALWAYS run to get correct conversation-relative timestamps)
# Even for streaming, we need batch transcription before cropping to fix cumulative timestamps
transcribe_job_id = f"transcribe_{conversation_id[:12]}"
logger.info(f"🔍 DEBUG: Creating transcribe job with job_id={transcribe_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}")

transcription_job = transcription_queue.enqueue(
transcribe_full_audio_job,
conversation_id,
audio_uuid,
audio_file_path,
version_id,
"batch", # trigger
job_timeout=1800, # 30 minutes
result_ttl=JOB_RESULT_TTL,
depends_on=depends_on_job,
job_id=transcribe_job_id,
description=f"Transcribe conversation {conversation_id[:8]}",
meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id}
)
logger.info(f"📥 RQ: Enqueued transcription job {transcription_job.id}, meta={transcription_job.meta}")
crop_depends_on = transcription_job

# Step 2: Audio cropping job (depends on transcription if it ran, otherwise depends_on_job)
crop_job_id = f"crop_{conversation_id[:12]}"
logger.info(f"🔍 DEBUG: Creating crop job with job_id={crop_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}")

Expand All @@ -358,38 +380,15 @@ def start_post_conversation_jobs(
audio_file_path,
job_timeout=300, # 5 minutes
result_ttl=JOB_RESULT_TTL,
depends_on=depends_on_job,
depends_on=crop_depends_on,
job_id=crop_job_id,
description=f"Crop audio for conversation {conversation_id[:8]}",
meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id}
)
logger.info(f"📥 RQ: Enqueued cropping job {cropping_job.id}, meta={cropping_job.meta}")

# Step 2: Transcription job (conditional)
transcription_job = None
if post_transcription:
transcribe_job_id = f"transcribe_{conversation_id[:12]}"
logger.info(f"🔍 DEBUG: Creating transcribe job with job_id={transcribe_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}")

transcription_job = transcription_queue.enqueue(
transcribe_full_audio_job,
conversation_id,
audio_uuid,
audio_file_path,
version_id,
"batch", # trigger
job_timeout=1800, # 30 minutes
result_ttl=JOB_RESULT_TTL,
depends_on=cropping_job,
job_id=transcribe_job_id,
description=f"Transcribe conversation {conversation_id[:8]}",
meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id}
)
logger.info(f"📥 RQ: Enqueued transcription job {transcription_job.id}, meta={transcription_job.meta} (depends on {cropping_job.id})")
speaker_depends_on = transcription_job
else:
logger.info(f"⏭️ RQ: Skipping transcription (streaming already has transcript)")
speaker_depends_on = cropping_job
# Speaker recognition depends on cropping
speaker_depends_on = cropping_job

# Step 3: Speaker recognition job
speaker_job_id = f"speaker_{conversation_id[:12]}"
Expand Down
21 changes: 19 additions & 2 deletions backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,27 @@ async def process_cropping_job(
# Calculate cropped duration
cropped_duration_seconds = sum(end - start for start, end in speech_segments)

# Update conversation with cropped audio path
# Update segment timestamps to match cropped audio
# After cropping, segments are concatenated, so they need new timestamps
updated_segments = []
current_time = 0.0
for i, seg in enumerate(segments):
original_duration = seg.end - seg.start
updated_seg = seg.model_copy()
updated_seg.start = current_time
updated_seg.end = current_time + original_duration
updated_segments.append(updated_seg)
current_time += original_duration
logger.debug(f"Updated segment {i}: {seg.start:.2f}-{seg.end:.2f}s → {updated_seg.start:.2f}-{updated_seg.end:.2f}s")

# Update conversation with cropped audio path and adjusted segments
conversation.cropped_audio_path = cropped_filename
conversation.segments = updated_segments
# Also update the active transcript version segments
if conversation.active_transcript:
conversation.active_transcript.segments = updated_segments
await conversation.save()
logger.info(f"💾 Updated conversation {conversation_id[:12]} with cropped_audio_path: {cropped_filename}")
logger.info(f"💾 Updated conversation {conversation_id[:12]} with cropped_audio_path and adjusted {len(updated_segments)} segment timestamps")

logger.info(f"✅ RQ: Completed audio cropping for conversation {conversation_id} ({cropped_duration_seconds:.1f}s)")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ async def open_conversation_job(
conversation_id = conversation.conversation_id # Get the auto-generated ID
logger.info(f"✅ Created streaming conversation {conversation_id} for session {session_id}")

# Update THIS job's metadata with conversation_id (for Queue page grouping)
if current_job:
if not current_job.meta:
current_job.meta = {}
current_job.meta['conversation_id'] = conversation_id
current_job.save_meta()
logger.info(f"🔗 Updated open_conversation_job {current_job.id[:12]} metadata with conversation_id")

# Update speech detection job metadata with conversation_id
if speech_job_id:
try:
Expand Down
30 changes: 16 additions & 14 deletions backends/advanced/webui/src/pages/Conversations.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,12 @@ export default function Conversations() {
}
}

const handleSegmentPlayPause = (audioUuid: string, segmentIndex: number, segment: any, audioPath: string) => {
const segmentId = `${audioUuid}-${segmentIndex}`;
const handleSegmentPlayPause = (conversationId: string, segmentIndex: number, segment: any, audioPath: string) => {
const segmentId = `${conversationId}-${segmentIndex}`;

// If this segment is already playing, pause it
if (playingSegment === segmentId) {
const audio = audioRefs.current[audioUuid];
const audio = audioRefs.current[conversationId];
if (audio) {
audio.pause();
}
Expand All @@ -270,11 +270,11 @@ export default function Conversations() {
setPlayingSegment(null);
return;
}

// Stop any currently playing segment
if (playingSegment) {
const [currentAudioUuid] = playingSegment.split('-');
const currentAudio = audioRefs.current[currentAudioUuid];
const [currentConversationId] = playingSegment.split('-');
const currentAudio = audioRefs.current[currentConversationId];
if (currentAudio) {
currentAudio.pause();
}
Expand All @@ -283,12 +283,12 @@ export default function Conversations() {
segmentTimerRef.current = null;
}
}

// Get or create audio element for this conversation
let audio = audioRefs.current[audioUuid];
let audio = audioRefs.current[conversationId];
if (!audio) {
audio = new Audio(`${BACKEND_URL}/audio/${audioPath}`);
audioRefs.current[audioUuid] = audio;
audioRefs.current[conversationId] = audio;

// Add event listener to handle when audio ends naturally
audio.addEventListener('ended', () => {
Expand Down Expand Up @@ -603,10 +603,12 @@ export default function Conversations() {
return conversation.segments.map((segment, index) => {
const speaker = segment.speaker || 'Unknown';
const speakerColor = speakerColorMap[speaker];
const segmentId = `${conversation.audio_uuid}-${index}`;
// Use conversation_id for unique segment IDs (falls back to audio_uuid for backward compatibility)
const conversationKey = conversation.conversation_id || conversation.audio_uuid;
const segmentId = `${conversationKey}-${index}`;
const isPlaying = playingSegment === segmentId;
const audioPath = debugMode
? conversation.audio_path
const audioPath = debugMode
? conversation.audio_path
: conversation.cropped_audio_path || conversation.audio_path;

return (
Expand All @@ -619,7 +621,7 @@ export default function Conversations() {
{/* Play/Pause Button */}
{audioPath && (
<button
onClick={() => handleSegmentPlayPause(conversation.audio_uuid, index, segment, audioPath)}
onClick={() => handleSegmentPlayPause(conversationKey, index, segment, audioPath)}
className={`flex-shrink-0 w-5 h-5 rounded-full flex items-center justify-center transition-colors mt-0.5 ${
isPlaying
? 'bg-blue-600 text-white hover:bg-blue-700'
Expand Down
Loading