Skip to content

Commit 599f5a9

Browse files
authored
Re-enable pipeline support for JSON and TimeSeries (#1674)
1 parent fea7b85 commit 599f5a9

File tree

7 files changed

+154
-38
lines changed

7 files changed

+154
-38
lines changed

redis/commands/helpers.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
import random
2+
import string
3+
4+
15
def list_or_args(keys, args):
26
# returns a single new list combining keys and args
37
try:
@@ -42,3 +46,31 @@ def parse_to_list(response):
4246
except TypeError:
4347
res.append(None)
4448
return res
49+
50+
51+
def random_string(length=10):
52+
"""
53+
Returns a random N character long string.
54+
"""
55+
return "".join( # nosec
56+
random.choice(string.ascii_lowercase) for x in range(length)
57+
)
58+
59+
60+
def quote_string(v):
61+
"""
62+
RedisGraph strings must be quoted,
63+
quote_string wraps given v with quotes incase
64+
v is a string.
65+
"""
66+
67+
if isinstance(v, bytes):
68+
v = v.decode()
69+
elif not isinstance(v, str):
70+
return v
71+
if len(v) == 0:
72+
return '""'
73+
74+
v = v.replace('"', '\\"')
75+
76+
return '"{}"'.format(v)

redis/commands/json/__init__.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
)
77
from ..helpers import nativestr
88
from .commands import JSONCommands
9+
import redis
910

1011

1112
class JSON(JSONCommands):
@@ -91,3 +92,29 @@ def _decode(self, obj):
9192
def _encode(self, obj):
9293
"""Get the encoder."""
9394
return self.__encoder__.encode(obj)
95+
96+
def pipeline(self, transaction=True, shard_hint=None):
97+
"""Creates a pipeline for the JSON module, that can be used for executing
98+
JSON commands, as well as classic core commands.
99+
100+
Usage example:
101+
102+
r = redis.Redis()
103+
pipe = r.json().pipeline()
104+
pipe.jsonset('foo', '.', {'hello!': 'world'})
105+
pipe.jsonget('foo')
106+
pipe.jsonget('notakey')
107+
"""
108+
p = Pipeline(
109+
connection_pool=self.client.connection_pool,
110+
response_callbacks=self.MODULE_CALLBACKS,
111+
transaction=transaction,
112+
shard_hint=shard_hint,
113+
)
114+
p._encode = self._encode
115+
p._decode = self._decode
116+
return p
117+
118+
119+
class Pipeline(JSONCommands, redis.client.Pipeline):
120+
"""Pipeline for the module."""

redis/commands/json/commands.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,9 @@ def set(self, name, path, obj, nx=False, xx=False, decode_keys=False):
154154
``xx`` if set to True, set ``value`` only if it exists.
155155
``decode_keys`` If set to True, the keys of ``obj`` will be decoded
156156
with utf-8.
157+
158+
For the purpose of using this within a pipeline, this command is also
159+
aliased to jsonset.
157160
"""
158161
if decode_keys:
159162
obj = decode_dict_keys(obj)
@@ -212,3 +215,18 @@ def debug(self, subcommand, key=None, path=Path.rootPath()):
212215
pieces.append(key)
213216
pieces.append(str(path))
214217
return self.execute_command("JSON.DEBUG", *pieces)
218+
219+
@deprecated(version='4.0.0',
220+
reason='redisjson-py supported this, call get directly.')
221+
def jsonget(self, *args, **kwargs):
222+
return self.get(*args, **kwargs)
223+
224+
@deprecated(version='4.0.0',
225+
reason='redisjson-py supported this, call get directly.')
226+
def jsonmget(self, *args, **kwargs):
227+
return self.mget(*args, **kwargs)
228+
229+
@deprecated(version='4.0.0',
230+
reason='redisjson-py supported this, call get directly.')
231+
def jsonset(self, *args, **kwargs):
232+
return self.set(*args, **kwargs)

redis/commands/timeseries/__init__.py

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from redis.client import bool_ok
1+
import redis.client
22

33
from .utils import (
44
parse_range,
@@ -37,12 +37,12 @@ class TimeSeries(TimeSeriesCommands):
3737
def __init__(self, client=None, version=None, **kwargs):
3838
"""Create a new RedisTimeSeries client."""
3939
# Set the module commands' callbacks
40-
MODULE_CALLBACKS = {
41-
CREATE_CMD: bool_ok,
42-
ALTER_CMD: bool_ok,
43-
CREATERULE_CMD: bool_ok,
40+
self.MODULE_CALLBACKS = {
41+
CREATE_CMD: redis.client.bool_ok,
42+
ALTER_CMD: redis.client.bool_ok,
43+
CREATERULE_CMD: redis.client.bool_ok,
4444
DEL_CMD: int,
45-
DELETERULE_CMD: bool_ok,
45+
DELETERULE_CMD: redis.client.bool_ok,
4646
RANGE_CMD: parse_range,
4747
REVRANGE_CMD: parse_range,
4848
MRANGE_CMD: parse_m_range,
@@ -57,5 +57,30 @@ def __init__(self, client=None, version=None, **kwargs):
5757
self.execute_command = client.execute_command
5858
self.MODULE_VERSION = version
5959

60-
for k in MODULE_CALLBACKS:
61-
self.client.set_response_callback(k, MODULE_CALLBACKS[k])
60+
for key, value in self.MODULE_CALLBACKS.items():
61+
self.client.set_response_callback(key, value)
62+
63+
def pipeline(self, transaction=True, shard_hint=None):
64+
"""Creates a pipeline for the TimeSeries module, that can be used
65+
for executing only TimeSeries commands and core commands.
66+
67+
Usage example:
68+
69+
r = redis.Redis()
70+
pipe = r.ts().pipeline()
71+
for i in range(100):
72+
pipeline.add("with_pipeline", i, 1.1 * i)
73+
pipeline.execute()
74+
75+
"""
76+
p = Pipeline(
77+
connection_pool=self.client.connection_pool,
78+
response_callbacks=self.MODULE_CALLBACKS,
79+
transaction=transaction,
80+
shard_hint=shard_hint,
81+
)
82+
return p
83+
84+
85+
class Pipeline(TimeSeriesCommands, redis.client.Pipeline):
86+
"""Pipeline for the module."""

tests/test_json.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -275,16 +275,28 @@ def test_objlen(client):
275275
assert len(obj) == client.json().objlen("obj")
276276

277277

278-
# @pytest.mark.pipeline
279-
# @pytest.mark.redismod
280-
# def test_pipelineshouldsucceed(client):
281-
# p = client.json().pipeline()
282-
# p.set("foo", Path.rootPath(), "bar")
283-
# p.get("foo")
284-
# p.delete("foo")
285-
# assert [True, "bar", 1] == p.execute()
286-
# assert client.keys() == []
287-
# assert client.get("foo") is None
278+
@pytest.mark.pipeline
279+
@pytest.mark.redismod
280+
def test_json_commands_in_pipeline(client):
281+
p = client.json().pipeline()
282+
p.set("foo", Path.rootPath(), "bar")
283+
p.get("foo")
284+
p.delete("foo")
285+
assert [True, "bar", 1] == p.execute()
286+
assert client.keys() == []
287+
assert client.get("foo") is None
288+
289+
# now with a true, json object
290+
client.flushdb()
291+
p = client.json().pipeline()
292+
d = {"hello": "world", "oh": "snap"}
293+
p.jsonset("foo", Path.rootPath(), d)
294+
p.jsonget("foo")
295+
p.exists("notarealkey")
296+
p.delete("foo")
297+
assert [True, d, 0, 1] == p.execute()
298+
assert client.keys() == []
299+
assert client.get("foo") is None
288300

289301

290302
@pytest.mark.redismod

tests/test_timeseries.py

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,6 @@ def testAlter(client):
4949
assert 10 == client.ts().info(1).retention_msecs
5050

5151

52-
# pipe = client.ts().pipeline()
53-
# assert pipe.create(2)
54-
55-
5652
@pytest.mark.redismod
5753
@skip_ifmodversion_lt("1.4.0", "timeseries")
5854
def testAlterDiplicatePolicy(client):
@@ -568,20 +564,19 @@ def testQueryIndex(client):
568564
assert [2] == client.ts().queryindex(["Taste=That"])
569565

570566

571-
#
572-
# @pytest.mark.redismod
573-
# @pytest.mark.pipeline
574-
# def testPipeline(client):
575-
# pipeline = client.ts().pipeline()
576-
# pipeline.create("with_pipeline")
577-
# for i in range(100):
578-
# pipeline.add("with_pipeline", i, 1.1 * i)
579-
# pipeline.execute()
580-
581-
# info = client.ts().info("with_pipeline")
582-
# assert info.lastTimeStamp == 99
583-
# assert info.total_samples == 100
584-
# assert client.ts().get("with_pipeline")[1] == 99 * 1.1
567+
@pytest.mark.redismod
568+
@pytest.mark.pipeline
569+
def test_pipeline(client):
570+
pipeline = client.ts().pipeline()
571+
pipeline.create("with_pipeline")
572+
for i in range(100):
573+
pipeline.add("with_pipeline", i, 1.1 * i)
574+
pipeline.execute()
575+
576+
info = client.ts().info("with_pipeline")
577+
assert info.lastTimeStamp == 99
578+
assert info.total_samples == 100
579+
assert client.ts().get("with_pipeline")[1] == 99 * 1.1
585580

586581

587582
@pytest.mark.redismod

tox.ini

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,13 @@ basepython = pypy3
121121

122122
[flake8]
123123
exclude =
124-
.venv,
124+
*.egg-info,
125+
*.pyc,
126+
.git,
125127
.tox,
126-
whitelist.py
128+
.venv*,
129+
build,
130+
dist,
131+
docker,
132+
venv*,
133+
whitelist.py

0 commit comments

Comments
 (0)