Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions backends/advanced/src/advanced_omi_backend/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,4 @@

__version__ = "0.1.0"

from .database import AudioChunksRepository

__all__ = ["AudioChunksRepository"]
__all__ = []
2 changes: 0 additions & 2 deletions backends/advanced/src/advanced_omi_backend/app_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def __init__(self):
self.mongodb_uri = os.getenv("MONGODB_URI", "mongodb://mongo:27017")
self.mongo_client = AsyncIOMotorClient(self.mongodb_uri)
self.db = self.mongo_client.get_default_database("friend-lite")
self.chunks_col = self.db["audio_chunks"]
self.users_col = self.db["users"]
self.speakers_col = self.db["speakers"]

Expand Down Expand Up @@ -104,7 +103,6 @@ def get_audio_chunk_dir() -> Path:
def get_mongo_collections():
"""Get MongoDB collections."""
return {
'chunks': app_config.chunks_col,
'users': app_config.users_col,
'speakers': app_config.speakers_col,
}
Expand Down
3 changes: 0 additions & 3 deletions backends/advanced/src/advanced_omi_backend/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from pathlib import Path
from typing import Dict, List, Optional, Tuple

from advanced_omi_backend.database import AudioChunksRepository
from advanced_omi_backend.task_manager import get_task_manager
from wyoming.audio import AudioChunk

Expand All @@ -29,14 +28,12 @@ class ClientState:
def __init__(
self,
client_id: str,
ac_db_collection_helper: AudioChunksRepository,
chunk_dir: Path,
user_id: str,
user_email: Optional[str] = None,
):
self.client_id = client_id
self.connected = True
self.db_helper = ac_db_collection_helper
self.chunk_dir = chunk_dir

# Store user data for memory processing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def create_client(self, client_id: str, ac_repository, chunk_dir, user_id: str,
from advanced_omi_backend.client import ClientState

# Create client state
client_state = ClientState(client_id, ac_repository, chunk_dir, user_id, user_email)
client_state = ClientState(client_id, chunk_dir, user_id, user_email)

# Atomically add to internal storage and register mapping
self._active_clients[client_id] = client_state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
Handles audio file uploads and processes them directly.
Simplified to write files immediately and enqueue transcription.

Also includes audio cropping operations that work with the audio_chunks collection.
Also includes audio cropping operations that work with the Conversation model.
"""

import logging
Expand All @@ -23,8 +23,7 @@
from advanced_omi_backend.models.job import JobPriority
from advanced_omi_backend.models.user import User
from advanced_omi_backend.models.conversation import create_conversation
from advanced_omi_backend.database import AudioChunksRepository, chunks_col
from advanced_omi_backend.client_manager import client_belongs_to_user
from advanced_omi_backend.models.conversation import Conversation

logger = logging.getLogger(__name__)
audio_logger = logging.getLogger("audio_processing")
Expand Down Expand Up @@ -115,6 +114,8 @@ async def upload_and_process_audio_files(
title=title,
summary="Processing uploaded audio file..."
)
# Set audio_path so the frontend can play the audio
conversation.audio_path = wav_filename
await conversation.insert()
conversation_id = conversation.conversation_id # Get the auto-generated ID

Expand Down Expand Up @@ -194,32 +195,32 @@ async def upload_and_process_audio_files(

async def get_cropped_audio_info(audio_uuid: str, user: User):
"""
Get audio cropping metadata from the audio_chunks collection.
Get audio cropping metadata from the conversation.

This is an audio service operation that retrieves cropping-related metadata
such as speech segments, cropped audio path, and cropping timestamps.

Used for: Checking cropping status and retrieving audio processing details.
Works with: audio_chunks collection (audio service operations).
Works with: Conversation model.
"""
try:
# Find the audio chunk
chunk = await chunks_col.find_one({"audio_uuid": audio_uuid})
if not chunk:
# Find the conversation
conversation = await Conversation.find_one(Conversation.audio_uuid == audio_uuid)
if not conversation:
return JSONResponse(status_code=404, content={"error": "Conversation not found"})

# Check ownership for non-admin users
if not user.is_superuser:
if not client_belongs_to_user(chunk["client_id"], user.user_id):
if conversation.user_id != str(user.user_id):
return JSONResponse(status_code=404, content={"error": "Conversation not found"})

return {
"audio_uuid": audio_uuid,
"cropped_audio_path": chunk.get("cropped_audio_path"),
"speech_segments": chunk.get("speech_segments", []),
"cropped_duration": chunk.get("cropped_duration"),
"cropped_at": chunk.get("cropped_at"),
"original_audio_path": chunk.get("audio_path"),
"cropped_audio_path": conversation.cropped_audio_path,
"speech_segments": conversation.speech_segments if hasattr(conversation, 'speech_segments') else [],
"cropped_duration": conversation.cropped_duration if hasattr(conversation, 'cropped_duration') else None,
"cropped_at": conversation.cropped_at if hasattr(conversation, 'cropped_at') else None,
"original_audio_path": conversation.audio_path,
}

except Exception as e:
Expand All @@ -236,20 +237,20 @@ async def reprocess_audio_cropping(audio_uuid: str, user: User):
to extract only speech segments from the full audio file.

Used for: Re-processing audio when cropping failed or needs updating.
Works with: audio_chunks collection and audio_utils cropping functions.
Works with: Conversation model and audio_utils cropping functions.
"""
try:
# Find the audio chunk
chunk = await chunks_col.find_one({"audio_uuid": audio_uuid})
if not chunk:
# Find the conversation
conversation = await Conversation.find_one(Conversation.audio_uuid == audio_uuid)
if not conversation:
return JSONResponse(status_code=404, content={"error": "Conversation not found"})

# Check ownership for non-admin users
if not user.is_superuser:
if not client_belongs_to_user(chunk["client_id"], user.user_id):
if conversation.user_id != str(user.user_id):
return JSONResponse(status_code=404, content={"error": "Conversation not found"})

audio_path = chunk.get("audio_path")
audio_path = conversation.audio_path
if not audio_path:
return JSONResponse(
status_code=400, content={"error": "No audio file found for this conversation"}
Expand Down Expand Up @@ -277,8 +278,8 @@ async def reprocess_audio_cropping(audio_uuid: str, user: User):
}
)

# Get speech segments from the chunk
speech_segments = chunk.get("speech_segments", [])
# Get speech segments from the conversation
speech_segments = conversation.speech_segments if hasattr(conversation, 'speech_segments') else []
if not speech_segments:
return JSONResponse(
status_code=400,
Expand All @@ -289,20 +290,21 @@ async def reprocess_audio_cropping(audio_uuid: str, user: User):
cropped_filename = f"cropped_{audio_uuid}.wav"
output_path = Path("/app/audio_chunks") / cropped_filename

# Get repository for database updates
chunk_repo = AudioChunksRepository(chunks_col)

# Reprocess the audio cropping
# Reprocess the audio cropping (simplified without repository)
try:
result = await _process_audio_cropping_with_relative_timestamps(
from advanced_omi_backend.utils.audio_utils import extract_speech_segments

success = await extract_speech_segments(
str(full_audio_path),
speech_segments,
str(output_path),
audio_uuid,
chunk_repo
str(output_path)
)

if result:
if success:
# Update conversation with cropped audio path
conversation.cropped_audio_path = cropped_filename
await conversation.save()

audio_logger.info(f"Successfully reprocessed audio cropping for {audio_uuid}")
return JSONResponse(
content={"message": f"Audio cropping reprocessed for {audio_uuid}"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
ClientManager,
client_belongs_to_user,
)
from advanced_omi_backend.database import chunks_col
from advanced_omi_backend.models.audio_file import AudioFile
from advanced_omi_backend.models.conversation import Conversation
from advanced_omi_backend.users import User
from fastapi.responses import JSONResponse
Expand Down Expand Up @@ -174,32 +174,27 @@ async def delete_conversation(audio_uuid: str, user: User):

# Detailed debugging only when debug level is enabled
if logger.isEnabledFor(logging.DEBUG):
total_count = await chunks_col.count_documents({})
logger.debug(f"Total conversations in collection: {total_count}")
total_count = await AudioFile.count()
logger.debug(f"Total audio files in collection: {total_count}")
logger.debug(f"UUID length: {len(audio_uuid)}, type: {type(audio_uuid)}")

# First, get the audio chunk record to check ownership and get conversation_id
audio_chunk = await chunks_col.find_one({"audio_uuid": audio_uuid})
# First, get the audio file record to check ownership and get conversation_id
audio_file = await AudioFile.find_one(AudioFile.audio_uuid == audio_uuid)

if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Audio chunk lookup result: {'found' if audio_chunk else 'not found'}")
if audio_chunk:
logger.debug(f"Found audio chunk with client_id: {audio_chunk.get('client_id')}")
logger.debug(f"Audio chunk has conversation_id: {audio_chunk.get('conversation_id')}")
else:
# Try alternative queries for debugging
regex_result = await chunks_col.find_one({"audio_uuid": {"$regex": f"^{audio_uuid}$", "$options": "i"}})
contains_result = await chunks_col.find_one({"audio_uuid": {"$regex": audio_uuid}})
logger.debug(f"Alternative query attempts - case insensitive: {'found' if regex_result else 'not found'}, substring: {'found' if contains_result else 'not found'}")

if not audio_chunk:
logger.debug(f"Audio file lookup result: {'found' if audio_file else 'not found'}")
if audio_file:
logger.debug(f"Found audio file with client_id: {audio_file.client_id}")
logger.debug(f"Audio file has conversation_id: {audio_file.conversation_id}")

if not audio_file:
return JSONResponse(
status_code=404,
content={"error": f"Audio chunk with audio_uuid '{audio_uuid}' not found"}
content={"error": f"Audio file with audio_uuid '{audio_uuid}' not found"}
)

# Check if user has permission to delete this conversation
client_id = audio_chunk.get("client_id")
client_id = audio_file.client_id
if not user.is_superuser and not client_belongs_to_user(client_id, user.user_id):
logger.warning(
f"User {user.user_id} attempted to delete conversation {audio_uuid} without permission"
Expand All @@ -213,25 +208,18 @@ async def delete_conversation(audio_uuid: str, user: User):
)

# Get audio file paths for deletion
audio_path = audio_chunk.get("audio_path")
cropped_audio_path = audio_chunk.get("cropped_audio_path")
audio_path = audio_file.audio_path
cropped_audio_path = audio_file.cropped_audio_path

# Get conversation_id if this audio chunk has an associated conversation
conversation_id = audio_chunk.get("conversation_id")
# Get conversation_id if this audio file has an associated conversation
conversation_id = audio_file.conversation_id
conversation_deleted = False

# Delete from audio_chunks collection first
audio_result = await chunks_col.delete_one({"audio_uuid": audio_uuid})

if audio_result.deleted_count == 0:
return JSONResponse(
status_code=404,
content={"error": f"Failed to delete audio chunk with audio_uuid '{audio_uuid}'"}
)

logger.info(f"Deleted audio chunk {audio_uuid}")
# Delete the audio file from database first
await audio_file.delete()
logger.info(f"Deleted audio file {audio_uuid}")

# If this audio chunk has an associated conversation, delete it using Beanie
# If this audio file has an associated conversation, delete it using Beanie
if conversation_id:
try:
conversation_model = await Conversation.find_one(Conversation.conversation_id == conversation_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@
save_diarization_settings_to_file,
)
from advanced_omi_backend.models.user import User
# TODO: Remove old processor architecture
# from advanced_omi_backend.processors import get_processor_manager
def get_processor_manager():
"""Stub - processors being removed."""
return None
from advanced_omi_backend.task_manager import get_task_manager
from fastapi.responses import JSONResponse

Expand Down Expand Up @@ -61,52 +56,23 @@ async def get_auth_config():


async def get_all_processing_tasks():
"""Get all active processing tasks."""
try:
processor_manager = get_processor_manager()
return processor_manager.get_all_processing_status()
except Exception as e:
logger.error(f"Error getting processing tasks: {e}")
return JSONResponse(
status_code=500, content={"error": f"Failed to get processing tasks: {str(e)}"}
)
"""Get all active processing tasks.

NOTE: This function is deprecated - old processor architecture has been removed.
Kept for backward compatibility but always returns empty list.
"""
logger.warning("get_all_processing_tasks called - deprecated function")
return []


async def get_processing_task_status(client_id: str):
"""Get processing task status for a specific client."""
try:
processor_manager = get_processor_manager()
processing_status = processor_manager.get_processing_status(client_id)

# Check if transcription is marked as started but not completed, and verify with database
stages = processing_status.get("stages", {})
transcription_stage = stages.get("transcription", {})

"""This is a hack to update it the DB INCASE a process failed
if transcription_stage.get("status") == "started" and not transcription_stage.get("completed", False):
# Check if transcription is actually complete by checking the database
try:
chunk = await chunks_col.find_one({"client_id": client_id})
if chunk and chunk.get("transcript") and len(chunk.get("transcript", [])) > 0:
# Transcription is complete! Update the processor state
processor_manager.track_processing_stage(
client_id,
"transcription",
"completed",
{"audio_uuid": chunk.get("audio_uuid"), "segments": len(chunk.get("transcript", []))}
)
logger.info(f"Detected transcription completion for client {client_id} ({len(chunk.get('transcript', []))} segments)")
# Get updated status
processing_status = processor_manager.get_processing_status(client_id)
except Exception as e:
logger.debug(f"Error checking transcription completion: {e}")
"""
return processing_status
except Exception as e:
logger.error(f"Error getting processing task status for {client_id}: {e}")
return JSONResponse(
status_code=500, content={"error": f"Failed to get processing task status: {str(e)}"}
)
"""Get processing task status for a specific client.

NOTE: This function is deprecated - old processor architecture has been removed.
Kept for backward compatibility but always returns None.
"""
logger.warning(f"get_processing_task_status called for {client_id} - deprecated function")
return None


async def get_processor_status():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
UserManager,
)
from advanced_omi_backend.client_manager import get_user_clients_all
from advanced_omi_backend.database import chunks_col, db, users_col
from advanced_omi_backend.database import db, users_col
from advanced_omi_backend.memory import get_memory_service
from advanced_omi_backend.models.conversation import Conversation
from advanced_omi_backend.users import User, UserCreate, UserUpdate

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -174,8 +175,8 @@ async def delete_user(
deleted_data["user_deleted"] = user_result.deleted_count > 0

if delete_conversations:
# Delete all conversations (audio chunks) for this user
conversations_result = await chunks_col.delete_many({"client_id": user_id})
# Delete all conversations for this user
conversations_result = await Conversation.find(Conversation.user_id == user_id).delete()
deleted_data["conversations_deleted"] = conversations_result.deleted_count

if delete_memories:
Expand Down
Loading
Loading