-
Notifications
You must be signed in to change notification settings - Fork 0
Thread safe sentinel #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ class SlaveNotFoundError(ConnectionError): | |
class SentinelManagedConnection(Connection): | ||
def __init__(self, **kwargs): | ||
self.connection_pool = kwargs.pop('connection_pool') | ||
self.to_be_disconnected = False | ||
super(SentinelManagedConnection, self).__init__(**kwargs) | ||
|
||
def __repr__(self): | ||
|
@@ -56,12 +57,10 @@ def read_response(self): | |
return super(SentinelManagedConnection, self).read_response() | ||
except ReadOnlyError: | ||
if self.connection_pool.is_master: | ||
# When talking to a master, a ReadOnlyError when likely | ||
# When talking to a master, a ReadOnlyError likely | ||
# indicates that the previous master that we're still connected | ||
# to has been demoted to a slave and there's a new master. | ||
# calling disconnect will force the connection to re-query | ||
# sentinel during the next connect() attempt. | ||
self.disconnect() | ||
self.to_be_disconnected = True | ||
raise ConnectionError('The previous master is now a slave') | ||
raise | ||
|
||
|
@@ -97,18 +96,16 @@ def reset(self): | |
self.slave_rr_counter = None | ||
|
||
def get_master_address(self): | ||
"""Get the address of the current master""" | ||
master_address = self.sentinel_manager.discover_master( | ||
self.service_name) | ||
if self.is_master: | ||
if self.master_address is None: | ||
if master_address != self.master_address: | ||
self.master_address = master_address | ||
elif master_address != self.master_address: | ||
# Master address changed, disconnect all clients in this pool | ||
self.disconnect() | ||
return master_address | ||
|
||
def rotate_slaves(self): | ||
"Round-robin slave balancer" | ||
"""Round-robin slave balancer""" | ||
slaves = self.sentinel_manager.discover_slaves(self.service_name) | ||
if slaves: | ||
if self.slave_rr_counter is None: | ||
|
@@ -123,18 +120,42 @@ def rotate_slaves(self): | |
yield self.get_master_address() | ||
except MasterNotFoundError: | ||
pass | ||
raise SlaveNotFoundError('No slave found for %r' % (self.service_name)) | ||
|
||
def _checkpid(self): | ||
if self.pid != os.getpid(): | ||
self.disconnect() | ||
self.reset() | ||
self.__init__(self.service_name, self.sentinel_manager, | ||
is_master=self.is_master, | ||
check_connection=self.check_connection, | ||
connection_class=self.connection_class, | ||
max_connections=self.max_connections, | ||
**self.connection_kwargs) | ||
raise SlaveNotFoundError('No slave found for %r' % (self.service_name,)) | ||
|
||
def _check_connection(self, connection): | ||
if connection.to_be_disconnected: | ||
connection.disconnect() | ||
self.get_master_address() | ||
return False | ||
if self.is_master: | ||
if self.master_address != (connection.host, connection.port): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't a similar check be made for the case when There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, okay then. |
||
connection.disconnect() | ||
return False | ||
return True | ||
|
||
def get_connection(self, command_name, *keys, **options): | ||
"""Get a connection from the pool""" | ||
self._checkpid() | ||
while True: | ||
try: | ||
connection = self._available_connections.pop() | ||
except IndexError: | ||
connection = self.make_connection() | ||
else: | ||
if not self._check_connection(connection): | ||
continue | ||
self._in_use_connections.add(connection) | ||
return connection | ||
|
||
def release(self, connection): | ||
"""Releases the connection back to the pool""" | ||
self._checkpid() | ||
if connection.pid != self.pid: | ||
return | ||
self._in_use_connections.remove(connection) | ||
if not self._check_connection(connection): | ||
return | ||
self._available_connections.append(connection) | ||
|
||
|
||
class Sentinel(object): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This resolves the master every time a connection is returned to the pool, which might be several times (once for each connection in the pool) after a master failover. This could probably be avoided if we kept track of the "generation" (a count of how many times a failover has happened) and skipped the refresh when the connection is not from the latest generation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The question is, is it a problem? During a failover we do 2*n resolves instead of n.
There is currently no good place to watch for the master change. This `to_be_disconnected` flag is basically just a hint. The correct solution would be to subscribe to the sentinel pub/sub, watch for master changes, and do the generation change immediately after that happens.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't cause any problems.