Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions src/agents/extensions/memory/advanced_sqlite_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ def _add_items_sync():
except Exception as cleanup_error:
conn.rollback()
self._logger.error(f"Failed to cleanup orphaned messages: {cleanup_error}")
raise RuntimeError(
f"Failed to persist structure metadata for session {self.session_id}"
) from e

await asyncio.to_thread(_add_items_sync)

Expand Down Expand Up @@ -788,12 +791,18 @@ def _delete_sync():

conn.commit()

return usage_deleted, structure_deleted
# Clean up messages that are no longer referenced by any branch.
orphaned_deleted = self._cleanup_orphaned_messages_sync(conn)
Comment thread
kimnamu marked this conversation as resolved.
if orphaned_deleted:
conn.commit()

usage_deleted, structure_deleted = await asyncio.to_thread(_delete_sync)
return usage_deleted, structure_deleted, orphaned_deleted

usage_deleted, structure_deleted, orphaned_deleted = await asyncio.to_thread(_delete_sync)

self._logger.info(
f"Deleted branch '{branch_id}': {structure_deleted} message entries, {usage_deleted} usage entries" # noqa: E501
f"Deleted branch '{branch_id}': {structure_deleted} structure entries, "
f"{usage_deleted} usage entries, {orphaned_deleted} orphaned messages removed"
)

async def list_branches(self) -> list[dict[str, Any]]:
Expand Down
67 changes: 67 additions & 0 deletions tests/extensions/memory/test_advanced_sqlite_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import json
import sqlite3
import tempfile
from pathlib import Path
from typing import Any, cast
Expand Down Expand Up @@ -1396,6 +1397,72 @@ async def add_batch(worker_id: int) -> list[str]:
session.close()


async def test_delete_branch_cleans_up_orphaned_messages():
"""Verify that delete_branch removes messages only referenced by the deleted branch.

Regression test for #3346: delete_branch() previously left branch-only
messages in the messages table after removing their structure metadata.
"""
session = AdvancedSQLiteSession(session_id="orphan_cleanup", create_tables=True)

# Add items to main branch.
await session.add_items([{"role": "user", "content": "Shared message"}])

# Create a branch from turn 1 and add a branch-only message.
await session.create_branch_from_turn(1, "feature_branch")
await session.add_items([{"role": "user", "content": "Branch-only message"}])

# Record total message count before deletion.
with session._locked_connection() as conn:
total_before = conn.execute(
f"SELECT COUNT(*) FROM {session.messages_table} WHERE session_id = ?",
(session.session_id,),
).fetchone()[0]

assert total_before == 2 # shared + branch-only

# Delete the feature branch.
await session.delete_branch("feature_branch", force=True)

# The branch-only message should now be removed from messages table.
with session._locked_connection() as conn:
total_after = conn.execute(
f"SELECT COUNT(*) FROM {session.messages_table} WHERE session_id = ?",
(session.session_id,),
).fetchone()[0]

assert total_after == 1 # only the shared message remains
session.close()


async def test_add_items_raises_on_structure_metadata_failure(monkeypatch):
"""Verify that add_items raises when structure metadata insertion fails.

Regression test for #3348: add_items() previously swallowed the exception
and reported success even when structure metadata could not be persisted.
"""
session = AdvancedSQLiteSession(session_id="metadata_fail", create_tables=True)

# Force _insert_structure_metadata to raise.
def _raise(*args, **kwargs):
raise sqlite3.OperationalError("simulated metadata failure")

monkeypatch.setattr(session, "_insert_structure_metadata", _raise)

with pytest.raises(RuntimeError, match="Failed to persist structure metadata"):
await session.add_items([{"role": "user", "content": "hello"}])

# The orphaned message should have been cleaned up.
with session._locked_connection() as conn:
remaining = conn.execute(
f"SELECT COUNT(*) FROM {session.messages_table} WHERE session_id = ?",
(session.session_id,),
).fetchone()[0]

assert remaining == 0
session.close()


async def test_output_tokens_details_persisted_when_input_details_missing():
"""Regression: output_tokens_details must persist even if input_tokens_details is None.

Expand Down