Skip to content

Commit 12c9c30

Browse files
piodulhaaawk
authored and committed
ResponseFuture: do not return the stream ID on client timeout
When a timeout occurs, the ResponseFuture associated with the query returns its stream ID to the associated connection's free stream ID pool - so that the stream ID can be immediately reused by another query. However, that is incorrect and dangerous. If query A times out before it receives a response from the cluster, a different query B might be issued on the same connection and stream. If response for query A arrives earlier than the response for query B, the first one might be misinterpreted as the response for query B. This commit changes the logic so that stream IDs are not returned on timeout - now, they are only returned after receiving a response.
1 parent 15d715f commit 12c9c30

File tree

3 files changed

+34
-2
lines changed

3 files changed

+34
-2
lines changed

cassandra/cluster.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -4361,8 +4361,11 @@ def _on_timeout(self, _attempts=0):
43614361

43624362
pool = self.session._pools.get(self._current_host)
43634363
if pool and not pool.is_shutdown:
4364-
with self._connection.lock:
4365-
self._connection.request_ids.append(self._req_id)
4364+
# Do not return the stream ID to the pool yet. We cannot reuse it
4365+
# because the node might still be processing the query and will
4366+
# return a late response to that query - if we used such stream
4367+
# before the response to the previous query has arrived, the new
4368+
# query could get a response from the old query
43664369

43674370
pool.return_connection(self._connection)
43684371

cassandra/connection.py

+2
Original file line numberDiff line numberDiff line change
@@ -1193,6 +1193,8 @@ def process_msg(self, header, body):
11931193
# This can only happen if the stream_id was
11941194
# removed due to an OperationTimedOut
11951195
except KeyError:
1196+
with self.lock:
1197+
self.request_ids.append(stream_id)
11961198
return
11971199

11981200
try:

tests/unit/test_response_future.py

+27
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
except ImportError:
1818
import unittest # noqa
1919

20+
from collections import deque
21+
from threading import RLock
2022
from mock import Mock, MagicMock, ANY
2123

2224
from cassandra import ConsistencyLevel, Unavailable, SchemaTargetType, SchemaChangeType, OperationTimedOut
@@ -604,3 +606,28 @@ def test_repeat_orig_query_after_succesful_reprepare(self):
604606
rf._query = Mock(return_value=True)
605607
rf._execute_after_prepare('host', None, None, response)
606608
rf._query.assert_called_once_with('host')
609+
610+
def test_timeout_does_not_release_stream_id(self):
611+
"""
612+
Make sure that stream ID is not reused immediately after client-side
613+
timeout. Otherwise, a new request could reuse the stream ID and would
614+
risk getting a response for the old, timed out query.
615+
"""
616+
session = self.make_basic_session()
617+
session.cluster._default_load_balancing_policy.make_query_plan.return_value = [Mock(endpoint='ip1'), Mock(endpoint='ip2')]
618+
pool = self.make_pool()
619+
session._pools.get.return_value = pool
620+
connection = Mock(spec=Connection, lock=RLock(), _requests={}, request_ids=deque())
621+
pool.borrow_connection.return_value = (connection, 1)
622+
623+
rf = self.make_response_future(session)
624+
rf.send_request()
625+
626+
connection._requests[1] = (connection._handle_options_response, ProtocolHandler.decode_message, [])
627+
628+
rf._on_timeout()
629+
pool.return_connection.assert_called_once_with(connection)
630+
self.assertRaisesRegexp(OperationTimedOut, "Client request timeout", rf.result)
631+
632+
assert len(connection.request_ids) == 0, \
633+
"Request IDs should be empty but it's not: {}".format(connection.request_ids)

0 commit comments

Comments
 (0)