Retry on error exception and timeout fixes #1821

Merged · 2 commits · Dec 23, 2021
Changes from all commits
101 changes: 49 additions & 52 deletions redis/cluster.py
@@ -17,6 +17,7 @@
ClusterCrossSlotError,
ClusterDownError,
ClusterError,
ConnectionError,
DataError,
MasterDownError,
MovedError,
@@ -374,6 +375,12 @@ class RedisCluster(RedisClusterCommands):
),
)

ERRORS_ALLOW_RETRY = (
ConnectionError,
TimeoutError,
ClusterDownError,
)

def __init__(
self,
host=None,
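
For orientation (not part of the diff): because ERRORS_ALLOW_RETRY is introduced as a plain class attribute, downstream code can tune the retry surface by subclassing. A minimal sketch under that assumption; the subclass name is hypothetical:

from redis.cluster import RedisCluster
from redis.exceptions import ClusterDownError

class ClusterDownOnlyRetries(RedisCluster):
    # Hypothetical subclass: retry only on ClusterDownError and let
    # ConnectionError/TimeoutError propagate to the caller immediately.
    ERRORS_ALLOW_RETRY = (ClusterDownError,)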
@@ -385,8 +392,6 @@ def __init__(
reinitialize_steps=10,
read_from_replicas=False,
url=None,
retry_on_timeout=False,
retry=None,
**kwargs,
):
"""
@@ -417,11 +422,6 @@ def __init__(
:cluster_error_retry_attempts: 'int'
Retry command execution attempts when encountering ClusterDownError
or ConnectionError
:retry_on_timeout: 'bool'
To specify a retry policy, first set `retry_on_timeout` to `True`
then set `retry` to a valid `Retry` object
:retry: 'Retry'
a `Retry` object
:reinitialize_steps: 'int'
Specifies the number of MOVED errors that need to occur before
reinitializing the whole cluster topology. If a MOVED error occurs
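
A usage sketch of the constructor options documented in the docstring above (host and port are placeholder values, and the keyword names are assumed from this docstring):

from redis.cluster import RedisCluster

# Placeholder endpoint; cluster_error_retry_attempts bounds the retry loop
# in execute_command, and reinitialize_steps controls MOVED-triggered
# topology rebuilds.
rc = RedisCluster(
    host="localhost",
    port=7000,
    cluster_error_retry_attempts=5,
    reinitialize_steps=10,
)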
@@ -452,9 +452,6 @@ def __init__(
"Argument 'db' is not possible to use in cluster mode"
)

if retry_on_timeout:
kwargs.update({"retry_on_timeout": retry_on_timeout, "retry": retry})

# Get the startup node/s
from_url = False
if url is not None:
@@ -850,7 +847,7 @@ def _parse_target_nodes(self, target_nodes):

def execute_command(self, *args, **kwargs):
"""
Wrapper for ClusterDownError and ConnectionError error handling.
Wrapper for ERRORS_ALLOW_RETRY error handling.

It will try the number of times specified by the config option
"self.cluster_error_retry_attempts" which defaults to 3 unless manually
@@ -865,18 +862,19 @@ def execute_command(self, *args, **kwargs):
dict<Any, ClusterNode>
"""
target_nodes_specified = False
target_nodes = kwargs.pop("target_nodes", None)
if target_nodes is not None and not self._is_nodes_flag(target_nodes):
target_nodes = self._parse_target_nodes(target_nodes)
target_nodes = None
passed_targets = kwargs.pop("target_nodes", None)
if passed_targets is not None and not self._is_nodes_flag(passed_targets):
target_nodes = self._parse_target_nodes(passed_targets)
target_nodes_specified = True
# If ClusterDownError/ConnectionError were thrown, the nodes
# and slots cache were reinitialized. We will retry executing the
# command with the updated cluster setup only when the target nodes
# can be determined again with the new cache tables. Therefore,
# when target nodes were passed to this function, we cannot retry
# the command execution since the nodes may not be valid anymore
# after the tables were reinitialized. So in case of passed target
# nodes, retry_attempts will be set to 1.
# If an error that allows retrying was thrown, the nodes and slots
# cache were reinitialized. We will retry executing the command with
# the updated cluster setup only when the target nodes can be
# determined again with the new cache tables. Therefore, when target
# nodes were passed to this function, we cannot retry the command
# execution since the nodes may not be valid anymore after the tables
# were reinitialized. So in case of passed target nodes,
# retry_attempts will be set to 1.
retry_attempts = (
1 if target_nodes_specified else self.cluster_error_retry_attempts
)
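
The comment above reduces to a single rule: explicitly passed target nodes get exactly one attempt, because a nodes/slots cache rebuild may invalidate them. A self-contained restatement of that rule (illustrative, not redis-py code):

def attempts_allowed(target_nodes_specified, cluster_error_retry_attempts=3):
    # Pinned target nodes may be stale after the cache is reinitialized,
    # so they get a single attempt; otherwise use the configured count.
    return 1 if target_nodes_specified else cluster_error_retry_attempts

assert attempts_allowed(target_nodes_specified=True) == 1
assert attempts_allowed(target_nodes_specified=False) == 3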
@@ -887,7 +885,7 @@
if not target_nodes_specified:
# Determine the nodes to execute the command on
target_nodes = self._determine_nodes(
*args, **kwargs, nodes_flag=target_nodes
*args, **kwargs, nodes_flag=passed_targets
)
if not target_nodes:
raise RedisClusterException(
@@ -897,11 +895,14 @@
res[node.name] = self._execute_command(node, *args, **kwargs)
# Return the processed result
return self._process_result(args[0], res, **kwargs)
except (ClusterDownError, ConnectionError) as e:
# The nodes and slots cache were reinitialized.
# Try again with the new cluster setup. All other errors
# should be raised.
exception = e
except BaseException as e:
if type(e) in RedisCluster.ERRORS_ALLOW_RETRY:
# The nodes and slots cache were reinitialized.
# Try again with the new cluster setup.
exception = e
else:
# All other errors should be raised.
raise e

# If it fails the configured number of times then raise exception back
# to caller of this method
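
One subtlety in the new except block: `type(e) in RedisCluster.ERRORS_ALLOW_RETRY` is an exact-type check, so subclasses of a retryable error are re-raised rather than retried, unlike an isinstance() test. A standalone demonstration of the difference:

class RetryableError(Exception):
    pass

class MoreSpecificError(RetryableError):
    pass

RETRYABLE = (RetryableError,)

e = MoreSpecificError()
print(type(e) in RETRYABLE)      # False: exact-type check excludes subclasses
print(isinstance(e, RETRYABLE))  # True: isinstance matches subclasses too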
@@ -953,11 +954,11 @@ def _execute_command(self, target_node, *args, **kwargs):
)
return response

except (RedisClusterException, BusyLoadingError):
log.exception("RedisClusterException || BusyLoadingError")
except (RedisClusterException, BusyLoadingError) as e:
log.exception(type(e))
raise
except ConnectionError:
log.exception("ConnectionError")
except (ConnectionError, TimeoutError) as e:
log.exception(type(e))
# ConnectionError can also be raised if we couldn't get a
# connection from the pool before timing out, so check that
# this is an actual connection before attempting to disconnect.
@@ -976,13 +977,6 @@ def _execute_command(self, target_node, *args, **kwargs):
# and try again with the new setup
self.nodes_manager.initialize()
raise
except TimeoutError:
log.exception("TimeoutError")
if connection is not None:
connection.disconnect()

if ttl < self.RedisClusterRequestTTL / 2:
time.sleep(0.05)
except MovedError as e:
# First, we will try to patch the slots/nodes cache with the
# redirected node output and try again. If MovedError exceeds
@@ -1014,7 +1008,7 @@ def _execute_command(self, target_node, *args, **kwargs):
# ClusterDownError can occur during a failover and to get
# self-healed, we will try to reinitialize the cluster layout
# and retry executing the command
time.sleep(0.05)
time.sleep(0.25)
self.nodes_manager.initialize()
raise e
except ResponseError as e:
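
The ClusterDownError branch above now waits 0.25s (up from 0.05s), rebuilds the topology, and re-raises so the outer execute_command loop can retry against the healed layout. A schematic of that self-heal pattern; function and parameter names are illustrative:

import time

from redis.exceptions import ClusterDownError

def run_with_self_heal(do_command, reinitialize, attempts=3, backoff=0.25):
    # Illustrative sketch: sleep to let the failover settle, rebuild the
    # nodes/slots cache, then retry up to the configured attempt count.
    for attempt in range(attempts):
        try:
            return do_command()
        except ClusterDownError:
            if attempt == attempts - 1:
                raise
            time.sleep(backoff)
            reinitialize()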
@@ -1339,7 +1333,7 @@ def initialize(self):
raise RedisClusterException(
"Cluster mode is not enabled on this node"
)
cluster_slots = r.execute_command("CLUSTER SLOTS")
cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
startup_nodes_reachable = True
except (ConnectionError, TimeoutError) as e:
msg = e.__str__
@@ -1621,29 +1615,28 @@ def get_redis_connection(self):
return self.node.redis_connection


ERRORS_ALLOW_RETRY = (
ConnectionError,
TimeoutError,
MovedError,
AskError,
TryAgainError,
)


class ClusterPipeline(RedisCluster):
"""
Support for Redis pipeline
in cluster mode
"""

ERRORS_ALLOW_RETRY = (
ConnectionError,
TimeoutError,
MovedError,
AskError,
TryAgainError,
)

def __init__(
self,
nodes_manager,
result_callbacks=None,
cluster_response_callbacks=None,
startup_nodes=None,
read_from_replicas=False,
cluster_error_retry_attempts=3,
cluster_error_retry_attempts=5,
reinitialize_steps=10,
**kwargs,
):
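
Since ClusterPipeline carries its own ERRORS_ALLOW_RETRY that also covers MovedError, AskError, and TryAgainError, queued commands that fail with a redirection remain eligible for resend. A usage sketch (placeholder host and port):

from redis.cluster import RedisCluster

rc = RedisCluster(host="localhost", port=7000)  # placeholder endpoint
with rc.pipeline() as pipe:
    pipe.set("k1", "v1")
    pipe.get("k1")
    # Commands whose failure is in ERRORS_ALLOW_RETRY (including MOVED/ASK
    # redirections) are resent internally before execute() returns.
    results = pipe.execute()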
@@ -1905,7 +1898,11 @@ def _send_cluster_commands(
# collect all the commands we are allowed to retry.
# (MOVED, ASK, or connection errors or timeout errors)
attempt = sorted(
(c for c in attempt if isinstance(c.result, ERRORS_ALLOW_RETRY)),
(
c
for c in attempt
if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
),
key=lambda x: x.position,
)
if attempt and allow_redirections:
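
The generator above keeps only commands whose stored result is an instance of a retryable error, then re-sorts by original queue position so resends preserve order. A stripped-down model of that filter; PipelineCommand here is a stand-in, not the redis-py class:

class PipelineCommand:  # stand-in for redis-py's internal command object
    def __init__(self, position, result=None):
        self.position = position
        self.result = result  # failed commands store the exception here

RETRYABLE = (ConnectionError, TimeoutError)  # builtins, for illustration

attempt = [PipelineCommand(1, TimeoutError()), PipelineCommand(0, ValueError())]
retry = sorted(
    (c for c in attempt if isinstance(c.result, RETRYABLE)),
    key=lambda c: c.position,
)
print([c.position for c in retry])  # [1] -- only the retryable failure is kept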
39 changes: 9 additions & 30 deletions tests/test_cluster.py
@@ -22,6 +22,7 @@
from redis.exceptions import (
AskError,
ClusterDownError,
ConnectionError,
DataError,
MovedError,
NoPermissionError,
@@ -555,46 +556,24 @@ def test_all_nodes_masters(self, r):
for node in r.get_primaries():
assert node in nodes

def test_cluster_down_overreaches_retry_attempts(self):
@pytest.mark.parametrize("error", RedisCluster.ERRORS_ALLOW_RETRY)
def test_cluster_down_overreaches_retry_attempts(self, error):
"""
When ClusterDownError is thrown, test that we retry executing the
command as many times as configured in cluster_error_retry_attempts
When an error that allows retrying is thrown, test that we retry
executing the command as many times as configured in
cluster_error_retry_attempts
and then raise the exception
"""
with patch.object(RedisCluster, "_execute_command") as execute_command:

def raise_cluster_down_error(target_node, *args, **kwargs):
def raise_error(target_node, *args, **kwargs):
execute_command.failed_calls += 1
raise ClusterDownError(
"CLUSTERDOWN The cluster is down. Use CLUSTER INFO for "
"more information"
)
raise error("mocked error")

execute_command.side_effect = raise_cluster_down_error
execute_command.side_effect = raise_error

rc = get_mocked_redis_client(host=default_host, port=default_port)

with pytest.raises(ClusterDownError):
rc.get("bar")
assert execute_command.failed_calls == rc.cluster_error_retry_attempts

def test_connection_error_overreaches_retry_attempts(self):
"""
When ConnectionError is thrown, test that we retry executing the
command as many times as configured in cluster_error_retry_attempts
and then raise the exception
"""
with patch.object(RedisCluster, "_execute_command") as execute_command:

def raise_conn_error(target_node, *args, **kwargs):
execute_command.failed_calls += 1
raise ConnectionError()

execute_command.side_effect = raise_conn_error

rc = get_mocked_redis_client(host=default_host, port=default_port)

with pytest.raises(ConnectionError):
with pytest.raises(error):
rc.get("bar")
assert execute_command.failed_calls == rc.cluster_error_retry_attempts
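
For readers unfamiliar with the pattern used above: pytest.mark.parametrize expands the single test into one case per entry of RedisCluster.ERRORS_ALLOW_RETRY. A minimal self-contained analogue:

import pytest

ERRORS = (ConnectionError, TimeoutError)  # builtins, standing in for the tuple

@pytest.mark.parametrize("error", ERRORS)
def test_each_error_is_raised(error):
    # One generated test case per error class in ERRORS.
    with pytest.raises(error):
        raise error("mocked error")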
