Skip to content

PYTHON-1290 Convert asyncio reactor away from @asyncio.coroutine #1119

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 2, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 13 additions & 16 deletions cassandra/io/asyncioreactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ def __init__(self, timeout, callback, loop):
self._handle = asyncio.run_coroutine_threadsafe(delayed, loop=loop)

@staticmethod
@asyncio.coroutine
def _call_delayed_coro(timeout, callback, loop):
yield from asyncio.sleep(timeout, loop=loop)
async def _call_delayed_coro(timeout, callback, loop):
await asyncio.sleep(timeout, loop=loop)
return callback()

def __lt__(self, other):
Expand Down Expand Up @@ -136,8 +135,7 @@ def close(self):
self._close(), loop=self._loop
)

@asyncio.coroutine
def _close(self):
async def _close(self):
log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint))
if self._write_watcher:
self._write_watcher.cancel()
Expand Down Expand Up @@ -174,40 +172,39 @@ def push(self, data):
# avoid races/hangs by just scheduling this, not using threadsafe
self._loop.create_task(self._push_msg(chunks))

@asyncio.coroutine
def _push_msg(self, chunks):
async def _push_msg(self, chunks):
# This lock ensures all chunks of a message are sequential in the Queue
with (yield from self._write_queue_lock):
with await self._write_queue_lock:
for chunk in chunks:
self._write_queue.put_nowait(chunk)


@asyncio.coroutine
def handle_write(self):
async def handle_write(self):
while True:
try:
next_msg = yield from self._write_queue.get()
next_msg = await self._write_queue.get()
if next_msg:
yield from self._loop.sock_sendall(self._socket, next_msg)
await self._loop.sock_sendall(self._socket, next_msg)
except socket.error as err:
log.debug("Exception in send for %s: %s", self, err)
self.defunct(err)
return
except asyncio.CancelledError:
return

@asyncio.coroutine
def handle_read(self):
async def handle_read(self):
while True:
try:
buf = yield from self._loop.sock_recv(self._socket, self.in_buffer_size)
buf = await self._loop.sock_recv(self._socket, self.in_buffer_size)
self._iobuf.write(buf)
# sock_recv expects EWOULDBLOCK if socket provides no data, but
# nonblocking ssl sockets raise these instead, so we handle them
# ourselves by yielding to the event loop, where the socket will
# get the reading/writing it "wants" before retrying
except (ssl.SSLWantWriteError, ssl.SSLWantReadError):
yield
# Apparently the preferred way to yield to the event loop from within
# a native coroutine based on https://github.com/python/asyncio/issues/284
await asyncio.sleep(0)
continue
except socket.error as err:
log.debug("Exception during socket recv for %s: %s",
Expand Down