diff --git a/redistimeseries/client.py b/redistimeseries/client.py index c673c92..33c96bd 100644 --- a/redistimeseries/client.py +++ b/redistimeseries/client.py @@ -38,17 +38,17 @@ def parse_range(response): def parse_m_range(response): res = [] for item in response: - res.append({ nativestr(item[0]) : [list_to_dict(item[1]), + res.append({ nativestr(item[0]) : [list_to_dict(item[1]), parse_range(item[2])]}) return res def parse_m_get(response): res = [] for item in response: - res.append({ nativestr(item[0]) : [list_to_dict(item[1]), + res.append({ nativestr(item[0]) : [list_to_dict(item[1]), item[2][0], float(item[2][1])]}) return res - + def parseToList(response): res = [] for item in response: @@ -57,7 +57,7 @@ def parseToList(response): class Client(Redis): #changed from StrictRedis """ - This class subclasses redis-py's `Redis` and implements + This class subclasses redis-py's `Redis` and implements RedisTimeSeries's commands (prefixed with "ts"). The client allows to interact with RedisTimeSeries and use all of it's functionality. @@ -87,7 +87,7 @@ def __init__(self, *args, **kwargs): # Set the module commands' callbacks MODULE_CALLBACKS = { self.CREATE_CMD : bool_ok, - self.ALTER_CMD : bool_ok, + self.ALTER_CMD : bool_ok, self.CREATERULE_CMD : bool_ok, self.DELETERULE_CMD : bool_ok, self.RANGE_CMD : parse_range, @@ -121,21 +121,26 @@ def appendLabels(params, labels): params.append('LABELS') for k, v in labels.items(): params.extend([k,v]) - + @staticmethod def appendCount(params, count): if count is not None: params.extend(['COUNT', count]) @staticmethod - def appendAggregation(params, aggregation_type, - bucket_size_msec): + def appendTimestamp(params, timestamp): + if timestamp is not None: + params.extend(['TIMESTAMP', timestamp]) + + @staticmethod + def appendAggregation(params, aggregation_type, + bucket_size_msec): params.append('AGGREGATION') params.extend([aggregation_type, bucket_size_msec]) def create(self, key, retention_msecs=None, uncompressed=False, labels={}): """ - Creates a new time-series ``key`` with ``retention_msecs`` in + Creates a new time-series ``key`` with ``retention_msecs`` in milliseconds and ``labels``. """ params = [key] @@ -147,7 +152,7 @@ def create(self, key, retention_msecs=None, uncompressed=False, labels={}): def alter(self, key, retention_msecs=None, labels={}): """ - Update the retention, labels of an existing key. The parameters + Update the retention, labels of an existing key. The parameters are the same as TS.CREATE. """ params = [key] @@ -156,10 +161,10 @@ def alter(self, key, retention_msecs=None, labels={}): return self.execute_command(self.ALTER_CMD, *params) - def add(self, key, timestamp, value, retention_msecs=None, + def add(self, key, timestamp, value, retention_msecs=None, uncompressed=False, labels={}): """ - Appends (or creates and appends) a new ``value`` to series + Appends (or creates and appends) a new ``value`` to series ``key`` with ``timestamp``. If ``key`` is created, ``retention_msecs`` and ``labels`` are applied. Return value is timestamp of insertion. @@ -173,7 +178,7 @@ def add(self, key, timestamp, value, retention_msecs=None, def madd(self, ktv_tuples): """ - Appends (or creates and appends) a new ``value`` to series + Appends (or creates and appends) a new ``value`` to series ``key`` with ``timestamp``. Expects a list of ``tuples`` as (``key``,``timestamp``, ``value``). Return value is an array with timestamps of insertions. @@ -185,30 +190,32 @@ def madd(self, ktv_tuples): return self.execute_command(self.MADD_CMD, *params) - def incrby(self, key, value, time_bucket=None, retention_msecs=None, - uncompressed=False, labels={}): + def incrby(self, key, value, timestamp=None, retention_msecs=None, + uncompressed=False, labels={}): """ Increases latest value in ``key`` by ``value``. - ``timeBucket`` resets counter. In milliseconds. + ``timestamp` can be set or system time will be used. If ``key`` is created, ``retention_msecs`` and ``labels`` are applied. """ params = [key, value] + self.appendTimestamp(params, timestamp) self.appendRetention(params, retention_msecs) self.appendUncompressed(params, uncompressed) self.appendLabels(params, labels) return self.execute_command(self.INCRBY_CMD, *params) - def decrby(self, key, value, time_bucket=None, retention_msecs=None, - uncompressed=False, labels={}): + def decrby(self, key, value, timestamp=None, retention_msecs=None, + uncompressed=False, labels={}): """ Decreases latest value in ``key`` by ``value``. - ``time_bucket`` resets counter. In milliseconds. + ``timestamp` can be set or system time will be used. If ``key`` is created, ``retention_msecs`` and ``labels`` are applied. """ params = [key, value] + self.appendTimestamp(params, timestamp) self.appendRetention(params, retention_msecs) self.appendUncompressed(params, uncompressed) self.appendLabels(params, labels) @@ -218,7 +225,7 @@ def decrby(self, key, value, time_bucket=None, retention_msecs=None, def createrule(self, source_key, dest_key, aggregation_type, bucket_size_msec): """ - Creates a compaction rule from values added to ``source_key`` + Creates a compaction rule from values added to ``source_key`` into ``dest_key``. Aggregating for ``bucket_size_msec`` where an ``aggregation_type`` can be ['avg', 'sum', 'min', 'max', 'range', 'count', 'first', 'last', 'std.p', 'std.s', 'var.p', 'var.s'] @@ -274,7 +281,7 @@ def get(self, key): def mget(self, filters, with_labels=False): """Get the last samples matching the specific ``filter``.""" - params = [] + params = [] self.appendWithLabels(params, with_labels) params.extend(['FILTER']) params += filters diff --git a/test_commands.py b/test_commands.py index d03ee0b..d046e29 100644 --- a/test_commands.py +++ b/test_commands.py @@ -56,19 +56,26 @@ def testMAdd(self): rts.create('a') self.assertEqual([1, 2, 3], rts.madd([('a', 1, 5), ('a', 2, 10), ('a', 3, 15)])) - + def testIncrbyDecrby(self): '''Test TS.INCRBY and TS.DECRBY calls''' for _ in range(100): - self.assertTrue(rts.incrby(1,1)) + self.assertTrue(rts.incrby(1, 1)) sleep(0.001) self.assertEqual(100, rts.get(1)[1]) for _ in range(100): - self.assertTrue(rts.decrby(1,1)) + self.assertTrue(rts.decrby(1, 1)) sleep(0.001) self.assertEqual(0, rts.get(1)[1]) + self.assertTrue(rts.incrby(2, 1.5, timestamp=5)) + self.assertEqual((5, 1.5), rts.get(2)) + self.assertTrue(rts.incrby(2, 2.25, timestamp=7)) + self.assertEqual((7, 3.75), rts.get(2)) + self.assertTrue(rts.decrby(2, 1.5, timestamp=15)) + self.assertEqual((15, 2.25), rts.get(2)) + def testCreateRule(self): '''Test TS.CREATERULE and TS.DELETERULE calls'''