diff --git a/redis/sentinel.py b/redis/sentinel.py index 518fec582f..5f1eb44974 100644 --- a/redis/sentinel.py +++ b/redis/sentinel.py @@ -20,6 +20,7 @@ class SlaveNotFoundError(ConnectionError): class SentinelManagedConnection(Connection): def __init__(self, **kwargs): self.connection_pool = kwargs.pop('connection_pool') + self.to_be_disconnected = False super(SentinelManagedConnection, self).__init__(**kwargs) def __repr__(self): @@ -56,12 +57,10 @@ def read_response(self): return super(SentinelManagedConnection, self).read_response() except ReadOnlyError: if self.connection_pool.is_master: - # When talking to a master, a ReadOnlyError when likely + # When talking to a master, a ReadOnlyError likely # indicates that the previous master that we're still connected # to has been demoted to a slave and there's a new master. - # calling disconnect will force the connection to re-query - # sentinel during the next connect() attempt. - self.disconnect() + self.to_be_disconnected = True raise ConnectionError('The previous master is now a slave') raise @@ -97,18 +96,16 @@ def reset(self): self.slave_rr_counter = None def get_master_address(self): + """Get the address of the current master""" master_address = self.sentinel_manager.discover_master( self.service_name) if self.is_master: - if self.master_address is None: + if master_address != self.master_address: self.master_address = master_address - elif master_address != self.master_address: - # Master address changed, disconnect all clients in this pool - self.disconnect() return master_address def rotate_slaves(self): - "Round-robin slave balancer" + """Round-robin slave balancer""" slaves = self.sentinel_manager.discover_slaves(self.service_name) if slaves: if self.slave_rr_counter is None: @@ -123,18 +120,42 @@ def rotate_slaves(self): yield self.get_master_address() except MasterNotFoundError: pass - raise SlaveNotFoundError('No slave found for %r' % (self.service_name)) - - def _checkpid(self): - if self.pid != os.getpid(): - self.disconnect() - self.reset() - self.__init__(self.service_name, self.sentinel_manager, - is_master=self.is_master, - check_connection=self.check_connection, - connection_class=self.connection_class, - max_connections=self.max_connections, - **self.connection_kwargs) + raise SlaveNotFoundError('No slave found for %r' % (self.service_name,)) + + def _check_connection(self, connection): + if connection.to_be_disconnected: + connection.disconnect() + self.get_master_address() + return False + if self.is_master: + if self.master_address != (connection.host, connection.port): + connection.disconnect() + return False + return True + + def get_connection(self, command_name, *keys, **options): + """Get a connection from the pool""" + self._checkpid() + while True: + try: + connection = self._available_connections.pop() + except IndexError: + connection = self.make_connection() + else: + if not self._check_connection(connection): + continue + self._in_use_connections.add(connection) + return connection + + def release(self, connection): + """Releases the connection back to the pool""" + self._checkpid() + if connection.pid != self.pid: + return + self._in_use_connections.remove(connection) + if not self._check_connection(connection): + return + self._available_connections.append(connection) class Sentinel(object):