Skip to content

Improve websocket reliability per Mist best practices#26

Open
tmunzer-AIDE wants to merge 3 commits intomainfrom
0.61.6
Open

Improve websocket reliability per Mist best practices#26
tmunzer-AIDE wants to merge 3 commits intomainfrom
0.61.6

Conversation

@tmunzer-AIDE
Copy link
Copy Markdown
Collaborator

@tmunzer-AIDE tmunzer-AIDE commented Apr 24, 2026

Summary

  • decouple websocket receive loop from user message processing using a callback worker queue
  • add subscription ack tracking with watchdog and reconnect-on-subscribe-failure behavior
  • add ping/pong hooks, 429-aware reconnect backoff, throughput logging, and channel count safeguards
  • update websocket defaults/docs to ping_interval=60 and ping_timeout=45
  • update websocket unit tests for the new behavior

Validation

  • tests/unit/test_websocket_client.py
  • full unit suite (tests/unit/*): 480 passed

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR improves the reliability and operability of the Mist WebSocket client by introducing a callback worker queue (decoupling receive from user processing), adding subscription acknowledgement tracking with a watchdog-driven reconnect strategy, and updating ping/pong handling and reconnect backoff behavior. It also updates defaults/docs (ping interval/timeout) and extends unit tests to cover the new behavior.

Changes:

  • Add callback worker queue, subscription watchdog + subscribe-failure close/reconnect, ping/pong hooks, and 429-aware reconnect delay logic in the core WebSocket client.
  • Update default ping settings to ping_interval=60 and ping_timeout=45 across docs and wrappers.
  • Expand unit tests to validate callback-worker behavior and new validations (e.g., ping interval/timeout constraint, channel limit).

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 12 comments.

Show a summary per file
File Description
src/mistapi/websockets/__ws_client.py Implements callback worker queue, subscription watchdog/ack tracking, ping/pong hooks, rate-limit-aware backoff, and channel safeguards.
src/mistapi/websockets/sites.py Updates wrapper defaults/docs for ping interval/timeout.
src/mistapi/websockets/orgs.py Updates wrapper defaults/docs for ping interval/timeout.
src/mistapi/websockets/location.py Updates wrapper defaults/docs for ping interval/timeout.
src/mistapi/websockets/session.py Updates wrapper defaults/docs for ping interval/timeout.
tests/unit/test_websocket_client.py Adjusts tests for callback-worker behavior, ping defaults/validation, and channel limit enforcement.
README.md Updates documented defaults and documents new WebSocket kwargs and callbacks.
Comments suppressed due to low confidence (4)

src/mistapi/websockets/sites.py:84

  • This public channel __init__ doesn’t accept/forward the new base-class kwargs (subscription_watchdog_timeout, rate_limit_backoff, throughput_log_interval). Given the README now documents them as supported for channel classes, this class (and the other channel wrappers) should accept and pass them through to super().__init__ (or accept **kwargs).
    def __init__(
        self,
        mist_session: APISession,
        site_ids: list[str],
        ping_interval: int = 60,
        ping_timeout: int = 45,
        auto_reconnect: bool = False,
        max_reconnect_attempts: int = 5,
        reconnect_backoff: float = 2.0,
        max_reconnect_backoff: float | None = None,
        queue_maxsize: int = 0,
    ) -> None:

src/mistapi/websockets/orgs.py:84

  • This channel wrapper __init__ doesn’t accept/forward the new base-class kwargs (subscription_watchdog_timeout, rate_limit_backoff, throughput_log_interval). If these options are intended to be part of the public API (per README), they need to be accepted here and passed to super().__init__ (or use a **kwargs passthrough).
    def __init__(
        self,
        mist_session: APISession,
        org_id: str,
        ping_interval: int = 60,
        ping_timeout: int = 45,
        auto_reconnect: bool = False,
        max_reconnect_attempts: int = 5,
        reconnect_backoff: float = 2.0,
        max_reconnect_backoff: float | None = None,
        queue_maxsize: int = 0,
    ) -> None:

src/mistapi/websockets/location.py:87

  • This channel wrapper __init__ doesn’t accept/forward the new base-class kwargs (subscription_watchdog_timeout, rate_limit_backoff, throughput_log_interval). If these options are intended to be publicly configurable, add them here and forward to super().__init__ (or accept **kwargs).
    def __init__(
        self,
        mist_session: APISession,
        site_id: str,
        map_ids: list[str],
        ping_interval: int = 60,
        ping_timeout: int = 45,
        auto_reconnect: bool = False,
        max_reconnect_attempts: int = 5,
        reconnect_backoff: float = 2.0,
        max_reconnect_backoff: float | None = None,
        queue_maxsize: int = 0,
    ) -> None:

src/mistapi/websockets/session.py:93

  • SessionWithUrl.__init__ doesn’t accept/forward the new base-class kwargs (subscription_watchdog_timeout, rate_limit_backoff, throughput_log_interval). If the README intends these to be supported across channel classes, this wrapper should accept and pass them through to super().__init__ (or allow **kwargs).
    def __init__(
        self,
        mist_session: APISession,
        url: str,
        ping_interval: int = 60,
        ping_timeout: int = 45,
        auto_reconnect: bool = False,
        max_reconnect_attempts: int = 5,
        reconnect_backoff: float = 2.0,
        max_reconnect_backoff: float | None = None,
        queue_maxsize: int = 0,
    ) -> None:

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/mistapi/websockets/__ws_client.py Outdated
Comment thread tests/unit/test_websocket_client.py
Comment thread src/mistapi/websockets/__ws_client.py Outdated
Comment thread src/mistapi/websockets/__ws_client.py
Comment thread README.md Outdated
Comment thread README.md
Comment thread tests/unit/test_websocket_client.py
Comment thread tests/unit/test_websocket_client.py Outdated
Comment thread src/mistapi/websockets/__ws_client.py Outdated
Comment thread src/mistapi/websockets/__ws_client.py
@tmunzer-AIDE
Copy link
Copy Markdown
Collaborator Author

Addressed the review feedback in follow-up commits.\n\nWhat was fixed:\n- Public channel wrappers now accept and forward subscription_watchdog_timeout, rate_limit_backoff, and throughput_log_interval (session/orgs/sites/location).\n- _handle_error now always refreshes _last_http_status (prevents stale 429 state).\n- _handle_close now preserves previously-set close reason details unless explicit non-empty close values are provided.\n- _handle_message now normalizes non-object JSON payloads into a dict wrapper to match callback/receive typing expectations.\n- Test cleanup now uses disconnect(wait=True, timeout=1) for callback worker shutdown in affected tests.\n- README clarified ping_timeout constraint wording (applies when ping_interval > 0) and queue_maxsize semantics (receive + callback queues).\n- Additional hardening: reduced noisy ack/ping/pong logging and made throughput counters synchronized across threads.\n\nValidation:\n- tests/unit/test_websocket_client.py: 141 passed\n- full unit suite: 484 passed

@tmunzer-AIDE tmunzer-AIDE requested review from Copilot and removed request for Copilot May 1, 2026 15:40
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (4)

src/mistapi/websockets/location.py:48

  • The PARAMS section here doesn’t mention the newly added reliability kwargs (subscription_watchdog_timeout, rate_limit_backoff, throughput_log_interval) and the queue_maxsize description is now incomplete since queue_maxsize also applies to callback delivery. Updating the docstring would keep per-class docs aligned with the updated initializer.
    ping_interval : int, default 60
        Interval in seconds to send WebSocket ping frames (keep-alive).
    ping_timeout : int, default 45
        Time in seconds to wait for a ping response before considering the connection dead.
    auto_reconnect : bool, default False
        Automatically reconnect on unexpected disconnections using exponential backoff.
    max_reconnect_attempts : int, default 5
        Maximum number of reconnect attempts before giving up.
    reconnect_backoff : float, default 2.0
        Base backoff delay in seconds. Doubles after each failed attempt.
    max_reconnect_backoff : float | None, default None
        Maximum backoff delay in seconds. If None, backoff grows indefinitely.
    queue_maxsize : int, default 0
        Maximum number of messages buffered in the internal queue for the
        ``receive()`` generator. ``0`` means unbounded. When set,
        incoming messages are dropped with a warning when the queue is
        full, preventing memory growth on high-frequency streams.

src/mistapi/websockets/session.py:55

  • SessionWithUrl now accepts subscription_watchdog_timeout, rate_limit_backoff, and throughput_log_interval, but the PARAMS docstring doesn’t list them and still frames queue_maxsize as only for receive(). Please update the docstring to reflect the actual initializer kwargs/behavior.
    ping_interval : int, default 60
        Interval in seconds to send WebSocket ping frames (keep-alive).
    ping_timeout : int, default 45
        Time in seconds to wait for a ping response before considering the connection dead.
    auto_reconnect : bool, default False
        Automatically reconnect on unexpected disconnections using exponential backoff.
    max_reconnect_attempts : int, default 5
        Maximum number of reconnect attempts before giving up.
    reconnect_backoff : float, default 2.0
        Base backoff delay in seconds. Doubles after each failed attempt.
    max_reconnect_backoff : float | None, default None
        Maximum backoff delay in seconds. If None, backoff grows indefinitely.
    queue_maxsize : int, default 0
        Maximum number of messages buffered in the internal queue for the
        ``receive()`` generator. ``0`` means unbounded. When set,
        incoming messages are dropped with a warning when the queue is
        full, preventing memory growth on high-frequency streams.

src/mistapi/websockets/sites.py:46

  • This class’ PARAMS docstring still describes queue_maxsize as only affecting the receive() queue and doesn’t document the newly added reliability kwargs (subscription_watchdog_timeout, rate_limit_backoff, throughput_log_interval). Since these kwargs are now part of the public initializer, the docstring should be updated to match the actual behavior/parameters.
    ping_interval : int, default 60
        Interval in seconds to send WebSocket ping frames (keep-alive).
    ping_timeout : int, default 45
        Time in seconds to wait for a ping response before considering the connection dead.
    auto_reconnect : bool, default False
        Automatically reconnect on unexpected disconnections using exponential backoff.
    max_reconnect_attempts : int, default 5
        Maximum number of reconnect attempts before giving up.
    reconnect_backoff : float, default 2.0
        Base backoff delay in seconds. Doubles after each failed attempt.
    max_reconnect_backoff : float | None, default None
        Maximum backoff delay in seconds. If None, backoff grows indefinitely.
    queue_maxsize : int, default 0
        Maximum number of messages buffered in the internal queue for the
        ``receive()`` generator. ``0`` means unbounded. When set,
        incoming messages are dropped with a warning when the queue is
        full, preventing memory growth on high-frequency streams.

src/mistapi/websockets/orgs.py:46

  • This class’ PARAMS docstring doesn’t document the newly added reliability kwargs (subscription_watchdog_timeout, rate_limit_backoff, throughput_log_interval) and still describes queue_maxsize as only buffering receive() messages. Please update the docstring so the public API docs match the initializer signature/behavior.
    ping_interval : int, default 60
        Interval in seconds to send WebSocket ping frames (keep-alive).
    ping_timeout : int, default 45
        Time in seconds to wait for a ping response before considering the connection dead.
    auto_reconnect : bool, default False
        Automatically reconnect on unexpected disconnections using exponential backoff.
    max_reconnect_attempts : int, default 5
        Maximum number of reconnect attempts before giving up.
    reconnect_backoff : float, default 2.0
        Base backoff delay in seconds. Doubles after each failed attempt.
    max_reconnect_backoff : float | None, default None
        Maximum backoff delay in seconds. If None, backoff grows indefinitely.
    queue_maxsize : int, default 0
        Maximum number of messages buffered in the internal queue for the
        ``receive()`` generator. ``0`` means unbounded. When set,
        incoming messages are dropped with a warning when the queue is
        full, preventing memory growth on high-frequency streams.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +276 to +283
def _start_callback_worker(self) -> None:
if self._callback_thread is not None and self._callback_thread.is_alive():
return
self._callback_stop.clear()
self._callback_thread = threading.Thread(
target=self._run_callback_worker, daemon=True
)
self._callback_thread.start()
Copy link

Copilot AI May 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_start_callback_worker() always clears _callback_stop before (re)starting the worker. If a disconnect/shutdown has already set _callback_stop (or _user_disconnect), an in-flight _handle_message() can restart the callback worker and re-enable callback processing after the client is supposed to be stopping. Consider gating worker startup so it doesn’t clear the stop flag when disconnect/shutdown is in progress (e.g., early-return when _callback_stop or _user_disconnect is set, or only clear the flag from connect()).

Copilot uses AI. Check for mistakes.
Comment on lines 671 to +680
finally:
self._cancel_subscription_watchdog()
self._callback_stop.set()
try:
self._queue.put_nowait(None) # sentinel — unblocks receive()
except queue.Full:
pass # _finished.set() below will unblock receive() independently
try:
self._callback_queue.put_nowait(None) # sentinel — unblocks worker
except queue.Full:
Copy link

Copilot AI May 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In _run_forever_safe() shutdown, setting _callback_stop unconditionally causes the callback worker to exit immediately, potentially dropping messages already enqueued in _callback_queue (and preventing the existing “drain until _finished and queue empty” logic from working). If the intent is graceful delivery of buffered callbacks on close, avoid setting _callback_stop here and let the worker exit via _finished/sentinel after draining, or explicitly drain before stopping.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants