Implement close conversation functionality #138
This squashed commit includes all changes from the close-convo branch:
- Added close conversation endpoint and functionality
- Improved job queue management and UI
- Enhanced audio processing and transcription handling
- Updated speaker recognition integration
- Various bug fixes and improvements
Important: Review skipped
Auto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI.

Walkthrough
This PR restructures the advanced backend architecture around conversation-based audio processing, introducing audio cropping workflows, refactored RQ job orchestration, new audio utilities with FFmpeg integration, and extensive documentation. Key changes include removing files from .gitignore, migrating job management to queue controllers, introducing session management, and consolidating audio preprocessing into a dedicated pipeline.
Sequence Diagram(s)
sequenceDiagram
participant Client
participant WebSocket as WebSocket Handler
participant ConvJob as open_conversation_job
participant AudioCrop as process_cropping_job
participant Transcribe as transcribe_full_audio_job
participant SpeakerRec as recognise_speakers_job
participant Memory as process_memory_job
participant DB as Database
Client->>WebSocket: Send audio chunks (batch)
WebSocket->>DB: Create Conversation (auto-id)
WebSocket->>ConvJob: Enqueue (speech_job_id)
ConvJob->>ConvJob: Monitor transcription results
ConvJob->>ConvJob: Analyze speech meaningfulness
ConvJob->>ConvJob: Check enrolled speakers (if enabled)
Note over ConvJob: Meaningful speech detected
ConvJob->>DB: Update audio_path on Conversation
ConvJob->>AudioCrop: Enqueue process_cropping_job
AudioCrop->>DB: Fetch Conversation with segments
AudioCrop->>AudioCrop: Extract speech segments
AudioCrop->>AudioCrop: Crop audio with FFmpeg
AudioCrop->>DB: Update cropped_audio_path
AudioCrop->>Transcribe: Enqueue (depends on cropping)
Transcribe->>DB: Fetch Conversation context
Transcribe->>Transcribe: Transcribe audio
Transcribe->>DB: Store transcript version
Transcribe->>SpeakerRec: Enqueue (depends on transcription)
SpeakerRec->>DB: Fetch transcript & audio
SpeakerRec->>SpeakerRec: Recognize speakers
SpeakerRec->>Memory: Enqueue (depends on speaker rec)
Memory->>DB: Fetch Conversation with transcript
Memory->>Memory: Check primary speakers filter
Memory->>Memory: Extract memories from conversation
Memory->>DB: Store memory references
Memory->>DB: Update Conversation with memory IDs
Memory-->>Client: Processing complete
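The ordering in the diagram above can be sketched as a small dependency graph. This is only an illustration of the chain described in the diagram, not the PR's RQ wiring: the job names come from the diagram, while the `JOB_DEPENDENCIES` mapping and the use of `graphlib` are assumptions made for the example.

```python
from graphlib import TopologicalSorter

# Hypothetical mapping: each job lists the job it waits for,
# following the order shown in the sequence diagram.
JOB_DEPENDENCIES = {
    "process_cropping_job": {"open_conversation_job"},
    "transcribe_full_audio_job": {"process_cropping_job"},
    "recognise_speakers_job": {"transcribe_full_audio_job"},
    "process_memory_job": {"recognise_speakers_job"},
}

# static_order() yields every job after all of its prerequisites.
PIPELINE_ORDER = list(TopologicalSorter(JOB_DEPENDENCIES).static_order())
```

A check like this can catch an accidental cycle or a job wired to the wrong predecessor before anything is enqueued.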
Estimated code review effort
🎯 5 (Critical) | ⏱️ ~120+ minutes
This PR involves substantial architectural refactoring across multiple interconnected systems: new audio processing pipelines with FFmpeg integration, comprehensive worker/job orchestration changes, database model updates, API route consolidation, and significant documentation overhaul. The changes are heterogeneous (spanning utilities, controllers, models, workers, deployment configs, and frontend), require understanding of new dependency flows (audio cropping → transcription → speaker recognition → memory), and introduce new public APIs and data structures. The scope and density of logic changes, particularly around job queuing, session management, and audio processing, demand careful review of control flow, error handling, and data consistency.
Pre-merge checks and finishing touches
✅ Passed checks (3 passed)
@coderabbitai check my codes
@stuartalexander-nexus: I'll review the code changes in this PR right away. Let me analyze the implementation of the close conversation functionality and all the related improvements you've made.
✅ Actions performed
Review triggered.
@thestumonkey: I'll review the code changes in this PR. I'll analyze the implementation of the close conversation functionality, job queue management improvements, audio processing enhancements, speaker recognition integration, and all the related improvements and bug fixes you've made.
✅ Actions performed
Review triggered.
Actionable comments posted: 32
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (22)
extras/speaker-recognition/.env.template (1)
5-6: Documentation syntax mismatch: comments reference --group but Dockerfile uses --extra.
Comments on lines 5–6 suggest using uv sync --group cpu and uv sync --group gpu, but the Dockerfile (lines 28 and 41) uses the --extra flag instead. Align the documentation with the actual Dockerfile implementation to avoid confusion during local setup.
Apply this diff to correct the documentation:
-# uv sync --group cpu (CPU-only, lighter, works everywhere)
-# uv sync --group gpu (GPU acceleration, requires NVIDIA+CUDA)
+# uv sync --extra cpu (CPU-only, lighter, works everywhere)
+# uv sync --extra gpu (GPU acceleration, requires NVIDIA+CUDA)
extras/speaker-recognition/docker-compose.yml (1)
43-61: Missing YAML anchor &base-speaker-service will cause docker-compose parse failure.
The speaker-service-gpu service at line 44 references <<: *base-speaker-service, but no anchor definition &base-speaker-service exists in the file. The main speaker-service (lines 3-41) is not defined with this anchor. This will cause a YAML parsing error and the compose file will fail to load.
Fix: Add the &base-speaker-service anchor to the main speaker-service definition (after speaker-service: on line 3), or remove the alias and manually duplicate the necessary configuration in the GPU service.
extras/speaker-recognition/README.md (1)
59-61: Port mismatch: 5174 vs default 5173
Direct Access shows 5174 but REACT_UI_PORT defaults to 5173 above. Fix to avoid confusion.
-**Direct Access**: http://localhost:5174/ (or configured REACT_UI_PORT)
+**Direct Access**: http://localhost:${REACT_UI_PORT:-5173}/
Makefile (1)
188-202: Fix help text and deploy target dependencies to match actual config-k8s behavior
The review comment correctly identifies a critical inconsistency:
- Help text (lines 46, 87) claims config-k8s generates "Skaffold env + ConfigMap/Secret", but target description now accurately says "ConfigMap/Secret only"
- Dependency bug:
  - deploy-k8s (line 239) and deploy-apps (line 252) depend only on config-k8s, which does not generate skaffold.env
  - skaffold.env is produced by generate-docker-configs.py (called in config-docker, line 185)
  - Both deploy targets try to source skaffold.env (lines 241, 254) but will fail if run without first executing config-docker
Recommended fixes:
- Update help text at lines 46 and 87 to: "Generate Kubernetes files (ConfigMap/Secret only)"
- Change deploy-k8s and deploy-apps dependencies from config-k8s to config-all (which calls both config-docker and config-k8s), or explicitly add config-docker as a prerequisite
backends/advanced/init.py (1)
132-136: Collect API keys via hidden input (don’t echo secrets).
Current prompts echo keys to the terminal history/logs. Use getpass with safe fallbacks.
Apply:
-api_key = self.prompt_value("Deepgram API key (leave empty to skip)", "")
+try:
+    api_key = getpass.getpass("Deepgram API key (leave empty to skip): ")
+except (EOFError, KeyboardInterrupt):
+    api_key = ""
-api_key = self.prompt_value("Mistral API key (leave empty to skip)", "")
+try:
+    api_key = getpass.getpass("Mistral API key (leave empty to skip): ")
+except (EOFError, KeyboardInterrupt):
+    api_key = ""
-api_key = self.prompt_value("OpenAI API key (leave empty to skip)", "")
+try:
+    api_key = getpass.getpass("OpenAI API key (leave empty to skip): ")
+except (EOFError, KeyboardInterrupt):
+    api_key = ""
Also applies to: 145-151, 182-190
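Since the same try/except is repeated for three providers, one option is a small helper. The following is a minimal sketch, not the code in init.py: the name `prompt_secret` and the injectable `reader` parameter are hypothetical, added here so the fallback path can be exercised without a terminal.

```python
import getpass
from typing import Callable


def prompt_secret(label: str, reader: Callable[[str], str] = getpass.getpass) -> str:
    """Prompt for a secret without echoing it; return "" if the prompt is aborted."""
    try:
        # getpass.getpass reads the value with terminal echo disabled.
        return reader(f"{label} (leave empty to skip): ").strip()
    except (EOFError, KeyboardInterrupt):
        # Treat Ctrl-D / Ctrl-C as "skip this key" instead of crashing the wizard.
        return ""
```

With a helper like this, each call site would shrink to something like `api_key = prompt_secret("Deepgram API key")`.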
extras/speaker-recognition/webui/src/pages/Enrollment.tsx (2)
238-243: Ensure media tracks stop even if processing throws.
Wrap in try/finally so tracks are always stopped.
- mediaRecorder.onstop = async () => {
-   const blob = new Blob(audioChunksRef.current, { type: mimeType || 'audio/webm' })
-   await processRecording(blob)
-   stream.getTracks().forEach(track => track.stop())
- }
+ mediaRecorder.onstop = async () => {
+   try {
+     const blob = new Blob(audioChunksRef.current, { type: mimeType || 'audio/webm' })
+     await processRecording(blob)
+   } finally {
+     stream.getTracks().forEach(track => track.stop())
+   }
+ }
339-344: Close previous AudioContext to avoid leaks/limit errors.
Creating a new context per recording without closing the previous can exhaust the browser limit.
- const audioContext = createAudioContext()
- audioContextRef.current = audioContext
+ if (audioContextRef.current && audioContextRef.current.state !== 'closed') {
+   await audioContextRef.current.close()
+ }
+ const audioContext = createAudioContext()
+ audioContextRef.current = audioContext
backends/advanced/src/advanced_omi_backend/database.py (2)
384-453: Duplicate method name: earlier version will be shadowed by later definition.
AudioChunksRepository.update_transcription_status is defined again at Lines 517–548, overriding this versioned one at runtime. Unify into a single implementation or rename the simpler variant.
@@
- async def update_transcription_status(
-     self, audio_uuid: str, status: str, error_message: Optional[str] = None, provider: Optional[str] = None
- ):
+ async def update_transcription_status(
+     self, audio_uuid: str, status: str, error_message: Optional[str] = None, provider: Optional[str] = None
+ ):
      """Update transcription processing status and completion timestamp.
@@
- if not active_version:
+ if not active_version:
      # Create initial transcript version if none exists
@@
- else:
+ else:
      # Update existing active version
@@
  if result.modified_count > 0:
      logger.info(f"Updated transcription status to {status} for {audio_uuid}")
  return result.modified_count > 0
Follow-up: remove the second definition at Lines 517–548 or have it delegate to this one to preserve versioning.
296-316: Normalize timestamp storage to ISO strings.
Mixed datetime types (datetime vs ISO string) will cause inconsistent serialization/querying.
- "cropped_at": datetime.now(UTC),
+ "cropped_at": datetime.now(UTC).isoformat(),
- f"memory_versions.$[version].updated_at": datetime.now(UTC),
+ f"memory_versions.$[version].updated_at": datetime.now(UTC).isoformat(),
- if provider:
-     update_doc["transcription_provider"] = provider
+ if provider:
+     update_doc["transcription_provider"] = provider
Also ensure any other updated_at fields use .isoformat() for consistency.
Also applies to: 360-371, 533-543
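One way to keep the format consistent is to route every timestamp write through a single helper. This is a minimal sketch under the assumption that all stored timestamps should be timezone-aware UTC ISO 8601 strings; the helper names `utc_now_iso` and `parse_iso` are hypothetical, and `timezone.utc` is used instead of `datetime.UTC` so the example also runs on Python versions before 3.11.

```python
from datetime import datetime, timezone


def utc_now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO 8601 string."""
    return datetime.now(timezone.utc).isoformat()


def parse_iso(value: str) -> datetime:
    """Parse a string produced by utc_now_iso() back into an aware datetime."""
    return datetime.fromisoformat(value)
```

Because the offset is always +00:00 and the fields are fixed-width, these strings also sort chronologically as plain text, provided every writer uses the same helper.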
backends/advanced/src/advanced_omi_backend/services/audio_service.py (3)
145-146: Wrong exception class; ResponseError should come from redis.exceptions.
Catching aioredis.ResponseError won’t work; use redis.exceptions.ResponseError instead.
Apply:
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional
+from redis.exceptions import ResponseError
@@
- except aioredis.ResponseError as e:
+ except ResponseError as e:
      if "BUSYGROUP" not in str(e):
          raise
@@
- except aioredis.ResponseError as e:
+ except ResponseError as e:
      if "BUSYGROUP" not in str(e):
          raise
@@
- except aioredis.ResponseError:
+ except ResponseError:
      return {}
Also applies to: 195-196, 324-325
60-60: Don’t log full Redis URL (may leak credentials).
Sanitize before logging.
- logger.info(f"Audio stream service connected to Redis at {self.redis_url}")
+ from urllib.parse import urlsplit, urlunsplit
+ parts = urlsplit(self.redis_url)
+ netloc = f"{parts.hostname}:{parts.port}" if parts.hostname else parts.netloc
+ redacted = urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))
+ logger.info(f"Audio stream service connected to Redis at {redacted}")
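The same idea can be pulled into a standalone helper. The sketch below is an illustration rather than the project's code: `redact_url` is a hypothetical name, and it additionally handles a URL without an explicit port (so the output never contains `:None`) and a URL that has credentials but no host.

```python
from urllib.parse import urlsplit, urlunsplit


def redact_url(url: str) -> str:
    """Return the URL with any user:password@ component removed."""
    parts = urlsplit(url)
    if parts.hostname:
        host = parts.hostname
        if ":" in host:
            # IPv6 literals must be bracketed again after urlsplit strips them.
            host = f"[{host}]"
        netloc = f"{host}:{parts.port}" if parts.port is not None else host
    else:
        # No usable host: never echo the raw netloc, it may hold a password.
        netloc = "<redacted>" if parts.netloc else ""
    return urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))
```

The log call would then format `redact_url(self.redis_url)` instead of the raw URL.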
114-122: Avoid storing user_email in stream payload; rely on user_id.
Email in Redis Streams is PII with unclear retention.
- b"user_email": user_email.encode(),
+ # Avoid PII; user_id is sufficient for joins
+ # b"user_email": user_email.encode(),
If email is required downstream, derive it server-side via user_id.
backends/advanced/src/advanced_omi_backend/models/job.py (2)
251-255: Avoid model→controller dependency; source REDIS_URL from config or env.
Importing REDIS_URL from controllers creates a fragile cycle. Read from env or inject via kwargs.
- from advanced_omi_backend.controllers.queue_controller import REDIS_URL
- redis_client = redis_async.from_url(REDIS_URL)
+ import os
+ redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
+ redis_client = redis_async.from_url(redis_url)
Alternatively, pass redis_url to the job via kwargs and prefer that over env.
192-195: Use logger.exception for failures (includes traceback automatically).
Simplifies code and avoids duplicating exc_info.
- logger.error(f"❌ {job_name} failed after {elapsed:.2f}s: {e}", exc_info=True)
+ logger.exception(f"❌ {job_name} failed after {elapsed:.2f}s")
@@
- logger.error(f"❌ {job_name} failed after {elapsed:.2f}s: {e}", exc_info=True)
+ logger.exception(f"❌ {job_name} failed after {elapsed:.2f}s")
Also applies to: 276-278
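To show what logger.exception records, here is a minimal sketch of a timing wrapper. The function `run_tracked` and the logger name are hypothetical and stand in for the job decorator in job.py, whose real structure is not shown in this review.

```python
import logging
import time
from typing import Callable, TypeVar

T = TypeVar("T")
logger = logging.getLogger("job_tracking_example")


def run_tracked(job_name: str, func: Callable[[], T]) -> T:
    """Run func, logging the elapsed time and full traceback if it fails."""
    start = time.monotonic()
    try:
        return func()
    except Exception:
        elapsed = time.monotonic() - start
        # logger.exception logs at ERROR level and attaches the active
        # traceback, so the exception does not need to be formatted by hand.
        logger.exception("%s failed after %.2fs", job_name, elapsed)
        raise
```

Note that logger.exception is only meaningful inside an except block, since it reads the exception currently being handled.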
backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py (3)
60-66: Don't mask 403 with a blanket 404; re-raise HTTPException.
Permission denials raised inside try are caught and turned into 404s. Preserve the original HTTPException (403) and only map unexpected errors.
Apply:
  try:
      job = Job.fetch(job_id, connection=redis_conn)
@@
- except Exception as e:
-     logger.error(f"Failed to get job {job_id}: {e}")
-     raise HTTPException(status_code=404, detail="Job not found")
+ except HTTPException:
+     # Preserve explicit HTTP errors (e.g., 403)
+     raise
+ except Exception as e:
+     logger.exception("Failed to get job %s", job_id)
+     raise HTTPException(status_code=404, detail="Job not found") from e
Also applies to: 94-96
353-397: xinfo_groups parsing assumes list-of-pairs; redis-py returns dicts.
Current loop indexes dicts by integer keys, which will fail silently and drop group/consumer info.
Robustly handle both structures:
- groups = await audio_service.redis.xinfo_groups(stream_name)
- for group in groups:
-     group_dict = {}
-     # Parse group info (alternating key-value pairs)
-     for i in range(0, len(group), 2):
-         if i+1 < len(group):
-             key = group[i].decode() if isinstance(group[i], bytes) else str(group[i])
-             value = group[i+1]
-             if isinstance(value, bytes):
-                 try:
-                     value = value.decode()
-                 except:
-                     value = str(value)
-             group_dict[key] = value
+ groups = await audio_service.redis.xinfo_groups(stream_name)
+ for group in groups:
+     # redis-py >=5 returns dicts; older returns flat lists
+     if isinstance(group, dict):
+         group_dict = { (k.decode() if isinstance(k, bytes) else str(k)):
+                        (v.decode() if isinstance(v, bytes) else v)
+                        for k, v in group.items() }
+     else:
+         group_dict = {}
+         for i in range(0, len(group), 2):
+             if i + 1 < len(group):
+                 k = group[i]
+                 v = group[i+1]
+                 key = k.decode() if isinstance(k, bytes) else str(k)
+                 if isinstance(v, bytes):
+                     try:
+                         v = v.decode()
+                     except Exception:
+                         v = str(v)
+                 group_dict[key] = v
Do the same defensive shape handling for xinfo_consumers below.
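Because the same shape handling is needed for both groups and consumers, it can live in one pure function. This is a sketch only: `normalize_info_entry` is a hypothetical helper, it operates on plain Python values rather than on a live Redis connection, and which shape a given redis-py version returns is left as an assumption of the review comment above.

```python
from collections.abc import Mapping
from typing import Any, Dict


def _to_text(value: Any) -> Any:
    """Decode bytes to str; leave every other value untouched."""
    return value.decode(errors="replace") if isinstance(value, bytes) else value


def normalize_info_entry(entry: Any) -> Dict[str, Any]:
    """Accept either a dict or a flat [key, value, key, value, ...] list."""
    if isinstance(entry, Mapping):
        pairs = entry.items()
    else:
        # zip() drops a dangling key that has no matching value.
        pairs = zip(entry[0::2], entry[1::2])
    return {str(_to_text(k)): _to_text(v) for k, v in pairs}
```

The route could then call `normalize_info_entry(group)` and `normalize_info_entry(consumer)` without duplicating the decoding logic.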
41-44: Pagination ‘has_more’ becomes stale after user filtering.
After filtering jobs for a non-admin, pagination.total is recomputed but has_more isn't. Recompute or drop has_more for user-scoped lists to avoid misleading UIs.
- result["jobs"] = user_jobs
- result["pagination"]["total"] = len(user_jobs)
+ result["jobs"] = user_jobs
+ result["pagination"]["total"] = len(user_jobs)
+ result["pagination"]["has_more"] = max(0, len(user_jobs) - offset) > limit
backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py (1)
33-37: Use a consistent user identifier (user.user_id vs user.id).generate_client_id uses user.id while the rest of the file uses user.user_id. This can produce mismatched client IDs and ownership checks.
Apply:
-def generate_client_id(user: User, device_name: str) -> str: - """Generate client ID for uploaded files.""" - user_id_suffix = str(user.id)[-6:] +def generate_client_id(user: User, device_name: str) -> str: + """Generate client ID for uploaded files.""" + user_id_suffix = str(user.user_id)[-6:] return f"{user_id_suffix}-{device_name}"backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py (1)
190-221: Fix enqueue args: process_memory_job now only accepts (conversation_id).
Current enqueue passes client_id/user_id/user_email causing a runtime error when RQ calls the job.
Apply:
-def enqueue_memory_processing(
-    client_id: str,
-    user_id: str,
-    user_email: str,
-    conversation_id: str,
-    priority: JobPriority = JobPriority.NORMAL
-):
+def enqueue_memory_processing(
+    conversation_id: str,
+    priority: JobPriority = JobPriority.NORMAL
+):
@@
-    job = memory_queue.enqueue(
-        process_memory_job,
-        client_id,
-        user_id,
-        user_email,
-        conversation_id,
+    job = memory_queue.enqueue(
+        process_memory_job,
+        conversation_id,
         job_timeout=timeout_mapping.get(priority, 1800),
         result_ttl=JOB_RESULT_TTL,
         job_id=f"memory_{conversation_id[:8]}",
         description=f"Process memory for conversation {conversation_id[:8]}"
     )
Update all call sites accordingly (see conversation_controller.py comments).
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (2)
354-367: Fix transcribe_full_audio_job argument order (user_id no longer in signature).
The current enqueue shifts trigger into redis_client position and will break at runtime.
 transcript_job = transcription_queue.enqueue(
     transcribe_full_audio_job,
     conversation_id,
     audio_uuid,
     str(full_audio_path),
     version_id,
-    str(user.user_id),
-    "reprocess",
+    "reprocess",
     job_timeout=600,
     result_ttl=JOB_RESULT_TTL,
     job_id=f"reprocess_{conversation_id[:8]}",
     description=f"Transcribe audio for {conversation_id[:8]}",
     meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id}
 )
474-480: Align reprocess_memory with updated enqueue_memory_processing signature.
After the helper is updated, call with just conversation_id.
- job = enqueue_memory_processing(
-     client_id=conversation_model.client_id,
-     user_id=str(user.user_id),
-     user_email=user.email,
-     conversation_id=conversation_id,
-     priority=JobPriority.NORMAL
- )
+ job = enqueue_memory_processing(
+     conversation_id=conversation_id,
+     priority=JobPriority.NORMAL
+ )
backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py (1)
376-387: Ensure file is closed on normal END path.
If the loop breaks after END and empty reads, file_sink may remain open. Close it before computing stats.
- logger.info(f"✅ Stream empty after END signal - stopping audio collection")
- break
+ logger.info("✅ Stream empty after END signal - stopping audio collection")
+ # Ensure file is closed before exit
+ if file_sink:
+     try:
+         await file_sink.close()
+         logger.info(f"✅ Closed file at loop end: {wav_filename}")
+     except Exception as e:
+         logger.debug(f"Ignored close() error: {e}")
+ break
# Audio file serving
location /audio/ {
    proxy_pass http://friend_backend/audio/;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;
    proxy_redirect off;

    # Add headers for audio file serving
    proxy_set_header Accept-Ranges bytes;
    proxy_cache_bypass $http_range;
}
Accept-Ranges is being set as a request header; use a response header instead.
proxy_set_header Accept-Ranges bytes; won’t expose range support to clients. Add it as a response header (and optionally force range proxying).
Apply:
location /audio/ {
proxy_pass http://friend_backend/audio/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_redirect off;
- # Add headers for audio file serving
- proxy_set_header Accept-Ranges bytes;
- proxy_cache_bypass $http_range;
+ # Add headers for audio file serving
+ add_header Accept-Ranges bytes always;
+ proxy_cache_bypass $http_range;
+ # Optional: ensure upstream range handling
+ proxy_force_ranges on;
}
🤖 Prompt for AI Agents
In backends/advanced/nginx.conf around lines 172 to 184, the config incorrectly
sets "Accept-Ranges" as a request header using proxy_set_header; change it to
set the response header instead by replacing that line with an add_header
Accept-Ranges bytes; (use the "always" parameter if you need it sent on all
responses), and optionally enable proxy_force_ranges on in the same location to
force range support while proxying.
# NOTE: ClientState is legacy V1 code. In V2 architecture, conversation closure
# is handled by the websocket controllers using RQ jobs directly.
# This method is kept minimal for backward compatibility.
🧩 Analysis chain
Verify whether this method is still actively used.
The comment suggests this is legacy code kept for backward compatibility, but the method is called from start_new_conversation() (Line 151) and disconnect() (Line 173), indicating active usage. If V2 architecture truly handles conversation closure elsewhere, consider either removing these calls or clarifying the comment to explain the dual responsibility.
🏁 Script executed:
#!/bin/bash
# Description: Search for all calls to close_current_conversation to verify usage patterns
# Find all calls to this method
rg -n -C3 'close_current_conversation\(' --type=py
Length of output: 5319
I need to examine the actual implementation of the method to complete the verification.
🏁 Script executed:
#!/bin/bash
# Description: Extract the full implementation of close_current_conversation in client.py
# Show lines 120-147 to see the full method implementation
sed -n '120,147p' backends/advanced/src/advanced_omi_backend/client.py
Length of output: 95
🏁 Script executed:
#!/bin/bash
# Description: Extract the full implementation of close_current_conversation in client.py
# Show lines 120-147 to see the full method implementation
sed -n '120,147p' backends/advanced/src/advanced_omi_backend/client.py
Length of output: 1288
Clarify or correct the misleading comment about legacy code.
The comment states this is "legacy V1 code kept minimal for backward compatibility," but the method is actively used through a current HTTP API endpoint (@router.post("/{client_id}/close") in conversation_routes.py) that routes to this exact method. This is part of the active V2 architecture, not legacy backward compatibility. Either correct the comment to reflect its current role or remove the misleading claim.
Additionally, the method is marked async but contains no await statements—the async keyword is unnecessary.
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/client.py around lines 135 to 137,
the comment incorrectly labels this method as "legacy V1" kept only for backward
compatibility despite it being invoked by the active V2 HTTP endpoint
(conversation_routes.py POST /{client_id}/close); update or replace the comment
to accurately state that this method is part of the current V2 flow (or remove
the misleading legacy claim), and because the method contains no await
expressions, remove the unnecessary async modifier and make it a synchronous
method so its signature matches its implementation and avoids misleading
callers.
if not full_audio_path:
    return JSONResponse(
        status_code=422,
        content={
            "error": "Audio file not found on disk",
            "details": f"Conversation exists but audio file '{audio_path}' is missing from expected locations",
            "searched_paths": [str(p) for p in possible_paths]
        }
    )
Avoid leaking internal paths in API errors.
Returning searched_paths exposes internal filesystem layout. Hide behind DEBUG or omit.
- return JSONResponse(
+ return JSONResponse(
status_code=422,
content={
"error": "Audio file not found on disk",
- "details": f"Conversation exists but audio file '{audio_path}' is missing from expected locations",
- "searched_paths": [str(p) for p in possible_paths]
+ "details": "Conversation exists but audio file is missing from expected locations"
}
)
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py
around lines 253 to 261, the API error returns searched_paths which leaks
internal filesystem locations; change the response to omit or redact searched
paths unless running in a DEBUG mode. Update the handler to return a generic
error and details without paths in production, and conditionally include the
searched_paths only when a configuration flag (e.g., settings.DEBUG or an env
var) is true; additionally log the full searched_paths/server paths to the
server logger for diagnostics rather than returning them in the HTTP response.
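The debug-gated variant described in the prompt above might look like the following sketch. It only builds the response body; the function name `missing_audio_error` and the `debug` flag are hypothetical, and how the flag is sourced (a settings object, an environment variable) is left open.

```python
import logging
from pathlib import Path
from typing import Any, Dict, Iterable

logger = logging.getLogger(__name__)


def missing_audio_error(searched: Iterable[Path], debug: bool = False) -> Dict[str, Any]:
    """Build the 422 body; filesystem paths are returned only in debug mode."""
    paths = [str(p) for p in searched]
    # Full paths always go to the server log for diagnostics.
    logger.warning("Audio file missing; searched: %s", paths)
    content: Dict[str, Any] = {
        "error": "Audio file not found on disk",
        "details": "Conversation exists but audio file is missing from expected locations",
    }
    if debug:
        content["searched_paths"] = paths
    return content
```

The handler would then pass the returned dict as `content` to its JSON response with status code 422.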
  # Job 4: Extract memories (depends on cropping)
  memory_job = memory_queue.enqueue(
      process_memory_job,
      None,  # client_id - will be read from conversation in DB
      str(user.user_id),
      "",  # user_email - will be read from user in DB
      conversation_id,
-     depends_on=speaker_job,
+     depends_on=cropping_job,
      job_timeout=1800,
      result_ttl=JOB_RESULT_TTL,
      job_id=f"memory_{conversation_id[:8]}",
      description=f"Extract memories for {conversation_id[:8]}",
-     meta={'audio_uuid': audio_uuid}
+     meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id}
  )
Update memory job enqueue to new process_memory_job signature.
process_memory_job now derives identity; pass only conversation_id and depends_on.
memory_job = memory_queue.enqueue(
process_memory_job,
- None, # client_id - will be read from conversation in DB
- str(user.user_id),
- "", # user_email - will be read from user in DB
- conversation_id,
+ conversation_id,
depends_on=cropping_job,
job_timeout=1800,
result_ttl=JOB_RESULT_TTL,
job_id=f"memory_{conversation_id[:8]}",
description=f"Extract memories for {conversation_id[:8]}",
meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id}
)
🤖 Prompt for AI Agents
In
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
around lines 401 to 414, the memory_job enqueue call passes the old positional
arguments (client_id, user_id, user_email, conversation_id) to
process_memory_job; update the call to match the new signature which only
requires conversation_id and depends_on. Remove the extra positional args and
enqueue process_memory_job with just conversation_id, keep
depends_on=cropping_job, preserve job_timeout, result_ttl, job_id, description
and meta as-is (e.g., meta={'audio_uuid': audio_uuid, 'conversation_id':
conversation_id}). Ensure no other positional parameters are passed so the
function uses its internal identity derivation.
# Get pending messages for this specific consumer
try:
    pending_messages = await redis_client.execute_command(
        'XPENDING', stream_name, group_name, '-', '+', str(consumer_pending), consumer_name
    )

    # XPENDING returns flat list: [msg_id, consumer, idle_ms, delivery_count, msg_id, ...]
    # Parse in groups of 4
    for i in range(0, len(pending_messages), 4):
        if i < len(pending_messages):
            msg_id = pending_messages[i]
            if isinstance(msg_id, bytes):
                msg_id = msg_id.decode()

            # Claim the message to a cleanup worker
            try:
                await redis_client.execute_command(
                    'XCLAIM', stream_name, group_name, 'cleanup-worker', '0', msg_id
                )

                # Acknowledge it immediately
                await redis_client.xack(stream_name, group_name, msg_id)
                cleaned_count += 1
            except Exception as claim_error:
                logger.warning(f"Failed to claim/ack message {msg_id}: {claim_error}")
Fix XPENDING parsing; handle both array-of-arrays and flat forms.
Current code assumes a flat list; Redis typically returns a list of [id, consumer, idle, delivered].
- pending_messages = await redis_client.execute_command(
+ pending_messages = await redis_client.execute_command(
'XPENDING', stream_name, group_name, '-', '+', str(consumer_pending), consumer_name
)
-
- # XPENDING returns flat list: [msg_id, consumer, idle_ms, delivery_count, msg_id, ...]
- # Parse in groups of 4
- for i in range(0, len(pending_messages), 4):
- if i < len(pending_messages):
- msg_id = pending_messages[i]
- if isinstance(msg_id, bytes):
- msg_id = msg_id.decode()
- # Claim the message to a cleanup worker
- try:
- await redis_client.execute_command(
- 'XCLAIM', stream_name, group_name, 'cleanup-worker', '0', msg_id
- )
- # Acknowledge it immediately
- await redis_client.xack(stream_name, group_name, msg_id)
- cleaned_count += 1
- except Exception as claim_error:
- logger.warning(f"Failed to claim/ack message {msg_id}: {claim_error}")
+ # XPENDING returns list of entries: [[id, consumer, idle_ms, deliveries], ...]
+ entries = pending_messages or []
+ # Backward-compat: flatten if driver returns a flat list
+ if entries and not isinstance(entries[0], (list, tuple)):
+ entries = [entries[i:i+4] for i in range(0, len(entries), 4)]
+ for entry in entries:
+ if not entry:
+ continue
+ msg_id = entry[0].decode() if isinstance(entry[0], bytes) else entry[0]
+ try:
+ await redis_client.execute_command(
+ 'XCLAIM', stream_name, group_name, 'cleanup-worker', '0', msg_id
+ )
+ await redis_client.xack(stream_name, group_name, msg_id)
+ cleaned_count += 1
+ except Exception as claim_error:
+ logger.warning(f"Failed to claim/ack message {msg_id}: {claim_error}")
🧰 Tools
🪛 Ruff (0.14.1)
635-635: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py
around lines 612 to 636, the XPENDING response is being treated only as a flat
list; update the parsing to handle both the flat list form (flat sequence of
[id, consumer, idle, delivered, ...]) and the array-of-arrays form (list of
entries like [id, consumer, idle, delivered]). Detect which shape was returned,
iterate the correct structure, decode bytes to str for msg_id, guard against
incomplete groups/entries, and then claim and xack each msg_id as before; ensure
you skip or log malformed entries instead of raising on unexpected shapes.
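The shape detection described in the prompt above can be isolated into a pure function, which makes it testable without a Redis connection. The following is a minimal sketch, not the code from this PR: the name `normalize_pending_entries` is hypothetical, and it assumes each pending entry carries exactly four fields (id, consumer, idle time, delivery count).

```python
from typing import Any, List, Sequence


def normalize_pending_entries(raw: Any) -> List[str]:
    """Return message IDs from an extended XPENDING reply in either shape.

    Hypothetical helper: accepts the array-of-arrays form
    [[id, consumer, idle_ms, deliveries], ...] or the flat form
    [id, consumer, idle_ms, deliveries, id, ...].
    """
    if not raw:
        return []
    entries: Sequence[Any] = raw
    # Flat form: regroup into 4-item chunks so both shapes share one loop.
    if not isinstance(entries[0], (list, tuple)):
        entries = [entries[i:i + 4] for i in range(0, len(entries), 4)]
    msg_ids: List[str] = []
    for entry in entries:
        # Skip malformed or incomplete groups instead of raising.
        if not isinstance(entry, (list, tuple)) or len(entry) < 4:
            continue
        msg_id = entry[0]
        if isinstance(msg_id, bytes):
            msg_id = msg_id.decode()
        msg_ids.append(str(msg_id))
    return msg_ids
```

The cleanup loop would then iterate over the returned IDs and run the existing XCLAIM and XACK calls for each one, keeping the per-message `try`/`except` as it is.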
| uv run --with-requirements setup-requirements.txt python init.py | ||
| ``` |
🧩 Analysis chain
Verify setup command/files and cross-link init docs.
Please confirm these exist and match the new init workflow, and add a link to Docs/init-system.md for context.
🏁 Script executed:
#!/bin/bash
# Verify referenced files exist
fd -a '^init\.py$' -H
fd -a '^services\.py$' -H
fd -a '^setup-requirements\.txt$' -H
# Grep for "init.py" in docs to ensure consistency
rg -n "init\.py" -g "!**/node_modules/**"
Length of output: 2752
🏁 Script executed:
#!/bin/bash
# Check the context around lines 96-97 in quickstart.md
# Look for any existing links to Docs/init-system.md or other documentation
cat quickstart.md | head -110 | tail -30
Length of output: 841
Add cross-link to initialization documentation.
Files exist and command is correct. Add a link to Docs/init-system.md near line 96 to provide users with context on the setup wizard's capabilities and configuration options.
🤖 Prompt for AI Agents
In quickstart.md around lines 96–97, the uv run command is correct but lacks a
cross-link to the initialization docs; add a Markdown link to
Docs/init-system.md adjacent to the command (either on the following line or
inline in the surrounding sentence) with a short descriptive label like
"Initialization documentation" or "See Docs/init-system.md" and ensure the
relative path and link text follow the file's existing link/style conventions so
users can easily jump to the setup-wizard/configuration details.
| *[WebUI Dashboard - Screenshot coming soon]* | ||
|  | ||
|  | ||
| ### Memory Search | ||
| *[Mobile App - Screenshot coming soon]* | ||
|  | ||
|  | ||
| *[Mobile App - Screenshot coming soon]* | ||
| *[Memory Search - Screenshot coming soon]* | ||
|  | ||
|  | ||
🧩 Analysis chain
Verify screenshot assets exist to avoid broken links.
Paths switched to screenshots/*.png with “coming soon” notes. Please confirm files are present or gate links until added.
Run from repo root:
🏁 Script executed:
#!/usr/bin/env bash
set -euo pipefail
echo "Listing expected screenshots:"
for f in screenshots/dashboard.png screenshots/mobile-app.png screenshots/memory-search.png; do
if [ -f "$f" ]; then echo "OK - $f"; else echo "MISS- $f"; fi
done
Length of output: 130
Add missing screenshot files or remove broken image references.
The README references three screenshot files that don't exist in the repository:
- screenshots/dashboard.png
- screenshots/mobile-app.png
- screenshots/memory-search.png
This causes broken image links in the rendered documentation. Either commit the screenshot files to the screenshots/ directory or replace the image markdown with placeholder text until images are ready.
🤖 Prompt for AI Agents
In README.md around lines 11 to 22, the README references three missing images
(screenshots/dashboard.png, screenshots/mobile-app.png,
screenshots/memory-search.png) causing broken links; either add the actual image
files at those paths in the repository or replace each markdown image reference
with placeholder text or a single-line note like "[Screenshot coming soon]" so
the README renders cleanly; update the file accordingly and commit the added
images under screenshots/ if available.
| if service_name == 'speaker-recognition' and command == 'up': | ||
| # Read configuration to determine how to start | ||
| env_file = service_path / '.env' | ||
| if env_file.exists(): | ||
| env_values = dotenv_values(env_file) | ||
| compute_mode = env_values.get('COMPUTE_MODE', 'cpu') | ||
| # Add profile flag for both up and down commands | ||
| if compute_mode == 'gpu': | ||
| cmd.extend(['--profile', 'gpu']) | ||
| https_enabled = env_values.get('REACT_UI_HTTPS', 'false') | ||
| if https_enabled.lower() == 'true': | ||
| # HTTPS mode: start with profile for all services (includes nginx) | ||
| if compute_mode == 'gpu': | ||
| cmd.extend(['--profile', 'gpu']) | ||
| else: | ||
| cmd.extend(['--profile', 'cpu']) | ||
| cmd.extend(['up', '-d']) | ||
| else: | ||
| cmd.extend(['--profile', 'cpu']) | ||
| if command == 'up': | ||
| https_enabled = env_values.get('REACT_UI_HTTPS', 'false') | ||
| if https_enabled.lower() == 'true': | ||
| # HTTPS mode: start with profile for all services (includes nginx) | ||
| cmd.extend(['up', '-d']) | ||
| # HTTP mode: start specific services with profile (no nginx) | ||
| if compute_mode == 'gpu': | ||
| cmd.extend(['--profile', 'gpu']) | ||
| else: | ||
| # HTTP mode: start specific services with profile (no nginx) | ||
| cmd.extend(['up', '-d', 'speaker-service-gpu' if compute_mode == 'gpu' else 'speaker-service-cpu', 'web-ui']) | ||
| elif command == 'down': | ||
| cmd.extend(['down']) | ||
| cmd.extend(['--profile', 'cpu']) | ||
| cmd.extend(['up', '-d', 'speaker-service-gpu' if compute_mode == 'gpu' else 'speaker-service-cpu', 'web-ui']) | ||
| else: | ||
| # Fallback: no profile | ||
| if command == 'up': | ||
| cmd.extend(['up', '-d']) | ||
| elif command == 'down': | ||
| cmd.extend(['down']) | ||
| # Fallback: just start base service | ||
| cmd.extend(['up', '-d']) | ||
| else: |
Parse REACT_UI_HTTPS robustly; current check only matches 'true'.
Allow common truthy values to avoid mis-start (e.g., '1', 'True', 'YES').
Apply:
-https_enabled = env_values.get('REACT_UI_HTTPS', 'false')
-
-if https_enabled.lower() == 'true':
+https_enabled_val = str(env_values.get('REACT_UI_HTTPS', 'false'))
+https_enabled = https_enabled_val.strip().lower() in ('1','true','yes','on')
+
+if https_enabled:
🤖 Prompt for AI Agents
In services.py around lines 75 to 100, the REACT_UI_HTTPS check only accepts the
literal 'true', so update the check to treat common truthy values (e.g., '1',
'true', 'yes', 'y', 'on') by normalizing the env value (strip and lower) and
testing membership in a truthy set; keep the existing flow and defaults
unchanged so HTTPS is enabled when the normalized value is in that set and
treated as disabled otherwise.
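One way to keep the truthy set in a single place is a small helper, sketched below. The names `env_flag` and `TRUTHY` are hypothetical and not part of services.py. The `None` branch is there because `dotenv_values` can map a key to `None` when the .env file lists it without a value.

```python
from typing import Optional

# Values treated as "enabled"; everything else counts as disabled.
TRUTHY = frozenset({"1", "true", "yes", "y", "on"})


def env_flag(value: Optional[str], default: bool = False) -> bool:
    """Interpret an environment-style string as a boolean flag."""
    if value is None:
        return default
    # Normalize whitespace and case before the membership test.
    return str(value).strip().lower() in TRUTHY
```

With this helper the check in services.py could read `https_enabled = env_flag(env_values.get('REACT_UI_HTTPS'))`, leaving the rest of the flow unchanged.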
| # Dependencies for Friend-Lite root setup orchestrator | ||
| rich>=13.0.0 | ||
| python-dotenv No newline at end of file | ||
| dotenv No newline at end of file |
Incorrect package name - use python-dotenv instead of dotenv.
The correct PyPI package for loading .env files is python-dotenv, not dotenv. The dotenv package is a different, unmaintained library that can cause installation issues or unexpected behavior.
Apply this diff to fix the package name:
-dotenv
+python-dotenv
🤖 Prompt for AI Agents
In setup-requirements.txt around line 3, the package name is incorrect: replace
"dotenv" with the correct PyPI package name "python-dotenv" so the project
installs the maintained environment variable library; update the requirements
file to use "python-dotenv" (optionally with a version spec) and save the file.
| Run RQ Persistence Tests | ||
| [Documentation] Execute Redis Queue persistence tests | ||
| [Tags] rq persistence redis suite | ||
| Run Tests rq_persistence_tests.robot |
🧩 Analysis chain
Good addition - verify the test file exists.
Adding RQ persistence test coverage is valuable for validating the queue system changes in this PR.
Run the following script to confirm the referenced test file exists:
🏁 Script executed:
#!/bin/bash
# Verify rq_persistence_tests.robot exists
fd "rq_persistence_tests.robot" tests/
Length of output: 0
🏁 Script executed:
#!/bin/bash
# Broader search for Robot test files and directory structure
echo "=== Searching for any .robot files with 'rq' in name ==="
fd "\.robot$" . | grep -i rq || echo "(none found)"
echo ""
echo "=== All .robot files in tests/ directory ==="
fd "\.robot$" tests/ || echo "(no robot files or tests/ doesn't exist)"
echo ""
echo "=== List contents of tests/ directory ==="
ls -la tests/ 2>/dev/null || echo "(tests/ directory not found)"
echo ""
echo "=== Check if rq_persistence_tests.robot exists anywhere ==="
find . -name "*rq_persistence*" 2>/dev/null || echo "(file not found anywhere)"
Length of output: 1844
🏁 Script executed:
#!/bin/bash
# Inspect all_api_tests.robot to see the exact context and verify the reference
echo "=== Content of all_api_tests.robot around the added test ==="
cat tests/all_api_tests.robot
echo ""
echo "=== Content of rq_queue_tests.robot (existing similar file) ==="
head -20 tests/endpoints/rq_queue_tests.robot
Length of output: 2878
Fix broken test reference - rq_persistence_tests.robot does not exist.
The test file rq_persistence_tests.robot referenced at line 41 does not exist in the repository. The test will fail at runtime. Either create the missing test file or correct the reference to an existing file like rq_queue_tests.robot. Only tests/endpoints/rq_queue_tests.robot exists for RQ-related tests.
🤖 Prompt for AI Agents
In tests/all_api_tests.robot around lines 38 to 41, the referenced test file
rq_persistence_tests.robot does not exist causing runtime failures; update the
Run Tests entry to point to the existing RQ test file
(tests/endpoints/rq_queue_tests.robot or its relative path) or create the
missing rq_persistence_tests.robot file if intended; ensure the path matches
repository layout and that the test filename is correct so the suite can locate
and execute the RQ tests.
…nto close-convo-squashed # Conflicts: # backends/advanced/uv.lock
This squashed commit includes all changes from the close-convo branch:
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes
Configuration
Documentation