diff --git a/cassandra/cluster.py b/cassandra/cluster.py
index 5e7a68bc1c..953f586afb 100644
--- a/cassandra/cluster.py
+++ b/cassandra/cluster.py
@@ -1930,6 +1930,27 @@ def on_up(self, host):
         have_future = False
         futures = set()
         try:
+            # Guard against stale on_up destroying a healthy pool.
+            # Case 1: Host was replaced in metadata (different object, same endpoint).
+            current_host = self.metadata.get_host(host.endpoint)
+            if current_host is not None and current_host is not host and current_host.is_up:
+                log.debug("Host %s has been replaced by %s which is already up; "
+                          "skipping stale on_up handling", host, current_host)
+                with host.lock:
+                    host._currently_handling_node_up = False
+                return
+
+            # Case 2: A healthy pool already exists (e.g. on_add already ran).
+            for session in tuple(self.sessions):
+                pool = session._pools.get(host)
+                if pool and not pool.is_shutdown:
+                    log.debug("Host %s already has a healthy pool; "
+                              "skipping on_up pool teardown/rebuild", host)
+                    with host.lock:
+                        host.set_up()
+                        host._currently_handling_node_up = False
+                    return
+
             log.info("Host %s may be up; will prepare queries and open connection pool", host)
 
             reconnector = host.get_and_set_reconnection_handler(None)
diff --git a/tests/unit/test_cluster.py b/tests/unit/test_cluster.py
index a4f0ebc4d3..107f73fd47 100644
--- a/tests/unit/test_cluster.py
+++ b/tests/unit/test_cluster.py
@@ -719,3 +719,74 @@ def test_no_warning_adding_lbp_ep_to_cluster_with_contact_points(self):
             )
 
         patched_logger.warning.assert_not_called()
+
+
+class TestOnUpStaleHost(unittest.TestCase):
+    """
+    Tests for on_up() not destroying a healthy pool when called with a stale
+    host reference after a replace-with-same-IP.
+    """
+
+    def _make_cluster(self, sessions=None):
+        """Create a minimal Cluster object without connecting."""
+        from threading import Lock
+        cluster = object.__new__(Cluster)
+        cluster.is_shutdown = False
+        cluster.metadata = Mock()
+        cluster.sessions = sessions or set()
+        cluster.profile_manager = Mock()
+        cluster.control_connection = Mock()
+        cluster._listeners = set()
+        cluster._listener_lock = Lock()
+        return cluster
+
+    def test_on_up_skips_when_host_replaced_in_metadata(self):
+        """
+        If a NEW_NODE event already replaced the old host with a new one
+        (same endpoint, different host_id), on_up(old_host) should bail out
+        instead of tearing down the new host's pool.
+        """
+        from cassandra.connection import DefaultEndPoint
+        endpoint = DefaultEndPoint('127.0.0.1')
+
+        old_host = Host(endpoint, conviction_policy_factory=Mock(), host_id=uuid.uuid4())
+        old_host.is_up = False
+        old_host._currently_handling_node_up = False
+
+        new_host = Host(endpoint, conviction_policy_factory=Mock(), host_id=uuid.uuid4())
+        new_host.is_up = True
+
+        cluster = self._make_cluster()
+        cluster.metadata.get_host = Mock(return_value=new_host)
+
+        cluster.on_up(old_host)
+
+        self.assertFalse(old_host._currently_handling_node_up,
+                         "on_up should have returned early and reset _currently_handling_node_up")
+
+    def test_on_up_skips_when_healthy_pool_exists(self):
+        """
+        If on_add already created a healthy pool for this host, a subsequent
+        on_up should not tear it down and rebuild it.
+        """
+        from cassandra.connection import DefaultEndPoint
+        endpoint = DefaultEndPoint('127.0.0.1')
+
+        host = Host(endpoint, conviction_policy_factory=Mock(), host_id=uuid.uuid4())
+        host.is_up = False
+        host._currently_handling_node_up = False
+
+        mock_pool = Mock()
+        mock_pool.is_shutdown = False
+        mock_session = Mock()
+        mock_session._pools = {host: mock_pool}
+
+        cluster = self._make_cluster(sessions={mock_session})
+        cluster.metadata.get_host = Mock(return_value=host)
+
+        cluster.on_up(host)
+
+        mock_session.remove_pool.assert_not_called()
+        self.assertTrue(host.is_up, "on_up should have marked host as up")
+        self.assertFalse(host._currently_handling_node_up,
+                         "on_up should have returned early and reset _currently_handling_node_up")