Skip to content

Commit 4f92d68

Browse files
authored
Merge branch 'master' into bloom-2.4
2 parents 619b800 + 9901c79 commit 4f92d68

File tree

16 files changed

+1133
-204
lines changed

16 files changed

+1133
-204
lines changed

CHANGES

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Fix timezone handling for datetime to unixtime conversions
12
* Fix start_id type for XAUTOCLAIM
23
* Remove verbose logging from cluster.py
34
* Add retry mechanism to async version of Connection
@@ -16,6 +17,8 @@
1617
* Added dynaminc_startup_nodes configuration to RedisCluster
1718
* Fix reusing the old nodes' connections when cluster topology refresh is being done
1819
* Fix RedisCluster to immediately raise AuthenticationError without a retry
20+
* ClusterPipeline Doesn't Handle ConnectionError for Dead Hosts (#2225)
21+
1922
* 4.1.3 (Feb 8, 2022)
2023
* Fix flushdb and flushall (#1926)
2124
* Add redis5 and redis4 dockers (#1871)

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ The above code connects to localhost on port 6379, sets a value in Redis, and re
5252

5353
### Connection Pools
5454

55-
By default, redis-py uses a connection pool to manage connections. Each instance of of a Redis class receives its own connection pool. You can however define your own [redis.ConnectionPool](https://redis.readthedocs.io/en/stable/connections.html#connection-pools)
55+
By default, redis-py uses a connection pool to manage connections. Each instance of a Redis class receives its own connection pool. You can however define your own [redis.ConnectionPool](https://redis.readthedocs.io/en/stable/connections.html#connection-pools)
5656

5757
``` python
5858
>>> pool = redis.ConnectionPool(host='localhost', port=6379, db=0)

redis/cluster.py

Lines changed: 44 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1755,10 +1755,6 @@ def __len__(self):
17551755
""" """
17561756
return len(self.command_stack)
17571757

1758-
def __nonzero__(self):
1759-
"Pipeline instances should always evaluate to True on Python 2.7"
1760-
return True
1761-
17621758
def __bool__(self):
17631759
"Pipeline instances should always evaluate to True on Python 3+"
17641760
return True
@@ -1901,34 +1897,53 @@ def _send_cluster_commands(
19011897
# we figure out the slot number that command maps to, then from
19021898
# the slot determine the node.
19031899
for c in attempt:
1904-
# refer to our internal node -> slot table that
1905-
# tells us where a given
1906-
# command should route to.
1907-
passed_targets = c.options.pop("target_nodes", None)
1908-
if passed_targets and not self._is_nodes_flag(passed_targets):
1909-
target_nodes = self._parse_target_nodes(passed_targets)
1910-
else:
1911-
target_nodes = self._determine_nodes(*c.args, node_flag=passed_targets)
1912-
if not target_nodes:
1900+
connection_error_retry_counter = 0
1901+
while True:
1902+
# refer to our internal node -> slot table that
1903+
# tells us where a given command should route to.
1904+
# (it might be possible we have a cached node that no longer
1905+
# exists in the cluster, which is why we do this in a loop)
1906+
passed_targets = c.options.pop("target_nodes", None)
1907+
if passed_targets and not self._is_nodes_flag(passed_targets):
1908+
target_nodes = self._parse_target_nodes(passed_targets)
1909+
else:
1910+
target_nodes = self._determine_nodes(
1911+
*c.args, node_flag=passed_targets
1912+
)
1913+
if not target_nodes:
1914+
raise RedisClusterException(
1915+
f"No targets were found to execute {c.args} command on"
1916+
)
1917+
if len(target_nodes) > 1:
19131918
raise RedisClusterException(
1914-
f"No targets were found to execute {c.args} command on"
1919+
f"Too many targets for command {c.args}"
19151920
)
1916-
if len(target_nodes) > 1:
1917-
raise RedisClusterException(f"Too many targets for command {c.args}")
1918-
1919-
node = target_nodes[0]
1920-
# now that we know the name of the node
1921-
# ( it's just a string in the form of host:port )
1922-
# we can build a list of commands for each node.
1923-
node_name = node.name
1924-
if node_name not in nodes:
1925-
redis_node = self.get_redis_connection(node)
1926-
connection = get_connection(redis_node, c.args)
1927-
nodes[node_name] = NodeCommands(
1928-
redis_node.parse_response, redis_node.connection_pool, connection
1929-
)
19301921

1931-
nodes[node_name].append(c)
1922+
node = target_nodes[0]
1923+
1924+
# now that we know the name of the node
1925+
# ( it's just a string in the form of host:port )
1926+
# we can build a list of commands for each node.
1927+
node_name = node.name
1928+
if node_name not in nodes:
1929+
redis_node = self.get_redis_connection(node)
1930+
try:
1931+
connection = get_connection(redis_node, c.args)
1932+
except ConnectionError:
1933+
connection_error_retry_counter += 1
1934+
if connection_error_retry_counter < 5:
1935+
# reinitialize the node -> slot table
1936+
self.nodes_manager.initialize()
1937+
continue
1938+
else:
1939+
raise
1940+
nodes[node_name] = NodeCommands(
1941+
redis_node.parse_response,
1942+
redis_node.connection_pool,
1943+
connection,
1944+
)
1945+
nodes[node_name].append(c)
1946+
break
19321947

19331948
# send the commands in sequence.
19341949
# we write to all the open sockets for each node first,

redis/commands/bf/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from ..helpers import parse_to_list
44
from .commands import * # noqa
55
from .info import BFInfo, CFInfo, CMSInfo, TDigestInfo, TopKInfo
6+
from .utils import parse_tdigest_quantile
67

78

89
class AbstractBloom(object):
@@ -166,7 +167,7 @@ def __init__(self, client, **kwargs):
166167
# TDIGEST_ADD: spaceHolder,
167168
# TDIGEST_MERGE: spaceHolder,
168169
TDIGEST_CDF: float,
169-
TDIGEST_QUANTILE: float,
170+
TDIGEST_QUANTILE: parse_tdigest_quantile,
170171
TDIGEST_MIN: float,
171172
TDIGEST_MAX: float,
172173
TDIGEST_INFO: TDigestInfo,

redis/commands/bf/commands.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -394,13 +394,14 @@ def max(self, key):
394394
""" # noqa
395395
return self.execute_command(TDIGEST_MAX, key)
396396

397-
def quantile(self, key, quantile):
397+
def quantile(self, key, quantile, *quantiles):
398398
"""
399-
Return double value estimate of the cutoff such that a specified fraction of the data
400-
added to this TDigest would be less than or equal to the cutoff.
399+
Returns estimates of one or more cutoffs such that a specified fraction of the
400+
observations added to this t-digest would be less than or equal to each of the
401+
specified cutoffs. (Multiple quantiles can be returned with one call)
401402
For more information see `TDIGEST.QUANTILE <https://redis.io/commands/tdigest.quantile>`_.
402403
""" # noqa
403-
return self.execute_command(TDIGEST_QUANTILE, key, quantile)
404+
return self.execute_command(TDIGEST_QUANTILE, key, quantile, *quantiles)
404405

405406
def cdf(self, key, value):
406407
"""

redis/commands/bf/utils.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
def parse_tdigest_quantile(response):
2+
"""Parse TDIGEST.QUANTILE response."""
3+
return [float(x) for x in response]

redis/commands/core.py

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import datetime
44
import hashlib
5-
import time
65
import warnings
76
from typing import (
87
TYPE_CHECKING,
@@ -17,6 +16,7 @@
1716
Mapping,
1817
Optional,
1918
Sequence,
19+
Set,
2020
Tuple,
2121
Union,
2222
)
@@ -1673,7 +1673,7 @@ def expireat(
16731673
For more information see https://redis.io/commands/expireat
16741674
"""
16751675
if isinstance(when, datetime.datetime):
1676-
when = int(time.mktime(when.timetuple()))
1676+
when = int(when.timestamp())
16771677

16781678
exp_option = list()
16791679
if nx:
@@ -1768,14 +1768,12 @@ def getex(
17681768
if exat is not None:
17691769
pieces.append("EXAT")
17701770
if isinstance(exat, datetime.datetime):
1771-
s = int(exat.microsecond / 1000000)
1772-
exat = int(time.mktime(exat.timetuple())) + s
1771+
exat = int(exat.timestamp())
17731772
pieces.append(exat)
17741773
if pxat is not None:
17751774
pieces.append("PXAT")
17761775
if isinstance(pxat, datetime.datetime):
1777-
ms = int(pxat.microsecond / 1000)
1778-
pxat = int(time.mktime(pxat.timetuple())) * 1000 + ms
1776+
pxat = int(pxat.timestamp() * 1000)
17791777
pieces.append(pxat)
17801778
if persist:
17811779
pieces.append("PERSIST")
@@ -1994,8 +1992,7 @@ def pexpireat(
19941992
For more information see https://redis.io/commands/pexpireat
19951993
"""
19961994
if isinstance(when, datetime.datetime):
1997-
ms = int(when.microsecond / 1000)
1998-
when = int(time.mktime(when.timetuple())) * 1000 + ms
1995+
when = int(when.timestamp() * 1000)
19991996
exp_option = list()
20001997
if nx:
20011998
exp_option.append("NX")
@@ -2196,14 +2193,12 @@ def set(
21962193
if exat is not None:
21972194
pieces.append("EXAT")
21982195
if isinstance(exat, datetime.datetime):
2199-
s = int(exat.microsecond / 1000000)
2200-
exat = int(time.mktime(exat.timetuple())) + s
2196+
exat = int(exat.timestamp())
22012197
pieces.append(exat)
22022198
if pxat is not None:
22032199
pieces.append("PXAT")
22042200
if isinstance(pxat, datetime.datetime):
2205-
ms = int(pxat.microsecond / 1000)
2206-
pxat = int(time.mktime(pxat.timetuple())) * 1000 + ms
2201+
pxat = int(pxat.timestamp() * 1000)
22072202
pieces.append(pxat)
22082203
if keepttl:
22092204
pieces.append("KEEPTTL")
@@ -3257,7 +3252,7 @@ def sismember(self, name: str, value: str) -> Union[Awaitable[bool], bool]:
32573252
"""
32583253
return self.execute_command("SISMEMBER", name, value)
32593254

3260-
def smembers(self, name: str) -> Union[Awaitable[list], list]:
3255+
def smembers(self, name: str) -> Union[Awaitable[Set], Set]:
32613256
"""
32623257
Return all members of the set ``name``
32633258

0 commit comments

Comments
 (0)