Skip to content
This repository was archived by the owner on Jan 24, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 28 additions & 21 deletions redistimeseries/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,17 @@ def parse_range(response):
def parse_m_range(response):
res = []
for item in response:
res.append({ nativestr(item[0]) : [list_to_dict(item[1]),
res.append({ nativestr(item[0]) : [list_to_dict(item[1]),
parse_range(item[2])]})
return res

def parse_m_get(response):
res = []
for item in response:
res.append({ nativestr(item[0]) : [list_to_dict(item[1]),
res.append({ nativestr(item[0]) : [list_to_dict(item[1]),
item[2][0], float(item[2][1])]})
return res

def parseToList(response):
res = []
for item in response:
Expand All @@ -57,7 +57,7 @@ def parseToList(response):

class Client(Redis): #changed from StrictRedis
"""
This class subclasses redis-py's `Redis` and implements
This class subclasses redis-py's `Redis` and implements
RedisTimeSeries's commands (prefixed with "ts").
The client allows to interact with RedisTimeSeries and use all of
it's functionality.
Expand Down Expand Up @@ -87,7 +87,7 @@ def __init__(self, *args, **kwargs):
# Set the module commands' callbacks
MODULE_CALLBACKS = {
self.CREATE_CMD : bool_ok,
self.ALTER_CMD : bool_ok,
self.ALTER_CMD : bool_ok,
self.CREATERULE_CMD : bool_ok,
self.DELETERULE_CMD : bool_ok,
self.RANGE_CMD : parse_range,
Expand Down Expand Up @@ -121,21 +121,26 @@ def appendLabels(params, labels):
params.append('LABELS')
for k, v in labels.items():
params.extend([k,v])

@staticmethod
def appendCount(params, count):
if count is not None:
params.extend(['COUNT', count])

@staticmethod
def appendAggregation(params, aggregation_type,
bucket_size_msec):
def appendTimestamp(params, timestamp):
if timestamp is not None:
params.extend(['TIMESTAMP', timestamp])

@staticmethod
def appendAggregation(params, aggregation_type,
bucket_size_msec):
params.append('AGGREGATION')
params.extend([aggregation_type, bucket_size_msec])

def create(self, key, retention_msecs=None, uncompressed=False, labels={}):
"""
Creates a new time-series ``key`` with ``retention_msecs`` in
Creates a new time-series ``key`` with ``retention_msecs`` in
milliseconds and ``labels``.
"""
params = [key]
Expand All @@ -147,7 +152,7 @@ def create(self, key, retention_msecs=None, uncompressed=False, labels={}):

def alter(self, key, retention_msecs=None, labels={}):
"""
Update the retention, labels of an existing key. The parameters
Update the retention, labels of an existing key. The parameters
are the same as TS.CREATE.
"""
params = [key]
Expand All @@ -156,10 +161,10 @@ def alter(self, key, retention_msecs=None, labels={}):

return self.execute_command(self.ALTER_CMD, *params)

def add(self, key, timestamp, value, retention_msecs=None,
def add(self, key, timestamp, value, retention_msecs=None,
uncompressed=False, labels={}):
"""
Appends (or creates and appends) a new ``value`` to series
Appends (or creates and appends) a new ``value`` to series
``key`` with ``timestamp``. If ``key`` is created,
``retention_msecs`` and ``labels`` are applied. Return value
is timestamp of insertion.
Expand All @@ -173,7 +178,7 @@ def add(self, key, timestamp, value, retention_msecs=None,

def madd(self, ktv_tuples):
"""
Appends (or creates and appends) a new ``value`` to series
Appends (or creates and appends) a new ``value`` to series
``key`` with ``timestamp``. Expects a list of ``tuples`` as
(``key``,``timestamp``, ``value``). Return value is an
array with timestamps of insertions.
Expand All @@ -185,30 +190,32 @@ def madd(self, ktv_tuples):

return self.execute_command(self.MADD_CMD, *params)

def incrby(self, key, value, time_bucket=None, retention_msecs=None,
uncompressed=False, labels={}):
def incrby(self, key, value, timestamp=None, retention_msecs=None,
uncompressed=False, labels={}):
"""
Increases latest value in ``key`` by ``value``.
``timeBucket`` resets counter. In milliseconds.
``timestamp` can be set or system time will be used.
If ``key`` is created, ``retention_msecs`` and ``labels`` are
applied.
"""
params = [key, value]
self.appendTimestamp(params, timestamp)
self.appendRetention(params, retention_msecs)
self.appendUncompressed(params, uncompressed)
self.appendLabels(params, labels)

return self.execute_command(self.INCRBY_CMD, *params)

def decrby(self, key, value, time_bucket=None, retention_msecs=None,
uncompressed=False, labels={}):
def decrby(self, key, value, timestamp=None, retention_msecs=None,
uncompressed=False, labels={}):
"""
Decreases latest value in ``key`` by ``value``.
``time_bucket`` resets counter. In milliseconds.
``timestamp` can be set or system time will be used.
If ``key`` is created, ``retention_msecs`` and ``labels`` are
applied.
"""
params = [key, value]
self.appendTimestamp(params, timestamp)
self.appendRetention(params, retention_msecs)
self.appendUncompressed(params, uncompressed)
self.appendLabels(params, labels)
Expand All @@ -218,7 +225,7 @@ def decrby(self, key, value, time_bucket=None, retention_msecs=None,
def createrule(self, source_key, dest_key,
aggregation_type, bucket_size_msec):
"""
Creates a compaction rule from values added to ``source_key``
Creates a compaction rule from values added to ``source_key``
into ``dest_key``. Aggregating for ``bucket_size_msec`` where an
``aggregation_type`` can be ['avg', 'sum', 'min', 'max',
'range', 'count', 'first', 'last', 'std.p', 'std.s', 'var.p', 'var.s']
Expand Down Expand Up @@ -274,7 +281,7 @@ def get(self, key):

def mget(self, filters, with_labels=False):
"""Get the last samples matching the specific ``filter``."""
params = []
params = []
self.appendWithLabels(params, with_labels)
params.extend(['FILTER'])
params += filters
Expand Down
13 changes: 10 additions & 3 deletions test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,26 @@ def testMAdd(self):

rts.create('a')
self.assertEqual([1, 2, 3], rts.madd([('a', 1, 5), ('a', 2, 10), ('a', 3, 15)]))

def testIncrbyDecrby(self):
'''Test TS.INCRBY and TS.DECRBY calls'''

for _ in range(100):
self.assertTrue(rts.incrby(1,1))
self.assertTrue(rts.incrby(1, 1))
sleep(0.001)
self.assertEqual(100, rts.get(1)[1])
for _ in range(100):
self.assertTrue(rts.decrby(1,1))
self.assertTrue(rts.decrby(1, 1))
sleep(0.001)
self.assertEqual(0, rts.get(1)[1])

self.assertTrue(rts.incrby(2, 1.5, timestamp=5))
self.assertEqual((5, 1.5), rts.get(2))
self.assertTrue(rts.incrby(2, 2.25, timestamp=7))
self.assertEqual((7, 3.75), rts.get(2))
self.assertTrue(rts.decrby(2, 1.5, timestamp=15))
self.assertEqual((15, 2.25), rts.get(2))

def testCreateRule(self):
'''Test TS.CREATERULE and TS.DELETERULE calls'''

Expand Down