Skip to content

Commit 98fd06e

Browse files
dvora-hchayim
andauthored
Add support for JSON, TIMESERIES, BLOOM & GRAPH commands in cluster (#2032)
Co-authored-by: Chayim <[email protected]>
1 parent c5d19b8 commit 98fd06e

File tree

10 files changed

+98
-16
lines changed

10 files changed

+98
-16
lines changed

redis/cluster.py

+29-1
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ class RedisCluster(RedisClusterCommands):
284284
"READONLY",
285285
"READWRITE",
286286
"TIME",
287+
"GRAPH.CONFIG",
287288
],
288289
DEFAULT_NODE,
289290
),
@@ -810,6 +811,10 @@ def lock(
810811
thread_local=thread_local,
811812
)
812813

814+
def set_response_callback(self, command, callback):
815+
"""Set a custom Response Callback"""
816+
self.cluster_response_callbacks[command] = callback
817+
813818
def _determine_nodes(self, *args, **kwargs):
814819
command = args[0]
815820
nodes_flag = kwargs.pop("nodes_flag", None)
@@ -1181,6 +1186,20 @@ def _process_result(self, command, res, **kwargs):
11811186
else:
11821187
return res
11831188

1189+
def load_external_module(
1190+
self,
1191+
funcname,
1192+
func,
1193+
):
1194+
"""
1195+
This function can be used to add externally defined redis modules,
1196+
and their namespaces to the redis client.
1197+
1198+
``funcname`` - A string containing the name of the function to create
1199+
``func`` - The function, being added to this class.
1200+
"""
1201+
setattr(self, funcname, func)
1202+
11841203

11851204
class ClusterNode:
11861205
def __init__(self, host, port, server_type=None, redis_connection=None):
@@ -2026,7 +2045,13 @@ def _send_cluster_commands(
20262045

20272046
# turn the response back into a simple flat array that corresponds
20282047
# to the sequence of commands issued in the stack in pipeline.execute()
2029-
response = [c.result for c in sorted(stack, key=lambda x: x.position)]
2048+
response = []
2049+
for c in sorted(stack, key=lambda x: x.position):
2050+
if c.args[0] in self.cluster_response_callbacks:
2051+
c.result = self.cluster_response_callbacks[c.args[0]](
2052+
c.result, **c.options
2053+
)
2054+
response.append(c.result)
20302055

20312056
if raise_on_error:
20322057
self.raise_first_error(stack)
@@ -2040,6 +2065,9 @@ def _fail_on_redirect(self, allow_redirections):
20402065
"ASK & MOVED redirection not allowed in this pipeline"
20412066
)
20422067

2068+
def exists(self, *keys):
2069+
return self.execute_command("EXISTS", *keys)
2070+
20432071
def eval(self):
20442072
""" """
20452073
raise RedisClusterException("method eval() is not implemented")

redis/commands/cluster.py

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
ScriptCommands,
1010
)
1111
from .helpers import list_or_args
12+
from .redismodules import RedisModuleCommands
1213

1314

1415
class ClusterMultiKeyCommands:
@@ -212,6 +213,7 @@ class RedisClusterCommands(
212213
PubSubCommands,
213214
ClusterDataAccessCommands,
214215
ScriptCommands,
216+
RedisModuleCommands,
215217
):
216218
"""
217219
A class for all Redis Cluster commands

redis/commands/json/__init__.py

+24-6
Original file line numberDiff line numberDiff line change
@@ -103,16 +103,34 @@ def pipeline(self, transaction=True, shard_hint=None):
103103
pipe.jsonget('foo')
104104
pipe.jsonget('notakey')
105105
"""
106-
p = Pipeline(
107-
connection_pool=self.client.connection_pool,
108-
response_callbacks=self.MODULE_CALLBACKS,
109-
transaction=transaction,
110-
shard_hint=shard_hint,
111-
)
106+
if isinstance(self.client, redis.RedisCluster):
107+
p = ClusterPipeline(
108+
nodes_manager=self.client.nodes_manager,
109+
commands_parser=self.client.commands_parser,
110+
startup_nodes=self.client.nodes_manager.startup_nodes,
111+
result_callbacks=self.client.result_callbacks,
112+
cluster_response_callbacks=self.client.cluster_response_callbacks,
113+
cluster_error_retry_attempts=self.client.cluster_error_retry_attempts,
114+
read_from_replicas=self.client.read_from_replicas,
115+
reinitialize_steps=self.client.reinitialize_steps,
116+
)
117+
118+
else:
119+
p = Pipeline(
120+
connection_pool=self.client.connection_pool,
121+
response_callbacks=self.MODULE_CALLBACKS,
122+
transaction=transaction,
123+
shard_hint=shard_hint,
124+
)
125+
112126
p._encode = self._encode
113127
p._decode = self._decode
114128
return p
115129

116130

131+
class ClusterPipeline(JSONCommands, redis.cluster.ClusterPipeline):
132+
"""Cluster pipeline for the module."""
133+
134+
117135
class Pipeline(JSONCommands, redis.client.Pipeline):
118136
"""Pipeline for the module."""

redis/commands/parser.py

+8-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,14 @@ def __init__(self, redis_connection):
1717
self.initialize(redis_connection)
1818

1919
def initialize(self, r):
20-
self.commands = r.execute_command("COMMAND")
20+
commands = r.execute_command("COMMAND")
21+
uppercase_commands = []
22+
for cmd in commands:
23+
if any(x.isupper() for x in cmd):
24+
uppercase_commands.append(cmd)
25+
for cmd in uppercase_commands:
26+
commands[cmd.lower()] = commands.pop(cmd)
27+
self.commands = commands
2128

2229
# As soon as this PR is merged into Redis, we should reimplement
2330
# our logic to use COMMAND INFO changes to determine the key positions

redis/commands/timeseries/__init__.py

+24-7
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import redis.client
1+
import redis
22

33
from ..helpers import parse_to_list
44
from .commands import (
@@ -67,14 +67,31 @@ def pipeline(self, transaction=True, shard_hint=None):
6767
pipeline.execute()
6868
6969
"""
70-
p = Pipeline(
71-
connection_pool=self.client.connection_pool,
72-
response_callbacks=self.MODULE_CALLBACKS,
73-
transaction=transaction,
74-
shard_hint=shard_hint,
75-
)
70+
if isinstance(self.client, redis.RedisCluster):
71+
p = ClusterPipeline(
72+
nodes_manager=self.client.nodes_manager,
73+
commands_parser=self.client.commands_parser,
74+
startup_nodes=self.client.nodes_manager.startup_nodes,
75+
result_callbacks=self.client.result_callbacks,
76+
cluster_response_callbacks=self.client.cluster_response_callbacks,
77+
cluster_error_retry_attempts=self.client.cluster_error_retry_attempts,
78+
read_from_replicas=self.client.read_from_replicas,
79+
reinitialize_steps=self.client.reinitialize_steps,
80+
)
81+
82+
else:
83+
p = Pipeline(
84+
connection_pool=self.client.connection_pool,
85+
response_callbacks=self.MODULE_CALLBACKS,
86+
transaction=transaction,
87+
shard_hint=shard_hint,
88+
)
7689
return p
7790

7891

92+
class ClusterPipeline(TimeSeriesCommands, redis.cluster.ClusterPipeline):
93+
"""Cluster pipeline for the module."""
94+
95+
7996
class Pipeline(TimeSeriesCommands, redis.client.Pipeline):
8097
"""Pipeline for the module."""

tests/test_bloom.py

+1
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ def test_cms(client):
191191

192192

193193
@pytest.mark.redismod
194+
@pytest.mark.onlynoncluster
194195
def test_cms_merge(client):
195196
assert client.cms().initbydim("A", 1000, 5)
196197
assert client.cms().initbydim("B", 1000, 5)

tests/test_graph.py

+1
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,7 @@ def test_config(client):
342342

343343

344344
@pytest.mark.redismod
345+
@pytest.mark.onlynoncluster
345346
def test_list_keys(client):
346347
result = client.graph().list_keys()
347348
assert result == []

tests/test_search.py

+3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121

2222
from .conftest import default_redismod_url, skip_ifmodversion_lt
2323

24+
pytestmark = pytest.mark.onlynoncluster
25+
26+
2427
WILL_PLAY_TEXT = os.path.abspath(
2528
os.path.join(os.path.dirname(__file__), "testdata", "will_play_text.csv.bz2")
2629
)

tests/test_timeseries.py

+5
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ def test_rev_range(client):
264264

265265

266266
@pytest.mark.redismod
267+
@pytest.mark.onlynoncluster
267268
def testMultiRange(client):
268269
client.ts().create(1, labels={"Test": "This", "team": "ny"})
269270
client.ts().create(2, labels={"Test": "This", "Taste": "That", "team": "sf"})
@@ -293,6 +294,7 @@ def testMultiRange(client):
293294

294295

295296
@pytest.mark.redismod
297+
@pytest.mark.onlynoncluster
296298
@skip_ifmodversion_lt("99.99.99", "timeseries")
297299
def test_multi_range_advanced(client):
298300
client.ts().create(1, labels={"Test": "This", "team": "ny"})
@@ -349,6 +351,7 @@ def test_multi_range_advanced(client):
349351

350352

351353
@pytest.mark.redismod
354+
@pytest.mark.onlynoncluster
352355
@skip_ifmodversion_lt("99.99.99", "timeseries")
353356
def test_multi_reverse_range(client):
354357
client.ts().create(1, labels={"Test": "This", "team": "ny"})
@@ -442,6 +445,7 @@ def test_get(client):
442445

443446

444447
@pytest.mark.redismod
448+
@pytest.mark.onlynoncluster
445449
def test_mget(client):
446450
client.ts().create(1, labels={"Test": "This"})
447451
client.ts().create(2, labels={"Test": "This", "Taste": "That"})
@@ -483,6 +487,7 @@ def testInfoDuplicatePolicy(client):
483487

484488

485489
@pytest.mark.redismod
490+
@pytest.mark.onlynoncluster
486491
def test_query_index(client):
487492
client.ts().create(1, labels={"Test": "This"})
488493
client.ts().create(2, labels={"Test": "This", "Taste": "That"})

tox.ini

+1-1
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ setenv =
286286
commands =
287287
standalone: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' {posargs}
288288
standalone-uvloop: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' --uvloop {posargs}
289-
cluster: pytest --cov=./ --cov-report=xml:coverage_cluster.xml -W always -m 'not onlynoncluster and not redismod' --redis-url={env:CLUSTER_URL:} {posargs}
289+
cluster: pytest --cov=./ --cov-report=xml:coverage_cluster.xml -W always -m 'not onlynoncluster and not redismod' --redis-url={env:CLUSTER_URL:} --redismod-url={env:CLUSTER_URL:} {posargs}
290290
cluster-uvloop: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' --uvloop {posargs}
291291

292292
[testenv:redis5]

0 commit comments

Comments
 (0)