diff --git a/CHANGELOG.md b/CHANGELOG.md index 17a8fb8..96ddc6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ to follow [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Fixed +- `Manager.monitor_connection` no longer crashes its monitoring thread when the connection drops. Pinging a downed connection raised `ManagerSocketError` (broken socket) or `ManagerError` (the liveness check inside `send_action` failing when the connection dropped just after the loop's own check) inside the thread, which dumped a traceback to stderr and killed the monitor. The monitor now catches both and stops cleanly, logging the reason at debug level when a logger is set. The method also returns the monitoring thread so callers can join it (#3). +- `Manager.send_action` no longer raises a raw `AttributeError` when a concurrent `disconnect()` clears the connection between the liveness check and the send. It now drops the just-registered request and raises `ManagerSocketError` instead (#3). + ## [1.3.0] - 2026-06-24 ### Added diff --git a/pystrix/ami/ami.py b/pystrix/ami/ami.py index 7722146..ad59254 100644 --- a/pystrix/ami/ami.py +++ b/pystrix/ami/ami.py @@ -458,13 +458,29 @@ def monitor_connection(self, interval=2.5): `interval` is the number of seconds to wait between automated Pings to see if Asterisk is still alive; defaults to 2.5. + + Returns the monitoring `threading.Thread`, which a caller may join. The thread stops on + its own when the connection drops: pinging a downed connection raises `ManagerSocketError` + (broken socket) or `ManagerError` (the liveness re-check inside `send_action` fails when + the connection dropped just after the loop's own check), and the monitor catches both to + exit cleanly rather than dying with an unhandled traceback. """ def _monitor_connection(): from pystrix.ami import core while self.is_connected(): - self.send_action(core.Ping()) + try: + self.send_action(core.Ping()) + except (ManagerError, ManagerSocketError) as exc: + # The connection dropped between the loop's check and the ping: + # either the socket broke (ManagerSocketError) or send_action's + # own liveness check failed (ManagerError, e.g. Asterisk stopped). + # Nothing can be reported, so the monitor stops cleanly instead of + # crashing the thread. Record why it left, when a logger is set. + if self._logger: + self._logger.debug("AMI connection monitor stopping: %s" % exc) + break time.sleep(interval) monitor = threading.Thread( @@ -472,6 +488,7 @@ def _monitor_connection(): ) monitor.daemon = True monitor.start() + return monitor def _compile_callback_definition(self, event, function): """ @@ -570,7 +587,8 @@ def send_action(self, request, action_id=None, **kwargs): Raises `ManagerError` if the manager is not connected. - Raises `ManagerSocketError` if the socket is broken during transmission. + Raises `ManagerSocketError` if the socket is broken during transmission, including when a + concurrent `disconnect()` closes the connection before the request is sent. This function is thread-safe. """ @@ -584,7 +602,22 @@ def send_action(self, request, action_id=None, **kwargs): ) events = self._add_outstanding_request(action_id, request) with self._connection_lock: - self._connection.send_message(command) + if self._connection is None: + # A concurrent disconnect() cleared the connection after the + # liveness check above. Drop the request we just registered and + # raise the documented exception instead of dereferencing None. + self._outstanding_requests.pop(action_id, None) + raise ManagerSocketError( + "Connection to Asterisk manager closed before the request could be sent" + ) + try: + self._connection.send_message(command) + except ManagerSocketError: + # The socket broke mid-send, before the normal cleanup below can + # run. Drop the request we just registered so it does not linger + # in _outstanding_requests, then surface the error. + self._outstanding_requests.pop(action_id, None) + raise if ( request.aggregate and not request.synchronous diff --git a/tests/test_ami_monitor.py b/tests/test_ami_monitor.py new file mode 100644 index 0000000..8fe3291 --- /dev/null +++ b/tests/test_ami_monitor.py @@ -0,0 +1,133 @@ +"""Tests for the AMI connection monitor (`Manager.monitor_connection`).""" + +import threading + +from pystrix.ami.ami import Manager, ManagerError, ManagerSocketError + + +def _bare_manager(send_action, is_connected=None): + # Bypass __init__ so no real socket or reader thread is created. The monitor + # only touches is_connected() and send_action(), so those are all we supply. + manager = Manager.__new__(Manager) + manager.is_connected = is_connected or (lambda: True) + manager.send_action = send_action + # The manager was never connected, so its __del__ cleanup has nothing real + # to release; neutralize it to keep garbage collection quiet during tests. + manager.close = lambda: None + return manager + + +def _run_monitor(manager, interval=0): + # Run the monitor to completion, capturing anything that escapes the thread + # uncaught via threading.excepthook. Returns (monitor_thread, unhandled). + unhandled = [] + previous_hook = threading.excepthook + threading.excepthook = lambda args: unhandled.append(args) + try: + monitor = manager.monitor_connection(interval=interval) + monitor.join(timeout=2) + finally: + threading.excepthook = previous_hook + return monitor, unhandled + + +def test_monitor_connection_survives_socket_error(): + # Regression for #3: when Asterisk stops, the periodic Ping's send_action + # raises ManagerSocketError. The monitor thread must exit cleanly instead of + # dying with an unhandled exception (a traceback dumped to stderr). + reached = threading.Event() + + def send_action(request, *args, **kwargs): + reached.set() + raise ManagerSocketError("Asterisk service stopped") + + monitor, unhandled = _run_monitor(_bare_manager(send_action)) + + assert reached.is_set() # the monitor actually attempted a ping + assert not monitor.is_alive() # the thread terminated + assert unhandled == [] # nothing escaped the thread uncaught + + +def test_monitor_connection_survives_manager_error(): + # Race guard for #3: the connection can drop between the loop's is_connected() + # check and the liveness re-check inside send_action, which raises ManagerError + # (not ManagerSocketError). The monitor must catch that path too and exit + # cleanly rather than crash the thread. + reached = threading.Event() + + def send_action(request, *args, **kwargs): + reached.set() + raise ManagerError("Not connected to an Asterisk manager") + + monitor, unhandled = _run_monitor(_bare_manager(send_action)) + + assert reached.is_set() + assert not monitor.is_alive() + assert unhandled == [] + + +def test_monitor_connection_pings_until_disconnected(): + # The monitor pings on each loop while connected and exits cleanly when + # is_connected() turns False (the orderly-shutdown path). + checks = {"count": 0} + + def is_connected(): + checks["count"] += 1 + return checks["count"] <= 3 # connected for three iterations, then down + + pings = [] + + def send_action(request, *args, **kwargs): + pings.append(request) + + manager = _bare_manager(send_action, is_connected=is_connected) + monitor, unhandled = _run_monitor(manager) + + assert isinstance(monitor, threading.Thread) # returns a joinable thread + assert not monitor.is_alive() # exited on the first disconnected check + assert len(pings) == 3 # one ping per connected check, none after + assert [request["Action"] for request in pings] == ["Ping"] * 3 # all Pings + assert unhandled == [] + + +def test_monitor_connection_logs_reason_on_exit(): + # When a logger is configured, the monitor records why it stopped so a + # vanished monitor thread is traceable rather than silent. + class _RecordingLogger: + def __init__(self): + self.messages = [] + + def debug(self, message): + self.messages.append(message) + + logger = _RecordingLogger() + + def send_action(request, *args, **kwargs): + raise ManagerSocketError("Asterisk service stopped") + + manager = _bare_manager(send_action) + manager._logger = logger + monitor, unhandled = _run_monitor(manager) + + assert not monitor.is_alive() + assert unhandled == [] + # The log records both that the monitor stopped and the underlying reason, + # so the exception detail must survive into the message. + assert any( + "monitor stopping" in message and "Asterisk service stopped" in message + for message in logger.messages + ) + + +def test_monitor_connection_propagates_unexpected_errors(): + # The catch is deliberately narrow: only connection-loss errors stop the + # monitor quietly. An unexpected error must still surface (escape the thread) + # rather than be silently swallowed by a too-broad except. + def send_action(request, *args, **kwargs): + raise ValueError("unexpected") + + monitor, unhandled = _run_monitor(_bare_manager(send_action)) + + assert not monitor.is_alive() + assert len(unhandled) == 1 # the ValueError was not swallowed + assert unhandled[0].exc_type is ValueError diff --git a/tests/test_ami_send_action.py b/tests/test_ami_send_action.py new file mode 100644 index 0000000..8893d80 --- /dev/null +++ b/tests/test_ami_send_action.py @@ -0,0 +1,90 @@ +"""Tests for `Manager.send_action` connection-loss handling.""" + +import threading + +import pytest + +from pystrix.ami import core +from pystrix.ami.ami import Manager, ManagerSocketError, _Request + + +class _SynchronousRequest(_Request): + # A request that registers a (events, finalisers) entry rather than the + # plain None an asynchronous request stores. No shipped request sets + # synchronous = True, so define a minimal one to exercise that branch. + synchronous = True + + +def _bare_manager(connection): + # Bypass __init__ and supply only what send_action touches; neutralize + # __del__ cleanup to keep GC quiet. is_connected() reports True so the + # send path runs; the supplied connection decides how the send behaves. + manager = Manager.__new__(Manager) + manager.is_connected = lambda: True + manager._connection = connection + manager._connection_lock = threading.Lock() + manager._outstanding_requests = {} + manager.close = lambda: None + return manager + + +def _disconnected_manager(): + # A raced disconnect() already cleared _connection while is_connected() + # still reports True. + return _bare_manager(None) + + +class _FailingConnection: + # A connection whose send fails mid-write, as a real socket does when it + # breaks during transmission. + def send_message(self, command): + raise ManagerSocketError("Connection to Asterisk manager broken while sending") + + +def test_send_action_raises_when_connection_closed_mid_send(): + # Race guard (#3): a concurrent disconnect() can clear _connection after + # send_action's liveness check but before the send. send_action must raise + # ManagerSocketError rather than dereferencing None with an AttributeError, + # which the connection monitor's catch would miss and crash the thread. + manager = _disconnected_manager() + + with pytest.raises(ManagerSocketError): + manager.send_action(core.Ping(), action_id="race-1") + + # The request registered just before the failed send is dropped again. + assert manager._outstanding_requests == {} + + +def test_send_action_drops_synchronous_request_when_connection_closed_mid_send(): + # The same race for a synchronous request, whose tracking entry is a + # (events, finalisers) tuple rather than None. The cleanup must drop it too, + # so a synchronous caller cannot be left waiting on events that never arrive. + manager = _disconnected_manager() + + with pytest.raises(ManagerSocketError): + manager.send_action(_SynchronousRequest("Test"), action_id="race-2") + + assert manager._outstanding_requests == {} + + +def test_send_action_drops_request_when_send_fails_mid_write(): + # If the socket breaks during send_message, send_action must drop the + # request it just registered before re-raising, so it does not linger in + # _outstanding_requests with no response ever coming. + manager = _bare_manager(_FailingConnection()) + + with pytest.raises(ManagerSocketError): + manager.send_action(core.Ping(), action_id="send-fail-1") + + assert manager._outstanding_requests == {} + + +def test_send_action_drops_synchronous_request_when_send_fails_mid_write(): + # The same cleanup for a synchronous request, whose tracking entry is a + # (events, finalisers) tuple rather than None. + manager = _bare_manager(_FailingConnection()) + + with pytest.raises(ManagerSocketError): + manager.send_action(_SynchronousRequest("Test"), action_id="send-fail-2") + + assert manager._outstanding_requests == {}