21 changes: 21 additions & 0 deletions cassandra/cluster.py
@@ -1930,6 +1930,27 @@
        have_future = False
        futures = set()
        try:
            # Guard against stale on_up destroying a healthy pool.
            # Case 1: Host was replaced in metadata (different object, same endpoint).
            current_host = self.metadata.get_host(host.endpoint)
            if current_host is not None and current_host is not host and current_host.is_up:
                log.debug("Host %s has been replaced by %s which is already up; "
                          "skipping stale on_up handling", host, current_host)
                with host.lock:
                    host._currently_handling_node_up = False
                return

            # Case 2: A healthy pool already exists (e.g. on_add already ran).
            for session in tuple(self.sessions):
                pool = session._pools.get(host)
                if pool and not pool.is_shutdown:
                    log.debug("Host %s already has a healthy pool; "
                              "skipping on_up pool teardown/rebuild", host)
                    with host.lock:
                        host.set_up()
                        host._currently_handling_node_up = False
                    return

            log.info("Host %s may be up; will prepare queries and open connection pool", host)

            reconnector = host.get_and_set_reconnection_handler(None)
@@ -4340,7 +4361,7 @@
                        self._scheduled_tasks.discard(task)
                        fn, args, kwargs = task
                        kwargs = dict(kwargs)
                        future = self._executor.submit(fn, *args, **kwargs)

[CI annotation: GitHub Actions / test asyncore (3.11) reports a check failure on line 4364 in cassandra/cluster.py: "cannot schedule new futures after shutdown"]
                        future.add_done_callback(self._log_if_failed)
                    else:
                        self._queue.put_nowait((run_at, i, task))
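The CI annotation above points at the `self._executor.submit(fn, *args, **kwargs)` call: once a `concurrent.futures` executor has been shut down, `submit()` raises `RuntimeError("cannot schedule new futures after shutdown")`. The sketch below is illustrative only and is not part of this diff; it reproduces the failure mode in isolation and shows one defensive pattern a scheduler loop could use when a shutdown races with a queued task.

```python
# Standalone illustration (not driver code): ThreadPoolExecutor.submit()
# raises RuntimeError once shutdown() has been called.
from concurrent.futures import ThreadPoolExecutor

def task():
    return "done"

executor = ThreadPoolExecutor(max_workers=1)
executor.shutdown(wait=True)

try:
    executor.submit(task)
except RuntimeError as exc:
    # One option is to log and drop the task instead of letting the
    # scheduler thread die when it hits
    # "cannot schedule new futures after shutdown".
    print("skipped task:", exc)
```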
71 changes: 71 additions & 0 deletions tests/unit/test_cluster.py
@@ -719,3 +719,74 @@ def test_no_warning_adding_lbp_ep_to_cluster_with_contact_points(self):
        )

        patched_logger.warning.assert_not_called()


class TestOnUpStaleHost(unittest.TestCase):
    """
    Tests for on_up() not destroying a healthy pool when called with a stale
    host reference after a replace-with-same-IP.
    """

    def _make_cluster(self, sessions=None):
        """Create a minimal Cluster object without connecting."""
        from threading import Lock
        cluster = object.__new__(Cluster)
        cluster.is_shutdown = False
        cluster.metadata = Mock()
        cluster.sessions = sessions or set()
        cluster.profile_manager = Mock()
        cluster.control_connection = Mock()
        cluster._listeners = set()
        cluster._listener_lock = Lock()
        return cluster

    def test_on_up_skips_when_host_replaced_in_metadata(self):
        """
        If a NEW_NODE event already replaced the old host with a new one
        (same endpoint, different host_id), on_up(old_host) should bail out
        instead of tearing down the new host's pool.
        """
        from cassandra.connection import DefaultEndPoint
        endpoint = DefaultEndPoint('127.0.0.1')

        old_host = Host(endpoint, conviction_policy_factory=Mock(), host_id=uuid.uuid4())
        old_host.is_up = False
        old_host._currently_handling_node_up = False

        new_host = Host(endpoint, conviction_policy_factory=Mock(), host_id=uuid.uuid4())
        new_host.is_up = True

        cluster = self._make_cluster()
        cluster.metadata.get_host = Mock(return_value=new_host)

        cluster.on_up(old_host)

        self.assertFalse(old_host._currently_handling_node_up,
                         "on_up should have returned early and reset _currently_handling_node_up")

    def test_on_up_skips_when_healthy_pool_exists(self):
        """
        If on_add already created a healthy pool for this host, a subsequent
        on_up should not tear it down and rebuild it.
        """
        from cassandra.connection import DefaultEndPoint
        endpoint = DefaultEndPoint('127.0.0.1')

        host = Host(endpoint, conviction_policy_factory=Mock(), host_id=uuid.uuid4())
        host.is_up = False
        host._currently_handling_node_up = False

        mock_pool = Mock()
        mock_pool.is_shutdown = False
        mock_session = Mock()
        mock_session._pools = {host: mock_pool}

        cluster = self._make_cluster(sessions={mock_session})
        cluster.metadata.get_host = Mock(return_value=host)

        cluster.on_up(host)

        mock_session.remove_pool.assert_not_called()
        self.assertTrue(host.is_up, "on_up should have marked host as up")
        self.assertFalse(host._currently_handling_node_up,
                         "on_up should have returned early and reset _currently_handling_node_up")