diff --git a/docs/design/conversation-compaction/conversation-compaction.md b/docs/design/conversation-compaction/conversation-compaction.md index 1a85f95d5..85101fb99 100644 --- a/docs/design/conversation-compaction/conversation-compaction.md +++ b/docs/design/conversation-compaction/conversation-compaction.md @@ -61,13 +61,15 @@ R9 Compaction configuration must be admin-configurable via YAML: threshold ratio, fixed token floor, and buffer zone size. R10 -After compaction, lightspeed-stack injects the summary as a marked item into the existing Llama Stack conversation. When building context for the LLM, lightspeed-stack selects only items from the last summary marker onward. This preserves a single continuous conversation identity in Llama Stack while giving lightspeed-stack control over what the LLM sees. +After compaction, lightspeed-stack builds the LLM input explicitly — the summaries plus the recent verbatim turns plus the new query — and stops passing the Llama Stack `conversation` parameter for that request, because Llama Stack always reloads the full message history when the `conversation` parameter is set (verified empirically on llama-stack 0.6.0; see the Changelog and the spike doc). The summary is still written into the conversation as a marked item so it appears in the Conversations API, but the marker is lightspeed-stack's own boundary bookkeeping, not a Llama Stack selection mechanism. The `conversation_id` is preserved across the whole conversation, and the full history (including pre-compaction turns) remains in the conversation's items for UI/audit. Because the `conversation` parameter is no longer sent in compacted mode, lightspeed-stack appends each completed turn to the conversation itself. + +This applies to every endpoint that builds context from a growing conversation and calls the Responses API: `/v1/query`, `/v1/streaming_query`, the A2A executor, and `/v1/responses`. 
(The `/v1/rlsapi` inference path is stateless — no stored conversation — and is therefore out of scope.) R11 Compaction must be blocking per conversation. If a request triggers compaction, concurrent requests on the same conversation must wait until compaction completes. This prevents race conditions (e.g., two requests both triggering compaction, or a new message being appended mid-compaction). R12 -The streaming endpoint must emit a compaction event (e.g., `{"event": "compaction_started"}`) before the summarization LLM call begins, so the client can display a progress indicator. Non-streaming requests have no mid-request notification mechanism. +The native streaming endpoint (`/v1/streaming_query`) must emit a compaction event (e.g., `{"event": "compaction"}`) before the summarization LLM call begins, so the client can display a progress indicator. Non-streaming requests (`/v1/query`) have no mid-request notification mechanism. The A2A executor compacts inline (it is not a browser SSE stream). `/v1/responses` is the OpenAI-compatible endpoint and compacts **silently** — it does not inject a non-standard event into the OpenAI-format stream, preserving wire compatibility. # Use Cases @@ -95,28 +97,32 @@ User Query → lightspeed-stack 1. Resolve model, system prompt, tools 2. Build input (query + RAG + attachments) 3. Acquire per-conversation lock (blocks concurrent requests) - 4. Estimate total tokens (tiktoken): system + history + new query - 5. If compaction needed (tokens > threshold): - a. Emit compaction event on streaming endpoint - b. Retrieve conversation history from Llama Stack + 4. Estimate total tokens (tiktoken): system + (summaries + recent items) + new query + 5. If compaction needed (tokens > threshold) OR a prior summary marker exists: + a. Emit compaction event (native streaming endpoint only) + b. Retrieve conversation items from Llama Stack c. 
Split into "old" (summarize) and "recent" (keep) — degrading guard: reduce recent turns if they exceed token budget - d. Summarize old turns → inject summary as marked item into conversation - e. Store summary chunk in conversation cache - 6. Build context: select items from last summary marker onward + new query - 7. Call Llama Stack Responses API with conversation parameter - (Llama Stack loads items from marker onward) + d. Summarize old turns → write summary as a marked item into conversation + 6. Build EXPLICIT input: [summary markers] + [recent items after last marker] + new query + 7. Call Llama Stack Responses API WITHOUT the conversation parameter + (so Llama Stack does not reload the full history) ↓ Llama Stack - 8. Processes conversation (summary marker + recent turns + new query) + 8. Processes exactly the explicit input ↓ lightspeed-stack - 9. Response stored in same conversation (continuous history) - 10. Update conversation cache - 11. Release per-conversation lock - 12. Return QueryResponse with context_status="summarized" (or "full") + 9. Append the completed turn to the conversation items (continuous history, + same conversation_id) — Llama Stack did not auto-store it (no conversation param) + 10. Release per-conversation lock + 11. Return response (context_status="summarized" when LCORE-1573 lands; "full" otherwise) ``` +Note: when no prior summary exists and the request is below the threshold, +none of steps 5–9 apply — the request takes the normal path with the +`conversation` parameter, byte-for-byte unchanged. The explicit-input path is +entered only for conversations that are actually being (or have been) compacted. + ## Token estimation Add tiktoken as a dependency. 
Create `src/utils/token_estimator.py`: @@ -183,7 +189,7 @@ After 2nd compaction: [summary of turns 1-N] + [summary of turns N+1-M] + [rece After 3rd compaction: [summary 1] + [summary 2] + [summary 3] + [recent turns] + [query] ``` -When total summary token count itself approaches the context limit, fall back to recursive re-summarization of the oldest summary chunks. +When the total summary token count itself approaches the context limit, the summary chunks are folded into a single chunk by recursive re-summarization. As built, the fold is **persisted** in the summary cache via `replace_summaries` and reused, not recomputed each request; an in-memory recompute-per-request fold was prototyped and rejected because it produces non-deterministic context across turns. Folding requires a persisting cache; marker-only mode keeps the additive chunks with no fold. Why additive over recursive: a PoC experiment with 50 queries and 4 compaction cycles showed that recursive summarization progressively loses early-conversation context. By the 4th cycle, the summary had lost Kubernetes fundamentals, Helm, and Istio details that were discussed in the first 15 turns. See `poc-results/01-analysis.txt` for full evidence. @@ -219,13 +225,15 @@ class ConversationSummary(BaseModel): A conversation may have multiple summary chunks (one per compaction event). All cache backends (SQLite, Postgres, memory) need this schema extension. +As built, the cache is the **preferred source of truth** for summary text at runtime: each chunk is written on compaction (`store_summary`) and the active set is read back from it (`get_summaries`). The Llama Stack marker items remain an authoritative fallback — used when no persisting cache is configured — and the audit record; the marker position still defines the recent-verbatim boundary. The recursive fold persists through `replace_summaries` (atomic delete-all + insert of the folded chunk), a cache operation added for this (LCORE-1571). 
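The three cache operations named above (`store_summary`, `get_summaries`, `replace_summaries`) can be illustrated with a minimal in-memory sketch. The class name, the method signatures, and the `(user_id, conversation_id)` keying shown here are assumptions made for illustration; the real backends (SQLite, Postgres, memory) are not shown in this diff and may differ.

```python
from dataclasses import dataclass, field


@dataclass
class InMemorySummaryCache:
    """Hypothetical stand-in for a persisting summary cache (not the real backend)."""

    # Maps (user_id, conversation_id) to the ordered list of active summary chunks.
    _chunks: dict[tuple[str, str], list[str]] = field(default_factory=dict)

    def store_summary(self, user_id: str, conversation_id: str, text: str) -> None:
        """Append one chunk, as would happen on each compaction event."""
        self._chunks.setdefault((user_id, conversation_id), []).append(text)

    def get_summaries(self, user_id: str, conversation_id: str) -> list[str]:
        """Return a copy of the active chunks, oldest first."""
        return list(self._chunks.get((user_id, conversation_id), []))

    def replace_summaries(self, user_id: str, conversation_id: str, folded: str) -> None:
        """Swap every chunk for one folded chunk in a single assignment."""
        self._chunks[(user_id, conversation_id)] = [folded]


cache = InMemorySummaryCache()
cache.store_summary("u1", "c1", "summary of turns 1-10")
cache.store_summary("u1", "c1", "summary of turns 11-20")
before_fold = cache.get_summaries("u1", "c1")

# The fold is persisted once and then reused by later reads.
cache.replace_summaries("u1", "c1", "folded summary of turns 1-20")
after_fold = cache.get_summaries("u1", "c1")
```

In a database-backed cache the replace step would presumably run the delete and the insert inside one transaction; the single dictionary assignment above only mimics that atomicity.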
+ ## Changed request flow after compaction -After compaction, lightspeed-stack injects the summary as a marked conversation item into the existing Llama Stack conversation. The summary item has a recognizable marker (e.g., metadata tag or content prefix) so that lightspeed-stack can identify it when loading history. +After compaction, lightspeed-stack writes the summary as a marked conversation item (a message whose text begins with a recognizable sentinel) so it appears in the Conversations API and serves as lightspeed-stack's own boundary marker. -When building context for subsequent requests, lightspeed-stack fetches conversation items and selects only those from the last summary marker onward. The `conversation` parameter continues to be used — Llama Stack still manages the conversation. lightspeed-stack just controls *which items* form the LLM context. +When building context for a compacted conversation, lightspeed-stack fetches the conversation items, reads the active summaries from the summary cache (LCORE-1571) — falling back to the marker texts when no persisting cache is configured — takes the items after the last marker as the recent verbatim buffer, and sends `[summaries] + [recent items] + [new query]` as **explicit input**, **without** the `conversation` parameter. This is necessary because Llama Stack reloads the *full* stored message history whenever the `conversation` parameter is set — there is no marker-based selection hook (verified empirically; see the Changelog). Each completed turn is then appended back to the conversation items by lightspeed-stack, since Llama Stack no longer auto-stores it. -This preserves a single continuous conversation identity. The user sees one conversation in the UI, and the Conversations API returns the full history including summary items. +This preserves a single continuous conversation identity. 
The `conversation_id` never changes, the user sees one conversation in the UI, and the Conversations API returns the full history including the summary marker items. ## API response changes @@ -280,6 +288,8 @@ class CompactionConfiguration(ConfigurationBase): Add `compaction` field to the root `Configuration` class. +**Limitation — disabling after a conversation is compacted.** `enabled: false` is a full off-switch: requests pass through unchanged, with the `conversation` parameter intact (full-history replay). If compaction is disabled *after* a conversation has already been compacted, that conversation reverts to full-history replay — which can re-hit the HTTP 413 path and resend summary marker text through the model. Toggling `enabled` mid-conversation on an already-compacted conversation is therefore unsupported; keep it enabled for the lifetime of such conversations. + # Implementation Suggestions ## Key files and insertion points @@ -291,36 +301,36 @@ Add `compaction` field to the root `Configuration` class. | `src/utils/compaction.py` | New module: summarization logic, partitioning, additive summary management | | `src/models/config.py` | Add `CompactionConfiguration` (near `ConversationHistoryConfiguration`) | | `src/configuration.py` | Add `compaction_configuration` property to `AppConfig` singleton | -| `src/utils/responses.py` | Modify `prepare_responses_params()` — insert compaction check (see below) | -| `src/app/endpoints/query.py` | No changes needed — compaction happens inside `prepare_responses_params()` | -| `src/app/endpoints/streaming_query.py` | No changes needed — same function is used | -| `src/models/responses.py` | Add `context_status` field to `QueryResponse` and `StreamingQueryResponse` | -| `src/cache/` (all backends) | Extend schema for `ConversationSummary` storage | - -## Insertion point in `responses.py` - -The compaction hook goes in `prepare_responses_params()`. 
Its signature: - -``` python -async def prepare_responses_params( - client: AsyncLlamaStackClient, - query_request: QueryRequest, - user_conversation: Optional[UserConversation], - ... -) -> ResponsesApiParams: -``` - -At the insertion point (after line 297), the following are available: - -- `client` — Llama Stack client (can fetch conversation history) -- `llama_stack_conv_id` — the conversation ID -- `model` — selected model (e.g., `"openai/gpt-4o-mini"`) -- `system_prompt` — resolved system prompt -- `tools` — prepared tool list -- `input_text` — the user's query with RAG context -- `user_conversation` — DB metadata including `message_count` - -After compaction, the summary is injected as a conversation item in Llama Stack. When building the next request, lightspeed-stack fetches items from the conversation, filters to only those after the last summary marker, and passes them as input alongside the `conversation` parameter. The `conversation` parameter is still used — the conversation identity is preserved. 
+| `src/utils/conversation_compaction.py` | New module: `apply_compaction()` / `apply_compaction_blocking()`, `needs_compaction_path()`, marker helpers, per-conversation lock |
+| `src/models/common/responses/responses_api_params.py` | `omit_conversation` flag — drops the `conversation` parameter from the request body in compacted mode |
+| `src/app/endpoints/query.py` | Call `apply_compaction_blocking()` after preparing params; store the turn in compacted mode |
+| `src/app/endpoints/streaming_query.py` | Compaction-aware SSE path that emits the `compaction` event before summarizing (R12) |
+| `src/app/endpoints/a2a.py` | Inline compaction (no SSE event); store the turn on `response.completed` |
+| `src/app/endpoints/responses.py` | Silent compaction (OpenAI-compatible); store the turn via `_append_previous_response_turn` |
+| `src/models/responses.py` (now relocated) | `context_status` field — deferred to LCORE-1573 |
+| `src/cache/` (all backends) | `ConversationSummary` storage — LCORE-1571 |
+
+## How compaction is invoked
+
+Rather than a hook buried inside `prepare_responses_params()`, compaction is a
+reusable unit in `src/utils/conversation_compaction.py` that each endpoint
+calls after its params are prepared:
+
+- `apply_compaction_blocking(client, params, inference_config, compaction_config)`
+  returns a `CompactionResult` (possibly-rewritten params, a `compacted`
+  flag, and the `original_input`). Non-streaming `/v1/query`, A2A, and
+  `/v1/responses` use this.
+- `apply_compaction(..., emit_events=True)` is the async-generator variant that
+  yields a `CompactionStartedEvent` before the summarization LLM call; the
+  native `/v1/streaming_query` SSE path uses it to satisfy R12.
+- `needs_compaction_path(...)` is a cheap pre-stream predicate (no LLM, no lock)
+  that the streaming endpoint uses to route only actually-compacting
+  conversations through the in-stream path, leaving every other request on the
+  unchanged flow.
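How an endpoint might consume the result of `apply_compaction_blocking()` can be sketched as below. `ParamsSketch`, `ResultSketch`, `request_body()`, and `input_to_store()` are hypothetical stand-ins, not the real `ResponsesApiParams` or `CompactionResult`; only the field names `params`, `compacted`, `original_input`, and `omit_conversation` are taken from this diff.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ParamsSketch:
    """Hypothetical stand-in for ResponsesApiParams, reduced to three fields."""

    input: str
    conversation: Optional[str]
    omit_conversation: bool = False

    def request_body(self) -> dict[str, str]:
        """Build the request body, dropping `conversation` in compacted mode."""
        body = {"input": self.input}
        if self.conversation is not None and not self.omit_conversation:
            body["conversation"] = self.conversation
        return body


@dataclass(frozen=True)
class ResultSketch:
    """Hypothetical stand-in for CompactionResult."""

    params: ParamsSketch
    compacted: bool
    original_input: str


def input_to_store(result: ResultSketch) -> Optional[str]:
    """Return the input the endpoint must store itself, or None on the normal path."""
    return result.original_input if result.compacted else None


normal = ResultSketch(ParamsSketch("What is Helm?", "conv-1"), False, "What is Helm?")
compacted = ResultSketch(
    ParamsSketch("[summary] ... What is Helm?", "conv-1", omit_conversation=True),
    True,
    "What is Helm?",
)
```

The point of carrying `original_input` separately is that the stored turn should record what the user actually asked, not the rewritten summaries-plus-recent input sent to inference.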
+
+When compaction is active, the endpoint builds explicit input, the
+`conversation` parameter is omitted (via `ResponsesApiParams.omit_conversation`),
+and the completed turn is appended to the conversation items afterward.
 
 ## Fetching conversation history
 
@@ -369,6 +379,46 @@ Compaction adds latency only on the trigger turn. In PoC testing, compaction tur
 - **Smaller model for summarization**: Allow configuring a cheaper model for the summarization call. Current design uses the same model for simplicity.
 - **UI compaction indicator**: The `context_status` response field and the streaming compaction event (R12) provide the data. Coordinate with the UI team on how to display it.
 
+# Changelog
+
+**2026-05-26 — R10 redesign (Option A) during LCORE-1572 implementation.**
+A live experiment on the deployed llama-stack 0.6.0 showed that passing the
+`conversation` parameter to the Responses API always reloads the *full* stored
+message history, with no marker-based selection hook. The original R10
+(inject a marker, keep the `conversation` parameter, "select from the marker
+onward") therefore cannot limit what the model sees. R10, R12, the
+architecture flow, and the implementation guidance were revised to the
+explicit-input approach: in compacted mode lightspeed-stack builds the input
+itself and omits the `conversation` parameter, preserving `conversation_id`
+and the full item history while controlling the LLM context. This restores the
+spike's *original* Decision 6 recommendation (which a later spike edit had
+changed to the marker approach). The summary cache (Decision 8 / LCORE-1571)
+becomes a parallel persistence layer; the runtime boundary is the marker item
+in Llama Stack. Compaction was also confirmed to apply to four endpoints —
+`/v1/query`, `/v1/streaming_query`, the A2A executor, and `/v1/responses` —
+not the two originally listed; `/v1/responses` compacts silently to preserve
+OpenAI-API wire compatibility. Evidence and full reasoning: the spike doc
+(`conversation-compaction-spike.md`).
+
+**2026-05-27 — Summary cache as source of truth + persisted recursive fold (LCORE-1571/1572).**
+Refining the entry above (which framed the cache as "a parallel persistence
+layer"): as built, the summary cache (LCORE-1571) is the *preferred source of
+truth* for summary text. On each request the active summaries are read from the
+cache; the Llama Stack marker texts remain an authoritative fallback (used when
+no persisting cache is configured) and the audit record, and the marker position
+still defines the recent-verbatim boundary. The recursive re-summarization
+fallback (R3) is implemented as a *persisted* fold: when the accumulated
+summaries' own token count crosses the threshold they are collapsed into one and
+stored via a new cache operation, `replace_summaries` (atomic delete-all +
+insert), so the fold is computed once and reused rather than recomputed per
+request. An in-memory recompute-per-request fold was prototyped and rejected
+(non-deterministic context across turns). The A2A executor runs marker-only (it
+has no resolved `user_id` for the `(user_id, conversation_id)` cache key, so no
+persisted fold). `enabled: false` stays a full off-switch; disabling compaction
+mid-conversation on an already-compacted conversation reverts it to full-history
+replay (unsupported — see Configuration). The `CompactionResult` mode flag was
+renamed `summarized` → `compacted` for clarity.
+
 # Appendix A: PoC Evidence
 
 A proof-of-concept was built and tested.
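The explicit-input construction described in the design doc (summaries, then the items after the last marker, then the new query) could be sketched roughly as follows. This is a simplified illustration under stated assumptions: conversation items are modeled as plain strings, and `SUMMARY_SENTINEL` and `build_explicit_input()` are hypothetical names, since the real sentinel text and helper functions are not shown in this diff.

```python
from typing import Optional, Sequence

# Hypothetical sentinel; the real marker text is not shown in this diff.
SUMMARY_SENTINEL = "[conversation-summary] "


def build_explicit_input(
    items: Sequence[str],
    new_query: str,
    cached_summaries: Optional[Sequence[str]] = None,
) -> Optional[list[str]]:
    """Return [summaries] + [recent items] + [new query], or None if uncompacted."""
    marker_positions = [
        index for index, text in enumerate(items) if text.startswith(SUMMARY_SENTINEL)
    ]
    if not marker_positions:
        return None  # no marker: the caller keeps the normal `conversation` path
    # Prefer cached summary text; fall back to the marker texts themselves.
    if cached_summaries:
        summaries = list(cached_summaries)
    else:
        summaries = [items[i][len(SUMMARY_SENTINEL):] for i in marker_positions]
    # Everything after the last marker is the recent verbatim buffer.
    recent = list(items[marker_positions[-1] + 1:])
    return [*summaries, *recent, new_query]


history = [
    "user: what is a pod?",
    "assistant: the smallest deployable unit",
    SUMMARY_SENTINEL + "User asked about pods.",
    "user: and a deployment?",
    "assistant: it manages replica sets",
]
explicit = build_explicit_input(history, "user: how do I scale it?")
```

Because the result is sent without the `conversation` parameter, the caller would still need to append the completed turn to the conversation items afterward, as the endpoint changes in the following file diffs do.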
diff --git a/src/app/endpoints/a2a.py b/src/app/endpoints/a2a.py index e3a4cf9f9..c8de56e91 100644 --- a/src/app/endpoints/a2a.py +++ b/src/app/endpoints/a2a.py @@ -1,5 +1,7 @@ """Handler for A2A (Agent-to-Agent) protocol endpoints using Responses API.""" +# pylint: disable=too-many-lines + import asyncio import json import uuid @@ -32,7 +34,7 @@ from llama_stack_api.openai_responses import ( OpenAIResponseObjectStream, ) -from llama_stack_client import APIConnectionError +from llama_stack_client import APIConnectionError, AsyncLlamaStackClient from starlette.responses import Response, StreamingResponse from a2a_storage import A2AContextStore, A2AStorageFactory @@ -45,13 +47,18 @@ from constants import MEDIA_TYPE_EVENT_STREAM from log import get_logger from models.api.requests import QueryRequest +from models.common.responses.types import ResponseInput from models.config import Action +from utils.conversation_compaction import ( + apply_compaction_blocking, + store_compacted_turn, +) from utils.mcp_headers import McpHeaders, mcp_headers_dependency from utils.responses import ( extract_text_from_response_item, prepare_responses_params, ) -from utils.suid import normalize_conversation_id +from utils.suid import normalize_conversation_id, to_llama_stack_conversation_id from version import __version__ logger = get_logger(__name__) @@ -336,6 +343,19 @@ async def _process_task_streaming( # pylint: disable=too-many-locals store=True, request_headers=self.request_headers, ) + # Compact the conversation if it is approaching the context window + # limit. A2A is not a browser SSE stream, so no progress event is + # emitted; the blocking variant summarizes inline before the call. + # No conversation cache is passed: the A2A executor has no resolved + # user_id for the (user_id, conversation_id) cache key, so A2A runs + # in marker-only mode (additive summaries, no persisted fold). 
+ compaction = await apply_compaction_blocking( + client, + responses_params, + configuration.inference, + configuration.compaction, + ) + responses_params = compaction.params # Stream response from LLM using the Responses API stream = await client.responses.create(**responses_params.model_dump()) except APIConnectionError as e: @@ -392,9 +412,16 @@ async def _process_task_streaming( # pylint: disable=too-many-locals ) ) - # Process stream using generator and aggregator pattern + # Process stream using generator and aggregator pattern. In compacted + # mode the conversation parameter is not sent, so the turn is stored + # explicitly once the response completes (see _convert_stream_to_events). async for a2a_event in self._convert_stream_to_events( - stream, task_id, context_id, conversation_id + stream, + task_id, + context_id, + conversation_id, + client, + compaction.original_input if compaction.compacted else None, ): aggregator.process_event(a2a_event) await event_queue.enqueue_event(a2a_event) @@ -414,12 +441,14 @@ async def _process_task_streaming( # pylint: disable=too-many-locals final=True, ) - async def _convert_stream_to_events( # pylint: disable=too-many-branches,too-many-locals + async def _convert_stream_to_events( # pylint: disable=too-many-branches,too-many-locals,too-many-arguments,too-many-positional-arguments self, stream: AsyncIterator[OpenAIResponseObjectStream], task_id: str, context_id: str, conversation_id: Optional[str], + client: Optional[AsyncLlamaStackClient] = None, + compacted_original_input: Optional[ResponseInput] = None, ) -> AsyncIterator[Any]: """Convert Responses API stream chunks to A2A events. 
@@ -508,6 +537,20 @@ async def _convert_stream_to_events( # pylint: disable=too-many-branches,too-ma if response_obj: output = getattr(response_obj, "output", []) + # In compacted mode the conversation parameter was not sent, + # so persist this turn ourselves to keep the recent-turn + # buffer and audit history intact for the next request. + if ( + compacted_original_input is not None + and client is not None + and conversation_id + ): + await store_compacted_turn( + client, + to_llama_stack_conversation_id(conversation_id), + compacted_original_input, + output, + ) a2a_parts = _convert_responses_content_to_a2a_parts(output) if not a2a_parts and final_text: a2a_parts = [Part(root=TextPart(text=final_text))] diff --git a/src/app/endpoints/query.py b/src/app/endpoints/query.py index f7fd5f632..beca48e32 100644 --- a/src/app/endpoints/query.py +++ b/src/app/endpoints/query.py @@ -39,8 +39,14 @@ from models.api.responses.successful import QueryResponse from models.common.moderation import ShieldModerationResult from models.common.responses.responses_api_params import ResponsesApiParams +from models.common.responses.types import ResponseInput from models.common.turn_summary import TurnSummary from models.config import Action +from utils.conversation_compaction import ( + apply_compaction_blocking, + configured_conversation_cache, + store_compacted_turn, +) from utils.conversations import append_turn_items_to_conversation from utils.endpoints import ( check_configuration_loaded, @@ -196,6 +202,20 @@ async def query_endpoint_handler( inline_rag_context=inline_rag_context.context_text, ) + # Compact the conversation if it is approaching the context window limit. + # When compaction is active, params carry explicit input and the + # conversation parameter is dropped (lightspeed-stack owns the context). 
+ compaction = await apply_compaction_blocking( + client, + responses_params, + configuration.inference, + configuration.compaction, + cache=configured_conversation_cache(), + user_id=user_id, + skip_user_id_check=_skip_userid_check, + ) + responses_params = compaction.params + # Handle Azure token refresh if needed if ( responses_params.model.startswith("azure") @@ -207,7 +227,11 @@ async def query_endpoint_handler( # Retrieve response using Responses API turn_summary = await retrieve_response( - client, responses_params, moderation_result, endpoint_path + client, + responses_params, + moderation_result, + endpoint_path, + original_input=compaction.original_input if compaction.compacted else None, ) if moderation_result.decision == "passed": @@ -282,6 +306,7 @@ async def retrieve_response( responses_params: ResponsesApiParams, moderation_result: ShieldModerationResult, endpoint_path: str = "", + original_input: Optional[ResponseInput] = None, ) -> TurnSummary: """ Retrieve response from LLMs and agents. @@ -294,17 +319,28 @@ async def retrieve_response( client: The AsyncLlamaStackClient to use for the request. responses_params: The Responses API parameters. moderation_result: The moderation result. + endpoint_path: The request path, for metrics/telemetry. + original_input: Set only in compacted mode (LCORE-1572). It is the new + user query before the explicit-input rewrite. When provided, the + turn is appended to the conversation here, because the conversation + parameter is no longer passed to Llama Stack and so the turn is not + stored automatically. Returns: ------- TurnSummary: Summary of the LLM response content """ response: Optional[OpenAIResponseObject] = None + # In compacted mode, the new turn must be stored against the original user + # query, not the explicit summaries-plus-recent input we send to inference. 
+ turn_input = ( + original_input if original_input is not None else responses_params.input + ) if moderation_result.decision == "blocked": await append_turn_items_to_conversation( client, responses_params.conversation, - responses_params.input, + turn_input, [moderation_result.refusal_response], ) return TurnSummary( @@ -331,6 +367,16 @@ async def retrieve_response( error_response = handle_known_apistatus_errors(e, responses_params.model) raise HTTPException(**error_response.model_dump()) from e + # In compacted mode, store the completed turn ourselves (the conversation + # parameter was not sent, so Llama Stack did not persist it). + if original_input is not None: + await store_compacted_turn( + client, + responses_params.conversation, + original_input, + response.output, + ) + vector_store_ids = extract_vector_store_ids_from_tools(responses_params.tools) rag_id_mapping = configuration.rag_id_mapping return build_turn_summary( diff --git a/src/app/endpoints/responses.py b/src/app/endpoints/responses.py index ef1cdd802..4bc910ff0 100644 --- a/src/app/endpoints/responses.py +++ b/src/app/endpoints/responses.py @@ -62,8 +62,13 @@ from models.common.moderation import ShieldModerationBlocked from models.common.responses.contexts import ResponsesContext from models.common.responses.responses_api_params import ResponsesApiParams +from models.common.responses.types import ResponseInput from models.common.turn_summary import TurnSummary from models.config import Action +from utils.conversation_compaction import ( + apply_compaction_blocking, + configured_conversation_cache, +) from utils.conversations import append_turn_items_to_conversation from utils.endpoints import ( check_configuration_loaded, @@ -225,10 +230,18 @@ async def _persist_blocked_response_turn( """ if api_params.store: moderation_result = cast(ShieldModerationBlocked, context.moderation_result) + # In compacted mode the conversation parameter was dropped and + # api_params.input is the explicit-input 
rewrite, so persist the turn + # against the original user input instead (LCORE-1572). + user_input = ( + context.compacted_original_input + if context.compacted_original_input is not None + else api_params.input + ) await append_turn_items_to_conversation( client=context.client, conversation_id=api_params.conversation, - user_input=api_params.input, + user_input=user_input, llm_output=[moderation_result.refusal_response], ) @@ -238,14 +251,30 @@ async def _append_previous_response_turn( context: ResponsesContext, output: Sequence[OpenAIResponseOutput], ) -> None: - """Append response output when continuing from a previous response id. + """Append the completed turn when Llama Stack did not store it automatically. + + Llama Stack stores the turn itself only when the conversation parameter is + sent. Two cases bypass that and require an explicit append: continuing from + a ``previous_response_id``, and conversation compaction (LCORE-1572), where + the conversation parameter is dropped in favor of explicit input. In the + compaction case the turn is stored against the original user input (before + the explicit-input rewrite), carried on the context. Args: api_params: Responses API parameters containing conversation details. context: Request-scoped Responses API context. output: Final output items from the Responses API object. 
""" - if api_params.store and api_params.previous_response_id: + if not api_params.store: + return + if context.compacted_original_input is not None: + await append_turn_items_to_conversation( + context.client, + api_params.conversation, + context.compacted_original_input, + output, + ) + elif api_params.previous_response_id: await append_turn_items_to_conversation( context.client, api_params.conversation, @@ -356,7 +385,7 @@ async def responses_endpoint_handler( check_configuration_loaded(configuration) started_at = datetime.now(UTC) rh_identity_context = get_rh_identity_context(request) - user_id, _, _, token = auth + user_id, _, skip_userid_check, token = auth await check_mcp_auth(configuration, mcp_headers, token, request.headers) @@ -455,6 +484,32 @@ async def responses_endpoint_handler( ) api_params = ResponsesApiParams.model_validate(updated_request.model_dump()) + + # Compact the conversation if it is approaching the context window limit. + # /v1/responses is OpenAI-compatible, so compaction is silent (no custom SSE + # event): summarization happens before the response is created, and the turn + # is appended explicitly afterward (the conversation parameter is dropped). + # Only stateful single-conversation requests are eligible. 
+ compacted_original_input: Optional[ResponseInput] = None + if ( + configuration.compaction.enabled + and api_params.store + and api_params.conversation + and not api_params.previous_response_id + ): + compaction = await apply_compaction_blocking( + client, + api_params, + configuration.inference, + configuration.compaction, + cache=configured_conversation_cache(), + user_id=user_id, + skip_user_id_check=skip_userid_check, + ) + api_params = compaction.params + if compaction.compacted: + compacted_original_input = compaction.original_input + context = ResponsesContext( client=client, auth=auth, @@ -468,6 +523,7 @@ async def responses_endpoint_handler( user_agent=_get_user_agent(request), endpoint_path=endpoint_path, generate_topic_summary=updated_request.generate_topic_summary, + compacted_original_input=compacted_original_input, ) response_handler = ( handle_streaming_response diff --git a/src/app/endpoints/streaming_query.py b/src/app/endpoints/streaming_query.py index c88fb03dd..59586c0f9 100644 --- a/src/app/endpoints/streaming_query.py +++ b/src/app/endpoints/streaming_query.py @@ -11,6 +11,7 @@ from fastapi import APIRouter, Depends, HTTPException, Request from fastapi.responses import StreamingResponse from llama_stack_api import ( + OpenAIResponseMessage, OpenAIResponseObject, OpenAIResponseObjectStream, ) @@ -76,8 +77,17 @@ from models.api.responses.successful import StreamingQueryResponse from models.common.responses.contexts import ResponseGeneratorContext from models.common.responses.responses_api_params import ResponsesApiParams +from models.common.responses.types import ResponseInput from models.common.turn_summary import ReferencedDocument, TurnSummary from models.config import Action +from utils.conversation_compaction import ( + CompactionResult, + CompactionStartedEvent, + apply_compaction, + configured_conversation_cache, + needs_compaction_path, + store_compacted_turn, +) from utils.conversations import append_turn_items_to_conversation from 
utils.endpoints import ( check_configuration_loaded, @@ -287,6 +297,33 @@ async def streaming_query_endpoint_handler( # pylint: disable=too-many-locals ) recording.record_llm_call(provider_id, model_id, endpoint_path) + response_media_type = ( + MEDIA_TYPE_TEXT + if query_request.media_type == MEDIA_TYPE_TEXT + else MEDIA_TYPE_EVENT_STREAM + ) + + # Only conversations that actually compact (already have a summary marker, + # or would trigger one now) take the compaction-aware path, where the + # response is created inside the SSE stream so the progress event can be + # flushed before the summarization LLM call. Every other request keeps the + # unchanged path: the response stream is created here, so create-time errors + # surface as HTTP responses exactly as before. + if await needs_compaction_path( + context.client, + responses_params, + configuration.inference, + configuration.compaction, + ): + return StreamingResponse( + generate_response_with_compaction( + context=context, + responses_params=responses_params, + endpoint_path=endpoint_path, + ), + media_type=response_media_type, + ) + generator, turn_summary = await retrieve_response_generator( responses_params=responses_params, context=context, @@ -299,12 +336,6 @@ async def streaming_query_endpoint_handler( # pylint: disable=too-many-locals inline_rag_context.referenced_documents + turn_summary.referenced_documents ) - response_media_type = ( - MEDIA_TYPE_TEXT - if query_request.media_type == MEDIA_TYPE_TEXT - else MEDIA_TYPE_EVENT_STREAM - ) - return StreamingResponse( generate_response( generator=generator, @@ -341,12 +372,17 @@ async def retrieve_response_generator( if context.moderation_result.decision == "blocked": turn_summary.llm_response = context.moderation_result.message turn_summary.id = context.moderation_result.moderation_id - await append_turn_items_to_conversation( - context.client, - responses_params.conversation, - responses_params.input, - [context.moderation_result.refusal_response], - ) + 
turn_summary.output_items = [context.moderation_result.refusal_response] + # In compacted mode the conversation parameter was omitted, so the + # refusal turn (with the original input) is persisted by + # generate_response; storing it here too would duplicate it. + if not responses_params.omit_conversation: + await append_turn_items_to_conversation( + context.client, + responses_params.conversation, + responses_params.input, + [context.moderation_result.refusal_response], + ) media_type = context.query_request.media_type or MEDIA_TYPE_JSON return ( shield_violation_generator( @@ -446,6 +482,7 @@ async def _persist_interrupted_turn( context: ResponseGeneratorContext, responses_params: ResponsesApiParams, turn_summary: TurnSummary, + original_input: Optional[ResponseInput] = None, ) -> None: """Persist the user query and an interrupted response into the conversation. @@ -460,14 +497,31 @@ async def _persist_interrupted_turn( responses_params: The Responses API parameters. turn_summary: TurnSummary with llm_response already set to the interrupted message. + original_input: In compacted mode, the original user input before the + explicit-input rewrite. When set, the turn is persisted against it + (the ``conversation`` parameter was dropped, and + ``responses_params.input`` is the explicit rewrite); ``None`` + otherwise (LCORE-1572). 
""" try: - await append_turn_to_conversation( - context.client, - responses_params.conversation, - cast(str, responses_params.input), - INTERRUPTED_RESPONSE_MESSAGE, - ) + if original_input is not None: + await append_turn_items_to_conversation( + context.client, + responses_params.conversation, + original_input, + [ + OpenAIResponseMessage( + role="assistant", content=INTERRUPTED_RESPONSE_MESSAGE + ) + ], + ) + else: + await append_turn_to_conversation( + context.client, + responses_params.conversation, + cast(str, responses_params.input), + INTERRUPTED_RESPONSE_MESSAGE, + ) except Exception: # pylint: disable=broad-except logger.exception( "Failed to append interrupted turn to conversation for request %s", @@ -513,6 +567,7 @@ def _register_interrupt_callback( context: ResponseGeneratorContext, responses_params: ResponsesApiParams, turn_summary: TurnSummary, + original_input: Optional[ResponseInput] = None, ) -> list[bool]: """Build an interrupt callback and register the stream for cancellation. @@ -543,7 +598,9 @@ async def _on_interrupt() -> None: return guard[0] = True turn_summary.llm_response = INTERRUPTED_RESPONSE_MESSAGE - await _persist_interrupted_turn(context, responses_params, turn_summary) + await _persist_interrupted_turn( + context, responses_params, turn_summary, original_input + ) current_task = asyncio.current_task() if current_task is not None: @@ -563,11 +620,126 @@ async def _on_interrupt() -> None: return guard -async def generate_response( +def _http_exception_stream_event(exc: HTTPException) -> str: + """Render a FastAPI HTTPException as an SSE error event. + + Used by the compaction-aware streaming path, where the response is created + inside the stream and so create-time errors must be surfaced as SSE events + rather than as an HTTP status response. 
+ """ + detail = ( + exc.detail if isinstance(exc.detail, dict) else {"response": str(exc.detail)} + ) + return format_stream_data( + {"event": "error", "data": {"status_code": exc.status_code, **detail}} + ) + + +async def generate_response_with_compaction( + context: ResponseGeneratorContext, + responses_params: ResponsesApiParams, + endpoint_path: str, +) -> AsyncIterator[str]: + """Stream a response for a conversation that requires compaction. + + Used only when :func:`needs_compaction_path` is true. Compaction and the + response creation happen inside the SSE stream so the ``compaction`` event + is flushed to the client *before* the summarization LLM call (R12). Errors + raised while compacting or creating the response are surfaced as SSE error + events (the stream has already started, so an HTTP status is no longer + possible). + + Args: + context: The response generator context. + responses_params: The base Responses API parameters. + endpoint_path: API endpoint path used for metric labeling. + + Yields: + SSE-formatted strings. 
+ """ + media_type = context.query_request.media_type or MEDIA_TYPE_JSON + yield stream_start_event( + conversation_id=context.conversation_id, + request_id=context.request_id, + ) + + compacted = False + compacted_original_input: Optional[ResponseInput] = None + try: + async for item in apply_compaction( + context.client, + responses_params, + configuration.inference, + configuration.compaction, + emit_events=True, + cache=configured_conversation_cache(), + user_id=context.user_id, + skip_user_id_check=context.skip_userid_check, + ): + if isinstance(item, CompactionStartedEvent): + yield stream_compaction_event(context.conversation_id) + elif isinstance(item, CompactionResult): + responses_params = item.params + compacted = item.compacted + compacted_original_input = item.original_input + + generator, turn_summary = await retrieve_response_generator( + responses_params=responses_params, + context=context, + endpoint_path=endpoint_path, + ) + except HTTPException as e: + yield _http_exception_stream_event(e) + return + except RuntimeError as e: # library mode wraps 413 into runtime error + error_response = ( + PromptTooLongResponse(model=responses_params.model) + if is_context_length_error(str(e)) + else InternalServerErrorResponse.generic() + ) + yield stream_http_error_event(error_response, media_type) + return + except APIConnectionError as e: + yield stream_http_error_event( + ServiceUnavailableResponse(backend_name="Llama Stack", cause=str(e)), + media_type, + ) + return + except (LLSApiStatusError, OpenAIAPIStatusError) as e: + yield stream_http_error_event( + handle_known_apistatus_errors(e, responses_params.model), media_type + ) + return + + # Combine inline RAG results (BYOK + Solr) with tool-based results + if context.moderation_result.decision == "passed": + turn_summary.referenced_documents = deduplicate_referenced_documents( + context.inline_rag_context.referenced_documents + + turn_summary.referenced_documents + ) + + # The start event was already 
emitted above; delegate the rest (re-yield, + # finalization, compacted-turn storage) to the shared generator. + async for event in generate_response( + generator, + context, + responses_params, + turn_summary, + emit_start=False, + compacted=compacted, + original_input=compacted_original_input, + ): + yield event + + +async def generate_response( # pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals,too-many-branches,too-many-statements generator: AsyncIterator[str], context: ResponseGeneratorContext, responses_params: ResponsesApiParams, turn_summary: TurnSummary, + emit_start: bool = True, + compacted: bool = False, + original_input: Optional[ResponseInput] = None, ) -> AsyncIterator[str]: """Wrap a generator with cleanup logic. @@ -582,20 +754,30 @@ async def generate_response( context: The response generator context responses_params: The Responses API parameters turn_summary: TurnSummary populated during streaming + emit_start: Whether to emit the SSE start event. False when the caller + (the compaction-aware wrapper) has already emitted it. + compacted: Whether the conversation is in compacted mode. When True the + conversation parameter was not sent to Llama Stack, so the completed + turn is appended to the conversation here rather than being stored + automatically. + original_input: In compacted mode, the original user input before the + explicit-input rewrite. Used to persist the completed turn with its + structured input (preserving attachments); ``None`` otherwise. 
Yields: SSE-formatted strings from the wrapped generator """ persist_guard = _register_interrupt_callback( - context, responses_params, turn_summary + context, responses_params, turn_summary, original_input ) stream_completed = False try: - yield stream_start_event( - conversation_id=context.conversation_id, - request_id=context.request_id, - ) + if emit_start: + yield stream_start_event( + conversation_id=context.conversation_id, + request_id=context.request_id, + ) # Re-yield all events from the generator async for event in generator: @@ -628,7 +810,9 @@ async def generate_response( if not persist_guard[0]: persist_guard[0] = True turn_summary.llm_response = INTERRUPTED_RESPONSE_MESSAGE - await _persist_interrupted_turn(context, responses_params, turn_summary) + await _persist_interrupted_turn( + context, responses_params, turn_summary, original_input + ) yield stream_interrupted_event(context.request_id) finally: get_stream_interrupt_registry().deregister_stream(context.request_id) @@ -671,6 +855,27 @@ async def generate_response( ) completed_at = datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%dT%H:%M:%SZ") + # In compacted mode the conversation parameter was not sent, so Llama Stack + # did not persist this turn. Append it ourselves to keep the recent-turn + # buffer and audit history intact for the next request. 
+ if compacted: + try: + await store_compacted_turn( + context.client, + responses_params.conversation, + ( + original_input + if original_input is not None + else context.query_request.query + ), + turn_summary.output_items, + ) + except Exception: # pylint: disable=broad-except + logger.exception( + "Failed to append compacted turn to conversation for request %s", + context.request_id, + ) + # Store query results (transcript, conversation details, cache) logger.info("Storing query results") store_query_results( @@ -833,6 +1038,10 @@ async def response_generator( # pylint: disable=too-many-branches,too-many-stat getattr(chunk, "response"), # noqa: B009 ) turn_summary.llm_response = turn_summary.llm_response or "".join(text_parts) + # Capture structured output items for compacted-mode turn storage + # (LCORE-1572), so the persisted turn keeps non-text output items + # rather than being flattened to the response text. + turn_summary.output_items = list(latest_response_object.output or []) yield stream_event( { "id": chunk_id, @@ -849,6 +1058,9 @@ async def response_generator( # pylint: disable=too-many-branches,too-many-stat OpenAIResponseObject, getattr(chunk, "response"), # noqa: B009 ) + # Capture any partial output items so a compacted-mode turn is not + # persisted with empty output on these terminals (LCORE-1572). + turn_summary.output_items = list(latest_response_object.output or []) error_message = ( latest_response_object.error.message if latest_response_object.error @@ -967,6 +1179,31 @@ def stream_start_event(conversation_id: str, request_id: str) -> str: ) +def stream_compaction_event(conversation_id: str) -> str: + """Format an SSE event signalling that conversation compaction has started. + + Emitted before the summarization LLM call (R12) so the client can show a + progress indicator while older turns are being summarized. + + Parameters: + ---------- + conversation_id: The conversation being compacted. 
+ + Returns: + ------- + str: SSE-formatted string representing the compaction event. + """ + return format_stream_data( + { + "event": "compaction", + "data": { + "status": "started", + "conversation_id": conversation_id, + }, + } + ) + + def stream_interrupted_event(request_id: str) -> str: """Format an SSE event indicating the stream was interrupted. diff --git a/src/configuration.py b/src/configuration.py index e65c8c230..0de369892 100644 --- a/src/configuration.py +++ b/src/configuration.py @@ -18,6 +18,7 @@ AuthenticationConfiguration, AuthorizationConfiguration, AzureEntraIdConfiguration, + CompactionConfiguration, Configuration, ConversationHistoryConfiguration, Customization, @@ -319,6 +320,21 @@ def inference(self) -> InferenceConfiguration: raise LogicError("logic error: configuration is not loaded") return self._configuration.inference + @property + def compaction(self) -> CompactionConfiguration: + """Return conversation compaction configuration. + + Returns: + CompactionConfiguration: The compaction configuration from the + loaded application configuration. + + Raises: + LogicError: If the configuration has not been loaded. + """ + if self._configuration is None: + raise LogicError("logic error: configuration is not loaded") + return self._configuration.compaction + @property def conversation_cache_configuration(self) -> ConversationHistoryConfiguration: """Return conversation cache configuration. 
diff --git a/src/models/common/responses/contexts.py b/src/models/common/responses/contexts.py index 89ec455d2..b2497b58e 100644 --- a/src/models/common/responses/contexts.py +++ b/src/models/common/responses/contexts.py @@ -10,6 +10,7 @@ from models.api.requests import QueryRequest from models.common.moderation import ShieldModerationResult +from models.common.responses.types import ResponseInput from models.common.turn_summary import RAGContext @@ -55,6 +56,14 @@ class ResponsesContext(BaseModel): default=False, description="Whether to generate a topic summary for new conversations", ) + compacted_original_input: Optional[ResponseInput] = Field( + default=None, + description="Set only when conversation compaction (LCORE-1572) rewrote " + "the request: the original user input before the explicit-input " + "rewrite. When present, the completed turn is appended to the " + "conversation using this input, since the conversation parameter was " + "dropped and Llama Stack therefore does not store the turn.", + ) @dataclass diff --git a/src/models/common/responses/responses_api_params.py b/src/models/common/responses/responses_api_params.py index 6abf9a618..fb9e064b9 100644 --- a/src/models/common/responses/responses_api_params.py +++ b/src/models/common/responses/responses_api_params.py @@ -121,17 +121,27 @@ class ResponsesApiParams(BaseModel): default=None, description="Extra HTTP headers to send with the request (e.g. x-llamastack-provider-data)", ) + omit_conversation: bool = Field( + default=False, + exclude=True, + description="When True, the conversation parameter is dropped from the " + "request body while remaining on the object for identity. 
Set by " + "conversation compaction (LCORE-1572): once a conversation is " + "compacted, lightspeed-stack supplies explicit input and must not let " + "Llama Stack reload the full history via the conversation parameter.", + ) def model_dump(self, *args: Any, **kwargs: Any) -> dict[str, Any]: """Serialize to a request body dict. - Omits conversation when previous_response_id is set. + Omits conversation when previous_response_id is set or when + omit_conversation is True (compacted mode). Returns: Serializable dict for the Responses API request body. """ result = super().model_dump(*args, **kwargs) - if self.previous_response_id: + if self.previous_response_id or self.omit_conversation: result.pop("conversation", None) return result diff --git a/src/models/common/turn_summary.py b/src/models/common/turn_summary.py index 9bf8cb04e..cc343d64e 100644 --- a/src/models/common/turn_summary.py +++ b/src/models/common/turn_summary.py @@ -5,6 +5,7 @@ from typing import Any, Optional +from llama_stack_api import OpenAIResponseOutput from pydantic import AnyUrl, BaseModel, Field from utils.token_counter import TokenCounter @@ -108,3 +109,8 @@ class TurnSummary(BaseModel): rag_chunks: list[RAGChunk] = Field(default_factory=list) referenced_documents: list[ReferencedDocument] = Field(default_factory=list) token_usage: TokenCounter = Field(default_factory=TokenCounter) + output_items: list[OpenAIResponseOutput] = Field( + default_factory=list, + description="Structured response output items, captured for compacted-mode " + "turn persistence (LCORE-1572). Empty on the non-compacted path.", + ) diff --git a/src/utils/conversation_compaction.py b/src/utils/conversation_compaction.py new file mode 100644 index 000000000..e07bfc20d --- /dev/null +++ b/src/utils/conversation_compaction.py @@ -0,0 +1,595 @@ +"""Runtime integration of conversation compaction into the request flow. 
+ +This module wires the pure compaction primitives (``utils.compaction``, +LCORE-1570) and the token estimator (``utils.token_estimator``, LCORE-1569) +into the actual request path (LCORE-1572). Unlike ``utils.compaction`` — which +is deliberately side-effect free — this module *does* touch conversation state: +it fetches conversation items from Llama Stack, calls the summarization LLM, +writes summary marker items, reads and writes summaries in the cache, and holds +a per-conversation lock. + +Design (see ``docs/design/conversation-compaction/conversation-compaction.md``): + +* **Option A — lightspeed owns the context after compaction.** Once a + conversation has been compacted, lightspeed-stack stops handing the + ``conversation`` parameter to Llama Stack (which would otherwise reload the + full message history and defeat compaction). Instead it builds the model + input explicitly from the summaries plus the recent verbatim turns. The + conversation identity (``conversation_id``) is preserved, and the full + history remains in Llama Stack's conversation *items* for UI/audit. + +* **Marker items track the boundary.** Each compaction writes the summary into + the conversation as a recognizable *marker* message (a message whose text + starts with ``MARKER_SENTINEL``). The items after the last marker are the + recent verbatim turns; the marker texts are the additive summaries. This is + lightspeed's own bookkeeping — Llama Stack never interprets it (we no longer + pass ``conversation`` to inference once a marker exists). + +* **Streaming notification.** When driven by the streaming endpoint, this + module yields a :class:`CompactionStartedEvent` *before* the summarization + LLM call so the client can show a progress indicator (R12). The non-streaming + wrapper :func:`apply_compaction_blocking` simply ignores those events. 
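The marker-based boundary described in the bullets above can be sketched in isolation. This is a minimal illustration, not the module's implementation: items are plain dicts with a hypothetical `text` key, and `text_of`, `is_marker`, and `split_at_last_marker` are stand-in names; only the `MARKER_SENTINEL` value is taken from the module.

```python
from typing import Any

# Sentinel value as defined in this module.
MARKER_SENTINEL = "[lightspeed:compaction-summary]"


def text_of(item: dict[str, Any]) -> str:
    """Hypothetical stand-in for text extraction: items are plain dicts here."""
    return str(item.get("text", ""))


def is_marker(item: dict[str, Any]) -> bool:
    """A marker is a message whose text starts with the sentinel."""
    return item.get("type") == "message" and text_of(item).startswith(MARKER_SENTINEL)


def split_at_last_marker(
    items: list[dict[str, Any]],
) -> tuple[list[str], list[dict[str, Any]]]:
    """Return (summary texts oldest first, items after the last marker)."""
    summaries = [
        text_of(item)[len(MARKER_SENTINEL):].strip()
        for item in items
        if is_marker(item)
    ]
    # -1 means "no marker yet", so the whole history counts as recent turns.
    last = max((i for i, item in enumerate(items) if is_marker(item)), default=-1)
    return summaries, items[last + 1:]


history = [
    {"type": "message", "role": "user", "text": "How do I scale a deployment?"},
    {"type": "message", "role": "assistant", "text": "Use kubectl scale."},
    {"type": "message", "role": "user",
     "text": f"{MARKER_SENTINEL} User asked about scaling."},
    {"type": "message", "role": "user", "text": "And autoscaling?"},
    {"type": "message", "role": "assistant", "text": "Use an HPA."},
]

summaries, recent = split_at_last_marker(history)
```

Here `summaries` holds the sentinel-stripped summary text and `recent` holds the two turns after the marker, which is the split the explicit model input is built from.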
+ +The cache (LCORE-1571) is the preferred source of truth for summaries and the +home of the persisted recursive fold (R3): each summary chunk is written to it, +the active summary set is read back from it, and when the summaries themselves +grow past the threshold they are folded into one and persisted via +``replace_summaries`` so the fold is reused rather than recomputed. When no +persisting cache is configured (or a cache read fails) the module falls back to +the Llama Stack marker texts, which remain authoritative — marker-only mode +keeps additive summaries with no fold. The marker items always carry the +boundary between summarized history and the recent verbatim turns. +""" + +import asyncio +from collections.abc import AsyncIterator, Sequence +from dataclasses import dataclass +from typing import Any, Optional, cast + +from llama_stack_api.openai_responses import OpenAIResponseMessage +from llama_stack_client import AsyncLlamaStackClient +from llama_stack_client.types.conversations.item_create_params import Item + +from cache.cache import Cache +from cache.cache_error import CacheError +from configuration import configuration +from log import get_logger +from models.common.responses.responses_api_params import ResponsesApiParams +from models.common.responses.types import ResponseInput +from models.compaction import ConversationSummary +from models.config import CompactionConfiguration, InferenceConfiguration +from utils.compaction import ( + partition_conversation, + recursively_resummarize, + summarize_chunk, +) +from utils.conversations import ( + append_turn_items_to_conversation, + get_all_conversation_items, +) +from utils.token_estimator import ( + DEFAULT_ENCODING_NAME, + estimate_conversation_tokens, + estimate_tokens, + extract_message_text, + get_context_window, + is_message_item, +) + +logger = get_logger(__name__) + + +MARKER_SENTINEL = "[lightspeed:compaction-summary]" +"""Prefix that identifies a compaction summary marker message. 
+ +Marker items are ordinary conversation messages whose text begins with this +sentinel. They are written by :func:`_write_summary_marker` and recognized by +:func:`is_marker_item`. The sentinel is stripped before the summary is shown to +the model (:func:`_summary_input_message`). +""" + + +# Per-conversation locks (R11). A request that triggers compaction holds the +# conversation's lock across the summarization LLM call so concurrent requests +# on the same conversation wait rather than racing (e.g. double-compacting or +# appending a turn mid-compaction). +_conversation_locks: dict[str, asyncio.Lock] = {} + + +def _get_lock(conversation_id: str) -> asyncio.Lock: + """Return the (process-wide) asyncio lock for a conversation, creating it lazily.""" + lock = _conversation_locks.get(conversation_id) + if lock is None: + lock = asyncio.Lock() + _conversation_locks[conversation_id] = lock + return lock + + +@dataclass +class CompactionStartedEvent: + """Sentinel yielded before the summarization LLM call (streaming only). + + The streaming endpoint formats this into an SSE ``compaction`` event so the + client can display a progress indicator. The module stays decoupled from SSE + formatting by yielding this typed value instead of a formatted string. + + Attributes: + conversation_id: The conversation being compacted (llama-stack format). + """ + + conversation_id: str + + +@dataclass +class CompactionResult: + """Outcome of applying compaction to a request. + + Attributes: + params: The (possibly rewritten) Responses API params to send. When + ``compacted`` is True, ``params.input`` is an explicit item list + (summaries + recent turns + new query) and the ``conversation`` + parameter is omitted from the request body. + compacted: Whether the request is served in compacted (explicit-input) + mode — i.e. the conversation has at least one summary (from a marker + or the cache), so the ``conversation`` parameter is omitted. 
This is + True whether the summary was created this request or reused from a + prior one. Drives ``context_status``. + original_input: The new user query exactly as it arrived (before the + explicit-input rewrite). Populated only in compacted mode (where + ``compacted`` is True); ``None`` otherwise. In compacted mode the + caller must append this plus the LLM output to the conversation + items itself, since the ``conversation`` parameter is no longer + passed to Llama Stack. + """ + + params: ResponsesApiParams + compacted: bool + original_input: Optional[ResponseInput] = None + + +def is_marker_item(item: Any) -> bool: + """Return True when *item* is a compaction summary marker message.""" + if not is_message_item(item): + return False + return extract_message_text(item).startswith(MARKER_SENTINEL) + + +def _summary_text_of(item: Any) -> str: + """Extract the summary text from a marker item (sentinel stripped).""" + return extract_message_text(item)[len(MARKER_SENTINEL) :].strip() + + +def _items_after_last_marker(items: list[Any]) -> list[Any]: + """Return the conversation items that follow the last summary marker. + + These are the recent turns kept verbatim. When there is no marker the whole + list is returned (no compaction has happened yet). + """ + last = -1 + for index, item in enumerate(items): + if is_marker_item(item): + last = index + return items[last + 1 :] + + +def _marker_summaries(items: list[Any]) -> list[str]: + """Return the summary texts of every marker item, in order (oldest first).""" + return [_summary_text_of(item) for item in items if is_marker_item(item)] + + +def _summary_input_message(summary_text: str) -> OpenAIResponseMessage: + """Build an explicit input message carrying a summary for the model. + + Returns a typed ``OpenAIResponseMessage`` (a member of the ``ResponseInput`` + union) so it serializes cleanly when the request body is dumped. 
+ """ + return OpenAIResponseMessage( + role="user", content=f"Summary of earlier conversation:\n{summary_text}" + ) + + +def _verbatim_input_message(item: Any) -> Optional[OpenAIResponseMessage]: + """Render a recent conversation message item as an explicit input message. + + Only message items are rendered; non-message items (tool calls/results) are + skipped — they remain in the conversation's items for audit, but the + explicit LLM context for the recent buffer is built from message text. This + keeps the input schema-valid without reconstructing tool-call sequences. + """ + if not is_message_item(item): + return None + text = extract_message_text(item) + if not text: + return None + role = item.get("role") if isinstance(item, dict) else getattr(item, "role", "user") + if role not in ("system", "developer", "user", "assistant"): + role = "user" + # role validated above; cast satisfies the Literal-typed parameter. + return OpenAIResponseMessage(role=cast(Any, role), content=text) + + +def _query_input_message(original_input: ResponseInput) -> list[Any]: + """Render the new user query as explicit input items. + + A string query becomes a single typed user message. An item list (e.g. from + the /v1/responses client) is already composed of typed ``ResponseItem`` + objects, so it is passed through unchanged. 
+ """ + if isinstance(original_input, str): + return [OpenAIResponseMessage(role="user", content=original_input)] + return list(original_input) + + +def _build_explicit_input( + summaries: list[str], + recent_items: list[Any], + original_input: ResponseInput, +) -> list[Any]: + """Assemble the explicit model input: summaries + recent turns + new query.""" + built: list[Any] = [_summary_input_message(text) for text in summaries] + for item in recent_items: + message = _verbatim_input_message(item) + if message is not None: + built.append(message) + built.extend(_query_input_message(original_input)) + return built + + +async def _write_summary_marker( + client: AsyncLlamaStackClient, + conversation_id: str, + summary_text: str, +) -> None: + """Write the summary into the conversation as a recognizable marker message.""" + marker_item: dict[str, Any] = { + "type": "message", + "role": "user", + "content": [ + {"type": "input_text", "text": f"{MARKER_SENTINEL} {summary_text}"} + ], + } + await client.conversations.items.create( + conversation_id, + items=cast(list[Item], [marker_item]), + ) + + +def _read_cached_summaries( + cache: Optional[Cache], + user_id: str, + conversation_id: str, + skip_user_id_check: bool, +) -> list[ConversationSummary]: + """Return persisted summary chunks for a conversation (best-effort). + + The cache is the preferred source of truth for summaries (and the only home + for a persisted recursive fold). Returns an empty list when no cache is + configured, the backend does not persist (in-memory/no-op), or a cache error + occurs — callers then fall back to the Llama Stack marker texts, which remain + authoritative. 
+ """ + if cache is None: + return [] + try: + return cache.get_summaries(user_id, conversation_id, skip_user_id_check) + except CacheError as exc: # markers remain a valid fallback + logger.warning("compaction: cache get_summaries failed: %s", exc) + return [] + + +def _store_cached_summary( + cache: Optional[Cache], + user_id: str, + conversation_id: str, + summary: ConversationSummary, + skip_user_id_check: bool, +) -> None: + """Persist a new summary chunk to the cache (best-effort). + + The summary is also written as a Llama Stack marker by the caller, so a + failed cache write does not lose it — it only forgoes cache-backed reads and + folding for this conversation. + """ + if cache is None: + return + try: + cache.store_summary(user_id, conversation_id, summary, skip_user_id_check) + except CacheError as exc: # the marker write already preserved the summary + logger.warning("compaction: cache store_summary failed: %s", exc) + + +def configured_conversation_cache() -> Optional[Cache]: + """Return the conversation cache for compaction, or None when not applicable. + + Endpoints pass this to :func:`apply_compaction` / :func:`apply_compaction_blocking`. + Returns None — without touching the cache — when compaction is disabled, since + the cache is only used by compaction on this path and accessing it would + needlessly initialize it (and could fail) on every request. Also returns None + when no conversation cache is configured; compaction then runs in marker-only + mode. + """ + if not configuration.compaction.enabled: + return None + if configuration.conversation_cache_configuration.type is None: + return None + return configuration.conversation_cache + + +def _estimate_response_input_tokens(value: ResponseInput, encoding_name: str) -> int: + """Estimate the token count of a request input (string or item list). + + The new query may arrive as a plain string or, on ``/v1/responses``, as a + list of input items. 
Counting only the string form would undercount list + inputs, which could let a request skip compaction and still hit HTTP 413. + """ + if isinstance(value, str): + return estimate_tokens(value, encoding_name) + return estimate_conversation_tokens(list(value), encoding_name=encoding_name) + + +def _should_compact( + estimated_tokens: int, + context_window: int, + config: CompactionConfiguration, +) -> bool: + """Decide whether the estimated input warrants compaction. + + Triggers when the estimate exceeds ``threshold_ratio`` of the context + window and also clears the absolute ``token_floor`` (which prevents + over-eager compaction on very small windows). + """ + threshold = context_window * config.threshold_ratio + return estimated_tokens > threshold and estimated_tokens > config.token_floor + + +async def apply_compaction( # pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals + client: AsyncLlamaStackClient, + params: ResponsesApiParams, + inference_config: InferenceConfiguration, + compaction_config: CompactionConfiguration, + emit_events: bool = False, + encoding_name: str = DEFAULT_ENCODING_NAME, + cache: Optional[Cache] = None, + user_id: str = "", + skip_user_id_check: bool = False, +) -> AsyncIterator[Any]: + """Apply conversation compaction to a prepared request, yielding the result. + + This is an async generator. When ``emit_events`` is True it yields a + :class:`CompactionStartedEvent` immediately before the summarization LLM + call (so the streaming endpoint can surface progress). It always yields a + single :class:`CompactionResult` as its final item. + + The whole evaluate-and-summarize section runs under the conversation's lock + (R11). When compaction is disabled, the request passes through unchanged + with ``compacted`` False. When the model has no registered context window, + or the conversation is not yet near the limit, no new summary is created: + the params stay unchanged if no prior summary exists, and are rewritten to + explicit input (``compacted`` True) if one does. 
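The two-part trigger in `_should_compact` can be exercised on its own. The sketch below mirrors the comparison shown in the diff; `CompactionSettings` is a hypothetical stand-in for `CompactionConfiguration`, and the ratio, floor, and window sizes are illustrative values, not the project's defaults.

```python
from dataclasses import dataclass


@dataclass
class CompactionSettings:
    """Hypothetical stand-in carrying only the two fields the check reads."""

    threshold_ratio: float
    token_floor: int


def should_compact(
    estimated_tokens: int, context_window: int, config: CompactionSettings
) -> bool:
    """Compact only when both the relative and the absolute limit are exceeded."""
    threshold = context_window * config.threshold_ratio
    return estimated_tokens > threshold and estimated_tokens > config.token_floor


settings = CompactionSettings(threshold_ratio=0.7, token_floor=4000)

# Large window, estimate above 70% of it and above the floor.
near_limit = should_compact(100_000, 128_000, settings)

# Small window: above 70% of 4096 tokens, but still under the absolute floor.
small_window = should_compact(3_000, 4_096, settings)

# Large window, estimate well under the ratio threshold.
plenty_of_room = should_compact(50_000, 128_000, settings)
```

The `small_window` case shows why the floor exists: on a tiny context window the ratio alone would trigger compaction for conversations that are only a few thousand tokens long.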
+ + Parameters: + client: Llama Stack client. + params: The base Responses API params from ``prepare_responses_params``. + inference_config: Inference config (for the per-model context window). + compaction_config: Compaction tuning (enabled, threshold, buffer, ...). + emit_events: Whether to yield CompactionStartedEvent before summarizing. + encoding_name: tiktoken encoding name for estimation/summarization. + cache: Conversation cache, the preferred summary store and the home of + the persisted recursive fold. ``None`` (or a non-persisting backend) + falls back to marker-only summaries with no folding. + user_id: User identifier for cache reads/writes. + skip_user_id_check: Whether to bypass the cache's user_id validation. + + Yields: + Zero or more CompactionStartedEvent, then exactly one CompactionResult. + """ + if not compaction_config.enabled: + # ``enabled: false`` is a full off-switch: the request passes through + # unchanged (conversation parameter intact, full-history replay). Known + # limitation: disabling compaction *after* a conversation was already + # compacted reverts that conversation to full replay — which can re-hit + # the 413 path and resend marker text through the model. Toggling the + # flag mid-conversation on already-compacted conversations is therefore + # unsupported; leave it enabled for the lifetime of such conversations. + yield CompactionResult(params, compacted=False) + return + + conversation_id = params.conversation + model = params.model + system_prompt = params.instructions + original_input = params.input + + async with _get_lock(conversation_id): + items = await get_all_conversation_items(client, conversation_id) + recent_items = _items_after_last_marker(items) + + # Summaries: the cache is the preferred source of truth (and the home of + # any persisted recursive fold); the Llama Stack marker texts remain an + # authoritative fallback when no persisting cache is available. 
+ cached_summaries = _read_cached_summaries( + cache, user_id, conversation_id, skip_user_id_check + ) + summaries = ( + [s.summary_text for s in cached_summaries] + if cached_summaries + else _marker_summaries(items) + ) + + context_window = get_context_window(model, inference_config) + if context_window is not None: + estimated = estimate_tokens(system_prompt or "", encoding_name) + estimated += sum(estimate_tokens(text, encoding_name) for text in summaries) + estimated += estimate_conversation_tokens( + recent_items, encoding_name=encoding_name + ) + estimated += _estimate_response_input_tokens(original_input, encoding_name) + + if _should_compact(estimated, context_window, compaction_config): + if emit_events: + yield CompactionStartedEvent(conversation_id=conversation_id) + + budget = int(context_window * compaction_config.buffer_max_ratio) + old_items, keep_items = partition_conversation( + recent_items, + available_budget_tokens=budget, + buffer_turns=compaction_config.buffer_turns, + encoding_name=encoding_name, + ) + if old_items: + already = len(items) - len(recent_items) + summary = await summarize_chunk( + client, + model, + old_items, + summarized_through_turn=already + len(old_items), + encoding_name=encoding_name, + ) + await _write_summary_marker( + client, conversation_id, summary.summary_text + ) + _store_cached_summary( + cache, user_id, conversation_id, summary, skip_user_id_check + ) + summaries.append(summary.summary_text) + cached_summaries = [*cached_summaries, summary] + recent_items = keep_items + + # Recursive fold (R3): when the persisted summaries themselves grow + # past the threshold, collapse them into a single fold and persist it + # so it is reused rather than recomputed each request. Requires a + # persisting cache; marker-only conversations keep additive summaries. 
+ if cache is not None and len(cached_summaries) >= 2: + summaries_tokens = sum(s.token_count for s in cached_summaries) + if ( + summaries_tokens + > context_window * compaction_config.threshold_ratio + ): + logger.info( + "Folding %d summaries (%d tokens) for conversation %s", + len(cached_summaries), + summaries_tokens, + conversation_id, + ) + folded = await recursively_resummarize( + client, model, cached_summaries, encoding_name + ) + try: + cache.replace_summaries( + user_id, conversation_id, folded, skip_user_id_check + ) + cached_summaries = [folded] + summaries = [folded.summary_text] + except CacheError as exc: # keep the unfolded summaries + logger.warning( + "compaction: cache replace_summaries failed: %s", exc + ) + + if not summaries: + # No compaction has ever happened for this conversation: leave the + # normal conversation-parameter flow untouched. + yield CompactionResult(params, compacted=False) + return + + # Compacted mode: lightspeed owns the context. Build explicit input and + # stop passing the conversation parameter to inference. + explicit_input = _build_explicit_input(summaries, recent_items, original_input) + compacted_params = params.model_copy( + update={"input": explicit_input, "omit_conversation": True} + ) + yield CompactionResult( + compacted_params, compacted=True, original_input=original_input + ) + + +async def apply_compaction_blocking( # pylint: disable=too-many-arguments,too-many-positional-arguments + client: AsyncLlamaStackClient, + params: ResponsesApiParams, + inference_config: InferenceConfiguration, + compaction_config: CompactionConfiguration, + encoding_name: str = DEFAULT_ENCODING_NAME, + cache: Optional[Cache] = None, + user_id: str = "", + skip_user_id_check: bool = False, +) -> CompactionResult: + """Non-streaming wrapper around :func:`apply_compaction`. + + Drains the generator with event emission disabled and returns the final + :class:`CompactionResult`. 
See :func:`apply_compaction` for the ``cache`` / + ``user_id`` / ``skip_user_id_check`` parameters. + """ + result: Optional[CompactionResult] = None + async for item in apply_compaction( + client, + params, + inference_config, + compaction_config, + emit_events=False, + encoding_name=encoding_name, + cache=cache, + user_id=user_id, + skip_user_id_check=skip_user_id_check, + ): + if isinstance(item, CompactionResult): + result = item + if result is None: # pragma: no cover - the generator always yields one result + raise RuntimeError("apply_compaction did not yield a CompactionResult") + return result + + +async def needs_compaction_path( + client: AsyncLlamaStackClient, + params: ResponsesApiParams, + inference_config: InferenceConfiguration, + compaction_config: CompactionConfiguration, + encoding_name: str = DEFAULT_ENCODING_NAME, +) -> bool: + """Return whether this request needs the compaction-aware path (cheap check). + + Returns True when the conversation already has a summary marker (so it must + be served in compacted mode with explicit input) or when the estimated + tokens would trigger a new compaction. Performs no LLM call and takes no + lock — the authoritative evaluate-and-summarize work happens later under + the lock in :func:`apply_compaction`. Streaming endpoints use this to keep + non-compacting requests on their unchanged code path, so the in-stream + flow (and its SSE-error semantics) only ever applies to conversations that + are actually being compacted. + + Parameters: + client: Llama Stack client. + params: The base Responses API params. + inference_config: Inference config (for the per-model context window). + compaction_config: Compaction tuning. + encoding_name: tiktoken encoding name for estimation. + + Returns: + True if the compaction-aware streaming path should be used. 
+ """ + if not compaction_config.enabled: + return False + items = await get_all_conversation_items(client, params.conversation) + if any(is_marker_item(item) for item in items): + return True + context_window = get_context_window(params.model, inference_config) + if context_window is None: + return False + estimated = estimate_tokens(params.instructions or "", encoding_name) + estimated += estimate_conversation_tokens(items, encoding_name=encoding_name) + estimated += _estimate_response_input_tokens(params.input, encoding_name) + return _should_compact(estimated, context_window, compaction_config) + + +async def store_compacted_turn( + client: AsyncLlamaStackClient, + conversation_id: str, + original_input: ResponseInput, + output_items: Sequence[Any], +) -> None: + """Append a completed turn to the conversation when in compacted mode. + + In compacted mode the ``conversation`` parameter is not sent to inference, + so Llama Stack does not auto-store the turn. lightspeed-stack appends the + user query and the LLM output to the conversation items itself, keeping the + full history (and the recent-turn buffer for the next request) intact. 
+ """ + await append_turn_items_to_conversation( + client, conversation_id, original_input, output_items + ) diff --git a/tests/unit/app/endpoints/test_a2a.py b/tests/unit/app/endpoints/test_a2a.py index 6138d2568..e35b82b8e 100644 --- a/tests/unit/app/endpoints/test_a2a.py +++ b/tests/unit/app/endpoints/test_a2a.py @@ -801,6 +801,73 @@ async def test_process_task_streaming_handles_api_connection_error_on_retrieve_r error_message = call_args[1]["message"] assert "Unable to connect to Llama Stack backend service" in str(error_message) + @pytest.mark.asyncio + async def test_process_task_streaming_applies_compaction( + self, + mocker: MockerFixture, + setup_configuration: AppConfig, # pylint: disable=unused-argument + ) -> None: + """Test _process_task_streaming runs conversation compaction before the call.""" + executor = A2AAgentExecutor(auth_token="test-token") + + mock_message = mocker.MagicMock() + mock_message.role = "user" + mock_message.parts = [Part(root=TextPart(text="Hello"))] + mock_message.metadata = {} + + context = mocker.MagicMock(spec=RequestContext) + context.task_id = "task-123" + context.context_id = "ctx-456" + context.message = mock_message + context.get_user_input.return_value = "Hello" + + event_queue = mocker.AsyncMock(spec=EventQueue) + task_updater = mocker.MagicMock() + task_updater.update_status = mocker.AsyncMock() + task_updater.event_queue = event_queue + + mock_context_store = mocker.AsyncMock() + mock_context_store.get.return_value = None + mocker.patch( + "app.endpoints.a2a._get_context_store", return_value=mock_context_store + ) + + async def _empty_stream() -> Any: + """Yield no stream events.""" + return + yield # pragma: no cover (makes this an async generator) + + mock_client = mocker.AsyncMock() + mock_client.models.list = mocker.AsyncMock(return_value=[mocker.MagicMock()]) + mock_client.responses.create = mocker.AsyncMock(return_value=_empty_stream()) + mocker.patch( + "app.endpoints.a2a.AsyncLlamaStackClientHolder" + 
).return_value.get_client.return_value = mock_client + + mock_params = mocker.Mock() + mock_params.model = "test-model" + mock_params.conversation = "conv_x" + mock_params.model_dump.return_value = {"input": "Hello", "model": "test-model"} + mocker.patch( + "app.endpoints.a2a.prepare_responses_params", + new=mocker.AsyncMock(return_value=mock_params), + ) + + compaction_result = mocker.Mock() + compaction_result.params = mock_params + compaction_result.compacted = False + compaction_result.original_input = None + apply = mocker.patch( + "app.endpoints.a2a.apply_compaction_blocking", + new=mocker.AsyncMock(return_value=compaction_result), + ) + + await executor._process_task_streaming( # pylint: disable=protected-access + context, task_updater, context.task_id, context.context_id + ) + + apply.assert_awaited_once() + @pytest.mark.asyncio async def test_cancel_raises_not_implemented(self, mocker: MockerFixture) -> None: """Test that cancel raises NotImplementedError.""" diff --git a/tests/unit/app/endpoints/test_responses.py b/tests/unit/app/endpoints/test_responses.py index 88fc23561..915615084 100644 --- a/tests/unit/app/endpoints/test_responses.py +++ b/tests/unit/app/endpoints/test_responses.py @@ -20,7 +20,9 @@ from pytest_mock import MockerFixture from app.endpoints.responses import ( + _append_previous_response_turn, _is_server_mcp_output_item, + _persist_blocked_response_turn, _sanitize_response_dict, _should_filter_mcp_chunk, handle_non_streaming_response, @@ -2783,3 +2785,85 @@ async def failing_stream() -> AsyncIterator[Any]: call_args = mock_record.call_args assert call_args.args[2] == "failure" assert call_args.kwargs.get("record_failure") is True + + +@pytest.mark.asyncio +async def test_append_previous_response_turn_compacted(mocker: MockerFixture) -> None: + """In compacted mode the turn is stored against the original input. + + When compaction rewrote the request, the conversation parameter was dropped + so Llama Stack did not store the turn.
_append_previous_response_turn must + append it using the original user input (carried on the context), not the + rewritten explicit input on api_params. + """ + append = mocker.patch( + "app.endpoints.responses.append_turn_items_to_conversation", + new=mocker.AsyncMock(), + ) + api_params = mocker.Mock( + store=True, + conversation="conv_x", + previous_response_id=None, + input=["rewritten explicit input"], + ) + context = mocker.Mock( + client=mocker.AsyncMock(), + compacted_original_input="the original query", + ) + + await _append_previous_response_turn(api_params, context, ["out"]) + + append.assert_awaited_once_with( + context.client, "conv_x", "the original query", ["out"] + ) + + +@pytest.mark.asyncio +async def test_append_previous_response_turn_not_stored_when_store_false( + mocker: MockerFixture, +) -> None: + """No append happens when store is disabled, even in compacted mode.""" + append = mocker.patch( + "app.endpoints.responses.append_turn_items_to_conversation", + new=mocker.AsyncMock(), + ) + api_params = mocker.Mock( + store=False, conversation="conv_x", previous_response_id=None + ) + context = mocker.Mock(client=mocker.AsyncMock(), compacted_original_input="q") + + await _append_previous_response_turn(api_params, context, ["out"]) + + append.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_persist_blocked_response_turn_compacted(mocker: MockerFixture) -> None: + """A shield-blocked compacted turn is stored against the original input. + + In compacted mode the conversation parameter was dropped and api_params.input + is the explicit-input rewrite, so the blocked refusal turn must be persisted + against the original user input carried on the context (LCORE-1572). 
+ """ + append = mocker.patch( + "app.endpoints.responses.append_turn_items_to_conversation", + new=mocker.AsyncMock(), + ) + refusal = mocker.Mock() + api_params = mocker.Mock( + store=True, conversation="conv_x", input=["rewritten explicit input"] + ) + context = mocker.Mock( + client=mocker.AsyncMock(), + compacted_original_input="the original query", + moderation_result=mocker.Mock(refusal_response=refusal), + ) + + await _persist_blocked_response_turn(api_params, context) + + append.assert_awaited_once_with( + client=context.client, + conversation_id="conv_x", + user_input="the original query", + llm_output=[refusal], + ) diff --git a/tests/unit/app/endpoints/test_streaming_query.py b/tests/unit/app/endpoints/test_streaming_query.py index 19176d57f..b147822b9 100644 --- a/tests/unit/app/endpoints/test_streaming_query.py +++ b/tests/unit/app/endpoints/test_streaming_query.py @@ -46,6 +46,7 @@ from pytest_mock import MockerFixture from app.endpoints.streaming_query import ( + _persist_interrupted_turn, generate_response, response_generator, retrieve_response_generator, @@ -871,6 +872,7 @@ async def test_retrieve_response_generator_shield_blocked( mock_responses_params.model = "provider1/model1" mock_responses_params.input = "test query" mock_responses_params.conversation = "conv_123" + mock_responses_params.omit_conversation = False # non-compacted mock_context = mocker.Mock(spec=ResponseGeneratorContext) mock_context.client = mock_client @@ -887,7 +889,7 @@ async def test_retrieve_response_generator_shield_blocked( mock_moderation_result.moderation_id = "mod_123" mock_moderation_result.refusal_response = mocker.Mock() mock_context.moderation_result = mock_moderation_result - mocker.patch( + mock_append = mocker.patch( "app.endpoints.streaming_query.append_turn_items_to_conversation", new=mocker.AsyncMock(), ) @@ -898,6 +900,54 @@ async def test_retrieve_response_generator_shield_blocked( assert isinstance(turn_summary, TurnSummary) assert 
turn_summary.llm_response == "Content blocked" + # Structured refusal captured for compacted-mode persistence (LCORE-1572). + assert turn_summary.output_items == [mock_moderation_result.refusal_response] + # Non-compacted: the refusal turn is stored here. + mock_append.assert_awaited_once() + + @pytest.mark.asyncio + async def test_retrieve_response_generator_shield_blocked_compacted( + self, mocker: MockerFixture + ) -> None: + """In compacted mode the shield refusal is not stored here (no double-store). + + generate_response persists the compacted turn (with the original input), + so storing it again in the shield branch would duplicate it (LCORE-1572). + """ + mock_client = mocker.AsyncMock(spec=AsyncLlamaStackClient) + + mock_responses_params = mocker.Mock(spec=ResponsesApiParams) + mock_responses_params.model = "provider1/model1" + mock_responses_params.input = "explicit input" + mock_responses_params.conversation = "conv_123" + mock_responses_params.omit_conversation = True # compacted + + mock_context = mocker.Mock(spec=ResponseGeneratorContext) + mock_context.client = mock_client + mock_context.vector_store_ids = [] + mock_context.rag_id_mapping = {} + mock_context.inline_rag_context = RAGContext() + mock_context.query_request = QueryRequest( + query="test", media_type=MEDIA_TYPE_TEXT + ) # pyright: ignore[reportCallIssue] + + mock_moderation_result = mocker.Mock() + mock_moderation_result.decision = "blocked" + mock_moderation_result.message = "Content blocked" + mock_moderation_result.moderation_id = "mod_123" + mock_moderation_result.refusal_response = mocker.Mock() + mock_context.moderation_result = mock_moderation_result + mock_append = mocker.patch( + "app.endpoints.streaming_query.append_turn_items_to_conversation", + new=mocker.AsyncMock(), + ) + + _generator, turn_summary = await retrieve_response_generator( + mock_responses_params, mock_context, endpoint_path="" + ) + + assert turn_summary.output_items == [mock_moderation_result.refusal_response] 
+ mock_append.assert_not_awaited() # compacted: generate_response stores it @pytest.mark.asyncio async def test_retrieve_response_generator_connection_error( @@ -1163,6 +1213,71 @@ async def mock_generator() -> AsyncIterator[str]: assert any("start" in item for item in result) assert any("end" in item for item in result) + @pytest.mark.asyncio + async def test_generate_response_compacted_persists_structured_turn( + self, mocker: MockerFixture + ) -> None: + """Compacted mode persists the turn via store_compacted_turn with the + original input and structured output items, not flattened strings + (LCORE-1572).""" + + async def mock_generator() -> AsyncIterator[str]: + yield "data: token\n\n" + + conv_id = "123e4567-e89b-12d3-a456-426614174000" + mock_context = mocker.Mock(spec=ResponseGeneratorContext) + mock_context.conversation_id = conv_id + mock_context.user_id = "user_123" + mock_context.query_request = QueryRequest( + query="test", conversation_id=conv_id + ) # pyright: ignore[reportCallIssue] + mock_context.started_at = "2024-01-01T00:00:00Z" + mock_context.skip_userid_check = False + mock_context.request_id = "223e4567-e89b-12d3-a456-426614174000" + mock_context.client = mocker.AsyncMock(spec=AsyncLlamaStackClient) + + mock_responses_params = mocker.Mock(spec=ResponsesApiParams) + mock_responses_params.model = "provider1/model1" + mock_responses_params.conversation = conv_id + + turn_summary = TurnSummary() + turn_summary.token_usage = TokenCounter(input_tokens=10, output_tokens=5) + output_item = mocker.Mock() + turn_summary.output_items = [output_item] + + mock_config = mocker.Mock() + mock_config.quota_limiters = [] + mocker.patch("app.endpoints.streaming_query.configuration", mock_config) + mocker.patch("app.endpoints.streaming_query.consume_query_tokens") + mocker.patch( + "app.endpoints.streaming_query.get_available_quotas", return_value={} + ) + mocker.patch("app.endpoints.streaming_query.store_query_results") + store_mock = mocker.patch( + 
"app.endpoints.streaming_query.store_compacted_turn", + new_callable=mocker.AsyncMock, + ) + + result = [ + item + async for item in generate_response( + mock_generator(), + mock_context, + mock_responses_params, + turn_summary, + compacted=True, + original_input="the original query", + ) + ] + + assert any("end" in item for item in result) + store_mock.assert_awaited_once_with( + mock_context.client, + conv_id, + "the original query", + [output_item], + ) + @pytest.mark.asyncio async def test_generate_response_with_topic_summary( self, mocker: MockerFixture @@ -2812,3 +2927,90 @@ async def mock_turn_response() -> AsyncIterator[OpenAIResponseObjectStream]: # Should have both tool call and result (fallback behavior) assert len(mock_turn_summary.tool_calls) == 1 assert len(mock_turn_summary.tool_results) == 1 + + +@pytest.mark.asyncio +async def test_response_generator_failed_captures_output_items( + mocker: MockerFixture, +) -> None: + """A failed terminal captures output_items for compacted persistence (LCORE-1572).""" + out_item = mocker.Mock() + + async def mock_turn_response() -> AsyncIterator[OpenAIResponseObjectStream]: + chunk = mocker.Mock(spec=FailedChunk) + chunk.type = "response.failed" + mock_response = mocker.Mock() + mock_response.output = [out_item] + mock_response.error = mocker.Mock(message="boom") + chunk.response = mock_response + yield chunk + + mock_context = mocker.Mock(spec=ResponseGeneratorContext) + mock_context.query_request = QueryRequest( + query="test", media_type=MEDIA_TYPE_JSON + ) # pyright: ignore[reportCallIssue] + mock_context.model_id = "provider1/model1" + mock_context.vector_store_ids = [] + mock_context.rag_id_mapping = {} + mock_context.inline_rag_context = RAGContext() + + turn_summary = TurnSummary() + mocker.patch( + "app.endpoints.streaming_query.extract_token_usage", + return_value=TokenCounter(input_tokens=0, output_tokens=0), + ) + mocker.patch( + "app.endpoints.streaming_query.parse_referenced_documents", 
return_value=[] + ) + + async for _ in response_generator( + mock_turn_response(), mock_context, turn_summary, endpoint_path="" + ): + pass + + assert turn_summary.output_items == [out_item] + + +@pytest.mark.asyncio +async def test_persist_interrupted_turn_compacted_uses_original_input( + mocker: MockerFixture, +) -> None: + """Interrupted compacted turn persists the original input (LCORE-1572). + + Not the explicit rewrite carried on responses_params.input. + """ + conv = "123e4567-e89b-12d3-a456-426614174000" + context = mocker.Mock(spec=ResponseGeneratorContext) + context.client = mocker.AsyncMock() + context.request_id = "req-1" + context.user_id = "user_1" + context.conversation_id = conv + context.started_at = "2024-01-01T00:00:00Z" + context.skip_userid_check = False + context.query_request = QueryRequest( + query="hi", conversation_id=conv + ) # pyright: ignore[reportCallIssue] + + responses_params = mocker.Mock(spec=ResponsesApiParams) + responses_params.conversation = conv + responses_params.model = "provider1/model1" + responses_params.input = ["explicit rewrite"] + + turn_summary = TurnSummary() + items = mocker.patch( + "app.endpoints.streaming_query.append_turn_items_to_conversation", + new=mocker.AsyncMock(), + ) + strs = mocker.patch( + "app.endpoints.streaming_query.append_turn_to_conversation", + new=mocker.AsyncMock(), + ) + mocker.patch("app.endpoints.streaming_query.store_query_results") + + await _persist_interrupted_turn( + context, responses_params, turn_summary, original_input="the original query" + ) + + items.assert_awaited_once() + assert items.call_args.args[2] == "the original query" + strs.assert_not_awaited() diff --git a/tests/unit/utils/test_conversation_compaction.py b/tests/unit/utils/test_conversation_compaction.py new file mode 100644 index 000000000..9a0fe2deb --- /dev/null +++ b/tests/unit/utils/test_conversation_compaction.py @@ -0,0 +1,563 @@ +"""Unit tests for runtime conversation compaction (LCORE-1572).""" + +# Tests 
exercise internal helpers directly. +# pylint: disable=protected-access + +from types import SimpleNamespace +from typing import Any, cast + +import pytest +from pytest_mock import MockerFixture + +from models.common.responses.responses_api_params import ResponsesApiParams +from models.compaction import ConversationSummary +from models.config import CompactionConfiguration, InferenceConfiguration +from utils import conversation_compaction as cc + +MODEL = "openai/gpt-4o-mini" +CONV = "conv_abc123" + + +def _msg(role: str, text: str) -> dict: + """Build a duck-typed conversation message item.""" + return {"role": role, "content": text} + + +def _marker(text: str) -> dict: + """Build a summary marker message item.""" + return {"role": "user", "content": f"{cc.MARKER_SENTINEL} {text}"} + + +def _params(input_text: str = "new question") -> ResponsesApiParams: + return ResponsesApiParams( + input=input_text, + model=MODEL, + conversation=CONV, + instructions="system prompt", + store=True, + stream=False, + ) + + +def _inference(window: int | None) -> InferenceConfiguration: + windows = {MODEL: window} if window is not None else {} + return InferenceConfiguration(context_windows=windows) + + +def _compaction(**kw: Any) -> CompactionConfiguration: + base = { + "enabled": True, + "threshold_ratio": 0.5, + "token_floor": 0, + "buffer_turns": 1, + "buffer_max_ratio": 0.3, + } + base.update(kw) + return CompactionConfiguration(**base) + + +# --- pure helpers --- + + +def test_is_marker_item() -> None: + """Marker messages are recognized; ordinary messages and non-messages are not.""" + assert cc.is_marker_item(_marker("s")) is True + assert cc.is_marker_item(_msg("user", "hello")) is False + assert cc.is_marker_item({"type": "function_call"}) is False + + +def test_items_after_last_marker() -> None: + """Only items following the last marker are treated as recent verbatim turns.""" + items = [ + _msg("user", "a"), + _marker("first summary"), + _msg("user", "b"), + 
_marker("second summary"), + _msg("assistant", "c"), + ] + recent = cc._items_after_last_marker(items) + assert recent == [_msg("assistant", "c")] + + +def test_items_after_last_marker_no_marker() -> None: + """With no marker, every item is recent.""" + items = [_msg("user", "a"), _msg("assistant", "b")] + assert cc._items_after_last_marker(items) == items + + +def test_marker_summaries_in_order() -> None: + """All marker summaries are returned oldest-first with the sentinel stripped.""" + items = [_marker("one"), _msg("user", "x"), _marker("two")] + assert cc._marker_summaries(items) == ["one", "two"] + + +def test_build_explicit_input_shape() -> None: + """Explicit input is summaries, then recent message turns, then the new query.""" + built = cc._build_explicit_input( + summaries=["earlier stuff"], + recent_items=[_msg("user", "recent q"), _msg("assistant", "recent a")], + original_input="brand new question", + ) + texts = [m.content for m in built] + assert "Summary of earlier conversation:\nearlier stuff" in texts[0] + assert texts[1] == "recent q" + assert texts[2] == "recent a" + assert texts[3] == "brand new question" + # items are typed OpenAIResponseMessage objects (so they serialize cleanly) + assert built[2].role == "assistant" + + +def test_should_compact() -> None: + """Trigger requires exceeding both the ratio threshold and the token floor.""" + cfg = _compaction(threshold_ratio=0.7, token_floor=100) + assert cc._should_compact(estimated_tokens=800, context_window=1000, config=cfg) + # below the ratio threshold + assert not cc._should_compact(600, 1000, cfg) + # above ratio but below the floor + assert not cc._should_compact( + 90, 100, _compaction(threshold_ratio=0.5, token_floor=100) + ) + + +# --- apply_compaction --- + + +@pytest.mark.asyncio +async def test_disabled_passes_through() -> None: + """When compaction is disabled the params are returned unchanged.""" + result = await cc.apply_compaction_blocking( + client=None, # not used when 
disabled + params=_params(), + inference_config=_inference(1000), + compaction_config=_compaction(enabled=False), + ) + assert result.compacted is False + assert result.params.omit_conversation is False + assert result.params.input == "new question" + + +@pytest.mark.asyncio +async def test_no_context_window_no_marker_passes_through( + mocker: MockerFixture, +) -> None: + """No registered context window and no prior summary => normal flow.""" + mocker.patch.object( + cc, "get_all_conversation_items", mocker.AsyncMock(return_value=[]) + ) + result = await cc.apply_compaction_blocking( + client=mocker.AsyncMock(), + params=_params(), + inference_config=_inference(None), + compaction_config=_compaction(), + ) + assert result.compacted is False + assert result.params.omit_conversation is False + + +@pytest.mark.asyncio +async def test_existing_marker_builds_explicit_input(mocker: MockerFixture) -> None: + """A conversation that already has a marker is served in compacted mode. + + Even when below the trigger threshold (large window), the presence of a + prior summary means lightspeed-stack must own the context: explicit input + and the conversation parameter dropped. 
+ """ + items = [ + _msg("user", "old q"), + _marker("the earlier conversation summary"), + _msg("user", "recent q"), + _msg("assistant", "recent a"), + ] + mocker.patch.object( + cc, "get_all_conversation_items", mocker.AsyncMock(return_value=items) + ) + summarize = mocker.patch.object(cc, "summarize_chunk", mocker.AsyncMock()) + + result = await cc.apply_compaction_blocking( + client=mocker.AsyncMock(), + params=_params("brand new"), + inference_config=_inference(1_000_000), # huge: no new trigger + compaction_config=_compaction(), + ) + + summarize.assert_not_called() # below threshold, no new summary + assert result.compacted is True + assert result.params.omit_conversation is True + assert isinstance(result.params.input, list) + texts = [m.content for m in result.params.input] + assert texts[0].endswith("the earlier conversation summary") + assert texts[-1] == "brand new" + assert result.original_input == "brand new" + + +@pytest.mark.asyncio +async def test_triggers_summarization_and_writes_marker(mocker: MockerFixture) -> None: + """Exceeding the threshold summarizes old turns and writes a marker item.""" + items = [_msg("user", "q1 " * 50), _msg("assistant", "a1 " * 50)] + mocker.patch.object( + cc, "get_all_conversation_items", mocker.AsyncMock(return_value=items) + ) + summary = ConversationSummary( + summary_text="condensed earlier turns", + summarized_through_turn=2, + token_count=4, + created_at="2026-05-26T00:00:00Z", + model_used=MODEL, + ) + summarize = mocker.patch.object( + cc, "summarize_chunk", mocker.AsyncMock(return_value=summary) + ) + write_marker = mocker.patch.object(cc, "_write_summary_marker", mocker.AsyncMock()) + + result = await cc.apply_compaction_blocking( + client=mocker.AsyncMock(), + params=_params("follow-up"), + inference_config=_inference(50), # small window forces the trigger + compaction_config=_compaction(threshold_ratio=0.1, buffer_turns=0), + ) + + summarize.assert_awaited_once() + write_marker.assert_awaited_once() + 
    assert result.compacted is True
+    assert result.params.omit_conversation is True
+    texts = [m.content for m in result.params.input]
+    assert "condensed earlier turns" in texts[0]
+    assert texts[-1] == "follow-up"
+
+
+@pytest.mark.asyncio
+async def test_streaming_emits_event_before_summarizing(mocker: MockerFixture) -> None:
+    """In streaming mode a CompactionStartedEvent precedes the summary result."""
+    items = [_msg("user", "q1 " * 50), _msg("assistant", "a1 " * 50)]
+    mocker.patch.object(
+        cc, "get_all_conversation_items", mocker.AsyncMock(return_value=items)
+    )
+    summary = ConversationSummary(
+        summary_text="condensed",
+        summarized_through_turn=2,
+        token_count=2,
+        created_at="2026-05-26T00:00:00Z",
+        model_used=MODEL,
+    )
+    mocker.patch.object(cc, "summarize_chunk", mocker.AsyncMock(return_value=summary))
+    mocker.patch.object(cc, "_write_summary_marker", mocker.AsyncMock())
+
+    yielded = []
+    async for item in cc.apply_compaction(
+        client=mocker.AsyncMock(),
+        params=_params(),
+        inference_config=_inference(50),
+        compaction_config=_compaction(threshold_ratio=0.1, buffer_turns=0),
+        emit_events=True,
+    ):
+        yielded.append(item)
+
+    assert isinstance(yielded[0], cc.CompactionStartedEvent)
+    assert yielded[0].conversation_id == CONV
+    assert isinstance(yielded[-1], cc.CompactionResult)
+    assert yielded[-1].compacted is True
+
+
+@pytest.mark.asyncio
+async def test_store_compacted_turn_appends(mocker: MockerFixture) -> None:
+    """store_compacted_turn delegates to append_turn_items_to_conversation."""
+    append = mocker.patch.object(
+        cc, "append_turn_items_to_conversation", mocker.AsyncMock()
+    )
+    client = mocker.AsyncMock()
+    await cc.store_compacted_turn(client, CONV, "the query", ["out"])
+    append.assert_awaited_once_with(client, CONV, "the query", ["out"])
+
+
+# --- needs_compaction_path (the tight gate protecting non-compacting requests) ---
+
+
+@pytest.mark.asyncio
+async def test_needs_compaction_path_disabled(mocker: MockerFixture) -> None:
+    """The gate is False (unchanged path) whenever compaction is disabled."""
+    assert (
+        await cc.needs_compaction_path(
+            mocker.AsyncMock(),
+            _params(),
+            _inference(1000),
+            _compaction(enabled=False),
+        )
+        is False
+    )
+
+
+@pytest.mark.asyncio
+async def test_needs_compaction_path_existing_marker(mocker: MockerFixture) -> None:
+    """A conversation with a prior summary marker always needs the compaction path."""
+    mocker.patch.object(
+        cc,
+        "get_all_conversation_items",
+        mocker.AsyncMock(return_value=[_marker("earlier"), _msg("user", "recent")]),
+    )
+    assert (
+        await cc.needs_compaction_path(
+            mocker.AsyncMock(),
+            _params(),
+            _inference(1_000_000),  # huge window: no new trigger, but marker exists
+            _compaction(),
+        )
+        is True
+    )
+
+
+@pytest.mark.asyncio
+async def test_needs_compaction_path_over_threshold(mocker: MockerFixture) -> None:
+    """A conversation over the token threshold needs the compaction path."""
+    mocker.patch.object(
+        cc,
+        "get_all_conversation_items",
+        mocker.AsyncMock(
+            return_value=[_msg("user", "q " * 50), _msg("assistant", "a " * 50)]
+        ),
+    )
+    assert (
+        await cc.needs_compaction_path(
+            mocker.AsyncMock(),
+            _params(),
+            _inference(50),  # small window: over threshold
+            _compaction(threshold_ratio=0.1),
+        )
+        is True
+    )
+
+
+@pytest.mark.asyncio
+async def test_needs_compaction_path_under_threshold(mocker: MockerFixture) -> None:
+    """A short conversation with no marker stays on the unchanged path."""
+    mocker.patch.object(
+        cc,
+        "get_all_conversation_items",
+        mocker.AsyncMock(return_value=[_msg("user", "hi"), _msg("assistant", "hello")]),
+    )
+    assert (
+        await cc.needs_compaction_path(
+            mocker.AsyncMock(),
+            _params(),
+            _inference(1_000_000),  # huge window: nowhere near the threshold
+            _compaction(),
+        )
+        is False
+    )
+
+
+# --- cache as source of truth + recursive fold (R3) ---
+
+
+def _summary(text: str, *, turn: int, tokens: int, at: str) -> ConversationSummary:
+    """Build a ConversationSummary for cache tests."""
+    return ConversationSummary(
+        summary_text=text,
+        summarized_through_turn=turn,
+        token_count=tokens,
+        created_at=at,
+        model_used=MODEL,
+    )
+
+
+@pytest.mark.asyncio
+async def test_cache_summaries_preferred_over_markers(mocker: MockerFixture) -> None:
+    """When the cache returns summaries, they are used instead of marker texts."""
+    items = [_marker("STALE marker text"), _msg("user", "recent q")]
+    mocker.patch.object(
+        cc, "get_all_conversation_items", mocker.AsyncMock(return_value=items)
+    )
+    summarize = mocker.patch.object(cc, "summarize_chunk", mocker.AsyncMock())
+    cached = _summary(
+        "FRESH cached summary", turn=2, tokens=5, at="2026-05-26T00:00:00Z"
+    )
+    cache = mocker.Mock()
+    cache.get_summaries.return_value = [cached]
+
+    result = await cc.apply_compaction_blocking(
+        client=mocker.AsyncMock(),
+        params=_params("brand new"),
+        inference_config=_inference(1_000_000),  # huge: no new trigger
+        compaction_config=_compaction(),
+        cache=cache,
+        user_id="u1",
+        skip_user_id_check=False,
+    )
+
+    summarize.assert_not_called()
+    cache.get_summaries.assert_called_once_with("u1", CONV, False)
+    texts = [m.content for m in result.params.input]
+    assert "FRESH cached summary" in texts[0]
+    assert "STALE marker text" not in texts[0]
+
+
+@pytest.mark.asyncio
+async def test_store_summary_called_on_compaction(mocker: MockerFixture) -> None:
+    """A new summary chunk is persisted to the cache when compaction triggers."""
+    items = [_msg("user", "q1 " * 50), _msg("assistant", "a1 " * 50)]
+    mocker.patch.object(
+        cc, "get_all_conversation_items", mocker.AsyncMock(return_value=items)
+    )
+    summary = _summary("condensed", turn=2, tokens=4, at="2026-05-26T00:00:00Z")
+    mocker.patch.object(cc, "summarize_chunk", mocker.AsyncMock(return_value=summary))
+    mocker.patch.object(cc, "_write_summary_marker", mocker.AsyncMock())
+    cache = mocker.Mock()
+    cache.get_summaries.return_value = []
+
+    await cc.apply_compaction_blocking(
+        client=mocker.AsyncMock(),
+        params=_params("follow-up"),
+        inference_config=_inference(50),  # small window forces the trigger
+        compaction_config=_compaction(threshold_ratio=0.1, buffer_turns=0),
+        cache=cache,
+        user_id="u1",
+        skip_user_id_check=False,
+    )
+
+    cache.store_summary.assert_called_once()
+    assert cache.store_summary.call_args[0] == ("u1", CONV, summary, False)
+
+
+@pytest.mark.asyncio
+async def test_fold_when_cached_summaries_exceed_threshold(
+    mocker: MockerFixture,
+) -> None:
+    """Cached summaries above the threshold are folded and the fold persisted."""
+    items = [_marker("m1"), _marker("m2"), _msg("user", "recent")]
+    mocker.patch.object(
+        cc, "get_all_conversation_items", mocker.AsyncMock(return_value=items)
+    )
+    summarize = mocker.patch.object(cc, "summarize_chunk", mocker.AsyncMock())
+    s1 = _summary("sum one", turn=4, tokens=20, at="2026-05-26T00:00:00Z")
+    s2 = _summary("sum two", turn=8, tokens=20, at="2026-05-26T00:10:00Z")
+    folded = _summary("FOLDED one+two", turn=8, tokens=10, at="2026-05-26T00:20:00Z")
+    cache = mocker.Mock()
+    cache.get_summaries.return_value = [s1, s2]
+    resum = mocker.patch.object(
+        cc, "recursively_resummarize", mocker.AsyncMock(return_value=folded)
+    )
+
+    result = await cc.apply_compaction_blocking(
+        client=mocker.AsyncMock(),
+        params=_params("next"),
+        inference_config=_inference(50),  # threshold 25 < 40 summary tokens
+        compaction_config=_compaction(threshold_ratio=0.5, buffer_turns=5),
+        cache=cache,
+        user_id="u1",
+        skip_user_id_check=False,
+    )
+
+    summarize.assert_not_called()  # estimate stays small; only the fold fires
+    resum.assert_awaited_once()
+    cache.replace_summaries.assert_called_once()
+    assert cache.replace_summaries.call_args[0][2] is folded
+    texts = [m.content for m in result.params.input]
+    assert "FOLDED one+two" in texts[0]
+    assert not any("sum one" in t for t in texts)
+
+
+@pytest.mark.asyncio
+async def test_no_fold_without_cache(mocker: MockerFixture) -> None:
+    """With no cache, additive marker summaries are never folded (marker mode)."""
+    items = [_marker("m1 " * 30), _marker("m2 " * 30), _msg("user", "recent")]
+    mocker.patch.object(
+        cc, "get_all_conversation_items", mocker.AsyncMock(return_value=items)
+    )
+    mocker.patch.object(cc, "summarize_chunk", mocker.AsyncMock())
+    resum = mocker.patch.object(cc, "recursively_resummarize", mocker.AsyncMock())
+
+    result = await cc.apply_compaction_blocking(
+        client=mocker.AsyncMock(),
+        params=_params("next"),
+        inference_config=_inference(1_000_000),
+        compaction_config=_compaction(),
+        cache=None,
+    )
+
+    resum.assert_not_awaited()
+    assert result.compacted is True  # still compacted via markers
+
+
+@pytest.mark.asyncio
+async def test_marker_fallback_when_cache_empty(mocker: MockerFixture) -> None:
+    """An empty cache falls back to the authoritative marker texts."""
+    items = [_marker("marker summary text"), _msg("user", "recent")]
+    mocker.patch.object(
+        cc, "get_all_conversation_items", mocker.AsyncMock(return_value=items)
+    )
+    mocker.patch.object(cc, "summarize_chunk", mocker.AsyncMock())
+    cache = mocker.Mock()
+    cache.get_summaries.return_value = []
+
+    result = await cc.apply_compaction_blocking(
+        client=mocker.AsyncMock(),
+        params=_params("next"),
+        inference_config=_inference(1_000_000),
+        compaction_config=_compaction(),
+        cache=cache,
+        user_id="u1",
+        skip_user_id_check=False,
+    )
+
+    texts = [m.content for m in result.params.input]
+    assert "marker summary text" in texts[0]
+
+
+def test_configured_conversation_cache_none(mocker: MockerFixture) -> None:
+    """configured_conversation_cache returns None when no cache is configured."""
+    mock_config = mocker.patch.object(cc, "configuration")
+    mock_config.compaction.enabled = True
+    mock_config.conversation_cache_configuration.type = None
+    assert cc.configured_conversation_cache() is None
+
+
+def test_configured_conversation_cache_returns_cache(mocker: MockerFixture) -> None:
+    """configured_conversation_cache returns the configured cache instance."""
+    mock_config = mocker.patch.object(cc, "configuration")
+    mock_config.compaction.enabled = True
+    mock_config.conversation_cache_configuration.type = "sqlite"
+    sentinel = object()
+    mock_config.conversation_cache = sentinel
+    assert cc.configured_conversation_cache() is sentinel
+
+
+def test_configured_conversation_cache_none_when_compaction_disabled(
+    mocker: MockerFixture,
+) -> None:
+    """Returns None when compaction is disabled, without reading the cache.
+
+    Regression guard (LCORE-1572): the cache must not be initialized on every
+    request when compaction is off — that 500'd e2e requests on configs whose
+    SQLite cache could not be opened. The stub's ``conversation_cache`` raises if
+    read, so the test fails on any eager access, not only on the return value.
+    """
+
+    class _ConfigStub:  # pylint: disable=too-few-public-methods
+        """Config whose conversation_cache raises if accessed."""
+
+        compaction = SimpleNamespace(enabled=False)
+        conversation_cache_configuration = SimpleNamespace(type="sqlite")
+
+        @property
+        def conversation_cache(self) -> object:
+            """Fail the test if the disabled path reads the cache."""
+            raise AssertionError(
+                "conversation_cache must not be accessed when compaction is disabled"
+            )
+
+    mocker.patch.object(cc, "configuration", _ConfigStub())
+    assert cc.configured_conversation_cache() is None
+
+
+def test_estimate_response_input_tokens_counts_list_form() -> None:
+    """List-form ResponseInput is counted toward the estimate, not treated as zero.
+
+    Regression guard: counting only string input would undercount list-form
+    input (e.g. /v1/responses) and let a request skip compaction (LCORE-1572).
+    """
+    big = "incident detail " * 50
+    string_tokens = cc._estimate_response_input_tokens(big, cc.DEFAULT_ENCODING_NAME)
+    list_tokens = cc._estimate_response_input_tokens(
+        cast(Any, [_msg("user", big)]), cc.DEFAULT_ENCODING_NAME
+    )
+    assert string_tokens > 10
+    assert list_tokens > 10