Skip to content

Commit b9c0115

Browse files
async_cluster: inlined check_slots_coverage
1 parent 6b9be5a commit b9c0115

File tree

1 file changed

+13
-18
lines changed

1 file changed

+13
-18
lines changed

redis/asyncio/cluster.py

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -456,11 +456,12 @@ async def _determine_nodes(
456456
return [random.choice(list(self.nodes_manager.nodes_cache.values()))]
457457

458458
# get the node that holds the key's slot
459-
slot = await self._determine_slot(*args)
460-
node = self.nodes_manager.get_node_from_slot(
461-
slot, self.read_from_replicas and command in READ_COMMANDS
462-
)
463-
return [node]
459+
return [
460+
self.nodes_manager.get_node_from_slot(
461+
await self._determine_slot(*args),
462+
self.read_from_replicas and command in READ_COMMANDS,
463+
)
464+
]
464465

465466
async def _determine_slot(self, *args) -> int:
466467
command = args[0]
@@ -648,12 +649,11 @@ async def _execute_command(
648649
except BusyLoadingError:
649650
raise
650651
except (ConnectionError, TimeoutError):
651-
connection_error_retry_counter += 1
652-
653652
# Give the node 0.25 seconds to get back up and retry again
654653
# with same node and configuration. After 5 attempts then try
655654
# to reinitialize the cluster and see if the nodes
656655
# configuration has changed or not
656+
connection_error_retry_counter += 1
657657
if connection_error_retry_counter < 5:
658658
await asyncio.sleep(0.25)
659659
else:
@@ -962,14 +962,6 @@ def get_nodes_by_server_type(self, server_type: str) -> List["ClusterNode"]:
962962
if node.server_type == server_type
963963
]
964964

965-
def check_slots_coverage(self, slots_cache: Dict[int, List["ClusterNode"]]) -> bool:
966-
# Validate if all slots are covered or if we should try next
967-
# startup node
968-
for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
969-
if i not in slots_cache:
970-
return False
971-
return True
972-
973965
async def initialize(self) -> None:
974966
self.read_load_balancer.reset()
975967
tmp_nodes_cache = {}
@@ -1078,10 +1070,13 @@ async def initialize(self) -> None:
10781070
f'slots cache: {", ".join(disagreements)}'
10791071
)
10801072

1081-
fully_covered = self.check_slots_coverage(tmp_slots)
1073+
# Validate if all slots are covered or if we should try next startup node
1074+
fully_covered = True
1075+
for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
1076+
if i not in tmp_slots:
1077+
fully_covered = False
1078+
break
10821079
if fully_covered:
1083-
# Don't need to continue to the next startup node if all
1084-
# slots are covered
10851080
break
10861081

10871082
if not startup_nodes_reachable:

0 commit comments

Comments
 (0)