From f3f3aa0d6ad3c26484d377d0ba31f77aa4d19b80 Mon Sep 17 00:00:00 2001 From: Efraimov Oren Date: Tue, 14 Dec 2021 19:04:29 +0200 Subject: [PATCH 1/4] fix(cassandra/io/asyncioreactor.py): Using async and await inside @asyncio.coroutine * Replace all places that use old async node (`@asyncio.coroutine` and `yield from`) and use a new Python syntax (Only for Python v3.5+) * Save the old code to allow users with Python v2+ to run our tests. * requirements.txt: * Install `futures` package only for Python v2 * Use the latest `six` package version --- cassandra/io/asyncioreactor.py | 281 ++++++++++++++++++++++----------- requirements.txt | 8 +- 2 files changed, 193 insertions(+), 96 deletions(-) diff --git a/cassandra/io/asyncioreactor.py b/cassandra/io/asyncioreactor.py index 7cb0444a32..5abad786b3 100644 --- a/cassandra/io/asyncioreactor.py +++ b/cassandra/io/asyncioreactor.py @@ -1,3 +1,8 @@ +import abc +import sys + +import six + from cassandra.connection import Connection, ConnectionShutdown import asyncio @@ -9,24 +14,16 @@ log = logging.getLogger(__name__) +is_at_least_python_version_3_5 = sys.version_info >= (3, 5) -# This module uses ``yield from`` and ``@asyncio.coroutine`` over ``await`` and -# ``async def`` for pre-Python-3.5 compatibility, so keep in mind that the -# managed coroutines are generator-based, not native coroutines. See PEP 492: -# https://www.python.org/dev/peps/pep-0492/#coroutine-objects - +if not hasattr(asyncio, "run_coroutine_threadsafe"): + raise ImportError("Cannot use asyncioreactor without access to asyncio.run_coroutine_threadsafe" + " (added in 3.4.6 and 3.5.1)") -try: - asyncio.run_coroutine_threadsafe -except AttributeError: - raise ImportError( - 'Cannot use asyncioreactor without access to ' - 'asyncio.run_coroutine_threadsafe (added in 3.4.6 and 3.5.1)' - ) - -class AsyncioTimer(object): +@six.add_metaclass(abc.ABCMeta) +class BaseAsyncioTimer(object): """ An ``asyncioreactor``-specific Timer. Similar to :class:`.connection.Timer, but with a slightly different API due to limitations in the underlying @@ -45,12 +42,6 @@ def __init__(self, timeout, callback, loop): loop=loop) self._handle = asyncio.run_coroutine_threadsafe(delayed, loop=loop) - @staticmethod - @asyncio.coroutine - def _call_delayed_coro(timeout, callback, loop): - yield from asyncio.sleep(timeout, loop=loop) - return callback() - def __lt__(self, other): try: return self._handle < other._handle @@ -66,15 +57,21 @@ def finish(self): raise NotImplementedError('{} is not compatible with TimerManager and ' 'does not implement .finish()') + @staticmethod + @abc.abstractmethod + def _call_delayed_coro(timeout, callback, loop): + pass -class AsyncioConnection(Connection): - """ - An experimental implementation of :class:`.Connection` that uses the - ``asyncio`` module in the Python standard library for its event loop. - Note that it requires ``asyncio`` features that were only introduced in the - 3.4 line in 3.4.6, and in the 3.5 line in 3.5.1. +@six.add_metaclass(abc.ABCMeta) +class BaseAsyncioConnection(Connection): """ + An experimental implementation of :class:`.Connection` that uses the + ``asyncio`` module in the Python standard library for its event loop. + + Note that it requires ``asyncio`` features that were only introduced in the + 3.4 line in 3.4.6, and in the 3.5 line in 3.5.1. + """ _loop = None _pid = os.getpid() @@ -136,26 +133,6 @@ def close(self): self._close(), loop=self._loop ) - @asyncio.coroutine - def _close(self): - log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint)) - if self._write_watcher: - self._write_watcher.cancel() - if self._read_watcher: - self._read_watcher.cancel() - if self._socket: - self._loop.remove_writer(self._socket.fileno()) - self._loop.remove_reader(self._socket.fileno()) - self._socket.close() - - log.debug("Closed socket to %s" % (self.endpoint,)) - - if not self.is_defunct: - self.error_all_requests( - ConnectionShutdown("Connection to %s was closed" % self.endpoint)) - # don't leave in-progress operations hanging - self.connected_event.set() - def push(self, data): buff_size = self.out_buffer_size if len(data) > buff_size: @@ -174,52 +151,176 @@ def push(self, data): # avoid races/hangs by just scheduling this, not using threadsafe self._loop.create_task(self._push_msg(chunks)) - @asyncio.coroutine - def _push_msg(self, chunks): - # This lock ensures all chunks of a message are sequential in the Queue - with (yield from self._write_queue_lock): - for chunk in chunks: - self._write_queue.put_nowait(chunk) + @abc.abstractmethod + def _close(self): + pass + @abc.abstractmethod + def _push_msg(self, chunks): + pass - @asyncio.coroutine + @abc.abstractmethod def handle_write(self): - while True: - try: - next_msg = yield from self._write_queue.get() - if next_msg: - yield from self._loop.sock_sendall(self._socket, next_msg) - except socket.error as err: - log.debug("Exception in send for %s: %s", self, err) - self.defunct(err) - return - except asyncio.CancelledError: - return + pass - @asyncio.coroutine + @abc.abstractmethod def handle_read(self): - while True: - try: - buf = yield from self._loop.sock_recv(self._socket, self.in_buffer_size) - self._iobuf.write(buf) - # sock_recv expects EWOULDBLOCK if socket provides no data, but - # nonblocking ssl sockets raise these instead, so we handle them - # ourselves by yielding to the event loop, where the socket will - # get the reading/writing it "wants" before retrying - except (ssl.SSLWantWriteError, ssl.SSLWantReadError): - yield - continue - except socket.error as err: - log.debug("Exception during socket recv for %s: %s", - self, err) - self.defunct(err) - return # leave the read loop - except asyncio.CancelledError: - return - - if buf and self._iobuf.tell(): - self.process_io_buffer() - else: - log.debug("Connection %s closed by server", self) - self.close() - return + pass + + +if is_at_least_python_version_3_5: + class AsyncioTimer(BaseAsyncioTimer): + @staticmethod + async def _call_delayed_coro(timeout, callback, loop): + await asyncio.sleep(timeout, loop=loop) + return callback() + + class AsyncioConnection(BaseAsyncioConnection): + async def _close(self): + log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint)) + if self._write_watcher: + self._write_watcher.cancel() + if self._read_watcher: + self._read_watcher.cancel() + if self._socket: + self._loop.remove_writer(self._socket.fileno()) + self._loop.remove_reader(self._socket.fileno()) + self._socket.close() + + log.debug("Closed socket to %s" % (self.endpoint,)) + + if not self.is_defunct: + self.error_all_requests( + ConnectionShutdown("Connection to %s was closed" % self.endpoint)) + # don't leave in-progress operations hanging + self.connected_event.set() + + async def _push_msg(self, chunks): + # This lock ensures all chunks of a message are sequential in the Queue + async with self._write_queue_lock: + for chunk in chunks: + self._write_queue.put_nowait(chunk) + + async def handle_write(self): + while True: + try: + next_msg = await self._write_queue.get() + if next_msg: + await self._loop.sock_sendall(self._socket, next_msg) + except socket.error as err: + log.debug("Exception in send for %s: %s", self, err) + self.defunct(err) + return + except asyncio.CancelledError: + return + + async def handle_read(self): + while True: + try: + buf = await self._loop.sock_recv(self._socket, self.in_buffer_size) + self._iobuf.write(buf) + # sock_recv expects EWOULDBLOCK if socket provides no data, but + # nonblocking ssl sockets raise these instead, so we handle them + # ourselves by yielding to the event loop, where the socket will + # get the reading/writing it "wants" before retrying + except (ssl.SSLWantWriteError, ssl.SSLWantReadError): + pass + except socket.error as err: + log.debug("Exception during socket recv for %s: %s", + self, err) + self.defunct(err) + return # leave the read loop + except asyncio.CancelledError: + return + + if buf and self._iobuf.tell(): + self.process_io_buffer() + else: + log.debug("Connection %s closed by server", self) + self.close() + return +else: + class AsyncioTimer(BaseAsyncioTimer): + @staticmethod + @asyncio.coroutine + def _call_delayed_coro(timeout, callback, loop): + yield from asyncio.sleep(timeout, loop=loop) + return callback() + + class AsyncioConnection(BaseAsyncioConnection): + @asyncio.coroutine + def _close(self): + log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint)) + if self._write_watcher: + self._write_watcher.cancel() + if self._read_watcher: + self._read_watcher.cancel() + if self._socket: + self._loop.remove_writer(self._socket.fileno()) + self._loop.remove_reader(self._socket.fileno()) + self._socket.close() + + log.debug("Closed socket to %s" % (self.endpoint,)) + + if not self.is_defunct: + self.error_all_requests( + ConnectionShutdown("Connection to %s was closed" % self.endpoint)) + # don't leave in-progress operations hanging + self.connected_event.set() + + log.debug("Closed socket to %s" % (self.endpoint,)) + + if not self.is_defunct: + self.error_all_requests( + ConnectionShutdown("Connection to %s was closed" % self.endpoint)) + # don't leave in-progress operations hanging + self.connected_event.set() + + @asyncio.coroutine + def _push_msg(self, chunks): + # This lock ensures all chunks of a message are sequential in the Queue + with (yield from self._write_queue_lock): + for chunk in chunks: + self._write_queue.put_nowait(chunk) + + @asyncio.coroutine + def handle_write(self): + while True: + try: + next_msg = yield from self._write_queue.get() + if next_msg: + yield from self._loop.sock_sendall(self._socket, next_msg) + except socket.error as err: + log.debug("Exception in send for %s: %s", self, err) + self.defunct(err) + return + except asyncio.CancelledError: + return + + @asyncio.coroutine + def handle_read(self): + while True: + try: + buf = yield from self._loop.sock_recv(self._socket, self.in_buffer_size) + self._iobuf.write(buf) + # sock_recv expects EWOULDBLOCK if socket provides no data, but + # nonblocking ssl sockets raise these instead, so we handle them + # ourselves by yielding to the event loop, where the socket will + # get the reading/writing it "wants" before retrying + except (ssl.SSLWantWriteError, ssl.SSLWantReadError): + yield + continue + except socket.error as err: + log.debug("Exception during socket recv for %s: %s", + self, err) + self.defunct(err) + return # leave the read loop + except asyncio.CancelledError: + return + + if buf and self._iobuf.tell(): + self.process_io_buffer() + else: + log.debug("Connection %s closed by server", self) + self.close() + return diff --git a/requirements.txt b/requirements.txt index f784fba1b9..6e7ef642ac 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,3 @@ geomet>=0.1,<0.3 -six >=1.9 -futures <=2.2.0 -# Futures is not required for Python 3, but it works up through 2.2.0 (after which it introduced breaking syntax). -# This is left here to make sure install -r works with any runtime. When installing via setup.py, futures is omitted -# for Python 3, in favor of the standard library implementation. -# see PYTHON-393 +six +futures; python_version < '3.0' \ No newline at end of file From e928c03129570abceb1915858cb88613f0dc2335 Mon Sep 17 00:00:00 2001 From: Efraimov Oren Date: Tue, 14 Dec 2021 19:08:37 +0200 Subject: [PATCH 2/4] fix(tests/integration/standard/test_cluster.py): Fixing "DeprecationWarning: invalid escape sequence \" warning --- tests/integration/standard/test_cluster.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py index c7d8266fd9..7f4a4ffd19 100644 --- a/tests/integration/standard/test_cluster.py +++ b/tests/integration/standard/test_cluster.py @@ -150,7 +150,8 @@ def test_raise_error_on_control_connection_timeout(self): get_node(1).pause() cluster = TestCluster(contact_points=['127.0.0.1'], connect_timeout=1) - with self.assertRaisesRegexp(NoHostAvailable, "OperationTimedOut\('errors=Timed out creating connection \(1 seconds\)"): + with self.assertRaisesRegexp(NoHostAvailable, + r"OperationTimedOut\('errors=Timed out creating connection \(1 seconds\)"): cluster.connect() cluster.shutdown() From f8171913309bb5e463810141cb3cffce2f264173 Mon Sep 17 00:00:00 2001 From: Efraimov Oren Date: Tue, 14 Dec 2021 19:09:00 +0200 Subject: [PATCH 3/4] fix(tests/integration/standard/test_metadata.py): Fixing "DeprecationWarning: invalid escape sequence \" warning --- tests/integration/standard/test_metadata.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/integration/standard/test_metadata.py b/tests/integration/standard/test_metadata.py index bd556f357d..ce6260f06e 100644 --- a/tests/integration/standard/test_metadata.py +++ b/tests/integration/standard/test_metadata.py @@ -1593,7 +1593,7 @@ def test_function_no_parameters(self): with self.VerifiedFunction(self, **kwargs) as vf: fn_meta = self.keyspace_function_meta[vf.signature] - self.assertRegexpMatches(fn_meta.as_cql_query(), "CREATE FUNCTION.*%s\(\) .*" % kwargs['name']) + self.assertRegexpMatches(fn_meta.as_cql_query(), r"CREATE FUNCTION.*%s\(\) .*" % kwargs['name']) def test_functions_follow_keyspace_alter(self): """ @@ -1641,12 +1641,13 @@ def test_function_cql_called_on_null(self): kwargs['called_on_null_input'] = True with self.VerifiedFunction(self, **kwargs) as vf: fn_meta = self.keyspace_function_meta[vf.signature] - self.assertRegexpMatches(fn_meta.as_cql_query(), "CREATE FUNCTION.*\) CALLED ON NULL INPUT RETURNS .*") + self.assertRegexpMatches(fn_meta.as_cql_query(), r"CREATE FUNCTION.*\) CALLED ON NULL INPUT RETURNS .*") kwargs['called_on_null_input'] = False with self.VerifiedFunction(self, **kwargs) as vf: fn_meta = self.keyspace_function_meta[vf.signature] - self.assertRegexpMatches(fn_meta.as_cql_query(), "CREATE FUNCTION.*\) RETURNS NULL ON NULL INPUT RETURNS .*") + self.assertRegexpMatches(fn_meta.as_cql_query(), + r"CREATE FUNCTION.*\) RETURNS NULL ON NULL INPUT RETURNS .*") class AggregateMetadata(FunctionTest): From 39fcfbb2acb9e59bf35cb4c581a0789713f66cc5 Mon Sep 17 00:00:00 2001 From: Efraimov Oren Date: Tue, 14 Dec 2021 19:10:32 +0200 Subject: [PATCH 4/4] fix(tests/integration/__init__.py): Rename "TestCluster" to "IntegrationTestCluster" --- tests/integration/__init__.py | 10 +- tests/integration/advanced/__init__.py | 6 +- tests/integration/advanced/graph/__init__.py | 6 +- .../integration/advanced/graph/test_graph.py | 4 +- .../integration/advanced/test_adv_metadata.py | 4 +- tests/integration/advanced/test_auth.py | 12 +- .../integration/advanced/test_cont_paging.py | 4 +- .../test_cqlengine_where_operators.py | 6 +- .../advanced/test_unixsocketendpoint.py | 4 +- .../cqlengine/advanced/test_cont_paging.py | 6 +- .../cqlengine/connections/test_connection.py | 14 +- .../cqlengine/query/test_queryset.py | 4 +- .../statements/test_base_statement.py | 4 +- .../integration/cqlengine/test_connections.py | 6 +- tests/integration/long/test_consistency.py | 18 +-- tests/integration/long/test_failure_types.py | 6 +- tests/integration/long/test_ipv6.py | 12 +- tests/integration/long/test_large_data.py | 4 +- .../long/test_loadbalancingpolicies.py | 14 +- tests/integration/long/test_policies.py | 4 +- tests/integration/long/test_schema.py | 10 +- tests/integration/long/test_ssl.py | 12 +- .../integration/long/test_topology_change.py | 4 +- .../standard/test_authentication.py | 8 +- .../test_authentication_misconfiguration.py | 4 +- .../standard/test_client_warnings.py | 4 +- tests/integration/standard/test_cluster.py | 124 +++++++++--------- tests/integration/standard/test_concurrent.py | 4 +- tests/integration/standard/test_connection.py | 14 +- .../standard/test_control_connection.py | 6 +- .../standard/test_custom_cluster.py | 12 +- .../standard/test_custom_payload.py | 4 +- .../standard/test_custom_protocol_handler.py | 12 +- .../standard/test_cython_protocol_handlers.py | 10 +- tests/integration/standard/test_dse.py | 4 +- tests/integration/standard/test_metadata.py | 52 ++++---- tests/integration/standard/test_metrics.py | 14 +- tests/integration/standard/test_policies.py | 10 +- .../standard/test_prepared_statements.py | 14 +- tests/integration/standard/test_query.py | 38 +++--- .../integration/standard/test_query_paging.py | 4 +- tests/integration/standard/test_routing.py | 4 +- .../standard/test_row_factories.py | 8 +- .../standard/test_single_interface.py | 4 +- tests/integration/standard/test_types.py | 20 +-- tests/integration/standard/test_udts.py | 26 ++-- 46 files changed, 287 insertions(+), 287 deletions(-) diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index 70ec11c213..71f8c0347e 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -75,7 +75,7 @@ def get_server_versions(): if cass_version is not None: return (cass_version, cql_version) - c = TestCluster() + c = IntegrationTestCluster() s = c.connect() row = s.execute('SELECT cql_version, release_version FROM system.local')[0] @@ -706,9 +706,9 @@ def setup_keyspace(ipformat=None, wait=True, protocol_version=None): _protocol_version = PROTOCOL_VERSION if not ipformat: - cluster = TestCluster(protocol_version=_protocol_version) + cluster = IntegrationTestCluster(protocol_version=_protocol_version) else: - cluster = TestCluster(contact_points=["::1"], protocol_version=_protocol_version) + cluster = IntegrationTestCluster(contact_points=["::1"], protocol_version=_protocol_version) session = cluster.connect() try: @@ -802,7 +802,7 @@ def create_keyspace(cls, rf): @classmethod def common_setup(cls, rf, keyspace_creation=True, create_class_table=False, **cluster_kwargs): - cls.cluster = TestCluster(**cluster_kwargs) + cls.cluster = IntegrationTestCluster(**cluster_kwargs) cls.session = cls.cluster.connect(wait_for_all_pools=True) cls.ks_name = cls.__name__.lower() if keyspace_creation: @@ -990,7 +990,7 @@ def assert_startswith(s, prefix): ) -class TestCluster(object): +class IntegrationTestCluster(object): DEFAULT_PROTOCOL_VERSION = default_protocol_version DEFAULT_CASSANDRA_IP = CASSANDRA_IP DEFAULT_ALLOW_BETA = ALLOW_BETA_PROTOCOL diff --git a/tests/integration/advanced/__init__.py b/tests/integration/advanced/__init__.py index b2820e037b..112df8e0a1 100644 --- a/tests/integration/advanced/__init__.py +++ b/tests/integration/advanced/__init__.py @@ -26,7 +26,7 @@ from ccmlib import common from tests.integration import get_server_versions, BasicKeyspaceUnitTestCase, \ - drop_keyspace_shutdown_cluster, get_node, USE_CASS_EXTERNAL, TestCluster + drop_keyspace_shutdown_cluster, get_node, USE_CASS_EXTERNAL, IntegrationTestCluster from tests.integration import use_singledc, use_single_node, wait_for_node_socket, CASSANDRA_IP home = expanduser('~') @@ -106,7 +106,7 @@ def use_cluster_with_graph(num_nodes): # Wait for spark master to start up spark_master_http = ("localhost", 7080) common.check_socket_listening(spark_master_http, timeout=60) - tmp_cluster = TestCluster() + tmp_cluster = IntegrationTestCluster() # Start up remaining nodes. try: @@ -134,7 +134,7 @@ class BasicGeometricUnitTestCase(BasicKeyspaceUnitTestCase): @classmethod def common_dse_setup(cls, rf, keyspace_creation=True): - cls.cluster = TestCluster() + cls.cluster = IntegrationTestCluster() cls.session = cls.cluster.connect() cls.ks_name = cls.__name__.lower() if keyspace_creation: diff --git a/tests/integration/advanced/graph/__init__.py b/tests/integration/advanced/graph/__init__.py index 6c9458dd02..c523f24974 100644 --- a/tests/integration/advanced/graph/__init__.py +++ b/tests/integration/advanced/graph/__init__.py @@ -160,7 +160,7 @@ def session_setup(self): ) ) - self.cluster = TestCluster(execution_profiles={ + self.cluster = IntegrationTestCluster(execution_profiles={ EXEC_PROFILE_GRAPH_DEFAULT: ep_graphson1, EXEC_PROFILE_GRAPH_ANALYTICS_DEFAULT: ep_analytics, "graphson1": ep_graphson1, @@ -275,7 +275,7 @@ def session_setup(self): ) ) - self.cluster = TestCluster(execution_profiles={ + self.cluster = IntegrationTestCluster(execution_profiles={ EXEC_PROFILE_GRAPH_DEFAULT: ep_graphson1, EXEC_PROFILE_GRAPH_ANALYTICS_DEFAULT: ep_analytics, "graphson1": ep_graphson1, @@ -360,7 +360,7 @@ class BasicSharedGraphUnitTestCase(BasicKeyspaceUnitTestCase): @classmethod def session_setup(cls): - cls.cluster = TestCluster() + cls.cluster = IntegrationTestCluster() cls.session = cls.cluster.connect() cls.ks_name = cls.__name__.lower() cls.cass_version, cls.cql_version = get_server_versions() diff --git a/tests/integration/advanced/graph/test_graph.py b/tests/integration/advanced/graph/test_graph.py index a0b6534c34..8fcc3282a8 100644 --- a/tests/integration/advanced/graph/test_graph.py +++ b/tests/integration/advanced/graph/test_graph.py @@ -25,7 +25,7 @@ from cassandra.util import SortedSet from tests.integration import DSE_VERSION, greaterthanorequaldse51, greaterthanorequaldse68, \ - requiredse, TestCluster + requiredse, IntegrationTestCluster from tests.integration.advanced.graph import BasicGraphUnitTestCase, GraphUnitTestCase, \ GraphProtocol, ClassicGraphSchema, CoreGraphSchema, use_single_node_with_graph @@ -150,7 +150,7 @@ def test_graph_profile(self): exec_short_timeout.graph_options.graph_name = self.graph_name # Add a single execution policy on cluster creation - local_cluster = TestCluster(execution_profiles={"exec_dif_factory": exec_dif_factory}) + local_cluster = IntegrationTestCluster(execution_profiles={"exec_dif_factory": exec_dif_factory}) local_session = local_cluster.connect() self.addCleanup(local_cluster.shutdown) diff --git a/tests/integration/advanced/test_adv_metadata.py b/tests/integration/advanced/test_adv_metadata.py index b3af6fa5d1..3819f16fcc 100644 --- a/tests/integration/advanced/test_adv_metadata.py +++ b/tests/integration/advanced/test_adv_metadata.py @@ -18,7 +18,7 @@ BasicSharedKeyspaceUnitTestCaseRF1, greaterthanorequaldse51, greaterthanorequaldse60, greaterthanorequaldse68, use_single_node, - DSE_VERSION, requiredse, TestCluster) + DSE_VERSION, requiredse, IntegrationTestCluster) try: import unittest2 as unittest @@ -392,4 +392,4 @@ def test_connection_on_graph_schema_error(self): """ % (self.ks_name,)) self.session.execute('TRUNCATE system_schema.vertices') - TestCluster().connect().shutdown() + IntegrationTestCluster().connect().shutdown() diff --git a/tests/integration/advanced/test_auth.py b/tests/integration/advanced/test_auth.py index 7e9aa8c23e..b8cf65c520 100644 --- a/tests/integration/advanced/test_auth.py +++ b/tests/integration/advanced/test_auth.py @@ -30,7 +30,7 @@ from cassandra.protocol import Unauthorized from cassandra.query import SimpleStatement from tests.integration import (get_cluster, greaterthanorequaldse51, - remove_cluster, requiredse, DSE_VERSION, TestCluster) + remove_cluster, requiredse, DSE_VERSION, IntegrationTestCluster) from tests.integration.advanced import ADS_HOME, use_single_node_with_graph from tests.integration.advanced.graph import reset_graph, ClassicGraphFixtures @@ -158,7 +158,7 @@ def connect_and_query(self, auth_provider, query=None): Runs a simple system query with the auth_provided specified. """ os.environ['KRB5_CONFIG'] = self.krb_conf - self.cluster = TestCluster(auth_provider=auth_provider) + self.cluster = IntegrationTestCluster(auth_provider=auth_provider) self.session = self.cluster.connect() query = query if query else "SELECT * FROM system.local" statement = SimpleStatement(query) @@ -321,7 +321,7 @@ def _remove_proxy_setup(self): os.environ['KRB5_CONFIG'] = self.krb_conf self.refresh_kerberos_tickets(self.cassandra_keytab, "cassandra@DATASTAX.COM", self.krb_conf) auth_provider = DSEGSSAPIAuthProvider(service='dse', qops=["auth"], principal='cassandra@DATASTAX.COM') - cluster = TestCluster(auth_provider=auth_provider) + cluster = IntegrationTestCluster(auth_provider=auth_provider) session = cluster.connect() session.execute("REVOKE PROXY.LOGIN ON ROLE '{0}' FROM '{1}'".format('charlie@DATASTAX.COM', 'bob@DATASTAX.COM')) @@ -339,7 +339,7 @@ def _setup_for_proxy(self, grant=True): os.environ['KRB5_CONFIG'] = self.krb_conf self.refresh_kerberos_tickets(self.cassandra_keytab, "cassandra@DATASTAX.COM", self.krb_conf) auth_provider = DSEGSSAPIAuthProvider(service='dse', qops=["auth"], principal='cassandra@DATASTAX.COM') - cluster = TestCluster(auth_provider=auth_provider) + cluster = IntegrationTestCluster(auth_provider=auth_provider) session = cluster.connect() stmts = [ @@ -404,7 +404,7 @@ def setUpClass(self): # Create users and test keyspace self.user_role = 'user1' self.server_role = 'server' - self.root_cluster = TestCluster(auth_provider=DSEPlainTextAuthProvider('cassandra', 'cassandra')) + self.root_cluster = IntegrationTestCluster(auth_provider=DSEPlainTextAuthProvider('cassandra', 'cassandra')) self.root_session = self.root_cluster.connect() stmts = [ @@ -470,7 +470,7 @@ def get_sasl_options(self, mechanism='PLAIN'): return sasl_options def connect_and_query(self, auth_provider, execute_as=None, query="SELECT * FROM testproxy.testproxy"): - self.cluster = TestCluster(auth_provider=auth_provider) + self.cluster = IntegrationTestCluster(auth_provider=auth_provider) self.session = self.cluster.connect() rs = self.session.execute(query, execute_as=execute_as) return rs diff --git a/tests/integration/advanced/test_cont_paging.py b/tests/integration/advanced/test_cont_paging.py index c5f1cbfff3..567aa5ab3c 100644 --- a/tests/integration/advanced/test_cont_paging.py +++ b/tests/integration/advanced/test_cont_paging.py @@ -13,7 +13,7 @@ # limitations under the License. from tests.integration import use_singledc, greaterthanorequaldse51, BasicSharedKeyspaceUnitTestCaseRF3WM, \ - DSE_VERSION, ProtocolVersion, greaterthanorequaldse60, requiredse, TestCluster + DSE_VERSION, ProtocolVersion, greaterthanorequaldse60, requiredse, IntegrationTestCluster import logging log = logging.getLogger(__name__) @@ -64,7 +64,7 @@ def tearDownClass(cls): @classmethod def create_cluster(cls): - cls.cluster_with_profiles = TestCluster(protocol_version=cls.protocol_version, execution_profiles=cls.execution_profiles) + cls.cluster_with_profiles = IntegrationTestCluster(protocol_version=cls.protocol_version, execution_profiles=cls.execution_profiles) cls.session_with_profiles = cls.cluster_with_profiles.connect(wait_for_all_pools=True) statements_and_params = zip( diff --git a/tests/integration/advanced/test_cqlengine_where_operators.py b/tests/integration/advanced/test_cqlengine_where_operators.py index 8ade3db09d..d5b215e802 100644 --- a/tests/integration/advanced/test_cqlengine_where_operators.py +++ b/tests/integration/advanced/test_cqlengine_where_operators.py @@ -25,7 +25,7 @@ create_keyspace_simple, drop_table, sync_table) from cassandra.cqlengine.statements import IsNotNull -from tests.integration import DSE_VERSION, requiredse, CASSANDRA_IP, greaterthanorequaldse60, TestCluster +from tests.integration import DSE_VERSION, requiredse, CASSANDRA_IP, greaterthanorequaldse60, IntegrationTestCluster from tests.integration.advanced import use_single_node_with_graph_and_solr from tests.integration.cqlengine import DEFAULT_KEYSPACE @@ -64,7 +64,7 @@ class IsNotNullTests(unittest.TestCase): @classmethod def setUpClass(cls): if DSE_VERSION: - cls.cluster = TestCluster() + cls.cluster = IntegrationTestCluster() @greaterthanorequaldse60 def test_is_not_null_execution(self): @@ -80,7 +80,7 @@ def test_is_not_null_execution(self): @test_category cqlengine """ - cluster = TestCluster() + cluster = IntegrationTestCluster() self.addCleanup(cluster.shutdown) session = cluster.connect() diff --git a/tests/integration/advanced/test_unixsocketendpoint.py b/tests/integration/advanced/test_unixsocketendpoint.py index 10cbc1b362..9ce8e0b5c6 100644 --- a/tests/integration/advanced/test_unixsocketendpoint.py +++ b/tests/integration/advanced/test_unixsocketendpoint.py @@ -25,7 +25,7 @@ from cassandra.policies import WhiteListRoundRobinPolicy, RoundRobinPolicy from tests import notwindows -from tests.integration import use_single_node, TestCluster +from tests.integration import use_single_node, IntegrationTestCluster log = logging.getLogger() log.setLevel('DEBUG') @@ -65,7 +65,7 @@ def setUpClass(cls): lbp = UnixSocketWhiteListRoundRobinPolicy([UNIX_SOCKET_PATH]) ep = ExecutionProfile(load_balancing_policy=lbp) endpoint = UnixSocketEndPoint(UNIX_SOCKET_PATH) - cls.cluster = TestCluster(contact_points=[endpoint], execution_profiles={EXEC_PROFILE_DEFAULT: ep}) + cls.cluster = IntegrationTestCluster(contact_points=[endpoint], execution_profiles={EXEC_PROFILE_DEFAULT: ep}) @classmethod def tearDownClass(cls): diff --git a/tests/integration/cqlengine/advanced/test_cont_paging.py b/tests/integration/cqlengine/advanced/test_cont_paging.py index 38b4355312..399dab64b0 100644 --- a/tests/integration/cqlengine/advanced/test_cont_paging.py +++ b/tests/integration/cqlengine/advanced/test_cont_paging.py @@ -27,7 +27,7 @@ from cassandra.cqlengine import columns, connection, models from cassandra.cqlengine.management import drop_table, sync_table from tests.integration import (DSE_VERSION, greaterthanorequaldse51, - greaterthanorequaldse60, requiredse, TestCluster) + greaterthanorequaldse60, requiredse, IntegrationTestCluster) class TestMultiKeyModel(models.Model): @@ -76,8 +76,8 @@ def tearDownClass(cls): def _create_cluster_with_cp_options(cls, name, cp_options): execution_profiles = {EXEC_PROFILE_DEFAULT: ExecutionProfile(continuous_paging_options=cp_options)} - cls.cluster_default = TestCluster(protocol_version=cls.protocol_version, - execution_profiles=execution_profiles) + cls.cluster_default = IntegrationTestCluster(protocol_version=cls.protocol_version, + execution_profiles=execution_profiles) cls.session_default = cls.cluster_default.connect(wait_for_all_pools=True) connection.register_connection(name, default=True, session=cls.session_default) cls.connections.add(name) diff --git a/tests/integration/cqlengine/connections/test_connection.py b/tests/integration/cqlengine/connections/test_connection.py index c46df31280..c31dec56c4 100644 --- a/tests/integration/cqlengine/connections/test_connection.py +++ b/tests/integration/cqlengine/connections/test_connection.py @@ -26,7 +26,7 @@ from cassandra.policies import RoundRobinPolicy from cassandra.query import dict_factory -from tests.integration import CASSANDRA_IP, PROTOCOL_VERSION, execute_with_long_wait_retry, local, TestCluster +from tests.integration import CASSANDRA_IP, PROTOCOL_VERSION, execute_with_long_wait_retry, local, IntegrationTestCluster from tests.integration.cqlengine.base import BaseCassEngTestCase from tests.integration.cqlengine import DEFAULT_KEYSPACE, setup_connection @@ -76,7 +76,7 @@ def setUpClass(cls): cls.keyspace1 = 'ctest1' cls.keyspace2 = 'ctest2' super(SeveralConnectionsTest, cls).setUpClass() - cls.setup_cluster = TestCluster() + cls.setup_cluster = IntegrationTestCluster() cls.setup_session = cls.setup_cluster.connect() ddl = "CREATE KEYSPACE {0} WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': '{1}'}}".format(cls.keyspace1, 1) execute_with_long_wait_retry(cls.setup_session, ddl) @@ -93,7 +93,7 @@ def tearDownClass(cls): models.DEFAULT_KEYSPACE def setUp(self): - self.c = TestCluster() + self.c = IntegrationTestCluster() self.session1 = self.c.connect(keyspace=self.keyspace1) self.session1.row_factory = dict_factory self.session2 = self.c.connect(keyspace=self.keyspace2) @@ -149,7 +149,7 @@ def test_connection_with_legacy_settings(self): self.assertEqual(conn.cluster._config_mode, _ConfigMode.LEGACY) def test_connection_from_session_with_execution_profile(self): - cluster = TestCluster(execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(row_factory=dict_factory)}) + cluster = IntegrationTestCluster(execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(row_factory=dict_factory)}) session = cluster.connect() connection.default() connection.set_session(session) @@ -157,7 +157,7 @@ def test_connection_from_session_with_execution_profile(self): self.assertEqual(conn.cluster._config_mode, _ConfigMode.PROFILES) def test_connection_from_session_with_legacy_settings(self): - cluster = TestCluster(load_balancing_policy=RoundRobinPolicy()) + cluster = IntegrationTestCluster(load_balancing_policy=RoundRobinPolicy()) session = cluster.connect() session.row_factory = dict_factory connection.set_session(session) @@ -165,7 +165,7 @@ def test_connection_from_session_with_legacy_settings(self): self.assertEqual(conn.cluster._config_mode, _ConfigMode.LEGACY) def test_uncommitted_session_uses_legacy(self): - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() session.row_factory = dict_factory connection.set_session(session) @@ -186,7 +186,7 @@ def test_legacy_insert_query(self): self.assertEqual(ConnectionModel.objects(key=0)[0].some_data, 'text0') def test_execution_profile_insert_query(self): - cluster = TestCluster(execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(row_factory=dict_factory)}) + cluster = IntegrationTestCluster(execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(row_factory=dict_factory)}) session = cluster.connect() connection.default() connection.set_session(session) diff --git a/tests/integration/cqlengine/query/test_queryset.py b/tests/integration/cqlengine/query/test_queryset.py index 6bc9d701b8..6569a222d1 100644 --- a/tests/integration/cqlengine/query/test_queryset.py +++ b/tests/integration/cqlengine/query/test_queryset.py @@ -42,7 +42,7 @@ from cassandra.util import uuid_from_time from cassandra.cqlengine.connection import get_session from tests.integration import PROTOCOL_VERSION, CASSANDRA_VERSION, greaterthancass20, greaterthancass21, \ - greaterthanorequalcass30, TestCluster + greaterthanorequalcass30, IntegrationTestCluster from tests.integration.cqlengine import execute_count, DEFAULT_KEYSPACE @@ -775,7 +775,7 @@ def test_custom_indexed_field_can_be_queried(self): with self.assertRaises(InvalidRequest): list(CustomIndexedTestModel.objects.filter(description__gte='test')) - with TestCluster().connect() as session: + with IntegrationTestCluster().connect() as session: session.execute("CREATE INDEX custom_index_cqlengine ON {}.{} (description)". format(DEFAULT_KEYSPACE, CustomIndexedTestModel._table_name)) diff --git a/tests/integration/cqlengine/statements/test_base_statement.py b/tests/integration/cqlengine/statements/test_base_statement.py index 474c45d02b..8ce93efd36 100644 --- a/tests/integration/cqlengine/statements/test_base_statement.py +++ b/tests/integration/cqlengine/statements/test_base_statement.py @@ -29,7 +29,7 @@ from tests.integration.cqlengine.base import BaseCassEngTestCase, TestQueryUpdateModel from tests.integration.cqlengine import DEFAULT_KEYSPACE -from tests.integration import greaterthanorequalcass3_10, TestCluster +from tests.integration import greaterthanorequalcass3_10, IntegrationTestCluster from cassandra.cqlengine.connection import execute @@ -115,7 +115,7 @@ def test_like_operator(self): @test_category data_types:object_mapper """ - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() self.addCleanup(cluster.shutdown) diff --git a/tests/integration/cqlengine/test_connections.py b/tests/integration/cqlengine/test_connections.py index 15adff3380..fcc13ef176 100644 --- a/tests/integration/cqlengine/test_connections.py +++ b/tests/integration/cqlengine/test_connections.py @@ -22,7 +22,7 @@ from tests.integration.cqlengine import setup_connection, DEFAULT_KEYSPACE from tests.integration.cqlengine.base import BaseCassEngTestCase from tests.integration.cqlengine.query import test_queryset -from tests.integration import local, CASSANDRA_IP, TestCluster +from tests.integration import local, CASSANDRA_IP, IntegrationTestCluster class TestModel(Model): @@ -226,7 +226,7 @@ def test_connection_creation_from_session(self): @test_category object_mapper """ - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() connection_name = 'from_session' conn.register_connection(connection_name, session=session) @@ -257,7 +257,7 @@ def test_connection_param_validation(self): @test_category object_mapper """ - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() with self.assertRaises(CQLEngineException): conn.register_connection("bad_coonection1", session=session, consistency="not_null") diff --git a/tests/integration/long/test_consistency.py b/tests/integration/long/test_consistency.py index bbf446861a..9cfd710bc6 100644 --- a/tests/integration/long/test_consistency.py +++ b/tests/integration/long/test_consistency.py @@ -22,7 +22,7 @@ from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT from cassandra.policies import TokenAwarePolicy, RoundRobinPolicy, DowngradingConsistencyRetryPolicy from cassandra.query import SimpleStatement -from tests.integration import use_singledc, execute_until_pass, TestCluster +from tests.integration import use_singledc, execute_until_pass, IntegrationTestCluster from tests.integration.long.utils import ( force_stop, create_schema, wait_for_down, wait_for_up, start, CoordinatorStats @@ -129,7 +129,7 @@ def _assert_reads_fail(self, session, keyspace, consistency_levels): pass def _test_tokenaware_one_node_down(self, keyspace, rf, accepted): - cluster = TestCluster( + cluster = IntegrationTestCluster( execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(TokenAwarePolicy(RoundRobinPolicy()))} ) session = cluster.connect(wait_for_all_pools=True) @@ -181,7 +181,7 @@ def test_rfthree_tokenaware_one_node_down(self): def test_rfthree_tokenaware_none_down(self): keyspace = 'test_rfthree_tokenaware_none_down' - cluster = TestCluster( + cluster = IntegrationTestCluster( execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(TokenAwarePolicy(RoundRobinPolicy()))} ) session = cluster.connect(wait_for_all_pools=True) @@ -205,7 +205,7 @@ def test_rfthree_tokenaware_none_down(self): cluster.shutdown() def _test_downgrading_cl(self, keyspace, rf, accepted): - cluster = TestCluster(execution_profiles={ + cluster = IntegrationTestCluster(execution_profiles={ EXEC_PROFILE_DEFAULT: ExecutionProfile(TokenAwarePolicy(RoundRobinPolicy()), DowngradingConsistencyRetryPolicy()) }) @@ -249,7 +249,7 @@ def test_rftwo_downgradingcl(self): def test_rfthree_roundrobin_downgradingcl(self): keyspace = 'test_rfthree_roundrobin_downgradingcl' - with TestCluster(execution_profiles={ + with IntegrationTestCluster(execution_profiles={ EXEC_PROFILE_DEFAULT: ExecutionProfile(RoundRobinPolicy(), DowngradingConsistencyRetryPolicy()) }) as cluster: @@ -257,7 +257,7 @@ def test_rfthree_roundrobin_downgradingcl(self): def test_rfthree_tokenaware_downgradingcl(self): keyspace = 'test_rfthree_tokenaware_downgradingcl' - with TestCluster(execution_profiles={ + with IntegrationTestCluster(execution_profiles={ EXEC_PROFILE_DEFAULT: ExecutionProfile(TokenAwarePolicy(RoundRobinPolicy()), DowngradingConsistencyRetryPolicy()) }) as cluster: @@ -339,7 +339,7 @@ def test_pool_with_host_down(self): all_contact_points = ["127.0.0.1", "127.0.0.2", "127.0.0.3"] # Connect up and find out which host will bet queries routed to to first - cluster = TestCluster() + cluster = IntegrationTestCluster() cluster.connect(wait_for_all_pools=True) hosts = cluster.metadata.all_hosts() address = hosts[0].address @@ -349,13 +349,13 @@ def test_pool_with_host_down(self): # We now register a cluster that has it's Control Connection NOT on the node that we are shutting down. # We do this so we don't miss the event contact_point = '127.0.0.{0}'.format(self.get_node_not_x(node_to_stop)) - cluster = TestCluster(contact_points=[contact_point]) + cluster = IntegrationTestCluster(contact_points=[contact_point]) cluster.connect(wait_for_all_pools=True) try: force_stop(node_to_stop) wait_for_down(cluster, node_to_stop) # Attempt a query against that node. It should complete - cluster2 = TestCluster(contact_points=all_contact_points) + cluster2 = IntegrationTestCluster(contact_points=all_contact_points) session2 = cluster2.connect() session2.execute("SELECT * FROM system.local") finally: diff --git a/tests/integration/long/test_failure_types.py b/tests/integration/long/test_failure_types.py index 6bdff8d15d..5e4a798398 100644 --- a/tests/integration/long/test_failure_types.py +++ b/tests/integration/long/test_failure_types.py @@ -31,7 +31,7 @@ from tests.integration import ( use_singledc, PROTOCOL_VERSION, get_cluster, setup_keyspace, remove_cluster, get_node, start_cluster_wait_for_up, requiresmallclockgranularity, - local, CASSANDRA_VERSION, TestCluster) + local, CASSANDRA_VERSION, IntegrationTestCluster) try: @@ -83,7 +83,7 @@ def setUp(self): raise unittest.SkipTest( "Native protocol 4,0+ is required for custom payloads, currently using %r" % (PROTOCOL_VERSION,)) - self.cluster = TestCluster() + self.cluster = IntegrationTestCluster() self.session = self.cluster.connect() self.nodes_currently_failing = [] self.node1, self.node2, self.node3 = get_cluster().nodes.values() @@ -332,7 +332,7 @@ def setUp(self): """ Setup sessions and pause node1 """ - self.cluster = TestCluster( + self.cluster = IntegrationTestCluster( execution_profiles={ EXEC_PROFILE_DEFAULT: ExecutionProfile( load_balancing_policy=HostFilterPolicy( diff --git a/tests/integration/long/test_ipv6.py b/tests/integration/long/test_ipv6.py index a49c1677e8..a4793a6bac 100644 --- a/tests/integration/long/test_ipv6.py +++ b/tests/integration/long/test_ipv6.py @@ -19,7 +19,7 @@ from cassandra.io.asyncorereactor import AsyncoreConnection from tests import is_monkey_patched -from tests.integration import use_cluster, remove_cluster, TestCluster +from tests.integration import use_cluster, remove_cluster, IntegrationTestCluster if is_monkey_patched(): LibevConnection = -1 @@ -75,7 +75,7 @@ class IPV6ConnectionTest(object): connection_class = None def test_connect(self): - cluster = TestCluster(connection_class=self.connection_class, contact_points=['::1'], connect_timeout=10) + cluster = IntegrationTestCluster(connection_class=self.connection_class, contact_points=['::1'], connect_timeout=10) session = cluster.connect() future = session.execute_async("SELECT * FROM system.local") future.result() @@ -83,16 +83,16 @@ def test_connect(self): cluster.shutdown() def test_error(self): - cluster = TestCluster(connection_class=self.connection_class, contact_points=['::1'], port=9043, - connect_timeout=10) + cluster = IntegrationTestCluster(connection_class=self.connection_class, contact_points=['::1'], port=9043, + connect_timeout=10) self.assertRaisesRegexp(NoHostAvailable, '\(\'Unable to connect.*%s.*::1\', 9043.*Connection refused.*' % errno.ECONNREFUSED, cluster.connect) def test_error_multiple(self): if len(socket.getaddrinfo('localhost', 9043, socket.AF_UNSPEC, socket.SOCK_STREAM)) < 2: raise unittest.SkipTest('localhost only resolves one address') - cluster = TestCluster(connection_class=self.connection_class, contact_points=['localhost'], port=9043, - connect_timeout=10) + cluster = IntegrationTestCluster(connection_class=self.connection_class, contact_points=['localhost'], port=9043, + connect_timeout=10) self.assertRaisesRegexp(NoHostAvailable, '\(\'Unable to connect.*Tried connecting to \[\(.*\(.*\].*Last error', cluster.connect) diff --git a/tests/integration/long/test_large_data.py b/tests/integration/long/test_large_data.py index ce7e4398da..fe9c5a41cb 100644 --- a/tests/integration/long/test_large_data.py +++ b/tests/integration/long/test_large_data.py @@ -24,7 +24,7 @@ from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT from cassandra.query import dict_factory from cassandra.query import SimpleStatement -from tests.integration import use_singledc, PROTOCOL_VERSION, TestCluster +from tests.integration import use_singledc, PROTOCOL_VERSION, IntegrationTestCluster from tests.integration.long.utils import create_schema try: @@ -61,7 +61,7 @@ def setUp(self): self.keyspace = 'large_data' def make_session_and_keyspace(self): - cluster = TestCluster(execution_profiles={ + cluster = IntegrationTestCluster(execution_profiles={ EXEC_PROFILE_DEFAULT: ExecutionProfile(request_timeout=20, row_factory=dict_factory) }) session = cluster.connect() diff --git a/tests/integration/long/test_loadbalancingpolicies.py b/tests/integration/long/test_loadbalancingpolicies.py index f245569a80..32118cd668 100644 --- a/tests/integration/long/test_loadbalancingpolicies.py +++ b/tests/integration/long/test_loadbalancingpolicies.py @@ -30,7 +30,7 @@ ) from cassandra.query import SimpleStatement -from tests.integration import use_singledc, use_multidc, remove_cluster, TestCluster, greaterthanorequalcass40, notdse +from tests.integration import use_singledc, use_multidc, remove_cluster, IntegrationTestCluster, greaterthanorequalcass40, notdse from tests.integration.long.utils import (wait_for_up, create_schema, CoordinatorStats, force_stop, wait_for_down, decommission, start, @@ -63,7 +63,7 @@ def teardown_class(cls): def _connect_probe_cluster(self): if not self.probe_cluster: # distinct cluster so we can see the status of nodes ignored by the LBP being tested - self.probe_cluster = TestCluster( + self.probe_cluster = IntegrationTestCluster( schema_metadata_enabled=False, token_metadata_enabled=False, execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=RoundRobinPolicy())} @@ -94,8 +94,8 @@ def _wait_for_nodes_down(self, nodes, cluster=None): def _cluster_session_with_lbp(self, lbp): # create a cluster with no delay on events - cluster = TestCluster(topology_event_refresh_window=0, status_event_refresh_window=0, - execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=lbp)}) + cluster = IntegrationTestCluster(topology_event_refresh_window=0, status_event_refresh_window=0, + execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=lbp)}) session = cluster.connect() return cluster, session @@ -184,7 +184,7 @@ def test_token_aware_is_used_by_default(self): @test_category load_balancing:token_aware """ - cluster = TestCluster() + cluster = IntegrationTestCluster() self.addCleanup(cluster.shutdown) if murmur3 is not None: @@ -694,7 +694,7 @@ def test_white_list(self): use_singledc() keyspace = 'test_white_list' - cluster = TestCluster( + cluster = IntegrationTestCluster( contact_points=('127.0.0.2',), topology_event_refresh_window=0, status_event_refresh_window=0, execution_profiles={ EXEC_PROFILE_DEFAULT: ExecutionProfile( @@ -746,7 +746,7 @@ def test_black_list_with_host_filter_policy(self): child_policy=RoundRobinPolicy(), predicate=lambda host: host.address != ignored_address ) - cluster = TestCluster( + cluster = IntegrationTestCluster( contact_points=(IP_FORMAT % 1,), topology_event_refresh_window=0, status_event_refresh_window=0, diff --git a/tests/integration/long/test_policies.py b/tests/integration/long/test_policies.py index 0648e6cc93..83dde1e752 100644 --- a/tests/integration/long/test_policies.py +++ b/tests/integration/long/test_policies.py @@ -20,7 +20,7 @@ from cassandra import ConsistencyLevel, Unavailable from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT -from tests.integration import use_cluster, get_cluster, get_node, TestCluster +from tests.integration import use_cluster, get_cluster, get_node, IntegrationTestCluster def setup_module(): @@ -47,7 +47,7 @@ def test_should_rethrow_on_unvailable_with_default_policy_if_cas(self): ep = ExecutionProfile(consistency_level=ConsistencyLevel.ALL, serial_consistency_level=ConsistencyLevel.SERIAL) - cluster = TestCluster(execution_profiles={EXEC_PROFILE_DEFAULT: ep}) + cluster = IntegrationTestCluster(execution_profiles={EXEC_PROFILE_DEFAULT: ep}) session = cluster.connect() session.execute("CREATE KEYSPACE test_retry_policy_cas WITH replication = {'class':'SimpleStrategy','replication_factor': 3};") diff --git a/tests/integration/long/test_schema.py b/tests/integration/long/test_schema.py index e2945a117b..768e4c19ea 100644 --- a/tests/integration/long/test_schema.py +++ b/tests/integration/long/test_schema.py @@ -17,7 +17,7 @@ from cassandra import ConsistencyLevel, AlreadyExists from cassandra.query import SimpleStatement -from tests.integration import use_singledc, execute_until_pass, TestCluster +from tests.integration import use_singledc, execute_until_pass, IntegrationTestCluster import time @@ -37,7 +37,7 @@ class SchemaTests(unittest.TestCase): @classmethod def setup_class(cls): - cls.cluster = TestCluster() + cls.cluster = IntegrationTestCluster() cls.session = cls.cluster.connect(wait_for_all_pools=True) @classmethod @@ -98,7 +98,7 @@ def test_for_schema_disagreements_same_keyspace(self): Tests for any schema disagreements using the same keyspace multiple times """ - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect(wait_for_all_pools=True) for i in range(30): @@ -132,7 +132,7 @@ def test_for_schema_disagreement_attribute(self): @test_category schema """ # This should yield a schema disagreement - cluster = TestCluster(max_schema_agreement_wait=0.001) + cluster = IntegrationTestCluster(max_schema_agreement_wait=0.001) session = cluster.connect(wait_for_all_pools=True) rs = session.execute("CREATE KEYSPACE test_schema_disagreement WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}") @@ -145,7 +145,7 @@ def test_for_schema_disagreement_attribute(self): cluster.shutdown() # These should have schema agreement - cluster = TestCluster(max_schema_agreement_wait=100) + cluster = IntegrationTestCluster(max_schema_agreement_wait=100) session = cluster.connect() rs = session.execute("CREATE KEYSPACE test_schema_disagreement WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}") self.check_and_wait_for_agreement(session, rs, True) diff --git a/tests/integration/long/test_ssl.py b/tests/integration/long/test_ssl.py index 4de46f4649..c31e1453c8 100644 --- a/tests/integration/long/test_ssl.py +++ b/tests/integration/long/test_ssl.py @@ -26,7 +26,7 @@ from OpenSSL import SSL, crypto from tests.integration import ( - get_cluster, remove_cluster, use_single_node, start_cluster_wait_for_up, EVENT_LOOP_MANAGER, TestCluster + get_cluster, remove_cluster, use_single_node, start_cluster_wait_for_up, EVENT_LOOP_MANAGER, IntegrationTestCluster ) if not hasattr(ssl, 'match_hostname'): @@ -103,7 +103,7 @@ def validate_ssl_options(**kwargs): if tries > 5: raise RuntimeError("Failed to connect to SSL cluster after 5 attempts") try: - cluster = TestCluster( + cluster = IntegrationTestCluster( contact_points=[DefaultEndPoint(hostname)], ssl_options=ssl_options, ssl_context=ssl_context @@ -184,7 +184,7 @@ def test_can_connect_with_ssl_long_running(self): if tries > 5: raise RuntimeError("Failed to connect to SSL cluster after 5 attempts") try: - cluster = TestCluster(ssl_options=ssl_options) + cluster = IntegrationTestCluster(ssl_options=ssl_options) session = cluster.connect(wait_for_all_pools=True) break except Exception: @@ -290,7 +290,7 @@ def test_cannot_connect_without_client_auth(self): @test_category connection:ssl """ - cluster = TestCluster(ssl_options={'ca_certs': CLIENT_CA_CERTS, + cluster = IntegrationTestCluster(ssl_options={'ca_certs': CLIENT_CA_CERTS, 'ssl_version': ssl_version}) with self.assertRaises(NoHostAvailable) as _: @@ -319,7 +319,7 @@ def test_cannot_connect_with_bad_client_auth(self): # I don't set the bad certfile for pyopenssl because it hangs ssl_options['certfile'] = DRIVER_CERTFILE_BAD - cluster = TestCluster( + cluster = IntegrationTestCluster( ssl_options={'ca_certs': CLIENT_CA_CERTS, 'ssl_version': ssl_version, 'keyfile': DRIVER_KEYFILE} @@ -364,7 +364,7 @@ def test_ssl_want_write_errors_are_retried(self): """ ssl_options = {'ca_certs': CLIENT_CA_CERTS, 'ssl_version': ssl_version} - cluster = TestCluster(ssl_options=ssl_options) + cluster = IntegrationTestCluster(ssl_options=ssl_options) session = cluster.connect(wait_for_all_pools=True) try: session.execute('drop keyspace ssl_error_test') diff --git a/tests/integration/long/test_topology_change.py b/tests/integration/long/test_topology_change.py index 5b12eef28c..034c087cdb 100644 --- a/tests/integration/long/test_topology_change.py +++ b/tests/integration/long/test_topology_change.py @@ -1,7 +1,7 @@ from unittest import TestCase from cassandra.policies import HostStateListener -from tests.integration import get_node, use_cluster, local, TestCluster +from tests.integration import get_node, use_cluster, local, IntegrationTestCluster from tests.integration.long.utils import decommission from tests.util import wait_until @@ -31,7 +31,7 @@ def test_removed_node_stops_reconnecting(self): use_cluster("test_down_then_removed", [3], start=True) state_listener = StateListener() - cluster = TestCluster() + cluster = IntegrationTestCluster() self.addCleanup(cluster.shutdown) cluster.register_listener(state_listener) session = cluster.connect(wait_for_all_pools=True) diff --git a/tests/integration/standard/test_authentication.py b/tests/integration/standard/test_authentication.py index 9755c5098b..7d3f097898 100644 --- a/tests/integration/standard/test_authentication.py +++ b/tests/integration/standard/test_authentication.py @@ -19,7 +19,7 @@ from cassandra.auth import PlainTextAuthProvider, SASLClient, SaslAuthProvider from tests.integration import use_singledc, get_cluster, remove_cluster, PROTOCOL_VERSION, CASSANDRA_IP, \ - USE_CASS_EXTERNAL, start_cluster_wait_for_up, TestCluster + USE_CASS_EXTERNAL, start_cluster_wait_for_up, IntegrationTestCluster from tests.integration.util import assert_quiescent_pool_state try: @@ -75,12 +75,12 @@ def cluster_as(self, usr, pwd): # to ensure the role manager is setup for _ in range(5): try: - cluster = TestCluster( + cluster = IntegrationTestCluster( idle_heartbeat_interval=0, auth_provider=self.get_authentication_provider(username='cassandra', password='cassandra')) cluster.connect(wait_for_all_pools=True) - return TestCluster( + return IntegrationTestCluster( idle_heartbeat_interval=0, auth_provider=self.get_authentication_provider(username=usr, password=pwd)) except Exception as e: @@ -143,7 +143,7 @@ def test_connect_empty_pwd(self): cluster.shutdown() def test_connect_no_auth_provider(self): - cluster = TestCluster() + cluster = IntegrationTestCluster() try: self.assertRaisesRegexp(NoHostAvailable, '.*AuthenticationFailed.*', diff --git a/tests/integration/standard/test_authentication_misconfiguration.py b/tests/integration/standard/test_authentication_misconfiguration.py index 546141d801..2d5299689c 100644 --- a/tests/integration/standard/test_authentication_misconfiguration.py +++ b/tests/integration/standard/test_authentication_misconfiguration.py @@ -14,7 +14,7 @@ import unittest -from tests.integration import USE_CASS_EXTERNAL, use_cluster, TestCluster +from tests.integration import USE_CASS_EXTERNAL, use_cluster, IntegrationTestCluster class MisconfiguredAuthenticationTests(unittest.TestCase): @@ -33,7 +33,7 @@ def setUpClass(cls): cls.ccm_cluster = ccm_cluster def test_connect_no_auth_provider(self): - cluster = TestCluster() + cluster = IntegrationTestCluster() cluster.connect() cluster.refresh_nodes() down_hosts = [host for host in cluster.metadata.all_hosts() if not host.is_up] diff --git a/tests/integration/standard/test_client_warnings.py b/tests/integration/standard/test_client_warnings.py index c5ce5dc726..c53896ea4a 100644 --- a/tests/integration/standard/test_client_warnings.py +++ b/tests/integration/standard/test_client_warnings.py @@ -20,7 +20,7 @@ from cassandra.query import BatchStatement -from tests.integration import use_singledc, PROTOCOL_VERSION, local, TestCluster +from tests.integration import use_singledc, PROTOCOL_VERSION, local, IntegrationTestCluster def setup_module(): @@ -34,7 +34,7 @@ def setUpClass(cls): if PROTOCOL_VERSION < 4: return - cls.cluster = TestCluster() + cls.cluster = IntegrationTestCluster() cls.session = cls.cluster.connect() cls.session.execute("CREATE TABLE IF NOT EXISTS test1rf.client_warning (k int, v0 int, v1 int, PRIMARY KEY (k, v0))") diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py index 7f4a4ffd19..e73d328069 100644 --- a/tests/integration/standard/test_cluster.py +++ b/tests/integration/standard/test_cluster.py @@ -43,7 +43,7 @@ from tests.integration import use_singledc, get_server_versions, CASSANDRA_VERSION, \ execute_until_pass, execute_with_long_wait_retry, get_node, MockLoggingHandler, get_unsupported_lower_protocol, \ get_unsupported_upper_protocol, protocolv6, local, CASSANDRA_IP, greaterthanorequalcass30, lessthanorequalcass40, \ - DSE_VERSION, TestCluster, PROTOCOL_VERSION + DSE_VERSION, IntegrationTestCluster, PROTOCOL_VERSION from tests.integration.util import assert_quiescent_pool_state import sys @@ -81,7 +81,7 @@ def test_ignored_host_up(self): @test_category connection """ ignored_host_policy = IgnoredHostPolicy(["127.0.0.2", "127.0.0.3"]) - cluster = TestCluster( + cluster = IntegrationTestCluster( execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=ignored_host_policy)} ) cluster.connect() @@ -103,7 +103,7 @@ def test_host_resolution(self): @test_category connection """ - cluster = TestCluster(contact_points=["localhost"], connect_timeout=1) + cluster = IntegrationTestCluster(contact_points=["localhost"], connect_timeout=1) self.assertTrue(DefaultEndPoint('127.0.0.1') in cluster.endpoints_resolved) @local @@ -117,14 +117,14 @@ def test_host_duplication(self): @test_category connection """ - cluster = TestCluster( + cluster = IntegrationTestCluster( contact_points=["localhost", "127.0.0.1", "localhost", "localhost", "localhost"], connect_timeout=1 ) cluster.connect(wait_for_all_pools=True) self.assertEqual(len(cluster.metadata.all_hosts()), 3) cluster.shutdown() - cluster = TestCluster(contact_points=["127.0.0.1", "localhost"], connect_timeout=1) + cluster = IntegrationTestCluster(contact_points=["127.0.0.1", "localhost"], connect_timeout=1) cluster.connect(wait_for_all_pools=True) self.assertEqual(len(cluster.metadata.all_hosts()), 3) cluster.shutdown() @@ -148,7 +148,7 @@ def test_raise_error_on_control_connection_timeout(self): """ get_node(1).pause() - cluster = TestCluster(contact_points=['127.0.0.1'], connect_timeout=1) + cluster = IntegrationTestCluster(contact_points=['127.0.0.1'], connect_timeout=1) with self.assertRaisesRegexp(NoHostAvailable, r"OperationTimedOut\('errors=Timed out creating connection \(1 seconds\)"): @@ -162,7 +162,7 @@ def test_basic(self): Test basic connection and usage """ - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() result = execute_until_pass(session, """ @@ -218,19 +218,19 @@ def cleanup(): self.addCleanup(cleanup) # Test with empty list - self.cluster_to_shutdown = TestCluster(contact_points=[]) + self.cluster_to_shutdown = IntegrationTestCluster(contact_points=[]) with self.assertRaises(NoHostAvailable): self.cluster_to_shutdown.connect() self.cluster_to_shutdown.shutdown() # Test with only invalid - self.cluster_to_shutdown = TestCluster(contact_points=('1.2.3.4',)) + self.cluster_to_shutdown = IntegrationTestCluster(contact_points=('1.2.3.4',)) with self.assertRaises(NoHostAvailable): self.cluster_to_shutdown.connect() self.cluster_to_shutdown.shutdown() # Test with valid and invalid hosts - self.cluster_to_shutdown = TestCluster(contact_points=("127.0.0.1", "127.0.0.2", "1.2.3.4")) + self.cluster_to_shutdown = IntegrationTestCluster(contact_points=("127.0.0.1", "127.0.0.2", "1.2.3.4")) self.cluster_to_shutdown.connect() self.cluster_to_shutdown.shutdown() @@ -314,7 +314,7 @@ def test_invalid_protocol_negotation(self): upper_bound = get_unsupported_upper_protocol() log.debug('got upper_bound of {}'.format(upper_bound)) if upper_bound is not None: - cluster = TestCluster(protocol_version=upper_bound) + cluster = IntegrationTestCluster(protocol_version=upper_bound) with self.assertRaises(NoHostAvailable): cluster.connect() cluster.shutdown() @@ -322,7 +322,7 @@ def test_invalid_protocol_negotation(self): lower_bound = get_unsupported_lower_protocol() log.debug('got lower_bound of {}'.format(lower_bound)) if lower_bound is not None: - cluster = TestCluster(protocol_version=lower_bound) + cluster = IntegrationTestCluster(protocol_version=lower_bound) with self.assertRaises(NoHostAvailable): cluster.connect() cluster.shutdown() @@ -332,7 +332,7 @@ def test_connect_on_keyspace(self): Ensure clusters that connect on a keyspace, do """ - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() result = session.execute( """ @@ -350,7 +350,7 @@ def test_connect_on_keyspace(self): cluster.shutdown() def test_set_keyspace_twice(self): - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() session.execute("USE system") session.execute("USE system") @@ -361,7 +361,7 @@ def test_default_connections(self): Ensure errors are not thrown when using non-default policies """ - TestCluster( + IntegrationTestCluster( reconnection_policy=ExponentialReconnectionPolicy(1.0, 600.0), conviction_policy_factory=SimpleConvictionPolicy, protocol_version=PROTOCOL_VERSION @@ -371,7 +371,7 @@ def test_connect_to_already_shutdown_cluster(self): """ Ensure you cannot connect to a cluster that's been shutdown """ - cluster = TestCluster() + cluster = IntegrationTestCluster() cluster.shutdown() self.assertRaises(Exception, cluster.connect) @@ -380,7 +380,7 @@ def test_auth_provider_is_callable(self): Ensure that auth_providers are always callable """ self.assertRaises(TypeError, Cluster, auth_provider=1, protocol_version=1) - c = TestCluster(protocol_version=1) + c = IntegrationTestCluster(protocol_version=1) self.assertRaises(TypeError, setattr, c, 'auth_provider', 1) def test_v2_auth_provider(self): @@ -389,7 +389,7 @@ def test_v2_auth_provider(self): """ bad_auth_provider = lambda x: {'username': 'foo', 'password': 'bar'} self.assertRaises(TypeError, Cluster, auth_provider=bad_auth_provider, protocol_version=2) - c = TestCluster(protocol_version=2) + c = IntegrationTestCluster(protocol_version=2) self.assertRaises(TypeError, setattr, c, 'auth_provider', bad_auth_provider) def test_conviction_policy_factory_is_callable(self): @@ -405,8 +405,8 @@ def test_connect_to_bad_hosts(self): when a cluster cannot connect to given hosts """ - cluster = TestCluster(contact_points=['127.1.2.9', '127.1.2.10'], - protocol_version=PROTOCOL_VERSION) + cluster = IntegrationTestCluster(contact_points=['127.1.2.9', '127.1.2.10'], + protocol_version=PROTOCOL_VERSION) self.assertRaises(NoHostAvailable, cluster.connect) def test_cluster_settings(self): @@ -416,7 +416,7 @@ def test_cluster_settings(self): if PROTOCOL_VERSION >= 3: raise unittest.SkipTest("min/max requests and core/max conns aren't used with v3 protocol") - cluster = TestCluster() + cluster = IntegrationTestCluster() min_requests_per_connection = cluster.get_min_requests_per_connection(HostDistance.LOCAL) self.assertEqual(cassandra.cluster.DEFAULT_MIN_REQUESTS, min_requests_per_connection) @@ -439,7 +439,7 @@ def test_cluster_settings(self): self.assertEqual(cluster.get_max_connections_per_host(HostDistance.LOCAL), max_connections_per_host + 1) def test_refresh_schema(self): - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() original_meta = cluster.metadata.keyspaces @@ -451,7 +451,7 @@ def test_refresh_schema(self): cluster.shutdown() def test_refresh_schema_keyspace(self): - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() original_meta = cluster.metadata.keyspaces @@ -467,7 +467,7 @@ def test_refresh_schema_keyspace(self): cluster.shutdown() def test_refresh_schema_table(self): - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() original_meta = cluster.metadata.keyspaces @@ -493,7 +493,7 @@ def test_refresh_schema_type(self): raise unittest.SkipTest('UDTs are not specified in change events for protocol v2') # We may want to refresh types on keyspace change events in that case(?) - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() keyspace_name = 'test1rf' @@ -532,7 +532,7 @@ def patched_wait_for_responses(*args, **kwargs): agreement_timeout = 1 # cluster agreement wait exceeded - c = TestCluster(max_schema_agreement_wait=agreement_timeout) + c = IntegrationTestCluster(max_schema_agreement_wait=agreement_timeout) c.connect() self.assertTrue(c.metadata.keyspaces) @@ -557,7 +557,7 @@ def patched_wait_for_responses(*args, **kwargs): refresh_threshold = 0.5 # cluster agreement bypass - c = TestCluster(max_schema_agreement_wait=0) + c = IntegrationTestCluster(max_schema_agreement_wait=0) start_time = time.time() s = c.connect() end_time = time.time() @@ -588,7 +588,7 @@ def test_trace(self): Ensure trace can be requested for async and non-async queries """ - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() result = session.execute( "SELECT * FROM system.local", trace=True) @@ -634,7 +634,7 @@ def test_trace_unavailable(self): @test_category query """ - cluster = TestCluster() + cluster = IntegrationTestCluster() self.addCleanup(cluster.shutdown) session = cluster.connect() @@ -676,7 +676,7 @@ def test_one_returns_none(self): @test_category query """ - with TestCluster() as cluster: + with IntegrationTestCluster() as cluster: session = cluster.connect() self.assertIsNone(session.execute("SELECT * from system.local WHERE key='madeup_key'").one()) @@ -685,7 +685,7 @@ def test_string_coverage(self): Ensure str(future) returns without error """ - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() query = "SELECT * FROM system.local" @@ -742,7 +742,7 @@ def test_can_connect_with_sslauth(self): def _warning_are_issued_when_auth(self, auth_provider): with MockLoggingHandler().set_module_name(connection.__name__) as mock_handler: - with TestCluster(auth_provider=auth_provider) as cluster: + with IntegrationTestCluster(auth_provider=auth_provider) as cluster: session = cluster.connect() self.assertIsNotNone(session.execute("SELECT * from system.local")) @@ -756,8 +756,8 @@ def _warning_are_issued_when_auth(self, auth_provider): def test_idle_heartbeat(self): interval = 2 - cluster = TestCluster(idle_heartbeat_interval=interval, - monitor_reporting_enabled=False) + cluster = IntegrationTestCluster(idle_heartbeat_interval=interval, + monitor_reporting_enabled=False) if PROTOCOL_VERSION < 3: cluster.set_core_connections_per_host(HostDistance.LOCAL, 1) session = cluster.connect(wait_for_all_pools=True) @@ -819,7 +819,7 @@ def test_idle_heartbeat_disabled(self): self.assertTrue(Cluster.idle_heartbeat_interval) # heartbeat disabled with '0' - cluster = TestCluster(idle_heartbeat_interval=0) + cluster = IntegrationTestCluster(idle_heartbeat_interval=0) self.assertEqual(cluster.idle_heartbeat_interval, 0) session = cluster.connect() @@ -835,7 +835,7 @@ def test_idle_heartbeat_disabled(self): def test_pool_management(self): # Ensure that in_flight and request_ids quiesce after cluster operations - cluster = TestCluster(idle_heartbeat_interval=0) # no idle heartbeat here, pool management is tested in test_idle_heartbeat + cluster = IntegrationTestCluster(idle_heartbeat_interval=0) # no idle heartbeat here, pool management is tested in test_idle_heartbeat session = cluster.connect() session2 = cluster.connect() @@ -879,7 +879,7 @@ def test_profile_load_balancing(self): RoundRobinPolicy(), lambda host: host.address == CASSANDRA_IP ) ) - with TestCluster(execution_profiles={'node1': node1}, monitor_reporting_enabled=False) as cluster: + with IntegrationTestCluster(execution_profiles={'node1': node1}, monitor_reporting_enabled=False) as cluster: session = cluster.connect(wait_for_all_pools=True) # default is DCA RR for all hosts @@ -920,7 +920,7 @@ def test_profile_load_balancing(self): self.assertTrue(session.execute(query, execution_profile='node1')[0].release_version) def test_setting_lbp_legacy(self): - cluster = TestCluster() + cluster = IntegrationTestCluster() self.addCleanup(cluster.shutdown) cluster.load_balancing_policy = RoundRobinPolicy() self.assertEqual( @@ -948,7 +948,7 @@ def test_profile_lb_swap(self): rr1 = ExecutionProfile(load_balancing_policy=RoundRobinPolicy()) rr2 = ExecutionProfile(load_balancing_policy=RoundRobinPolicy()) exec_profiles = {'rr1': rr1, 'rr2': rr2} - with TestCluster(execution_profiles=exec_profiles) as cluster: + with IntegrationTestCluster(execution_profiles=exec_profiles) as cluster: session = cluster.connect(wait_for_all_pools=True) # default is DCA RR for all hosts @@ -975,7 +975,7 @@ def test_ta_lbp(self): """ query = "select release_version from system.local" ta1 = ExecutionProfile() - with TestCluster() as cluster: + with IntegrationTestCluster() as cluster: session = cluster.connect() cluster.add_execution_profile("ta1", ta1) rs = session.execute(query, execution_profile='ta1') @@ -996,7 +996,7 @@ def test_clone_shared_lbp(self): query = "select release_version from system.local" rr1 = ExecutionProfile(load_balancing_policy=RoundRobinPolicy()) exec_profiles = {'rr1': rr1} - with TestCluster(execution_profiles=exec_profiles) as cluster: + with IntegrationTestCluster(execution_profiles=exec_profiles) as cluster: session = cluster.connect(wait_for_all_pools=True) self.assertGreater(len(cluster.metadata.all_hosts()), 1, "We only have one host connected at this point") @@ -1024,7 +1024,7 @@ def test_missing_exec_prof(self): rr1 = ExecutionProfile(load_balancing_policy=RoundRobinPolicy()) rr2 = ExecutionProfile(load_balancing_policy=RoundRobinPolicy()) exec_profiles = {'rr1': rr1, 'rr2': rr2} - with TestCluster(execution_profiles=exec_profiles) as cluster: + with IntegrationTestCluster(execution_profiles=exec_profiles) as cluster: session = cluster.connect() with self.assertRaises(ValueError): session.execute(query, execution_profile='rr3') @@ -1051,7 +1051,7 @@ def test_profile_pool_management(self): RoundRobinPolicy(), lambda host: host.address == "127.0.0.2" ) ) - with TestCluster(execution_profiles={EXEC_PROFILE_DEFAULT: node1, 'node2': node2}) as cluster: + with IntegrationTestCluster(execution_profiles={EXEC_PROFILE_DEFAULT: node1, 'node2': node2}) as cluster: session = cluster.connect(wait_for_all_pools=True) pools = session.get_pool_state() # there are more hosts, but we connected to the ones in the lbp aggregate @@ -1086,7 +1086,7 @@ def test_add_profile_timeout(self): RoundRobinPolicy(), lambda host: host.address == "127.0.0.1" ) ) - with TestCluster(execution_profiles={EXEC_PROFILE_DEFAULT: node1}) as cluster: + with IntegrationTestCluster(execution_profiles={EXEC_PROFILE_DEFAULT: node1}) as cluster: session = cluster.connect(wait_for_all_pools=True) pools = session.get_pool_state() self.assertGreater(len(cluster.metadata.all_hosts()), 2) @@ -1112,7 +1112,7 @@ def test_add_profile_timeout(self): @notwindows def test_execute_query_timeout(self): - with TestCluster() as cluster: + with IntegrationTestCluster() as cluster: session = cluster.connect(wait_for_all_pools=True) query = "SELECT * FROM system.local" @@ -1158,7 +1158,7 @@ def test_replicas_are_queried(self): tap_profile = ExecutionProfile( load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()) ) - with TestCluster(execution_profiles={EXEC_PROFILE_DEFAULT: tap_profile}) as cluster: + with IntegrationTestCluster(execution_profiles={EXEC_PROFILE_DEFAULT: tap_profile}) as cluster: session = cluster.connect(wait_for_all_pools=True) session.execute(''' CREATE TABLE test1rf.table_with_big_key ( @@ -1183,8 +1183,8 @@ def test_replicas_are_queried(self): log = logging.getLogger(__name__) log.info("The only replica found was: {}".format(only_replica)) available_hosts = [host for host in ["127.0.0.1", "127.0.0.2", "127.0.0.3"] if host != only_replica] - with TestCluster(contact_points=available_hosts, - execution_profiles={EXEC_PROFILE_DEFAULT: hfp_profile}) as cluster: + with IntegrationTestCluster(contact_points=available_hosts, + execution_profiles={EXEC_PROFILE_DEFAULT: hfp_profile}) as cluster: session = cluster.connect(wait_for_all_pools=True) prepared = session.prepare("""SELECT * from test1rf.table_with_big_key @@ -1210,10 +1210,10 @@ def test_compact_option(self): @test_category connection """ - nc_cluster = TestCluster(no_compact=True) + nc_cluster = IntegrationTestCluster(no_compact=True) nc_session = nc_cluster.connect() - cluster = TestCluster(no_compact=False) + cluster = IntegrationTestCluster(no_compact=False) session = cluster.connect() self.addCleanup(cluster.shutdown) @@ -1298,7 +1298,7 @@ def test_address_translator_basic(self): @test_category metadata """ lh_ad = LocalHostAdressTranslator({'127.0.0.1': '127.0.0.1', '127.0.0.2': '127.0.0.1', '127.0.0.3': '127.0.0.1'}) - c = TestCluster(address_translator=lh_ad) + c = IntegrationTestCluster(address_translator=lh_ad) c.connect() self.assertEqual(len(c.metadata.all_hosts()), 1) c.shutdown() @@ -1318,7 +1318,7 @@ def test_address_translator_with_mixed_nodes(self): """ adder_map = {'127.0.0.1': '127.0.0.1', '127.0.0.2': '127.0.0.3', '127.0.0.3': '127.0.0.2'} lh_ad = LocalHostAdressTranslator(adder_map) - c = TestCluster(address_translator=lh_ad) + c = IntegrationTestCluster(address_translator=lh_ad) c.connect() for host in c.metadata.all_hosts(): self.assertEqual(adder_map.get(host.address), host.broadcast_address) @@ -1344,7 +1344,7 @@ def test_no_connect(self): @test_category configuration """ - with TestCluster() as cluster: + with IntegrationTestCluster() as cluster: self.assertFalse(cluster.is_shutdown) self.assertTrue(cluster.is_shutdown) @@ -1358,7 +1358,7 @@ def test_simple_nested(self): @test_category configuration """ - with TestCluster(**self.cluster_kwargs) as cluster: + with IntegrationTestCluster(**self.cluster_kwargs) as cluster: with cluster.connect() as session: self.assertFalse(cluster.is_shutdown) self.assertFalse(session.is_shutdown) @@ -1376,7 +1376,7 @@ def test_cluster_no_session(self): @test_category configuration """ - with TestCluster(**self.cluster_kwargs) as cluster: + with IntegrationTestCluster(**self.cluster_kwargs) as cluster: session = cluster.connect() self.assertFalse(cluster.is_shutdown) self.assertFalse(session.is_shutdown) @@ -1394,7 +1394,7 @@ def test_session_no_cluster(self): @test_category configuration """ - cluster = TestCluster(**self.cluster_kwargs) + cluster = IntegrationTestCluster(**self.cluster_kwargs) unmanaged_session = cluster.connect() with cluster.connect() as session: self.assertFalse(cluster.is_shutdown) @@ -1425,7 +1425,7 @@ def test_down_event_with_active_connection(self): @test_category connection """ - with TestCluster() as cluster: + with IntegrationTestCluster() as cluster: session = cluster.connect(wait_for_all_pools=True) random_host = cluster.metadata.all_hosts()[0] cluster.on_down(random_host, False) @@ -1454,7 +1454,7 @@ class DontPrepareOnIgnoredHostsTest(unittest.TestCase): def test_prepare_on_ignored_hosts(self): - cluster = TestCluster( + cluster = IntegrationTestCluster( execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=self.ignore_node_3_policy)} ) session = cluster.connect() @@ -1501,7 +1501,7 @@ def test_invalid_protocol_version_beta_option(self): @test_category connection """ - cluster = TestCluster(protocol_version=cassandra.ProtocolVersion.V6, allow_beta_protocol_version=False) + cluster = IntegrationTestCluster(protocol_version=cassandra.ProtocolVersion.V6, allow_beta_protocol_version=False) try: with self.assertRaises(NoHostAvailable): cluster.connect() @@ -1539,7 +1539,7 @@ def test_deprecation_warnings_legacy_parameters(self): @test_category logs """ with warnings.catch_warnings(record=True) as w: - TestCluster(load_balancing_policy=RoundRobinPolicy()) + IntegrationTestCluster(load_balancing_policy=RoundRobinPolicy()) self.assertEqual(len(w), 1) self.assertIn("Legacy execution parameters will be removed in 4.0. Consider using execution profiles.", str(w[0].message)) @@ -1556,7 +1556,7 @@ def test_deprecation_warnings_meta_refreshed(self): @test_category logs """ with warnings.catch_warnings(record=True) as w: - cluster = TestCluster() + cluster = IntegrationTestCluster() cluster.set_meta_refresh_enabled(True) self.assertEqual(len(w), 1) self.assertIn("Cluster.set_meta_refresh_enabled is deprecated and will be removed in 4.0.", @@ -1574,7 +1574,7 @@ def test_deprecation_warning_default_consistency_level(self): @test_category logs """ with warnings.catch_warnings(record=True) as w: - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() session.default_consistency_level = ConsistencyLevel.ONE self.assertEqual(len(w), 1) diff --git a/tests/integration/standard/test_concurrent.py b/tests/integration/standard/test_concurrent.py index 8bd65c7f6f..eb12304896 100644 --- a/tests/integration/standard/test_concurrent.py +++ b/tests/integration/standard/test_concurrent.py @@ -22,7 +22,7 @@ from cassandra.policies import HostDistance from cassandra.query import tuple_factory, SimpleStatement -from tests.integration import use_singledc, PROTOCOL_VERSION, TestCluster +from tests.integration import use_singledc, PROTOCOL_VERSION, IntegrationTestCluster from six import next @@ -42,7 +42,7 @@ class ClusterTests(unittest.TestCase): @classmethod def setUpClass(cls): - cls.cluster = TestCluster( + cls.cluster = IntegrationTestCluster( execution_profiles = { EXEC_PROFILE_DEFAULT: ExecutionProfile(row_factory=tuple_factory) } diff --git a/tests/integration/standard/test_connection.py b/tests/integration/standard/test_connection.py index aaa5a27dfd..ec69e389a2 100644 --- a/tests/integration/standard/test_connection.py +++ b/tests/integration/standard/test_connection.py @@ -38,7 +38,7 @@ from tests import is_monkey_patched from tests.integration import use_singledc, get_node, CASSANDRA_IP, local, \ - requiresmallclockgranularity, greaterthancass20, TestCluster + requiresmallclockgranularity, greaterthancass20, IntegrationTestCluster try: from cassandra.io.libevreactor import LibevConnection @@ -57,7 +57,7 @@ def setup_module(): class ConnectionTimeoutTest(unittest.TestCase): def setUp(self): - self.cluster = TestCluster(execution_profiles={ + self.cluster = IntegrationTestCluster(execution_profiles={ EXEC_PROFILE_DEFAULT: ExecutionProfile( load_balancing_policy=HostFilterPolicy( RoundRobinPolicy(), predicate=lambda host: host.address == CASSANDRA_IP @@ -117,7 +117,7 @@ class HeartbeatTest(unittest.TestCase): """ def setUp(self): - self.cluster = TestCluster(idle_heartbeat_interval=1) + self.cluster = IntegrationTestCluster(idle_heartbeat_interval=1) self.session = self.cluster.connect(wait_for_all_pools=True) def tearDown(self): @@ -219,8 +219,8 @@ def get_connection(self, timeout=5): conn = self.klass.factory( endpoint=contact_point, timeout=timeout, - protocol_version=TestCluster.DEFAULT_PROTOCOL_VERSION, - allow_beta_protocol_version=TestCluster.DEFAULT_ALLOW_BETA + protocol_version=IntegrationTestCluster.DEFAULT_PROTOCOL_VERSION, + allow_beta_protocol_version=IntegrationTestCluster.DEFAULT_ALLOW_BETA ) break except (OperationTimedOut, NoHostAvailable, ConnectionShutdown) as e: @@ -416,10 +416,10 @@ class C1(self.klass): class C2(self.klass): pass - clusterC1 = TestCluster(connection_class=C1) + clusterC1 = IntegrationTestCluster(connection_class=C1) clusterC1.connect(wait_for_all_pools=True) - clusterC2 = TestCluster(connection_class=C2) + clusterC2 = IntegrationTestCluster(connection_class=C2) clusterC2.connect(wait_for_all_pools=True) self.addCleanup(clusterC1.shutdown) self.addCleanup(clusterC2.shutdown) diff --git a/tests/integration/standard/test_control_connection.py b/tests/integration/standard/test_control_connection.py index db7cff8506..e48ebba696 100644 --- a/tests/integration/standard/test_control_connection.py +++ b/tests/integration/standard/test_control_connection.py @@ -23,7 +23,7 @@ from cassandra.protocol import ConfigurationException -from tests.integration import use_singledc, PROTOCOL_VERSION, TestCluster, greaterthanorequalcass40, notdse +from tests.integration import use_singledc, PROTOCOL_VERSION, IntegrationTestCluster, greaterthanorequalcass40, notdse from tests.integration.datatype_utils import update_datatypes @@ -38,7 +38,7 @@ def setUp(self): raise unittest.SkipTest( "Native protocol 3,0+ is required for UDTs using %r" % (PROTOCOL_VERSION,)) - self.cluster = TestCluster() + self.cluster = IntegrationTestCluster() def tearDown(self): try: @@ -112,7 +112,7 @@ def test_control_connection_port_discovery(self): Unit tests already validate that the port can be picked up (or not) from the query. This validates it picks up the correct port from a real server and is able to connect. """ - self.cluster = TestCluster() + self.cluster = IntegrationTestCluster() host = self.cluster.get_control_connection_host() self.assertEqual(host, None) diff --git a/tests/integration/standard/test_custom_cluster.py b/tests/integration/standard/test_custom_cluster.py index 84e0737086..71ac454160 100644 --- a/tests/integration/standard/test_custom_cluster.py +++ b/tests/integration/standard/test_custom_cluster.py @@ -13,7 +13,7 @@ # limitations under the License. from cassandra.cluster import NoHostAvailable -from tests.integration import use_singledc, get_cluster, remove_cluster, local, TestCluster +from tests.integration import use_singledc, get_cluster, remove_cluster, local, IntegrationTestCluster from tests.util import wait_until, wait_until_not_raised try: @@ -31,9 +31,9 @@ def setup_module(): # can't use wait_for_binary_proto cause ccm tries on port 9042 ccm_cluster.start(wait_for_binary_proto=False) # wait until all nodes are up - wait_until_not_raised(lambda: TestCluster(contact_points=['127.0.0.1'], port=9046).connect().shutdown(), 1, 20) - wait_until_not_raised(lambda: TestCluster(contact_points=['127.0.0.2'], port=9046).connect().shutdown(), 1, 20) - wait_until_not_raised(lambda: TestCluster(contact_points=['127.0.0.3'], port=9046).connect().shutdown(), 1, 20) + wait_until_not_raised(lambda: IntegrationTestCluster(contact_points=['127.0.0.1'], port=9046).connect().shutdown(), 1, 20) + wait_until_not_raised(lambda: IntegrationTestCluster(contact_points=['127.0.0.2'], port=9046).connect().shutdown(), 1, 20) + wait_until_not_raised(lambda: IntegrationTestCluster(contact_points=['127.0.0.3'], port=9046).connect().shutdown(), 1, 20) def teardown_module(): @@ -50,11 +50,11 @@ def test_connection_honor_cluster_port(self): All hosts should be marked as up and we should be able to execute queries on it. """ - cluster = TestCluster() + cluster = IntegrationTestCluster() with self.assertRaises(NoHostAvailable): cluster.connect() # should fail on port 9042 - cluster = TestCluster(port=9046) + cluster = IntegrationTestCluster(port=9046) session = cluster.connect(wait_for_all_pools=True) wait_until(lambda: len(cluster.metadata.all_hosts()) == 3, 1, 5) diff --git a/tests/integration/standard/test_custom_payload.py b/tests/integration/standard/test_custom_payload.py index 9906a8243e..5e7bbfd0b5 100644 --- a/tests/integration/standard/test_custom_payload.py +++ b/tests/integration/standard/test_custom_payload.py @@ -22,7 +22,7 @@ from cassandra.query import (SimpleStatement, BatchStatement, BatchType) -from tests.integration import use_singledc, PROTOCOL_VERSION, local, TestCluster +from tests.integration import use_singledc, PROTOCOL_VERSION, local, IntegrationTestCluster def setup_module(): @@ -38,7 +38,7 @@ def setUp(self): raise unittest.SkipTest( "Native protocol 4,0+ is required for custom payloads, currently using %r" % (PROTOCOL_VERSION,)) - self.cluster = TestCluster() + self.cluster = IntegrationTestCluster() self.session = self.cluster.connect() def tearDown(self): diff --git a/tests/integration/standard/test_custom_protocol_handler.py b/tests/integration/standard/test_custom_protocol_handler.py index bf549511c8..fef2583e62 100644 --- a/tests/integration/standard/test_custom_protocol_handler.py +++ b/tests/integration/standard/test_custom_protocol_handler.py @@ -25,7 +25,7 @@ from tests.integration import use_singledc, drop_keyspace_shutdown_cluster, \ greaterthanorequalcass30, execute_with_long_wait_retry, greaterthanorequaldse51, greaterthanorequalcass3_10, \ - TestCluster, greaterthanorequalcass40, requirecassandra + IntegrationTestCluster, greaterthanorequalcass40, requirecassandra from tests.integration.datatype_utils import update_datatypes, PRIMITIVE_DATATYPES from tests.integration.standard.utils import create_table_with_all_types, get_all_primitive_params from six import binary_type @@ -43,7 +43,7 @@ class CustomProtocolHandlerTest(unittest.TestCase): @classmethod def setUpClass(cls): - cls.cluster = TestCluster() + cls.cluster = IntegrationTestCluster() cls.session = cls.cluster.connect() cls.session.execute("CREATE KEYSPACE custserdes WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor': '1'}") cls.session.set_keyspace("custserdes") @@ -68,7 +68,7 @@ def test_custom_raw_uuid_row_results(self): """ # Ensure that we get normal uuid back first - cluster = TestCluster( + cluster = IntegrationTestCluster( execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(row_factory=tuple_factory)} ) session = cluster.connect(keyspace="custserdes") @@ -106,7 +106,7 @@ def test_custom_raw_row_results_all_types(self): @test_category data_types:serialization """ # Connect using a custom protocol handler that tracks the various types the result message is used with. - cluster = TestCluster( + cluster = IntegrationTestCluster( execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(row_factory=tuple_factory)} ) session = cluster.connect(keyspace="custserdes") @@ -136,7 +136,7 @@ def test_protocol_divergence_v5_fail_by_continuous_paging(self): @test_category connection """ - cluster = TestCluster(protocol_version=ProtocolVersion.V5, allow_beta_protocol_version=True) + cluster = IntegrationTestCluster(protocol_version=ProtocolVersion.V5, allow_beta_protocol_version=True) session = cluster.connect() max_pages = 4 @@ -233,7 +233,7 @@ def _send_query_message(self, session, timeout, **kwargs): return future def _protocol_divergence_fail_by_flag_uses_int(self, version, uses_int_query_flag, int_flag = True, beta=False): - cluster = TestCluster(protocol_version=version, allow_beta_protocol_version=beta) + cluster = IntegrationTestCluster(protocol_version=version, allow_beta_protocol_version=beta) session = cluster.connect() query_one = SimpleStatement("INSERT INTO test3rf.test (k, v) VALUES (1, 1)") diff --git a/tests/integration/standard/test_cython_protocol_handlers.py b/tests/integration/standard/test_cython_protocol_handlers.py index 4e45553be2..ffffa53771 100644 --- a/tests/integration/standard/test_cython_protocol_handlers.py +++ b/tests/integration/standard/test_cython_protocol_handlers.py @@ -16,7 +16,7 @@ from cassandra.query import tuple_factory from tests import VERIFY_CYTHON from tests.integration import use_singledc, notprotocolv1, \ - drop_keyspace_shutdown_cluster, BasicSharedKeyspaceUnitTestCase, greaterthancass21, TestCluster + drop_keyspace_shutdown_cluster, BasicSharedKeyspaceUnitTestCase, greaterthancass21, IntegrationTestCluster from tests.integration.datatype_utils import update_datatypes from tests.integration.standard.utils import ( create_table_with_all_types, get_all_primitive_params, get_primitive_datatypes) @@ -34,7 +34,7 @@ class CythonProtocolHandlerTest(unittest.TestCase): @classmethod def setUpClass(cls): - cls.cluster = TestCluster() + cls.cluster = IntegrationTestCluster() cls.session = cls.cluster.connect() cls.session.execute("CREATE KEYSPACE testspace WITH replication = " "{ 'class' : 'SimpleStrategy', 'replication_factor': '1'}") @@ -65,7 +65,7 @@ def test_cython_lazy_results_paged(self): Test Cython-based parser that returns an iterator, over multiple pages """ # arrays = { 'a': arr1, 'b': arr2, ... } - cluster = TestCluster( + cluster = IntegrationTestCluster( execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(row_factory=tuple_factory)} ) session = cluster.connect(keyspace="testspace") @@ -99,7 +99,7 @@ def test_numpy_results_paged(self): Test Numpy-based parser that returns a NumPy array """ # arrays = { 'a': arr1, 'b': arr2, ... } - cluster = TestCluster( + cluster = IntegrationTestCluster( execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(row_factory=tuple_factory)} ) session = cluster.connect(keyspace="testspace") @@ -182,7 +182,7 @@ def get_data(protocol_handler): """ Get data from the test table. """ - cluster = TestCluster( + cluster = IntegrationTestCluster( execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(row_factory=tuple_factory)} ) session = cluster.connect(keyspace="testspace") diff --git a/tests/integration/standard/test_dse.py b/tests/integration/standard/test_dse.py index 1b9b5bef84..76f99b1197 100644 --- a/tests/integration/standard/test_dse.py +++ b/tests/integration/standard/test_dse.py @@ -19,7 +19,7 @@ from tests import notwindows from tests.unit.cython.utils import notcython from tests.integration import (execute_until_pass, - execute_with_long_wait_retry, use_cluster, TestCluster) + execute_with_long_wait_retry, use_cluster, IntegrationTestCluster) try: import unittest2 as unittest @@ -59,7 +59,7 @@ def _test_basic(self, dse_version): ) use_cluster(cluster_name=cluster_name, nodes=[3], dse_options={}) - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() result = execute_until_pass( session, diff --git a/tests/integration/standard/test_metadata.py b/tests/integration/standard/test_metadata.py index ce6260f06e..26e126e2cc 100644 --- a/tests/integration/standard/test_metadata.py +++ b/tests/integration/standard/test_metadata.py @@ -42,7 +42,7 @@ get_supported_protocol_versions, greaterthancass20, greaterthancass21, assert_startswith, greaterthanorequalcass40, greaterthanorequaldse67, lessthancass40, - TestCluster, DSE_VERSION) + IntegrationTestCluster, DSE_VERSION) log = logging.getLogger(__name__) @@ -109,7 +109,7 @@ def test_host_release_version(self): class MetaDataRemovalTest(unittest.TestCase): def setUp(self): - self.cluster = TestCluster(contact_points=['127.0.0.1', '127.0.0.2', '127.0.0.3', '126.0.0.186']) + self.cluster = IntegrationTestCluster(contact_points=['127.0.0.1', '127.0.0.2', '127.0.0.3', '126.0.0.186']) self.cluster.connect() def tearDown(self): @@ -143,11 +143,11 @@ def test_schema_metadata_disable(self): @test_category metadata """ # Validate metadata is missing where appropriate - no_schema = TestCluster(schema_metadata_enabled=False) + no_schema = IntegrationTestCluster(schema_metadata_enabled=False) no_schema_session = no_schema.connect() self.assertEqual(len(no_schema.metadata.keyspaces), 0) self.assertEqual(no_schema.metadata.export_schema_as_string(), '') - no_token = TestCluster(token_metadata_enabled=False) + no_token = IntegrationTestCluster(token_metadata_enabled=False) no_token_session = no_token.connect() self.assertEqual(len(no_token.metadata.token_map.token_to_host_owner), 0) @@ -575,7 +575,7 @@ def test_refresh_schema_metadata(self): @test_category metadata """ - cluster2 = TestCluster(schema_event_refresh_window=-1) + cluster2 = IntegrationTestCluster(schema_event_refresh_window=-1) cluster2.connect() self.assertNotIn("new_keyspace", cluster2.metadata.keyspaces) @@ -658,7 +658,7 @@ def test_refresh_keyspace_metadata(self): @test_category metadata """ - cluster2 = TestCluster(schema_event_refresh_window=-1) + cluster2 = IntegrationTestCluster(schema_event_refresh_window=-1) cluster2.connect() self.assertTrue(cluster2.metadata.keyspaces[self.keyspace_name].durable_writes) @@ -689,7 +689,7 @@ def test_refresh_table_metadata(self): table_name = "test" self.session.execute("CREATE TABLE {0}.{1} (a int PRIMARY KEY, b text)".format(self.keyspace_name, table_name)) - cluster2 = TestCluster(schema_event_refresh_window=-1) + cluster2 = IntegrationTestCluster(schema_event_refresh_window=-1) cluster2.connect() self.assertNotIn("c", cluster2.metadata.keyspaces[self.keyspace_name].tables[table_name].columns) @@ -723,7 +723,7 @@ def test_refresh_metadata_for_mv(self): self.session.execute("CREATE TABLE {0}.{1} (a int PRIMARY KEY, b text)".format(self.keyspace_name, self.function_table_name)) - cluster2 = TestCluster(schema_event_refresh_window=-1) + cluster2 = IntegrationTestCluster(schema_event_refresh_window=-1) cluster2.connect() try: @@ -747,7 +747,7 @@ def test_refresh_metadata_for_mv(self): self.assertIsNot(original_meta, self.session.cluster.metadata.keyspaces[self.keyspace_name].tables[self.function_table_name].views['mv1']) self.assertEqual(original_meta.as_cql_query(), current_meta.as_cql_query()) - cluster3 = TestCluster(schema_event_refresh_window=-1) + cluster3 = IntegrationTestCluster(schema_event_refresh_window=-1) cluster3.connect() try: self.assertNotIn("mv2", cluster3.metadata.keyspaces[self.keyspace_name].tables[self.function_table_name].views) @@ -782,7 +782,7 @@ def test_refresh_user_type_metadata(self): if PROTOCOL_VERSION < 3: raise unittest.SkipTest("Protocol 3+ is required for UDTs, currently testing against {0}".format(PROTOCOL_VERSION)) - cluster2 = TestCluster(schema_event_refresh_window=-1) + cluster2 = IntegrationTestCluster(schema_event_refresh_window=-1) cluster2.connect() self.assertEqual(cluster2.metadata.keyspaces[self.keyspace_name].user_types, {}) @@ -810,7 +810,7 @@ def test_refresh_user_type_metadata_proto_2(self): raise unittest.SkipTest("Protocol versions 1 and 2 are not supported in Cassandra version ".format(CASSANDRA_VERSION)) for protocol_version in (1, 2): - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() self.assertEqual(cluster.metadata.keyspaces[self.keyspace_name].user_types, {}) @@ -850,7 +850,7 @@ def test_refresh_user_function_metadata(self): if PROTOCOL_VERSION < 4: raise unittest.SkipTest("Protocol 4+ is required for UDFs, currently testing against {0}".format(PROTOCOL_VERSION)) - cluster2 = TestCluster(schema_event_refresh_window=-1) + cluster2 = IntegrationTestCluster(schema_event_refresh_window=-1) cluster2.connect() self.assertEqual(cluster2.metadata.keyspaces[self.keyspace_name].functions, {}) @@ -886,7 +886,7 @@ def test_refresh_user_aggregate_metadata(self): if PROTOCOL_VERSION < 4: raise unittest.SkipTest("Protocol 4+ is required for UDAs, currently testing against {0}".format(PROTOCOL_VERSION)) - cluster2 = TestCluster(schema_event_refresh_window=-1) + cluster2 = IntegrationTestCluster(schema_event_refresh_window=-1) cluster2.connect() self.assertEqual(cluster2.metadata.keyspaces[self.keyspace_name].aggregates, {}) @@ -1046,7 +1046,7 @@ def test_export_schema(self): Test export schema functionality """ - cluster = TestCluster() + cluster = IntegrationTestCluster() cluster.connect() self.assertIsInstance(cluster.metadata.export_schema_as_string(), six.string_types) @@ -1057,7 +1057,7 @@ def test_export_keyspace_schema(self): Test export keyspace schema functionality """ - cluster = TestCluster() + cluster = IntegrationTestCluster() cluster.connect() for keyspace in cluster.metadata.keyspaces: @@ -1097,7 +1097,7 @@ def test_export_keyspace_schema_udts(self): if sys.version_info[0:2] != (2, 7): raise unittest.SkipTest('This test compares static strings generated from dict items, which may change orders. Test with 2.7.') - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() session.execute(""" @@ -1165,7 +1165,7 @@ def test_case_sensitivity(self): Test that names that need to be escaped in CREATE statements are """ - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() ksname = 'AnInterestingKeyspace' @@ -1210,7 +1210,7 @@ def test_already_exists_exceptions(self): Ensure AlreadyExists exception is thrown when hit """ - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() ksname = 'test3rf' @@ -1236,7 +1236,7 @@ def test_replicas(self): if murmur3 is None: raise unittest.SkipTest('the murmur3 extension is not available') - cluster = TestCluster() + cluster = IntegrationTestCluster() self.assertEqual(cluster.metadata.get_replicas('test3rf', 'key'), []) cluster.connect('test3rf') @@ -1252,7 +1252,7 @@ def test_token_map(self): Test token mappings """ - cluster = TestCluster() + cluster = IntegrationTestCluster() cluster.connect('test3rf') ring = cluster.metadata.token_map.ring owners = list(cluster.metadata.token_map.token_to_host_owner[token] for token in ring) @@ -1276,7 +1276,7 @@ class TokenMetadataTest(unittest.TestCase): def test_token(self): expected_node_count = len(get_cluster().nodes) - cluster = TestCluster() + cluster = IntegrationTestCluster() cluster.connect() tmap = cluster.metadata.token_map self.assertTrue(issubclass(tmap.token_class, Token)) @@ -1289,7 +1289,7 @@ class KeyspaceAlterMetadata(unittest.TestCase): Test verifies that table metadata is preserved on keyspace alter """ def setUp(self): - self.cluster = TestCluster() + self.cluster = IntegrationTestCluster() self.session = self.cluster.connect() name = self._testMethodName.lower() crt_ks = ''' @@ -1334,7 +1334,7 @@ def table_name(self): @classmethod def setup_class(cls): - cls.cluster = TestCluster() + cls.cluster = IntegrationTestCluster() cls.session = cls.cluster.connect() try: if cls.keyspace_name in cls.cluster.metadata.keyspaces: @@ -1443,7 +1443,7 @@ def function_name(self): @classmethod def setup_class(cls): if PROTOCOL_VERSION >= 4: - cls.cluster = TestCluster() + cls.cluster = IntegrationTestCluster() cls.keyspace_name = cls.__name__.lower() cls.session = cls.cluster.connect() cls.session.execute("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}" % cls.keyspace_name) @@ -1726,7 +1726,7 @@ def test_init_cond(self): """ # This is required until the java driver bundled with C* is updated to support v4 - c = TestCluster(protocol_version=3) + c = IntegrationTestCluster(protocol_version=3) s = c.connect(self.keyspace_name) encoder = Encoder() @@ -1910,7 +1910,7 @@ def function_name(self): @classmethod def setup_class(cls): - cls.cluster = TestCluster() + cls.cluster = IntegrationTestCluster() cls.keyspace_name = cls.__name__.lower() cls.session = cls.cluster.connect() cls.session.execute("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}" % cls.keyspace_name) diff --git a/tests/integration/standard/test_metrics.py b/tests/integration/standard/test_metrics.py index 676a5340ef..3393ab22db 100644 --- a/tests/integration/standard/test_metrics.py +++ b/tests/integration/standard/test_metrics.py @@ -27,7 +27,7 @@ from cassandra.protocol import SyntaxException from cassandra.cluster import NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT -from tests.integration import get_cluster, get_node, use_singledc, execute_until_pass, TestCluster +from tests.integration import get_cluster, get_node, use_singledc, execute_until_pass, IntegrationTestCluster from greplin import scales from tests.integration import BasicSharedKeyspaceUnitTestCaseRF3WM, BasicExistingKeyspaceUnitTestCase, local @@ -42,8 +42,8 @@ class MetricsTests(unittest.TestCase): def setUp(self): contact_point = ['127.0.0.2'] - self.cluster = TestCluster(contact_points=contact_point, metrics_enabled=True, - execution_profiles= + self.cluster = IntegrationTestCluster(contact_points=contact_point, metrics_enabled=True, + execution_profiles= {EXEC_PROFILE_DEFAULT: ExecutionProfile( load_balancing_policy=HostFilterPolicy( @@ -51,7 +51,7 @@ def setUp(self): retry_policy=FallthroughRetryPolicy() ) } - ) + ) self.session = self.cluster.connect("test3rf", wait_for_all_pools=True) def tearDown(self): @@ -203,7 +203,7 @@ def test_metrics_per_cluster(self): @test_category metrics """ - cluster2 = TestCluster( + cluster2 = IntegrationTestCluster( metrics_enabled=True, execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(retry_policy=FallthroughRetryPolicy())} ) @@ -257,13 +257,13 @@ def test_duplicate_metrics_per_cluster(self): @test_category metrics """ - cluster2 = TestCluster( + cluster2 = IntegrationTestCluster( metrics_enabled=True, monitor_reporting_enabled=False, execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(retry_policy=FallthroughRetryPolicy())} ) - cluster3 = TestCluster( + cluster3 = IntegrationTestCluster( metrics_enabled=True, monitor_reporting_enabled=False, execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(retry_policy=FallthroughRetryPolicy())} diff --git a/tests/integration/standard/test_policies.py b/tests/integration/standard/test_policies.py index 24facf42a0..e4b2ee7f13 100644 --- a/tests/integration/standard/test_policies.py +++ b/tests/integration/standard/test_policies.py @@ -23,7 +23,7 @@ from cassandra.pool import Host from cassandra.connection import DefaultEndPoint -from tests.integration import local, use_singledc, TestCluster +from tests.integration import local, use_singledc, IntegrationTestCluster from concurrent.futures import wait as wait_futures @@ -55,9 +55,9 @@ def test_predicate_changes(self): hfp = ExecutionProfile( load_balancing_policy=HostFilterPolicy(RoundRobinPolicy(), predicate=predicate) ) - cluster = TestCluster(contact_points=(contact_point,), execution_profiles={EXEC_PROFILE_DEFAULT: hfp}, - topology_event_refresh_window=0, - status_event_refresh_window=0) + cluster = IntegrationTestCluster(contact_points=(contact_point,), execution_profiles={EXEC_PROFILE_DEFAULT: hfp}, + topology_event_refresh_window=0, + status_event_refresh_window=0) session = cluster.connect(wait_for_all_pools=True) queried_hosts = set() @@ -84,7 +84,7 @@ class WhiteListRoundRobinPolicyTests(unittest.TestCase): def test_only_connects_to_subset(self): only_connect_hosts = {"127.0.0.1", "127.0.0.2"} white_list = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(only_connect_hosts)) - cluster = TestCluster(execution_profiles={"white_list": white_list}) + cluster = IntegrationTestCluster(execution_profiles={"white_list": white_list}) #cluster = Cluster(load_balancing_policy=WhiteListRoundRobinPolicy(only_connect_hosts)) session = cluster.connect(wait_for_all_pools=True) queried_hosts = set() diff --git a/tests/integration/standard/test_prepared_statements.py b/tests/integration/standard/test_prepared_statements.py index 5c79f27346..947c5173fa 100644 --- a/tests/integration/standard/test_prepared_statements.py +++ b/tests/integration/standard/test_prepared_statements.py @@ -13,7 +13,7 @@ # limitations under the License. -from tests.integration import use_singledc, PROTOCOL_VERSION, TestCluster +from tests.integration import use_singledc, PROTOCOL_VERSION, IntegrationTestCluster try: import unittest2 as unittest @@ -43,7 +43,7 @@ def setUpClass(cls): cls.cass_version = get_server_versions() def setUp(self): - self.cluster = TestCluster(metrics_enabled=True, allow_beta_protocol_version=True) + self.cluster = IntegrationTestCluster(metrics_enabled=True, allow_beta_protocol_version=True) self.session = self.cluster.connect() def tearDown(self): @@ -515,7 +515,7 @@ def test_prepare_id_is_updated_across_session(self): @since 3.12 @jira_ticket PYTHON-808 """ - one_cluster = TestCluster(metrics_enabled=True) + one_cluster = IntegrationTestCluster(metrics_enabled=True) one_session = one_cluster.connect() self.addCleanup(one_cluster.shutdown) @@ -555,7 +555,7 @@ def test_id_is_not_updated_conditional_v4(self): @since 3.13 @jira_ticket PYTHON-847 """ - cluster = TestCluster(protocol_version=ProtocolVersion.V4) + cluster = IntegrationTestCluster(protocol_version=ProtocolVersion.V4) session = cluster.connect() self.addCleanup(cluster.shutdown) self._test_updated_conditional(session, 9) @@ -569,7 +569,7 @@ def test_id_is_not_updated_conditional_v5(self): @since 3.13 @jira_ticket PYTHON-847 """ - cluster = TestCluster(protocol_version=ProtocolVersion.V5) + cluster = IntegrationTestCluster(protocol_version=ProtocolVersion.V5) session = cluster.connect() self.addCleanup(cluster.shutdown) self._test_updated_conditional(session, 10) @@ -584,7 +584,7 @@ def test_id_is_not_updated_conditional_dsev1(self): @since 3.13 @jira_ticket PYTHON-847 """ - cluster = TestCluster(protocol_version=ProtocolVersion.DSE_V1) + cluster = IntegrationTestCluster(protocol_version=ProtocolVersion.DSE_V1) session = cluster.connect() self.addCleanup(cluster.shutdown) self._test_updated_conditional(session, 10) @@ -599,7 +599,7 @@ def test_id_is_not_updated_conditional_dsev2(self): @since 3.13 @jira_ticket PYTHON-847 """ - cluster = TestCluster(protocol_version=ProtocolVersion.DSE_V2) + cluster = IntegrationTestCluster(protocol_version=ProtocolVersion.DSE_V2) session = cluster.connect() self.addCleanup(cluster.shutdown) self._test_updated_conditional(session, 10) diff --git a/tests/integration/standard/test_query.py b/tests/integration/standard/test_query.py index ea0e326ff5..1ca41f96ea 100644 --- a/tests/integration/standard/test_query.py +++ b/tests/integration/standard/test_query.py @@ -28,7 +28,7 @@ from cassandra.policies import HostDistance, RoundRobinPolicy, WhiteListRoundRobinPolicy from tests.integration import use_singledc, PROTOCOL_VERSION, BasicSharedKeyspaceUnitTestCase, \ greaterthanprotocolv3, MockLoggingHandler, get_supported_protocol_versions, local, get_cluster, setup_keyspace, \ - USE_CASS_EXTERNAL, greaterthanorequalcass40, DSE_VERSION, TestCluster, requirecassandra + USE_CASS_EXTERNAL, greaterthanorequalcass40, DSE_VERSION, IntegrationTestCluster, requirecassandra from tests import notwindows from tests.integration import greaterthanorequalcass30, get_node @@ -122,7 +122,7 @@ def test_trace_id_to_resultset(self): self.assertListEqual([rs_trace], rs.get_all_query_traces()) def test_trace_ignores_row_factory(self): - with TestCluster( + with IntegrationTestCluster( execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(row_factory=dict_factory)} ) as cluster: s = cluster.connect() @@ -367,7 +367,7 @@ def test_host_targeting_query(self): class PreparedStatementTests(unittest.TestCase): def setUp(self): - self.cluster = TestCluster() + self.cluster = IntegrationTestCluster() self.session = self.cluster.connect() def tearDown(self): @@ -523,7 +523,7 @@ def test_prepare_on_all_hosts(self): @jira_ticket PYTHON-556 @expected_result queries will have to re-prepared on hosts that aren't the control connection """ - clus = TestCluster(prepare_on_all_hosts=False, reprepare_on_up=False) + clus = IntegrationTestCluster(prepare_on_all_hosts=False, reprepare_on_up=False) self.addCleanup(clus.shutdown) session = clus.connect(wait_for_all_pools=True) @@ -543,7 +543,7 @@ def test_prepare_batch_statement(self): and the batch statement will be sent. """ policy = ForcedHostIndexPolicy() - clus = TestCluster( + clus = IntegrationTestCluster( execution_profiles={ EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=policy), }, @@ -587,7 +587,7 @@ def test_prepare_batch_statement_after_alter(self): @expected_result queries will have to re-prepared on hosts that aren't the control connection and the batch statement will be sent. """ - clus = TestCluster(prepare_on_all_hosts=False, reprepare_on_up=False) + clus = IntegrationTestCluster(prepare_on_all_hosts=False, reprepare_on_up=False) self.addCleanup(clus.shutdown) table = "test3rf.%s" % self._testMethodName.lower() @@ -646,7 +646,7 @@ def test_prepared_statement(self): Highlight the difference between Prepared and Bound statements """ - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() prepared = session.prepare('INSERT INTO test3rf.test (k, v) VALUES (?, ?)') @@ -670,7 +670,7 @@ def setUp(self): "Protocol 2.0+ is required for BATCH operations, currently testing against %r" % (PROTOCOL_VERSION,)) - self.cluster = TestCluster() + self.cluster = IntegrationTestCluster() if PROTOCOL_VERSION < 3: self.cluster.set_core_connections_per_host(HostDistance.LOCAL, 1) self.session = self.cluster.connect(wait_for_all_pools=True) @@ -801,7 +801,7 @@ def setUp(self): "Protocol 2.0+ is required for Serial Consistency, currently testing against %r" % (PROTOCOL_VERSION,)) - self.cluster = TestCluster() + self.cluster = IntegrationTestCluster() if PROTOCOL_VERSION < 3: self.cluster.set_core_connections_per_host(HostDistance.LOCAL, 1) self.session = self.cluster.connect() @@ -893,7 +893,7 @@ def setUp(self): % (PROTOCOL_VERSION,)) serial_profile = ExecutionProfile(consistency_level=ConsistencyLevel.SERIAL) - self.cluster = TestCluster(execution_profiles={'serial': serial_profile}) + self.cluster = IntegrationTestCluster(execution_profiles={'serial': serial_profile}) self.session = self.cluster.connect() ddl = ''' @@ -1078,7 +1078,7 @@ def setUp(self): raise unittest.SkipTest( "Protocol 2.0+ is required for BATCH operations, currently testing against %r" % (PROTOCOL_VERSION,)) - self.cluster = TestCluster() + self.cluster = IntegrationTestCluster() self.session = self.cluster.connect() query = """ INSERT INTO test3rf.test (k, v) VALUES (?, ?) @@ -1353,7 +1353,7 @@ def test_unicode(self): class BaseKeyspaceTests(): @classmethod def setUpClass(cls): - cls.cluster = TestCluster() + cls.cluster = IntegrationTestCluster() cls.session = cls.cluster.connect(wait_for_all_pools=True) cls.ks_name = cls.__name__.lower() @@ -1421,7 +1421,7 @@ def test_setting_keyspace_and_session(self): @test_category query """ - cluster = TestCluster(protocol_version=ProtocolVersion.V5, allow_beta_protocol_version=True) + cluster = IntegrationTestCluster(protocol_version=ProtocolVersion.V5, allow_beta_protocol_version=True) session = cluster.connect(self.alternative_ks) self.addCleanup(cluster.shutdown) @@ -1438,7 +1438,7 @@ def test_setting_keyspace_and_session_after_created(self): @test_category query """ - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect() self.addCleanup(cluster.shutdown) @@ -1456,7 +1456,7 @@ def test_setting_keyspace_and_same_session(self): @test_category query """ - cluster = TestCluster() + cluster = IntegrationTestCluster() session = cluster.connect(self.ks_name) self.addCleanup(cluster.shutdown) @@ -1467,7 +1467,7 @@ def test_setting_keyspace_and_same_session(self): class SimpleWithKeyspaceTests(QueryKeyspaceTests, unittest.TestCase): @unittest.skip def test_lower_protocol(self): - cluster = TestCluster(protocol_version=ProtocolVersion.V4) + cluster = IntegrationTestCluster(protocol_version=ProtocolVersion.V4) session = cluster.connect(self.ks_name) self.addCleanup(cluster.shutdown) @@ -1521,7 +1521,7 @@ def confirm_results(self): class PreparedWithKeyspaceTests(BaseKeyspaceTests, unittest.TestCase): def setUp(self): - self.cluster = TestCluster() + self.cluster = IntegrationTestCluster() self.session = self.cluster.connect() def tearDown(self): @@ -1597,7 +1597,7 @@ def test_prepared_not_found(self): @test_category query """ - cluster = TestCluster() + cluster = IntegrationTestCluster() session = self.cluster.connect("system") self.addCleanup(cluster.shutdown) @@ -1619,7 +1619,7 @@ def test_prepared_in_query_keyspace(self): @test_category query """ - cluster = TestCluster() + cluster = IntegrationTestCluster() session = self.cluster.connect() self.addCleanup(cluster.shutdown) diff --git a/tests/integration/standard/test_query_paging.py b/tests/integration/standard/test_query_paging.py index dac4ec5ce3..8a3473f05c 100644 --- a/tests/integration/standard/test_query_paging.py +++ b/tests/integration/standard/test_query_paging.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from tests.integration import use_singledc, PROTOCOL_VERSION, TestCluster +from tests.integration import use_singledc, PROTOCOL_VERSION, IntegrationTestCluster import logging log = logging.getLogger(__name__) @@ -44,7 +44,7 @@ def setUp(self): "Protocol 2.0+ is required for Paging state, currently testing against %r" % (PROTOCOL_VERSION,)) - self.cluster = TestCluster( + self.cluster = IntegrationTestCluster( execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(consistency_level=ConsistencyLevel.LOCAL_QUORUM)} ) if PROTOCOL_VERSION < 3: diff --git a/tests/integration/standard/test_routing.py b/tests/integration/standard/test_routing.py index e1dabba49a..0dc9755ea6 100644 --- a/tests/integration/standard/test_routing.py +++ b/tests/integration/standard/test_routing.py @@ -21,7 +21,7 @@ import logging log = logging.getLogger(__name__) -from tests.integration import use_singledc, TestCluster +from tests.integration import use_singledc, IntegrationTestCluster def setup_module(): @@ -36,7 +36,7 @@ def cfname(self): @classmethod def setup_class(cls): - cls.cluster = TestCluster() + cls.cluster = IntegrationTestCluster() cls.session = cls.cluster.connect('test1rf') @classmethod diff --git a/tests/integration/standard/test_row_factories.py b/tests/integration/standard/test_row_factories.py index 93f25d9276..8e09a62618 100644 --- a/tests/integration/standard/test_row_factories.py +++ b/tests/integration/standard/test_row_factories.py @@ -13,7 +13,7 @@ # limitations under the License. from tests.integration import get_server_versions, use_singledc, \ - BasicSharedKeyspaceUnitTestCaseWFunctionTable, BasicSharedKeyspaceUnitTestCase, execute_until_pass, TestCluster + BasicSharedKeyspaceUnitTestCaseWFunctionTable, BasicSharedKeyspaceUnitTestCase, execute_until_pass, IntegrationTestCluster try: import unittest2 as unittest @@ -87,7 +87,7 @@ def setUpClass(cls): cls.select = "SELECT * FROM {0}.{1}".format(cls.ks_name, cls.ks_name) def _results_from_row_factory(self, row_factory): - cluster = TestCluster( + cluster = IntegrationTestCluster( execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(row_factory=row_factory)} ) with cluster: @@ -176,7 +176,7 @@ class NamedTupleFactoryAndNumericColNamesTests(unittest.TestCase): """ @classmethod def setup_class(cls): - cls.cluster = TestCluster() + cls.cluster = IntegrationTestCluster() cls.session = cls.cluster.connect() cls._cass_version, cls._cql_version = get_server_versions() ddl = ''' @@ -213,7 +213,7 @@ def test_can_select_with_dict_factory(self): """ can SELECT numeric column using dict_factory """ - with TestCluster( + with IntegrationTestCluster( execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(row_factory=dict_factory)} ) as cluster: try: diff --git a/tests/integration/standard/test_single_interface.py b/tests/integration/standard/test_single_interface.py index 91451a52a0..c26989ebf7 100644 --- a/tests/integration/standard/test_single_interface.py +++ b/tests/integration/standard/test_single_interface.py @@ -25,7 +25,7 @@ from packaging.version import Version from tests.integration import use_singledc, PROTOCOL_VERSION, \ remove_cluster, greaterthanorequalcass40, notdse, \ - CASSANDRA_VERSION, DSE_VERSION, TestCluster + CASSANDRA_VERSION, DSE_VERSION, IntegrationTestCluster def setup_module(): @@ -42,7 +42,7 @@ def teardown_module(): class SingleInterfaceTest(unittest.TestCase): def setUp(self): - self.cluster = TestCluster() + self.cluster = IntegrationTestCluster() self.session = self.cluster.connect() def tearDown(self): diff --git a/tests/integration/standard/test_types.py b/tests/integration/standard/test_types.py index 0592b7d737..b659f406f6 100644 --- a/tests/integration/standard/test_types.py +++ b/tests/integration/standard/test_types.py @@ -34,7 +34,7 @@ from tests.integration import use_singledc, execute_until_pass, notprotocolv1, \ BasicSharedKeyspaceUnitTestCase, greaterthancass21, lessthancass30, greaterthanorequaldse51, \ - DSE_VERSION, greaterthanorequalcass3_10, requiredse, TestCluster + DSE_VERSION, greaterthanorequalcass3_10, requiredse, IntegrationTestCluster from tests.integration.datatype_utils import update_datatypes, PRIMITIVE_DATATYPES, COLLECTION_TYPES, PRIMITIVE_DATATYPES_KEYS, \ get_sample, get_all_samples, get_collection_sample @@ -136,7 +136,7 @@ def test_can_insert_primitive_datatypes(self): """ Test insertion of all datatype primitives """ - c = TestCluster() + c = IntegrationTestCluster() s = c.connect(self.keyspace_name) # create table @@ -217,7 +217,7 @@ def test_can_insert_collection_datatypes(self): Test insertion of all collection types """ - c = TestCluster() + c = IntegrationTestCluster() s = c.connect(self.keyspace_name) # use tuple encoding, to convert native python tuple into raw CQL s.encoder.mapping[tuple] = s.encoder.cql_encode_tuple @@ -449,7 +449,7 @@ def test_can_insert_tuples(self): if self.cass_version < (2, 1, 0): raise unittest.SkipTest("The tuple type was introduced in Cassandra 2.1") - c = TestCluster() + c = IntegrationTestCluster() s = c.connect(self.keyspace_name) # use this encoder in order to insert tuples @@ -501,7 +501,7 @@ def test_can_insert_tuples_with_varying_lengths(self): if self.cass_version < (2, 1, 0): raise unittest.SkipTest("The tuple type was introduced in Cassandra 2.1") - c = TestCluster( + c = IntegrationTestCluster( execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(row_factory=dict_factory)} ) s = c.connect(self.keyspace_name) @@ -540,7 +540,7 @@ def test_can_insert_tuples_all_primitive_datatypes(self): if self.cass_version < (2, 1, 0): raise unittest.SkipTest("The tuple type was introduced in Cassandra 2.1") - c = TestCluster() + c = IntegrationTestCluster() s = c.connect(self.keyspace_name) s.encoder.mapping[tuple] = s.encoder.cql_encode_tuple @@ -568,7 +568,7 @@ def test_can_insert_tuples_all_collection_datatypes(self): if self.cass_version < (2, 1, 0): raise unittest.SkipTest("The tuple type was introduced in Cassandra 2.1") - c = TestCluster( + c = IntegrationTestCluster( execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(row_factory=dict_factory)} ) s = c.connect(self.keyspace_name) @@ -667,7 +667,7 @@ def test_can_insert_nested_tuples(self): if self.cass_version < (2, 1, 0): raise unittest.SkipTest("The tuple type was introduced in Cassandra 2.1") - c = TestCluster( + c = IntegrationTestCluster( execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(row_factory=dict_factory)} ) s = c.connect(self.keyspace_name) @@ -1279,7 +1279,7 @@ def test_nested_types_with_protocol_version(self): self.read_inserts_at_level(pvr) def read_inserts_at_level(self, proto_ver): - session = TestCluster(protocol_version=proto_ver).connect(self.keyspace_name) + session = IntegrationTestCluster(protocol_version=proto_ver).connect(self.keyspace_name) try: results = session.execute('select * from t')[0] self.assertEqual("[SortedSet([1, 2]), SortedSet([3, 5])]", str(results.v)) @@ -1297,7 +1297,7 @@ def read_inserts_at_level(self, proto_ver): session.cluster.shutdown() def run_inserts_at_version(self, proto_ver): - session = TestCluster(protocol_version=proto_ver).connect(self.keyspace_name) + session = IntegrationTestCluster(protocol_version=proto_ver).connect(self.keyspace_name) try: p = session.prepare('insert into t (k, v) values (?, ?)') session.execute(p, (0, [{1, 2}, {3, 5}])) diff --git a/tests/integration/standard/test_udts.py b/tests/integration/standard/test_udts.py index 6d9676f25e..4f3cc2547c 100644 --- a/tests/integration/standard/test_udts.py +++ b/tests/integration/standard/test_udts.py @@ -27,7 +27,7 @@ from cassandra.util import OrderedMap from tests.integration import use_singledc, execute_until_pass, \ - BasicSegregatedKeyspaceUnitTestCase, greaterthancass20, lessthancass30, greaterthanorequalcass36, TestCluster + BasicSegregatedKeyspaceUnitTestCase, greaterthancass20, lessthancass30, greaterthanorequalcass36, IntegrationTestCluster from tests.integration.datatype_utils import update_datatypes, PRIMITIVE_DATATYPES, PRIMITIVE_DATATYPES_KEYS, \ COLLECTION_TYPES, get_sample, get_collection_sample @@ -79,7 +79,7 @@ def test_can_insert_unprepared_registered_udts(self): Test the insertion of unprepared, registered UDTs """ - c = TestCluster() + c = IntegrationTestCluster() s = c.connect(self.keyspace_name, wait_for_all_pools=True) s.execute("CREATE TYPE user (age int, name text)") @@ -123,7 +123,7 @@ def test_can_register_udt_before_connecting(self): Test the registration of UDTs before session creation """ - c = TestCluster() + c = IntegrationTestCluster() s = c.connect(wait_for_all_pools=True) s.execute(""" @@ -144,7 +144,7 @@ def test_can_register_udt_before_connecting(self): # now that types are defined, shutdown and re-create Cluster c.shutdown() - c = TestCluster() + c = IntegrationTestCluster() User1 = namedtuple('user', ('age', 'name')) User2 = namedtuple('user', ('state', 'is_cool')) @@ -181,7 +181,7 @@ def test_can_insert_prepared_unregistered_udts(self): Test the insertion of prepared, unregistered UDTs """ - c = TestCluster() + c = IntegrationTestCluster() s = c.connect(self.keyspace_name, wait_for_all_pools=True) s.execute("CREATE TYPE user (age int, name text)") @@ -225,7 +225,7 @@ def test_can_insert_prepared_registered_udts(self): Test the insertion of prepared, registered UDTs """ - c = TestCluster() + c = IntegrationTestCluster() s = c.connect(self.keyspace_name, wait_for_all_pools=True) s.execute("CREATE TYPE user (age int, name text)") @@ -275,7 +275,7 @@ def test_can_insert_udts_with_nulls(self): Test the insertion of UDTs with null and empty string fields """ - c = TestCluster() + c = IntegrationTestCluster() s = c.connect(self.keyspace_name, wait_for_all_pools=True) s.execute("CREATE TYPE user (a text, b int, c uuid, d blob)") @@ -305,7 +305,7 @@ def test_can_insert_udts_with_varying_lengths(self): Test for ensuring extra-lengthy udts are properly inserted """ - c = TestCluster() + c = IntegrationTestCluster() s = c.connect(self.keyspace_name, wait_for_all_pools=True) max_test_length = 254 @@ -385,7 +385,7 @@ def nested_udt_verification_helper(self, session, max_nesting_depth, udts): self.assertEqual(udt, result["v_{0}".format(i)]) def _cluster_default_dict_factory(self): - return TestCluster( + return IntegrationTestCluster( execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(row_factory=dict_factory)} ) @@ -486,7 +486,7 @@ def test_raise_error_on_nonexisting_udts(self): Test for ensuring that an error is raised for operating on a nonexisting udt or an invalid keyspace """ - c = TestCluster() + c = IntegrationTestCluster() s = c.connect(self.keyspace_name, wait_for_all_pools=True) User = namedtuple('user', ('age', 'name')) @@ -506,7 +506,7 @@ def test_can_insert_udt_all_datatypes(self): Test for inserting various types of PRIMITIVE_DATATYPES into UDT's """ - c = TestCluster() + c = IntegrationTestCluster() s = c.connect(self.keyspace_name, wait_for_all_pools=True) # create UDT @@ -551,7 +551,7 @@ def test_can_insert_udt_all_collection_datatypes(self): Test for inserting various types of COLLECTION_TYPES into UDT's """ - c = TestCluster() + c = IntegrationTestCluster() s = c.connect(self.keyspace_name, wait_for_all_pools=True) # create UDT @@ -618,7 +618,7 @@ def test_can_insert_nested_collections(self): if self.cass_version < (2, 1, 3): raise unittest.SkipTest("Support for nested collections was introduced in Cassandra 2.1.3") - c = TestCluster() + c = IntegrationTestCluster() s = c.connect(self.keyspace_name, wait_for_all_pools=True) s.encoder.mapping[tuple] = s.encoder.cql_encode_tuple