Stop reusing stream ids of requests that have timed out due to client-side timeout #1114
Conversation
When a timeout occurs, the ResponseFuture associated with the query returns its stream ID to the associated connection's free stream ID pool - so that the stream ID can be immediately reused by another query. However, that is incorrect and dangerous. If query A times out before it receives a response from the cluster, a different query B might be issued on the same connection and stream. If the response for query A arrives earlier than the response for query B, it might be misinterpreted as the response for query B. This commit changes the logic so that stream IDs are not returned on timeout - now, they are only returned after receiving a response.
This commit fixes tracking of in_flight requests. Before it, in case of a client-side timeout, the stream ID was not returned to the pool, but the in_flight counter was decremented anyway. This counter is used to determine if there is a need to wait for stream IDs to be freed - without this patch, it could happen that the driver thought it could initiate another request because the in_flight counter was low, but there weren't any free stream IDs to allocate, so an assertion was triggered and the connection was defuncted and reopened. Now, requests timed out on the client side are tracked in the orphaned_request_ids field, and the in_flight counter is decremented only after the response is received.
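A minimal sketch of the bookkeeping these two commits describe, using names modeled on the commit messages (orphaned_request_ids, request_ids, in_flight); this is an illustration, not the driver's actual Connection class:

import threading

class ConnectionSketch:
    """Toy model of per-connection stream ID tracking after the fix."""

    def __init__(self, max_request_id=32767):
        self.lock = threading.Lock()
        self.request_ids = list(range(max_request_id + 1))  # free stream IDs
        self.orphaned_request_ids = set()  # timed out client-side, response still pending
        self.in_flight = 0                 # requests currently holding a stream ID

    def on_client_timeout(self, stream_id):
        # Old behaviour: stream_id went straight back to request_ids and
        # in_flight was decremented, allowing immediate (unsafe) reuse.
        # New behaviour: park the ID as orphaned and leave in_flight unchanged
        # until the node actually responds.
        with self.lock:
            self.orphaned_request_ids.add(stream_id)

The stream ID only returns to request_ids (and in_flight is only decremented) in the response path, as shown in the diff fragments discussed further below.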
Thanks for the pull request @haaawk! I've been a bit swamped recently but I'll try to get this reviewed as soon as I can. Have you signed the Contributor License Agreement for contributions to DataStax open source projects? If not, you can find it at https://cla.datastax.com. Thanks!
Thanks @absurdfarce. I'm contributing this code on behalf of my employer, ScyllaDB. We signed the Contributor License Agreement in the past when we were contributing fixes to the Java driver. For more context, see apache/cassandra-java-driver#1195
Thanks for your work on this so far @haaawk! I agree this is an issue and I like the general direction you've taken to solve it.
I'm publishing a review noting a few things that stood out to me on an initial walk-through. I'm going to try to take another pass at this early next week, but I wanted to get some feedback to you as quickly as possible.
Thanks again for all your work here!
with self._lock:
    if not self._is_replacing:
        self._is_replacing = True
        self._session.submit(self._replace, conn)
Once this completes self._connection will be reset, but conn will still be pointing to the prior value of self._connection which... has hit its threshold. Seems like we want to update conn here as well. Shouldn't be much more than just another "conn = self._get_connection()" after the log message below.
I believe that this code is ok.
The intention is to keep using the old connection until a new connection is ready to operate. Otherwise we would block the client until the new connection is ready, and we probably don't want to do that. self._get_connection() will start to return a new connection after _replace assigns self._connection. It's ok to use the old connection for a bit longer, as the new connection should be established relatively quickly. Does that make sense, @absurdfarce?
Until the replacement happens, self._get_connection() will return the same connection that's already assigned to conn.
Agreed, after reading this more carefully it became apparent I wasn't evaluating the full context here, @haaawk. If anything I'd say the case is even stronger than your argument above: "conn" is scoped to borrow_connection() and we're using it to validate a good working connection before returning from that fn in the "while True" loop below. There's no obvious benefit to setting conn at this point since the loop handles all of that.
I'm good with what you have here.
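A small sketch of the point being made in this thread, using names from the discussion (_get_connection, _replace, _connection, _is_replacing); everything else is simplified and illustrative, not the driver's actual pool code:

import threading

class ReplacementSketch:
    """The old connection keeps serving requests until _replace finishes."""

    def __init__(self, session, connection):
        self._lock = threading.Lock()
        self._is_replacing = False
        self._session = session
        self._connection = connection

    def _get_connection(self):
        # Keeps returning the old connection until _replace() assigns a new one,
        # so callers are never blocked waiting for the replacement.
        return self._connection

    def _maybe_schedule_replace(self, conn):
        with self._lock:
            if not self._is_replacing:
                self._is_replacing = True
                # Runs in the background on the session's executor.
                self._session.submit(self._replace, conn)

    def _open_new_connection(self):
        # Illustrative placeholder for actually establishing a connection.
        return object()

    def _replace(self, old_conn):
        new_conn = self._open_new_connection()
        with self._lock:
            self._connection = new_conn  # from now on _get_connection() returns it
            self._is_replacing = False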
cassandra/connection.py
Outdated
self.orphaned_request_ids.remove(stream_id)
need_notify_of_release = True
if need_notify_of_release and self._owning_pool:
    self._owning_pool.on_orphaned_stream_released()
I'm fairly concerned about giving the connection explicit awareness of the pool that contains it; that doesn't seem like something the connection should concern itself with. Rather, it seems cleaner to make this a callback (say "on_orphan_stream_release" or something similar). The pool can wrap whatever function it wants in this callback when creating the connection. This logic would presumably be pool-specific, but that seems like the appropriate place for it.
You're right. Fixed.
Nice, I like the fix you put in here @haaawk !
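To illustrate the shape of the fix agreed on above (the constructor-parameter wiring and class names here are assumptions for illustration; only the callback idea comes from the review thread):

class ConnectionWithCallback:
    """The connection only knows about a callback, not about any pool."""

    def __init__(self, on_orphaned_stream_released=None):
        self._on_orphaned_stream_released = on_orphaned_stream_released

    def _handle_late_response(self, stream_id):
        # ...return the orphaned stream_id to the free pool, then notify:
        if self._on_orphaned_stream_released:
            self._on_orphaned_stream_released()


class PoolWiringSketch:
    """The pool supplies whatever pool-specific logic it wants as the callback."""

    def _make_connection(self):
        return ConnectionWithCallback(
            on_orphaned_stream_released=self._on_orphaned_stream_released)

    def _on_orphaned_stream_released(self):
        # Pool-specific reaction, e.g. wake up requests waiting for stream IDs.
        pass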
Side note: I looked into the AppVeyor test failures that sprang up after this change went in. It turns out this change introduces the possibility of a random test failure in test_cluster.DontPrepareOnIgnoredHostsTest; there's a race there that can trigger an assertion failure if one specific side wins. I'm planning on writing all that up and filing a separate ticket for it; however, based on my testing the actual failure case is pretty unlikely, so I don't see any reason why that issue should hold up getting this change in.
Before this patch, the following situation could occur:

1. On a single connection, multiple requests are spawned up to the maximum concurrency.
2. We want to issue more requests, but we need to wait on a condition variable because the requests spawned in 1. took all stream IDs, and we need to wait until some of them are freed.
3. All requests from point 1. time out on the client side - we cannot free their stream IDs until the database node responds.
4. Responses for the requests issued in point 1. arrive, but the Connection class has no access to the condition variable mentioned in point 2., so no requests from point 2. are admitted.
5. Requests from point 2. waiting on the condition variable time out, even though stream IDs are available.

This commit adds an _on_orphaned_stream_released field to the Connection class; the connection now notifies the owning pool when a timed-out request receives a late response and a stream ID is freed, by calling the _on_orphaned_stream_released callback.
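A minimal model of the notification path described above, assuming a condition variable in the pool (the exact attribute name in the driver may differ):

import threading

class StreamWaitPool:
    """Requests blocked waiting for a stream ID are woken by the connection's callback."""

    def __init__(self):
        self._lock = threading.Lock()
        self._stream_available_condition = threading.Condition(self._lock)

    def wait_for_free_stream_id(self, timeout):
        # Point 2 above: all stream IDs are taken, so the caller blocks here.
        with self._stream_available_condition:
            return self._stream_available_condition.wait(timeout)

    def on_orphaned_stream_released(self):
        # Passed to the Connection as its _on_orphaned_stream_released callback.
        # Point 4 above: a late response freed an orphaned stream ID, so wake a
        # waiter instead of letting it time out (point 5).
        with self._stream_available_condition:
            self._stream_available_condition.notify()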
In a situation of very high overload or poor networking conditions, it might happen that there is a large number of outstanding requests on a single connection. Each request reserves a stream ID which cannot be reused until a response for it arrives, even if the request has already timed out on the client side. Because the pool of available stream IDs for a single connection is limited, such a situation might cause the set of free stream IDs to shrink to a very small size (including zero), which drastically reduces the available concurrency on the connection, or even renders it unusable for some time.

In order to prevent this, the following strategy is adopted: when the number of orphaned stream IDs reaches a certain threshold (e.g. 75% of all available stream IDs), the connection is marked as overloaded. Meanwhile, a new connection is opened - when it becomes available, it replaces the old one, and the old connection is moved to the "trash", where it waits until all its outstanding requests either respond or time out. This feature is implemented for HostConnection but not for HostConnectionPool, which means that it will only work for clusters which use protocol v3 or newer. This fix is heavily inspired by the fix for JAVA-1519.
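A sketch of the overload detection described in this commit message (the 75% figure is from the message; the attribute names are illustrative):

class OrphanThresholdSketch:
    """Marks a connection as overloaded once too many stream IDs are orphaned."""

    ORPHANED_THRESHOLD = 0.75  # fraction of all stream IDs, per the commit message

    def __init__(self, total_stream_ids=2 ** 15):
        self.total_stream_ids = total_stream_ids
        self.orphaned_request_ids = set()
        self.orphaned_threshold_reached = False

    def mark_orphaned(self, stream_id):
        self.orphaned_request_ids.add(stream_id)
        if len(self.orphaned_request_ids) >= self.ORPHANED_THRESHOLD * self.total_stream_ids:
            # The owning HostConnection reacts by opening a replacement connection
            # and trashing this one once its outstanding requests drain.
            self.orphaned_threshold_reached = True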
Thanks for the review @absurdfarce.
@absurdfarce ping
@haaawk ACKed. Thanks for the ping, and (again) my apologies for the delay. I think we're in a good state here but I wanted to give this one more pass before we called it done. I'll try to get that wrapped up today.
try:
    callback, decoder, result_metadata = self._requests.pop(stream_id)
# This can only happen if the stream_id was
# removed due to an OperationTimedOut
except KeyError:
    with self.lock:
        self.request_ids.append(stream_id)
Just to be very clear... this change isn't explicitly connected to the problem you're trying to address in the PR, correct @haaawk ? The intent here is that if we get a stream ID that doesn't match up to a request we know about we should be safe to re-use that stream ID regardless of other conditions... do I have that right?
In 12c9c30:

1. If a request R has timed out due to a client-side timeout, we stopped returning its stream ID to the pool of available stream IDs.
2. If the same request R receives a response after R has timed out on the client side, the stream ID of R will be put back into the pool of available stream IDs.

This line performs (2).

"if we get a stream ID that doesn't match up to a request we know about we should be safe to re-use that stream ID regardless of other conditions... do I have that right?"

Yes. The assumption is that if we get a stream ID we don't know about, then it must be the stream ID of a request that has already timed out due to a client-side timeout.
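Tying the two diff fragments in this PR together, a simplified version of the response path (this is not the driver's actual handler; decoding and callback details are reduced to the stream-ID bookkeeping):

import threading

class LateResponseSketch:
    """Simplified handling of a response whose stream ID has no matching request."""

    def __init__(self, on_orphaned_stream_released=None):
        self.lock = threading.Lock()
        self._requests = {}                # stream_id -> (callback, decoder, result_metadata)
        self.request_ids = []              # free stream IDs
        self.orphaned_request_ids = set()  # IDs of requests that timed out client-side
        self._on_orphaned_stream_released = on_orphaned_stream_released

    def handle_response(self, stream_id, msg):
        try:
            callback, decoder, result_metadata = self._requests.pop(stream_id)
        except KeyError:
            # An unknown stream ID can only be a late response to a request that
            # already timed out client-side, so the ID is now safe to reuse.
            with self.lock:
                self.orphaned_request_ids.discard(stream_id)
                self.request_ids.append(stream_id)
            if self._on_orphaned_stream_released:
                self._on_orphaned_stream_released()
            return
        # Normal path: decode the message and invoke the stored callback (simplified).
        callback(decoder(msg))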
Okay, thanks for the verification @haaawk !
Hey @haaawk, after looking this over again I'm fine with the current state of things. I'll leave this open through today to give you a chance to reply to any of my more recent comments if you feel there's something to say but otherwise this will likely get merged tonight or tomorrow morning.
Thanks @absurdfarce. I commented on #1114 (comment)
FTR, the ticket describing the test failures mentioned above is https://datastax-oss.atlassian.net/browse/PYTHON-1287
Context:
The driver allows sending multiple requests concurrently over a single connection.
Requests are matched with the corresponding responses using stream ids.
Each request gets a unique stream id, and the corresponding response contains the same stream id.
The pool of stream ids is limited and, in CQL protocol v3 and above, has 32768 values.
That means stream ids have to be reused.
Usually, when a request gets a response, the stream id used by that request is returned to the pool.
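As a toy illustration of this bookkeeping (not the driver's actual classes):

from collections import deque

class StreamIdPool:
    """Per-connection pool of stream ids; CQL v3+ allows 2**15 = 32768 of them."""

    def __init__(self):
        self._free = deque(range(2 ** 15))

    def acquire(self):
        # Each outgoing request takes a unique stream id...
        return self._free.popleft()

    def release(self, stream_id):
        # ...and normally gives it back once the matching response arrives,
        # which is what lets stream ids be reused.
        self._free.append(stream_id)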
Problem:
The driver has the ability to time out a request on the client side.
The problem is that when such a timeout happens, the stream id of the timed-out request is returned to the pool of available stream ids, and another request can pick up the same stream id.
The problematic situation looks as follows:

1. Query A is sent with stream id X.
2. Query A times out on the client side, and X is returned to the pool of available stream ids.
3. Query B is issued on the same connection and picks up stream id X.
4. The late response for query A arrives and, since it carries stream id X, is interpreted as the response for query B.

This leads to data returned for one query being interpreted as the response to another query.
For example, if user sessions are stored in the DB under a user_id primary key, one user could get the session of another.
This PR fixes the problem by making sure that stream ids of requests that time out due to client-side timeouts are not returned to the pool of available stream ids. This can lead to a situation where the pool shrinks significantly, which decreases the number of concurrent requests that can be sent over the connection. To avoid such a bad state, the PR also monitors the number of orphaned stream ids; when it reaches 75% of the initial pool size, the connection is recreated and all stream ids become available again.
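In concrete numbers, combining the pool size from the Context section with the 75% threshold above:

POOL_SIZE = 2 ** 15        # 32768 stream ids per connection (CQL v3+)
ORPHAN_THRESHOLD = 0.75    # fraction at which the connection gets replaced

trigger = int(POOL_SIZE * ORPHAN_THRESHOLD)
print(trigger)             # 24576 orphaned stream ids trigger connection replacement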