@@ -78,6 +78,54 @@ class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
7878 pass
7979
8080
81+ class SentinelConnectionPoolProxy :
82+ def __init__ (
83+ self ,
84+ connection_pool ,
85+ is_master ,
86+ check_connection ,
87+ service_name ,
88+ sentinel_manager ,
89+ ):
90+ self .connection_pool_ref = weakref .ref (connection_pool )
91+ self .is_master = is_master
92+ self .check_connection = check_connection
93+ self .service_name = service_name
94+ self .sentinel_manager = sentinel_manager
95+ self .reset ()
96+
97+ def reset (self ):
98+ self .master_address = None
99+ self .slave_rr_counter = None
100+
101+ def get_master_address (self ):
102+ master_address = self .sentinel_manager .discover_master (self .service_name )
103+ if self .is_master and self .master_address != master_address :
104+ self .master_address = master_address
105+ # disconnect any idle connections so that they reconnect
106+ # to the new master the next time that they are used.
107+ connection_pool = self .connection_pool_ref ()
108+ if connection_pool is not None :
109+ connection_pool .disconnect (inuse_connections = False )
110+ return master_address
111+
112+ def rotate_slaves (self ):
113+ slaves = self .sentinel_manager .discover_slaves (self .service_name )
114+ if slaves :
115+ if self .slave_rr_counter is None :
116+ self .slave_rr_counter = random .randint (0 , len (slaves ) - 1 )
117+ for _ in range (len (slaves )):
118+ self .slave_rr_counter = (self .slave_rr_counter + 1 ) % len (slaves )
119+ slave = slaves [self .slave_rr_counter ]
120+ yield slave
121+ # Fallback to the master connection
122+ try :
123+ yield self .get_master_address ()
124+ except MasterNotFoundError :
125+ pass
126+ raise SlaveNotFoundError (f"No slave found for { self .service_name !r} " )
127+
128+
81129class SentinelConnectionPool (ConnectionPool ):
82130 """
83131 Sentinel backed connection pool.
@@ -95,8 +143,15 @@ def __init__(self, service_name, sentinel_manager, **kwargs):
95143 )
96144 self .is_master = kwargs .pop ("is_master" , True )
97145 self .check_connection = kwargs .pop ("check_connection" , False )
146+ self .proxy = SentinelConnectionPoolProxy (
147+ connection_pool = self ,
148+ is_master = self .is_master ,
149+ check_connection = self .check_connection ,
150+ service_name = service_name ,
151+ sentinel_manager = sentinel_manager ,
152+ )
98153 super ().__init__ (** kwargs )
99- self .connection_kwargs ["connection_pool" ] = weakref .proxy ( self )
154+ self .connection_kwargs ["connection_pool" ] = self .proxy
100155 self .service_name = service_name
101156 self .sentinel_manager = sentinel_manager
102157
@@ -106,8 +161,11 @@ def __repr__(self):
106161
107162 def reset (self ):
108163 super ().reset ()
109- self .master_address = None
110- self .slave_rr_counter = None
164+ self .proxy .reset ()
165+
166+ @property
167+ def master_address (self ):
168+ return self .proxy .master_address
111169
112170 def owns_connection (self , connection ):
113171 check = not self .is_master or (
@@ -117,31 +175,11 @@ def owns_connection(self, connection):
117175 return check and parent .owns_connection (connection )
118176
119177 def get_master_address (self ):
120- master_address = self .sentinel_manager .discover_master (self .service_name )
121- if self .is_master :
122- if self .master_address != master_address :
123- self .master_address = master_address
124- # disconnect any idle connections so that they reconnect
125- # to the new master the next time that they are used.
126- self .disconnect (inuse_connections = False )
127- return master_address
178+ return self .proxy .get_master_address ()
128179
129180 def rotate_slaves (self ):
130181 "Round-robin slave balancer"
131- slaves = self .sentinel_manager .discover_slaves (self .service_name )
132- if slaves :
133- if self .slave_rr_counter is None :
134- self .slave_rr_counter = random .randint (0 , len (slaves ) - 1 )
135- for _ in range (len (slaves )):
136- self .slave_rr_counter = (self .slave_rr_counter + 1 ) % len (slaves )
137- slave = slaves [self .slave_rr_counter ]
138- yield slave
139- # Fallback to the master connection
140- try :
141- yield self .get_master_address ()
142- except MasterNotFoundError :
143- pass
144- raise SlaveNotFoundError (f"No slave found for { self .service_name !r} " )
182+ return self .proxy .rotate_slaves ()
145183
146184
147185class Sentinel (SentinelCommands ):
0 commit comments