Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -5482,6 +5482,7 @@ class ResponseFuture(object):
_errbacks = None
_current_host = None
_connection = None
_connection_pool = None
_query_retries = 0
_start_time = None
_metrics = None
Expand Down Expand Up @@ -5578,7 +5579,7 @@ def _on_timeout(self, _attempts=0):
# Capture connection stats before pool.return_connection() can alter state
conn_in_flight = self._connection.in_flight

pool = self.session._get_pool_by_host_identity(self._current_host)
pool = self._connection_pool
if pool and not pool.is_shutdown:
# Do not return the stream ID to the pool yet. We cannot reuse it
# because the node might still be processing the query and will
Expand Down Expand Up @@ -5661,7 +5662,24 @@ def _query(self, host, message=None, cb=None):
if message is None:
message = self.message

pool = self.session._get_pool_by_host_identity(host)
expected_endpoint = None
if isinstance(host, Host):
with host.lock:
expected_endpoint = host.endpoint
Comment on lines +5666 to +5668
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need to be under lock for such simple assignment?

pool = self.session._get_pool_by_host_identity(
host, expected_endpoint=expected_endpoint)
else:
pool = self.session._get_pool_by_host_identity(host)

if pool and expected_endpoint is not None:
with host.lock:
endpoint_changed = not self.session._endpoints_match(
host.endpoint, expected_endpoint)
if endpoint_changed:
self._errors[host] = ConnectionException(
"Host endpoint changed while borrowing connection")
return None

Comment on lines +5665 to +5682
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Waaait, this is PR from one of your branches to another one of your branches. Why?
I wanted to locally check what is _get_pool_by_host_identity because I don't understand the purpose of this change, but there is not such function.

I don't think you need PRs to make changes to your own work in progress. I think I should revisit this PR when it targets master.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you mention this in cover letter, sorry.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't think there is much point in reviewing those changes before the base changes are understood and merged (or at least approved).

if not pool:
self._errors[host] = ConnectionException("Host has been marked down or removed")
return None
Expand All @@ -5678,7 +5696,25 @@ def _query(self, host, message=None, cb=None):
connection, request_id = pool.borrow_connection(timeout=2.0, routing_key=self.query.routing_key, keyspace=self.query.keyspace, table=self.query.table)
else:
connection, request_id = pool.borrow_connection(timeout=2.0)

if expected_endpoint is not None:
with host.lock:
endpoint_changed = not self.session._endpoints_match(
host.endpoint, expected_endpoint)
if endpoint_changed:
try:
with connection.lock:
connection.request_ids.append(request_id)
pool.return_connection(connection)
finally:
connection = None
self._errors[host] = ConnectionException(
"Host endpoint changed while borrowing connection")
return None

self._connection = connection
self._connection_pool = pool

result_meta = self.prepared_statement.result_metadata if self.prepared_statement else []

if cb is None:
Expand Down
Loading
Loading