From 12c9c306d39989142b21d06a315f6657d42d1eef Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Fri, 24 Sep 2021 13:00:43 +0200 Subject: [PATCH 1/4] ResponseFuture: do not return the stream ID on client timeout When a timeout occurs, the ResponseFuture associated with the query returns its stream ID to the associated connection's free stream ID pool - so that the stream ID can be immediately reused by another query. However, that it incorrect and dangerous. If query A times out before it receives a response from the cluster, a different query B might be issued on the same connection and stream. If response for query A arrives earlier than the response for query B, the first one might be misinterpreted as the response for query B. This commit changes the logic so that stream IDs are not returned on timeout - now, they are only returned after receiving a response. --- cassandra/cluster.py | 7 +++++-- cassandra/connection.py | 2 ++ tests/unit/test_response_future.py | 27 +++++++++++++++++++++++++++ 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 7e101afba8..14af52b9fa 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -4361,8 +4361,11 @@ def _on_timeout(self, _attempts=0): pool = self.session._pools.get(self._current_host) if pool and not pool.is_shutdown: - with self._connection.lock: - self._connection.request_ids.append(self._req_id) + # Do not return the stream ID to the pool yet. We cannot reuse it + # because the node might still be processing the query and will + # return a late response to that query - if we used such stream + # before the response to the previous query has arrived, the new + # query could get a response from the old query pool.return_connection(self._connection) diff --git a/cassandra/connection.py b/cassandra/connection.py index 0d8a50e76f..090ba12357 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -1193,6 +1193,8 @@ def process_msg(self, header, body): # This can only happen if the stream_id was # removed due to an OperationTimedOut except KeyError: + with self.lock: + self.request_ids.append(stream_id) return try: diff --git a/tests/unit/test_response_future.py b/tests/unit/test_response_future.py index 98d2156079..6159668372 100644 --- a/tests/unit/test_response_future.py +++ b/tests/unit/test_response_future.py @@ -17,6 +17,8 @@ except ImportError: import unittest # noqa +from collections import deque +from threading import RLock from mock import Mock, MagicMock, ANY from cassandra import ConsistencyLevel, Unavailable, SchemaTargetType, SchemaChangeType, OperationTimedOut @@ -604,3 +606,28 @@ def test_repeat_orig_query_after_succesful_reprepare(self): rf._query = Mock(return_value=True) rf._execute_after_prepare('host', None, None, response) rf._query.assert_called_once_with('host') + + def test_timeout_does_not_release_stream_id(self): + """ + Make sure that stream ID is not reused immediately after client-side + timeout. Otherwise, a new request could reuse the stream ID and would + risk getting a response for the old, timed out query. + """ + session = self.make_basic_session() + session.cluster._default_load_balancing_policy.make_query_plan.return_value = [Mock(endpoint='ip1'), Mock(endpoint='ip2')] + pool = self.make_pool() + session._pools.get.return_value = pool + connection = Mock(spec=Connection, lock=RLock(), _requests={}, request_ids=deque()) + pool.borrow_connection.return_value = (connection, 1) + + rf = self.make_response_future(session) + rf.send_request() + + connection._requests[1] = (connection._handle_options_response, ProtocolHandler.decode_message, []) + + rf._on_timeout() + pool.return_connection.assert_called_once_with(connection) + self.assertRaisesRegexp(OperationTimedOut, "Client request timeout", rf.result) + + assert len(connection.request_ids) == 0, \ + "Request IDs should be empty but it's not: {}".format(connection.request_ids) From 3906c00658cc94c032d24a7fea38e63f35b49a94 Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Tue, 28 Sep 2021 20:08:54 +0200 Subject: [PATCH 2/4] Connection: fix tracking of in_flight requests This commit fixes tracking of in_flight requests. Before it, in case of a client-side timeout, the response ID was not returned to the pool, but the in_flight counter was decremented anyway. This counter is used to determine if there is a need to wait for stream IDs to be freed - without this patch, it could happen that the driver throught that it can initiate another request due to in_flight counter being low, but there weren't any free stream IDs to allocate, so an assertion was triggered and the connection was defuncted and opened again. Now, requests timed out on the client side are tracked in the orphaned_request_ids field, and the in_flight counter is decremented only after the response is received. --- cassandra/cluster.py | 4 +++- cassandra/connection.py | 12 ++++++++++++ cassandra/pool.py | 16 +++++++++------- tests/unit/test_response_future.py | 2 +- 4 files changed, 25 insertions(+), 9 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 14af52b9fa..1b965418bf 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -4366,8 +4366,10 @@ def _on_timeout(self, _attempts=0): # return a late response to that query - if we used such stream # before the response to the previous query has arrived, the new # query could get a response from the old query + with self._connection.lock: + self._connection.orphaned_request_ids.add(self._req_id) - pool.return_connection(self._connection) + pool.return_connection(self._connection, stream_was_orphaned=True) errors = self._errors if not errors: diff --git a/cassandra/connection.py b/cassandra/connection.py index 090ba12357..4ef536ee57 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -690,6 +690,7 @@ class Connection(object): # The current number of operations that are in flight. More precisely, # the number of request IDs that are currently in use. + # This includes orphaned requests. in_flight = 0 # Max concurrent requests allowed per connection. This is set optimistically high, allowing @@ -707,6 +708,11 @@ class Connection(object): # request_ids set highest_request_id = 0 + # Tracks the request IDs which are no longer waited on (timed out), but + # cannot be reused yet because the node might still send a response + # on this stream + orphaned_request_ids = None + is_defunct = False is_closed = False lock = None @@ -764,6 +770,7 @@ def __init__(self, host='127.0.0.1', port=9042, authenticator=None, self._io_buffer = _ConnectionIOBuffer(self) self._continuous_paging_sessions = {} self._socket_writable = True + self.orphaned_request_ids = set() if ssl_options: self._check_hostname = bool(self.ssl_options.pop('check_hostname', False)) @@ -1188,6 +1195,11 @@ def process_msg(self, header, body): decoder = paging_session.decoder result_metadata = None else: + with self.lock: + if stream_id in self.orphaned_request_ids: + self.in_flight -= 1 + self.orphaned_request_ids.remove(stream_id) + try: callback, decoder, result_metadata = self._requests.pop(stream_id) # This can only happen if the stream_id was diff --git a/cassandra/pool.py b/cassandra/pool.py index cd27656046..b96abe7c1f 100644 --- a/cassandra/pool.py +++ b/cassandra/pool.py @@ -430,11 +430,12 @@ def borrow_connection(self, timeout): raise NoConnectionsAvailable("All request IDs are currently in use") - def return_connection(self, connection): - with connection.lock: - connection.in_flight -= 1 - with self._stream_available_condition: - self._stream_available_condition.notify() + def return_connection(self, connection, stream_was_orphaned=False): + if not stream_was_orphaned: + with connection.lock: + connection.in_flight -= 1 + with self._stream_available_condition: + self._stream_available_condition.notify() if connection.is_defunct or connection.is_closed: if connection.signaled_error and not self.shutdown_on_error: @@ -712,9 +713,10 @@ def _wait_for_conn(self, timeout): raise NoConnectionsAvailable() - def return_connection(self, connection): + def return_connection(self, connection, stream_was_orphaned=False): with connection.lock: - connection.in_flight -= 1 + if not stream_was_orphaned: + connection.in_flight -= 1 in_flight = connection.in_flight if connection.is_defunct or connection.is_closed: diff --git a/tests/unit/test_response_future.py b/tests/unit/test_response_future.py index 6159668372..a3a9fde61b 100644 --- a/tests/unit/test_response_future.py +++ b/tests/unit/test_response_future.py @@ -626,7 +626,7 @@ def test_timeout_does_not_release_stream_id(self): connection._requests[1] = (connection._handle_options_response, ProtocolHandler.decode_message, []) rf._on_timeout() - pool.return_connection.assert_called_once_with(connection) + pool.return_connection.assert_called_once_with(connection, stream_was_orphaned=True) self.assertRaisesRegexp(OperationTimedOut, "Client request timeout", rf.result) assert len(connection.request_ids) == 0, \ From a10e5253ad0351d615712aa3eaf2e88f75dc6997 Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Tue, 28 Sep 2021 20:10:15 +0200 Subject: [PATCH 3/4] Connection: notify owning pool about released orphaned streams Before this patch, the following situation could occur: 1. On a single connection, multiple requests are spawned up to the maximum concurrency, 2. We want to issue more requests but we need to wait on a condition variable because requests spawned in 1. took all stream IDs and we need to wait until some of them are freed, 3. All requests from point 1. time out on the client side - we cannot free their stream IDs until the database node responds, 4. Responses for requests issued in point 1. arrive, but the Connection class has no access to the condition variable mentioned in point 2., so no requests from point 2. are admitted, 5. Requests from point 2. waiting on the condition variable time out despite there are stream IDs available. This commit adds an _on_orphaned_stream_released field to the Connection class, and now it notifies the owning pool in case a timed out request receives a late response and a stream ID is freed by calling _on_orphaned_stream_released callback. --- cassandra/connection.py | 9 ++++++++- cassandra/pool.py | 23 +++++++++++++++++++---- tests/unit/test_host_connection_pool.py | 14 +++++++------- 3 files changed, 34 insertions(+), 12 deletions(-) diff --git a/cassandra/connection.py b/cassandra/connection.py index 4ef536ee57..698ad81ae0 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -739,6 +739,8 @@ class Connection(object): _is_checksumming_enabled = False + _on_orphaned_stream_released = None + @property def _iobuf(self): # backward compatibility, to avoid any change in the reactors @@ -748,7 +750,7 @@ def __init__(self, host='127.0.0.1', port=9042, authenticator=None, ssl_options=None, sockopts=None, compression=True, cql_version=None, protocol_version=ProtocolVersion.MAX_SUPPORTED, is_control_connection=False, user_type_map=None, connect_timeout=None, allow_beta_protocol_version=False, no_compact=False, - ssl_context=None): + ssl_context=None, on_orphaned_stream_released=None): # TODO next major rename host to endpoint and remove port kwarg. self.endpoint = host if isinstance(host, EndPoint) else DefaultEndPoint(host, port) @@ -771,6 +773,7 @@ def __init__(self, host='127.0.0.1', port=9042, authenticator=None, self._continuous_paging_sessions = {} self._socket_writable = True self.orphaned_request_ids = set() + self._on_orphaned_stream_released = on_orphaned_stream_released if ssl_options: self._check_hostname = bool(self.ssl_options.pop('check_hostname', False)) @@ -1195,10 +1198,14 @@ def process_msg(self, header, body): decoder = paging_session.decoder result_metadata = None else: + need_notify_of_release = False with self.lock: if stream_id in self.orphaned_request_ids: self.in_flight -= 1 self.orphaned_request_ids.remove(stream_id) + need_notify_of_release = True + if need_notify_of_release and self._on_orphaned_stream_released: + self._on_orphaned_stream_released() try: callback, decoder, result_metadata = self._requests.pop(stream_id) diff --git a/cassandra/pool.py b/cassandra/pool.py index b96abe7c1f..b7dd5c5598 100644 --- a/cassandra/pool.py +++ b/cassandra/pool.py @@ -399,7 +399,7 @@ def __init__(self, host, host_distance, session): return log.debug("Initializing connection for host %s", self.host) - self._connection = session.cluster.connection_factory(host.endpoint) + self._connection = session.cluster.connection_factory(host.endpoint, on_orphaned_stream_released=self.on_orphaned_stream_released) self._keyspace = session.keyspace if self._keyspace: self._connection.set_keyspace_blocking(self._keyspace) @@ -463,6 +463,14 @@ def return_connection(self, connection, stream_was_orphaned=False): self._is_replacing = True self._session.submit(self._replace, connection) + def on_orphaned_stream_released(self): + """ + Called when a response for an orphaned stream (timed out on the client + side) was received. + """ + with self._stream_available_condition: + self._stream_available_condition.notify() + def _replace(self, connection): with self._lock: if self.is_shutdown: @@ -470,7 +478,7 @@ def _replace(self, connection): log.debug("Replacing connection (%s) to %s", id(connection), self.host) try: - conn = self._session.cluster.connection_factory(self.host.endpoint) + conn = self._session.cluster.connection_factory(self.host.endpoint, on_orphaned_stream_released=self.on_orphaned_stream_released) if self._keyspace: conn.set_keyspace_blocking(self._keyspace) self._connection = conn @@ -549,7 +557,7 @@ def __init__(self, host, host_distance, session): log.debug("Initializing new connection pool for host %s", self.host) core_conns = session.cluster.get_core_connections_per_host(host_distance) - self._connections = [session.cluster.connection_factory(host.endpoint) + self._connections = [session.cluster.connection_factory(host.endpoint, on_orphaned_stream_released=self.on_orphaned_stream_released) for i in range(core_conns)] self._keyspace = session.keyspace @@ -653,7 +661,7 @@ def _add_conn_if_under_max(self): log.debug("Going to open new connection to host %s", self.host) try: - conn = self._session.cluster.connection_factory(self.host.endpoint) + conn = self._session.cluster.connection_factory(self.host.endpoint, on_orphaned_stream_released=self.on_orphaned_stream_released) if self._keyspace: conn.set_keyspace_blocking(self._session.keyspace) self._next_trash_allowed_at = time.time() + _MIN_TRASH_INTERVAL @@ -752,6 +760,13 @@ def return_connection(self, connection, stream_was_orphaned=False): else: self._signal_available_conn() + def on_orphaned_stream_released(self): + """ + Called when a response for an orphaned stream (timed out on the client + side) was received. + """ + self._signal_available_conn() + def _maybe_trash_connection(self, connection): core_conns = self._session.cluster.get_core_connections_per_host(self.host_distance) did_trash = False diff --git a/tests/unit/test_host_connection_pool.py b/tests/unit/test_host_connection_pool.py index e62488b400..0415079648 100644 --- a/tests/unit/test_host_connection_pool.py +++ b/tests/unit/test_host_connection_pool.py @@ -45,7 +45,7 @@ def test_borrow_and_return(self): session.cluster.connection_factory.return_value = conn pool = self.PoolImpl(host, HostDistance.LOCAL, session) - session.cluster.connection_factory.assert_called_once_with(host.endpoint) + session.cluster.connection_factory.assert_called_once_with(host.endpoint, on_orphaned_stream_released=pool.on_orphaned_stream_released) c, request_id = pool.borrow_connection(timeout=0.01) self.assertIs(c, conn) @@ -64,7 +64,7 @@ def test_failed_wait_for_connection(self): session.cluster.connection_factory.return_value = conn pool = self.PoolImpl(host, HostDistance.LOCAL, session) - session.cluster.connection_factory.assert_called_once_with(host.endpoint) + session.cluster.connection_factory.assert_called_once_with(host.endpoint, on_orphaned_stream_released=pool.on_orphaned_stream_released) pool.borrow_connection(timeout=0.01) self.assertEqual(1, conn.in_flight) @@ -82,7 +82,7 @@ def test_successful_wait_for_connection(self): session.cluster.connection_factory.return_value = conn pool = self.PoolImpl(host, HostDistance.LOCAL, session) - session.cluster.connection_factory.assert_called_once_with(host.endpoint) + session.cluster.connection_factory.assert_called_once_with(host.endpoint, on_orphaned_stream_released=pool.on_orphaned_stream_released) pool.borrow_connection(timeout=0.01) self.assertEqual(1, conn.in_flight) @@ -110,7 +110,7 @@ def test_spawn_when_at_max(self): session.cluster.get_max_connections_per_host.return_value = 2 pool = self.PoolImpl(host, HostDistance.LOCAL, session) - session.cluster.connection_factory.assert_called_once_with(host.endpoint) + session.cluster.connection_factory.assert_called_once_with(host.endpoint, on_orphaned_stream_released=pool.on_orphaned_stream_released) pool.borrow_connection(timeout=0.01) self.assertEqual(1, conn.in_flight) @@ -133,7 +133,7 @@ def test_return_defunct_connection(self): session.cluster.connection_factory.return_value = conn pool = self.PoolImpl(host, HostDistance.LOCAL, session) - session.cluster.connection_factory.assert_called_once_with(host.endpoint) + session.cluster.connection_factory.assert_called_once_with(host.endpoint, on_orphaned_stream_released=pool.on_orphaned_stream_released) pool.borrow_connection(timeout=0.01) conn.is_defunct = True @@ -152,7 +152,7 @@ def test_return_defunct_connection_on_down_host(self): session.cluster.connection_factory.return_value = conn pool = self.PoolImpl(host, HostDistance.LOCAL, session) - session.cluster.connection_factory.assert_called_once_with(host.endpoint) + session.cluster.connection_factory.assert_called_once_with(host.endpoint, on_orphaned_stream_released=pool.on_orphaned_stream_released) pool.borrow_connection(timeout=0.01) conn.is_defunct = True @@ -173,7 +173,7 @@ def test_return_closed_connection(self): session.cluster.connection_factory.return_value = conn pool = self.PoolImpl(host, HostDistance.LOCAL, session) - session.cluster.connection_factory.assert_called_once_with(host.endpoint) + session.cluster.connection_factory.assert_called_once_with(host.endpoint, on_orphaned_stream_released=pool.on_orphaned_stream_released) pool.borrow_connection(timeout=0.01) conn.is_closed = True From 0a9d1c625d621f9e97b13c2b377c153bb9da0927 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Mon, 25 Oct 2021 18:38:10 +0200 Subject: [PATCH 4/4] HostConnection: implement replacing overloaded connections In a situation of very high overload or poor networking conditions, it might happen that there is a large number of outstanding requests on a single connection. Each request reserves a stream ID which cannot be reused until a response for it arrives, even if the request already timed out on the client side. Because the pool of available stream IDs for a single connection is limited, such situation might cause the set of free stream IDs to shrink to a very small size (including zero), which will drastically reduce the available concurrency on the connection, or even render it unusable for some time. In order to prevent this, the following strategy is adopted: when the number of orphaned stream IDs reaches a certain threshold (e.g. 75% of all available stream IDs), the connection becomes marked as overloaded. Meanwhile, a new connection is opened - when it becomes available, it replaces the old one, and the old connection is moved to "trash" where it waits until all its outstanding requests either respond or time out. This feature is implemented for HostConnection but not for HostConnectionPool, which means that it will only work for clusters which use protocol v3 or newer. This fix is heavily inspired by the fix for JAVA-1519. --- cassandra/cluster.py | 2 + cassandra/connection.py | 9 ++++ cassandra/pool.py | 58 +++++++++++++++++++++--- tests/unit/.noseids | Bin 0 -> 30098 bytes tests/unit/test_host_connection_pool.py | 6 +-- tests/unit/test_response_future.py | 3 +- 6 files changed, 68 insertions(+), 10 deletions(-) create mode 100644 tests/unit/.noseids diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 1b965418bf..b978f89567 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -4368,6 +4368,8 @@ def _on_timeout(self, _attempts=0): # query could get a response from the old query with self._connection.lock: self._connection.orphaned_request_ids.add(self._req_id) + if len(self._connection.orphaned_request_ids) >= self._connection.orphaned_threshold: + self._connection.orphaned_threshold_reached = True pool.return_connection(self._connection, stream_was_orphaned=True) diff --git a/cassandra/connection.py b/cassandra/connection.py index 698ad81ae0..0869584663 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -713,6 +713,15 @@ class Connection(object): # on this stream orphaned_request_ids = None + # Set to true if the orphaned stream ID count cross configured threshold + # and the connection will be replaced + orphaned_threshold_reached = False + + # If the number of orphaned streams reaches this threshold, this connection + # will become marked and will be replaced with a new connection by the + # owning pool (currently, only HostConnection supports this) + orphaned_threshold = 3 * max_in_flight // 4 + is_defunct = False is_closed = False lock = None diff --git a/cassandra/pool.py b/cassandra/pool.py index b7dd5c5598..c82dfe9a6b 100644 --- a/cassandra/pool.py +++ b/cassandra/pool.py @@ -390,6 +390,10 @@ def __init__(self, host, host_distance, session): # this is used in conjunction with the connection streams. Not using the connection lock because the connection can be replaced in the lifetime of the pool. self._stream_available_condition = Condition(self._lock) self._is_replacing = False + # Contains connections which shouldn't be used anymore + # and are waiting until all requests time out or complete + # so that we can dispose of them. + self._trash = set() if host_distance == HostDistance.IGNORED: log.debug("Not opening connection to ignored host %s", self.host) @@ -405,7 +409,7 @@ def __init__(self, host, host_distance, session): self._connection.set_keyspace_blocking(self._keyspace) log.debug("Finished initializing connection for host %s", self.host) - def borrow_connection(self, timeout): + def _get_connection(self): if self.is_shutdown: raise ConnectionException( "Pool for %s is shutdown" % (self.host,), self.host) @@ -413,12 +417,25 @@ def borrow_connection(self, timeout): conn = self._connection if not conn: raise NoConnectionsAvailable() + return conn + + def borrow_connection(self, timeout): + conn = self._get_connection() + if conn.orphaned_threshold_reached: + with self._lock: + if not self._is_replacing: + self._is_replacing = True + self._session.submit(self._replace, conn) + log.debug( + "Connection to host %s reached orphaned stream limit, replacing...", + self.host + ) start = time.time() remaining = timeout while True: with conn.lock: - if conn.in_flight < conn.max_request_id: + if not (conn.orphaned_threshold_reached and conn.is_closed) and conn.in_flight < conn.max_request_id: conn.in_flight += 1 return conn, conn.get_request_id() if timeout is not None: @@ -426,7 +443,10 @@ def borrow_connection(self, timeout): if remaining < 0: break with self._stream_available_condition: - self._stream_available_condition.wait(remaining) + if conn.orphaned_threshold_reached and conn.is_closed: + conn = self._get_connection() + else: + self._stream_available_condition.wait(remaining) raise NoConnectionsAvailable("All request IDs are currently in use") @@ -462,6 +482,16 @@ def return_connection(self, connection, stream_was_orphaned=False): return self._is_replacing = True self._session.submit(self._replace, connection) + else: + if connection in self._trash: + with connection.lock: + if connection.in_flight == len(connection.orphaned_request_ids): + with self._lock: + if connection in self._trash: + self._trash.remove(connection) + log.debug("Closing trashed connection (%s) to %s", id(connection), self.host) + connection.close() + return def on_orphaned_stream_released(self): """ @@ -486,9 +516,15 @@ def _replace(self, connection): log.warning("Failed reconnecting %s. Retrying." % (self.host.endpoint,)) self._session.submit(self._replace, connection) else: - with self._lock: - self._is_replacing = False - self._stream_available_condition.notify() + with connection.lock: + with self._lock: + if connection.orphaned_threshold_reached: + if connection.in_flight == len(connection.orphaned_request_ids): + connection.close() + else: + self._trash.add(connection) + self._is_replacing = False + self._stream_available_condition.notify() def shutdown(self): with self._lock: @@ -502,6 +538,16 @@ def shutdown(self): self._connection.close() self._connection = None + trash_conns = None + with self._lock: + if self._trash: + trash_conns = self._trash + self._trash = set() + + if trash_conns is not None: + for conn in self._trash: + conn.close() + def _set_keyspace_for_all_conns(self, keyspace, callback): if self.is_shutdown or not self._connection: return diff --git a/tests/unit/.noseids b/tests/unit/.noseids new file mode 100644 index 0000000000000000000000000000000000000000..1c956146fc04cc0b92d287530f5167fb38737082 GIT binary patch literal 30098 zcmcg#cX%At5r;rX140PI*noSJkR=PtHrO;7Ng z{r3iE(SLU~wua}JMXv#09638YJXUrpVyvv|dTlZKU$yR+9eZ2RHJ5}t<_pjF#sb^) z$MoWoZX2RFW>`*8q)#;ZaQ>r}MUW)VaP-Wn77W_1cut9MTBa%)#deXJ_}#9F|%rJD?(0 zc#akLrehN@kLC5YE>Ca;e1jd1p$t5?g@(GQxDKL?ph&L#X< z^PIpngl6crRuGyee642sWletA`BPzH=kZUhd~tY3KWt48#3DT5E{P;v-PWzTZyKJK zO8jx*fn{ijWFutvD@pm65uRiPGY!2z)fyJ+fU9*<`uQ(hUaXoZ6H z;McX&37kL|wE>+(geP($T-=IK*1a-fJc$!ybt^^%TxeOhw5p~1^Nw3Vyz@En z+UDenDmkkQI5945os;LgU;sW&G`;)X|xL=*poTIRCC84jE)FM`!tzk5Pvv1%-W#q?hb;(aqWu*W$xahlwvYQh+Iu8>?31^@~ z991mQ9IrS7SXJGpt^P_jtTmbQf}@!)PN5W6v!Tgl*CEAdw_n2sB>h?n5bCeF0@j`( zJQI#zD9CkecTH-jh1G?cmZauNqO0-F+ zDY{Ngo0mh7z(zI#BiRI;!h$g14BHC5i49u%m{}j3W<}UZHxunko7u4B0>QIaLc-|E zGz^QQ1-a$X#Y1=2g140oZv&~ZZGO_FwIB{HD{?g<6fBh?GQ{*$_JKDV)(FQ@+LM$F z+UHG~hRQQcO=>y>0Rn6*u#50#!Gws{u}q_m);-KdvV3=VbvgwMYvG`RDH{0F21AFi zH^Jd5P7Y&TICxcIP=X>sV1$#vmUIHCIgxFgz&o>mtBwt8-h?xDh5|-8Y3xp?Azf^j z9MKpjk!^WIQq{4Y6MjP;eBuN&&I!6T9h4HJkjM0en=t#UIg!_=BfFTiOJMHc#N3>Y znYP3yIB_$i>^qCXR^B%Dusb=CGdygq3~`_Zse=qJzl)Q@#&oIIAcm^8*w=1O)OCn@ zP^cYF=Fm+W!cnuR$><5UvuijZX`*eAUSJr)^X3C|rhed3Hu>6Rp+?oCE<1fRtP$>^ z0OXh|I?XwgCyt_UR1%3ItqR2IX4Qhqf)zkQ$u;egwkYb93E0ENw)}8-s1GiXru0qP z`*ny!Qkd=KWG3@Ly-6u%776J(PAU`Kg=9Nv0=b@(#o_KOJg^fmnig?royx+Mb^|B1 zow?LdCsiF%b|9n@oz#R~f^;;}xRH~_?%rr5GD0ztn>dN&8;q|PKuI7Bb?9*1W~OK# zCkuIA&00Z{1<4}HQ|;$OAI()p0zJ+qO`bV?C3>2^r56O{1gDmTPa%T|1$OCWQ7H-h!y(z94Q^|v+=2^ zHmTsC=#yrXjZe)XQnN6m5$?WaDpCH&%q=8!%+or>)Lkgf^qK!OsF6vnEU<8sX+!aAfO*eX}BD2O@q2n{S2s5O3+zyPA%)H{Gj4JmY}fV4zPaLt8$5?a(V`6w=2iV_+p z31v_adP#@s1P~|$^YfrB&pb^up2baLB9{g_5NPW4JROQwzy=K3^#d~LXLB>zmCK~& z1XhtWlOmPpaC6y{$E9MP62+!eFe#8w5(QJw<>oV%$A^*>P}3;G?gFNQn+#g)?c5}` z_d+5vmWah2+$^^C!oq_FSCoOqjXs=WF(VGMreAH6@FV+`1YXGWyRmr6+a3`JyzT;*|;5nsLzU=E%El?55q4E z!~<6RY%kP~-wZyNSP5i%fc&tt#>!$M(*U{zoTpf6$c`K$Wys<{CjojZO@o#uOYIoQ zrd zV!!*0VM8ylFqbgli%%CI_C3tlBMH-^(@eKh-iw(DP@kp}e1bswS>`3-wSApseDEq- z^UZ=w@UJTW-u$EyPN4{9EXZoliBtYQc;Ns$u?-!|8Z=Dcg&Hgw8E^+0OWo>9J-kF= zK-Wf7H!lsZ>z|#f^ilz3MLR+DVIv%wls$q-Rg#MOD#1DXP=bN>UA&Sj3tnI+FIy&; zg@@2BZ|i_{#mNAr2+wF}^A_HI(EKu7$|*Y*?Oyk?;0|@*LTVqpAlyC2-6)_`k z*RgOr(i1l*iL??>Dgv&`Tq>~Bpu}F!!s^&^SvA!e@p=Obubc%N*)Ez+e8siF0(*V7 zIN%8#yp;voX?^ygtxMs479M+YctrO?<$mC44z#zr*uxU9<2SO9*gFUbr5&`;2Uu9- ztm(D|jV`sS(s!dSex>uxLrYg@xAmj5f{FlLN^(TLv4g1u=LaxBHb64YuGMe~@`fcI4NI`$jE zmVkI~W5Ju4l^v3Cm>xxPnQlZ-%gWnX;IHW)K6V}thBppydU0$ca9xln9bi_W`jRIs zuzU46G^I^Oe=+D@zf=btaRagoXoIgnM-!bQ-m!cx3;&e|m~>2hCjnfj3tcG~6TrCw zmIy(=lM8xx#&px6wUV%fi)T)>#&>a%xM@HVnrAt+s44#4Tx@pr$0o_)JzN~N^~S-4 z`Yt>hiOP5{7j%ynp`cqW;vp^)Jz7Kui581^9~YY*Eh1u*w21c)$U*hnXZkDkIn?;U zQq*4#31}`X2%R3~kbVn}j2`sbfqxHdgSM;>ET7QA1&_f>b}q1dy3lBHqpZLOxv+fFpc`{RF$D?t32>Sp;X>S=ix}AA6g+(a zn;=ZrgC`-R1NtZz`qmgdj!+6vSZD@f==4K-ta3pMSL>QF3Cr{KFNi;Ax*2c<7+0CXz(c^8B{&Y z1(@&=u|h$h*bQ!-o`AD*i+Df9g^T-V$89NHAiBuiq zHKc?^8QLI$5w&^mI=UKwll?3gY+?(HH67SqP^~)fJRHdUq=h$}qm}eI zE?|t(I#i7>TBoWT=lppt=!q%$X!f*SoWYt(Jn`r(7db3+oA8>35BGtQ!56p~3}-TE zB107hF3I@xJk%GtaEH3%g4cAcC0bEm;(}dGWxrU=s`Ter@K}o4i@w}vYpD;%P+X9{ z>rm=WnG(f($?n&TagA^c><_;=%?wo~+=J9YMaP;T6a-QKU*RLUa17(XUP#D0)G>*# za+4VEE(za(`|eJuo{+`YxM`p#%2EjIJXnF$)b>ut_^)#l*wF(4y2DRzanLhEA~vhY_5hd!kq<|)3>;39%i8#(fc+xz5RpGgM$$@ zvf_0`L8+tfaPzux5MHjRKz*WVa^K}9g_ps*xr6rANViMXf$n?Ugk}dJ1OW~Gng$CV z2$5$#N=usaS_uve^dQCexrrVagecW8nhR7wz=tdJ2i)B9_L|7at8F)NIR7Cxjfor) z)pXORHWbA}FS&xwSSXMmaWmO72ow1{1M>MXHy?LEJ{CM;QP7P=thz)0kXy&3JD9l? zq6hUKD9Kc?BNl&aNy~D;dHjTX9(g`3Q^T5DH8jg{7UeDDpK_DBejwp`i)OW|idaA6 zrZhJYr5@bc&$*dyp}S&mk^oV}M+v&5)luvFMR-I1m9FUMCEs6Ii6@6-p{w~Z)VXGD zbGqa37!A;pN4H-to5{j`cv9Gu51BZ%{fY^3Bo)!K&1O2U?R5OeK)_M&9i7}dg&utj zZR}vFQG-U@$OlT2*?P5I;F&NQg^uOd%OR1cCpN>TmN9D6+7sE|@D}@5WKl1a~nae}k1h#KtEv{89f8r;zs~0kmCB%wg8=F7#vzZ)>4c_v{ z%6{{3_7{F?qq)@Jp-_OYL7P4NDh>$y=(mjgm7m4ld=~Kyu#AQMH-1WO3p?R;Q$5|^ z`5EkJ6-t6ZzL)z4Kb6gyRLboh?w|bVBl+l^-tAxf9L6&_