diff --git a/gcloud/bigtable/happybase/pool.py b/gcloud/bigtable/happybase/pool.py new file mode 100644 index 000000000000..396046af3b48 --- /dev/null +++ b/gcloud/bigtable/happybase/pool.py @@ -0,0 +1,73 @@ +# Copyright 2016 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Google Cloud Bigtable HappyBase pool module.""" + + +import threading + +import six + +from gcloud.bigtable.happybase.connection import Connection +from gcloud.bigtable.happybase.connection import _get_cluster + + +_MIN_POOL_SIZE = 1 +"""Minimum allowable size of a connection pool.""" + + +class ConnectionPool(object): + """Thread-safe connection pool. + + .. note:: + + All keyword arguments are passed unmodified to the + :class:`.Connection` constructor **except** for ``autoconnect``. + This is because the ``open`` / ``closed`` status of a connection + is managed by the pool. In addition, if ``cluster`` is not passed, + the default / inferred cluster is determined by the pool and then + passed to each :class:`.Connection` that is created. + + :type size: int + :param size: The maximum number of concurrently open connections. + + :type kwargs: dict + :param kwargs: Keyword arguments passed to :class:`.Connection` + constructor. + + :raises: :class:`TypeError ` if ``size`` + is non an integer. + :class:`ValueError ` if ``size`` + is not positive. + """ + def __init__(self, size, **kwargs): + if not isinstance(size, six.integer_types): + raise TypeError('Pool size arg must be an integer') + + if size < _MIN_POOL_SIZE: + raise ValueError('Pool size must be positive') + + self._lock = threading.Lock() + self._queue = six.moves.queue.LifoQueue(maxsize=size) + self._thread_connections = threading.local() + + connection_kwargs = kwargs + connection_kwargs['autoconnect'] = False + if 'cluster' not in connection_kwargs: + connection_kwargs['cluster'] = _get_cluster( + timeout=kwargs.get('timeout')) + + for _ in six.moves.range(size): + connection = Connection(**connection_kwargs) + self._queue.put(connection) diff --git a/gcloud/bigtable/happybase/test_pool.py b/gcloud/bigtable/happybase/test_pool.py new file mode 100644 index 000000000000..17dcee0acb5e --- /dev/null +++ b/gcloud/bigtable/happybase/test_pool.py @@ -0,0 +1,163 @@ +# Copyright 2016 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import unittest2 + + +class TestConnectionPool(unittest2.TestCase): + + def _getTargetClass(self): + from gcloud.bigtable.happybase.pool import ConnectionPool + return ConnectionPool + + def _makeOne(self, *args, **kwargs): + return self._getTargetClass()(*args, **kwargs) + + def test_constructor_defaults(self): + import six + import threading + from gcloud.bigtable.happybase.connection import Connection + + size = 11 + cluster_copy = _Cluster() + all_copies = [cluster_copy] * size + cluster = _Cluster(copies=all_copies) # Avoid implicit environ check. + pool = self._makeOne(size, cluster=cluster) + + self.assertTrue(isinstance(pool._lock, type(threading.Lock()))) + self.assertTrue(isinstance(pool._thread_connections, threading.local)) + self.assertEqual(pool._thread_connections.__dict__, {}) + + queue = pool._queue + self.assertTrue(isinstance(queue, six.moves.queue.LifoQueue)) + self.assertTrue(queue.full()) + self.assertEqual(queue.maxsize, size) + for connection in queue.queue: + self.assertTrue(isinstance(connection, Connection)) + self.assertTrue(connection._cluster is cluster_copy) + + def test_constructor_passes_kwargs(self): + table_prefix = 'foo' + table_prefix_separator = '<>' + cluster = _Cluster() # Avoid implicit environ check. + + size = 1 + pool = self._makeOne(size, table_prefix=table_prefix, + table_prefix_separator=table_prefix_separator, + cluster=cluster) + + for connection in pool._queue.queue: + self.assertEqual(connection.table_prefix, table_prefix) + self.assertEqual(connection.table_prefix_separator, + table_prefix_separator) + + def test_constructor_ignores_autoconnect(self): + from gcloud._testing import _Monkey + from gcloud.bigtable.happybase.connection import Connection + from gcloud.bigtable.happybase import pool as MUT + + class ConnectionWithOpen(Connection): + + _open_called = False + + def open(self): + self._open_called = True + + # First make sure the custom Connection class does as expected. + cluster_copy1 = _Cluster() + cluster_copy2 = _Cluster() + cluster_copy3 = _Cluster() + cluster = _Cluster( + copies=[cluster_copy1, cluster_copy2, cluster_copy3]) + connection = ConnectionWithOpen(autoconnect=False, cluster=cluster) + self.assertFalse(connection._open_called) + self.assertTrue(connection._cluster is cluster_copy1) + connection = ConnectionWithOpen(autoconnect=True, cluster=cluster) + self.assertTrue(connection._open_called) + self.assertTrue(connection._cluster is cluster_copy2) + + # Then make sure autoconnect=True is ignored in a pool. + size = 1 + with _Monkey(MUT, Connection=ConnectionWithOpen): + pool = self._makeOne(size, autoconnect=True, cluster=cluster) + + for connection in pool._queue.queue: + self.assertTrue(isinstance(connection, ConnectionWithOpen)) + self.assertTrue(connection._cluster is cluster_copy3) + self.assertFalse(connection._open_called) + + def test_constructor_infers_cluster(self): + from gcloud._testing import _Monkey + from gcloud.bigtable.happybase.connection import Connection + from gcloud.bigtable.happybase import pool as MUT + + size = 1 + cluster_copy = _Cluster() + all_copies = [cluster_copy] * size + cluster = _Cluster(copies=all_copies) + get_cluster_calls = [] + + def mock_get_cluster(timeout=None): + get_cluster_calls.append(timeout) + return cluster + + with _Monkey(MUT, _get_cluster=mock_get_cluster): + pool = self._makeOne(size) + + for connection in pool._queue.queue: + self.assertTrue(isinstance(connection, Connection)) + # We know that the Connection() constructor will + # call cluster.copy(). + self.assertTrue(connection._cluster is cluster_copy) + + self.assertEqual(get_cluster_calls, [None]) + + def test_constructor_non_integer_size(self): + size = None + with self.assertRaises(TypeError): + self._makeOne(size) + + def test_constructor_non_positive_size(self): + size = -10 + with self.assertRaises(ValueError): + self._makeOne(size) + size = 0 + with self.assertRaises(ValueError): + self._makeOne(size) + + +class _Client(object): + + def __init__(self): + self.stop_calls = 0 + + def stop(self): + self.stop_calls += 1 + + +class _Cluster(object): + + def __init__(self, copies=()): + self.copies = list(copies) + # Included to support Connection.__del__ + self._client = _Client() + + def copy(self): + if self.copies: + result = self.copies[0] + self.copies[:] = self.copies[1:] + return result + else: + return self diff --git a/scripts/run_pylint.py b/scripts/run_pylint.py index f78da26e59e9..bfd8cf020f01 100644 --- a/scripts/run_pylint.py +++ b/scripts/run_pylint.py @@ -43,6 +43,9 @@ PRODUCTION_RC = os.path.join(SCRIPTS_DIR, 'pylintrc_default') TEST_RC = os.path.join(SCRIPTS_DIR, 'pylintrc_reduced') TEST_DISABLED_MESSAGES = [ + 'abstract-method', + 'arguments-differ', + 'assignment-from-no-return', 'attribute-defined-outside-init', 'exec-used', 'import-error', @@ -55,8 +58,6 @@ 'too-many-locals', 'too-many-public-methods', 'unbalanced-tuple-unpacking', - 'arguments-differ', - 'assignment-from-no-return', ] TEST_RC_ADDITIONS = { 'MESSAGES CONTROL': { diff --git a/scripts/verify_included_modules.py b/scripts/verify_included_modules.py index 8c328f7f3188..2eacc7eb9954 100644 --- a/scripts/verify_included_modules.py +++ b/scripts/verify_included_modules.py @@ -34,6 +34,7 @@ 'gcloud.bigtable.cluster', 'gcloud.bigtable.column_family', 'gcloud.bigtable.happybase.connection', + 'gcloud.bigtable.happybase.pool', 'gcloud.bigtable.happybase.table', 'gcloud.bigtable.row', 'gcloud.bigtable.row_data',