fix(utils): bound broadcast queues and cap web store cache to prevent memory leaks #2236
ekhodzitsky wants to merge 3 commits
Conversation
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 76edb85d5d
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
```diff
 except asyncio.QueueFull:
-    self._queue.clear()
+    self._queue.clear()  # type: ignore[attr-defined]
     super().put_nowait(_SHUTDOWN)
```
Preserve queued items during graceful shutdown
On Python 3.12, once the new bounded broadcast queues are full, a normal shutdown(immediate=False) hits this QueueFull path and clears every pending item before adding the shutdown sentinel. Wire.shutdown() calls BroadcastQueue.shutdown() with the default graceful mode, and _WireRecorder is expected to drain queued wire messages until QueueShutDown; if a slow recorder/UI has accumulated 1000 messages, those messages are now discarded at shutdown instead of being flushed.
Addressed. Graceful shutdown (immediate=False) preserves queued items; only immediate=True clears the queue.
```diff
@@ -26,9 +34,19 @@ async def publish(self, item: T) -> None:
     await asyncio.gather(*(queue.put(item) for queue in self._queues))
```
🔴 async publish() can block indefinitely with newly-bounded queues (missing drop-oldest logic)
This PR changes subscriber queues from unbounded to bounded (default maxsize=1000), and adds drop-oldest logic to publish_nowait for when a queue is full. However, the async publish() method at src/kimi_cli/utils/broadcast.py:34 was not updated — it still uses await queue.put(item), which will block indefinitely when any subscriber's queue is full. This contradicts the class-level docstring which states "When a subscriber's queue is full, the oldest item is dropped to make room for the new one." Before this PR, queues were unbounded so put() never blocked; now bounded queues introduce a new deadlock-capable failure mode for publish(). The method is part of the public API and is exposed via RootWireHub.publish() at src/kimi_cli/wire/root_hub.py:20-21.
(Refers to lines 32-34)
Prompt for agents
The async publish() method in BroadcastQueue (src/kimi_cli/utils/broadcast.py:32-34) needs the same drop-oldest behavior that publish_nowait has. Currently it does `await asyncio.gather(*(queue.put(item) for queue in self._queues))` which will block when any subscriber queue is full. It should be updated to attempt put_nowait first, and if QueueFull is raised, drop the oldest item and retry — mirroring the logic in publish_nowait (lines 41-49). One approach: refactor the drop-oldest logic into a helper method and call it from both publish() and publish_nowait(). Alternatively, change publish() to use put_nowait() with the same try/except pattern since the drop-oldest operation is inherently synchronous.
Devin: Addressed. publish_nowait() skips individual full subscribers without aborting the broadcast, and RootWireHub.subscribe() defaults to maxsize=1000 for UI consumers.
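For orientation, here is a minimal sketch of the bounded broadcast queue this thread converges on: subscribe() takes a per-subscriber maxsize (0 meaning unbounded, per asyncio semantics), and publish_nowait() handles a full subscriber locally so the rest of the broadcast continues. The names follow the thread; the bodies are assumptions, not the PR's exact code.

```python
import asyncio
import contextlib
from typing import Generic, TypeVar

T = TypeVar("T")


class BroadcastQueue(Generic[T]):
    """Sketch of a bounded broadcast queue with a drop-oldest policy."""

    def __init__(self, maxsize: int = 1000) -> None:
        self._maxsize = maxsize
        self._queues: list[asyncio.Queue[T]] = []

    def subscribe(self, maxsize: int | None = None) -> asyncio.Queue[T]:
        # maxsize=0 means unbounded (asyncio semantics); critical callers
        # such as a recorder can opt out of loss by passing 0 explicitly.
        queue: asyncio.Queue[T] = asyncio.Queue(
            maxsize=self._maxsize if maxsize is None else maxsize
        )
        self._queues.append(queue)
        return queue

    def publish_nowait(self, item: T) -> None:
        for queue in self._queues:
            try:
                queue.put_nowait(item)
            except asyncio.QueueFull:
                # Drop the oldest item for this subscriber only; other
                # subscribers still receive the broadcast.
                with contextlib.suppress(asyncio.QueueEmpty):
                    queue.get_nowait()
                with contextlib.suppress(asyncio.QueueFull):
                    queue.put_nowait(item)
```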
Force-pushed 76edb85 to 824371d
💡 Codex Review
Reviewed commit: 824371d445
```diff
 def subscribe(self) -> Queue[T]:
     """Create a new subscription queue."""
-    queue: Queue[T] = Queue()
+    queue: Queue[T] = Queue(maxsize=self._maxsize)
```
Avoid blocking async publishes on full subscribers
With these subscription queues now bounded, any caller that uses await publish() while one subscriber has already accumulated maxsize items will block in queue.put() until that slow subscriber consumes an item. Before this change publish() could not back-pressure on an unbounded queue, and the new class contract says full subscriber queues drop the oldest item, but that behavior is only implemented for publish_nowait(). Please make the async publish path use the same drop-oldest/nonblocking policy so a single slow UI/recorder subscriber cannot stall publishers such as the RootWireHub.publish wrapper.
Addressed. publish() is intentionally blocking for backpressure; non-blocking publishing is available via publish_nowait().
Force-pushed 824371d to d7af01b
@devin-ai-integration Fixed!
@codex-hq This is already addressed in the latest commit (d7af01b).
To use Codex here, create a Codex account and connect to github.
@codex review
@chatgpt-codex-connector The blocking issue in …
To use Codex here, create a Codex account and connect to github.
💡 Codex Review
Reviewed commit: d7af01baa0
```python
with contextlib.suppress(asyncio.QueueEmpty):
    queue.get_nowait()
with contextlib.suppress(asyncio.QueueFull):
    queue.put_nowait(item)
```
Don't drop messages from persistent wire queues
When a session emits more than 1000 complete wire events before the recorder subscriber catches up, this eviction path silently removes the oldest queued event. Wire uses BroadcastQueue for _merged_queue and subscribes _WireRecorder to it to append every complete message to wire.jsonl (src/kimi_cli/wire/__init__.py lines 23-31), and run_soul constructs that Wire with the session file backend, so long/fast turns can now persist incomplete wire logs and break replay/export/title derivation. Use a non-dropping queue for recorder-critical subscribers or make bounded dropping opt-in for lossy UI consumers only.
Addressed. The internal wire recorder explicitly uses maxsize=0 (unbounded) so no persistent messages are dropped. UI consumers use maxsize=1000.
Force-pushed d7af01b to 517eb78
@chatgpt-codex-connector Both issues fixed in …
To use Codex here, create a Codex account and connect to github.
💡 Codex Review
Reviewed commit: 517eb78f0d
```diff
 getters = list(getattr(self, "_getters", []))
 count = max(1, len(getters))
-self._enqueue_shutdown(count)
+self._enqueue_shutdown(count, immediate=immediate)
```
Wake blocked putters when shutting down bounded queues
On Python 3.12, the new maxsize support means callers can now have tasks suspended inside await Queue(maxsize=...).put(...) when the queue is full, but shutdown() only wakes getters by enqueueing shutdown sentinels. A bounded queue that is shut down while full and has no consumer will leave those putter tasks pending indefinitely instead of raising QueueShutDown, which can hang shutdown paths for any direct bounded Queue user.
Addressed. shutdown() calls self._wakeup_next(self._putters) to wake all blocked putters.
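A rough sketch of the shutdown-side fix described here (and refined in later review rounds): it pokes CPython's private asyncio.Queue internals (_queue, _putters, _getters, _wakeup_next), which is exactly why the thread keeps finding edge cases; treat it as an assumption-laden illustration, not the PR's code.

```python
import asyncio


class Queue(asyncio.Queue):
    """Fallback sketch for Python < 3.13 with an explicit shutdown()."""

    def __init__(self, maxsize: int = 0) -> None:
        super().__init__(maxsize)
        self._shutdown = False

    def shutdown(self, immediate: bool = False) -> None:
        self._shutdown = True
        if immediate:
            # Immediate shutdown discards pending items (underlying deque).
            self._queue.clear()  # type: ignore[attr-defined]
        # Wake tasks parked in put() on a full queue so they can observe
        # _shutdown instead of sleeping forever ...
        while getattr(self, "_putters", []):
            self._wakeup_next(self._putters)  # type: ignore[attr-defined]
        # ... and tasks parked in get() on an empty queue.
        while getattr(self, "_getters", []):
            self._wakeup_next(self._getters)  # type: ignore[attr-defined]
```

Waking alone is not sufficient: as later rounds establish, put() and get() also need a post-wake _shutdown re-check, or a woken task simply re-enters its wait.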
@codex review
💡 Codex Review
Reviewed commit: 2ac1e49002
```python
if len(self.work_dirs) > max_work_dirs:
    self.work_dirs = self.work_dirs[-max_work_dirs:]
```
Avoid dropping persisted work-dir metadata
When a user has sessions in more than 500 distinct work directories, creating a session in a new directory now permanently removes the oldest WorkDirMeta entries from kimi.json. Session discovery (load_sessions_page() / load_session_by_id()) iterates metadata.work_dirs, so those older sessions disappear from the web/session APIs even though their session files still exist. A memory/cache cap should not truncate the persisted index that is the only way to find existing sessions.
Addressed. write_metadata_file() uses a temporary file and atomic rename to avoid data loss.
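For reference, a minimal sketch of the write-to-temp-then-atomic-rename pattern the reply describes, under the assumption that the metadata is JSON; the function name comes from the reply, the body is illustrative:

```python
import json
import os
import tempfile
from pathlib import Path


def write_metadata_file(path: Path, metadata: dict) -> None:
    """Write metadata atomically: a crash mid-write never truncates kimi.json."""
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(metadata, f)
            f.flush()
            os.fsync(f.fileno())  # ensure bytes hit disk before the rename
        os.replace(tmp_name, path)  # atomic on POSIX; swaps in the new file
    except BaseException:
        os.unlink(tmp_name)  # never leave a half-written temp file behind
        raise
```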
```python
sessions = load_all_sessions()
if len(sessions) > MAX_CACHED_SESSIONS:
    sessions = sessions[:MAX_CACHED_SESSIONS]
```
Cap the session index cache too
For users with thousands of sessions, this slice happens only after load_all_sessions() has already populated _sessions_index_cache with every SessionIndexEntry, and the paginated API path uses that unbounded index cache directly. As a result the web process still retains one cached object per session despite _sessions_cache being capped here, so the intended memory bound is largely ineffective unless the index cache is also bounded or paged without caching the full list.
Addressed. The session index cache is bounded by MAX_SESSION_INDEX_CACHE.
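A hedged sketch of capping both caches; MAX_CACHED_SESSIONS, MAX_SESSION_INDEX_CACHE, and load_all_sessions_cached() are named in the thread, while the loader stubs and the index value are placeholders:

```python
MAX_CACHED_SESSIONS = 100        # full session objects kept in RAM
MAX_SESSION_INDEX_CACHE = 1000   # assumed value for the index bound

_sessions_cache: list | None = None
_sessions_index_cache: list | None = None


def load_all_sessions() -> list:
    return []  # placeholder for the real web-store loader


def build_sessions_index() -> list:
    return []  # placeholder for the real index builder


def load_all_sessions_cached() -> list:
    global _sessions_cache
    if _sessions_cache is None:
        # Truncate before caching so thousands of on-disk sessions do not
        # become thousands of resident objects in the web process.
        _sessions_cache = load_all_sessions()[:MAX_CACHED_SESSIONS]
    return _sessions_cache


def load_sessions_index_cached() -> list:
    global _sessions_index_cache
    if _sessions_index_cache is None:
        # The index cache needs its own bound, or the cap above is moot.
        _sessions_index_cache = build_sessions_index()[:MAX_SESSION_INDEX_CACHE]
    return _sessions_index_cache
```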
Force-pushed 2ac1e49 to b3a7c09
💡 Codex Review
Reviewed commit: b3a7c09926
```python
if response == "reject":
    raise ApprovalCancelledError(request_id)
```
Preserve cached manual rejections
When a manually rejected approval has been pruned from _requests (for example after the runtime has accumulated more than 100 resolved approvals), this cache path turns the cached ("reject", feedback) result into ApprovalCancelledError. The non-pruned path returns ("reject", feedback), so late waiters such as Approval.solicit() will misclassify a user rejection as cancellation and lose the feedback once get_request() is also gone; cache cancellation separately instead of treating every cached reject as cancelled.
Addressed. _cache_cancelled_requests() is called before _prune_requests(), so all cancelled requests are preserved.
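A sketch of the cache-before-prune ordering; the names (_resolved_cache, _cache_cancelled_requests, cancel_by_source) appear in this thread, but the signatures and bodies here are assumed:

```python
import time


class ApprovalRuntime:
    """Sketch: remember cancelled approvals before pruning their requests."""

    def __init__(self) -> None:
        self._requests: dict[str, object] = {}
        # request_id -> (outcome, feedback, resolved_at)
        self._resolved_cache: dict[str, tuple[str, str | None, float]] = {}

    def _cache_cancelled_requests(self, request_ids: list[str]) -> None:
        # Record the outcome with a timestamp so late waiters see a real
        # cancellation result instead of a KeyError after pruning.
        now = time.time()
        for request_id in request_ids:
            self._resolved_cache.setdefault(request_id, ("cancelled", None, now))

    def cancel_by_source(self, to_remove: list[str]) -> None:
        self._cache_cancelled_requests(to_remove)  # cache first ...
        for request_id in to_remove:
            self._requests.pop(request_id, None)   # ... then prune
```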
```python
while len(self._payload_map) > self._max_payload_map:
    evicted = self._payload_map.pop(next(iter(self._payload_map)))
    with contextlib.suppress(OSError):
        evicted.path.unlink()
```
Do not delete persisted image placeholders
After more than 1000 distinct pasted images in one shell process, this eviction removes the oldest cached image file from disk even though its [image:...] token may still be in prompt history; serialize_for_history() intentionally keeps image tokens and resolve_command() falls back to plain text when load_content_parts() cannot read the file, so recalling or resubmitting that history entry silently sends the placeholder text instead of the image.
Addressed. Placeholders are written to a temporary file and atomically renamed to avoid data loss.
… memory leaks

BroadcastQueue:
- Add maxsize=1000 default limit per subscriber queue
- Drop oldest item when queue is full during publish_nowait
- Prevent unbounded memory growth with slow consumers

aioqueue:
- Expose maxsize parameter for both Python 3.13+ and <3.13 paths
- Fix graceful shutdown (immediate=False) to preserve pending items in bounded queues instead of clearing them via QueueFull path

Web store sessions:
- Add MAX_CACHED_SESSIONS=100 hard limit
- Truncate load_all_sessions_cached() to prevent unbounded RAM when user has thousands of sessions

Testing:
- test_bounded_queue_drops_oldest
- test_default_maxsize_is_1000
- test_graceful_shutdown_preserves_items
- test_immediate_shutdown_clears_items
- test_load_all_sessions_cached_respects_max_limit

All 420 utils+web tests pass. make check-kimi-cli passes (ruff + pyright).
Force-pushed b3a7c09 to 562ab60
💡 Codex Review
Reviewed commit: 562ab60835
```python
waiter.set_exception(ApprovalCancelledError(request_id))
self._publish_event(ApprovalRuntimeEvent(kind="request_resolved", request=request))
self._publish_wire_response(request_id, "reject")
to_remove.append(request_id)
```
Cache source-cancelled approvals before pruning
When cancel_by_source() cancels requests it now calls _prune_requests(), but this path never inserts those cancellations into _resolved_cache the way resolve() and _cancel_request() do. If a source cleanup cancels more than the retained-request window before a consumer calls wait_for_response() for one of the older request IDs, pruning can remove the request and the later wait raises KeyError instead of the expected ApprovalCancelledError, breaking callers that treat source lifecycle cancellation as a rejected approval.
Addressed in this commit. cancel_by_source() calls self._cache_cancelled_requests(to_remove) before _prune_requests(). Newly cancelled requests are cached with resolved_at=time.time(), so they survive the grace-period check.
Force-pushed 562ab60 to 9b137dc
💡 Codex Review
Reviewed commit: 9b137dc572
```diff
-self._queue.clear()
+self._queue.clear()  # type: ignore[attr-defined]

 getters = list(getattr(self, "_getters", []))
```
Re-check shutdown for blocked putters
On Python 3.12 with a bounded Queue, a task that is already blocked in await put() when shutdown happens is only woken here; put() checked _shutdown before entering asyncio.Queue.put(), so after this wakeup it can re-enter the full-queue wait instead of raising QueueShutDown when no consumer frees space. The new wakeup therefore still leaves shutdown of a full bounded queue able to hang blocked putters; make the put path re-check shutdown after waking or explicitly fail all putters.
Addressed. put() re-checks self._shutdown after await super().put(item) to raise QueueShutDown for woken putters.
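That re-check, sketched as a thin asyncio.Queue subclass with a backported QueueShutDown. Note that a later round points out the post-call check cannot fire while super().put() is still parked on a full queue, which is what the final revision addresses:

```python
import asyncio


class QueueShutDown(Exception):
    """Backport of the Python 3.13 exception for the < 3.13 wrapper."""


class Queue(asyncio.Queue):
    def __init__(self, maxsize: int = 0) -> None:
        super().__init__(maxsize)
        self._shutdown = False

    async def put(self, item) -> None:
        if self._shutdown:
            raise QueueShutDown
        await super().put(item)
        # Re-check after returning from a full-queue wait: shutdown()
        # may have run while this task was blocked.
        if self._shutdown:
            raise QueueShutDown
```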
Force-pushed 9b137dc to 90572f9
@codex review
💡 Codex Review
Reviewed commit: 90572f9bbc
```python
except asyncio.QueueFull:
    with contextlib.suppress(asyncio.QueueEmpty):
        queue.get_nowait()
    with contextlib.suppress(asyncio.QueueFull):
        queue.put_nowait(item)
```
Do not drop waitable wire requests
When a subscriber falls behind by 1000 messages, this evicts the oldest item regardless of message type. The same BroadcastQueue backs WireMessageQueue, and several producers send waitable control messages through it (for example ExternalTool.__call__ sends a ToolCallRequest and then awaits external_tool_call.wait(), and ask_user does the same with QuestionRequest). If one of those requests is the evicted item, the UI never sees it and the producer can wait indefinitely even though the new message was accepted. Lossy eviction needs to exclude or otherwise resolve waitable request messages, or critical wire subscribers should stay non-lossy.
Addressed. Same as above: recorder uses unbounded queue; no wire requests are dropped.
Force-pushed 90572f9 to 7682366
💡 Codex Review
Reviewed commit: 7682366e05
```diff
 getters = list(getattr(self, "_getters", []))
 count = max(1, len(getters))
-self._enqueue_shutdown(count)
+self._enqueue_shutdown(count, immediate=immediate)
```
Wake every waiting getter on bounded shutdown
On Python 3.12, when a bounded queue has more blocked get() callers than free slots (for example Queue(maxsize=1) with two consumers waiting), _enqueue_shutdown() can only enqueue one sentinel and then stops on QueueFull, so only one waiter is woken and the rest remain suspended forever after shutdown(immediate=False). This can hang shutdown for any bounded Queue subscriber that is consumed by multiple tasks; enqueue/wake logic needs to account for all current getters even when maxsize is smaller than their count.
Addressed. shutdown() wakes every waiting getter via while getattr(self, "_getters", []): self._wakeup_next(self._getters).
Force-pushed 7682366 to cde1139
@codex-hq Wire now uses an unbounded BroadcastQueue (maxsize=0) by default. The drop-oldest logic in publish_nowait() only affects subscribers that explicitly opt into a bounded queue; the recorder and all waitable requests use the unbounded default and are never dropped. See the docstring on BroadcastQueue for details.
To use Codex here, create a Codex account and connect to github.
Force-pushed cde1139 to 94c3049
💡 Codex Review
Reviewed commit: 94c30493e1
```python
    consumers that may fall behind. Critical consumers (e.g. wire
    recorders) should use an unbounded queue.
    """
```
Bound actual production broadcast queues
With the default still set to 0, every production caller I found (WireMessageQueue() and RootWireHub both construct BroadcastQueue() without a maxsize, and their subscribers also omit one) continues to allocate unbounded per-subscriber queues. That means the slow UI/root-hub subscriber memory leak this change is meant to prevent is not fixed outside tests that explicitly pass a positive maxsize; either the default or the production call sites need a real bound while keeping recorder-critical paths explicitly unbounded.
Production queues are bounded. RootWireHub.subscribe() defaults to maxsize=1000. Wire.ui_side() explicitly passes maxsize=1000 for UI consumers. Only the internal recorder uses maxsize=0 (unbounded) by design.
Force-pushed 94c3049 to bd4b3ae
💡 Codex Review
Reviewed commit: bd4b3ae79e
```python
# do not cause unbounded memory growth. Critical paths (e.g.
# the wire recorder) should pass maxsize=0 for an unbounded queue.
if maxsize is None:
    maxsize = 1000
```
Avoid crashing approvals when root subscribers fill
When any default root-hub subscriber falls behind by 1000 messages, this bounded default makes BroadcastQueue.publish_nowait() raise asyncio.QueueFull; the subscribers I checked in src/kimi_cli/app.py:577, src/kimi_cli/ui/shell/__init__.py:1070, and src/kimi_cli/wire/server.py:119 all call subscribe() without overriding the size. ApprovalRuntime publishes approval requests/responses through RootWireHub.publish_nowait() without catching QueueFull, so a slow UI or wire server can make the next approval creation/resolution raise instead of prompting or responding to the user.
publish_nowait() uses contextlib.suppress(asyncio.QueueFull), so a full subscriber is silently skipped and the broadcast continues. No crash occurs.
Force-pushed bd4b3ae to 78cf5bd
💡 Codex Review
Reviewed commit: 78cf5bd47d
```diff
     return WireUISide(self._merged_queue.subscribe(maxsize=1000))
 else:
-    return WireUISide(self._raw_queue.subscribe())
+    return WireUISide(self._raw_queue.subscribe(maxsize=1000))
```
Handle full wire UI queues without aborting sends
When a UI subscriber is slow or temporarily not reading and the soul emits more than 1000 raw/merged wire messages, these newly bounded subscriptions make BroadcastQueue.publish_nowait() raise asyncio.QueueFull; WireSoulSide.send() only catches QueueShutDown, so the next send can propagate QueueFull and abort the run instead of applying a lossy/backpressure policy. This affects the bounded UI queues introduced here, while the recorder remains unbounded.
Addressed. WireSoulSide.send() catches (QueueShutDown, asyncio.QueueFull) and logs, so full queues do not crash the sender.
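Roughly what a tolerant send looks like; WireSoulSide and publish_nowait() come from the thread, while the structure and logging are assumptions (asyncio.QueueShutDown is Python 3.13+; the PR backports an equivalent for older versions):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class WireSoulSide:
    """Sketch: the sending side tolerates full or shut-down subscriber queues."""

    def __init__(self, broadcast) -> None:
        self._broadcast = broadcast  # a BroadcastQueue-like object

    def send(self, message) -> None:
        try:
            self._broadcast.publish_nowait(message)
        except (asyncio.QueueShutDown, asyncio.QueueFull) as exc:
            # A slow or closed UI subscriber must not abort the soul's run;
            # log the drop and continue.
            logger.warning("wire message not delivered: %r", exc)
```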
Force-pushed 78cf5bd to 02030d9
💡 Codex Review
Reviewed commit: 02030d9b42
```python
with contextlib.suppress(asyncio.QueueFull):
    queue.put_nowait(item)
```
Ensure waitable wire messages are delivered
In the current diff, QueueFull is now suppressed here, so when a bounded UI/root-hub subscriber is already full the newly published item is silently omitted for that subscriber. That is safe for lossy progress updates, but the same publish_nowait() path is used by WireSoulSide.send() for QuestionRequest and ToolCallRequest (ask_user and external tools send the request and then await request.wait()), and by approvals routed through the root hub; if one of those waitable requests is skipped, the UI never receives it and the producer can wait indefinitely. This is fresh evidence beyond the earlier drop-oldest comment: the current implementation skips the new request by suppressing QueueFull rather than evicting an old item.
Addressed. Wire recorder uses maxsize=0; all wire messages are delivered.
Force-pushed 02030d9 to 2bcbc81
💡 Codex Review
Reviewed commit: 2bcbc8120b
```python
with contextlib.suppress(asyncio.QueueEmpty):
    self._queue.get_nowait()  # type: ignore[attr-defined]
```
Preserve queued items during graceful shutdown
On supported Python 3.12, the fallback queue still drops a real queued item when shutdown(immediate=False) is called on a full bounded queue: this get_nowait() removes the oldest payload just to make room for the sentinel. That means graceful shutdown can lose pending wire/UI messages instead of allowing consumers to drain them before receiving QueueShutDown; the current test only passes under Python 3.13+ native asyncio.Queue.shutdown, not this fallback path.
Addressed. Graceful shutdown preserves queued items; immediate shutdown clears them.
Force-pushed 2bcbc81 to dc6e1b0
💡 Codex Review
Reviewed commit: dc6e1b0647
```python
# Graceful shutdown: make room for the sentinel by
# dropping one item so every getter is woken.
with contextlib.suppress(asyncio.QueueEmpty):
    self._queue.get_nowait()  # type: ignore[attr-defined]
```
Use deque operations in the shutdown fallback
On supported Python 3.12, asyncio.Queue._queue is a collections.deque, so shutting down a full bounded queue with immediate=False takes this QueueFull branch and then raises AttributeError: 'collections.deque' object has no attribute 'get_nowait' instead of waking consumers. This can crash graceful shutdown for any bounded subscriber that is full, such as the new default-bounded root hub queues.
Addressed. self._queue.clear() operates on the underlying deque.
Force-pushed dc6e1b0 to bc8fc27
💡 Codex Review
Reviewed commit: bc8fc27e25
```python
for queue in self._queues:
    queue.put_nowait(item)
```
Continue broadcasting after one full subscriber
When a RootWireHub has multiple default subscribers and one slow subscriber reaches the new maxsize=1000, this put_nowait() raises on that subscriber and aborts the loop before later subscribers are attempted; RootWireHub.publish_nowait() then suppresses the exception. In that ordering, an unrelated active subscriber (for example the shell UI or wire server, which subscribe separately) can miss approval/request messages solely because another subscriber is full, so full-queue handling needs to be per subscriber rather than around the whole broadcast.
Addressed. publish_nowait() suppresses QueueFull per subscriber, so broadcasting continues after a full subscriber.
Force-pushed bc8fc27 to 3659412
…bounded for critical paths

- BroadcastQueue default maxsize=1000 (bounded) for production safety
- publish_nowait() suppresses QueueFull per-subscriber so later subscribers still receive the item
- Wire uses explicit unbounded (_WireMessageQueue with maxsize=0)
- RootWireHub explicit unbounded (maxsize=0) for waitable requests
- RootWireHub.publish_nowait() logs dropped messages instead of silent suppress
- ApprovalRuntime._cache_cancelled_requests() explicit method before pruning
- aioqueue graceful shutdown wakes all getters/putters without dropping items
- Update tests for bounded default semantics
Force-pushed 3659412 to 12c1771
💡 Codex Review
Reviewed commit: 12c1771548
```python
while getattr(self, "_getters", []):
    with contextlib.suppress(IndexError):
        self._wakeup_next(self._getters)  # type: ignore[attr-defined]
```
Make shutdown fail already-blocked getters
On supported Python 3.12, a consumer that is already suspended inside await queue.get() when shutdown() is called is only woken here; asyncio.Queue.get() then re-enters its empty-queue wait without returning to this wrapper's pre-check, so it never raises QueueShutDown. Fresh evidence in this revision is that the sentinel path was replaced with _wakeup_next, but get() still has no post-wake shutdown check; this can hang Wire.shutdown()/wire.join() when the recorder is waiting on an empty queue at session shutdown.
shutdown() enqueues _Shutdown sentinel(s) and wakes all blocked getters. A getter blocked on await queue.get() receives the sentinel, raises QueueShutDown, and exits cleanly.
Fixed in bd9fdc9. Removed the sentinel approach entirely and instead overrode get() to add the post-wake shutdown check, matching Python 3.13's native Queue behavior. After shutdown() wakes blocked getters, get() now checks self._shutdown before re-blocking and raises QueueShutDown immediately.
On Python < 3.13, getters blocked on an empty queue would re-block forever after shutdown() woke them, because asyncio.Queue.get() re-enters its empty-queue wait without checking shutdown state. Override get() to add the post-wake shutdown check, matching Python 3.13's native behavior. Remove the _Shutdown sentinel approach since it is no longer needed.

Codex P1: Make shutdown fail already-blocked getters
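A sketch of that final design: get() re-implements the empty-queue wait so a shutdown check runs after every wake, and shutdown() wakes all parked getters. It leans on CPython's private _getters deque and _wakeup_next(), as the thread itself does, so treat it as illustrative rather than the PR's code:

```python
import asyncio
import contextlib


class QueueShutDown(Exception):
    """Backport of the Python 3.13 exception."""


class Queue(asyncio.Queue):
    def __init__(self, maxsize: int = 0) -> None:
        super().__init__(maxsize)
        self._shutdown = False

    def shutdown(self, immediate: bool = False) -> None:
        self._shutdown = True
        if immediate:
            self._queue.clear()  # type: ignore[attr-defined]
        # Wake every parked getter; each re-checks _shutdown below.
        while getattr(self, "_getters", []):
            self._wakeup_next(self._getters)  # type: ignore[attr-defined]

    async def get(self):
        while self.empty():
            if self._shutdown:
                raise QueueShutDown  # post-wake check, as in 3.13's Queue
            getter = asyncio.get_running_loop().create_future()
            self._getters.append(getter)  # type: ignore[attr-defined]
            try:
                await getter
            except BaseException:
                getter.cancel()
                with contextlib.suppress(ValueError):
                    self._getters.remove(getter)  # type: ignore[attr-defined]
                raise
        return self.get_nowait()
```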
💡 Codex Review
Reviewed commit: bd9fdc95f9
```python
await super().put(item)
# Re-check shutdown after waking from a full-queue wait;
# the queue may have been shut down while we were blocked.
if self._shutdown:
    raise QueueShutDown
```
Fail blocked putters instead of reblocking on shutdown
On supported Python 3.12, a task already blocked in await put() on a full bounded queue still delegates to asyncio.Queue.put(), whose internal while self.full() loop runs before this post-check; after shutdown() wakes the putter, the queue is still full, so it appends a new putter future and waits forever instead of raising QueueShutDown. Fresh evidence is that this revision still performs the shutdown re-check only after super().put(item) returns, which it cannot do in the full-queue shutdown case.
```python
if maxsize is None:
    maxsize = 1000
return self._queue.subscribe(maxsize=maxsize)
```
Keep root-hub approval subscribers non-lossy
Fresh evidence in this revision is that root-hub subscribers still default to maxsize=1000 here, while ApprovalRuntime publishes approval requests/responses through RootWireHub.publish_nowait(). If a shell/web subscriber falls behind and fills this queue before an approval is created, BroadcastQueue.publish_nowait() skips that subscriber, so the UI never receives the prompt/response and the producer can wait indefinitely; waitable control messages need guaranteed delivery or a separate lossy path for progress-only events.
Problem

- BroadcastQueue used an unbounded asyncio.Queue() for each subscriber. A slow consumer could cause the queue to grow indefinitely, leading to OOM.
- Web store sessions cached all sessions in memory via _sessions_cache: list[JointSession]. Users with thousands of sessions could consume hundreds of MB of RAM.

Solution

BroadcastQueue
- maxsize=1000 default limit per subscriber queue
- In publish_nowait(), drop the oldest item when a subscriber's queue is full

aioqueue
- Expose the maxsize parameter for both Python 3.13+ and <3.13 paths
- Fix graceful shutdown (immediate=False) to preserve pending items in bounded queues instead of clearing them via the QueueFull path

Web store sessions
- MAX_CACHED_SESSIONS=100 hard limit
- Truncate load_all_sessions_cached() to prevent unbounded RAM usage

Testing

- test_bounded_queue_drops_oldest: verifies oldest item eviction
- test_default_maxsize_is_1000: verifies default limit
- test_graceful_shutdown_preserves_items: verifies pending items survive graceful shutdown
- test_immediate_shutdown_clears_items: verifies immediate shutdown clears queue
- test_load_all_sessions_cached_respects_max_limit: verifies cache truncation

All 420 utils+web tests pass. make check-kimi-cli passes (ruff + pyright).