From 96f8b1bb3cc03addf91e31374aed054c3295619f Mon Sep 17 00:00:00 2001 From: Karthic Raghupathi Date: Wed, 24 Jun 2026 19:58:50 -0400 Subject: [PATCH 1/5] Stop monitor_connection thread from crashing on dropped connection (#3) When Asterisk stops, the monitor thread's periodic Ping fails: send_action raises ManagerSocketError. Nothing caught it, so the exception propagated out of the thread target and dumped a traceback to stderr while the monitor died silently. Catch ManagerSocketError in the ping loop and break, mirroring how _MessageReader.run already handles a dead socket. Also return the monitoring thread so callers can join it, which makes the behavior testable without relying on sleep timing. Add a regression test that drives a monitor whose send_action raises and asserts the thread exits cleanly with no unhandled exception. Co-Authored-By: Claude Opus 4.8 --- CHANGELOG.md | 3 +++ pystrix/ami/ami.py | 14 ++++++++++++- tests/test_ami_monitor.py | 43 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 59 insertions(+), 1 deletion(-) create mode 100644 tests/test_ami_monitor.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 17a8fb8..53ac41a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ to follow [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Fixed +- `Manager.monitor_connection` no longer crashes its monitoring thread when the connection drops. A broken socket during the periodic Ping raised `ManagerSocketError` inside the thread, which dumped a traceback to stderr and killed the monitor. The monitor now catches it and stops cleanly. The method also returns the monitoring thread so callers can join it (#3). + ## [1.3.0] - 2026-06-24 ### Added diff --git a/pystrix/ami/ami.py b/pystrix/ami/ami.py index 7722146..9fa88ec 100644 --- a/pystrix/ami/ami.py +++ b/pystrix/ami/ami.py @@ -458,13 +458,24 @@ def monitor_connection(self, interval=2.5): `interval` is the number of seconds to wait between automated Pings to see if Asterisk is still alive; defaults to 2.5. + + Returns the monitoring `threading.Thread`, which a caller may join. The thread stops on + its own when the connection drops: a broken socket raises `ManagerSocketError` during a + Ping, which the monitor catches to exit cleanly rather than dying with an unhandled + traceback. """ def _monitor_connection(): from pystrix.ami import core while self.is_connected(): - self.send_action(core.Ping()) + try: + self.send_action(core.Ping()) + except ManagerSocketError: + # The socket died between the connection check and the ping + # (for example, Asterisk stopped). Nothing can be reported, + # so the monitor stops cleanly instead of crashing the thread. + break time.sleep(interval) monitor = threading.Thread( @@ -472,6 +483,7 @@ def _monitor_connection(): ) monitor.daemon = True monitor.start() + return monitor def _compile_callback_definition(self, event, function): """ diff --git a/tests/test_ami_monitor.py b/tests/test_ami_monitor.py new file mode 100644 index 0000000..261dc2e --- /dev/null +++ b/tests/test_ami_monitor.py @@ -0,0 +1,43 @@ +"""Tests for the AMI connection monitor (`Manager.monitor_connection`).""" + +import threading + +from pystrix.ami.ami import Manager, ManagerSocketError + + +def _bare_manager(send_action): + # Bypass __init__ so no real socket or reader thread is created. The monitor + # only touches is_connected() and send_action(), so those are all we supply. + manager = Manager.__new__(Manager) + manager.is_connected = lambda: True + manager.send_action = send_action + # The manager was never connected, so its __del__ cleanup has nothing real + # to release; neutralize it to keep garbage collection quiet during tests. + manager.close = lambda: None + return manager + + +def test_monitor_connection_survives_socket_error(): + # Regression for #3: when Asterisk stops, the periodic Ping's send_action + # raises ManagerSocketError. The monitor thread must exit cleanly instead of + # dying with an unhandled exception (a traceback dumped to stderr). + reached = threading.Event() + + def send_action(request, *args, **kwargs): + reached.set() + raise ManagerSocketError("Asterisk service stopped") + + manager = _bare_manager(send_action) + + unhandled = [] + previous_hook = threading.excepthook + threading.excepthook = lambda args: unhandled.append(args) + try: + monitor = manager.monitor_connection(interval=0) + monitor.join(timeout=2) + finally: + threading.excepthook = previous_hook + + assert reached.is_set() # the monitor actually attempted a ping + assert not monitor.is_alive() # the thread terminated + assert unhandled == [] # nothing escaped the thread uncaught From cadc9deebd34f8fa2aa420ce68e11a630a7bc23d Mon Sep 17 00:00:00 2001 From: Karthic Raghupathi Date: Wed, 24 Jun 2026 20:13:58 -0400 Subject: [PATCH 2/5] Also catch ManagerError race and log monitor exit (#3 review) Review (Codex) found the catch was too narrow. The monitor checks is_connected(), then send_action(Ping()) runs its own liveness re-check and raises ManagerError -- a sibling of ManagerSocketError, not a subclass -- if the connection dropped in between. That race re-introduced the #3 crash via an uncaught ManagerError. Catch both ManagerError and ManagerSocketError, and log the reason at debug level when a logger is set, so a stopped monitor thread is traceable. Tests: add the ManagerError race path, the normal ping-until-disconnected exit path (which also pins that a joinable thread is returned), and a guard that an unexpected non-connection error still propagates rather than being swallowed by a too-broad except. Co-Authored-By: Claude Opus 4.8 --- CHANGELOG.md | 2 +- pystrix/ami/ami.py | 19 +++++---- tests/test_ami_monitor.py | 86 +++++++++++++++++++++++++++++++++------ 3 files changed, 86 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 53ac41a..10e3ca5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,7 @@ to follow [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] ### Fixed -- `Manager.monitor_connection` no longer crashes its monitoring thread when the connection drops. A broken socket during the periodic Ping raised `ManagerSocketError` inside the thread, which dumped a traceback to stderr and killed the monitor. The monitor now catches it and stops cleanly. The method also returns the monitoring thread so callers can join it (#3). +- `Manager.monitor_connection` no longer crashes its monitoring thread when the connection drops. Pinging a downed connection raised `ManagerSocketError` (broken socket) or `ManagerError` (the liveness check inside `send_action` failing when the connection dropped just after the loop's own check) inside the thread, which dumped a traceback to stderr and killed the monitor. The monitor now catches both and stops cleanly, logging the reason at debug level when a logger is set. The method also returns the monitoring thread so callers can join it (#3). ## [1.3.0] - 2026-06-24 diff --git a/pystrix/ami/ami.py b/pystrix/ami/ami.py index 9fa88ec..d383061 100644 --- a/pystrix/ami/ami.py +++ b/pystrix/ami/ami.py @@ -460,9 +460,10 @@ def monitor_connection(self, interval=2.5): is still alive; defaults to 2.5. Returns the monitoring `threading.Thread`, which a caller may join. The thread stops on - its own when the connection drops: a broken socket raises `ManagerSocketError` during a - Ping, which the monitor catches to exit cleanly rather than dying with an unhandled - traceback. + its own when the connection drops: pinging a downed connection raises `ManagerSocketError` + (broken socket) or `ManagerError` (the liveness re-check inside `send_action` fails when + the connection dropped just after the loop's own check), and the monitor catches both to + exit cleanly rather than dying with an unhandled traceback. """ def _monitor_connection(): @@ -471,10 +472,14 @@ def _monitor_connection(): while self.is_connected(): try: self.send_action(core.Ping()) - except ManagerSocketError: - # The socket died between the connection check and the ping - # (for example, Asterisk stopped). Nothing can be reported, - # so the monitor stops cleanly instead of crashing the thread. + except (ManagerError, ManagerSocketError) as exc: + # The connection dropped between the loop's check and the ping: + # either the socket broke (ManagerSocketError) or send_action's + # own liveness check failed (ManagerError, e.g. Asterisk stopped). + # Nothing can be reported, so the monitor stops cleanly instead of + # crashing the thread. Record why it left, when a logger is set. + if self._logger: + self._logger.debug("AMI connection monitor stopping: %s" % exc) break time.sleep(interval) diff --git a/tests/test_ami_monitor.py b/tests/test_ami_monitor.py index 261dc2e..115f0de 100644 --- a/tests/test_ami_monitor.py +++ b/tests/test_ami_monitor.py @@ -2,14 +2,14 @@ import threading -from pystrix.ami.ami import Manager, ManagerSocketError +from pystrix.ami.ami import Manager, ManagerError, ManagerSocketError -def _bare_manager(send_action): +def _bare_manager(send_action, is_connected=None): # Bypass __init__ so no real socket or reader thread is created. The monitor # only touches is_connected() and send_action(), so those are all we supply. manager = Manager.__new__(Manager) - manager.is_connected = lambda: True + manager.is_connected = is_connected or (lambda: True) manager.send_action = send_action # The manager was never connected, so its __del__ cleanup has nothing real # to release; neutralize it to keep garbage collection quiet during tests. @@ -17,6 +17,20 @@ def _bare_manager(send_action): return manager +def _run_monitor(manager, interval=0): + # Run the monitor to completion, capturing anything that escapes the thread + # uncaught via threading.excepthook. Returns (monitor_thread, unhandled). + unhandled = [] + previous_hook = threading.excepthook + threading.excepthook = lambda args: unhandled.append(args) + try: + monitor = manager.monitor_connection(interval=interval) + monitor.join(timeout=2) + finally: + threading.excepthook = previous_hook + return monitor, unhandled + + def test_monitor_connection_survives_socket_error(): # Regression for #3: when Asterisk stops, the periodic Ping's send_action # raises ManagerSocketError. The monitor thread must exit cleanly instead of @@ -27,17 +41,63 @@ def send_action(request, *args, **kwargs): reached.set() raise ManagerSocketError("Asterisk service stopped") - manager = _bare_manager(send_action) - - unhandled = [] - previous_hook = threading.excepthook - threading.excepthook = lambda args: unhandled.append(args) - try: - monitor = manager.monitor_connection(interval=0) - monitor.join(timeout=2) - finally: - threading.excepthook = previous_hook + monitor, unhandled = _run_monitor(_bare_manager(send_action)) assert reached.is_set() # the monitor actually attempted a ping assert not monitor.is_alive() # the thread terminated assert unhandled == [] # nothing escaped the thread uncaught + + +def test_monitor_connection_survives_manager_error(): + # Race guard for #3: the connection can drop between the loop's is_connected() + # check and the liveness re-check inside send_action, which raises ManagerError + # (not ManagerSocketError). The monitor must catch that path too and exit + # cleanly rather than crash the thread. + reached = threading.Event() + + def send_action(request, *args, **kwargs): + reached.set() + raise ManagerError("Not connected to an Asterisk manager") + + monitor, unhandled = _run_monitor(_bare_manager(send_action)) + + assert reached.is_set() + assert not monitor.is_alive() + assert unhandled == [] + + +def test_monitor_connection_pings_until_disconnected(): + # The monitor pings on each loop while connected and exits cleanly when + # is_connected() turns False (the orderly-shutdown path). + checks = {"count": 0} + + def is_connected(): + checks["count"] += 1 + return checks["count"] <= 3 # connected for three iterations, then down + + pings = [] + + def send_action(request, *args, **kwargs): + pings.append(request) + + manager = _bare_manager(send_action, is_connected=is_connected) + monitor, unhandled = _run_monitor(manager) + + assert isinstance(monitor, threading.Thread) # returns a joinable thread + assert not monitor.is_alive() # exited on the first disconnected check + assert len(pings) == 3 # one ping per connected check, none after + assert unhandled == [] + + +def test_monitor_connection_propagates_unexpected_errors(): + # The catch is deliberately narrow: only connection-loss errors stop the + # monitor quietly. An unexpected error must still surface (escape the thread) + # rather than be silently swallowed by a too-broad except. + def send_action(request, *args, **kwargs): + raise ValueError("unexpected") + + monitor, unhandled = _run_monitor(_bare_manager(send_action)) + + assert not monitor.is_alive() + assert len(unhandled) == 1 # the ValueError was not swallowed + assert unhandled[0].exc_type is ValueError From 8a036bac3f8cf0be8487fbd4aa2978ab0ac29c56 Mon Sep 17 00:00:00 2001 From: Karthic Raghupathi Date: Wed, 24 Jun 2026 21:20:14 -0400 Subject: [PATCH 3/5] Guard send_action against disconnect race; address review info items (#3) Review (PR #62 panel) found the monitor could still crash: send_action passes its liveness check, registers the request, then a concurrent disconnect() nulls _connection before the send, so send_message is called on None -> AttributeError, which the monitor's (ManagerError, ManagerSocketError) catch misses. Recheck _connection under the lock already held for the send; if it is gone, drop the just-registered request and raise ManagerSocketError instead of dereferencing None. This converts the raw crash into the documented exception the monitor already handles, completing the #3 fix. Supersedes the separately-filed #63. Also from the panel: - Assert each monitored ping is actually a Ping request. - Add a test for the debug-log-on-exit branch. - Add a send_action test proving the race raises ManagerSocketError, not AttributeError. Co-Authored-By: Claude Opus 4.8 --- CHANGELOG.md | 1 + pystrix/ami/ami.py | 8 ++++++++ tests/test_ami_monitor.py | 25 +++++++++++++++++++++++++ tests/test_ami_send_action.py | 28 ++++++++++++++++++++++++++++ 4 files changed, 62 insertions(+) create mode 100644 tests/test_ami_send_action.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 10e3ca5..96ddc6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ to follow [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ### Fixed - `Manager.monitor_connection` no longer crashes its monitoring thread when the connection drops. Pinging a downed connection raised `ManagerSocketError` (broken socket) or `ManagerError` (the liveness check inside `send_action` failing when the connection dropped just after the loop's own check) inside the thread, which dumped a traceback to stderr and killed the monitor. The monitor now catches both and stops cleanly, logging the reason at debug level when a logger is set. The method also returns the monitoring thread so callers can join it (#3). +- `Manager.send_action` no longer raises a raw `AttributeError` when a concurrent `disconnect()` clears the connection between the liveness check and the send. It now drops the just-registered request and raises `ManagerSocketError` instead (#3). ## [1.3.0] - 2026-06-24 diff --git a/pystrix/ami/ami.py b/pystrix/ami/ami.py index d383061..4ab1b09 100644 --- a/pystrix/ami/ami.py +++ b/pystrix/ami/ami.py @@ -601,6 +601,14 @@ def send_action(self, request, action_id=None, **kwargs): ) events = self._add_outstanding_request(action_id, request) with self._connection_lock: + if self._connection is None: + # A concurrent disconnect() cleared the connection after the + # liveness check above. Drop the request we just registered and + # raise the documented exception instead of dereferencing None. + self._outstanding_requests.pop(action_id, None) + raise ManagerSocketError( + "Connection closed before the request could be sent" + ) self._connection.send_message(command) if ( diff --git a/tests/test_ami_monitor.py b/tests/test_ami_monitor.py index 115f0de..633d062 100644 --- a/tests/test_ami_monitor.py +++ b/tests/test_ami_monitor.py @@ -86,9 +86,34 @@ def send_action(request, *args, **kwargs): assert isinstance(monitor, threading.Thread) # returns a joinable thread assert not monitor.is_alive() # exited on the first disconnected check assert len(pings) == 3 # one ping per connected check, none after + assert [request["Action"] for request in pings] == ["Ping"] * 3 # all Pings assert unhandled == [] +def test_monitor_connection_logs_reason_on_exit(): + # When a logger is configured, the monitor records why it stopped so a + # vanished monitor thread is traceable rather than silent. + class _RecordingLogger: + def __init__(self): + self.messages = [] + + def debug(self, message): + self.messages.append(message) + + logger = _RecordingLogger() + + def send_action(request, *args, **kwargs): + raise ManagerSocketError("Asterisk service stopped") + + manager = _bare_manager(send_action) + manager._logger = logger + monitor, unhandled = _run_monitor(manager) + + assert not monitor.is_alive() + assert unhandled == [] + assert any("monitor stopping" in message for message in logger.messages) + + def test_monitor_connection_propagates_unexpected_errors(): # The catch is deliberately narrow: only connection-loss errors stop the # monitor quietly. An unexpected error must still surface (escape the thread) diff --git a/tests/test_ami_send_action.py b/tests/test_ami_send_action.py new file mode 100644 index 0000000..e345ace --- /dev/null +++ b/tests/test_ami_send_action.py @@ -0,0 +1,28 @@ +"""Tests for `Manager.send_action` connection-loss handling.""" + +import threading + +import pytest + +from pystrix.ami import core +from pystrix.ami.ami import Manager, ManagerSocketError + + +def test_send_action_raises_when_connection_closed_mid_send(): + # Race guard (#3): a concurrent disconnect() can clear _connection after + # send_action's liveness check but before the send. send_action must raise + # ManagerSocketError rather than dereferencing None with an AttributeError, + # which the connection monitor's catch would miss and crash the thread. + manager = Manager.__new__(Manager) + manager.is_connected = lambda: True + manager._connection = None # disconnect() raced in and cleared it + manager._connection_lock = threading.Lock() + manager._outstanding_requests = {} + # Never connected, so neutralize __del__ cleanup to keep GC quiet. + manager.close = lambda: None + + with pytest.raises(ManagerSocketError): + manager.send_action(core.Ping(), action_id="race-1") + + # The request registered just before the failed send is dropped again. + assert manager._outstanding_requests == {} From c06821a128f8d14763f48feba513f585176a8aab Mon Sep 17 00:00:00 2001 From: Karthic Raghupathi Date: Wed, 24 Jun 2026 21:41:16 -0400 Subject: [PATCH 4/5] Address full-panel findings on the send_action race delta (#3) From the review panel on the race-fix delta: - Tighten the monitor logging test to assert the exception detail survives into the message, not just the static prefix (a dropped "% exc" would otherwise pass). - Add a synchronous-request variant of the race test. No shipped request sets synchronous = True, so a minimal subclass exercises the (events, finalisers) tracking entry and confirms the cleanup drops it too, so a synchronous caller is not left waiting on events that never arrive. - Reword the ManagerSocketError message to match the "Asterisk manager" family used by the other socket-error raises. - Note in the send_action docstring that a concurrent disconnect surfaces as ManagerSocketError. Co-Authored-By: Claude Opus 4.8 --- pystrix/ami/ami.py | 5 +++-- tests/test_ami_monitor.py | 7 +++++- tests/test_ami_send_action.py | 42 ++++++++++++++++++++++++++++------- 3 files changed, 43 insertions(+), 11 deletions(-) diff --git a/pystrix/ami/ami.py b/pystrix/ami/ami.py index 4ab1b09..d49cb71 100644 --- a/pystrix/ami/ami.py +++ b/pystrix/ami/ami.py @@ -587,7 +587,8 @@ def send_action(self, request, action_id=None, **kwargs): Raises `ManagerError` if the manager is not connected. - Raises `ManagerSocketError` if the socket is broken during transmission. + Raises `ManagerSocketError` if the socket is broken during transmission, including when a + concurrent `disconnect()` closes the connection before the request is sent. This function is thread-safe. """ @@ -607,7 +608,7 @@ def send_action(self, request, action_id=None, **kwargs): # raise the documented exception instead of dereferencing None. self._outstanding_requests.pop(action_id, None) raise ManagerSocketError( - "Connection closed before the request could be sent" + "Connection to Asterisk manager closed before the request could be sent" ) self._connection.send_message(command) diff --git a/tests/test_ami_monitor.py b/tests/test_ami_monitor.py index 633d062..8fe3291 100644 --- a/tests/test_ami_monitor.py +++ b/tests/test_ami_monitor.py @@ -111,7 +111,12 @@ def send_action(request, *args, **kwargs): assert not monitor.is_alive() assert unhandled == [] - assert any("monitor stopping" in message for message in logger.messages) + # The log records both that the monitor stopped and the underlying reason, + # so the exception detail must survive into the message. + assert any( + "monitor stopping" in message and "Asterisk service stopped" in message + for message in logger.messages + ) def test_monitor_connection_propagates_unexpected_errors(): diff --git a/tests/test_ami_send_action.py b/tests/test_ami_send_action.py index e345ace..c8bd134 100644 --- a/tests/test_ami_send_action.py +++ b/tests/test_ami_send_action.py @@ -5,24 +5,50 @@ import pytest from pystrix.ami import core -from pystrix.ami.ami import Manager, ManagerSocketError +from pystrix.ami.ami import Manager, ManagerSocketError, _Request -def test_send_action_raises_when_connection_closed_mid_send(): - # Race guard (#3): a concurrent disconnect() can clear _connection after - # send_action's liveness check but before the send. send_action must raise - # ManagerSocketError rather than dereferencing None with an AttributeError, - # which the connection monitor's catch would miss and crash the thread. +class _SynchronousRequest(_Request): + # A request that registers a (events, finalisers) entry rather than the + # plain None an asynchronous request stores. No shipped request sets + # synchronous = True, so define a minimal one to exercise that branch. + synchronous = True + + +def _disconnected_manager(): + # is_connected() still reports True, but a raced disconnect() already + # cleared _connection. Bypass __init__ and supply only what send_action + # touches; neutralize __del__ cleanup to keep GC quiet. manager = Manager.__new__(Manager) manager.is_connected = lambda: True - manager._connection = None # disconnect() raced in and cleared it + manager._connection = None manager._connection_lock = threading.Lock() manager._outstanding_requests = {} - # Never connected, so neutralize __del__ cleanup to keep GC quiet. manager.close = lambda: None + return manager + + +def test_send_action_raises_when_connection_closed_mid_send(): + # Race guard (#3): a concurrent disconnect() can clear _connection after + # send_action's liveness check but before the send. send_action must raise + # ManagerSocketError rather than dereferencing None with an AttributeError, + # which the connection monitor's catch would miss and crash the thread. + manager = _disconnected_manager() with pytest.raises(ManagerSocketError): manager.send_action(core.Ping(), action_id="race-1") # The request registered just before the failed send is dropped again. assert manager._outstanding_requests == {} + + +def test_send_action_drops_synchronous_request_when_connection_closed_mid_send(): + # The same race for a synchronous request, whose tracking entry is a + # (events, finalisers) tuple rather than None. The cleanup must drop it too, + # so a synchronous caller cannot be left waiting on events that never arrive. + manager = _disconnected_manager() + + with pytest.raises(ManagerSocketError): + manager.send_action(_SynchronousRequest("Test"), action_id="race-2") + + assert manager._outstanding_requests == {} From 3ea0aac08a4254d6376b0ddc6b9814028fbbb5ef Mon Sep 17 00:00:00 2001 From: Karthic Raghupathi Date: Wed, 24 Jun 2026 21:57:03 -0400 Subject: [PATCH 5/5] Clean up outstanding request when send_message fails mid-write (#3) Review follow-up: the disconnect race fix only dropped the registered request on the _connection-is-None path. A live socket that breaks inside send_message raises ManagerSocketError before the normal cleanup at _serve_outstanding_request runs, leaking the just-registered action_id in _outstanding_requests. Wrap send_message in except ManagerSocketError, pop the action_id, and re-raise. Add fake-connection tests that raise from send_message and assert the request is dropped for both async and synchronous requests. Co-Authored-By: Claude Opus 4.8 --- pystrix/ami/ami.py | 9 ++++++- tests/test_ami_send_action.py | 46 +++++++++++++++++++++++++++++++---- 2 files changed, 49 insertions(+), 6 deletions(-) diff --git a/pystrix/ami/ami.py b/pystrix/ami/ami.py index d49cb71..ad59254 100644 --- a/pystrix/ami/ami.py +++ b/pystrix/ami/ami.py @@ -610,7 +610,14 @@ def send_action(self, request, action_id=None, **kwargs): raise ManagerSocketError( "Connection to Asterisk manager closed before the request could be sent" ) - self._connection.send_message(command) + try: + self._connection.send_message(command) + except ManagerSocketError: + # The socket broke mid-send, before the normal cleanup below can + # run. Drop the request we just registered so it does not linger + # in _outstanding_requests, then surface the error. + self._outstanding_requests.pop(action_id, None) + raise if ( request.aggregate and not request.synchronous diff --git a/tests/test_ami_send_action.py b/tests/test_ami_send_action.py index c8bd134..8893d80 100644 --- a/tests/test_ami_send_action.py +++ b/tests/test_ami_send_action.py @@ -15,19 +15,32 @@ class _SynchronousRequest(_Request): synchronous = True -def _disconnected_manager(): - # is_connected() still reports True, but a raced disconnect() already - # cleared _connection. Bypass __init__ and supply only what send_action - # touches; neutralize __del__ cleanup to keep GC quiet. +def _bare_manager(connection): + # Bypass __init__ and supply only what send_action touches; neutralize + # __del__ cleanup to keep GC quiet. is_connected() reports True so the + # send path runs; the supplied connection decides how the send behaves. manager = Manager.__new__(Manager) manager.is_connected = lambda: True - manager._connection = None + manager._connection = connection manager._connection_lock = threading.Lock() manager._outstanding_requests = {} manager.close = lambda: None return manager +def _disconnected_manager(): + # A raced disconnect() already cleared _connection while is_connected() + # still reports True. + return _bare_manager(None) + + +class _FailingConnection: + # A connection whose send fails mid-write, as a real socket does when it + # breaks during transmission. + def send_message(self, command): + raise ManagerSocketError("Connection to Asterisk manager broken while sending") + + def test_send_action_raises_when_connection_closed_mid_send(): # Race guard (#3): a concurrent disconnect() can clear _connection after # send_action's liveness check but before the send. send_action must raise @@ -52,3 +65,26 @@ def test_send_action_drops_synchronous_request_when_connection_closed_mid_send() manager.send_action(_SynchronousRequest("Test"), action_id="race-2") assert manager._outstanding_requests == {} + + +def test_send_action_drops_request_when_send_fails_mid_write(): + # If the socket breaks during send_message, send_action must drop the + # request it just registered before re-raising, so it does not linger in + # _outstanding_requests with no response ever coming. + manager = _bare_manager(_FailingConnection()) + + with pytest.raises(ManagerSocketError): + manager.send_action(core.Ping(), action_id="send-fail-1") + + assert manager._outstanding_requests == {} + + +def test_send_action_drops_synchronous_request_when_send_fails_mid_write(): + # The same cleanup for a synchronous request, whose tracking entry is a + # (events, finalisers) tuple rather than None. + manager = _bare_manager(_FailingConnection()) + + with pytest.raises(ManagerSocketError): + manager.send_action(_SynchronousRequest("Test"), action_id="send-fail-2") + + assert manager._outstanding_requests == {}