Skip to content

fix(test/__init__.py): Ignoring DeprecationWarning from "cassandra.io.asyncioreactor" #1118

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 191 additions & 90 deletions cassandra/io/asyncioreactor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
import abc
import sys

import six

from cassandra.connection import Connection, ConnectionShutdown

import asyncio
Expand All @@ -9,24 +14,16 @@


log = logging.getLogger(__name__)
is_at_least_python_version_3_5 = sys.version_info >= (3, 5)


# This module uses ``yield from`` and ``@asyncio.coroutine`` over ``await`` and
# ``async def`` for pre-Python-3.5 compatibility, so keep in mind that the
# managed coroutines are generator-based, not native coroutines. See PEP 492:
# https://www.python.org/dev/peps/pep-0492/#coroutine-objects

# Fail fast at import time: this reactor schedules work onto the event loop
# from driver threads via asyncio.run_coroutine_threadsafe, which only
# exists from 3.4.6 / 3.5.1 onward. The same availability check previously
# appeared twice (a hasattr guard followed by a redundant try/except
# AttributeError raising the same ImportError); one check is sufficient.
if not hasattr(asyncio, "run_coroutine_threadsafe"):
    raise ImportError("Cannot use asyncioreactor without access to asyncio.run_coroutine_threadsafe"
                      " (added in 3.4.6 and 3.5.1)")


class AsyncioTimer(object):
@six.add_metaclass(abc.ABCMeta)
class BaseAsyncioTimer(object):
"""
An ``asyncioreactor``-specific Timer. Similar to :class:`.connection.Timer,
but with a slightly different API due to limitations in the underlying
Expand All @@ -45,12 +42,6 @@ def __init__(self, timeout, callback, loop):
loop=loop)
self._handle = asyncio.run_coroutine_threadsafe(delayed, loop=loop)

@staticmethod
@asyncio.coroutine
def _call_delayed_coro(timeout, callback, loop):
yield from asyncio.sleep(timeout, loop=loop)
return callback()

def __lt__(self, other):
try:
return self._handle < other._handle
Expand All @@ -66,15 +57,21 @@ def finish(self):
raise NotImplementedError('{} is not compatible with TimerManager and '
'does not implement .finish()')

@staticmethod
@abc.abstractmethod
def _call_delayed_coro(timeout, callback, loop):
pass

class AsyncioConnection(Connection):
"""
An experimental implementation of :class:`.Connection` that uses the
``asyncio`` module in the Python standard library for its event loop.

Note that it requires ``asyncio`` features that were only introduced in the
3.4 line in 3.4.6, and in the 3.5 line in 3.5.1.
@six.add_metaclass(abc.ABCMeta)
class BaseAsyncioConnection(Connection):
"""
An experimental implementation of :class:`.Connection` that uses the
``asyncio`` module in the Python standard library for its event loop.

Note that it requires ``asyncio`` features that were only introduced in the
3.4 line in 3.4.6, and in the 3.5 line in 3.5.1.
"""

_loop = None
_pid = os.getpid()
Expand Down Expand Up @@ -136,26 +133,6 @@ def close(self):
self._close(), loop=self._loop
)

@asyncio.coroutine
def _close(self):
log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint))
if self._write_watcher:
self._write_watcher.cancel()
if self._read_watcher:
self._read_watcher.cancel()
if self._socket:
self._loop.remove_writer(self._socket.fileno())
self._loop.remove_reader(self._socket.fileno())
self._socket.close()

log.debug("Closed socket to %s" % (self.endpoint,))

if not self.is_defunct:
self.error_all_requests(
ConnectionShutdown("Connection to %s was closed" % self.endpoint))
# don't leave in-progress operations hanging
self.connected_event.set()

def push(self, data):
buff_size = self.out_buffer_size
if len(data) > buff_size:
Expand All @@ -174,52 +151,176 @@ def push(self, data):
# avoid races/hangs by just scheduling this, not using threadsafe
self._loop.create_task(self._push_msg(chunks))

@asyncio.coroutine
def _push_msg(self, chunks):
# This lock ensures all chunks of a message are sequential in the Queue
with (yield from self._write_queue_lock):
for chunk in chunks:
self._write_queue.put_nowait(chunk)
@abc.abstractmethod
def _close(self):
pass

@abc.abstractmethod
def _push_msg(self, chunks):
pass

@asyncio.coroutine
@abc.abstractmethod
def handle_write(self):
while True:
try:
next_msg = yield from self._write_queue.get()
if next_msg:
yield from self._loop.sock_sendall(self._socket, next_msg)
except socket.error as err:
log.debug("Exception in send for %s: %s", self, err)
self.defunct(err)
return
except asyncio.CancelledError:
return
pass

@asyncio.coroutine
@abc.abstractmethod
def handle_read(self):
while True:
try:
buf = yield from self._loop.sock_recv(self._socket, self.in_buffer_size)
self._iobuf.write(buf)
# sock_recv expects EWOULDBLOCK if socket provides no data, but
# nonblocking ssl sockets raise these instead, so we handle them
# ourselves by yielding to the event loop, where the socket will
# get the reading/writing it "wants" before retrying
except (ssl.SSLWantWriteError, ssl.SSLWantReadError):
yield
continue
except socket.error as err:
log.debug("Exception during socket recv for %s: %s",
self, err)
self.defunct(err)
return # leave the read loop
except asyncio.CancelledError:
return

if buf and self._iobuf.tell():
self.process_io_buffer()
else:
log.debug("Connection %s closed by server", self)
self.close()
return
pass


if is_at_least_python_version_3_5:
class AsyncioTimer(BaseAsyncioTimer):
    """Timer flavor for Python >= 3.5, using native ``async``/``await``."""

    @staticmethod
    async def _call_delayed_coro(timeout, callback, loop):
        # Sleep on the supplied loop for `timeout` seconds, then fire the
        # callback and hand back whatever it returns.
        await asyncio.sleep(timeout, loop=loop)
        result = callback()
        return result

class AsyncioConnection(BaseAsyncioConnection):
    """Connection flavor for Python >= 3.5, expressed with native
    ``async``/``await`` coroutines."""

    async def _close(self):
        """Cancel the read/write watchers, close the socket, and fail any
        outstanding requests so callers are not left hanging."""
        log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint))
        if self._write_watcher:
            self._write_watcher.cancel()
        if self._read_watcher:
            self._read_watcher.cancel()
        if self._socket:
            self._loop.remove_writer(self._socket.fileno())
            self._loop.remove_reader(self._socket.fileno())
            self._socket.close()

        log.debug("Closed socket to %s" % (self.endpoint,))

        if not self.is_defunct:
            self.error_all_requests(
                ConnectionShutdown("Connection to %s was closed" % self.endpoint))
            # don't leave in-progress operations hanging
            self.connected_event.set()

    async def _push_msg(self, chunks):
        # This lock ensures all chunks of a message are sequential in the Queue
        async with self._write_queue_lock:
            for chunk in chunks:
                self._write_queue.put_nowait(chunk)

    async def handle_write(self):
        """Drain the write queue, sending each chunk over the socket until
        cancelled or a socket error defuncts the connection."""
        while True:
            try:
                next_msg = await self._write_queue.get()
                if next_msg:
                    await self._loop.sock_sendall(self._socket, next_msg)
            except socket.error as err:
                log.debug("Exception in send for %s: %s", self, err)
                self.defunct(err)
                return
            except asyncio.CancelledError:
                return

    async def handle_read(self):
        """Read from the socket into the I/O buffer until the server closes
        the connection, an error occurs, or the task is cancelled."""
        while True:
            try:
                buf = await self._loop.sock_recv(self._socket, self.in_buffer_size)
                self._iobuf.write(buf)
            # sock_recv expects EWOULDBLOCK if socket provides no data, but
            # nonblocking ssl sockets raise these instead, so we handle them
            # ourselves by yielding to the event loop, where the socket will
            # get the reading/writing it "wants" before retrying
            except (ssl.SSLWantWriteError, ssl.SSLWantReadError):
                # BUGFIX: this handler previously read `pass`, which (a) did
                # not actually yield to the event loop as the comment claims,
                # and (b) fell through to the `if buf` check below with `buf`
                # unbound on the first iteration (UnboundLocalError) or stale
                # afterwards. sleep(0) suspends for one loop iteration — the
                # native-coroutine equivalent of the generator version's bare
                # `yield` — and `continue` restarts the recv.
                await asyncio.sleep(0)
                continue
            except socket.error as err:
                log.debug("Exception during socket recv for %s: %s",
                          self, err)
                self.defunct(err)
                return  # leave the read loop
            except asyncio.CancelledError:
                return

            if buf and self._iobuf.tell():
                self.process_io_buffer()
            else:
                log.debug("Connection %s closed by server", self)
                self.close()
                return
else:
class AsyncioTimer(BaseAsyncioTimer):
    """Timer flavor for Python < 3.5, built on a generator-based
    ``@asyncio.coroutine`` (``yield from``) since ``async def`` syntax is
    unavailable there."""

    @staticmethod
    @asyncio.coroutine
    def _call_delayed_coro(timeout, callback, loop):
        # Wait out the timeout on the supplied loop, then invoke the
        # callback and surface its return value to the awaiting caller.
        yield from asyncio.sleep(timeout, loop=loop)
        result = callback()
        return result

class AsyncioConnection(BaseAsyncioConnection):
    """Connection flavor for Python < 3.5: the same event-loop logic as the
    3.5+ variant, expressed with ``@asyncio.coroutine`` / ``yield from``
    because ``async def`` syntax is unavailable."""

    @asyncio.coroutine
    def _close(self):
        """Cancel the read/write watchers, close the socket, and fail any
        outstanding requests so callers are not left hanging."""
        log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint))
        if self._write_watcher:
            self._write_watcher.cancel()
        if self._read_watcher:
            self._read_watcher.cancel()
        if self._socket:
            self._loop.remove_writer(self._socket.fileno())
            self._loop.remove_reader(self._socket.fileno())
            self._socket.close()

        log.debug("Closed socket to %s" % (self.endpoint,))

        # BUGFIX: the log / error_all_requests / connected_event.set()
        # sequence below appeared twice back to back; a single pass is
        # sufficient (the duplicate re-logged and re-ran the shutdown
        # bookkeeping).
        if not self.is_defunct:
            self.error_all_requests(
                ConnectionShutdown("Connection to %s was closed" % self.endpoint))
            # don't leave in-progress operations hanging
            self.connected_event.set()

    @asyncio.coroutine
    def _push_msg(self, chunks):
        # This lock ensures all chunks of a message are sequential in the Queue
        with (yield from self._write_queue_lock):
            for chunk in chunks:
                self._write_queue.put_nowait(chunk)

    @asyncio.coroutine
    def handle_write(self):
        """Drain the write queue, sending each chunk over the socket until
        cancelled or a socket error defuncts the connection."""
        while True:
            try:
                next_msg = yield from self._write_queue.get()
                if next_msg:
                    yield from self._loop.sock_sendall(self._socket, next_msg)
            except socket.error as err:
                log.debug("Exception in send for %s: %s", self, err)
                self.defunct(err)
                return
            except asyncio.CancelledError:
                return

    @asyncio.coroutine
    def handle_read(self):
        """Read from the socket into the I/O buffer until the server closes
        the connection, an error occurs, or the task is cancelled."""
        while True:
            try:
                buf = yield from self._loop.sock_recv(self._socket, self.in_buffer_size)
                self._iobuf.write(buf)
            # sock_recv expects EWOULDBLOCK if socket provides no data, but
            # nonblocking ssl sockets raise these instead, so we handle them
            # ourselves by yielding to the event loop, where the socket will
            # get the reading/writing it "wants" before retrying
            except (ssl.SSLWantWriteError, ssl.SSLWantReadError):
                yield
                continue
            except socket.error as err:
                log.debug("Exception during socket recv for %s: %s",
                          self, err)
                self.defunct(err)
                return  # leave the read loop
            except asyncio.CancelledError:
                return

            if buf and self._iobuf.tell():
                self.process_io_buffer()
            else:
                log.debug("Connection %s closed by server", self)
                self.close()
                return
8 changes: 2 additions & 6 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
geomet>=0.1,<0.3
six >=1.9
futures <=2.2.0
# Futures is not required for Python 3, but it works up through 2.2.0 (after which it introduced breaking syntax).
# This is left here to make sure install -r works with any runtime. When installing via setup.py, futures is omitted
# for Python 3, in favor of the standard library implementation.
# see PYTHON-393
six
futures; python_version < '3.0'
10 changes: 5 additions & 5 deletions tests/integration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def get_server_versions():
if cass_version is not None:
return (cass_version, cql_version)

c = TestCluster()
c = IntegrationTestCluster()
s = c.connect()
row = s.execute('SELECT cql_version, release_version FROM system.local')[0]

Expand Down Expand Up @@ -706,9 +706,9 @@ def setup_keyspace(ipformat=None, wait=True, protocol_version=None):
_protocol_version = PROTOCOL_VERSION

if not ipformat:
cluster = TestCluster(protocol_version=_protocol_version)
cluster = IntegrationTestCluster(protocol_version=_protocol_version)
else:
cluster = TestCluster(contact_points=["::1"], protocol_version=_protocol_version)
cluster = IntegrationTestCluster(contact_points=["::1"], protocol_version=_protocol_version)
session = cluster.connect()

try:
Expand Down Expand Up @@ -802,7 +802,7 @@ def create_keyspace(cls, rf):

@classmethod
def common_setup(cls, rf, keyspace_creation=True, create_class_table=False, **cluster_kwargs):
cls.cluster = TestCluster(**cluster_kwargs)
cls.cluster = IntegrationTestCluster(**cluster_kwargs)
cls.session = cls.cluster.connect(wait_for_all_pools=True)
cls.ks_name = cls.__name__.lower()
if keyspace_creation:
Expand Down Expand Up @@ -990,7 +990,7 @@ def assert_startswith(s, prefix):
)


class TestCluster(object):
class IntegrationTestCluster(object):
DEFAULT_PROTOCOL_VERSION = default_protocol_version
DEFAULT_CASSANDRA_IP = CASSANDRA_IP
DEFAULT_ALLOW_BETA = ALLOW_BETA_PROTOCOL
Expand Down
Loading