Skip to content

Commit ed4fb75

Browse files
authored
Merge branch 'master' into 4.2.0rc2
2 parents 29147cb + 5bf9034 commit ed4fb75

File tree

16 files changed

+347
-96
lines changed

16 files changed

+347
-96
lines changed

CHANGES

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
2+
* Add `items` parameter to `hset` signature
13
* Create codeql-analysis.yml (#1988). Thanks @chayim
24
* Add limited support for Lua scripting with RedisCluster
35
* Implement `.lock()` method on RedisCluster
6+
47
* 4.1.3 (Feb 8, 2022)
58
* Fix flushdb and flushall (#1926)
69
* Add redis5 and redis4 dockers (#1871)

redis/cluster.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ class RedisCluster(RedisClusterCommands):
228228
"ACL SETUSER",
229229
"ACL USERS",
230230
"ACL WHOAMI",
231+
"AUTH",
231232
"CLIENT LIST",
232233
"CLIENT SETNAME",
233234
"CLIENT GETNAME",
@@ -283,19 +284,29 @@ class RedisCluster(RedisClusterCommands):
283284
"READONLY",
284285
"READWRITE",
285286
"TIME",
287+
"GRAPH.CONFIG",
286288
],
287289
DEFAULT_NODE,
288290
),
289291
list_keys_to_dict(
290292
[
291293
"FLUSHALL",
292294
"FLUSHDB",
295+
"FUNCTION DELETE",
296+
"FUNCTION FLUSH",
297+
"FUNCTION LIST",
298+
"FUNCTION LOAD",
299+
"FUNCTION RESTORE",
293300
"SCRIPT EXISTS",
294301
"SCRIPT FLUSH",
295302
"SCRIPT LOAD",
296303
],
297304
PRIMARIES,
298305
),
306+
list_keys_to_dict(
307+
["FUNCTION DUMP"],
308+
RANDOM,
309+
),
299310
list_keys_to_dict(
300311
[
301312
"CLUSTER COUNTKEYSINSLOT",
@@ -809,6 +820,10 @@ def lock(
809820
thread_local=thread_local,
810821
)
811822

823+
def set_response_callback(self, command, callback):
824+
"""Set a custom Response Callback"""
825+
self.cluster_response_callbacks[command] = callback
826+
812827
def _determine_nodes(self, *args, **kwargs):
813828
command = args[0]
814829
nodes_flag = kwargs.pop("nodes_flag", None)
@@ -910,6 +925,10 @@ def determine_slot(self, *args):
910925
else:
911926
keys = self._get_command_keys(*args)
912927
if keys is None or len(keys) == 0:
928+
# FCALL can call a function with 0 keys, that means the function
929+
# can be run on any node so we can just return a random slot
930+
if command in ("FCALL", "FCALL_RO"):
931+
return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
913932
raise RedisClusterException(
914933
"No way to dispatch this command to Redis Cluster. "
915934
"Missing key.\nYou can execute the command by specifying "
@@ -1180,6 +1199,20 @@ def _process_result(self, command, res, **kwargs):
11801199
else:
11811200
return res
11821201

1202+
def load_external_module(
1203+
self,
1204+
funcname,
1205+
func,
1206+
):
1207+
"""
1208+
This function can be used to add externally defined redis modules,
1209+
and their namespaces to the redis client.
1210+
1211+
``funcname`` - A string containing the name of the function to create
1212+
``func`` - The function, being added to this class.
1213+
"""
1214+
setattr(self, funcname, func)
1215+
11831216

11841217
class ClusterNode:
11851218
def __init__(self, host, port, server_type=None, redis_connection=None):
@@ -2025,7 +2058,13 @@ def _send_cluster_commands(
20252058

20262059
# turn the response back into a simple flat array that corresponds
20272060
# to the sequence of commands issued in the stack in pipeline.execute()
2028-
response = [c.result for c in sorted(stack, key=lambda x: x.position)]
2061+
response = []
2062+
for c in sorted(stack, key=lambda x: x.position):
2063+
if c.args[0] in self.cluster_response_callbacks:
2064+
c.result = self.cluster_response_callbacks[c.args[0]](
2065+
c.result, **c.options
2066+
)
2067+
response.append(c.result)
20292068

20302069
if raise_on_error:
20312070
self.raise_first_error(stack)
@@ -2039,6 +2078,9 @@ def _fail_on_redirect(self, allow_redirections):
20392078
"ASK & MOVED redirection not allowed in this pipeline"
20402079
)
20412080

2081+
def exists(self, *keys):
2082+
return self.execute_command("EXISTS", *keys)
2083+
20422084
def eval(self):
20432085
""" """
20442086
raise RedisClusterException("method eval() is not implemented")

redis/commands/cluster.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
from .core import (
55
ACLCommands,
66
DataAccessCommands,
7+
FunctionCommands,
78
ManagementCommands,
89
PubSubCommands,
910
ScriptCommands,
1011
)
1112
from .helpers import list_or_args
13+
from .redismodules import RedisModuleCommands
1214

1315

1416
class ClusterMultiKeyCommands:
@@ -212,6 +214,8 @@ class RedisClusterCommands(
212214
PubSubCommands,
213215
ClusterDataAccessCommands,
214216
ScriptCommands,
217+
FunctionCommands,
218+
RedisModuleCommands,
215219
):
216220
"""
217221
A class for all Redis Cluster commands

redis/commands/core.py

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -369,14 +369,16 @@ class ManagementCommands(CommandsProtocol):
369369
Redis management commands
370370
"""
371371

372-
def auth(self):
372+
def auth(self, password, username=None, **kwargs):
373373
"""
374-
This function throws a NotImplementedError since it is intentionally
375-
not supported.
374+
Authenticates the user. If you do not pass username, Redis will try to
375+
authenticate for the "default" user. If you do pass username, it will
376+
authenticate for the given user.
377+
For more information check https://redis.io/commands/auth
376378
"""
377-
raise NotImplementedError(
378-
"AUTH is intentionally not implemented in the client."
379-
)
379+
if username:
380+
return self.execute_command("AUTH", username, password, **kwargs)
381+
return self.execute_command
380382

381383
def bgrewriteaof(self, **kwargs):
382384
"""Tell the Redis server to rewrite the AOF file from data in memory.
@@ -741,6 +743,15 @@ def command_info(self, **kwargs) -> None:
741743
def command_count(self, **kwargs) -> ResponseT:
742744
return self.execute_command("COMMAND COUNT", **kwargs)
743745

746+
def command_docs(self, *args):
747+
"""
748+
This function throws a NotImplementedError since it is intentionally
749+
not supported.
750+
"""
751+
raise NotImplementedError(
752+
"COMMAND DOCS is intentionally not implemented in the client."
753+
)
754+
744755
def config_get(self, pattern: PatternT = "*", **kwargs) -> ResponseT:
745756
"""
746757
Return a dictionary of configuration based on the ``pattern``
@@ -1512,6 +1523,15 @@ def expireat(self, name: KeyT, when: AbsExpiryT) -> ResponseT:
15121523
when = int(time.mktime(when.timetuple()))
15131524
return self.execute_command("EXPIREAT", name, when)
15141525

1526+
def expiretime(self, key: str) -> int:
1527+
"""
1528+
Returns the absolute Unix timestamp (since January 1, 1970) in seconds
1529+
at which the given key will expire.
1530+
1531+
For more information check https://redis.io/commands/expiretime
1532+
"""
1533+
return self.execute_command("EXPIRETIME", key)
1534+
15151535
def get(self, name: KeyT) -> ResponseT:
15161536
"""
15171537
Return the value at key ``name``, or None if the key doesn't exist
@@ -1780,6 +1800,15 @@ def pexpireat(self, name: KeyT, when: AbsExpiryT) -> ResponseT:
17801800
when = int(time.mktime(when.timetuple())) * 1000 + ms
17811801
return self.execute_command("PEXPIREAT", name, when)
17821802

1803+
def pexpiretime(self, key: str) -> int:
1804+
"""
1805+
Returns the absolute Unix timestamp (since January 1, 1970) in milliseconds
1806+
at which the given key will expire.
1807+
1808+
For more information check https://redis.io/commands/pexpiretime
1809+
"""
1810+
return self.execute_command("PEXPIRETIME", key)
1811+
17831812
def psetex(
17841813
self,
17851814
name: KeyT,
@@ -4587,18 +4616,21 @@ def hset(
45874616
key: Optional[str] = None,
45884617
value: Optional[str] = None,
45894618
mapping: Optional[dict] = None,
4619+
items: Optional[list] = None,
45904620
) -> Union[Awaitable[int], int]:
45914621
"""
45924622
Set ``key`` to ``value`` within hash ``name``,
45934623
``mapping`` accepts a dict of key/value pairs that will be
45944624
added to hash ``name``.
4625+
``items`` accepts a list of key/value pairs that will be
4626+
added to hash ``name``.
45954627
Returns the number of fields that were added.
45964628
45974629
For more information check https://redis.io/commands/hset
45984630
"""
4599-
if key is None and not mapping:
4631+
if key is None and not mapping and not items:
46004632
raise DataError("'hset' with no key value pairs")
4601-
items = []
4633+
items = items or []
46024634
if key is not None:
46034635
items.extend((key, value))
46044636
if mapping:

redis/commands/json/__init__.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,16 +103,34 @@ def pipeline(self, transaction=True, shard_hint=None):
103103
pipe.jsonget('foo')
104104
pipe.jsonget('notakey')
105105
"""
106-
p = Pipeline(
107-
connection_pool=self.client.connection_pool,
108-
response_callbacks=self.MODULE_CALLBACKS,
109-
transaction=transaction,
110-
shard_hint=shard_hint,
111-
)
106+
if isinstance(self.client, redis.RedisCluster):
107+
p = ClusterPipeline(
108+
nodes_manager=self.client.nodes_manager,
109+
commands_parser=self.client.commands_parser,
110+
startup_nodes=self.client.nodes_manager.startup_nodes,
111+
result_callbacks=self.client.result_callbacks,
112+
cluster_response_callbacks=self.client.cluster_response_callbacks,
113+
cluster_error_retry_attempts=self.client.cluster_error_retry_attempts,
114+
read_from_replicas=self.client.read_from_replicas,
115+
reinitialize_steps=self.client.reinitialize_steps,
116+
)
117+
118+
else:
119+
p = Pipeline(
120+
connection_pool=self.client.connection_pool,
121+
response_callbacks=self.MODULE_CALLBACKS,
122+
transaction=transaction,
123+
shard_hint=shard_hint,
124+
)
125+
112126
p._encode = self._encode
113127
p._decode = self._decode
114128
return p
115129

116130

131+
class ClusterPipeline(JSONCommands, redis.cluster.ClusterPipeline):
132+
"""Cluster pipeline for the module."""
133+
134+
117135
class Pipeline(JSONCommands, redis.client.Pipeline):
118136
"""Pipeline for the module."""

redis/commands/parser.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,14 @@ def __init__(self, redis_connection):
1717
self.initialize(redis_connection)
1818

1919
def initialize(self, r):
20-
self.commands = r.execute_command("COMMAND")
20+
commands = r.execute_command("COMMAND")
21+
uppercase_commands = []
22+
for cmd in commands:
23+
if any(x.isupper() for x in cmd):
24+
uppercase_commands.append(cmd)
25+
for cmd in uppercase_commands:
26+
commands[cmd.lower()] = commands.pop(cmd)
27+
self.commands = commands
2128

2229
# As soon as this PR is merged into Redis, we should reimplement
2330
# our logic to use COMMAND INFO changes to determine the key positions

redis/commands/search/__init__.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import redis
2+
13
from .commands import SearchCommands
24

35

@@ -17,7 +19,7 @@ def __init__(self, client, chunk_size=1000):
1719

1820
self.client = client
1921
self.execute_command = client.execute_command
20-
self.pipeline = client.pipeline(transaction=False, shard_hint=None)
22+
self._pipeline = client.pipeline(transaction=False, shard_hint=None)
2123
self.total = 0
2224
self.chunk_size = chunk_size
2325
self.current_chunk = 0
@@ -42,7 +44,7 @@ def add_document(
4244
"""
4345
self.client._add_document(
4446
doc_id,
45-
conn=self.pipeline,
47+
conn=self._pipeline,
4648
nosave=nosave,
4749
score=score,
4850
payload=payload,
@@ -67,7 +69,7 @@ def add_document_hash(
6769
"""
6870
self.client._add_document_hash(
6971
doc_id,
70-
conn=self.pipeline,
72+
conn=self._pipeline,
7173
score=score,
7274
replace=replace,
7375
)
@@ -80,7 +82,7 @@ def commit(self):
8082
"""
8183
Manually commit and flush the batch indexing query
8284
"""
83-
self.pipeline.execute()
85+
self._pipeline.execute()
8486
self.current_chunk = 0
8587

8688
def __init__(self, client, index_name="idx"):
@@ -90,7 +92,25 @@ def __init__(self, client, index_name="idx"):
9092
9193
If conn is not None, we employ an already existing redis connection
9294
"""
95+
self.MODULE_CALLBACKS = {}
9396
self.client = client
9497
self.index_name = index_name
9598
self.execute_command = client.execute_command
96-
self.pipeline = client.pipeline
99+
self._pipeline = client.pipeline
100+
101+
def pipeline(self, transaction=True, shard_hint=None):
102+
"""Creates a pipeline for the SEARCH module, that can be used for executing
103+
SEARCH commands, as well as classic core commands.
104+
"""
105+
p = Pipeline(
106+
connection_pool=self.client.connection_pool,
107+
response_callbacks=self.MODULE_CALLBACKS,
108+
transaction=transaction,
109+
shard_hint=shard_hint,
110+
)
111+
p.index_name = self.index_name
112+
return p
113+
114+
115+
class Pipeline(SearchCommands, redis.client.Pipeline):
116+
"""Pipeline for the module."""

0 commit comments

Comments
 (0)