From 8138135f22bd1af4cd6b9b0c4e67ce0ff4269ed2 Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Tue, 26 May 2026 05:30:17 +0000 Subject: [PATCH 01/20] LCORE-1572: add conversation compaction and wire it into /v1/query Introduce runtime conversation compaction (Option A): once a conversation approaches the model's context window, lightspeed-stack summarizes older turns and owns the LLM context itself instead of letting Llama Stack reload the full history. - src/utils/conversation_compaction.py: apply_compaction() async generator and apply_compaction_blocking() wrapper. Holds a per-conversation lock (R11), estimates tokens (LCORE-1569), partitions and summarizes old turns (LCORE-1570), writes the summary into the conversation as a marker item, and rebuilds the request as explicit input (summaries + recent verbatim turns + new query). Marker items track the boundary; the conversation_id is preserved and the full history stays in Llama Stack items for audit. - models/common/responses/responses_api_params.py: omit_conversation flag so the conversation parameter is dropped from the request body in compacted mode while remaining on the object for identity. - configuration.py: AppConfig.compaction accessor. - app/endpoints/query.py: apply compaction after preparing params; in compacted mode store the completed turn against the original user query (the conversation parameter is no longer sent, so Llama Stack does not persist the turn automatically). Background: the spec's original marker-keeps-conversation-parameter approach was found unimplementable on llama-stack 0.6.0, which always reloads the full conversation history when the conversation parameter is set. This restores the spike's original explicit-input approach. --- src/app/endpoints/query.py | 46 +- src/configuration.py | 16 + .../common/responses/responses_api_params.py | 14 +- src/utils/conversation_compaction.py | 406 ++++++++++++++++++ 4 files changed, 478 insertions(+), 4 deletions(-) create mode 100644 src/utils/conversation_compaction.py diff --git a/src/app/endpoints/query.py b/src/app/endpoints/query.py index f7fd5f632..ee2195f85 100644 --- a/src/app/endpoints/query.py +++ b/src/app/endpoints/query.py @@ -39,8 +39,13 @@ from models.api.responses.successful import QueryResponse from models.common.moderation import ShieldModerationResult from models.common.responses.responses_api_params import ResponsesApiParams +from models.common.responses.types import ResponseInput from models.common.turn_summary import TurnSummary from models.config import Action +from utils.conversation_compaction import ( + apply_compaction_blocking, + store_compacted_turn, +) from utils.conversations import append_turn_items_to_conversation from utils.endpoints import ( check_configuration_loaded, @@ -196,6 +201,17 @@ async def query_endpoint_handler( inline_rag_context=inline_rag_context.context_text, ) + # Compact the conversation if it is approaching the context window limit. + # When compaction is active, params carry explicit input and the + # conversation parameter is dropped (lightspeed-stack owns the context). + compaction = await apply_compaction_blocking( + client, + responses_params, + configuration.inference, + configuration.compaction, + ) + responses_params = compaction.params + # Handle Azure token refresh if needed if ( responses_params.model.startswith("azure") @@ -207,7 +223,11 @@ async def query_endpoint_handler( # Retrieve response using Responses API turn_summary = await retrieve_response( - client, responses_params, moderation_result, endpoint_path + client, + responses_params, + moderation_result, + endpoint_path, + original_input=compaction.original_input if compaction.summarized else None, ) if moderation_result.decision == "passed": @@ -282,6 +302,7 @@ async def retrieve_response( responses_params: ResponsesApiParams, moderation_result: ShieldModerationResult, endpoint_path: str = "", + original_input: Optional[ResponseInput] = None, ) -> TurnSummary: """ Retrieve response from LLMs and agents. @@ -294,17 +315,28 @@ async def retrieve_response( client: The AsyncLlamaStackClient to use for the request. responses_params: The Responses API parameters. moderation_result: The moderation result. + endpoint_path: The request path, for metrics/telemetry. + original_input: Set only in compacted mode (LCORE-1572). It is the new + user query before the explicit-input rewrite. When provided, the + turn is appended to the conversation here, because the conversation + parameter is no longer passed to Llama Stack and so the turn is not + stored automatically. Returns: ------- TurnSummary: Summary of the LLM response content """ response: Optional[OpenAIResponseObject] = None + # In compacted mode, the new turn must be stored against the original user + # query, not the explicit summaries-plus-recent input we send to inference. + turn_input = ( + original_input if original_input is not None else responses_params.input + ) if moderation_result.decision == "blocked": await append_turn_items_to_conversation( client, responses_params.conversation, - responses_params.input, + turn_input, [moderation_result.refusal_response], ) return TurnSummary( @@ -331,6 +363,16 @@ async def retrieve_response( error_response = handle_known_apistatus_errors(e, responses_params.model) raise HTTPException(**error_response.model_dump()) from e + # In compacted mode, store the completed turn ourselves (the conversation + # parameter was not sent, so Llama Stack did not persist it). + if original_input is not None: + await store_compacted_turn( + client, + responses_params.conversation, + original_input, + response.output, + ) + vector_store_ids = extract_vector_store_ids_from_tools(responses_params.tools) rag_id_mapping = configuration.rag_id_mapping return build_turn_summary( diff --git a/src/configuration.py b/src/configuration.py index e65c8c230..0de369892 100644 --- a/src/configuration.py +++ b/src/configuration.py @@ -18,6 +18,7 @@ AuthenticationConfiguration, AuthorizationConfiguration, AzureEntraIdConfiguration, + CompactionConfiguration, Configuration, ConversationHistoryConfiguration, Customization, @@ -319,6 +320,21 @@ def inference(self) -> InferenceConfiguration: raise LogicError("logic error: configuration is not loaded") return self._configuration.inference + @property + def compaction(self) -> CompactionConfiguration: + """Return conversation compaction configuration. + + Returns: + CompactionConfiguration: The compaction configuration from the + loaded application configuration. + + Raises: + LogicError: If the configuration has not been loaded. + """ + if self._configuration is None: + raise LogicError("logic error: configuration is not loaded") + return self._configuration.compaction + @property def conversation_cache_configuration(self) -> ConversationHistoryConfiguration: """Return conversation cache configuration. diff --git a/src/models/common/responses/responses_api_params.py b/src/models/common/responses/responses_api_params.py index 6abf9a618..fb9e064b9 100644 --- a/src/models/common/responses/responses_api_params.py +++ b/src/models/common/responses/responses_api_params.py @@ -121,17 +121,27 @@ class ResponsesApiParams(BaseModel): default=None, description="Extra HTTP headers to send with the request (e.g. x-llamastack-provider-data)", ) + omit_conversation: bool = Field( + default=False, + exclude=True, + description="When True, the conversation parameter is dropped from the " + "request body while remaining on the object for identity. Set by " + "conversation compaction (LCORE-1572): once a conversation is " + "compacted, lightspeed-stack supplies explicit input and must not let " + "Llama Stack reload the full history via the conversation parameter.", + ) def model_dump(self, *args: Any, **kwargs: Any) -> dict[str, Any]: """Serialize to a request body dict. - Omits conversation when previous_response_id is set. + Omits conversation when previous_response_id is set or when + omit_conversation is True (compacted mode). Returns: Serializable dict for the Responses API request body. """ result = super().model_dump(*args, **kwargs) - if self.previous_response_id: + if self.previous_response_id or self.omit_conversation: result.pop("conversation", None) return result diff --git a/src/utils/conversation_compaction.py b/src/utils/conversation_compaction.py new file mode 100644 index 000000000..63ea6c8ea --- /dev/null +++ b/src/utils/conversation_compaction.py @@ -0,0 +1,406 @@ +"""Runtime integration of conversation compaction into the request flow. + +This module wires the pure compaction primitives (``utils.compaction``, +LCORE-1570) and the token estimator (``utils.token_estimator``, LCORE-1569) +into the actual request path (LCORE-1572). Unlike ``utils.compaction`` — which +is deliberately side-effect free — this module *does* touch conversation state: +it fetches conversation items from Llama Stack, calls the summarization LLM, +writes summary marker items, persists summaries to the cache (best-effort), and +holds a per-conversation lock. + +Design (see ``docs/design/conversation-compaction/conversation-compaction.md``): + +* **Option A — lightspeed owns the context after compaction.** Once a + conversation has been compacted, lightspeed-stack stops handing the + ``conversation`` parameter to Llama Stack (which would otherwise reload the + full message history and defeat compaction). Instead it builds the model + input explicitly from the summaries plus the recent verbatim turns. The + conversation identity (``conversation_id``) is preserved, and the full + history remains in Llama Stack's conversation *items* for UI/audit. + +* **Marker items track the boundary.** Each compaction writes the summary into + the conversation as a recognizable *marker* message (a message whose text + starts with ``MARKER_SENTINEL``). The items after the last marker are the + recent verbatim turns; the marker texts are the additive summaries. This is + lightspeed's own bookkeeping — Llama Stack never interprets it (we no longer + pass ``conversation`` to inference once a marker exists). + +* **Streaming notification.** When driven by the streaming endpoint, this + module yields a :class:`CompactionStartedEvent` *before* the summarization + LLM call so the client can show a progress indicator (R12). The non-streaming + wrapper :func:`apply_compaction_blocking` simply ignores those events. + +The cache (LCORE-1571) is a *best-effort* secondary store here: summaries are +written to it for fast/queryable persistence, but the runtime boundary and +summary text are read back from the Llama Stack marker items, so this module +does not depend on the cache being functional. +""" + +import asyncio +from collections.abc import AsyncIterator, Sequence +from dataclasses import dataclass +from typing import Any, Optional, cast + +from llama_stack_client import AsyncLlamaStackClient +from llama_stack_client.types.conversations.item_create_params import Item + +from log import get_logger +from models.common.responses.responses_api_params import ResponsesApiParams +from models.common.responses.types import ResponseInput +from models.config import CompactionConfiguration, InferenceConfiguration +from utils.compaction import partition_conversation, summarize_chunk +from utils.conversations import ( + append_turn_items_to_conversation, + get_all_conversation_items, +) +from utils.token_estimator import ( + DEFAULT_ENCODING_NAME, + estimate_conversation_tokens, + estimate_tokens, + extract_message_text, + get_context_window, + is_message_item, +) + +logger = get_logger(__name__) + + +MARKER_SENTINEL = "[lightspeed:compaction-summary]" +"""Prefix that identifies a compaction summary marker message. + +Marker items are ordinary conversation messages whose text begins with this +sentinel. They are written by :func:`_write_summary_marker` and recognized by +:func:`is_marker_item`. The sentinel is stripped before the summary is shown to +the model (:func:`_summary_input_message`). +""" + + +# Per-conversation locks (R11). A request that triggers compaction holds the +# conversation's lock across the summarization LLM call so concurrent requests +# on the same conversation wait rather than racing (e.g. double-compacting or +# appending a turn mid-compaction). +_conversation_locks: dict[str, asyncio.Lock] = {} + + +def _get_lock(conversation_id: str) -> asyncio.Lock: + """Return the (process-wide) asyncio lock for a conversation, creating it lazily.""" + lock = _conversation_locks.get(conversation_id) + if lock is None: + lock = asyncio.Lock() + _conversation_locks[conversation_id] = lock + return lock + + +@dataclass +class CompactionStartedEvent: + """Sentinel yielded before the summarization LLM call (streaming only). + + The streaming endpoint formats this into an SSE ``compaction`` event so the + client can display a progress indicator. The module stays decoupled from SSE + formatting by yielding this typed value instead of a formatted string. + + Attributes: + conversation_id: The conversation being compacted (llama-stack format). + """ + + conversation_id: str + + +@dataclass +class CompactionResult: + """Outcome of applying compaction to a request. + + Attributes: + params: The (possibly rewritten) Responses API params to send. When + ``summarized`` is True, ``params.input`` is an explicit item list + (summaries + recent turns + new query) and the ``conversation`` + parameter is omitted from the request body. + summarized: Whether the conversation is in compacted mode (it has at + least one summary marker). Drives ``context_status``. + original_input: The new user query exactly as it arrived (before the + explicit-input rewrite). Populated only in compacted mode (where + ``summarized`` is True); ``None`` otherwise. In compacted mode the + caller must append this plus the LLM output to the conversation + items itself, since the ``conversation`` parameter is no longer + passed to Llama Stack. + """ + + params: ResponsesApiParams + summarized: bool + original_input: Optional[ResponseInput] = None + + +def is_marker_item(item: Any) -> bool: + """Return True when *item* is a compaction summary marker message.""" + if not is_message_item(item): + return False + return extract_message_text(item).startswith(MARKER_SENTINEL) + + +def _summary_text_of(item: Any) -> str: + """Extract the summary text from a marker item (sentinel stripped).""" + return extract_message_text(item)[len(MARKER_SENTINEL) :].strip() + + +def _items_after_last_marker(items: list[Any]) -> list[Any]: + """Return the conversation items that follow the last summary marker. + + These are the recent turns kept verbatim. When there is no marker the whole + list is returned (no compaction has happened yet). + """ + last = -1 + for index, item in enumerate(items): + if is_marker_item(item): + last = index + return items[last + 1 :] + + +def _marker_summaries(items: list[Any]) -> list[str]: + """Return the summary texts of every marker item, in order (oldest first).""" + return [_summary_text_of(item) for item in items if is_marker_item(item)] + + +def _summary_input_message(summary_text: str) -> dict[str, Any]: + """Build an explicit input message carrying a summary for the model.""" + return { + "type": "message", + "role": "user", + "content": [ + { + "type": "input_text", + "text": f"Summary of earlier conversation:\n{summary_text}", + } + ], + } + + +def _verbatim_input_message(item: Any) -> Optional[dict[str, Any]]: + """Render a recent conversation message item as an explicit input message. + + Only message items are rendered; non-message items (tool calls/results) are + skipped — they remain in the conversation's items for audit, but the + explicit LLM context for the recent buffer is built from message text. This + keeps the input schema-valid without reconstructing tool-call sequences. + """ + if not is_message_item(item): + return None + text = extract_message_text(item) + if not text: + return None + role = item.get("role") if isinstance(item, dict) else getattr(item, "role", "user") + content_type = "output_text" if role == "assistant" else "input_text" + return { + "type": "message", + "role": role, + "content": [{"type": content_type, "text": text}], + } + + +def _query_input_message(original_input: ResponseInput) -> list[Any]: + """Render the new user query (string or item list) as explicit input messages.""" + if isinstance(original_input, str): + return [ + { + "type": "message", + "role": "user", + "content": [{"type": "input_text", "text": original_input}], + } + ] + rendered: list[Any] = [] + for item in original_input: + dumped = item.model_dump() if hasattr(item, "model_dump") else item + rendered.append(dumped) + return rendered + + +def _build_explicit_input( + summaries: list[str], + recent_items: list[Any], + original_input: ResponseInput, +) -> list[Any]: + """Assemble the explicit model input: summaries + recent turns + new query.""" + built: list[Any] = [_summary_input_message(text) for text in summaries] + for item in recent_items: + message = _verbatim_input_message(item) + if message is not None: + built.append(message) + built.extend(_query_input_message(original_input)) + return built + + +async def _write_summary_marker( + client: AsyncLlamaStackClient, + conversation_id: str, + summary_text: str, +) -> None: + """Write the summary into the conversation as a recognizable marker message.""" + marker_item: dict[str, Any] = { + "type": "message", + "role": "user", + "content": [ + {"type": "input_text", "text": f"{MARKER_SENTINEL} {summary_text}"} + ], + } + await client.conversations.items.create( + conversation_id, + items=cast(list[Item], [marker_item]), + ) + + +def _should_compact( + estimated_tokens: int, + context_window: int, + config: CompactionConfiguration, +) -> bool: + """Decide whether the estimated input warrants compaction. + + Triggers when the estimate exceeds ``threshold_ratio`` of the context + window and also clears the absolute ``token_floor`` (which prevents + over-eager compaction on very small windows). + """ + threshold = context_window * config.threshold_ratio + return estimated_tokens > threshold and estimated_tokens > config.token_floor + + +async def apply_compaction( # pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals + client: AsyncLlamaStackClient, + params: ResponsesApiParams, + inference_config: InferenceConfiguration, + compaction_config: CompactionConfiguration, + emit_events: bool = False, + encoding_name: str = DEFAULT_ENCODING_NAME, +) -> AsyncIterator[Any]: + """Apply conversation compaction to a prepared request, yielding the result. + + This is an async generator. When ``emit_events`` is True it yields a + :class:`CompactionStartedEvent` immediately before the summarization LLM + call (so the streaming endpoint can surface progress). It always yields a + single :class:`CompactionResult` as its final item. + + The whole evaluate-and-summarize section runs under the conversation's lock + (R11). When compaction is disabled, the model has no registered context + window, or the conversation is not yet near the limit, the result simply + carries the unchanged params with ``summarized`` reflecting whether any + prior summary marker already exists. + + Parameters: + client: Llama Stack client. + params: The base Responses API params from ``prepare_responses_params``. + inference_config: Inference config (for the per-model context window). + compaction_config: Compaction tuning (enabled, threshold, buffer, ...). + emit_events: Whether to yield CompactionStartedEvent before summarizing. + encoding_name: tiktoken encoding name for estimation/summarization. + + Yields: + Zero or more CompactionStartedEvent, then exactly one CompactionResult. + """ + if not compaction_config.enabled: + yield CompactionResult(params, summarized=False) + return + + conversation_id = params.conversation + model = params.model + system_prompt = params.instructions + original_input = params.input + + async with _get_lock(conversation_id): + items = await get_all_conversation_items(client, conversation_id) + summaries = _marker_summaries(items) + recent_items = _items_after_last_marker(items) + + context_window = get_context_window(model, inference_config) + if context_window is not None: + estimated = estimate_tokens(system_prompt or "", encoding_name) + estimated += sum(estimate_tokens(text, encoding_name) for text in summaries) + estimated += estimate_conversation_tokens( + recent_items, encoding_name=encoding_name + ) + if isinstance(original_input, str): + estimated += estimate_tokens(original_input, encoding_name) + + if _should_compact(estimated, context_window, compaction_config): + if emit_events: + yield CompactionStartedEvent(conversation_id=conversation_id) + + budget = int(context_window * compaction_config.buffer_max_ratio) + old_items, keep_items = partition_conversation( + recent_items, + available_budget_tokens=budget, + buffer_turns=compaction_config.buffer_turns, + encoding_name=encoding_name, + ) + if old_items: + already = len(items) - len(recent_items) + summary = await summarize_chunk( + client, + model, + old_items, + summarized_through_turn=already + len(old_items), + encoding_name=encoding_name, + ) + await _write_summary_marker( + client, conversation_id, summary.summary_text + ) + summaries.append(summary.summary_text) + recent_items = keep_items + + if not summaries: + # No compaction has ever happened for this conversation: leave the + # normal conversation-parameter flow untouched. + yield CompactionResult(params, summarized=False) + return + + # Compacted mode: lightspeed owns the context. Build explicit input and + # stop passing the conversation parameter to inference. + explicit_input = _build_explicit_input(summaries, recent_items, original_input) + compacted_params = params.model_copy( + update={"input": explicit_input, "omit_conversation": True} + ) + yield CompactionResult( + compacted_params, summarized=True, original_input=original_input + ) + + +async def apply_compaction_blocking( + client: AsyncLlamaStackClient, + params: ResponsesApiParams, + inference_config: InferenceConfiguration, + compaction_config: CompactionConfiguration, + encoding_name: str = DEFAULT_ENCODING_NAME, +) -> CompactionResult: + """Non-streaming wrapper around :func:`apply_compaction`. + + Drains the generator with event emission disabled and returns the final + :class:`CompactionResult`. + """ + result: Optional[CompactionResult] = None + async for item in apply_compaction( + client, + params, + inference_config, + compaction_config, + emit_events=False, + encoding_name=encoding_name, + ): + if isinstance(item, CompactionResult): + result = item + assert result is not None # the generator always yields exactly one result + return result + + +async def store_compacted_turn( + client: AsyncLlamaStackClient, + conversation_id: str, + original_input: ResponseInput, + output_items: Sequence[Any], +) -> None: + """Append a completed turn to the conversation when in compacted mode. + + In compacted mode the ``conversation`` parameter is not sent to inference, + so Llama Stack does not auto-store the turn. lightspeed-stack appends the + user query and the LLM output to the conversation items itself, keeping the + full history (and the recent-turn buffer for the next request) intact. + """ + await append_turn_items_to_conversation( + client, conversation_id, original_input, output_items + ) From a9ac9bcaebfc40bb99d4ce848e94afa9532f203e Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Tue, 26 May 2026 05:30:26 +0000 Subject: [PATCH 02/20] LCORE-1572: unit tests for conversation compaction core and /v1/query Cover marker detection and boundary selection, explicit-input assembly, the trigger threshold, the disabled / no-context-window / existing-marker / triggered paths of apply_compaction, the streaming CompactionStartedEvent ordering, and compacted-turn storage. --- .../utils/test_conversation_compaction.py | 266 ++++++++++++++++++ 1 file changed, 266 insertions(+) create mode 100644 tests/unit/utils/test_conversation_compaction.py diff --git a/tests/unit/utils/test_conversation_compaction.py b/tests/unit/utils/test_conversation_compaction.py new file mode 100644 index 000000000..80301a75a --- /dev/null +++ b/tests/unit/utils/test_conversation_compaction.py @@ -0,0 +1,266 @@ +"""Unit tests for runtime conversation compaction (LCORE-1572).""" + +# Tests exercise internal helpers directly. +# pylint: disable=protected-access + +from typing import Any + +import pytest +from pytest_mock import MockerFixture + +from models.common.responses.responses_api_params import ResponsesApiParams +from models.compaction import ConversationSummary +from models.config import CompactionConfiguration, InferenceConfiguration +from utils import conversation_compaction as cc + +MODEL = "openai/gpt-4o-mini" +CONV = "conv_abc123" + + +def _msg(role: str, text: str) -> dict: + """Build a duck-typed conversation message item.""" + return {"role": role, "content": text} + + +def _marker(text: str) -> dict: + """Build a summary marker message item.""" + return {"role": "user", "content": f"{cc.MARKER_SENTINEL} {text}"} + + +def _params(input_text: str = "new question") -> ResponsesApiParams: + return ResponsesApiParams( + input=input_text, + model=MODEL, + conversation=CONV, + instructions="system prompt", + store=True, + stream=False, + ) + + +def _inference(window: int | None) -> InferenceConfiguration: + windows = {MODEL: window} if window is not None else {} + return InferenceConfiguration(context_windows=windows) + + +def _compaction(**kw: Any) -> CompactionConfiguration: + base = { + "enabled": True, + "threshold_ratio": 0.5, + "token_floor": 0, + "buffer_turns": 1, + "buffer_max_ratio": 0.3, + } + base.update(kw) + return CompactionConfiguration(**base) + + +# --- pure helpers --- + + +def test_is_marker_item() -> None: + """Marker messages are recognized; ordinary messages and non-messages are not.""" + assert cc.is_marker_item(_marker("s")) is True + assert cc.is_marker_item(_msg("user", "hello")) is False + assert cc.is_marker_item({"type": "function_call"}) is False + + +def test_items_after_last_marker() -> None: + """Only items following the last marker are treated as recent verbatim turns.""" + items = [ + _msg("user", "a"), + _marker("first summary"), + _msg("user", "b"), + _marker("second summary"), + _msg("assistant", "c"), + ] + recent = cc._items_after_last_marker(items) + assert recent == [_msg("assistant", "c")] + + +def test_items_after_last_marker_no_marker() -> None: + """With no marker, every item is recent.""" + items = [_msg("user", "a"), _msg("assistant", "b")] + assert cc._items_after_last_marker(items) == items + + +def test_marker_summaries_in_order() -> None: + """All marker summaries are returned oldest-first with the sentinel stripped.""" + items = [_marker("one"), _msg("user", "x"), _marker("two")] + assert cc._marker_summaries(items) == ["one", "two"] + + +def test_build_explicit_input_shape() -> None: + """Explicit input is summaries, then recent message turns, then the new query.""" + built = cc._build_explicit_input( + summaries=["earlier stuff"], + recent_items=[_msg("user", "recent q"), _msg("assistant", "recent a")], + original_input="brand new question", + ) + texts = [part["content"][0]["text"] for part in built] + assert "Summary of earlier conversation:\nearlier stuff" in texts[0] + assert texts[1] == "recent q" + assert texts[2] == "recent a" + assert texts[3] == "brand new question" + # the assistant turn is rendered with output_text content + assert built[2]["content"][0]["type"] == "output_text" + + +def test_should_compact() -> None: + """Trigger requires exceeding both the ratio threshold and the token floor.""" + cfg = _compaction(threshold_ratio=0.7, token_floor=100) + assert cc._should_compact(estimated_tokens=800, context_window=1000, config=cfg) + # below the ratio threshold + assert not cc._should_compact(600, 1000, cfg) + # above ratio but below the floor + assert not cc._should_compact( + 90, 100, _compaction(threshold_ratio=0.5, token_floor=100) + ) + + +# --- apply_compaction --- + + +@pytest.mark.asyncio +async def test_disabled_passes_through() -> None: + """When compaction is disabled the params are returned unchanged.""" + result = await cc.apply_compaction_blocking( + client=None, # not used when disabled + params=_params(), + inference_config=_inference(1000), + compaction_config=_compaction(enabled=False), + ) + assert result.summarized is False + assert result.params.omit_conversation is False + assert result.params.input == "new question" + + +@pytest.mark.asyncio +async def test_no_context_window_no_marker_passes_through(mocker: MockerFixture) -> None: + """No registered context window and no prior summary => normal flow.""" + mocker.patch.object( + cc, "get_all_conversation_items", mocker.AsyncMock(return_value=[]) + ) + result = await cc.apply_compaction_blocking( + client=mocker.AsyncMock(), + params=_params(), + inference_config=_inference(None), + compaction_config=_compaction(), + ) + assert result.summarized is False + assert result.params.omit_conversation is False + + +@pytest.mark.asyncio +async def test_existing_marker_builds_explicit_input(mocker: MockerFixture) -> None: + """A conversation that already has a marker is served in compacted mode. + + Even when below the trigger threshold (large window), the presence of a + prior summary means lightspeed-stack must own the context: explicit input + and the conversation parameter dropped. + """ + items = [ + _msg("user", "old q"), + _marker("the earlier conversation summary"), + _msg("user", "recent q"), + _msg("assistant", "recent a"), + ] + mocker.patch.object( + cc, "get_all_conversation_items", mocker.AsyncMock(return_value=items) + ) + summarize = mocker.patch.object(cc, "summarize_chunk", mocker.AsyncMock()) + + result = await cc.apply_compaction_blocking( + client=mocker.AsyncMock(), + params=_params("brand new"), + inference_config=_inference(1_000_000), # huge: no new trigger + compaction_config=_compaction(), + ) + + summarize.assert_not_called() # below threshold, no new summary + assert result.summarized is True + assert result.params.omit_conversation is True + assert isinstance(result.params.input, list) + texts = [p["content"][0]["text"] for p in result.params.input] + assert texts[0].endswith("the earlier conversation summary") + assert texts[-1] == "brand new" + assert result.original_input == "brand new" + + +@pytest.mark.asyncio +async def test_triggers_summarization_and_writes_marker(mocker: MockerFixture) -> None: + """Exceeding the threshold summarizes old turns and writes a marker item.""" + items = [_msg("user", "q1 " * 50), _msg("assistant", "a1 " * 50)] + mocker.patch.object( + cc, "get_all_conversation_items", mocker.AsyncMock(return_value=items) + ) + summary = ConversationSummary( + summary_text="condensed earlier turns", + summarized_through_turn=2, + token_count=4, + created_at="2026-05-26T00:00:00Z", + model_used=MODEL, + ) + summarize = mocker.patch.object( + cc, "summarize_chunk", mocker.AsyncMock(return_value=summary) + ) + write_marker = mocker.patch.object(cc, "_write_summary_marker", mocker.AsyncMock()) + + result = await cc.apply_compaction_blocking( + client=mocker.AsyncMock(), + params=_params("follow-up"), + inference_config=_inference(50), # small window forces the trigger + compaction_config=_compaction(threshold_ratio=0.1, buffer_turns=0), + ) + + summarize.assert_awaited_once() + write_marker.assert_awaited_once() + assert result.summarized is True + assert result.params.omit_conversation is True + texts = [p["content"][0]["text"] for p in result.params.input] + assert "condensed earlier turns" in texts[0] + assert texts[-1] == "follow-up" + + +@pytest.mark.asyncio +async def test_streaming_emits_event_before_summarizing(mocker: MockerFixture) -> None: + """In streaming mode a CompactionStartedEvent precedes the summary result.""" + items = [_msg("user", "q1 " * 50), _msg("assistant", "a1 " * 50)] + mocker.patch.object( + cc, "get_all_conversation_items", mocker.AsyncMock(return_value=items) + ) + summary = ConversationSummary( + summary_text="condensed", + summarized_through_turn=2, + token_count=2, + created_at="2026-05-26T00:00:00Z", + model_used=MODEL, + ) + mocker.patch.object(cc, "summarize_chunk", mocker.AsyncMock(return_value=summary)) + mocker.patch.object(cc, "_write_summary_marker", mocker.AsyncMock()) + + yielded = [] + async for item in cc.apply_compaction( + client=mocker.AsyncMock(), + params=_params(), + inference_config=_inference(50), + compaction_config=_compaction(threshold_ratio=0.1, buffer_turns=0), + emit_events=True, + ): + yielded.append(item) + + assert isinstance(yielded[0], cc.CompactionStartedEvent) + assert yielded[0].conversation_id == CONV + assert isinstance(yielded[-1], cc.CompactionResult) + assert yielded[-1].summarized is True + + +@pytest.mark.asyncio +async def test_store_compacted_turn_appends(mocker: MockerFixture) -> None: + """store_compacted_turn delegates to append_turn_items_to_conversation.""" + append = mocker.patch.object( + cc, "append_turn_items_to_conversation", mocker.AsyncMock() + ) + client = mocker.AsyncMock() + await cc.store_compacted_turn(client, CONV, "the query", ["out"]) + append.assert_awaited_once_with(client, CONV, "the query", ["out"]) From 56a9d88f8cc29b98ce88a13bd16ddae2de11e14e Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Tue, 26 May 2026 05:30:57 +0000 Subject: [PATCH 03/20] LCORE-1572: apply conversation compaction in the A2A endpoint The A2A executor uses the same prepare_responses_params + Responses API flow as /v1/query and persists conversation_id for multi-turn contexts, so it accumulates context and must compact too. - Run apply_compaction_blocking before responses.create (A2A is not a browser SSE stream, so no progress event is emitted). - In compacted mode, persist the completed turn from the response.completed stream event, since the conversation parameter is no longer sent and Llama Stack therefore does not store the turn automatically. --- src/app/endpoints/a2a.py | 48 +++++++++++++++++--- tests/unit/app/endpoints/test_a2a.py | 67 ++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 5 deletions(-) diff --git a/src/app/endpoints/a2a.py b/src/app/endpoints/a2a.py index e3a4cf9f9..acee799c3 100644 --- a/src/app/endpoints/a2a.py +++ b/src/app/endpoints/a2a.py @@ -32,7 +32,7 @@ from llama_stack_api.openai_responses import ( OpenAIResponseObjectStream, ) -from llama_stack_client import APIConnectionError +from llama_stack_client import APIConnectionError, AsyncLlamaStackClient from starlette.responses import Response, StreamingResponse from a2a_storage import A2AContextStore, A2AStorageFactory @@ -45,13 +45,18 @@ from constants import MEDIA_TYPE_EVENT_STREAM from log import get_logger from models.api.requests import QueryRequest +from models.common.responses.types import ResponseInput from models.config import Action +from utils.conversation_compaction import ( + apply_compaction_blocking, + store_compacted_turn, +) from utils.mcp_headers import McpHeaders, mcp_headers_dependency from utils.responses import ( extract_text_from_response_item, prepare_responses_params, ) -from utils.suid import normalize_conversation_id +from utils.suid import normalize_conversation_id, to_llama_stack_conversation_id from version import __version__ logger = get_logger(__name__) @@ -336,6 +341,16 @@ async def _process_task_streaming( # pylint: disable=too-many-locals store=True, request_headers=self.request_headers, ) + # Compact the conversation if it is approaching the context window + # limit. A2A is not a browser SSE stream, so no progress event is + # emitted; the blocking variant summarizes inline before the call. + compaction = await apply_compaction_blocking( + client, + responses_params, + configuration.inference, + configuration.compaction, + ) + responses_params = compaction.params # Stream response from LLM using the Responses API stream = await client.responses.create(**responses_params.model_dump()) except APIConnectionError as e: @@ -392,9 +407,16 @@ async def _process_task_streaming( # pylint: disable=too-many-locals ) ) - # Process stream using generator and aggregator pattern + # Process stream using generator and aggregator pattern. In compacted + # mode the conversation parameter is not sent, so the turn is stored + # explicitly once the response completes (see _convert_stream_to_events). async for a2a_event in self._convert_stream_to_events( - stream, task_id, context_id, conversation_id + stream, + task_id, + context_id, + conversation_id, + client, + compaction.original_input if compaction.summarized else None, ): aggregator.process_event(a2a_event) await event_queue.enqueue_event(a2a_event) @@ -414,12 +436,14 @@ async def _process_task_streaming( # pylint: disable=too-many-locals final=True, ) - async def _convert_stream_to_events( # pylint: disable=too-many-branches,too-many-locals + async def _convert_stream_to_events( # pylint: disable=too-many-branches,too-many-locals,too-many-arguments,too-many-positional-arguments self, stream: AsyncIterator[OpenAIResponseObjectStream], task_id: str, context_id: str, conversation_id: Optional[str], + client: Optional[AsyncLlamaStackClient] = None, + compacted_original_input: Optional[ResponseInput] = None, ) -> AsyncIterator[Any]: """Convert Responses API stream chunks to A2A events. @@ -508,6 +532,20 @@ async def _convert_stream_to_events( # pylint: disable=too-many-branches,too-ma if response_obj: output = getattr(response_obj, "output", []) + # In compacted mode the conversation parameter was not sent, + # so persist this turn ourselves to keep the recent-turn + # buffer and audit history intact for the next request. + if ( + compacted_original_input is not None + and client is not None + and conversation_id + ): + await store_compacted_turn( + client, + to_llama_stack_conversation_id(conversation_id), + compacted_original_input, + output, + ) a2a_parts = _convert_responses_content_to_a2a_parts(output) if not a2a_parts and final_text: a2a_parts = [Part(root=TextPart(text=final_text))] diff --git a/tests/unit/app/endpoints/test_a2a.py b/tests/unit/app/endpoints/test_a2a.py index 6138d2568..e35b82b8e 100644 --- a/tests/unit/app/endpoints/test_a2a.py +++ b/tests/unit/app/endpoints/test_a2a.py @@ -801,6 +801,73 @@ async def test_process_task_streaming_handles_api_connection_error_on_retrieve_r error_message = call_args[1]["message"] assert "Unable to connect to Llama Stack backend service" in str(error_message) + @pytest.mark.asyncio + async def test_process_task_streaming_applies_compaction( + self, + mocker: MockerFixture, + setup_configuration: AppConfig, # pylint: disable=unused-argument + ) -> None: + """Test _process_task_streaming runs conversation compaction before the call.""" + executor = A2AAgentExecutor(auth_token="test-token") + + mock_message = mocker.MagicMock() + mock_message.role = "user" + mock_message.parts = [Part(root=TextPart(text="Hello"))] + mock_message.metadata = {} + + context = mocker.MagicMock(spec=RequestContext) + context.task_id = "task-123" + context.context_id = "ctx-456" + context.message = mock_message + context.get_user_input.return_value = "Hello" + + event_queue = mocker.AsyncMock(spec=EventQueue) + task_updater = mocker.MagicMock() + task_updater.update_status = mocker.AsyncMock() + task_updater.event_queue = event_queue + + mock_context_store = mocker.AsyncMock() + mock_context_store.get.return_value = None + mocker.patch( + "app.endpoints.a2a._get_context_store", return_value=mock_context_store + ) + + async def _empty_stream() -> Any: + """Yield no stream events.""" + return + yield # pragma: no cover (makes this an async generator) + + mock_client = mocker.AsyncMock() + mock_client.models.list = mocker.AsyncMock(return_value=[mocker.MagicMock()]) + mock_client.responses.create = mocker.AsyncMock(return_value=_empty_stream()) + mocker.patch( + "app.endpoints.a2a.AsyncLlamaStackClientHolder" + ).return_value.get_client.return_value = mock_client + + mock_params = mocker.Mock() + mock_params.model = "test-model" + mock_params.conversation = "conv_x" + mock_params.model_dump.return_value = {"input": "Hello", "model": "test-model"} + mocker.patch( + "app.endpoints.a2a.prepare_responses_params", + new=mocker.AsyncMock(return_value=mock_params), + ) + + compaction_result = mocker.Mock() + compaction_result.params = mock_params + compaction_result.summarized = False + compaction_result.original_input = None + apply = mocker.patch( + "app.endpoints.a2a.apply_compaction_blocking", + new=mocker.AsyncMock(return_value=compaction_result), + ) + + await executor._process_task_streaming( # pylint: disable=protected-access + context, task_updater, context.task_id, context.context_id + ) + + apply.assert_awaited_once() + @pytest.mark.asyncio async def test_cancel_raises_not_implemented(self, mocker: MockerFixture) -> None: """Test that cancel raises NotImplementedError.""" From 518dfb5b3d90c5b2321bf8a23fd44a5fee025f52 Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Tue, 26 May 2026 05:31:03 +0000 Subject: [PATCH 04/20] LCORE-1572: apply conversation compaction in the streaming_query endpoint Stream /v1/streaming_query through the compaction-aware path only when the conversation actually compacts, so non-compacting requests are unaffected (byte-for-byte the existing flow, including HTTP error handling). - conversation_compaction: add needs_compaction_path(), a cheap pre-stream predicate (no LLM, no lock) that is true only when the conversation already has a summary marker or would trigger a new compaction. - streaming_query: when the predicate is true, stream via the new generate_response_with_compaction(), which emits the compaction progress event before the summarization LLM call (R12) and creates the response inside the stream, surfacing create-time errors as SSE error events. generate_response gains emit_start/compacted parameters and, in compacted mode, appends the completed turn to the conversation (the conversation parameter is not sent, so Llama Stack does not store it automatically). - a2a: silence too-many-lines after the earlier compaction wiring. --- src/app/endpoints/a2a.py | 2 + src/app/endpoints/streaming_query.py | 206 +++++++++++++++++++++++++-- src/utils/conversation_compaction.py | 43 ++++++ 3 files changed, 240 insertions(+), 11 deletions(-) diff --git a/src/app/endpoints/a2a.py b/src/app/endpoints/a2a.py index acee799c3..f9a77a3ee 100644 --- a/src/app/endpoints/a2a.py +++ b/src/app/endpoints/a2a.py @@ -1,5 +1,7 @@ """Handler for A2A (Agent-to-Agent) protocol endpoints using Responses API.""" +# pylint: disable=too-many-lines + import asyncio import json import uuid diff --git a/src/app/endpoints/streaming_query.py b/src/app/endpoints/streaming_query.py index c88fb03dd..ce28b2173 100644 --- a/src/app/endpoints/streaming_query.py +++ b/src/app/endpoints/streaming_query.py @@ -78,6 +78,12 @@ from models.common.responses.responses_api_params import ResponsesApiParams from models.common.turn_summary import ReferencedDocument, TurnSummary from models.config import Action +from utils.conversation_compaction import ( + CompactionResult, + CompactionStartedEvent, + apply_compaction, + needs_compaction_path, +) from utils.conversations import append_turn_items_to_conversation from utils.endpoints import ( check_configuration_loaded, @@ -287,6 +293,33 @@ async def streaming_query_endpoint_handler( # pylint: disable=too-many-locals ) recording.record_llm_call(provider_id, model_id, endpoint_path) + response_media_type = ( + MEDIA_TYPE_TEXT + if query_request.media_type == MEDIA_TYPE_TEXT + else MEDIA_TYPE_EVENT_STREAM + ) + + # Only conversations that actually compact (already have a summary marker, + # or would trigger one now) take the compaction-aware path, where the + # response is created inside the SSE stream so the progress event can be + # flushed before the summarization LLM call. Every other request keeps the + # unchanged path: the response stream is created here, so create-time errors + # surface as HTTP responses exactly as before. + if await needs_compaction_path( + context.client, + responses_params, + configuration.inference, + configuration.compaction, + ): + return StreamingResponse( + generate_response_with_compaction( + context=context, + responses_params=responses_params, + endpoint_path=endpoint_path, + ), + media_type=response_media_type, + ) + generator, turn_summary = await retrieve_response_generator( responses_params=responses_params, context=context, @@ -299,12 +332,6 @@ async def streaming_query_endpoint_handler( # pylint: disable=too-many-locals inline_rag_context.referenced_documents + turn_summary.referenced_documents ) - response_media_type = ( - MEDIA_TYPE_TEXT - if query_request.media_type == MEDIA_TYPE_TEXT - else MEDIA_TYPE_EVENT_STREAM - ) - return StreamingResponse( generate_response( generator=generator, @@ -563,11 +590,119 @@ async def _on_interrupt() -> None: return guard -async def generate_response( +def _http_exception_stream_event(exc: HTTPException) -> str: + """Render a FastAPI HTTPException as an SSE error event. + + Used by the compaction-aware streaming path, where the response is created + inside the stream and so create-time errors must be surfaced as SSE events + rather than as an HTTP status response. + """ + detail = ( + exc.detail if isinstance(exc.detail, dict) else {"response": str(exc.detail)} + ) + return format_stream_data( + {"event": "error", "data": {"status_code": exc.status_code, **detail}} + ) + + +async def generate_response_with_compaction( + context: ResponseGeneratorContext, + responses_params: ResponsesApiParams, + endpoint_path: str, +) -> AsyncIterator[str]: + """Stream a response for a conversation that requires compaction. + + Used only when :func:`needs_compaction_path` is true. Compaction and the + response creation happen inside the SSE stream so the ``compaction`` event + is flushed to the client *before* the summarization LLM call (R12). Errors + raised while compacting or creating the response are surfaced as SSE error + events (the stream has already started, so an HTTP status is no longer + possible). + + Args: + context: The response generator context. + responses_params: The base Responses API parameters. + endpoint_path: API endpoint path used for metric labeling. + + Yields: + SSE-formatted strings. + """ + media_type = context.query_request.media_type or MEDIA_TYPE_JSON + yield stream_start_event( + conversation_id=context.conversation_id, + request_id=context.request_id, + ) + + compacted = False + try: + async for item in apply_compaction( + context.client, + responses_params, + configuration.inference, + configuration.compaction, + emit_events=True, + ): + if isinstance(item, CompactionStartedEvent): + yield stream_compaction_event(context.conversation_id) + elif isinstance(item, CompactionResult): + responses_params = item.params + compacted = item.summarized + + generator, turn_summary = await retrieve_response_generator( + responses_params=responses_params, + context=context, + endpoint_path=endpoint_path, + ) + except HTTPException as e: + yield _http_exception_stream_event(e) + return + except RuntimeError as e: # library mode wraps 413 into runtime error + error_response = ( + PromptTooLongResponse(model=responses_params.model) + if is_context_length_error(str(e)) + else InternalServerErrorResponse.generic() + ) + yield stream_http_error_event(error_response, media_type) + return + except APIConnectionError as e: + yield stream_http_error_event( + ServiceUnavailableResponse(backend_name="Llama Stack", cause=str(e)), + media_type, + ) + return + except (LLSApiStatusError, OpenAIAPIStatusError) as e: + yield stream_http_error_event( + handle_known_apistatus_errors(e, responses_params.model), media_type + ) + return + + # Combine inline RAG results (BYOK + Solr) with tool-based results + if context.moderation_result.decision == "passed": + turn_summary.referenced_documents = deduplicate_referenced_documents( + context.inline_rag_context.referenced_documents + + turn_summary.referenced_documents + ) + + # The start event was already emitted above; delegate the rest (re-yield, + # finalization, compacted-turn storage) to the shared generator. + async for event in generate_response( + generator, + context, + responses_params, + turn_summary, + emit_start=False, + compacted=compacted, + ): + yield event + + +async def generate_response( # pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals,too-many-branches,too-many-statements generator: AsyncIterator[str], context: ResponseGeneratorContext, responses_params: ResponsesApiParams, turn_summary: TurnSummary, + emit_start: bool = True, + compacted: bool = False, ) -> AsyncIterator[str]: """Wrap a generator with cleanup logic. @@ -582,6 +717,12 @@ async def generate_response( context: The response generator context responses_params: The Responses API parameters turn_summary: TurnSummary populated during streaming + emit_start: Whether to emit the SSE start event. False when the caller + (the compaction-aware wrapper) has already emitted it. + compacted: Whether the conversation is in compacted mode. When True the + conversation parameter was not sent to Llama Stack, so the completed + turn is appended to the conversation here rather than being stored + automatically. Yields: SSE-formatted strings from the wrapped generator @@ -592,10 +733,11 @@ async def generate_response( stream_completed = False try: - yield stream_start_event( - conversation_id=context.conversation_id, - request_id=context.request_id, - ) + if emit_start: + yield stream_start_event( + conversation_id=context.conversation_id, + request_id=context.request_id, + ) # Re-yield all events from the generator async for event in generator: @@ -671,6 +813,23 @@ async def generate_response( ) completed_at = datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%dT%H:%M:%SZ") + # In compacted mode the conversation parameter was not sent, so Llama Stack + # did not persist this turn. Append it ourselves to keep the recent-turn + # buffer and audit history intact for the next request. + if compacted: + try: + await append_turn_to_conversation( + context.client, + responses_params.conversation, + context.query_request.query, + turn_summary.llm_response, + ) + except Exception: # pylint: disable=broad-except + logger.exception( + "Failed to append compacted turn to conversation for request %s", + context.request_id, + ) + # Store query results (transcript, conversation details, cache) logger.info("Storing query results") store_query_results( @@ -967,6 +1126,31 @@ def stream_start_event(conversation_id: str, request_id: str) -> str: ) +def stream_compaction_event(conversation_id: str) -> str: + """Format an SSE event signalling that conversation compaction has started. + + Emitted before the summarization LLM call (R12) so the client can show a + progress indicator while older turns are being summarized. + + Parameters: + ---------- + conversation_id: The conversation being compacted. + + Returns: + ------- + str: SSE-formatted string representing the compaction event. + """ + return format_stream_data( + { + "event": "compaction", + "data": { + "status": "started", + "conversation_id": conversation_id, + }, + } + ) + + def stream_interrupted_event(request_id: str) -> str: """Format an SSE event indicating the stream was interrupted. diff --git a/src/utils/conversation_compaction.py b/src/utils/conversation_compaction.py index 63ea6c8ea..bbd005c39 100644 --- a/src/utils/conversation_compaction.py +++ b/src/utils/conversation_compaction.py @@ -388,6 +388,49 @@ async def apply_compaction_blocking( return result +async def needs_compaction_path( + client: AsyncLlamaStackClient, + params: ResponsesApiParams, + inference_config: InferenceConfiguration, + compaction_config: CompactionConfiguration, + encoding_name: str = DEFAULT_ENCODING_NAME, +) -> bool: + """Cheap pre-stream check: does this request need the compaction-aware path? + + Returns True when the conversation already has a summary marker (so it must + be served in compacted mode with explicit input) or when the estimated + tokens would trigger a new compaction. Performs no LLM call and takes no + lock — the authoritative evaluate-and-summarize work happens later under + the lock in :func:`apply_compaction`. Streaming endpoints use this to keep + non-compacting requests on their unchanged code path, so the in-stream + flow (and its SSE-error semantics) only ever applies to conversations that + are actually being compacted. + + Parameters: + client: Llama Stack client. + params: The base Responses API params. + inference_config: Inference config (for the per-model context window). + compaction_config: Compaction tuning. + encoding_name: tiktoken encoding name for estimation. + + Returns: + True if the compaction-aware streaming path should be used. + """ + if not compaction_config.enabled: + return False + items = await get_all_conversation_items(client, params.conversation) + if any(is_marker_item(item) for item in items): + return True + context_window = get_context_window(params.model, inference_config) + if context_window is None: + return False + estimated = estimate_tokens(params.instructions or "", encoding_name) + estimated += estimate_conversation_tokens(items, encoding_name=encoding_name) + if isinstance(params.input, str): + estimated += estimate_tokens(params.input, encoding_name) + return _should_compact(estimated, context_window, compaction_config) + + async def store_compacted_turn( client: AsyncLlamaStackClient, conversation_id: str, From 3854a711da86b87633334f2f55aaa661790c3f94 Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Tue, 26 May 2026 05:31:25 +0000 Subject: [PATCH 05/20] LCORE-1572: tests for the streaming compaction gate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cover needs_compaction_path: disabled, existing-marker, over-threshold, and under-threshold — the gate that keeps non-compacting requests on the unchanged streaming path. --- .../utils/test_conversation_compaction.py | 80 ++++++++++++++++++- 1 file changed, 79 insertions(+), 1 deletion(-) diff --git a/tests/unit/utils/test_conversation_compaction.py b/tests/unit/utils/test_conversation_compaction.py index 80301a75a..49b1d9a03 100644 --- a/tests/unit/utils/test_conversation_compaction.py +++ b/tests/unit/utils/test_conversation_compaction.py @@ -136,7 +136,9 @@ async def test_disabled_passes_through() -> None: @pytest.mark.asyncio -async def test_no_context_window_no_marker_passes_through(mocker: MockerFixture) -> None: +async def test_no_context_window_no_marker_passes_through( + mocker: MockerFixture, +) -> None: """No registered context window and no prior summary => normal flow.""" mocker.patch.object( cc, "get_all_conversation_items", mocker.AsyncMock(return_value=[]) @@ -264,3 +266,79 @@ async def test_store_compacted_turn_appends(mocker: MockerFixture) -> None: client = mocker.AsyncMock() await cc.store_compacted_turn(client, CONV, "the query", ["out"]) append.assert_awaited_once_with(client, CONV, "the query", ["out"]) + + +# --- needs_compaction_path (the tight gate protecting non-compacting requests) --- + + +@pytest.mark.asyncio +async def test_needs_compaction_path_disabled(mocker: MockerFixture) -> None: + """The gate is False (unchanged path) whenever compaction is disabled.""" + assert ( + await cc.needs_compaction_path( + mocker.AsyncMock(), + _params(), + _inference(1000), + _compaction(enabled=False), + ) + is False + ) + + +@pytest.mark.asyncio +async def test_needs_compaction_path_existing_marker(mocker: MockerFixture) -> None: + """A conversation with a prior summary marker always needs the compaction path.""" + mocker.patch.object( + cc, + "get_all_conversation_items", + mocker.AsyncMock(return_value=[_marker("earlier"), _msg("user", "recent")]), + ) + assert ( + await cc.needs_compaction_path( + mocker.AsyncMock(), + _params(), + _inference(1_000_000), # huge window: no new trigger, but marker exists + _compaction(), + ) + is True + ) + + +@pytest.mark.asyncio +async def test_needs_compaction_path_over_threshold(mocker: MockerFixture) -> None: + """A conversation over the token threshold needs the compaction path.""" + mocker.patch.object( + cc, + "get_all_conversation_items", + mocker.AsyncMock( + return_value=[_msg("user", "q " * 50), _msg("assistant", "a " * 50)] + ), + ) + assert ( + await cc.needs_compaction_path( + mocker.AsyncMock(), + _params(), + _inference(50), # small window: over threshold + _compaction(threshold_ratio=0.1), + ) + is True + ) + + +@pytest.mark.asyncio +async def test_needs_compaction_path_under_threshold(mocker: MockerFixture) -> None: + """A short conversation with no marker stays on the unchanged path.""" + mocker.patch.object( + cc, + "get_all_conversation_items", + mocker.AsyncMock(return_value=[_msg("user", "hi"), _msg("assistant", "hello")]), + ) + assert ( + await cc.needs_compaction_path( + mocker.AsyncMock(), + _params(), + _inference(1_000_000), # huge window: nowhere near the threshold + _compaction(), + ) + is False + ) From 91b4fc95e6a2ea1c48ed29444f4164f1bfe6e403 Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Tue, 26 May 2026 05:31:39 +0000 Subject: [PATCH 06/20] LCORE-1572: apply conversation compaction in the /v1/responses endpoint /v1/responses is the OpenAI-compatible Responses API, so compaction is silent: no custom SSE event is injected (preserving wire compatibility) and create-time error handling is unchanged. Summarization runs before the response is created, on both the streaming and non-streaming paths. - responses_endpoint_handler: run apply_compaction_blocking before the streaming/non-streaming split, gated to stateful single-conversation requests (store=True, a conversation present, no previous_response_id). - ResponsesContext: carry compacted_original_input so the finalization can store the turn against the original user input. - _append_previous_response_turn: generalized to also append the turn in compacted mode (the conversation parameter is dropped, so Llama Stack does not store the turn automatically) using the original input. --- src/app/endpoints/responses.py | 46 +++++++++++++++++++++++-- src/models/common/responses/contexts.py | 9 +++++ 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/src/app/endpoints/responses.py b/src/app/endpoints/responses.py index ef1cdd802..c76811f0c 100644 --- a/src/app/endpoints/responses.py +++ b/src/app/endpoints/responses.py @@ -62,8 +62,10 @@ from models.common.moderation import ShieldModerationBlocked from models.common.responses.contexts import ResponsesContext from models.common.responses.responses_api_params import ResponsesApiParams +from models.common.responses.types import ResponseInput from models.common.turn_summary import TurnSummary from models.config import Action +from utils.conversation_compaction import apply_compaction_blocking from utils.conversations import append_turn_items_to_conversation from utils.endpoints import ( check_configuration_loaded, @@ -238,14 +240,30 @@ async def _append_previous_response_turn( context: ResponsesContext, output: Sequence[OpenAIResponseOutput], ) -> None: - """Append response output when continuing from a previous response id. + """Append the completed turn when Llama Stack did not store it automatically. + + Llama Stack stores the turn itself only when the conversation parameter is + sent. Two cases bypass that and require an explicit append: continuing from + a ``previous_response_id``, and conversation compaction (LCORE-1572), where + the conversation parameter is dropped in favor of explicit input. In the + compaction case the turn is stored against the original user input (before + the explicit-input rewrite), carried on the context. Args: api_params: Responses API parameters containing conversation details. context: Request-scoped Responses API context. output: Final output items from the Responses API object. """ - if api_params.store and api_params.previous_response_id: + if not api_params.store: + return + if context.compacted_original_input is not None: + await append_turn_items_to_conversation( + context.client, + api_params.conversation, + context.compacted_original_input, + output, + ) + elif api_params.previous_response_id: await append_turn_items_to_conversation( context.client, api_params.conversation, @@ -455,6 +473,29 @@ async def responses_endpoint_handler( ) api_params = ResponsesApiParams.model_validate(updated_request.model_dump()) + + # Compact the conversation if it is approaching the context window limit. + # /v1/responses is OpenAI-compatible, so compaction is silent (no custom SSE + # event): summarization happens before the response is created, and the turn + # is appended explicitly afterward (the conversation parameter is dropped). + # Only stateful single-conversation requests are eligible. + compacted_original_input: Optional[ResponseInput] = None + if ( + configuration.compaction.enabled + and api_params.store + and api_params.conversation + and not api_params.previous_response_id + ): + compaction = await apply_compaction_blocking( + client, + api_params, + configuration.inference, + configuration.compaction, + ) + api_params = compaction.params + if compaction.summarized: + compacted_original_input = compaction.original_input + context = ResponsesContext( client=client, auth=auth, @@ -468,6 +509,7 @@ async def responses_endpoint_handler( user_agent=_get_user_agent(request), endpoint_path=endpoint_path, generate_topic_summary=updated_request.generate_topic_summary, + compacted_original_input=compacted_original_input, ) response_handler = ( handle_streaming_response diff --git a/src/models/common/responses/contexts.py b/src/models/common/responses/contexts.py index 89ec455d2..b2497b58e 100644 --- a/src/models/common/responses/contexts.py +++ b/src/models/common/responses/contexts.py @@ -10,6 +10,7 @@ from models.api.requests import QueryRequest from models.common.moderation import ShieldModerationResult +from models.common.responses.types import ResponseInput from models.common.turn_summary import RAGContext @@ -55,6 +56,14 @@ class ResponsesContext(BaseModel): default=False, description="Whether to generate a topic summary for new conversations", ) + compacted_original_input: Optional[ResponseInput] = Field( + default=None, + description="Set only when conversation compaction (LCORE-1572) rewrote " + "the request: the original user input before the explicit-input " + "rewrite. When present, the completed turn is appended to the " + "conversation using this input, since the conversation parameter was " + "dropped and Llama Stack therefore does not store the turn.", + ) @dataclass From d173d38f03176cd7be0016df191ddd1051d1ebe5 Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Tue, 26 May 2026 05:32:17 +0000 Subject: [PATCH 07/20] LCORE-1572: tests for /v1/responses compacted-turn storage Verify _append_previous_response_turn stores the turn against the original input in compacted mode, and stores nothing when store is disabled. --- tests/unit/app/endpoints/test_responses.py | 51 ++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/tests/unit/app/endpoints/test_responses.py b/tests/unit/app/endpoints/test_responses.py index 88fc23561..0c761e6af 100644 --- a/tests/unit/app/endpoints/test_responses.py +++ b/tests/unit/app/endpoints/test_responses.py @@ -20,6 +20,7 @@ from pytest_mock import MockerFixture from app.endpoints.responses import ( + _append_previous_response_turn, _is_server_mcp_output_item, _sanitize_response_dict, _should_filter_mcp_chunk, @@ -2783,3 +2784,53 @@ async def failing_stream() -> AsyncIterator[Any]: call_args = mock_record.call_args assert call_args.args[2] == "failure" assert call_args.kwargs.get("record_failure") is True + + +@pytest.mark.asyncio +async def test_append_previous_response_turn_compacted(mocker: MockerFixture) -> None: + """In compacted mode the turn is stored against the original input. + + When compaction rewrote the request, the conversation parameter was dropped + so Llama Stack did not store the turn. _append_previous_response_turn must + append it using the original user input (carried on the context), not the + rewritten explicit input on api_params. + """ + append = mocker.patch( + "app.endpoints.responses.append_turn_items_to_conversation", + new=mocker.AsyncMock(), + ) + api_params = mocker.Mock( + store=True, + conversation="conv_x", + previous_response_id=None, + input=["rewritten explicit input"], + ) + context = mocker.Mock( + client=mocker.AsyncMock(), + compacted_original_input="the original query", + ) + + await _append_previous_response_turn(api_params, context, ["out"]) + + append.assert_awaited_once_with( + context.client, "conv_x", "the original query", ["out"] + ) + + +@pytest.mark.asyncio +async def test_append_previous_response_turn_not_stored_when_store_false( + mocker: MockerFixture, +) -> None: + """No append happens when store is disabled, even in compacted mode.""" + append = mocker.patch( + "app.endpoints.responses.append_turn_items_to_conversation", + new=mocker.AsyncMock(), + ) + api_params = mocker.Mock( + store=False, conversation="conv_x", previous_response_id=None + ) + context = mocker.Mock(client=mocker.AsyncMock(), compacted_original_input="q") + + await _append_previous_response_turn(api_params, context, ["out"]) + + append.assert_not_awaited() From 3202433cea6f80580d0095cb7bee60449c3975e0 Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Tue, 26 May 2026 05:32:28 +0000 Subject: [PATCH 08/20] LCORE-1572: update spec doc to the as-built compaction design Revise R10, R12, the architecture flow, the changed-request-flow section, and the implementation guidance to match what was built: in compacted mode lightspeed-stack builds explicit input and omits the Llama Stack conversation parameter (which always reloads full history), preserving conversation_id and the full item history. Record the redesign and the four affected endpoints (query, streaming_query, A2A, /v1/responses) in a new Changelog section. --- .../conversation-compaction.md | 125 +++++++++++------- 1 file changed, 76 insertions(+), 49 deletions(-) diff --git a/docs/design/conversation-compaction/conversation-compaction.md b/docs/design/conversation-compaction/conversation-compaction.md index 1a85f95d5..8e713554f 100644 --- a/docs/design/conversation-compaction/conversation-compaction.md +++ b/docs/design/conversation-compaction/conversation-compaction.md @@ -61,13 +61,15 @@ R9 Compaction configuration must be admin-configurable via YAML: threshold ratio, fixed token floor, and buffer zone size. R10 -After compaction, lightspeed-stack injects the summary as a marked item into the existing Llama Stack conversation. When building context for the LLM, lightspeed-stack selects only items from the last summary marker onward. This preserves a single continuous conversation identity in Llama Stack while giving lightspeed-stack control over what the LLM sees. +After compaction, lightspeed-stack builds the LLM input explicitly — the summaries plus the recent verbatim turns plus the new query — and stops passing the Llama Stack `conversation` parameter for that request, because Llama Stack always reloads the full message history when the `conversation` parameter is set (verified empirically on llama-stack 0.6.0; see the Changelog and the spike doc). The summary is still written into the conversation as a marked item so it appears in the Conversations API, but the marker is lightspeed-stack's own boundary bookkeeping, not a Llama Stack selection mechanism. The `conversation_id` is preserved across the whole conversation, and the full history (including pre-compaction turns) remains in the conversation's items for UI/audit. Because the `conversation` parameter is no longer sent in compacted mode, lightspeed-stack appends each completed turn to the conversation itself. + +This applies to every endpoint that builds context from a growing conversation and calls the Responses API: `/v1/query`, `/v1/streaming_query`, the A2A executor, and `/v1/responses`. (The `/v1/rlsapi` inference path is stateless — no stored conversation — and is therefore out of scope.) R11 Compaction must be blocking per conversation. If a request triggers compaction, concurrent requests on the same conversation must wait until compaction completes. This prevents race conditions (e.g., two requests both triggering compaction, or a new message being appended mid-compaction). R12 -The streaming endpoint must emit a compaction event (e.g., `{"event": "compaction_started"}`) before the summarization LLM call begins, so the client can display a progress indicator. Non-streaming requests have no mid-request notification mechanism. +The native streaming endpoint (`/v1/streaming_query`) must emit a compaction event (e.g., `{"event": "compaction"}`) before the summarization LLM call begins, so the client can display a progress indicator. Non-streaming requests (`/v1/query`) have no mid-request notification mechanism. The A2A executor compacts inline (it is not a browser SSE stream). `/v1/responses` is the OpenAI-compatible endpoint and compacts **silently** — it does not inject a non-standard event into the OpenAI-format stream, preserving wire compatibility. # Use Cases @@ -95,28 +97,32 @@ User Query → lightspeed-stack 1. Resolve model, system prompt, tools 2. Build input (query + RAG + attachments) 3. Acquire per-conversation lock (blocks concurrent requests) - 4. Estimate total tokens (tiktoken): system + history + new query - 5. If compaction needed (tokens > threshold): - a. Emit compaction event on streaming endpoint - b. Retrieve conversation history from Llama Stack + 4. Estimate total tokens (tiktoken): system + (summaries + recent items) + new query + 5. If compaction needed (tokens > threshold) OR a prior summary marker exists: + a. Emit compaction event (native streaming endpoint only) + b. Retrieve conversation items from Llama Stack c. Split into "old" (summarize) and "recent" (keep) — degrading guard: reduce recent turns if they exceed token budget - d. Summarize old turns → inject summary as marked item into conversation - e. Store summary chunk in conversation cache - 6. Build context: select items from last summary marker onward + new query - 7. Call Llama Stack Responses API with conversation parameter - (Llama Stack loads items from marker onward) + d. Summarize old turns → write summary as a marked item into conversation + 6. Build EXPLICIT input: [summary markers] + [recent items after last marker] + new query + 7. Call Llama Stack Responses API WITHOUT the conversation parameter + (so Llama Stack does not reload the full history) ↓ Llama Stack - 8. Processes conversation (summary marker + recent turns + new query) + 8. Processes exactly the explicit input ↓ lightspeed-stack - 9. Response stored in same conversation (continuous history) - 10. Update conversation cache - 11. Release per-conversation lock - 12. Return QueryResponse with context_status="summarized" (or "full") + 9. Append the completed turn to the conversation items (continuous history, + same conversation_id) — Llama Stack did not auto-store it (no conversation param) + 10. Release per-conversation lock + 11. Return response (context_status="summarized" when 1573 lands; "full" otherwise) ``` +Note: when no prior summary exists and the request is below the threshold, +none of step 5–9 applies — the request takes the normal path with the +`conversation` parameter, byte-for-byte unchanged. The explicit-input path is +entered only for conversations that are actually being (or have been) compacted. + ## Token estimation Add tiktoken as a dependency. Create `src/utils/token_estimator.py`: @@ -221,11 +227,11 @@ A conversation may have multiple summary chunks (one per compaction event). All ## Changed request flow after compaction -After compaction, lightspeed-stack injects the summary as a marked conversation item into the existing Llama Stack conversation. The summary item has a recognizable marker (e.g., metadata tag or content prefix) so that lightspeed-stack can identify it when loading history. +After compaction, lightspeed-stack writes the summary as a marked conversation item (a message whose text begins with a recognizable sentinel) so it appears in the Conversations API and serves as lightspeed-stack's own boundary marker. -When building context for subsequent requests, lightspeed-stack fetches conversation items and selects only those from the last summary marker onward. The `conversation` parameter continues to be used — Llama Stack still manages the conversation. lightspeed-stack just controls *which items* form the LLM context. +When building context for a compacted conversation, lightspeed-stack fetches the conversation items, collects every marker's summary, takes the items after the last marker as the recent verbatim buffer, and sends `[summaries] + [recent items] + [new query]` as **explicit input**, **without** the `conversation` parameter. This is necessary because Llama Stack reloads the *full* stored message history whenever the `conversation` parameter is set — there is no marker-based selection hook (verified empirically; see the Changelog). Each completed turn is then appended back to the conversation items by lightspeed-stack, since Llama Stack no longer auto-stores it. -This preserves a single continuous conversation identity. The user sees one conversation in the UI, and the Conversations API returns the full history including summary items. +This preserves a single continuous conversation identity. The `conversation_id` never changes, the user sees one conversation in the UI, and the Conversations API returns the full history including the summary marker items. ## API response changes @@ -291,36 +297,36 @@ Add `compaction` field to the root `Configuration` class. | `src/utils/compaction.py` | New module: summarization logic, partitioning, additive summary management | | `src/models/config.py` | Add `CompactionConfiguration` (near `ConversationHistoryConfiguration`) | | `src/configuration.py` | Add `compaction_configuration` property to `AppConfig` singleton | -| `src/utils/responses.py` | Modify `prepare_responses_params()` — insert compaction check (see below) | -| `src/app/endpoints/query.py` | No changes needed — compaction happens inside `prepare_responses_params()` | -| `src/app/endpoints/streaming_query.py` | No changes needed — same function is used | -| `src/models/responses.py` | Add `context_status` field to `QueryResponse` and `StreamingQueryResponse` | -| `src/cache/` (all backends) | Extend schema for `ConversationSummary` storage | - -## Insertion point in `responses.py` - -The compaction hook goes in `prepare_responses_params()`. Its signature: - -``` python -async def prepare_responses_params( - client: AsyncLlamaStackClient, - query_request: QueryRequest, - user_conversation: Optional[UserConversation], - ... -) -> ResponsesApiParams: -``` - -At the insertion point (after line 297), the following are available: - -- `client` — Llama Stack client (can fetch conversation history) -- `llama_stack_conv_id` — the conversation ID -- `model` — selected model (e.g., `"openai/gpt-4o-mini"`) -- `system_prompt` — resolved system prompt -- `tools` — prepared tool list -- `input_text` — the user's query with RAG context -- `user_conversation` — DB metadata including `message_count` - -After compaction, the summary is injected as a conversation item in Llama Stack. When building the next request, lightspeed-stack fetches items from the conversation, filters to only those after the last summary marker, and passes them as input alongside the `conversation` parameter. The `conversation` parameter is still used — the conversation identity is preserved. +| `src/utils/conversation_compaction.py` | New module: `apply_compaction()` / `apply_compaction_blocking()`, `needs_compaction_path()`, marker helpers, per-conversation lock | +| `src/models/common/responses/responses_api_params.py` | `omit_conversation` flag — drops the `conversation` parameter from the request body in compacted mode | +| `src/app/endpoints/query.py` | Call `apply_compaction_blocking()` after preparing params; store the turn in compacted mode | +| `src/app/endpoints/streaming_query.py` | Compaction-aware SSE path that emits the `compaction` event before summarizing (R12) | +| `src/app/endpoints/a2a.py` | Inline compaction (no SSE event); store the turn on `response.completed` | +| `src/app/endpoints/responses.py` | Silent compaction (OpenAI-compatible); store the turn via `_append_previous_response_turn` | +| `src/models/responses.py` (now relocated) | `context_status` field — deferred to LCORE-1573 | +| `src/cache/` (all backends) | `ConversationSummary` storage — LCORE-1571 | + +## How compaction is invoked + +Rather than a hook buried inside `prepare_responses_params()`, compaction is a +reusable unit in `src/utils/conversation_compaction.py` that each endpoint +calls after its params are prepared: + +- `apply_compaction_blocking(client, params, inference_config, compaction_config)` + returns a `CompactionResult` (possibly-rewritten params, a `summarized` + flag, and the `original_input`). Non-streaming `/v1/query`, A2A, and + `/v1/responses` use this. +- `apply_compaction(..., emit_events=True)` is the async-generator variant that + yields a `CompactionStartedEvent` before the summarization LLM call; the + native `/v1/streaming_query` SSE path uses it to satisfy R12. +- `needs_compaction_path(...)` is a cheap pre-stream predicate (no LLM, no lock) + that the streaming endpoint uses to route only actually-compacting + conversations through the in-stream path, leaving every other request on the + unchanged flow. + +When compaction is active, the endpoint builds explicit input, the +`conversation` parameter is omitted (via `ResponsesApiParams.omit_conversation`), +and the completed turn is appended to the conversation items afterward. ## Fetching conversation history @@ -369,6 +375,27 @@ Compaction adds latency only on the trigger turn. In PoC testing, compaction tur - **Smaller model for summarization**: Allow configuring a cheaper model for the summarization call. Current design uses the same model for simplicity. - **UI compaction indicator**: The `context_status` response field and the streaming compaction event (R12) provide the data. Coordinate with the UI team on how to display it. +# Changelog + +**2026-05-26 — R10 redesign (Option A) during LCORE-1572 implementation.** +A live experiment on the deployed llama-stack 0.6.0 showed that passing the +`conversation` parameter to the Responses API always reloads the *full* stored +message history, with no marker-based selection hook. The original R10 +(inject a marker, keep the `conversation` parameter, "select from the marker +onward") therefore cannot limit what the model sees. R10, R12, the +architecture flow, and the implementation guidance were revised to the +explicit-input approach: in compacted mode lightspeed-stack builds the input +itself and omits the `conversation` parameter, preserving `conversation_id` +and the full item history while controlling the LLM context. This restores the +spike's *original* Decision 6 recommendation (which a later spike edit had +changed to the marker approach). The summary cache (Decision 8 / LCORE-1571) +becomes a parallel persistence layer; the runtime boundary is the marker item +in Llama Stack. Compaction was also confirmed to apply to four endpoints — +`/v1/query`, `/v1/streaming_query`, the A2A executor, and `/v1/responses` — +not the two originally listed; `/v1/responses` compacts silently to preserve +OpenAI-API wire compatibility. Evidence and full reasoning: the spike doc +(`conversation-compaction-spike.md`). + # Appendix A: PoC Evidence A proof-of-concept was built and tested. From 1a78038168478696e1f8d6bb582c99af26a506f2 Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Tue, 26 May 2026 05:32:55 +0000 Subject: [PATCH 09/20] LCORE-1572: fix needs_compaction_path docstring (pydocstyle D400) --- src/utils/conversation_compaction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/conversation_compaction.py b/src/utils/conversation_compaction.py index bbd005c39..09a0e9563 100644 --- a/src/utils/conversation_compaction.py +++ b/src/utils/conversation_compaction.py @@ -395,7 +395,7 @@ async def needs_compaction_path( compaction_config: CompactionConfiguration, encoding_name: str = DEFAULT_ENCODING_NAME, ) -> bool: - """Cheap pre-stream check: does this request need the compaction-aware path? + """Return whether this request needs the compaction-aware path (cheap check). Returns True when the conversation already has a summary marker (so it must be served in compacted mode with explicit input) or when the estimated From 412b7d9f5abc448fcd3c0d005b98cadb7b2efe4d Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Tue, 26 May 2026 05:33:13 +0000 Subject: [PATCH 10/20] LCORE-1572: build compacted input as typed messages (silence Pydantic warning) The explicit compacted input was assembled as plain dicts, which produced PydanticSerializationUnexpectedValue warnings when ResponsesApiParams was dumped (its input field is typed ResponseInput). Build the summary, recent verbatim, and query items as typed OpenAIResponseMessage objects instead. Verified end-to-end against a live stack: the serializer warning is gone and compaction still triggers, preserves conversation identity, and recalls earlier context correctly. --- src/utils/conversation_compaction.py | 55 ++++++++----------- .../utils/test_conversation_compaction.py | 10 ++-- 2 files changed, 28 insertions(+), 37 deletions(-) diff --git a/src/utils/conversation_compaction.py b/src/utils/conversation_compaction.py index 09a0e9563..094764415 100644 --- a/src/utils/conversation_compaction.py +++ b/src/utils/conversation_compaction.py @@ -41,6 +41,7 @@ from dataclasses import dataclass from typing import Any, Optional, cast +from llama_stack_api.openai_responses import OpenAIResponseMessage from llama_stack_client import AsyncLlamaStackClient from llama_stack_client.types.conversations.item_create_params import Item @@ -160,21 +161,18 @@ def _marker_summaries(items: list[Any]) -> list[str]: return [_summary_text_of(item) for item in items if is_marker_item(item)] -def _summary_input_message(summary_text: str) -> dict[str, Any]: - """Build an explicit input message carrying a summary for the model.""" - return { - "type": "message", - "role": "user", - "content": [ - { - "type": "input_text", - "text": f"Summary of earlier conversation:\n{summary_text}", - } - ], - } +def _summary_input_message(summary_text: str) -> OpenAIResponseMessage: + """Build an explicit input message carrying a summary for the model. + + Returns a typed ``OpenAIResponseMessage`` (a member of the ``ResponseInput`` + union) so it serializes cleanly when the request body is dumped. + """ + return OpenAIResponseMessage( + role="user", content=f"Summary of earlier conversation:\n{summary_text}" + ) -def _verbatim_input_message(item: Any) -> Optional[dict[str, Any]]: +def _verbatim_input_message(item: Any) -> Optional[OpenAIResponseMessage]: """Render a recent conversation message item as an explicit input message. Only message items are rendered; non-message items (tool calls/results) are @@ -188,29 +186,22 @@ def _verbatim_input_message(item: Any) -> Optional[dict[str, Any]]: if not text: return None role = item.get("role") if isinstance(item, dict) else getattr(item, "role", "user") - content_type = "output_text" if role == "assistant" else "input_text" - return { - "type": "message", - "role": role, - "content": [{"type": content_type, "text": text}], - } + if role not in ("system", "developer", "user", "assistant"): + role = "user" + # role validated above; cast satisfies the Literal-typed parameter. + return OpenAIResponseMessage(role=cast(Any, role), content=text) def _query_input_message(original_input: ResponseInput) -> list[Any]: - """Render the new user query (string or item list) as explicit input messages.""" + """Render the new user query as explicit input items. + + A string query becomes a single typed user message. An item list (e.g. from + the /v1/responses client) is already composed of typed ``ResponseItem`` + objects, so it is passed through unchanged. + """ if isinstance(original_input, str): - return [ - { - "type": "message", - "role": "user", - "content": [{"type": "input_text", "text": original_input}], - } - ] - rendered: list[Any] = [] - for item in original_input: - dumped = item.model_dump() if hasattr(item, "model_dump") else item - rendered.append(dumped) - return rendered + return [OpenAIResponseMessage(role="user", content=original_input)] + return list(original_input) def _build_explicit_input( diff --git a/tests/unit/utils/test_conversation_compaction.py b/tests/unit/utils/test_conversation_compaction.py index 49b1d9a03..234061faa 100644 --- a/tests/unit/utils/test_conversation_compaction.py +++ b/tests/unit/utils/test_conversation_compaction.py @@ -97,13 +97,13 @@ def test_build_explicit_input_shape() -> None: recent_items=[_msg("user", "recent q"), _msg("assistant", "recent a")], original_input="brand new question", ) - texts = [part["content"][0]["text"] for part in built] + texts = [m.content for m in built] assert "Summary of earlier conversation:\nearlier stuff" in texts[0] assert texts[1] == "recent q" assert texts[2] == "recent a" assert texts[3] == "brand new question" - # the assistant turn is rendered with output_text content - assert built[2]["content"][0]["type"] == "output_text" + # items are typed OpenAIResponseMessage objects (so they serialize cleanly) + assert built[2].role == "assistant" def test_should_compact() -> None: @@ -183,7 +183,7 @@ async def test_existing_marker_builds_explicit_input(mocker: MockerFixture) -> N assert result.summarized is True assert result.params.omit_conversation is True assert isinstance(result.params.input, list) - texts = [p["content"][0]["text"] for p in result.params.input] + texts = [m.content for m in result.params.input] assert texts[0].endswith("the earlier conversation summary") assert texts[-1] == "brand new" assert result.original_input == "brand new" @@ -219,7 +219,7 @@ async def test_triggers_summarization_and_writes_marker(mocker: MockerFixture) - write_marker.assert_awaited_once() assert result.summarized is True assert result.params.omit_conversation is True - texts = [p["content"][0]["text"] for p in result.params.input] + texts = [m.content for m in result.params.input] assert "condensed earlier turns" in texts[0] assert texts[-1] == "follow-up" From 88a1071172db0d1d4fb5b3bf04bcceadfcff735b Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Tue, 26 May 2026 11:12:22 +0200 Subject: [PATCH 11/20] LCORE-1572: raise instead of assert on the drained compaction result apply_compaction_blocking asserted that the generator yielded a result. Under python -O asserts are stripped, so the guard would vanish and a None result could propagate to callers. Replace it with an explicit None check that raises RuntimeError. Clears a GitHub code-scanning (CodeQL) "use of assert" finding. The repository's Bandit configuration skips B101, so this only surfaced via code scanning, not the Bandit CI job. --- src/utils/conversation_compaction.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/utils/conversation_compaction.py b/src/utils/conversation_compaction.py index 094764415..d468f8b40 100644 --- a/src/utils/conversation_compaction.py +++ b/src/utils/conversation_compaction.py @@ -375,7 +375,8 @@ async def apply_compaction_blocking( ): if isinstance(item, CompactionResult): result = item - assert result is not None # the generator always yields exactly one result + if result is None: # pragma: no cover - the generator always yields one result + raise RuntimeError("apply_compaction did not yield a CompactionResult") return result From 79dd34dfb04eef19f5085aafc903f3df66ee3fb6 Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Tue, 26 May 2026 12:53:54 +0200 Subject: [PATCH 12/20] LCORE-1572: wire persisted recursive fold (R3) via the summary cache Make the conversation summary cache the preferred source of truth for compaction summaries and the home of the persisted recursive fold. - apply_compaction / apply_compaction_blocking gain cache + user_id + skip_user_id_check. Summaries are read from the cache (get_summaries) and each new chunk is written to it (store_summary); the Llama Stack marker texts remain an authoritative fallback when no persisting cache is configured (marker-only mode, additive summaries, no fold). - When the persisted summaries themselves exceed the threshold, they are folded via recursively_resummarize and the fold is persisted with replace_summaries, so it is computed once and reused rather than recomputed per request (R3). - configured_conversation_cache() resolves the configured cache (or None) for the endpoints. - Wired into /v1/query, /v1/streaming_query, and /v1/responses. The A2A executor stays marker-only: it has no resolved user_id for the (user_id, conversation_id) cache key. Adds 7 unit tests: cache-preferred reads, store-on-compaction, fold trigger and persistence, no-fold-without-cache, marker fallback, and the cache resolver. --- src/app/endpoints/a2a.py | 3 + src/app/endpoints/query.py | 4 + src/app/endpoints/responses.py | 10 +- src/app/endpoints/streaming_query.py | 4 + src/utils/conversation_compaction.py | 149 ++++++++++++++- .../utils/test_conversation_compaction.py | 173 ++++++++++++++++++ 6 files changed, 331 insertions(+), 12 deletions(-) diff --git a/src/app/endpoints/a2a.py b/src/app/endpoints/a2a.py index f9a77a3ee..533546398 100644 --- a/src/app/endpoints/a2a.py +++ b/src/app/endpoints/a2a.py @@ -346,6 +346,9 @@ async def _process_task_streaming( # pylint: disable=too-many-locals # Compact the conversation if it is approaching the context window # limit. A2A is not a browser SSE stream, so no progress event is # emitted; the blocking variant summarizes inline before the call. + # No conversation cache is passed: the A2A executor has no resolved + # user_id for the (user_id, conversation_id) cache key, so A2A runs + # in marker-only mode (additive summaries, no persisted fold). compaction = await apply_compaction_blocking( client, responses_params, diff --git a/src/app/endpoints/query.py b/src/app/endpoints/query.py index ee2195f85..42f9fc854 100644 --- a/src/app/endpoints/query.py +++ b/src/app/endpoints/query.py @@ -44,6 +44,7 @@ from models.config import Action from utils.conversation_compaction import ( apply_compaction_blocking, + configured_conversation_cache, store_compacted_turn, ) from utils.conversations import append_turn_items_to_conversation @@ -209,6 +210,9 @@ async def query_endpoint_handler( responses_params, configuration.inference, configuration.compaction, + cache=configured_conversation_cache(), + user_id=user_id, + skip_user_id_check=_skip_userid_check, ) responses_params = compaction.params diff --git a/src/app/endpoints/responses.py b/src/app/endpoints/responses.py index c76811f0c..def47b49b 100644 --- a/src/app/endpoints/responses.py +++ b/src/app/endpoints/responses.py @@ -65,7 +65,10 @@ from models.common.responses.types import ResponseInput from models.common.turn_summary import TurnSummary from models.config import Action -from utils.conversation_compaction import apply_compaction_blocking +from utils.conversation_compaction import ( + apply_compaction_blocking, + configured_conversation_cache, +) from utils.conversations import append_turn_items_to_conversation from utils.endpoints import ( check_configuration_loaded, @@ -374,7 +377,7 @@ async def responses_endpoint_handler( check_configuration_loaded(configuration) started_at = datetime.now(UTC) rh_identity_context = get_rh_identity_context(request) - user_id, _, _, token = auth + user_id, _, skip_userid_check, token = auth await check_mcp_auth(configuration, mcp_headers, token, request.headers) @@ -491,6 +494,9 @@ async def responses_endpoint_handler( api_params, configuration.inference, configuration.compaction, + cache=configured_conversation_cache(), + user_id=user_id, + skip_user_id_check=skip_userid_check, ) api_params = compaction.params if compaction.summarized: diff --git a/src/app/endpoints/streaming_query.py b/src/app/endpoints/streaming_query.py index ce28b2173..91821b9ad 100644 --- a/src/app/endpoints/streaming_query.py +++ b/src/app/endpoints/streaming_query.py @@ -82,6 +82,7 @@ CompactionResult, CompactionStartedEvent, apply_compaction, + configured_conversation_cache, needs_compaction_path, ) from utils.conversations import append_turn_items_to_conversation @@ -641,6 +642,9 @@ async def generate_response_with_compaction( configuration.inference, configuration.compaction, emit_events=True, + cache=configured_conversation_cache(), + user_id=context.user_id, + skip_user_id_check=context.skip_userid_check, ): if isinstance(item, CompactionStartedEvent): yield stream_compaction_event(context.conversation_id) diff --git a/src/utils/conversation_compaction.py b/src/utils/conversation_compaction.py index d468f8b40..c8bb3badd 100644 --- a/src/utils/conversation_compaction.py +++ b/src/utils/conversation_compaction.py @@ -5,8 +5,8 @@ into the actual request path (LCORE-1572). Unlike ``utils.compaction`` — which is deliberately side-effect free — this module *does* touch conversation state: it fetches conversation items from Llama Stack, calls the summarization LLM, -writes summary marker items, persists summaries to the cache (best-effort), and -holds a per-conversation lock. +writes summary marker items, reads and writes summaries in the cache, and holds +a per-conversation lock. Design (see ``docs/design/conversation-compaction/conversation-compaction.md``): @@ -30,10 +30,15 @@ LLM call so the client can show a progress indicator (R12). The non-streaming wrapper :func:`apply_compaction_blocking` simply ignores those events. -The cache (LCORE-1571) is a *best-effort* secondary store here: summaries are -written to it for fast/queryable persistence, but the runtime boundary and -summary text are read back from the Llama Stack marker items, so this module -does not depend on the cache being functional. +The cache (LCORE-1571) is the preferred source of truth for summaries and the +home of the persisted recursive fold (R3): each summary chunk is written to it, +the active summary set is read back from it, and when the summaries themselves +grow past the threshold they are folded into one and persisted via +``replace_summaries`` so the fold is reused rather than recomputed. When no +persisting cache is configured (or a cache read fails) the module falls back to +the Llama Stack marker texts, which remain authoritative — marker-only mode +keeps additive summaries with no fold. The marker items always carry the +boundary between summarized history and the recent verbatim turns. """ import asyncio @@ -45,11 +50,19 @@ from llama_stack_client import AsyncLlamaStackClient from llama_stack_client.types.conversations.item_create_params import Item +from cache.cache import Cache +from cache.cache_error import CacheError +from configuration import configuration from log import get_logger from models.common.responses.responses_api_params import ResponsesApiParams from models.common.responses.types import ResponseInput +from models.compaction import ConversationSummary from models.config import CompactionConfiguration, InferenceConfiguration -from utils.compaction import partition_conversation, summarize_chunk +from utils.compaction import ( + partition_conversation, + recursively_resummarize, + summarize_chunk, +) from utils.conversations import ( append_turn_items_to_conversation, get_all_conversation_items, @@ -238,6 +251,62 @@ async def _write_summary_marker( ) +def _read_cached_summaries( + cache: Optional[Cache], + user_id: str, + conversation_id: str, + skip_user_id_check: bool, +) -> list[ConversationSummary]: + """Return persisted summary chunks for a conversation (best-effort). + + The cache is the preferred source of truth for summaries (and the only home + for a persisted recursive fold). Returns an empty list when no cache is + configured, the backend does not persist (in-memory/no-op), or a cache error + occurs — callers then fall back to the Llama Stack marker texts, which remain + authoritative. + """ + if cache is None: + return [] + try: + return cache.get_summaries(user_id, conversation_id, skip_user_id_check) + except CacheError as exc: # markers remain a valid fallback + logger.warning("compaction: cache get_summaries failed: %s", exc) + return [] + + +def _store_cached_summary( + cache: Optional[Cache], + user_id: str, + conversation_id: str, + summary: ConversationSummary, + skip_user_id_check: bool, +) -> None: + """Persist a new summary chunk to the cache (best-effort). + + The summary is also written as a Llama Stack marker by the caller, so a + failed cache write does not lose it — it only forgoes cache-backed reads and + folding for this conversation. + """ + if cache is None: + return + try: + cache.store_summary(user_id, conversation_id, summary, skip_user_id_check) + except CacheError as exc: # the marker write already preserved the summary + logger.warning("compaction: cache store_summary failed: %s", exc) + + +def configured_conversation_cache() -> Optional[Cache]: + """Return the configured conversation cache, or None when none is configured. + + Endpoints pass this to :func:`apply_compaction` / :func:`apply_compaction_blocking`. + Compaction uses the cache as its preferred summary store and the home of the + persisted recursive fold; with no cache configured it runs in marker-only mode. + """ + if configuration.conversation_cache_configuration.type is None: + return None + return configuration.conversation_cache + + def _should_compact( estimated_tokens: int, context_window: int, @@ -260,6 +329,9 @@ async def apply_compaction( # pylint: disable=too-many-arguments,too-many-posit compaction_config: CompactionConfiguration, emit_events: bool = False, encoding_name: str = DEFAULT_ENCODING_NAME, + cache: Optional[Cache] = None, + user_id: str = "", + skip_user_id_check: bool = False, ) -> AsyncIterator[Any]: """Apply conversation compaction to a prepared request, yielding the result. @@ -281,6 +353,11 @@ async def apply_compaction( # pylint: disable=too-many-arguments,too-many-posit compaction_config: Compaction tuning (enabled, threshold, buffer, ...). emit_events: Whether to yield CompactionStartedEvent before summarizing. encoding_name: tiktoken encoding name for estimation/summarization. + cache: Conversation cache, the preferred summary store and the home of + the persisted recursive fold. ``None`` (or a non-persisting backend) + falls back to marker-only summaries with no folding. + user_id: User identifier for cache reads/writes. + skip_user_id_check: Whether to bypass the cache's user_id validation. Yields: Zero or more CompactionStartedEvent, then exactly one CompactionResult. @@ -296,9 +373,20 @@ async def apply_compaction( # pylint: disable=too-many-arguments,too-many-posit async with _get_lock(conversation_id): items = await get_all_conversation_items(client, conversation_id) - summaries = _marker_summaries(items) recent_items = _items_after_last_marker(items) + # Summaries: the cache is the preferred source of truth (and the home of + # any persisted recursive fold); the Llama Stack marker texts remain an + # authoritative fallback when no persisting cache is available. + cached_summaries = _read_cached_summaries( + cache, user_id, conversation_id, skip_user_id_check + ) + summaries = ( + [s.summary_text for s in cached_summaries] + if cached_summaries + else _marker_summaries(items) + ) + context_window = get_context_window(model, inference_config) if context_window is not None: estimated = estimate_tokens(system_prompt or "", encoding_name) @@ -332,9 +420,43 @@ async def apply_compaction( # pylint: disable=too-many-arguments,too-many-posit await _write_summary_marker( client, conversation_id, summary.summary_text ) + _store_cached_summary( + cache, user_id, conversation_id, summary, skip_user_id_check + ) summaries.append(summary.summary_text) + cached_summaries = [*cached_summaries, summary] recent_items = keep_items + # Recursive fold (R3): when the persisted summaries themselves grow + # past the threshold, collapse them into a single fold and persist it + # so it is reused rather than recomputed each request. Requires a + # persisting cache; marker-only conversations keep additive summaries. + if cache is not None and len(cached_summaries) >= 2: + summaries_tokens = sum(s.token_count for s in cached_summaries) + if ( + summaries_tokens + > context_window * compaction_config.threshold_ratio + ): + logger.info( + "Folding %d summaries (%d tokens) for conversation %s", + len(cached_summaries), + summaries_tokens, + conversation_id, + ) + folded = await recursively_resummarize( + client, model, cached_summaries, encoding_name + ) + try: + cache.replace_summaries( + user_id, conversation_id, folded, skip_user_id_check + ) + cached_summaries = [folded] + summaries = [folded.summary_text] + except CacheError as exc: # keep the unfolded summaries + logger.warning( + "compaction: cache replace_summaries failed: %s", exc + ) + if not summaries: # No compaction has ever happened for this conversation: leave the # normal conversation-parameter flow untouched. @@ -352,17 +474,21 @@ async def apply_compaction( # pylint: disable=too-many-arguments,too-many-posit ) -async def apply_compaction_blocking( +async def apply_compaction_blocking( # pylint: disable=too-many-arguments,too-many-positional-arguments client: AsyncLlamaStackClient, params: ResponsesApiParams, inference_config: InferenceConfiguration, compaction_config: CompactionConfiguration, encoding_name: str = DEFAULT_ENCODING_NAME, + cache: Optional[Cache] = None, + user_id: str = "", + skip_user_id_check: bool = False, ) -> CompactionResult: """Non-streaming wrapper around :func:`apply_compaction`. Drains the generator with event emission disabled and returns the final - :class:`CompactionResult`. + :class:`CompactionResult`. See :func:`apply_compaction` for the ``cache`` / + ``user_id`` / ``skip_user_id_check`` parameters. """ result: Optional[CompactionResult] = None async for item in apply_compaction( @@ -372,6 +498,9 @@ async def apply_compaction_blocking( compaction_config, emit_events=False, encoding_name=encoding_name, + cache=cache, + user_id=user_id, + skip_user_id_check=skip_user_id_check, ): if isinstance(item, CompactionResult): result = item diff --git a/tests/unit/utils/test_conversation_compaction.py b/tests/unit/utils/test_conversation_compaction.py index 234061faa..e2f0b0c8e 100644 --- a/tests/unit/utils/test_conversation_compaction.py +++ b/tests/unit/utils/test_conversation_compaction.py @@ -342,3 +342,176 @@ async def test_needs_compaction_path_under_threshold(mocker: MockerFixture) -> N ) is False ) + + +# --- cache as source of truth + recursive fold (R3) --- + + +def _summary(text: str, *, turn: int, tokens: int, at: str) -> ConversationSummary: + """Build a ConversationSummary for cache tests.""" + return ConversationSummary( + summary_text=text, + summarized_through_turn=turn, + token_count=tokens, + created_at=at, + model_used=MODEL, + ) + + +@pytest.mark.asyncio +async def test_cache_summaries_preferred_over_markers(mocker: MockerFixture) -> None: + """When the cache returns summaries, they are used instead of marker texts.""" + items = [_marker("STALE marker text"), _msg("user", "recent q")] + mocker.patch.object( + cc, "get_all_conversation_items", mocker.AsyncMock(return_value=items) + ) + summarize = mocker.patch.object(cc, "summarize_chunk", mocker.AsyncMock()) + cached = _summary( + "FRESH cached summary", turn=2, tokens=5, at="2026-05-26T00:00:00Z" + ) + cache = mocker.Mock() + cache.get_summaries.return_value = [cached] + + result = await cc.apply_compaction_blocking( + client=mocker.AsyncMock(), + params=_params("brand new"), + inference_config=_inference(1_000_000), # huge: no new trigger + compaction_config=_compaction(), + cache=cache, + user_id="u1", + skip_user_id_check=False, + ) + + summarize.assert_not_called() + cache.get_summaries.assert_called_once_with("u1", CONV, False) + texts = [m.content for m in result.params.input] + assert "FRESH cached summary" in texts[0] + assert "STALE marker text" not in texts[0] + + +@pytest.mark.asyncio +async def test_store_summary_called_on_compaction(mocker: MockerFixture) -> None: + """A new summary chunk is persisted to the cache when compaction triggers.""" + items = [_msg("user", "q1 " * 50), _msg("assistant", "a1 " * 50)] + mocker.patch.object( + cc, "get_all_conversation_items", mocker.AsyncMock(return_value=items) + ) + summary = _summary("condensed", turn=2, tokens=4, at="2026-05-26T00:00:00Z") + mocker.patch.object(cc, "summarize_chunk", mocker.AsyncMock(return_value=summary)) + mocker.patch.object(cc, "_write_summary_marker", mocker.AsyncMock()) + cache = mocker.Mock() + cache.get_summaries.return_value = [] + + await cc.apply_compaction_blocking( + client=mocker.AsyncMock(), + params=_params("follow-up"), + inference_config=_inference(50), # small window forces the trigger + compaction_config=_compaction(threshold_ratio=0.1, buffer_turns=0), + cache=cache, + user_id="u1", + skip_user_id_check=False, + ) + + cache.store_summary.assert_called_once() + assert cache.store_summary.call_args[0] == ("u1", CONV, summary, False) + + +@pytest.mark.asyncio +async def test_fold_when_cached_summaries_exceed_threshold( + mocker: MockerFixture, +) -> None: + """Cached summaries above the threshold are folded and the fold persisted.""" + items = [_marker("m1"), _marker("m2"), _msg("user", "recent")] + mocker.patch.object( + cc, "get_all_conversation_items", mocker.AsyncMock(return_value=items) + ) + summarize = mocker.patch.object(cc, "summarize_chunk", mocker.AsyncMock()) + s1 = _summary("sum one", turn=4, tokens=20, at="2026-05-26T00:00:00Z") + s2 = _summary("sum two", turn=8, tokens=20, at="2026-05-26T00:10:00Z") + folded = _summary("FOLDED one+two", turn=8, tokens=10, at="2026-05-26T00:20:00Z") + cache = mocker.Mock() + cache.get_summaries.return_value = [s1, s2] + resum = mocker.patch.object( + cc, "recursively_resummarize", mocker.AsyncMock(return_value=folded) + ) + + result = await cc.apply_compaction_blocking( + client=mocker.AsyncMock(), + params=_params("next"), + inference_config=_inference(50), # threshold 25 < 40 summary tokens + compaction_config=_compaction(threshold_ratio=0.5, buffer_turns=5), + cache=cache, + user_id="u1", + skip_user_id_check=False, + ) + + summarize.assert_not_called() # estimate stays small; only the fold fires + resum.assert_awaited_once() + cache.replace_summaries.assert_called_once() + assert cache.replace_summaries.call_args[0][2] is folded + texts = [m.content for m in result.params.input] + assert "FOLDED one+two" in texts[0] + assert not any("sum one" in t for t in texts) + + +@pytest.mark.asyncio +async def test_no_fold_without_cache(mocker: MockerFixture) -> None: + """With no cache, additive marker summaries are never folded (marker mode).""" + items = [_marker("m1 " * 30), _marker("m2 " * 30), _msg("user", "recent")] + mocker.patch.object( + cc, "get_all_conversation_items", mocker.AsyncMock(return_value=items) + ) + mocker.patch.object(cc, "summarize_chunk", mocker.AsyncMock()) + resum = mocker.patch.object(cc, "recursively_resummarize", mocker.AsyncMock()) + + result = await cc.apply_compaction_blocking( + client=mocker.AsyncMock(), + params=_params("next"), + inference_config=_inference(1_000_000), + compaction_config=_compaction(), + cache=None, + ) + + resum.assert_not_awaited() + assert result.summarized is True # still compacted via markers + + +@pytest.mark.asyncio +async def test_marker_fallback_when_cache_empty(mocker: MockerFixture) -> None: + """An empty cache falls back to the authoritative marker texts.""" + items = [_marker("marker summary text"), _msg("user", "recent")] + mocker.patch.object( + cc, "get_all_conversation_items", mocker.AsyncMock(return_value=items) + ) + mocker.patch.object(cc, "summarize_chunk", mocker.AsyncMock()) + cache = mocker.Mock() + cache.get_summaries.return_value = [] + + result = await cc.apply_compaction_blocking( + client=mocker.AsyncMock(), + params=_params("next"), + inference_config=_inference(1_000_000), + compaction_config=_compaction(), + cache=cache, + user_id="u1", + skip_user_id_check=False, + ) + + texts = [m.content for m in result.params.input] + assert "marker summary text" in texts[0] + + +def test_configured_conversation_cache_none(mocker: MockerFixture) -> None: + """configured_conversation_cache returns None when no cache is configured.""" + mock_config = mocker.patch.object(cc, "configuration") + mock_config.conversation_cache_configuration.type = None + assert cc.configured_conversation_cache() is None + + +def test_configured_conversation_cache_returns_cache(mocker: MockerFixture) -> None: + """configured_conversation_cache returns the configured cache instance.""" + mock_config = mocker.patch.object(cc, "configuration") + mock_config.conversation_cache_configuration.type = "sqlite" + sentinel = object() + mock_config.conversation_cache = sentinel + assert cc.configured_conversation_cache() is sentinel From cb7414d0c8d8f7658473bc5782a9d56edcdd6495 Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Tue, 26 May 2026 15:35:07 +0200 Subject: [PATCH 13/20] =?UTF-8?q?LCORE-1572:=20address=20CodeRabbit=20revi?= =?UTF-8?q?ew=20=E2=80=94=20list-form=20input=20tokens=20+=20clarity=20ren?= =?UTF-8?q?ame?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Count tokens for list-form ResponseInput (e.g. /v1/responses), not only the string form, so compaction is not skipped on large item-list inputs that could otherwise still hit HTTP 413. Adds _estimate_response_input_tokens and a regression test. - Rename CompactionResult.summarized to compacted: the flag means "served in compacted / explicit-input mode" (set whenever the conversation has any summary, reused or fresh), not "a summary was created this request". The old name caused reviewer confusion about turn-persistence gating, which is correct as written. --- src/app/endpoints/a2a.py | 2 +- src/app/endpoints/query.py | 2 +- src/app/endpoints/responses.py | 2 +- src/app/endpoints/streaming_query.py | 2 +- src/utils/conversation_compaction.py | 39 ++++++++++++------- .../utils/test_conversation_compaction.py | 29 ++++++++++---- 6 files changed, 52 insertions(+), 24 deletions(-) diff --git a/src/app/endpoints/a2a.py b/src/app/endpoints/a2a.py index 533546398..c8de56e91 100644 --- a/src/app/endpoints/a2a.py +++ b/src/app/endpoints/a2a.py @@ -421,7 +421,7 @@ async def _process_task_streaming( # pylint: disable=too-many-locals context_id, conversation_id, client, - compaction.original_input if compaction.summarized else None, + compaction.original_input if compaction.compacted else None, ): aggregator.process_event(a2a_event) await event_queue.enqueue_event(a2a_event) diff --git a/src/app/endpoints/query.py b/src/app/endpoints/query.py index 42f9fc854..beca48e32 100644 --- a/src/app/endpoints/query.py +++ b/src/app/endpoints/query.py @@ -231,7 +231,7 @@ async def query_endpoint_handler( responses_params, moderation_result, endpoint_path, - original_input=compaction.original_input if compaction.summarized else None, + original_input=compaction.original_input if compaction.compacted else None, ) if moderation_result.decision == "passed": diff --git a/src/app/endpoints/responses.py b/src/app/endpoints/responses.py index def47b49b..705da927a 100644 --- a/src/app/endpoints/responses.py +++ b/src/app/endpoints/responses.py @@ -499,7 +499,7 @@ async def responses_endpoint_handler( skip_user_id_check=skip_userid_check, ) api_params = compaction.params - if compaction.summarized: + if compaction.compacted: compacted_original_input = compaction.original_input context = ResponsesContext( diff --git a/src/app/endpoints/streaming_query.py b/src/app/endpoints/streaming_query.py index 91821b9ad..8a6bbb089 100644 --- a/src/app/endpoints/streaming_query.py +++ b/src/app/endpoints/streaming_query.py @@ -650,7 +650,7 @@ async def generate_response_with_compaction( yield stream_compaction_event(context.conversation_id) elif isinstance(item, CompactionResult): responses_params = item.params - compacted = item.summarized + compacted = item.compacted generator, turn_summary = await retrieve_response_generator( responses_params=responses_params, diff --git a/src/utils/conversation_compaction.py b/src/utils/conversation_compaction.py index c8bb3badd..86f43d1ff 100644 --- a/src/utils/conversation_compaction.py +++ b/src/utils/conversation_compaction.py @@ -126,21 +126,24 @@ class CompactionResult: Attributes: params: The (possibly rewritten) Responses API params to send. When - ``summarized`` is True, ``params.input`` is an explicit item list + ``compacted`` is True, ``params.input`` is an explicit item list (summaries + recent turns + new query) and the ``conversation`` parameter is omitted from the request body. - summarized: Whether the conversation is in compacted mode (it has at - least one summary marker). Drives ``context_status``. + compacted: Whether the request is served in compacted (explicit-input) + mode — i.e. the conversation has at least one summary (from a marker + or the cache), so the ``conversation`` parameter is omitted. This is + True whether the summary was created this request or reused from a + prior one. Drives ``context_status``. original_input: The new user query exactly as it arrived (before the explicit-input rewrite). Populated only in compacted mode (where - ``summarized`` is True); ``None`` otherwise. In compacted mode the + ``compacted`` is True); ``None`` otherwise. In compacted mode the caller must append this plus the LLM output to the conversation items itself, since the ``conversation`` parameter is no longer passed to Llama Stack. """ params: ResponsesApiParams - summarized: bool + compacted: bool original_input: Optional[ResponseInput] = None @@ -307,6 +310,18 @@ def configured_conversation_cache() -> Optional[Cache]: return configuration.conversation_cache +def _estimate_response_input_tokens(value: ResponseInput, encoding_name: str) -> int: + """Estimate the token count of a request input (string or item list). + + The new query may arrive as a plain string or, on ``/v1/responses``, as a + list of input items. Counting only the string form would undercount list + inputs, which could let a request skip compaction and still hit HTTP 413. + """ + if isinstance(value, str): + return estimate_tokens(value, encoding_name) + return estimate_conversation_tokens(list(value), encoding_name=encoding_name) + + def _should_compact( estimated_tokens: int, context_window: int, @@ -343,7 +358,7 @@ async def apply_compaction( # pylint: disable=too-many-arguments,too-many-posit The whole evaluate-and-summarize section runs under the conversation's lock (R11). When compaction is disabled, the model has no registered context window, or the conversation is not yet near the limit, the result simply - carries the unchanged params with ``summarized`` reflecting whether any + carries the unchanged params with ``compacted`` reflecting whether any prior summary marker already exists. Parameters: @@ -363,7 +378,7 @@ async def apply_compaction( # pylint: disable=too-many-arguments,too-many-posit Zero or more CompactionStartedEvent, then exactly one CompactionResult. """ if not compaction_config.enabled: - yield CompactionResult(params, summarized=False) + yield CompactionResult(params, compacted=False) return conversation_id = params.conversation @@ -394,8 +409,7 @@ async def apply_compaction( # pylint: disable=too-many-arguments,too-many-posit estimated += estimate_conversation_tokens( recent_items, encoding_name=encoding_name ) - if isinstance(original_input, str): - estimated += estimate_tokens(original_input, encoding_name) + estimated += _estimate_response_input_tokens(original_input, encoding_name) if _should_compact(estimated, context_window, compaction_config): if emit_events: @@ -460,7 +474,7 @@ async def apply_compaction( # pylint: disable=too-many-arguments,too-many-posit if not summaries: # No compaction has ever happened for this conversation: leave the # normal conversation-parameter flow untouched. - yield CompactionResult(params, summarized=False) + yield CompactionResult(params, compacted=False) return # Compacted mode: lightspeed owns the context. Build explicit input and @@ -470,7 +484,7 @@ async def apply_compaction( # pylint: disable=too-many-arguments,too-many-posit update={"input": explicit_input, "omit_conversation": True} ) yield CompactionResult( - compacted_params, summarized=True, original_input=original_input + compacted_params, compacted=True, original_input=original_input ) @@ -547,8 +561,7 @@ async def needs_compaction_path( return False estimated = estimate_tokens(params.instructions or "", encoding_name) estimated += estimate_conversation_tokens(items, encoding_name=encoding_name) - if isinstance(params.input, str): - estimated += estimate_tokens(params.input, encoding_name) + estimated += _estimate_response_input_tokens(params.input, encoding_name) return _should_compact(estimated, context_window, compaction_config) diff --git a/tests/unit/utils/test_conversation_compaction.py b/tests/unit/utils/test_conversation_compaction.py index e2f0b0c8e..8f283a9b7 100644 --- a/tests/unit/utils/test_conversation_compaction.py +++ b/tests/unit/utils/test_conversation_compaction.py @@ -3,7 +3,7 @@ # Tests exercise internal helpers directly. # pylint: disable=protected-access -from typing import Any +from typing import Any, cast import pytest from pytest_mock import MockerFixture @@ -130,7 +130,7 @@ async def test_disabled_passes_through() -> None: inference_config=_inference(1000), compaction_config=_compaction(enabled=False), ) - assert result.summarized is False + assert result.compacted is False assert result.params.omit_conversation is False assert result.params.input == "new question" @@ -149,7 +149,7 @@ async def test_no_context_window_no_marker_passes_through( inference_config=_inference(None), compaction_config=_compaction(), ) - assert result.summarized is False + assert result.compacted is False assert result.params.omit_conversation is False @@ -180,7 +180,7 @@ async def test_existing_marker_builds_explicit_input(mocker: MockerFixture) -> N ) summarize.assert_not_called() # below threshold, no new summary - assert result.summarized is True + assert result.compacted is True assert result.params.omit_conversation is True assert isinstance(result.params.input, list) texts = [m.content for m in result.params.input] @@ -217,7 +217,7 @@ async def test_triggers_summarization_and_writes_marker(mocker: MockerFixture) - summarize.assert_awaited_once() write_marker.assert_awaited_once() - assert result.summarized is True + assert result.compacted is True assert result.params.omit_conversation is True texts = [m.content for m in result.params.input] assert "condensed earlier turns" in texts[0] @@ -254,7 +254,7 @@ async def test_streaming_emits_event_before_summarizing(mocker: MockerFixture) - assert isinstance(yielded[0], cc.CompactionStartedEvent) assert yielded[0].conversation_id == CONV assert isinstance(yielded[-1], cc.CompactionResult) - assert yielded[-1].summarized is True + assert yielded[-1].compacted is True @pytest.mark.asyncio @@ -473,7 +473,7 @@ async def test_no_fold_without_cache(mocker: MockerFixture) -> None: ) resum.assert_not_awaited() - assert result.summarized is True # still compacted via markers + assert result.compacted is True # still compacted via markers @pytest.mark.asyncio @@ -515,3 +515,18 @@ def test_configured_conversation_cache_returns_cache(mocker: MockerFixture) -> N sentinel = object() mock_config.conversation_cache = sentinel assert cc.configured_conversation_cache() is sentinel + + +def test_estimate_response_input_tokens_counts_list_form() -> None: + """List-form ResponseInput is counted toward the estimate, not treated as zero. + + Regression guard: counting only string input would undercount list-form + input (e.g. /v1/responses) and let a request skip compaction (LCORE-1572). + """ + big = "incident detail " * 50 + string_tokens = cc._estimate_response_input_tokens(big, cc.DEFAULT_ENCODING_NAME) + list_tokens = cc._estimate_response_input_tokens( + cast(Any, [_msg("user", big)]), cc.DEFAULT_ENCODING_NAME + ) + assert string_tokens > 10 + assert list_tokens > 10 From 423e093d091c7bdfe829898de5d5d41e820d57db Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Tue, 26 May 2026 15:56:17 +0200 Subject: [PATCH 14/20] LCORE-1572: persist compacted streaming turns with structure (CodeRabbit #4) In compacted mode the streaming endpoint persisted the completed turn as flattened strings via append_turn_to_conversation, dropping attachments and non-text output items, and double-storing for shield-blocked requests. Persist the structured turn instead: - Capture the response's structured output items onto TurnSummary.output_items (set at response.completed, and to the refusal item on a shield block). - generate_response now takes original_input and persists via store_compacted_turn with the original input plus structured output items, matching the /v1/query and A2A paths. - The shield-blocked branch no longer stores the turn when the conversation parameter was omitted (compacted mode); generate_response stores it once with the correct original input, avoiding the duplicate refusal turn. Adds tests for the structured compacted persistence and the shield dedup (compacted and non-compacted). --- src/app/endpoints/streaming_query.py | 40 ++++-- src/models/common/turn_summary.py | 5 + .../app/endpoints/test_streaming_query.py | 116 +++++++++++++++++- 3 files changed, 151 insertions(+), 10 deletions(-) diff --git a/src/app/endpoints/streaming_query.py b/src/app/endpoints/streaming_query.py index 8a6bbb089..c80e9009a 100644 --- a/src/app/endpoints/streaming_query.py +++ b/src/app/endpoints/streaming_query.py @@ -76,6 +76,7 @@ from models.api.responses.successful import StreamingQueryResponse from models.common.responses.contexts import ResponseGeneratorContext from models.common.responses.responses_api_params import ResponsesApiParams +from models.common.responses.types import ResponseInput from models.common.turn_summary import ReferencedDocument, TurnSummary from models.config import Action from utils.conversation_compaction import ( @@ -84,6 +85,7 @@ apply_compaction, configured_conversation_cache, needs_compaction_path, + store_compacted_turn, ) from utils.conversations import append_turn_items_to_conversation from utils.endpoints import ( @@ -369,12 +371,17 @@ async def retrieve_response_generator( if context.moderation_result.decision == "blocked": turn_summary.llm_response = context.moderation_result.message turn_summary.id = context.moderation_result.moderation_id - await append_turn_items_to_conversation( - context.client, - responses_params.conversation, - responses_params.input, - [context.moderation_result.refusal_response], - ) + turn_summary.output_items = [context.moderation_result.refusal_response] + # In compacted mode the conversation parameter was omitted, so the + # refusal turn (with the original input) is persisted by + # generate_response; storing it here too would duplicate it. + if not responses_params.omit_conversation: + await append_turn_items_to_conversation( + context.client, + responses_params.conversation, + responses_params.input, + [context.moderation_result.refusal_response], + ) media_type = context.query_request.media_type or MEDIA_TYPE_JSON return ( shield_violation_generator( @@ -635,6 +642,7 @@ async def generate_response_with_compaction( ) compacted = False + compacted_original_input: Optional[ResponseInput] = None try: async for item in apply_compaction( context.client, @@ -651,6 +659,7 @@ async def generate_response_with_compaction( elif isinstance(item, CompactionResult): responses_params = item.params compacted = item.compacted + compacted_original_input = item.original_input generator, turn_summary = await retrieve_response_generator( responses_params=responses_params, @@ -696,6 +705,7 @@ async def generate_response_with_compaction( turn_summary, emit_start=False, compacted=compacted, + original_input=compacted_original_input, ): yield event @@ -707,6 +717,7 @@ async def generate_response( # pylint: disable=too-many-arguments,too-many-posi turn_summary: TurnSummary, emit_start: bool = True, compacted: bool = False, + original_input: Optional[ResponseInput] = None, ) -> AsyncIterator[str]: """Wrap a generator with cleanup logic. @@ -727,6 +738,9 @@ async def generate_response( # pylint: disable=too-many-arguments,too-many-posi conversation parameter was not sent to Llama Stack, so the completed turn is appended to the conversation here rather than being stored automatically. + original_input: In compacted mode, the original user input before the + explicit-input rewrite. Used to persist the completed turn with its + structured input (preserving attachments); ``None`` otherwise. Yields: SSE-formatted strings from the wrapped generator @@ -822,11 +836,15 @@ async def generate_response( # pylint: disable=too-many-arguments,too-many-posi # buffer and audit history intact for the next request. if compacted: try: - await append_turn_to_conversation( + await store_compacted_turn( context.client, responses_params.conversation, - context.query_request.query, - turn_summary.llm_response, + ( + original_input + if original_input is not None + else context.query_request.query + ), + turn_summary.output_items, ) except Exception: # pylint: disable=broad-except logger.exception( @@ -996,6 +1014,10 @@ async def response_generator( # pylint: disable=too-many-branches,too-many-stat getattr(chunk, "response"), # noqa: B009 ) turn_summary.llm_response = turn_summary.llm_response or "".join(text_parts) + # Capture structured output items for compacted-mode turn storage + # (LCORE-1572), so the persisted turn keeps non-text output items + # rather than being flattened to the response text. + turn_summary.output_items = list(latest_response_object.output or []) yield stream_event( { "id": chunk_id, diff --git a/src/models/common/turn_summary.py b/src/models/common/turn_summary.py index 9bf8cb04e..173790b4d 100644 --- a/src/models/common/turn_summary.py +++ b/src/models/common/turn_summary.py @@ -108,3 +108,8 @@ class TurnSummary(BaseModel): rag_chunks: list[RAGChunk] = Field(default_factory=list) referenced_documents: list[ReferencedDocument] = Field(default_factory=list) token_usage: TokenCounter = Field(default_factory=TokenCounter) + output_items: list[Any] = Field( + default_factory=list, + description="Structured response output items, captured for compacted-mode " + "turn persistence (LCORE-1572). Empty on the non-compacted path.", + ) diff --git a/tests/unit/app/endpoints/test_streaming_query.py b/tests/unit/app/endpoints/test_streaming_query.py index 19176d57f..17f88416a 100644 --- a/tests/unit/app/endpoints/test_streaming_query.py +++ b/tests/unit/app/endpoints/test_streaming_query.py @@ -871,6 +871,7 @@ async def test_retrieve_response_generator_shield_blocked( mock_responses_params.model = "provider1/model1" mock_responses_params.input = "test query" mock_responses_params.conversation = "conv_123" + mock_responses_params.omit_conversation = False # non-compacted mock_context = mocker.Mock(spec=ResponseGeneratorContext) mock_context.client = mock_client @@ -887,7 +888,7 @@ async def test_retrieve_response_generator_shield_blocked( mock_moderation_result.moderation_id = "mod_123" mock_moderation_result.refusal_response = mocker.Mock() mock_context.moderation_result = mock_moderation_result - mocker.patch( + mock_append = mocker.patch( "app.endpoints.streaming_query.append_turn_items_to_conversation", new=mocker.AsyncMock(), ) @@ -898,6 +899,54 @@ async def test_retrieve_response_generator_shield_blocked( assert isinstance(turn_summary, TurnSummary) assert turn_summary.llm_response == "Content blocked" + # Structured refusal captured for compacted-mode persistence (LCORE-1572). + assert turn_summary.output_items == [mock_moderation_result.refusal_response] + # Non-compacted: the refusal turn is stored here. + mock_append.assert_awaited_once() + + @pytest.mark.asyncio + async def test_retrieve_response_generator_shield_blocked_compacted( + self, mocker: MockerFixture + ) -> None: + """In compacted mode the shield refusal is not stored here (no double-store). + + generate_response persists the compacted turn (with the original input), + so storing it again in the shield branch would duplicate it (LCORE-1572). + """ + mock_client = mocker.AsyncMock(spec=AsyncLlamaStackClient) + + mock_responses_params = mocker.Mock(spec=ResponsesApiParams) + mock_responses_params.model = "provider1/model1" + mock_responses_params.input = "explicit input" + mock_responses_params.conversation = "conv_123" + mock_responses_params.omit_conversation = True # compacted + + mock_context = mocker.Mock(spec=ResponseGeneratorContext) + mock_context.client = mock_client + mock_context.vector_store_ids = [] + mock_context.rag_id_mapping = {} + mock_context.inline_rag_context = RAGContext() + mock_context.query_request = QueryRequest( + query="test", media_type=MEDIA_TYPE_TEXT + ) # pyright: ignore[reportCallIssue] + + mock_moderation_result = mocker.Mock() + mock_moderation_result.decision = "blocked" + mock_moderation_result.message = "Content blocked" + mock_moderation_result.moderation_id = "mod_123" + mock_moderation_result.refusal_response = mocker.Mock() + mock_context.moderation_result = mock_moderation_result + mock_append = mocker.patch( + "app.endpoints.streaming_query.append_turn_items_to_conversation", + new=mocker.AsyncMock(), + ) + + _generator, turn_summary = await retrieve_response_generator( + mock_responses_params, mock_context, endpoint_path="" + ) + + assert turn_summary.output_items == [mock_moderation_result.refusal_response] + mock_append.assert_not_awaited() # compacted: generate_response stores it @pytest.mark.asyncio async def test_retrieve_response_generator_connection_error( @@ -1163,6 +1212,71 @@ async def mock_generator() -> AsyncIterator[str]: assert any("start" in item for item in result) assert any("end" in item for item in result) + @pytest.mark.asyncio + async def test_generate_response_compacted_persists_structured_turn( + self, mocker: MockerFixture + ) -> None: + """Compacted mode persists the turn via store_compacted_turn with the + original input and structured output items, not flattened strings + (LCORE-1572).""" + + async def mock_generator() -> AsyncIterator[str]: + yield "data: token\n\n" + + conv_id = "123e4567-e89b-12d3-a456-426614174000" + mock_context = mocker.Mock(spec=ResponseGeneratorContext) + mock_context.conversation_id = conv_id + mock_context.user_id = "user_123" + mock_context.query_request = QueryRequest( + query="test", conversation_id=conv_id + ) # pyright: ignore[reportCallIssue] + mock_context.started_at = "2024-01-01T00:00:00Z" + mock_context.skip_userid_check = False + mock_context.request_id = "223e4567-e89b-12d3-a456-426614174000" + mock_context.client = mocker.AsyncMock(spec=AsyncLlamaStackClient) + + mock_responses_params = mocker.Mock(spec=ResponsesApiParams) + mock_responses_params.model = "provider1/model1" + mock_responses_params.conversation = conv_id + + turn_summary = TurnSummary() + turn_summary.token_usage = TokenCounter(input_tokens=10, output_tokens=5) + output_item = mocker.Mock() + turn_summary.output_items = [output_item] + + mock_config = mocker.Mock() + mock_config.quota_limiters = [] + mocker.patch("app.endpoints.streaming_query.configuration", mock_config) + mocker.patch("app.endpoints.streaming_query.consume_query_tokens") + mocker.patch( + "app.endpoints.streaming_query.get_available_quotas", return_value={} + ) + mocker.patch("app.endpoints.streaming_query.store_query_results") + store_mock = mocker.patch( + "app.endpoints.streaming_query.store_compacted_turn", + new_callable=mocker.AsyncMock, + ) + + result = [ + item + async for item in generate_response( + mock_generator(), + mock_context, + mock_responses_params, + turn_summary, + compacted=True, + original_input="the original query", + ) + ] + + assert any("end" in item for item in result) + store_mock.assert_awaited_once_with( + mock_context.client, + conv_id, + "the original query", + [output_item], + ) + @pytest.mark.asyncio async def test_generate_response_with_topic_summary( self, mocker: MockerFixture From a82f9ff8627476776d4475f4323d782d8fcc1215 Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Tue, 26 May 2026 16:47:52 +0200 Subject: [PATCH 15/20] LCORE-1572: do not initialize the conversation cache when compaction is disabled MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit configured_conversation_cache() is evaluated eagerly as a call argument in the query endpoint, so it ran on every request and accessed configuration.conversation_cache unconditionally — forcing the (SQLite) cache to initialize even when compaction is disabled. On configurations whose cache file could not be opened that raised and returned HTTP 500, which failed the e2e suites (where compaction is off). Return None without touching the cache when compaction is disabled; the cache is only used by compaction on this path. Adds a regression test. --- src/utils/conversation_compaction.py | 11 ++++++++--- .../unit/utils/test_conversation_compaction.py | 18 ++++++++++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/src/utils/conversation_compaction.py b/src/utils/conversation_compaction.py index 86f43d1ff..d042a7907 100644 --- a/src/utils/conversation_compaction.py +++ b/src/utils/conversation_compaction.py @@ -299,12 +299,17 @@ def _store_cached_summary( def configured_conversation_cache() -> Optional[Cache]: - """Return the configured conversation cache, or None when none is configured. + """Return the conversation cache for compaction, or None when not applicable. Endpoints pass this to :func:`apply_compaction` / :func:`apply_compaction_blocking`. - Compaction uses the cache as its preferred summary store and the home of the - persisted recursive fold; with no cache configured it runs in marker-only mode. + Returns None — without touching the cache — when compaction is disabled, since + the cache is only used by compaction on this path and accessing it would + needlessly initialize it (and could fail) on every request. Also returns None + when no conversation cache is configured; compaction then runs in marker-only + mode. """ + if not configuration.compaction.enabled: + return None if configuration.conversation_cache_configuration.type is None: return None return configuration.conversation_cache diff --git a/tests/unit/utils/test_conversation_compaction.py b/tests/unit/utils/test_conversation_compaction.py index 8f283a9b7..cac1d0627 100644 --- a/tests/unit/utils/test_conversation_compaction.py +++ b/tests/unit/utils/test_conversation_compaction.py @@ -504,6 +504,7 @@ async def test_marker_fallback_when_cache_empty(mocker: MockerFixture) -> None: def test_configured_conversation_cache_none(mocker: MockerFixture) -> None: """configured_conversation_cache returns None when no cache is configured.""" mock_config = mocker.patch.object(cc, "configuration") + mock_config.compaction.enabled = True mock_config.conversation_cache_configuration.type = None assert cc.configured_conversation_cache() is None @@ -511,12 +512,29 @@ def test_configured_conversation_cache_none(mocker: MockerFixture) -> None: def test_configured_conversation_cache_returns_cache(mocker: MockerFixture) -> None: """configured_conversation_cache returns the configured cache instance.""" mock_config = mocker.patch.object(cc, "configuration") + mock_config.compaction.enabled = True mock_config.conversation_cache_configuration.type = "sqlite" sentinel = object() mock_config.conversation_cache = sentinel assert cc.configured_conversation_cache() is sentinel +def test_configured_conversation_cache_none_when_compaction_disabled( + mocker: MockerFixture, +) -> None: + """Returns None when compaction is disabled, even if a cache is configured. + + Regression guard (LCORE-1572): the cache must not be initialized on every + request when compaction is off — doing so 500'd e2e requests on configs whose + SQLite cache could not be opened. + """ + mock_config = mocker.patch.object(cc, "configuration") + mock_config.compaction.enabled = False + mock_config.conversation_cache_configuration.type = "sqlite" + mock_config.conversation_cache = object() + assert cc.configured_conversation_cache() is None + + def test_estimate_response_input_tokens_counts_list_form() -> None: """List-form ResponseInput is counted toward the estimate, not treated as zero. From 0a8099526f5b6ef0c6799abccfb0e2840b1cdb29 Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Wed, 27 May 2026 09:55:19 +0200 Subject: [PATCH 16/20] LCORE-1572: address CodeRabbit round 2 (compacted-mode persistence edges) Follow-ups to the streaming-persistence work, all for non-happy-path terminals in compacted mode (conversation parameter omitted), so the persisted turn uses the original user input + structured output rather than the explicit rewrite or flattened strings: - /v1/responses: shield-blocked turns persist against compacted_original_input, not api_params.input (the explicit rewrite). - streaming: interrupted (CancelledError) turns thread original_input through the interrupt callback and persist structured items, fixing the wrong-input storage and the cast(str, input) break on list inputs. - streaming: capture output_items on response.failed / response.incomplete terminals too, not only response.completed, so compacted persistence keeps partial output. - TurnSummary.output_items typed as list[OpenAIResponseOutput] instead of list[Any]. Also documents that disabling compaction mid-conversation on an already-compacted conversation reverts it to full-history replay (unsupported transition); the enabled flag stays a full off-switch (CodeRabbit E, declined by design). Adds unit tests for the blocked /responses path, the interrupted compacted path, and output_items capture on a failed terminal. --- src/app/endpoints/responses.py | 10 ++- src/app/endpoints/streaming_query.py | 45 ++++++++-- src/models/common/turn_summary.py | 3 +- src/utils/conversation_compaction.py | 7 ++ tests/unit/app/endpoints/test_responses.py | 33 +++++++ .../app/endpoints/test_streaming_query.py | 85 +++++++++++++++++++ 6 files changed, 172 insertions(+), 11 deletions(-) diff --git a/src/app/endpoints/responses.py b/src/app/endpoints/responses.py index 705da927a..4bc910ff0 100644 --- a/src/app/endpoints/responses.py +++ b/src/app/endpoints/responses.py @@ -230,10 +230,18 @@ async def _persist_blocked_response_turn( """ if api_params.store: moderation_result = cast(ShieldModerationBlocked, context.moderation_result) + # In compacted mode the conversation parameter was dropped and + # api_params.input is the explicit-input rewrite, so persist the turn + # against the original user input instead (LCORE-1572). + user_input = ( + context.compacted_original_input + if context.compacted_original_input is not None + else api_params.input + ) await append_turn_items_to_conversation( client=context.client, conversation_id=api_params.conversation, - user_input=api_params.input, + user_input=user_input, llm_output=[moderation_result.refusal_response], ) diff --git a/src/app/endpoints/streaming_query.py b/src/app/endpoints/streaming_query.py index c80e9009a..59586c0f9 100644 --- a/src/app/endpoints/streaming_query.py +++ b/src/app/endpoints/streaming_query.py @@ -11,6 +11,7 @@ from fastapi import APIRouter, Depends, HTTPException, Request from fastapi.responses import StreamingResponse from llama_stack_api import ( + OpenAIResponseMessage, OpenAIResponseObject, OpenAIResponseObjectStream, ) @@ -481,6 +482,7 @@ async def _persist_interrupted_turn( context: ResponseGeneratorContext, responses_params: ResponsesApiParams, turn_summary: TurnSummary, + original_input: Optional[ResponseInput] = None, ) -> None: """Persist the user query and an interrupted response into the conversation. @@ -495,14 +497,31 @@ async def _persist_interrupted_turn( responses_params: The Responses API parameters. turn_summary: TurnSummary with llm_response already set to the interrupted message. + original_input: In compacted mode, the original user input before the + explicit-input rewrite. When set, the turn is persisted against it + (the ``conversation`` parameter was dropped, and + ``responses_params.input`` is the explicit rewrite); ``None`` + otherwise (LCORE-1572). """ try: - await append_turn_to_conversation( - context.client, - responses_params.conversation, - cast(str, responses_params.input), - INTERRUPTED_RESPONSE_MESSAGE, - ) + if original_input is not None: + await append_turn_items_to_conversation( + context.client, + responses_params.conversation, + original_input, + [ + OpenAIResponseMessage( + role="assistant", content=INTERRUPTED_RESPONSE_MESSAGE + ) + ], + ) + else: + await append_turn_to_conversation( + context.client, + responses_params.conversation, + cast(str, responses_params.input), + INTERRUPTED_RESPONSE_MESSAGE, + ) except Exception: # pylint: disable=broad-except logger.exception( "Failed to append interrupted turn to conversation for request %s", @@ -548,6 +567,7 @@ def _register_interrupt_callback( context: ResponseGeneratorContext, responses_params: ResponsesApiParams, turn_summary: TurnSummary, + original_input: Optional[ResponseInput] = None, ) -> list[bool]: """Build an interrupt callback and register the stream for cancellation. @@ -578,7 +598,9 @@ async def _on_interrupt() -> None: return guard[0] = True turn_summary.llm_response = INTERRUPTED_RESPONSE_MESSAGE - await _persist_interrupted_turn(context, responses_params, turn_summary) + await _persist_interrupted_turn( + context, responses_params, turn_summary, original_input + ) current_task = asyncio.current_task() if current_task is not None: @@ -746,7 +768,7 @@ async def generate_response( # pylint: disable=too-many-arguments,too-many-posi SSE-formatted strings from the wrapped generator """ persist_guard = _register_interrupt_callback( - context, responses_params, turn_summary + context, responses_params, turn_summary, original_input ) stream_completed = False @@ -788,7 +810,9 @@ async def generate_response( # pylint: disable=too-many-arguments,too-many-posi if not persist_guard[0]: persist_guard[0] = True turn_summary.llm_response = INTERRUPTED_RESPONSE_MESSAGE - await _persist_interrupted_turn(context, responses_params, turn_summary) + await _persist_interrupted_turn( + context, responses_params, turn_summary, original_input + ) yield stream_interrupted_event(context.request_id) finally: get_stream_interrupt_registry().deregister_stream(context.request_id) @@ -1034,6 +1058,9 @@ async def response_generator( # pylint: disable=too-many-branches,too-many-stat OpenAIResponseObject, getattr(chunk, "response"), # noqa: B009 ) + # Capture any partial output items so a compacted-mode turn is not + # persisted with empty output on these terminals (LCORE-1572). + turn_summary.output_items = list(latest_response_object.output or []) error_message = ( latest_response_object.error.message if latest_response_object.error diff --git a/src/models/common/turn_summary.py b/src/models/common/turn_summary.py index 173790b4d..cc343d64e 100644 --- a/src/models/common/turn_summary.py +++ b/src/models/common/turn_summary.py @@ -5,6 +5,7 @@ from typing import Any, Optional +from llama_stack_api import OpenAIResponseOutput from pydantic import AnyUrl, BaseModel, Field from utils.token_counter import TokenCounter @@ -108,7 +109,7 @@ class TurnSummary(BaseModel): rag_chunks: list[RAGChunk] = Field(default_factory=list) referenced_documents: list[ReferencedDocument] = Field(default_factory=list) token_usage: TokenCounter = Field(default_factory=TokenCounter) - output_items: list[Any] = Field( + output_items: list[OpenAIResponseOutput] = Field( default_factory=list, description="Structured response output items, captured for compacted-mode " "turn persistence (LCORE-1572). Empty on the non-compacted path.", diff --git a/src/utils/conversation_compaction.py b/src/utils/conversation_compaction.py index d042a7907..e07bfc20d 100644 --- a/src/utils/conversation_compaction.py +++ b/src/utils/conversation_compaction.py @@ -383,6 +383,13 @@ async def apply_compaction( # pylint: disable=too-many-arguments,too-many-posit Zero or more CompactionStartedEvent, then exactly one CompactionResult. """ if not compaction_config.enabled: + # ``enabled: false`` is a full off-switch: the request passes through + # unchanged (conversation parameter intact, full-history replay). Known + # limitation: disabling compaction *after* a conversation was already + # compacted reverts that conversation to full replay — which can re-hit + # the 413 path and resend marker text through the model. Toggling the + # flag mid-conversation on already-compacted conversations is therefore + # unsupported; leave it enabled for the lifetime of such conversations. yield CompactionResult(params, compacted=False) return diff --git a/tests/unit/app/endpoints/test_responses.py b/tests/unit/app/endpoints/test_responses.py index 0c761e6af..915615084 100644 --- a/tests/unit/app/endpoints/test_responses.py +++ b/tests/unit/app/endpoints/test_responses.py @@ -22,6 +22,7 @@ from app.endpoints.responses import ( _append_previous_response_turn, _is_server_mcp_output_item, + _persist_blocked_response_turn, _sanitize_response_dict, _should_filter_mcp_chunk, handle_non_streaming_response, @@ -2834,3 +2835,35 @@ async def test_append_previous_response_turn_not_stored_when_store_false( await _append_previous_response_turn(api_params, context, ["out"]) append.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_persist_blocked_response_turn_compacted(mocker: MockerFixture) -> None: + """A shield-blocked compacted turn is stored against the original input. + + In compacted mode the conversation parameter was dropped and api_params.input + is the explicit-input rewrite, so the blocked refusal turn must be persisted + against the original user input carried on the context (LCORE-1572). + """ + append = mocker.patch( + "app.endpoints.responses.append_turn_items_to_conversation", + new=mocker.AsyncMock(), + ) + refusal = mocker.Mock() + api_params = mocker.Mock( + store=True, conversation="conv_x", input=["rewritten explicit input"] + ) + context = mocker.Mock( + client=mocker.AsyncMock(), + compacted_original_input="the original query", + moderation_result=mocker.Mock(refusal_response=refusal), + ) + + await _persist_blocked_response_turn(api_params, context) + + append.assert_awaited_once_with( + client=context.client, + conversation_id="conv_x", + user_input="the original query", + llm_output=[refusal], + ) diff --git a/tests/unit/app/endpoints/test_streaming_query.py b/tests/unit/app/endpoints/test_streaming_query.py index 17f88416a..1fd4b447b 100644 --- a/tests/unit/app/endpoints/test_streaming_query.py +++ b/tests/unit/app/endpoints/test_streaming_query.py @@ -46,6 +46,7 @@ from pytest_mock import MockerFixture from app.endpoints.streaming_query import ( + _persist_interrupted_turn, generate_response, response_generator, retrieve_response_generator, @@ -2926,3 +2927,87 @@ async def mock_turn_response() -> AsyncIterator[OpenAIResponseObjectStream]: # Should have both tool call and result (fallback behavior) assert len(mock_turn_summary.tool_calls) == 1 assert len(mock_turn_summary.tool_results) == 1 + + +@pytest.mark.asyncio +async def test_response_generator_failed_captures_output_items( + mocker: MockerFixture, +) -> None: + """A failed terminal captures output_items for compacted persistence (LCORE-1572).""" + out_item = mocker.Mock() + + async def mock_turn_response() -> AsyncIterator[OpenAIResponseObjectStream]: + chunk = mocker.Mock(spec=FailedChunk) + chunk.type = "response.failed" + mock_response = mocker.Mock() + mock_response.output = [out_item] + mock_response.error = mocker.Mock(message="boom") + chunk.response = mock_response + yield chunk + + mock_context = mocker.Mock(spec=ResponseGeneratorContext) + mock_context.query_request = QueryRequest( + query="test", media_type=MEDIA_TYPE_JSON + ) # pyright: ignore[reportCallIssue] + mock_context.model_id = "provider1/model1" + mock_context.vector_store_ids = [] + mock_context.rag_id_mapping = {} + mock_context.inline_rag_context = RAGContext() + + turn_summary = TurnSummary() + mocker.patch( + "app.endpoints.streaming_query.extract_token_usage", + return_value=TokenCounter(input_tokens=0, output_tokens=0), + ) + mocker.patch( + "app.endpoints.streaming_query.parse_referenced_documents", return_value=[] + ) + + async for _ in response_generator( + mock_turn_response(), mock_context, turn_summary, endpoint_path="" + ): + pass + + assert turn_summary.output_items == [out_item] + + +@pytest.mark.asyncio +async def test_persist_interrupted_turn_compacted_uses_original_input( + mocker: MockerFixture, +) -> None: + """Interrupted compacted turn persists the original input, not the explicit rewrite (LCORE-1572).""" + conv = "123e4567-e89b-12d3-a456-426614174000" + context = mocker.Mock(spec=ResponseGeneratorContext) + context.client = mocker.AsyncMock() + context.request_id = "req-1" + context.user_id = "user_1" + context.conversation_id = conv + context.started_at = "2024-01-01T00:00:00Z" + context.skip_userid_check = False + context.query_request = QueryRequest( + query="hi", conversation_id=conv + ) # pyright: ignore[reportCallIssue] + + responses_params = mocker.Mock(spec=ResponsesApiParams) + responses_params.conversation = conv + responses_params.model = "provider1/model1" + responses_params.input = ["explicit rewrite"] + + turn_summary = TurnSummary() + items = mocker.patch( + "app.endpoints.streaming_query.append_turn_items_to_conversation", + new=mocker.AsyncMock(), + ) + strs = mocker.patch( + "app.endpoints.streaming_query.append_turn_to_conversation", + new=mocker.AsyncMock(), + ) + mocker.patch("app.endpoints.streaming_query.store_query_results") + + await _persist_interrupted_turn( + context, responses_params, turn_summary, original_input="the original query" + ) + + items.assert_awaited_once() + assert items.call_args.args[2] == "the original query" + strs.assert_not_awaited() From 5a708c47920d611c2b47f7dbebf2ba08bcd68167 Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Wed, 27 May 2026 09:58:38 +0200 Subject: [PATCH 17/20] LCORE-1572: document the disable-after-compaction limitation in the spec doc (CodeRabbit E) --- docs/design/conversation-compaction/conversation-compaction.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/design/conversation-compaction/conversation-compaction.md b/docs/design/conversation-compaction/conversation-compaction.md index 8e713554f..d9671dfff 100644 --- a/docs/design/conversation-compaction/conversation-compaction.md +++ b/docs/design/conversation-compaction/conversation-compaction.md @@ -286,6 +286,8 @@ class CompactionConfiguration(ConfigurationBase): Add `compaction` field to the root `Configuration` class. +**Limitation — disabling after a conversation is compacted.** `enabled: false` is a full off-switch: requests pass through unchanged, with the `conversation` parameter intact (full-history replay). If compaction is disabled *after* a conversation has already been compacted, that conversation reverts to full-history replay — which can re-hit the HTTP 413 path and resend summary marker text through the model. Toggling `enabled` mid-conversation on an already-compacted conversation is therefore unsupported; keep it enabled for the lifetime of such conversations. + # Implementation Suggestions ## Key files and insertion points From 597ab01fc697590de92925ca693c030c801ac057 Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Wed, 27 May 2026 10:02:08 +0200 Subject: [PATCH 18/20] LCORE-1572: document as-built divergences in spec doc (cache source-of-truth, persisted fold) The spec still described the earlier design (cache as a parallel/best-effort layer, markers as the summary source). Update Summary storage, Additive summarization, and Changed request flow to the as-built design, and add a Changelog entry: the cache is the preferred source of truth for summaries (marker texts as fallback + audit/boundary), the recursive fold is persisted via replace_summaries (in-memory fold rejected), A2A is marker-only, and the enabled flag stays a full off-switch. --- .../conversation-compaction.md | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/docs/design/conversation-compaction/conversation-compaction.md b/docs/design/conversation-compaction/conversation-compaction.md index d9671dfff..85101fb99 100644 --- a/docs/design/conversation-compaction/conversation-compaction.md +++ b/docs/design/conversation-compaction/conversation-compaction.md @@ -189,7 +189,7 @@ After 2nd compaction: [summary of turns 1-N] + [summary of turns N+1-M] + [rece After 3rd compaction: [summary 1] + [summary 2] + [summary 3] + [recent turns] + [query] ``` -When total summary token count itself approaches the context limit, fall back to recursive re-summarization of the oldest summary chunks. +When the total summary token count itself approaches the context limit, the summary chunks are folded into a single chunk by recursive re-summarization. As built, the fold is **persisted** in the summary cache via `replace_summaries` and reused, not recomputed each request; an in-memory recompute-per-request fold was prototyped and rejected because it produces non-deterministic context across turns. Folding requires a persisting cache; marker-only mode keeps the additive chunks with no fold. Why additive over recursive: a PoC experiment with 50 queries and 4 compaction cycles showed that recursive summarization progressively loses early-conversation context. By the 4th cycle, the summary had lost Kubernetes fundamentals, Helm, and Istio details that were discussed in the first 15 turns. See `poc-results/01-analysis.txt` for full evidence. @@ -225,11 +225,13 @@ class ConversationSummary(BaseModel): A conversation may have multiple summary chunks (one per compaction event). All cache backends (SQLite, Postgres, memory) need this schema extension. +As built, the cache is the **preferred source of truth** for summary text at runtime: each chunk is written on compaction (`store_summary`) and the active set is read back from it (`get_summaries`). The Llama Stack marker items remain an authoritative fallback — used when no persisting cache is configured — and the audit record; the marker position still defines the recent-verbatim boundary. The recursive fold persists through `replace_summaries` (atomic delete-all + insert of the folded chunk), a cache operation added for this (LCORE-1571). + ## Changed request flow after compaction After compaction, lightspeed-stack writes the summary as a marked conversation item (a message whose text begins with a recognizable sentinel) so it appears in the Conversations API and serves as lightspeed-stack's own boundary marker. -When building context for a compacted conversation, lightspeed-stack fetches the conversation items, collects every marker's summary, takes the items after the last marker as the recent verbatim buffer, and sends `[summaries] + [recent items] + [new query]` as **explicit input**, **without** the `conversation` parameter. This is necessary because Llama Stack reloads the *full* stored message history whenever the `conversation` parameter is set — there is no marker-based selection hook (verified empirically; see the Changelog). Each completed turn is then appended back to the conversation items by lightspeed-stack, since Llama Stack no longer auto-stores it. +When building context for a compacted conversation, lightspeed-stack fetches the conversation items, reads the active summaries from the summary cache (LCORE-1571) — falling back to the marker texts when no persisting cache is configured — takes the items after the last marker as the recent verbatim buffer, and sends `[summaries] + [recent items] + [new query]` as **explicit input**, **without** the `conversation` parameter. This is necessary because Llama Stack reloads the *full* stored message history whenever the `conversation` parameter is set — there is no marker-based selection hook (verified empirically; see the Changelog). Each completed turn is then appended back to the conversation items by lightspeed-stack, since Llama Stack no longer auto-stores it. This preserves a single continuous conversation identity. The `conversation_id` never changes, the user sees one conversation in the UI, and the Conversations API returns the full history including the summary marker items. @@ -398,6 +400,25 @@ not the two originally listed; `/v1/responses` compacts silently to preserve OpenAI-API wire compatibility. Evidence and full reasoning: the spike doc (`conversation-compaction-spike.md`). +**2026-05-27 — Summary cache as source of truth + persisted recursive fold (LCORE-1571/1572).** +Refining the entry above (which framed the cache as "a parallel persistence +layer"): as built, the summary cache (LCORE-1571) is the *preferred source of +truth* for summary text. On each request the active summaries are read from the +cache; the Llama Stack marker texts remain an authoritative fallback (used when +no persisting cache is configured) and the audit record, and the marker position +still defines the recent-verbatim boundary. The recursive re-summarization +fallback (R3) is implemented as a *persisted* fold: when the accumulated +summaries' own token count crosses the threshold they are collapsed into one and +stored via a new cache operation, `replace_summaries` (atomic delete-all + +insert), so the fold is computed once and reused rather than recomputed per +request. An in-memory recompute-per-request fold was prototyped and rejected +(non-deterministic context across turns). The A2A executor runs marker-only (it +has no resolved `user_id` for the `(user_id, conversation_id)` cache key, so no +persisted fold). `enabled: false` stays a full off-switch; disabling compaction +mid-conversation on an already-compacted conversation reverts it to full-history +replay (unsupported — see Configuration). The `CompactionResult` mode flag was +renamed `summarized` → `compacted` for clarity. + # Appendix A: PoC Evidence A proof-of-concept was built and tested. From ce8627ec03e5c8e91c734b4b467727a558d1477d Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Wed, 27 May 2026 10:28:21 +0200 Subject: [PATCH 19/20] LCORE-1572: fix line-too-long (C0301) in interrupted-turn test docstring --- tests/unit/app/endpoints/test_streaming_query.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/unit/app/endpoints/test_streaming_query.py b/tests/unit/app/endpoints/test_streaming_query.py index 1fd4b447b..b147822b9 100644 --- a/tests/unit/app/endpoints/test_streaming_query.py +++ b/tests/unit/app/endpoints/test_streaming_query.py @@ -2975,7 +2975,10 @@ async def mock_turn_response() -> AsyncIterator[OpenAIResponseObjectStream]: async def test_persist_interrupted_turn_compacted_uses_original_input( mocker: MockerFixture, ) -> None: - """Interrupted compacted turn persists the original input, not the explicit rewrite (LCORE-1572).""" + """Interrupted compacted turn persists the original input (LCORE-1572). + + Not the explicit rewrite carried on responses_params.input. + """ conv = "123e4567-e89b-12d3-a456-426614174000" context = mocker.Mock(spec=ResponseGeneratorContext) context.client = mocker.AsyncMock() From d6e03678b91888fa22a71cc5755ebcf4f5189f80 Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Wed, 27 May 2026 13:56:06 +0200 Subject: [PATCH 20/20] LCORE-1572: harden disabled-cache regression test to fail on eager cache access (CodeRabbit) --- .../utils/test_conversation_compaction.py | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/tests/unit/utils/test_conversation_compaction.py b/tests/unit/utils/test_conversation_compaction.py index cac1d0627..9a0fe2deb 100644 --- a/tests/unit/utils/test_conversation_compaction.py +++ b/tests/unit/utils/test_conversation_compaction.py @@ -3,6 +3,7 @@ # Tests exercise internal helpers directly. # pylint: disable=protected-access +from types import SimpleNamespace from typing import Any, cast import pytest @@ -522,16 +523,28 @@ def test_configured_conversation_cache_returns_cache(mocker: MockerFixture) -> N def test_configured_conversation_cache_none_when_compaction_disabled( mocker: MockerFixture, ) -> None: - """Returns None when compaction is disabled, even if a cache is configured. + """Returns None when compaction is disabled, without reading the cache. Regression guard (LCORE-1572): the cache must not be initialized on every - request when compaction is off — doing so 500'd e2e requests on configs whose - SQLite cache could not be opened. + request when compaction is off — that 500'd e2e requests on configs whose + SQLite cache could not be opened. The stub's ``conversation_cache`` raises if + read, so the test fails on any eager access, not only on the return value. """ - mock_config = mocker.patch.object(cc, "configuration") - mock_config.compaction.enabled = False - mock_config.conversation_cache_configuration.type = "sqlite" - mock_config.conversation_cache = object() + + class _ConfigStub: # pylint: disable=too-few-public-methods + """Config whose conversation_cache raises if accessed.""" + + compaction = SimpleNamespace(enabled=False) + conversation_cache_configuration = SimpleNamespace(type="sqlite") + + @property + def conversation_cache(self) -> object: + """Fail the test if the disabled path reads the cache.""" + raise AssertionError( + "conversation_cache must not be accessed when compaction is disabled" + ) + + mocker.patch.object(cc, "configuration", _ConfigStub()) assert cc.configured_conversation_cache() is None