Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ to follow [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Fixed
- `Manager.monitor_connection` no longer crashes its monitoring thread when the connection drops. Pinging a downed connection raised `ManagerSocketError` (broken socket) or `ManagerError` (the liveness check inside `send_action` failing when the connection dropped just after the loop's own check) inside the thread, which dumped a traceback to stderr and killed the monitor. The monitor now catches both and stops cleanly, logging the reason at debug level when a logger is set. The method also returns the monitoring thread so callers can join it (#3).
- `Manager.send_action` no longer raises a raw `AttributeError` when a concurrent `disconnect()` clears the connection between the liveness check and the send. It now drops the just-registered request and raises `ManagerSocketError` instead (#3).

## [1.3.0] - 2026-06-24

### Added
Expand Down
39 changes: 36 additions & 3 deletions pystrix/ami/ami.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,20 +458,37 @@ def monitor_connection(self, interval=2.5):

`interval` is the number of seconds to wait between automated Pings to see if Asterisk
is still alive; defaults to 2.5.

Returns the monitoring `threading.Thread`, which a caller may join. The thread stops on
its own when the connection drops: pinging a downed connection raises `ManagerSocketError`
(broken socket) or `ManagerError` (the liveness re-check inside `send_action` fails when
the connection dropped just after the loop's own check), and the monitor catches both to
exit cleanly rather than dying with an unhandled traceback.
"""

def _monitor_connection():
from pystrix.ami import core

while self.is_connected():
self.send_action(core.Ping())
try:
self.send_action(core.Ping())
except (ManagerError, ManagerSocketError) as exc:
# The connection dropped between the loop's check and the ping:
# either the socket broke (ManagerSocketError) or send_action's
# own liveness check failed (ManagerError, e.g. Asterisk stopped).
# Nothing can be reported, so the monitor stops cleanly instead of
# crashing the thread. Record why it left, when a logger is set.
if self._logger:
self._logger.debug("AMI connection monitor stopping: %s" % exc)
break
time.sleep(interval)

monitor = threading.Thread(
target=_monitor_connection, name="pystrix-ami-monitor"
)
monitor.daemon = True
monitor.start()
return monitor

def _compile_callback_definition(self, event, function):
"""
Expand Down Expand Up @@ -570,7 +587,8 @@ def send_action(self, request, action_id=None, **kwargs):

Raises `ManagerError` if the manager is not connected.

Raises `ManagerSocketError` if the socket is broken during transmission.
Raises `ManagerSocketError` if the socket is broken during transmission, including when a
concurrent `disconnect()` closes the connection before the request is sent.

This function is thread-safe.
"""
Expand All @@ -584,7 +602,22 @@ def send_action(self, request, action_id=None, **kwargs):
)
events = self._add_outstanding_request(action_id, request)
with self._connection_lock:
self._connection.send_message(command)
if self._connection is None:
# A concurrent disconnect() cleared the connection after the
# liveness check above. Drop the request we just registered and
# raise the documented exception instead of dereferencing None.
self._outstanding_requests.pop(action_id, None)
raise ManagerSocketError(
"Connection to Asterisk manager closed before the request could be sent"
)
try:
self._connection.send_message(command)
except ManagerSocketError:
# The socket broke mid-send, before the normal cleanup below can
# run. Drop the request we just registered so it does not linger
# in _outstanding_requests, then surface the error.
self._outstanding_requests.pop(action_id, None)
raise

if (
request.aggregate and not request.synchronous
Expand Down
133 changes: 133 additions & 0 deletions tests/test_ami_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
"""Tests for the AMI connection monitor (`Manager.monitor_connection`)."""

import threading

from pystrix.ami.ami import Manager, ManagerError, ManagerSocketError


def _bare_manager(send_action, is_connected=None):
# Bypass __init__ so no real socket or reader thread is created. The monitor
# only touches is_connected() and send_action(), so those are all we supply.
manager = Manager.__new__(Manager)
manager.is_connected = is_connected or (lambda: True)
manager.send_action = send_action
# The manager was never connected, so its __del__ cleanup has nothing real
# to release; neutralize it to keep garbage collection quiet during tests.
manager.close = lambda: None
return manager


def _run_monitor(manager, interval=0):
# Run the monitor to completion, capturing anything that escapes the thread
# uncaught via threading.excepthook. Returns (monitor_thread, unhandled).
unhandled = []
previous_hook = threading.excepthook
threading.excepthook = lambda args: unhandled.append(args)
try:
monitor = manager.monitor_connection(interval=interval)
monitor.join(timeout=2)
finally:
threading.excepthook = previous_hook
return monitor, unhandled


def test_monitor_connection_survives_socket_error():
# Regression for #3: when Asterisk stops, the periodic Ping's send_action
# raises ManagerSocketError. The monitor thread must exit cleanly instead of
# dying with an unhandled exception (a traceback dumped to stderr).
reached = threading.Event()

def send_action(request, *args, **kwargs):
reached.set()
raise ManagerSocketError("Asterisk service stopped")

monitor, unhandled = _run_monitor(_bare_manager(send_action))

assert reached.is_set() # the monitor actually attempted a ping
assert not monitor.is_alive() # the thread terminated
assert unhandled == [] # nothing escaped the thread uncaught


def test_monitor_connection_survives_manager_error():
# Race guard for #3: the connection can drop between the loop's is_connected()
# check and the liveness re-check inside send_action, which raises ManagerError
# (not ManagerSocketError). The monitor must catch that path too and exit
# cleanly rather than crash the thread.
reached = threading.Event()

def send_action(request, *args, **kwargs):
reached.set()
raise ManagerError("Not connected to an Asterisk manager")

monitor, unhandled = _run_monitor(_bare_manager(send_action))

assert reached.is_set()
assert not monitor.is_alive()
assert unhandled == []


def test_monitor_connection_pings_until_disconnected():
# The monitor pings on each loop while connected and exits cleanly when
# is_connected() turns False (the orderly-shutdown path).
checks = {"count": 0}

def is_connected():
checks["count"] += 1
return checks["count"] <= 3 # connected for three iterations, then down

pings = []

def send_action(request, *args, **kwargs):
pings.append(request)

manager = _bare_manager(send_action, is_connected=is_connected)
monitor, unhandled = _run_monitor(manager)

assert isinstance(monitor, threading.Thread) # returns a joinable thread
assert not monitor.is_alive() # exited on the first disconnected check
assert len(pings) == 3 # one ping per connected check, none after
assert [request["Action"] for request in pings] == ["Ping"] * 3 # all Pings
assert unhandled == []


def test_monitor_connection_logs_reason_on_exit():
# When a logger is configured, the monitor records why it stopped so a
# vanished monitor thread is traceable rather than silent.
class _RecordingLogger:
def __init__(self):
self.messages = []

def debug(self, message):
self.messages.append(message)

logger = _RecordingLogger()

def send_action(request, *args, **kwargs):
raise ManagerSocketError("Asterisk service stopped")

manager = _bare_manager(send_action)
manager._logger = logger
monitor, unhandled = _run_monitor(manager)

assert not monitor.is_alive()
assert unhandled == []
# The log records both that the monitor stopped and the underlying reason,
# so the exception detail must survive into the message.
assert any(
"monitor stopping" in message and "Asterisk service stopped" in message
for message in logger.messages
)


def test_monitor_connection_propagates_unexpected_errors():
# The catch is deliberately narrow: only connection-loss errors stop the
# monitor quietly. An unexpected error must still surface (escape the thread)
# rather than be silently swallowed by a too-broad except.
def send_action(request, *args, **kwargs):
raise ValueError("unexpected")

monitor, unhandled = _run_monitor(_bare_manager(send_action))

assert not monitor.is_alive()
assert len(unhandled) == 1 # the ValueError was not swallowed
assert unhandled[0].exc_type is ValueError
90 changes: 90 additions & 0 deletions tests/test_ami_send_action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""Tests for `Manager.send_action` connection-loss handling."""

import threading

import pytest

from pystrix.ami import core
from pystrix.ami.ami import Manager, ManagerSocketError, _Request


class _SynchronousRequest(_Request):
# A request that registers a (events, finalisers) entry rather than the
# plain None an asynchronous request stores. No shipped request sets
# synchronous = True, so define a minimal one to exercise that branch.
synchronous = True


def _bare_manager(connection):
# Bypass __init__ and supply only what send_action touches; neutralize
# __del__ cleanup to keep GC quiet. is_connected() reports True so the
# send path runs; the supplied connection decides how the send behaves.
manager = Manager.__new__(Manager)
manager.is_connected = lambda: True
manager._connection = connection
manager._connection_lock = threading.Lock()
manager._outstanding_requests = {}
manager.close = lambda: None
return manager


def _disconnected_manager():
# A raced disconnect() already cleared _connection while is_connected()
# still reports True.
return _bare_manager(None)


class _FailingConnection:
# A connection whose send fails mid-write, as a real socket does when it
# breaks during transmission.
def send_message(self, command):
raise ManagerSocketError("Connection to Asterisk manager broken while sending")


def test_send_action_raises_when_connection_closed_mid_send():
# Race guard (#3): a concurrent disconnect() can clear _connection after
# send_action's liveness check but before the send. send_action must raise
# ManagerSocketError rather than dereferencing None with an AttributeError,
# which the connection monitor's catch would miss and crash the thread.
manager = _disconnected_manager()

with pytest.raises(ManagerSocketError):
manager.send_action(core.Ping(), action_id="race-1")

# The request registered just before the failed send is dropped again.
assert manager._outstanding_requests == {}


def test_send_action_drops_synchronous_request_when_connection_closed_mid_send():
# The same race for a synchronous request, whose tracking entry is a
# (events, finalisers) tuple rather than None. The cleanup must drop it too,
# so a synchronous caller cannot be left waiting on events that never arrive.
manager = _disconnected_manager()

with pytest.raises(ManagerSocketError):
manager.send_action(_SynchronousRequest("Test"), action_id="race-2")

assert manager._outstanding_requests == {}


def test_send_action_drops_request_when_send_fails_mid_write():
# If the socket breaks during send_message, send_action must drop the
# request it just registered before re-raising, so it does not linger in
# _outstanding_requests with no response ever coming.
manager = _bare_manager(_FailingConnection())

with pytest.raises(ManagerSocketError):
manager.send_action(core.Ping(), action_id="send-fail-1")

assert manager._outstanding_requests == {}


def test_send_action_drops_synchronous_request_when_send_fails_mid_write():
# The same cleanup for a synchronous request, whose tracking entry is a
# (events, finalisers) tuple rather than None.
manager = _bare_manager(_FailingConnection())

with pytest.raises(ManagerSocketError):
manager.send_action(_SynchronousRequest("Test"), action_id="send-fail-2")

assert manager._outstanding_requests == {}
Loading