@@ -1282,15 +1282,11 @@ def __init__(
1282
1282
self .encoder = encoder
1283
1283
if self .encoder is None :
1284
1284
self .encoder = self .connection_pool .get_encoder ()
1285
- self .health_check_response_b = self .encoder .encode (
1286
- self .HEALTH_CHECK_MESSAGE )
1285
+ self .health_check_response_b = self .encoder .encode (self .HEALTH_CHECK_MESSAGE )
1287
1286
if self .encoder .decode_responses :
1288
1287
self .health_check_response = ["pong" , self .HEALTH_CHECK_MESSAGE ]
1289
1288
else :
1290
- self .health_check_response = [
1291
- b"pong" ,
1292
- self .health_check_response_b
1293
- ]
1289
+ self .health_check_response = [b"pong" , self .health_check_response_b ]
1294
1290
self .reset ()
1295
1291
1296
1292
def __enter__ (self ):
@@ -1315,6 +1311,7 @@ def reset(self):
1315
1311
self .connection_pool .release (self .connection )
1316
1312
self .connection = None
1317
1313
self .channels = {}
1314
+ self .health_check_response_counter = 0
1318
1315
self .pending_unsubscribe_channels = set ()
1319
1316
self .patterns = {}
1320
1317
self .pending_unsubscribe_patterns = set ()
@@ -1370,12 +1367,19 @@ def clean_health_check_responses(self):
1370
1367
"""
1371
1368
If any health check responses are present, clean them
1372
1369
"""
1370
+ ttl = 10
1373
1371
conn = self .connection
1374
- while self ._execute (conn , conn .can_read , timeout = 0 ):
1375
- response = self ._execute (conn , conn .read_response )
1376
- if not self .is_health_check_response (response ):
1377
- raise PubSubError ('A non health check response was cleaned by '
1378
- 'execute_command: {0}' .format (response ))
1372
+ while self .health_check_response_counter > 0 and ttl > 0 :
1373
+ if self ._execute (conn , conn .can_read , timeout = 1 ):
1374
+ response = self ._execute (conn , conn .read_response )
1375
+ if self .is_health_check_response (response ):
1376
+ self .health_check_response_counter -= 1
1377
+ else :
1378
+ raise PubSubError (
1379
+ "A non health check response was cleaned by "
1380
+ "execute_command: {0}" .format (response )
1381
+ )
1382
+ ttl -= 1
1379
1383
1380
1384
def _disconnect_raise_connect (self , conn , error ):
1381
1385
"""
@@ -1418,6 +1422,7 @@ def parse_response(self, block=True, timeout=0):
1418
1422
1419
1423
if self .is_health_check_response (response ):
1420
1424
# ignore the health check message as user might not expect it
1425
+ self .health_check_response_counter -= 1
1421
1426
return None
1422
1427
return response
1423
1428
@@ -1428,9 +1433,9 @@ def is_health_check_response(self, response):
1428
1433
bulk response, instead of a multi-bulk with "pong" and the response.
1429
1434
"""
1430
1435
return response in [
1431
- self .health_check_response , # If there was a subscription
1432
- self .health_check_response_b # If there wasn't
1433
- ]
1436
+ self .health_check_response , # If there was a subscription
1437
+ self .health_check_response_b , # If there wasn't
1438
+ ]
1434
1439
1435
1440
def check_health (self ):
1436
1441
conn = self .connection
@@ -1442,6 +1447,7 @@ def check_health(self):
1442
1447
1443
1448
if conn .health_check_interval and time .time () > conn .next_health_check :
1444
1449
conn .send_command ("PING" , self .HEALTH_CHECK_MESSAGE , check_health = False )
1450
+ self .health_check_response_counter += 1
1445
1451
1446
1452
def _normalize_keys (self , data ):
1447
1453
"""
0 commit comments