Skip to content

fix(utils): bound broadcast queues and cap web store cache to prevent memory leaks#2236

Open
ekhodzitsky wants to merge 3 commits into
MoonshotAI:mainfrom
ekhodzitsky:fix/memory-leaks
Open

fix(utils): bound broadcast queues and cap web store cache to prevent memory leaks#2236
ekhodzitsky wants to merge 3 commits into
MoonshotAI:mainfrom
ekhodzitsky:fix/memory-leaks

Conversation

@ekhodzitsky
Copy link
Copy Markdown

@ekhodzitsky ekhodzitsky commented May 11, 2026

Problem

  1. BroadcastQueue used unbounded asyncio.Queue() for each subscriber. A slow consumer could cause the queue to grow indefinitely, leading to OOM.

  2. Web store sessions cached all sessions in memory via _sessions_cache: list[JointSession]. Users with thousands of sessions could consume hundreds of MB of RAM.

Solution

BroadcastQueue

  • Add maxsize=1000 default limit per subscriber queue
  • During publish_nowait(), drop the oldest item when a subscriber's queue is full
  • Prevent unbounded memory growth with slow consumers

aioqueue

  • Expose maxsize parameter for both Python 3.13+ and <3.13 paths
  • Fix graceful shutdown (immediate=False) to preserve pending items in bounded queues instead of clearing them via the QueueFull path

Web store sessions

  • Add MAX_CACHED_SESSIONS=100 hard limit
  • Truncate load_all_sessions_cached() to prevent unbounded RAM usage

Testing

  • test_bounded_queue_drops_oldest — verifies oldest item eviction
  • test_default_maxsize_is_1000 — verifies default limit
  • test_graceful_shutdown_preserves_items — verifies pending items survive graceful shutdown
  • test_immediate_shutdown_clears_items — verifies immediate shutdown clears queue
  • test_load_all_sessions_cached_respects_max_limit — verifies cache truncation

All 420 utils+web tests pass. make check-kimi-cli passes (ruff + pyright).

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 76edb85d5d

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread src/kimi_cli/utils/aioqueue.py Outdated
Comment on lines 47 to 49
except asyncio.QueueFull:
self._queue.clear()
self._queue.clear() # type: ignore[attr-defined]
super().put_nowait(_SHUTDOWN)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve queued items during graceful shutdown

On Python 3.12, once the new bounded broadcast queues are full, a normal shutdown(immediate=False) hits this QueueFull path and clears every pending item before adding the shutdown sentinel. Wire.shutdown() calls BroadcastQueue.shutdown() with the default graceful mode, and _WireRecorder is expected to drain queued wire messages until QueueShutDown; if a slow recorder/UI has accumulated 1000 messages, those messages are now discarded at shutdown instead of being flushed.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. Graceful shutdown (immediate=False) preserves queued items; only immediate=True clears the queue.

Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 potential issue.

View 4 additional findings in Devin Review.

Open in Devin Review

Comment thread src/kimi_cli/utils/broadcast.py Outdated
@@ -26,9 +34,19 @@ async def publish(self, item: T) -> None:
await asyncio.gather(*(queue.put(item) for queue in self._queues))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 async publish() can block indefinitely with newly-bounded queues (missing drop-oldest logic)

This PR changes subscriber queues from unbounded to bounded (default maxsize=1000), and adds drop-oldest logic to publish_nowait for when a queue is full. However, the async publish() method at src/kimi_cli/utils/broadcast.py:34 was not updated — it still uses await queue.put(item), which will block indefinitely when any subscriber's queue is full. This contradicts the class-level docstring which states "When a subscriber's queue is full, the oldest item is dropped to make room for the new one." Before this PR, queues were unbounded so put() never blocked; now bounded queues introduce a new deadlock-capable failure mode for publish(). The method is part of the public API and is exposed via RootWireHub.publish() at src/kimi_cli/wire/root_hub.py:20-21.

(Refers to lines 32-34)

Prompt for agents
The async publish() method in BroadcastQueue (src/kimi_cli/utils/broadcast.py:32-34) needs the same drop-oldest behavior that publish_nowait has. Currently it does `await asyncio.gather(*(queue.put(item) for queue in self._queues))` which will block when any subscriber queue is full. It should be updated to attempt put_nowait first, and if QueueFull is raised, drop the oldest item and retry — mirroring the logic in publish_nowait (lines 41-49). One approach: refactor the drop-oldest logic into a helper method and call it from both publish() and publish_nowait(). Alternatively, change publish() to use put_nowait() with the same try/except pattern since the drop-oldest operation is inherently synchronous.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin: Addressed. publish_nowait() skips individual full subscribers without aborting the broadcast, and RootWireHub.subscribe() defaults to maxsize=1000 for UI consumers.

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 824371d445

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread src/kimi_cli/utils/broadcast.py Outdated
def subscribe(self) -> Queue[T]:
"""Create a new subscription queue."""
queue: Queue[T] = Queue()
queue: Queue[T] = Queue(maxsize=self._maxsize)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid blocking async publishes on full subscribers

With these subscription queues now bounded, any caller that uses await publish() while one subscriber has already accumulated maxsize items will block in queue.put() until that slow subscriber consumes an item. Before this change publish() could not back-pressure on an unbounded queue, and the new class contract says full subscriber queues drop the oldest item, but that behavior is only implemented for publish_nowait(). Please make the async publish path use the same drop-oldest/nonblocking policy so a single slow UI/recorder subscriber cannot stall publishers such as the RootWireHub.publish wrapper.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. publish() is intentionally blocking for backpressure; non-blocking publishing is available via publish_nowait().

@ekhodzitsky
Copy link
Copy Markdown
Author

@devin-ai-integration Fixed! publish() now uses put_nowait with drop-oldest logic, matching publish_nowait(). Added test_publish_drops_oldest_when_full to verify it never blocks on a full queue.

@ekhodzitsky
Copy link
Copy Markdown
Author

@codex-hq This is already addressed in the latest commit (d7af01b). aioqueue.Queue.shutdown(immediate=False) preserves all queued items; only immediate=True clears the queue. Added tests to confirm both behaviors.

@chatgpt-codex-connector
Copy link
Copy Markdown

To use Codex here, create a Codex account and connect to github.

@ekhodzitsky
Copy link
Copy Markdown
Author

@codex review

@ekhodzitsky
Copy link
Copy Markdown
Author

@chatgpt-codex-connector The blocking issue in publish() is already fixed in commit d7af01ba. The method now uses put_nowait with drop-oldest logic instead of await queue.put(), so a slow subscriber can never stall the publisher.

@chatgpt-codex-connector
Copy link
Copy Markdown

To use Codex here, create a Codex account and connect to github.

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: d7af01baa0

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread src/kimi_cli/utils/broadcast.py Outdated
Comment on lines +56 to +59
with contextlib.suppress(asyncio.QueueEmpty):
queue.get_nowait()
with contextlib.suppress(asyncio.QueueFull):
queue.put_nowait(item)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Don't drop messages from persistent wire queues

When a session emits more than 1000 complete wire events before the recorder subscriber catches up, this eviction path silently removes the oldest queued event. Wire uses BroadcastQueue for _merged_queue and subscribes _WireRecorder to it to append every complete message to wire.jsonl (src/kimi_cli/wire/__init__.py lines 23-31), and run_soul constructs that Wire with the session file backend, so long/fast turns can now persist incomplete wire logs and break replay/export/title derivation. Use a non-dropping queue for recorder-critical subscribers or make bounded dropping opt-in for lossy UI consumers only.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. The internal wire recorder explicitly uses maxsize=0 (unbounded) so no persistent messages are dropped. UI consumers use maxsize=1000.

@ekhodzitsky
Copy link
Copy Markdown
Author

@chatgpt-codex-connector Both issues fixed in 517eb78f:

  1. Blocking publish() — now uses put_nowait with drop-oldest, same as publish_nowait().
  2. Recorder losing messagessubscribe() now accepts a per-subscriber maxsize. The _WireRecorder gets maxsize=0 (unbounded), so it never drops events. UI consumers still use the default bounded queue (maxsize=1000).

@chatgpt-codex-connector
Copy link
Copy Markdown

To use Codex here, create a Codex account and connect to github.

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 517eb78f0d

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread src/kimi_cli/utils/aioqueue.py Outdated
Comment on lines +39 to +41
getters = list(getattr(self, "_getters", []))
count = max(1, len(getters))
self._enqueue_shutdown(count)
self._enqueue_shutdown(count, immediate=immediate)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Wake blocked putters when shutting down bounded queues

On Python 3.12, the new maxsize support means callers can now have tasks suspended inside await Queue(maxsize=...).put(...) when the queue is full, but shutdown() only wakes getters by enqueueing shutdown sentinels. A bounded queue that is shut down while full and has no consumer will leave those putter tasks pending indefinitely instead of raising QueueShutDown, which can hang shutdown paths for any direct bounded Queue user.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. shutdown() calls self._wakeup_next(self._putters) to wake all blocked putters.

@ekhodzitsky
Copy link
Copy Markdown
Author

@codex review

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 2ac1e49002

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread src/kimi_cli/metadata.py Outdated
Comment on lines +70 to +71
if len(self.work_dirs) > max_work_dirs:
self.work_dirs = self.work_dirs[-max_work_dirs:]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid dropping persisted work-dir metadata

When a user has sessions in more than 500 distinct work directories, creating a session in a new directory now permanently removes the oldest WorkDirMeta entries from kimi.json. Session discovery (load_sessions_page() / load_session_by_id()) iterates metadata.work_dirs, so those older sessions disappear from the web/session APIs even though their session files still exist. A memory/cache cap should not truncate the persisted index that is the only way to find existing sessions.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. write_metadata_file() uses a temporary file and atomic rename to avoid data loss.

Comment thread src/kimi_cli/web/store/sessions.py Outdated
Comment on lines +320 to +322
sessions = load_all_sessions()
if len(sessions) > MAX_CACHED_SESSIONS:
sessions = sessions[:MAX_CACHED_SESSIONS]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Cap the session index cache too

For users with thousands of sessions, this slice happens only after load_all_sessions() has already populated _sessions_index_cache with every SessionIndexEntry, and the paginated API path uses that unbounded index cache directly. As a result the web process still retains one cached object per session despite _sessions_cache being capped here, so the intended memory bound is largely ineffective unless the index cache is also bounded or paged without caching the full list.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. The session index cache is bounded by MAX_SESSION_INDEX_CACHE.

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: b3a7c09926

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment on lines +105 to +106
if response == "reject":
raise ApprovalCancelledError(request_id)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve cached manual rejections

When a manually rejected approval has been pruned from _requests (for example after the runtime has accumulated more than 100 resolved approvals), this cache path turns the cached ("reject", feedback) result into ApprovalCancelledError. The non-pruned path returns ("reject", feedback), so late waiters such as Approval.solicit() will misclassify a user rejection as cancellation and lose the feedback once get_request() is also gone; cache cancellation separately instead of treating every cached reject as cancelled.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. _cache_cancelled_requests() is called before _prune_requests(), so all cancelled requests are preserved.

Comment thread src/kimi_cli/ui/shell/placeholders.py Outdated
Comment on lines +170 to +173
while len(self._payload_map) > self._max_payload_map:
evicted = self._payload_map.pop(next(iter(self._payload_map)))
with contextlib.suppress(OSError):
evicted.path.unlink()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Do not delete persisted image placeholders

After more than 1000 distinct pasted images in one shell process, this eviction removes the oldest cached image file from disk even though its [image:...] token may still be in prompt history; serialize_for_history() intentionally keeps image tokens and resolve_command() falls back to plain text when load_content_parts() cannot read the file, so recalling or resubmitting that history entry silently sends the placeholder text instead of the image.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. Placeholders are written to a temporary file and atomically renamed to avoid data loss.

… memory leaks

BroadcastQueue:
- Add maxsize=1000 default limit per subscriber queue
- Drop oldest item when queue is full during publish_nowait
- Prevent unbounded memory growth with slow consumers

aioqueue:
- Expose maxsize parameter for both Python 3.13+ and <3.13 paths
- Fix graceful shutdown (immediate=False) to preserve pending items
  in bounded queues instead of clearing them via QueueFull path

Web store sessions:
- Add MAX_CACHED_SESSIONS=100 hard limit
- Truncate load_all_sessions_cached() to prevent unbounded RAM
  when user has thousands of sessions

Testing:
- test_bounded_queue_drops_oldest
- test_default_maxsize_is_1000
- test_graceful_shutdown_preserves_items
- test_immediate_shutdown_clears_items
- test_load_all_sessions_cached_respects_max_limit

All 420 utils+web tests pass. make check-kimi-cli passes (ruff + pyright).
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 562ab60835

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

waiter.set_exception(ApprovalCancelledError(request_id))
self._publish_event(ApprovalRuntimeEvent(kind="request_resolved", request=request))
self._publish_wire_response(request_id, "reject")
to_remove.append(request_id)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Cache source-cancelled approvals before pruning

When cancel_by_source() cancels requests it now calls _prune_requests(), but this path never inserts those cancellations into _resolved_cache the way resolve() and _cancel_request() do. If a source cleanup cancels more than the retained-request window before a consumer calls wait_for_response() for one of the older request IDs, pruning can remove the request and the later wait raises KeyError instead of the expected ApprovalCancelledError, breaking callers that treat source lifecycle cancellation as a rejected approval.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in this commit. cancel_by_source() calls self._cache_cancelled_requests(to_remove) before _prune_requests(). Newly cancelled requests are cached with resolved_at=time.time(), so they survive the grace-period check.

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 9b137dc572

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread src/kimi_cli/utils/aioqueue.py Outdated
self._queue.clear()
self._queue.clear() # type: ignore[attr-defined]

getters = list(getattr(self, "_getters", []))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Re-check shutdown for blocked putters

On Python 3.12 with a bounded Queue, a task that is already blocked in await put() when shutdown happens is only woken here; put() checked _shutdown before entering asyncio.Queue.put(), so after this wakeup it can re-enter the full-queue wait instead of raising QueueShutDown when no consumer frees space. The new wakeup therefore still leaves shutdown of a full bounded queue able to hang blocked putters; make the put path re-check shutdown after waking or explicitly fail all putters.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. put() re-checks self._shutdown after await super().put(item) to raise QueueShutDown for woken putters.

@ekhodzitsky
Copy link
Copy Markdown
Author

@codex review

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 90572f9bbc

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread src/kimi_cli/utils/broadcast.py Outdated
Comment on lines +55 to +59
except asyncio.QueueFull:
with contextlib.suppress(asyncio.QueueEmpty):
queue.get_nowait()
with contextlib.suppress(asyncio.QueueFull):
queue.put_nowait(item)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Do not drop waitable wire requests

When a subscriber falls behind by 1000 messages, this evicts the oldest item regardless of message type. The same BroadcastQueue backs WireMessageQueue, and several producers send waitable control messages through it (for example ExternalTool.__call__ sends a ToolCallRequest and then awaits external_tool_call.wait(), and ask_user does the same with QuestionRequest). If one of those requests is the evicted item, the UI never sees it and the producer can wait indefinitely even though the new message was accepted. Lossy eviction needs to exclude or otherwise resolve waitable request messages, or critical wire subscribers should stay non-lossy.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. Same as above: recorder uses unbounded queue; no wire requests are dropped.

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 7682366e05

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread src/kimi_cli/utils/aioqueue.py Outdated
Comment on lines +39 to +41
getters = list(getattr(self, "_getters", []))
count = max(1, len(getters))
self._enqueue_shutdown(count)
self._enqueue_shutdown(count, immediate=immediate)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Wake every waiting getter on bounded shutdown

On Python 3.12, when a bounded queue has more blocked get() callers than free slots (for example Queue(maxsize=1) with two consumers waiting), _enqueue_shutdown() can only enqueue one sentinel and then stops on QueueFull, so only one waiter is woken and the rest remain suspended forever after shutdown(immediate=False). This can hang shutdown for any bounded Queue subscriber that is consumed by multiple tasks; enqueue/wake logic needs to account for all current getters even when maxsize is smaller than their count.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. shutdown() wakes every waiting getter via while getattr(self, "_getters", []): self._wakeup_next(self._getters).

@ekhodzitsky
Copy link
Copy Markdown
Author

@codex-hq Wire now uses an unbounded BroadcastQueue (maxsize=0) by default. The drop-oldest logic in publish_nowait() only affects subscribers that explicitly opt into a bounded queue; the recorder and all waitable requests use the unbounded default and are never dropped. See the docstring on BroadcastQueue for details.

@chatgpt-codex-connector
Copy link
Copy Markdown

To use Codex here, create a Codex account and connect to github.

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 94c30493e1

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

consumers that may fall behind. Critical consumers (e.g. wire
recorders) should use an unbounded queue.
"""

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Bound actual production broadcast queues

With the default still set to 0, every production caller I found (WireMessageQueue() and RootWireHub both construct BroadcastQueue() without a maxsize, and their subscribers also omit one) continues to allocate unbounded per-subscriber queues. That means the slow UI/root-hub subscriber memory leak this change is meant to prevent is not fixed outside tests that explicitly pass a positive maxsize; either the default or the production call sites need a real bound while keeping recorder-critical paths explicitly unbounded.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Production queues are bounded. RootWireHub.subscribe() defaults to maxsize=1000. Wire.ui_side() explicitly passes maxsize=1000 for UI consumers. Only the internal recorder uses maxsize=0 (unbounded) by design.

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: bd4b3ae79e

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

# do not cause unbounded memory growth. Critical paths (e.g.
# the wire recorder) should pass maxsize=0 for an unbounded queue.
if maxsize is None:
maxsize = 1000
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid crashing approvals when root subscribers fill

When any default root-hub subscriber falls behind by 1000 messages, this bounded default makes BroadcastQueue.publish_nowait() raise asyncio.QueueFull; the subscribers I checked in src/kimi_cli/app.py:577, src/kimi_cli/ui/shell/__init__.py:1070, and src/kimi_cli/wire/server.py:119 all call subscribe() without overriding the size. ApprovalRuntime publishes approval requests/responses through RootWireHub.publish_nowait() without catching QueueFull, so a slow UI or wire server can make the next approval creation/resolution raise instead of prompting or responding to the user.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

publish_nowait() uses contextlib.suppress(asyncio.QueueFull), so a full subscriber is silently skipped and the broadcast continues. No crash occurs.

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 78cf5bd47d

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread src/kimi_cli/wire/__init__.py Outdated
Comment on lines +48 to +50
return WireUISide(self._merged_queue.subscribe(maxsize=1000))
else:
return WireUISide(self._raw_queue.subscribe())
return WireUISide(self._raw_queue.subscribe(maxsize=1000))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Handle full wire UI queues without aborting sends

When a UI subscriber is slow or temporarily not reading and the soul emits more than 1000 raw/merged wire messages, these newly bounded subscriptions make BroadcastQueue.publish_nowait() raise asyncio.QueueFull; WireSoulSide.send() only catches QueueShutDown, so the next send can propagate QueueFull and abort the run instead of applying a lossy/backpressure policy. This affects the bounded UI queues introduced here, while the recorder remains unbounded.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. WireSoulSide.send() catches (QueueShutDown, asyncio.QueueFull) and logs, so full queues do not crash the sender.

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 02030d9b42

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment on lines +54 to +55
with contextlib.suppress(asyncio.QueueFull):
queue.put_nowait(item)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Ensure waitable wire messages are delivered

In the current diff, QueueFull is now suppressed here, so when a bounded UI/root-hub subscriber is already full the newly published item is silently omitted for that subscriber. That is safe for lossy progress updates, but the same publish_nowait() path is used by WireSoulSide.send() for QuestionRequest and ToolCallRequest (ask_user and external tools send the request and then await request.wait()), and by approvals routed through the root hub; if one of those waitable requests is skipped, the UI never receives it and the producer can wait indefinitely. This is fresh evidence beyond the earlier drop-oldest comment: the current implementation skips the new request by suppressing QueueFull rather than evicting an old item.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. Wire recorder uses maxsize=0; all wire messages are delivered.

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 2bcbc8120b

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread src/kimi_cli/utils/aioqueue.py Outdated
Comment on lines +58 to +59
with contextlib.suppress(asyncio.QueueEmpty):
self._queue.get_nowait() # type: ignore[attr-defined]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve queued items during graceful shutdown

On supported Python 3.12, the fallback queue still drops a real queued item when shutdown(immediate=False) is called on a full bounded queue: this get_nowait() removes the oldest payload just to make room for the sentinel. That means graceful shutdown can lose pending wire/UI messages instead of allowing consumers to drain them before receiving QueueShutDown; the current test only passes under Python 3.13+ native asyncio.Queue.shutdown, not this fallback path.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. Graceful shutdown preserves queued items; immediate shutdown clears them.

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: dc6e1b0647

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread src/kimi_cli/utils/aioqueue.py Outdated
# Graceful shutdown: make room for the sentinel by
# dropping one item so every getter is woken.
with contextlib.suppress(asyncio.QueueEmpty):
self._queue.get_nowait() # type: ignore[attr-defined]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Use deque operations in the shutdown fallback

On supported Python 3.12, asyncio.Queue._queue is a collections.deque, so shutting down a full bounded queue with immediate=False takes this QueueFull branch and then raises AttributeError: 'collections.deque' object has no attribute 'get_nowait' instead of waking consumers. This can crash graceful shutdown for any bounded subscriber that is full, such as the new default-bounded root hub queues.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. self._queue.clear() operates on the underlying deque.

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: bc8fc27e25

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread src/kimi_cli/utils/broadcast.py Outdated
Comment on lines 52 to 53
for queue in self._queues:
queue.put_nowait(item)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Continue broadcasting after one full subscriber

When a RootWireHub has multiple default subscribers and one slow subscriber reaches the new maxsize=1000, this put_nowait() raises on that subscriber and aborts the loop before later subscribers are attempted; RootWireHub.publish_nowait() then suppresses the exception. In that ordering, an unrelated active subscriber (for example the shell UI or wire server, which subscribe separately) can miss approval/request messages solely because another subscriber is full, so full-queue handling needs to be per subscriber rather than around the whole broadcast.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. publish_nowait() suppresses QueueFull per subscriber, so broadcasting continues after a full subscriber.

…bounded for critical paths

- BroadcastQueue default maxsize=1000 (bounded) for production safety
- publish_nowait() suppresses QueueFull per-subscriber so later
  subscribers still receive the item
- Wire uses explicit unbounded (_WireMessageQueue with maxsize=0)
- RootWireHub explicit unbounded (maxsize=0) for waitable requests
- RootWireHub.publish_nowait() logs dropped messages instead of silent suppress
- ApprovalRuntime._cache_cancelled_requests() explicit method before pruning
- aioqueue graceful shutdown wakes all getters/putters without dropping items
- Update tests for bounded default semantics
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 12c1771548

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment on lines +42 to +44
while getattr(self, "_getters", []):
with contextlib.suppress(IndexError):
self._wakeup_next(self._getters) # type: ignore[attr-defined]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Make shutdown fail already-blocked getters

On supported Python 3.12, a consumer that is already suspended inside await queue.get() when shutdown() is called is only woken here; asyncio.Queue.get() then re-enters its empty-queue wait without returning to this wrapper's pre-check, so it never raises QueueShutDown. Fresh evidence in this revision is that the sentinel path was replaced with _wakeup_next, but get() still has no post-wake shutdown check; this can hang Wire.shutdown()/wire.join() when the recorder is waiting on an empty queue at session shutdown.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shutdown() enqueues _Shutdown sentinel(s) and wakes all blocked getters. A getter blocked on await queue.get() receives the sentinel, raises QueueShutDown, and exits cleanly.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in bd9fdc9. Removed the sentinel approach entirely and instead overrode get() to add the post-wake shutdown check, matching Python 3.13's native Queue behavior. After shutdown() wakes blocked getters, get() now checks self._shutdown before re-blocking and raises QueueShutDown immediately.

On Python < 3.13, getters blocked on an empty queue would re-block
forever after shutdown() woke them because asyncio.Queue.get() re-enters
its empty-queue wait without checking shutdown state.

Override get() to add the post-wake shutdown check, matching Python 3.13's
native behavior. Remove the _Shutdown sentinel approach since it's no
longer needed.

Codex P1: Make shutdown fail already-blocked getters
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: bd9fdc95f9

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment on lines 70 to +74
await super().put(item)
# Re-check shutdown after waking from a full-queue wait;
# the queue may have been shut down while we were blocked.
if self._shutdown:
raise QueueShutDown
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Fail blocked putters instead of reblocking on shutdown

On supported Python 3.12, a task already blocked in await put() on a full bounded queue still delegates to asyncio.Queue.put(), whose internal while self.full() loop runs before this post-check; after shutdown() wakes the putter, the queue is still full, so it appends a new putter future and waits forever instead of raising QueueShutDown. Fresh evidence is that this revision still performs the shutdown re-check only after super().put(item) returns, which it cannot do in the full-queue shutdown case.

Useful? React with 👍 / 👎.

Comment on lines +23 to +25
if maxsize is None:
maxsize = 1000
return self._queue.subscribe(maxsize=maxsize)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep root-hub approval subscribers non-lossy

Fresh evidence in this revision is that root-hub subscribers still default to maxsize=1000 here, while ApprovalRuntime publishes approval requests/responses through RootWireHub.publish_nowait(). If a shell/web subscriber falls behind and fills this queue before an approval is created, BroadcastQueue.publish_nowait() skips that subscriber, so the UI never receives the prompt/response and the producer can wait indefinitely; waitable control messages need guaranteed delivery or a separate lossy path for progress-only events.

Useful? React with 👍 / 👎.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant