From 36d9d21b950921a1e10878ab4e63f8513cd1501d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Lalinsk=C3=BD?= Date: Fri, 22 Sep 2017 15:18:14 +0200 Subject: [PATCH 1/8] Add `disconnect()` method for resetting the connection pool --- README.rst | 5 +++++ flask_redis_sentinel.py | 19 +++++++++++++++++++ setup.py | 2 +- 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 17891f8..db26e66 100644 --- a/README.rst +++ b/README.rst @@ -87,6 +87,11 @@ Accessing redis-py's Sentinel instance Change log ---------- +v2.1.0 +~~~~~~ + +* Added `disconnect()` method for resetting the connection pool + v2.0.1 ~~~~~~ diff --git a/flask_redis_sentinel.py b/flask_redis_sentinel.py index d8d9f7e..085c486 100644 --- a/flask_redis_sentinel.py +++ b/flask_redis_sentinel.py @@ -20,6 +20,7 @@ from flask import current_app from werkzeug.local import Local, LocalProxy from werkzeug.utils import import_string +from six.moves import queue _EXTENSION_KEY = 'redissentinel' @@ -34,6 +35,7 @@ def __init__(self, url, client_class, client_options, sentinel_class, sentinel_o self.sentinel_class = sentinel_class self.sentinel_options = sentinel_options self.local = Local() + self._connections = queue.Queue() self._connect() if self.local.connection[0] is None: # if there is no sentinel, we don't need to use thread-local storage @@ -49,6 +51,8 @@ def _connect(self): sentinel_class=self.sentinel_class, sentinel_options=self.sentinel_options, client_class=self.client_class, client_options=self.client_options) self.local.connection = conn + self._connections.put(conn[0]) + self._connections.put(conn[1]) return conn @property @@ -74,6 +78,7 @@ def master_for(self, service_name, **kwargs): conn = sentinel.master_for(service_name, redis_class=self.client_class, **kwargs) self.local.master_connections[service_name] = conn + self._connections.put(conn) return conn def slave_for(self, service_name, **kwargs): @@ -91,8 +96,19 @@ def slave_for(self, service_name, **kwargs): conn = sentinel.slave_for(service_name, redis_class=self.client_class, **kwargs) self.local.slave_connections[service_name] = conn + self._connections.put(conn) return conn + def disconnect(self): + while True: + try: + conn = self._connections.get_nowait() + except queue.Empty: + break + else: + if conn is not None: + conn.connection_pool.disconnect() + class RedisSentinel(object): """Flask extension that supports connections to master using Redis Sentinel. @@ -176,5 +192,8 @@ def master_for(self, service_name, **kwargs): def slave_for(self, service_name, **kwargs): return LocalProxy(lambda: self.get_instance().slave_for(service_name, **kwargs)) + def disconnect(self): + return self.get_instance().disconnect() + SentinelExtension = RedisSentinel # for backwards-compatibility diff --git a/setup.py b/setup.py index d1572cc..15b2d3f 100644 --- a/setup.py +++ b/setup.py @@ -9,7 +9,7 @@ def read(fname): setup( name='Flask-Redis-Sentinel', py_modules=['flask_redis_sentinel'], - version='2.0.1', + version='2.1.0', install_requires=['Flask>=0.10.1', 'redis>=2.10.3', 'redis_sentinel_url>=1.0.0,<2.0.0', 'six'], description='Redis-Sentinel integration for Flask', long_description=read('README.rst'), From 0168b44c625223aead871a854a7a17f395661f1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Lalinsk=C3=BD?= Date: Sat, 30 Sep 2017 15:55:48 +0200 Subject: [PATCH 2/8] Thread-safe connection pool https://github.com/andymccurdy/redis-py/pull/909 --- README.rst | 1 + flask_redis_sentinel.py | 253 +++++++++++++++++++++++++++++------ test_flask_redis_sentinel.py | 4 +- 3 files changed, 213 insertions(+), 45 deletions(-) diff --git a/README.rst b/README.rst index db26e66..49c1815 100644 --- a/README.rst +++ b/README.rst @@ -90,6 +90,7 @@ Change log v2.1.0 ~~~~~~ +* Thread-safe implementation of the sentinel connection pool, so only one pool per process is now used. * Added `disconnect()` method for resetting the connection pool v2.0.1 diff --git a/flask_redis_sentinel.py b/flask_redis_sentinel.py index 085c486..904df19 100644 --- a/flask_redis_sentinel.py +++ b/flask_redis_sentinel.py @@ -14,18 +14,175 @@ import six import inspect +import random +import threading +import logging +import weakref import redis import redis.sentinel import redis_sentinel_url +from redis._compat import nativestr from flask import current_app -from werkzeug.local import Local, LocalProxy +from redis.exceptions import ConnectionError, ReadOnlyError +from werkzeug.local import LocalProxy from werkzeug.utils import import_string -from six.moves import queue + +logger = logging.getLogger(__name__) _EXTENSION_KEY = 'redissentinel' +class SentinelManagedConnection(redis.Connection): + def __init__(self, **kwargs): + self.connection_pool = kwargs.pop('connection_pool') + super(SentinelManagedConnection, self).__init__(**kwargs) + + def __repr__(self): + pool = self.connection_pool + s = '%s' % (type(self).__name__, pool.service_name) + if self.host: + host_info = ',host=%s,port=%s' % (self.host, self.port) + s = s % host_info + return s + + def connect_to(self, address): + self.host, self.port = address + super(SentinelManagedConnection, self).connect() + if self.connection_pool.check_connection: + self.send_command('PING') + if nativestr(self.read_response()) != 'PONG': + raise ConnectionError('PING failed') + + def connect(self): + if self._sock: + return # already connected + if self.connection_pool.is_master: + self.connect_to(self.connection_pool.get_master_address()) + else: + for slave in self.connection_pool.rotate_slaves(): + try: + return self.connect_to(slave) + except ConnectionError: + continue + raise SlaveNotFoundError # Never be here + + def read_response(self): + try: + return super(SentinelManagedConnection, self).read_response() + except ReadOnlyError: + if self.connection_pool.is_master: + # When talking to a master, a ReadOnlyError when likely + # indicates that the previous master that we're still connected + # to has been demoted to a slave and there's a new master. + raise ConnectionError('The previous master is now a slave') + raise + + +class SentinelConnectionPool(redis.ConnectionPool): + """ + Sentinel backed connection pool. + + If ``check_connection`` flag is set to True, SentinelManagedConnection + sends a PING command right after establishing the connection. + """ + + def __init__(self, service_name, sentinel_manager, **kwargs): + kwargs['connection_class'] = kwargs.get( + 'connection_class', SentinelManagedConnection) + self.is_master = kwargs.pop('is_master', True) + self.check_connection = kwargs.pop('check_connection', False) + super(SentinelConnectionPool, self).__init__(**kwargs) + self.connection_kwargs['connection_pool'] = weakref.proxy(self) + self.service_name = service_name + self.sentinel_manager = sentinel_manager + + def __repr__(self): + return "%s Date: Sat, 30 Sep 2017 17:19:04 +0200 Subject: [PATCH 3/8] Faster way to detect master change --- flask_redis_sentinel.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/flask_redis_sentinel.py b/flask_redis_sentinel.py index 904df19..56d6757 100644 --- a/flask_redis_sentinel.py +++ b/flask_redis_sentinel.py @@ -36,6 +36,7 @@ class SentinelManagedConnection(redis.Connection): def __init__(self, **kwargs): self.connection_pool = kwargs.pop('connection_pool') + self.to_be_disconnected = False super(SentinelManagedConnection, self).__init__(**kwargs) def __repr__(self): @@ -75,6 +76,7 @@ def read_response(self): # When talking to a master, a ReadOnlyError when likely # indicates that the previous master that we're still connected # to has been demoted to a slave and there's a new master. + self.to_be_disconnected = True raise ConnectionError('The previous master is now a slave') raise @@ -137,10 +139,14 @@ def rotate_slaves(self): raise SlaveNotFoundError('No slave found for %r' % (self.service_name)) def _check_connection(self, connection): - if self.is_master and self.master_address != (connection.host, connection.port): - # this is not a connection to the current master, stop using it + if connection.to_be_disconnected: connection.disconnect() + self.get_master_address() return False + if self.is_master: + if self.master_address != (connection.host, connection.port): + connection.disconnect() + return False return True def get_connection(self, command_name, *keys, **options): From b308dbdd04b1f411b30bfee56a67f299b96c662f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Lalinsk=C3=BD?= Date: Mon, 2 Oct 2017 16:28:03 +0200 Subject: [PATCH 4/8] Misc fixes --- flask_redis_sentinel.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/flask_redis_sentinel.py b/flask_redis_sentinel.py index 56d6757..af68c98 100644 --- a/flask_redis_sentinel.py +++ b/flask_redis_sentinel.py @@ -73,7 +73,7 @@ def read_response(self): return super(SentinelManagedConnection, self).read_response() except ReadOnlyError: if self.connection_pool.is_master: - # When talking to a master, a ReadOnlyError when likely + # When talking to a master, a ReadOnlyError likely # indicates that the previous master that we're still connected # to has been demoted to a slave and there's a new master. self.to_be_disconnected = True @@ -121,7 +121,7 @@ def get_master_address(self): return master_address def rotate_slaves(self): - "Round-robin slave balancer" + """Round-robin slave balancer""" slaves = self.sentinel_manager.discover_slaves(self.service_name) if slaves: if self.slave_rr_counter is None: @@ -136,7 +136,7 @@ def rotate_slaves(self): yield self.get_master_address() except MasterNotFoundError: pass - raise SlaveNotFoundError('No slave found for %r' % (self.service_name)) + raise SlaveNotFoundError('No slave found for %r' % (self.service_name,)) def _check_connection(self, connection): if connection.to_be_disconnected: @@ -150,7 +150,7 @@ def _check_connection(self, connection): return True def get_connection(self, command_name, *keys, **options): - "Get a connection from the pool" + """Get a connection from the pool""" self._checkpid() while True: try: @@ -164,7 +164,7 @@ def get_connection(self, command_name, *keys, **options): return connection def release(self, connection): - "Releases the connection back to the pool" + """Releases the connection back to the pool""" self._checkpid() if connection.pid != self.pid: return From 72d331ea3e40ee8e75c9336558032a4431507deb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Lalinsk=C3=BD?= Date: Mon, 2 Oct 2017 16:28:54 +0200 Subject: [PATCH 5/8] Add missing imports of sentinel exceptions --- flask_redis_sentinel.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flask_redis_sentinel.py b/flask_redis_sentinel.py index af68c98..9bff5d2 100644 --- a/flask_redis_sentinel.py +++ b/flask_redis_sentinel.py @@ -24,6 +24,7 @@ from redis._compat import nativestr from flask import current_app from redis.exceptions import ConnectionError, ReadOnlyError +from redis.sentinel import SlaveNotFoundError, MasterNotFoundError from werkzeug.local import LocalProxy from werkzeug.utils import import_string From 04973dfe8f8d604f6dde48bb602ca0b8d23986fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Lalinsk=C3=BD?= Date: Wed, 4 Oct 2017 14:28:10 +0200 Subject: [PATCH 6/8] Remove the copied code from patched redis-py --- README.rst | 4 +- flask_redis_sentinel.py | 158 +--------------------------------------- 2 files changed, 4 insertions(+), 158 deletions(-) diff --git a/README.rst b/README.rst index 49c1815..c2b5a01 100644 --- a/README.rst +++ b/README.rst @@ -90,7 +90,9 @@ Change log v2.1.0 ~~~~~~ -* Thread-safe implementation of the sentinel connection pool, so only one pool per process is now used. +* Removed the thread-local variable for sentinel connection pool. If you want + to use sentinel with multiple threads, you need to use a patched + version of redis-py. * Added `disconnect()` method for resetting the connection pool v2.0.1 diff --git a/flask_redis_sentinel.py b/flask_redis_sentinel.py index 9bff5d2..a7bedcf 100644 --- a/flask_redis_sentinel.py +++ b/flask_redis_sentinel.py @@ -34,162 +34,6 @@ _EXTENSION_KEY = 'redissentinel' -class SentinelManagedConnection(redis.Connection): - def __init__(self, **kwargs): - self.connection_pool = kwargs.pop('connection_pool') - self.to_be_disconnected = False - super(SentinelManagedConnection, self).__init__(**kwargs) - - def __repr__(self): - pool = self.connection_pool - s = '%s' % (type(self).__name__, pool.service_name) - if self.host: - host_info = ',host=%s,port=%s' % (self.host, self.port) - s = s % host_info - return s - - def connect_to(self, address): - self.host, self.port = address - super(SentinelManagedConnection, self).connect() - if self.connection_pool.check_connection: - self.send_command('PING') - if nativestr(self.read_response()) != 'PONG': - raise ConnectionError('PING failed') - - def connect(self): - if self._sock: - return # already connected - if self.connection_pool.is_master: - self.connect_to(self.connection_pool.get_master_address()) - else: - for slave in self.connection_pool.rotate_slaves(): - try: - return self.connect_to(slave) - except ConnectionError: - continue - raise SlaveNotFoundError # Never be here - - def read_response(self): - try: - return super(SentinelManagedConnection, self).read_response() - except ReadOnlyError: - if self.connection_pool.is_master: - # When talking to a master, a ReadOnlyError likely - # indicates that the previous master that we're still connected - # to has been demoted to a slave and there's a new master. - self.to_be_disconnected = True - raise ConnectionError('The previous master is now a slave') - raise - - -class SentinelConnectionPool(redis.ConnectionPool): - """ - Sentinel backed connection pool. - - If ``check_connection`` flag is set to True, SentinelManagedConnection - sends a PING command right after establishing the connection. - """ - - def __init__(self, service_name, sentinel_manager, **kwargs): - kwargs['connection_class'] = kwargs.get( - 'connection_class', SentinelManagedConnection) - self.is_master = kwargs.pop('is_master', True) - self.check_connection = kwargs.pop('check_connection', False) - super(SentinelConnectionPool, self).__init__(**kwargs) - self.connection_kwargs['connection_pool'] = weakref.proxy(self) - self.service_name = service_name - self.sentinel_manager = sentinel_manager - - def __repr__(self): - return "%s Date: Wed, 4 Oct 2017 14:28:16 +0200 Subject: [PATCH 7/8] Use flake8 --- tox.ini | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index b0cf6c0..0b5c48f 100644 --- a/tox.ini +++ b/tox.ini @@ -5,4 +5,7 @@ deps = mock nose coverage -commands=nosetests --with-coverage --cover-package=flask_redis_sentinel + flake8 +commands= + flake8 flask_redis_sentinel + nosetests --with-coverage --cover-package=flask_redis_sentinel From 1b81bdfeb58e6fd8b1e30917f6c333e7ea3f5c5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Lalinsk=C3=BD?= Date: Wed, 4 Oct 2017 15:38:41 +0200 Subject: [PATCH 8/8] PEP8 fixes --- flask_redis_sentinel.py | 7 +------ setup.py | 3 ++- test_flask_redis_sentinel.py | 10 +++++++--- tox.ini | 7 ++++++- 4 files changed, 16 insertions(+), 11 deletions(-) diff --git a/flask_redis_sentinel.py b/flask_redis_sentinel.py index a7bedcf..abd4dce 100644 --- a/flask_redis_sentinel.py +++ b/flask_redis_sentinel.py @@ -13,18 +13,13 @@ # limitations under the License. import six +import logging import inspect -import random import threading -import logging -import weakref import redis import redis.sentinel import redis_sentinel_url -from redis._compat import nativestr from flask import current_app -from redis.exceptions import ConnectionError, ReadOnlyError -from redis.sentinel import SlaveNotFoundError, MasterNotFoundError from werkzeug.local import LocalProxy from werkzeug.utils import import_string diff --git a/setup.py b/setup.py index 15b2d3f..cab55a7 100644 --- a/setup.py +++ b/setup.py @@ -3,9 +3,11 @@ import os from setuptools import setup + def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() + setup( name='Flask-Redis-Sentinel', py_modules=['flask_redis_sentinel'], @@ -33,4 +35,3 @@ def read(fname): 'Topic :: Software Development :: Libraries :: Python Modules' ] ) - diff --git a/test_flask_redis_sentinel.py b/test_flask_redis_sentinel.py index ef44394..f4b5148 100644 --- a/test_flask_redis_sentinel.py +++ b/test_flask_redis_sentinel.py @@ -324,6 +324,9 @@ def test_mixed_apps(self): sentinel2 = SentinelExtension(app=self.app2, config_prefix='CUSTOM_REDIS', client_class=FakeRedis) conn2 = sentinel2.default_connection + with self.app2.app_context(): + conn2._get_current_object() + self.app3 = Flask('test3') with self.app2.app_context(): @@ -357,8 +360,9 @@ def test_named_master_no_sentinel(self): conn = sentinel.master_for('othermaster', db=6) with self.app.app_context(): self.assertIsNone(sentinel.sentinel._get_current_object()) - with self.assertRaisesRegexp(RuntimeError, 'Cannot get master othermaster using non-sentinel configuration'): - inst = conn._get_current_object() + msg = 'Cannot get master othermaster using non-sentinel configuration' + with self.assertRaisesRegexp(RuntimeError, msg): + conn._get_current_object() def test_named_slave(self): sentinel = SentinelExtension(client_class=FakeRedis, sentinel_class=FakeSentinel) @@ -382,4 +386,4 @@ def test_named_slave_no_sentinel(self): with self.app.app_context(): self.assertIsNone(sentinel.sentinel._get_current_object()) with self.assertRaisesRegexp(RuntimeError, 'Cannot get slave otherslave using non-sentinel configuration'): - inst = conn._get_current_object() + conn._get_current_object() diff --git a/tox.ini b/tox.ini index 0b5c48f..d9a6fac 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,6 @@ [tox] envlist = py27,py33,py34,py35,py36,pypy + [testenv] deps = mock @@ -7,5 +8,9 @@ deps = coverage flake8 commands= - flake8 flask_redis_sentinel + flake8 nosetests --with-coverage --cover-package=flask_redis_sentinel + +[flake8] +max-line-length = 120 +exclude=env,venv,.tox,.idea