feat(auto-contextMemory): implement AutoContextMemory with multi-strategy intelligent context compression and offloading#1523
Conversation
…text compression and offloading
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR makes AutoContext context compression fully reactive (non-blocking), adds per-strategy enable/disable switches, and improves SYSTEM message handling so multiple SYSTEM prompts are preserved while appending the offload instruction only to the first.
Changes:
- Refactor compression pipeline to
compressIfNeededAsync()using Reactor (Mono/Flux) and deprecate the blockingcompressIfNeeded(). - Add configuration flags to enable/disable compression strategies 1–6.
- Preserve multiple SYSTEM messages in
AutoContextHookand add a unit test covering this behavior.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 12 comments.
| File | Description |
|---|---|
| agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextMemory.java | Converts compression strategies to a reactive pipeline; adds concurrency for some summarization paths; keeps a deprecated blocking wrapper. |
| agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextHook.java | Switches PreReasoning handling to the async compressor and preserves multiple SYSTEM messages. |
| agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextConfig.java | Adds per-strategy enable/disable switches and exposes them via getters + builder. |
| agentscope-extensions/agentscope-extensions-autocontext-memory/src/test/java/io/agentscope/core/memory/autocontext/AutoContextHookTest.java | Adds test to ensure multiple SYSTEM messages are preserved and instruction is appended only to the first. |
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
AgentScopeJavaBot
left a comment
There was a problem hiding this comment.
🤖 AI Review
This PR makes three valuable changes: per-strategy enable/disable flags, multi-SYSTEM message preservation in the hook, and conversion of the compression pipeline from blocking .block() calls to a Reactor chain. The refactor is comprehensive and well-documented, with new tests covering the SYSTEM-preservation and disabled-strategy cases. However, the new concurrent execution in Strategy 4 and Strategy 5 introduces a real thread-safety regression: offloadContext is a plain HashMap and is now mutated from multiple boundedElastic worker threads simultaneously. There are also a few smaller issues (loss of subscribeOn(boundedElastic) in the hook, hard-coded block(60s) in the deprecated wrapper, raw Object[] tuple in Strategy 4, misleading "exhausted" log when all strategies are disabled). Recommend changes before merge.
| offloadMsg.add(msg); | ||
| return Mono.fromRunnable( | ||
| () -> { | ||
| offload(uuid, offloadMsg); |
There was a problem hiding this comment.
[critical] Thread-safety violation. offloadContext is declared as a plain HashMap (line 98), but here offload(uuid, offloadMsg) (which calls offloadContext.put) is invoked inside a Flux.flatMap(..., concurrency=5) whose inner Mono.fromRunnable(...) is dispatched onto Schedulers.boundedElastic() (line 705). With concurrency up to 5, multiple boundedElastic worker threads execute HashMap.put on the same instance concurrently, which can cause lost entries, corrupted internal state, or (on older JDKs) infinite loops. Either switch the field to ConcurrentHashMap, run the offload step sequentially before the LLM calls (as Strategy 4 already does in summaryPreviousRoundMessages), or wrap the put in a synchronized(offloadContext) block.
|
|
||
| // Trigger compression asynchronously, then rebuild input messages | ||
| return autoContextMemory | ||
| .compressIfNeededAsync() |
There was a problem hiding this comment.
[major] The previous implementation explicitly applied .subscribeOn(Schedulers.boundedElastic()) so that all the synchronous CPU-bound work inside compressIfNeededAsync (TokenCounterUtil.calculateToken, offloadingLargePayload, ArrayList copies, etc.) ran on an elastic worker. After this refactor compressIfNeededAsync() itself does not apply any subscribeOn, and the hook also no longer applies one. If the hook chain is subscribed from a non-blocking thread (Netty event loop, Reactor parallel scheduler used by the agent runtime) the synchronous parts of the strategy chain will execute on that caller thread. Please re-add .subscribeOn(Schedulers.boundedElastic()) either inside compressIfNeededAsync() or here at the hook entry to preserve the original threading guarantee.
| */ | ||
| @Deprecated | ||
| public boolean compressIfNeeded() { | ||
| Boolean result = compressIfNeededAsync().block(java.time.Duration.ofSeconds(60)); |
There was a problem hiding this comment.
[major] The deprecated sync wrapper now calls compressIfNeededAsync().block(Duration.ofSeconds(60)). Two regressions versus the prior synchronous implementation: (1) Reactor's .block() throws IllegalStateException when invoked from a non-blocking scheduler thread (e.g., Schedulers.parallel()), so legacy callers that previously executed inline will now fail at runtime in those contexts. (2) The 60s timeout is hard-coded; on slow LLMs strategy 4/5 (which can fan out up to 5 LLM calls) may exceed it, returning false even though compression is still in flight. Consider either making the timeout configurable, propagating it from AutoContextConfig, or removing the timeout entirely (block() without argument) for behavioral parity.
| candidate -> | ||
| summaryPreviousRoundConversationAsync( | ||
| candidate.messagesToSummarize(), candidate.uuid()) | ||
| .filter( |
There was a problem hiding this comment.
[major] Strategy 4 silently filters out summaries whose text content is empty/whitespace, but the corresponding offload(uuid, messagesToSummarize) for that round has already been written to offloadContext earlier (line ~1358). When the filter drops a summary, the offload entry becomes orphaned (memory leak — never reachable via any rendered offload tag) and there is no log indicating the failure. Add a doOnDiscard/switchIfEmpty log, or call clear(uuid) for filtered-out candidates so the offload map does not grow unbounded on bad LLM responses.
| .getChatUsage() | ||
| .getTime()); | ||
| } | ||
| return new Object[] { |
There was a problem hiding this comment.
[major] Strategy 4 emits new Object[]{candidate, summaryMsg, metadata} and casts back via raw (Object[]) and (SummaryCandidate) at lines 1419-1429. This loses type safety, requires @SuppressWarnings("unchecked"), and is inconsistent with Strategy 5 (line 709) which uses reactor.util.function.Tuples.of(...). Recommend using Tuples.of(candidate, summaryMsg, metadata) (a typed Tuple3) or a small private record (you already use one for SummaryCandidate).
| applied -> { | ||
| if (!applied) | ||
| log.warn( | ||
| "All compression strategies exhausted but context still" |
There was a problem hiding this comment.
[minor] Misleading log: when all strategies are disabled by config, the chain stays at Mono.just(false) and this doOnNext logs "All compression strategies exhausted but context still exceeds threshold". "Exhausted" implies strategies ran and failed; in this case nothing ran. Distinguish disabled-everything from genuine exhaustion by checking whether any strategy was enabled before logging at WARN level (the disabled case can stay at DEBUG/INFO).
| // Step 4: Run summaries concurrently (max 5 in-flight), collect all results | ||
| int concurrency = Math.min(candidates.size(), 5); | ||
|
|
||
| return Flux.fromIterable(candidates) |
There was a problem hiding this comment.
[minor] Atomicity regression vs. the prior implementation. The old sequential code mutated rawMessages after each successful round summary, so a failure in round N still preserved the work done for rounds N+1..N+M. The new Flux.collectList() is all-or-nothing: if any one of the up to 5 concurrent summary Monos signals an error, no rawMessages mutations are applied — but every offload was already inserted into offloadContext before the Flux subscribed. Consider either applying replacements per-round inside flatMap (with proper synchronization) or using onErrorContinue/onErrorResume per candidate so a single LLM failure does not waste the work of the other rounds.
| MsgUtils.calculateMessageCharCount(msg), | ||
| uuid); | ||
| }) | ||
| .subscribeOn(reactor.core.scheduler.Schedulers.boundedElastic()) |
There was a problem hiding this comment.
[minor] Fully-qualified reactor.core.scheduler.Schedulers.boundedElastic() and reactor.util.function.Tuples.of(...) are used inline instead of importing them. The original file already imported reactor.core.publisher.Flux and reactor.core.publisher.Mono; please add import reactor.core.scheduler.Schedulers; and import reactor.util.function.Tuples; (and Tuple2 if needed) for consistency with the rest of the codebase.
AgentScope-Java Version
1.0.13-SNAPSHOT
Description
This PR solves issue #1350 by addressing structural and behavioral limitations within
AutoContextMemoryandAutoContextHook.Background & Purpose:
While extending
AutoContextMemory, several limitations restricted flexibility and latency:.block()calls, breaking the reactive chain and degrading performance under heavy compression scenarios.AutoContextHookonly preserved the firstSYSTEMmessage, accidentally wiping out any subsequent system instructions (e.g., roleplay directives alongside system prompts).Changes Made:
AutoContextConfig: Introducedstrategy1Enabledthroughstrategy6Enabled(defaulting to true) to allow fine-grained control over which compression rules execute.AutoContextMemory: Refactored the core implementation to be completely asynchronous using Spring Reactor types (MonoandFlux). All synchronous.block()wrappers were eliminated internally, and the primary entry point is nowcompressIfNeededAsync(). (The synchronouscompressIfNeeded()is kept as a@Deprecatedwrapper for backwards compatibility).AutoContextHook: ModifiedhandlePreReasoningto collect and preserve allSYSTEMmessages. The memory-offloading instruction is now correctly appended to only the firstSYSTEMmessage without discarding the others.AutoContextHookTest,AutoContextMemoryTest, andAutoContextConfigTestto verify that concurrent evaluation logic resolves correctly and that multiple system prompts remain intact.How to test this PR:
mvn clean test -pl agentscope-extensions/agentscope-extensions-autocontext-memory -am.mvn spotless:check.Checklist
Please check the following items before code is ready to be reviewed.
mvn spotless:applymvn test)