From 7b1455c516d6e5c6eafd3acaa1064b378d89a2fa Mon Sep 17 00:00:00 2001 From: Greg Melton Date: Thu, 9 Jun 2022 17:41:16 -0700 Subject: [PATCH 1/7] adds a retry that forces the nodes_manager to reinitialize if cluster pipleline connect to the one of the mapped nodes --- redis/cluster.py | 44 +++++++++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/redis/cluster.py b/redis/cluster.py index 8e4c654c6b..28c19d363b 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1962,21 +1962,35 @@ def _send_cluster_commands( # we figure out the slot number that command maps to, then from # the slot determine the node. for c in attempt: - # refer to our internal node -> slot table that - # tells us where a given - # command should route to. - node = self._determine_nodes(*c.args) - - # now that we know the name of the node - # ( it's just a string in the form of host:port ) - # we can build a list of commands for each node. - node_name = node[0].name - if node_name not in nodes: - redis_node = self.get_redis_connection(node[0]) - connection = get_connection(redis_node, c.args) - nodes[node_name] = NodeCommands( - redis_node.parse_response, redis_node.connection_pool, connection - ) + + connection_error_retry_counter = 0 + while True: + # refer to our internal node -> slot table that + # tells us where a given command should route to. + # (it might be possible we have a cached node that no longer + # exists in the cluster, which is why we do this in a loop) + node = self._determine_nodes(*c.args) + + # now that we know the name of the node + # ( it's just a string in the form of host:port ) + # we can build a list of commands for each node. + node_name = node[0].name + if node_name not in nodes: + redis_node = self.get_redis_connection(node[0]) + try: + connection = get_connection(redis_node, c.args) + except ConnectionError: + connection_error_retry_counter += 1 + if connection_error_retry_counter < 5: + # reinitialize the node -> slot table + self.nodes_manager.initialize() + continue + else: + raise + nodes[node_name] = NodeCommands( + redis_node.parse_response, redis_node.connection_pool, connection + ) + break nodes[node_name].append(c) From e745e01f15abdfb26cf35312b31b513923524bc0 Mon Sep 17 00:00:00 2001 From: Greg Melton Date: Fri, 10 Jun 2022 09:04:06 -0700 Subject: [PATCH 2/7] fix line length error --- redis/cluster.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/redis/cluster.py b/redis/cluster.py index 28c19d363b..95098c2ada 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1988,7 +1988,9 @@ def _send_cluster_commands( else: raise nodes[node_name] = NodeCommands( - redis_node.parse_response, redis_node.connection_pool, connection + redis_node.parse_response, + redis_node.connection_pool, + connection ) break From 576e59053a2b0ed90c22b100095df1f4c2e84ba5 Mon Sep 17 00:00:00 2001 From: Greg Melton Date: Fri, 10 Jun 2022 09:12:01 -0700 Subject: [PATCH 3/7] add trailing comma --- redis/cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis/cluster.py b/redis/cluster.py index 95098c2ada..571a43b462 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1990,7 +1990,7 @@ def _send_cluster_commands( nodes[node_name] = NodeCommands( redis_node.parse_response, redis_node.connection_pool, - connection + connection, ) break From f1cc910e4beed44c5fb4c42edb9fa92c5271682b Mon Sep 17 00:00:00 2001 From: Greg Melton Date: Fri, 10 Jun 2022 14:38:04 -0700 Subject: [PATCH 4/7] move appending cmd --- redis/cluster.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/redis/cluster.py b/redis/cluster.py index 571a43b462..4f89f36fd8 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1992,10 +1992,9 @@ def _send_cluster_commands( redis_node.connection_pool, connection, ) + nodes[node_name].append(c) break - nodes[node_name].append(c) - # send the commands in sequence. # we write to all the open sockets for each node first, # before reading anything From 66e7b1686b33e6fa2407d37f3a95548e8bac7759 Mon Sep 17 00:00:00 2001 From: Greg Melton Date: Fri, 10 Jun 2022 14:38:15 -0700 Subject: [PATCH 5/7] updates changes --- CHANGES | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGES b/CHANGES index b7e238ebb3..dbe8440359 100644 --- a/CHANGES +++ b/CHANGES @@ -10,6 +10,8 @@ * Fix broken connection writer lock-up for asyncio (#2065) * Fix auth bug when provided with no username (#2086) * Fix missing ClusterPipeline._lock (#2189) + * ClusterPipeline Doesn't Handle ConnectionError for Dead Hosts (#2225) + * 4.1.3 (Feb 8, 2022) * Fix flushdb and flushall (#1926) From 1a0b863be2f60fe9a2bf6137a33dc6ec2eb28697 Mon Sep 17 00:00:00 2001 From: Greg Melton Date: Fri, 24 Jun 2022 13:45:56 -0700 Subject: [PATCH 6/7] trigger build From ab4587660c81fc74aa134111259efc4dd339f939 Mon Sep 17 00:00:00 2001 From: Greg Melton Date: Mon, 1 Aug 2022 16:35:32 -0700 Subject: [PATCH 7/7] fix linting errors --- redis/cluster.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/redis/cluster.py b/redis/cluster.py index 0d43bd3a28..cee578b075 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -888,7 +888,7 @@ def determine_slot(self, *args): if len(args) <= 2: raise RedisClusterException(f"Invalid args in command: {args}") num_actual_keys = args[2] - eval_keys = args[3: 3 + num_actual_keys] + eval_keys = args[3 : 3 + num_actual_keys] # if there are 0 keys, that means the script can be run on any node # so we can just return a random slot if len(eval_keys) == 0: @@ -1908,14 +1908,16 @@ def _send_cluster_commands( target_nodes = self._parse_target_nodes(passed_targets) else: target_nodes = self._determine_nodes( - *c.args, node_flag=passed_targets) + *c.args, node_flag=passed_targets + ) if not target_nodes: raise RedisClusterException( f"No targets were found to execute {c.args} command on" ) if len(target_nodes) > 1: raise RedisClusterException( - f"Too many targets for command {c.args}") + f"Too many targets for command {c.args}" + ) node = target_nodes[0] @@ -1938,7 +1940,7 @@ def _send_cluster_commands( nodes[node_name] = NodeCommands( redis_node.parse_response, redis_node.connection_pool, - connection + connection, ) nodes[node_name].append(c) break