-
Notifications
You must be signed in to change notification settings - Fork 54
response-future: fence stale endpoint pool use #862
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dk/base-response-future-stale-pool-857
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5482,6 +5482,7 @@ class ResponseFuture(object): | |
| _errbacks = None | ||
| _current_host = None | ||
| _connection = None | ||
| _connection_pool = None | ||
| _query_retries = 0 | ||
| _start_time = None | ||
| _metrics = None | ||
|
|
@@ -5578,7 +5579,7 @@ def _on_timeout(self, _attempts=0): | |
| # Capture connection stats before pool.return_connection() can alter state | ||
| conn_in_flight = self._connection.in_flight | ||
|
|
||
| pool = self.session._get_pool_by_host_identity(self._current_host) | ||
| pool = self._connection_pool | ||
| if pool and not pool.is_shutdown: | ||
| # Do not return the stream ID to the pool yet. We cannot reuse it | ||
| # because the node might still be processing the query and will | ||
|
|
@@ -5661,7 +5662,24 @@ def _query(self, host, message=None, cb=None): | |
| if message is None: | ||
| message = self.message | ||
|
|
||
| pool = self.session._get_pool_by_host_identity(host) | ||
| expected_endpoint = None | ||
| if isinstance(host, Host): | ||
| with host.lock: | ||
| expected_endpoint = host.endpoint | ||
| pool = self.session._get_pool_by_host_identity( | ||
| host, expected_endpoint=expected_endpoint) | ||
| else: | ||
| pool = self.session._get_pool_by_host_identity(host) | ||
|
|
||
| if pool and expected_endpoint is not None: | ||
| with host.lock: | ||
| endpoint_changed = not self.session._endpoints_match( | ||
| host.endpoint, expected_endpoint) | ||
| if endpoint_changed: | ||
| self._errors[host] = ConnectionException( | ||
| "Host endpoint changed while borrowing connection") | ||
| return None | ||
|
|
||
|
Comment on lines
+5665
to
+5682
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Waaait, this is PR from one of your branches to another one of your branches. Why? I don't think you need PRs to make changes to your own work in progress. I think I should revisit this PR when it targets master. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, you mention this in cover letter, sorry. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I still don't think there is much point in reviewing those changes before the base changes are understood and merged (or at least approved). |
||
| if not pool: | ||
| self._errors[host] = ConnectionException("Host has been marked down or removed") | ||
| return None | ||
|
|
@@ -5678,7 +5696,25 @@ def _query(self, host, message=None, cb=None): | |
| connection, request_id = pool.borrow_connection(timeout=2.0, routing_key=self.query.routing_key, keyspace=self.query.keyspace, table=self.query.table) | ||
| else: | ||
| connection, request_id = pool.borrow_connection(timeout=2.0) | ||
|
|
||
| if expected_endpoint is not None: | ||
| with host.lock: | ||
| endpoint_changed = not self.session._endpoints_match( | ||
| host.endpoint, expected_endpoint) | ||
| if endpoint_changed: | ||
| try: | ||
| with connection.lock: | ||
| connection.request_ids.append(request_id) | ||
| pool.return_connection(connection) | ||
| finally: | ||
| connection = None | ||
| self._errors[host] = ConnectionException( | ||
| "Host endpoint changed while borrowing connection") | ||
| return None | ||
|
|
||
| self._connection = connection | ||
| self._connection_pool = pool | ||
|
|
||
| result_meta = self.prepared_statement.result_metadata if self.prepared_statement else [] | ||
|
|
||
| if cb is None: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need to be under lock for such simple assignment?