Skip to content
This repository was archived by the owner on Jan 24, 2023. It is now read-only.

Commit 0552bb6

Browse files
author
Ariel Shtul
authored
add timestamp, remove time bucket from incr/decrby (#47)
1 parent cd1ecef commit 0552bb6

File tree

2 files changed

+38
-24
lines changed

2 files changed

+38
-24
lines changed

redistimeseries/client.py

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,17 @@ def parse_range(response):
3838
def parse_m_range(response):
3939
res = []
4040
for item in response:
41-
res.append({ nativestr(item[0]) : [list_to_dict(item[1]),
41+
res.append({ nativestr(item[0]) : [list_to_dict(item[1]),
4242
parse_range(item[2])]})
4343
return res
4444

4545
def parse_m_get(response):
4646
res = []
4747
for item in response:
48-
res.append({ nativestr(item[0]) : [list_to_dict(item[1]),
48+
res.append({ nativestr(item[0]) : [list_to_dict(item[1]),
4949
item[2][0], float(item[2][1])]})
5050
return res
51-
51+
5252
def parseToList(response):
5353
res = []
5454
for item in response:
@@ -57,7 +57,7 @@ def parseToList(response):
5757

5858
class Client(Redis): #changed from StrictRedis
5959
"""
60-
This class subclasses redis-py's `Redis` and implements
60+
This class subclasses redis-py's `Redis` and implements
6161
RedisTimeSeries's commands (prefixed with "ts").
6262
The client allows to interact with RedisTimeSeries and use all of
6363
it's functionality.
@@ -87,7 +87,7 @@ def __init__(self, *args, **kwargs):
8787
# Set the module commands' callbacks
8888
MODULE_CALLBACKS = {
8989
self.CREATE_CMD : bool_ok,
90-
self.ALTER_CMD : bool_ok,
90+
self.ALTER_CMD : bool_ok,
9191
self.CREATERULE_CMD : bool_ok,
9292
self.DELETERULE_CMD : bool_ok,
9393
self.RANGE_CMD : parse_range,
@@ -121,21 +121,26 @@ def appendLabels(params, labels):
121121
params.append('LABELS')
122122
for k, v in labels.items():
123123
params.extend([k,v])
124-
124+
125125
@staticmethod
126126
def appendCount(params, count):
127127
if count is not None:
128128
params.extend(['COUNT', count])
129129

130130
@staticmethod
131-
def appendAggregation(params, aggregation_type,
132-
bucket_size_msec):
131+
def appendTimestamp(params, timestamp):
132+
if timestamp is not None:
133+
params.extend(['TIMESTAMP', timestamp])
134+
135+
@staticmethod
136+
def appendAggregation(params, aggregation_type,
137+
bucket_size_msec):
133138
params.append('AGGREGATION')
134139
params.extend([aggregation_type, bucket_size_msec])
135140

136141
def create(self, key, retention_msecs=None, uncompressed=False, labels={}):
137142
"""
138-
Creates a new time-series ``key`` with ``retention_msecs`` in
143+
Creates a new time-series ``key`` with ``retention_msecs`` in
139144
milliseconds and ``labels``.
140145
"""
141146
params = [key]
@@ -147,7 +152,7 @@ def create(self, key, retention_msecs=None, uncompressed=False, labels={}):
147152

148153
def alter(self, key, retention_msecs=None, labels={}):
149154
"""
150-
Update the retention, labels of an existing key. The parameters
155+
Update the retention, labels of an existing key. The parameters
151156
are the same as TS.CREATE.
152157
"""
153158
params = [key]
@@ -156,10 +161,10 @@ def alter(self, key, retention_msecs=None, labels={}):
156161

157162
return self.execute_command(self.ALTER_CMD, *params)
158163

159-
def add(self, key, timestamp, value, retention_msecs=None,
164+
def add(self, key, timestamp, value, retention_msecs=None,
160165
uncompressed=False, labels={}):
161166
"""
162-
Appends (or creates and appends) a new ``value`` to series
167+
Appends (or creates and appends) a new ``value`` to series
163168
``key`` with ``timestamp``. If ``key`` is created,
164169
``retention_msecs`` and ``labels`` are applied. Return value
165170
is timestamp of insertion.
@@ -173,7 +178,7 @@ def add(self, key, timestamp, value, retention_msecs=None,
173178

174179
def madd(self, ktv_tuples):
175180
"""
176-
Appends (or creates and appends) a new ``value`` to series
181+
Appends (or creates and appends) a new ``value`` to series
177182
``key`` with ``timestamp``. Expects a list of ``tuples`` as
178183
(``key``,``timestamp``, ``value``). Return value is an
179184
array with timestamps of insertions.
@@ -185,30 +190,32 @@ def madd(self, ktv_tuples):
185190

186191
return self.execute_command(self.MADD_CMD, *params)
187192

188-
def incrby(self, key, value, time_bucket=None, retention_msecs=None,
189-
uncompressed=False, labels={}):
193+
def incrby(self, key, value, timestamp=None, retention_msecs=None,
194+
uncompressed=False, labels={}):
190195
"""
191196
Increases latest value in ``key`` by ``value``.
192-
``timeBucket`` resets counter. In milliseconds.
197+
``timestamp` can be set or system time will be used.
193198
If ``key`` is created, ``retention_msecs`` and ``labels`` are
194199
applied.
195200
"""
196201
params = [key, value]
202+
self.appendTimestamp(params, timestamp)
197203
self.appendRetention(params, retention_msecs)
198204
self.appendUncompressed(params, uncompressed)
199205
self.appendLabels(params, labels)
200206

201207
return self.execute_command(self.INCRBY_CMD, *params)
202208

203-
def decrby(self, key, value, time_bucket=None, retention_msecs=None,
204-
uncompressed=False, labels={}):
209+
def decrby(self, key, value, timestamp=None, retention_msecs=None,
210+
uncompressed=False, labels={}):
205211
"""
206212
Decreases latest value in ``key`` by ``value``.
207-
``time_bucket`` resets counter. In milliseconds.
213+
``timestamp` can be set or system time will be used.
208214
If ``key`` is created, ``retention_msecs`` and ``labels`` are
209215
applied.
210216
"""
211217
params = [key, value]
218+
self.appendTimestamp(params, timestamp)
212219
self.appendRetention(params, retention_msecs)
213220
self.appendUncompressed(params, uncompressed)
214221
self.appendLabels(params, labels)
@@ -218,7 +225,7 @@ def decrby(self, key, value, time_bucket=None, retention_msecs=None,
218225
def createrule(self, source_key, dest_key,
219226
aggregation_type, bucket_size_msec):
220227
"""
221-
Creates a compaction rule from values added to ``source_key``
228+
Creates a compaction rule from values added to ``source_key``
222229
into ``dest_key``. Aggregating for ``bucket_size_msec`` where an
223230
``aggregation_type`` can be ['avg', 'sum', 'min', 'max',
224231
'range', 'count', 'first', 'last', 'std.p', 'std.s', 'var.p', 'var.s']
@@ -274,7 +281,7 @@ def get(self, key):
274281

275282
def mget(self, filters, with_labels=False):
276283
"""Get the last samples matching the specific ``filter``."""
277-
params = []
284+
params = []
278285
self.appendWithLabels(params, with_labels)
279286
params.extend(['FILTER'])
280287
params += filters

test_commands.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,19 +56,26 @@ def testMAdd(self):
5656

5757
rts.create('a')
5858
self.assertEqual([1, 2, 3], rts.madd([('a', 1, 5), ('a', 2, 10), ('a', 3, 15)]))
59-
59+
6060
def testIncrbyDecrby(self):
6161
'''Test TS.INCRBY and TS.DECRBY calls'''
6262

6363
for _ in range(100):
64-
self.assertTrue(rts.incrby(1,1))
64+
self.assertTrue(rts.incrby(1, 1))
6565
sleep(0.001)
6666
self.assertEqual(100, rts.get(1)[1])
6767
for _ in range(100):
68-
self.assertTrue(rts.decrby(1,1))
68+
self.assertTrue(rts.decrby(1, 1))
6969
sleep(0.001)
7070
self.assertEqual(0, rts.get(1)[1])
7171

72+
self.assertTrue(rts.incrby(2, 1.5, timestamp=5))
73+
self.assertEqual((5, 1.5), rts.get(2))
74+
self.assertTrue(rts.incrby(2, 2.25, timestamp=7))
75+
self.assertEqual((7, 3.75), rts.get(2))
76+
self.assertTrue(rts.decrby(2, 1.5, timestamp=15))
77+
self.assertEqual((15, 2.25), rts.get(2))
78+
7279
def testCreateRule(self):
7380
'''Test TS.CREATERULE and TS.DELETERULE calls'''
7481

0 commit comments

Comments
 (0)