-
Notifications
You must be signed in to change notification settings - Fork 2.6k
Make sentinel thread-safe during master failover #909
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ class SlaveNotFoundError(ConnectionError): | |
class SentinelManagedConnection(Connection): | ||
def __init__(self, **kwargs): | ||
self.connection_pool = kwargs.pop('connection_pool') | ||
self.to_be_disconnected = False | ||
super(SentinelManagedConnection, self).__init__(**kwargs) | ||
|
||
def __repr__(self): | ||
|
@@ -56,12 +57,10 @@ def read_response(self): | |
return super(SentinelManagedConnection, self).read_response() | ||
except ReadOnlyError: | ||
if self.connection_pool.is_master: | ||
# When talking to a master, a ReadOnlyError when likely | ||
# When talking to a master, a ReadOnlyError likely | ||
# indicates that the previous master that we're still connected | ||
# to has been demoted to a slave and there's a new master. | ||
# calling disconnect will force the connection to re-query | ||
# sentinel during the next connect() attempt. | ||
self.disconnect() | ||
self.to_be_disconnected = True | ||
raise ConnectionError('The previous master is now a slave') | ||
raise | ||
|
||
|
@@ -97,18 +96,16 @@ def reset(self): | |
self.slave_rr_counter = None | ||
|
||
def get_master_address(self): | ||
"""Get the address of the current master""" | ||
master_address = self.sentinel_manager.discover_master( | ||
self.service_name) | ||
if self.is_master: | ||
if self.master_address is None: | ||
if master_address != self.master_address: | ||
self.master_address = master_address | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Related pull request - #847 |
||
elif master_address != self.master_address: | ||
# Master address changed, disconnect all clients in this pool | ||
self.disconnect() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All old connections will get closed when attempting to fetch them from the pool. I could theoretically close connections from |
||
return master_address | ||
|
||
def rotate_slaves(self): | ||
"Round-robin slave balancer" | ||
"""Round-robin slave balancer""" | ||
slaves = self.sentinel_manager.discover_slaves(self.service_name) | ||
if slaves: | ||
if self.slave_rr_counter is None: | ||
|
@@ -123,18 +120,42 @@ def rotate_slaves(self): | |
yield self.get_master_address() | ||
except MasterNotFoundError: | ||
pass | ||
raise SlaveNotFoundError('No slave found for %r' % (self.service_name)) | ||
|
||
def _checkpid(self): | ||
if self.pid != os.getpid(): | ||
self.disconnect() | ||
self.reset() | ||
self.__init__(self.service_name, self.sentinel_manager, | ||
is_master=self.is_master, | ||
check_connection=self.check_connection, | ||
connection_class=self.connection_class, | ||
max_connections=self.max_connections, | ||
**self.connection_kwargs) | ||
raise SlaveNotFoundError('No slave found for %r' % (self.service_name,)) | ||
|
||
def _check_connection(self, connection): | ||
if connection.to_be_disconnected: | ||
connection.disconnect() | ||
self.get_master_address() | ||
return False | ||
if self.is_master: | ||
if self.master_address != (connection.host, connection.port): | ||
connection.disconnect() | ||
return False | ||
return True | ||
|
||
def get_connection(self, command_name, *keys, **options): | ||
"""Get a connection from the pool""" | ||
self._checkpid() | ||
while True: | ||
try: | ||
connection = self._available_connections.pop() | ||
except IndexError: | ||
connection = self.make_connection() | ||
else: | ||
if not self._check_connection(connection): | ||
continue | ||
self._in_use_connections.add(connection) | ||
return connection | ||
|
||
def release(self, connection): | ||
"""Releases the connection back to the pool""" | ||
self._checkpid() | ||
if connection.pid != self.pid: | ||
return | ||
self._in_use_connections.remove(connection) | ||
if not self._check_connection(connection): | ||
return | ||
self._available_connections.append(connection) | ||
|
||
|
||
class Sentinel(object): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The connection will get closed when it's returned to the pool.