From 7211223ac911ab72f98b35ca9c4d0bd56043223b Mon Sep 17 00:00:00 2001 From: Bret McGuire Date: Tue, 25 Jan 2022 10:55:50 -0600 Subject: [PATCH 1/4] Removal of @asyncio.coroutine annotations. Still need to sort out some test issues. --- cassandra/io/asyncioreactor.py | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/cassandra/io/asyncioreactor.py b/cassandra/io/asyncioreactor.py index 7cb0444a32..87a068c10c 100644 --- a/cassandra/io/asyncioreactor.py +++ b/cassandra/io/asyncioreactor.py @@ -46,9 +46,8 @@ def __init__(self, timeout, callback, loop): self._handle = asyncio.run_coroutine_threadsafe(delayed, loop=loop) @staticmethod - @asyncio.coroutine - def _call_delayed_coro(timeout, callback, loop): - yield from asyncio.sleep(timeout, loop=loop) + async def _call_delayed_coro(timeout, callback, loop): + await asyncio.sleep(timeout, loop=loop) return callback() def __lt__(self, other): @@ -136,8 +135,7 @@ def close(self): self._close(), loop=self._loop ) - @asyncio.coroutine - def _close(self): + async def _close(self): log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint)) if self._write_watcher: self._write_watcher.cancel() @@ -174,21 +172,19 @@ def push(self, data): # avoid races/hangs by just scheduling this, not using threadsafe self._loop.create_task(self._push_msg(chunks)) - @asyncio.coroutine - def _push_msg(self, chunks): + async def _push_msg(self, chunks): # This lock ensures all chunks of a message are sequential in the Queue - with (yield from self._write_queue_lock): + with await self._write_queue_lock: for chunk in chunks: self._write_queue.put_nowait(chunk) - @asyncio.coroutine - def handle_write(self): + async def handle_write(self): while True: try: - next_msg = yield from self._write_queue.get() + next_msg = await self._write_queue.get() if next_msg: - yield from self._loop.sock_sendall(self._socket, next_msg) + await self._loop.sock_sendall(self._socket, next_msg) except socket.error as err: log.debug("Exception in send for %s: %s", self, err) self.defunct(err) @@ -196,11 +192,10 @@ def handle_write(self): except asyncio.CancelledError: return - @asyncio.coroutine - def handle_read(self): + async def handle_read(self): while True: try: - buf = yield from self._loop.sock_recv(self._socket, self.in_buffer_size) + buf = await self._loop.sock_recv(self._socket, self.in_buffer_size) self._iobuf.write(buf) # sock_recv expects EWOULDBLOCK if socket provides no data, but # nonblocking ssl sockets raise these instead, so we handle them From c89d7fa81179218631d8fbc781079e6b7ca6aec6 Mon Sep 17 00:00:00 2001 From: Bret McGuire Date: Tue, 25 Jan 2022 13:17:32 -0600 Subject: [PATCH 2/4] Convert async fns to coroutines --- cassandra/io/asyncioreactor.py | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/cassandra/io/asyncioreactor.py b/cassandra/io/asyncioreactor.py index 87a068c10c..27da1c4744 100644 --- a/cassandra/io/asyncioreactor.py +++ b/cassandra/io/asyncioreactor.py @@ -10,13 +10,6 @@ log = logging.getLogger(__name__) - -# This module uses ``yield from`` and ``@asyncio.coroutine`` over ``await`` and -# ``async def`` for pre-Python-3.5 compatibility, so keep in mind that the -# managed coroutines are generator-based, not native coroutines. See PEP 492: -# https://www.python.org/dev/peps/pep-0492/#coroutine-objects - - try: asyncio.run_coroutine_threadsafe except AttributeError: @@ -40,9 +33,10 @@ def end(self): 'does not implement .end()') def __init__(self, timeout, callback, loop): - delayed = self._call_delayed_coro(timeout=timeout, + delayed = asyncio.wait_for(self._call_delayed_coro(timeout=timeout, callback=callback, - loop=loop) + loop=loop), + timeout=None) self._handle = asyncio.run_coroutine_threadsafe(delayed, loop=loop) @staticmethod @@ -96,10 +90,10 @@ def __init__(self, *args, **kwargs): # see initialize_reactor -- loop is running in a separate thread, so we # have to use a threadsafe call self._read_watcher = asyncio.run_coroutine_threadsafe( - self.handle_read(), loop=self._loop + asyncio.wait_for(self.handle_read(), timeout=None), loop=self._loop ) self._write_watcher = asyncio.run_coroutine_threadsafe( - self.handle_write(), loop=self._loop + asyncio.wait_for(self.handle_write(), timeout=None), loop=self._loop ) self._send_options_message() @@ -132,7 +126,7 @@ def close(self): # close from the loop thread to avoid races when removing file # descriptors asyncio.run_coroutine_threadsafe( - self._close(), loop=self._loop + asyncio.wait_for(self._close(), timeout=None), loop=self._loop ) async def _close(self): @@ -165,7 +159,7 @@ def push(self, data): if self._loop_thread.ident != get_ident(): asyncio.run_coroutine_threadsafe( - self._push_msg(chunks), + asyncio.wait_for(self._push_msg(chunks), timeout=None), loop=self._loop ) else: From 78e97a2a654f0e3d43395928160fa67e1b54e804 Mon Sep 17 00:00:00 2001 From: Bret McGuire Date: Tue, 25 Jan 2022 13:29:21 -0600 Subject: [PATCH 3/4] Revert "Convert async fns to coroutines" This reverts commit c89d7fa81179218631d8fbc781079e6b7ca6aec6. --- cassandra/io/asyncioreactor.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/cassandra/io/asyncioreactor.py b/cassandra/io/asyncioreactor.py index 27da1c4744..87a068c10c 100644 --- a/cassandra/io/asyncioreactor.py +++ b/cassandra/io/asyncioreactor.py @@ -10,6 +10,13 @@ log = logging.getLogger(__name__) + +# This module uses ``yield from`` and ``@asyncio.coroutine`` over ``await`` and +# ``async def`` for pre-Python-3.5 compatibility, so keep in mind that the +# managed coroutines are generator-based, not native coroutines. See PEP 492: +# https://www.python.org/dev/peps/pep-0492/#coroutine-objects + + try: asyncio.run_coroutine_threadsafe except AttributeError: @@ -33,10 +40,9 @@ def end(self): 'does not implement .end()') def __init__(self, timeout, callback, loop): - delayed = asyncio.wait_for(self._call_delayed_coro(timeout=timeout, + delayed = self._call_delayed_coro(timeout=timeout, callback=callback, - loop=loop), - timeout=None) + loop=loop) self._handle = asyncio.run_coroutine_threadsafe(delayed, loop=loop) @staticmethod @@ -90,10 +96,10 @@ def __init__(self, *args, **kwargs): # see initialize_reactor -- loop is running in a separate thread, so we # have to use a threadsafe call self._read_watcher = asyncio.run_coroutine_threadsafe( - asyncio.wait_for(self.handle_read(), timeout=None), loop=self._loop + self.handle_read(), loop=self._loop ) self._write_watcher = asyncio.run_coroutine_threadsafe( - asyncio.wait_for(self.handle_write(), timeout=None), loop=self._loop + self.handle_write(), loop=self._loop ) self._send_options_message() @@ -126,7 +132,7 @@ def close(self): # close from the loop thread to avoid races when removing file # descriptors asyncio.run_coroutine_threadsafe( - asyncio.wait_for(self._close(), timeout=None), loop=self._loop + self._close(), loop=self._loop ) async def _close(self): @@ -159,7 +165,7 @@ def push(self, data): if self._loop_thread.ident != get_ident(): asyncio.run_coroutine_threadsafe( - asyncio.wait_for(self._push_msg(chunks), timeout=None), + self._push_msg(chunks), loop=self._loop ) else: From 85aeb11c9251691c87a95dc42693ee467b000f99 Mon Sep 17 00:00:00 2001 From: Bret McGuire Date: Tue, 25 Jan 2022 14:19:52 -0600 Subject: [PATCH 4/4] A fix for the yield-in-handle_read problem which seems at least sane --- cassandra/io/asyncioreactor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cassandra/io/asyncioreactor.py b/cassandra/io/asyncioreactor.py index 87a068c10c..ab0e90ae09 100644 --- a/cassandra/io/asyncioreactor.py +++ b/cassandra/io/asyncioreactor.py @@ -202,7 +202,9 @@ async def handle_read(self): # ourselves by yielding to the event loop, where the socket will # get the reading/writing it "wants" before retrying except (ssl.SSLWantWriteError, ssl.SSLWantReadError): - yield + # Apparently the preferred way to yield to the event loop from within + # a native coroutine based on https://github.com/python/asyncio/issues/284 + await asyncio.sleep(0) continue except socket.error as err: log.debug("Exception during socket recv for %s: %s",