diff --git a/gcloud/bigtable/happybase/table.py b/gcloud/bigtable/happybase/table.py index b9e377a448bc..7bcb50efc0c3 100644 --- a/gcloud/bigtable/happybase/table.py +++ b/gcloud/bigtable/happybase/table.py @@ -15,6 +15,8 @@ """Google Cloud Bigtable HappyBase table module.""" +import struct + import six from gcloud._helpers import _total_seconds @@ -24,6 +26,7 @@ from gcloud.bigtable.table import Table as _LowLevelTable +_UNPACK_I64 = struct.Struct('>q').unpack _SIMPLE_GC_RULES = (MaxAgeGCRule, MaxVersionsGCRule) @@ -127,6 +130,102 @@ def regions(self): raise NotImplementedError('The Cloud Bigtable API does not have a ' 'concept of splitting a table into regions.') + def counter_get(self, row, column): + """Retrieve the current value of a counter column. + + This method retrieves the current value of a counter column. If the + counter column does not exist, this function initializes it to ``0``. + + .. note:: + + Application code should **never** store a counter value directly; + use the atomic :meth:`counter_inc` and :meth:`counter_dec` methods + for that. + + :type row: str + :param row: Row key for the row we are getting a counter from. + + :type column: str + :param column: Column we are ``get``-ing from; of the form ``fam:col``. + + :rtype: int + :returns: Counter value (after initializing / incrementing by 0). + """ + # Don't query directly, but increment with value=0 so that the counter + # is correctly initialized if didn't exist yet. + return self.counter_inc(row, column, value=0) + + def counter_inc(self, row, column, value=1): + """Atomically increment a counter column. + + This method atomically increments a counter column in ``row``. + If the counter column does not exist, it is automatically initialized + to ``0`` before being incremented. + + :type row: str + :param row: Row key for the row we are incrementing a counter in. + + :type column: str + :param column: Column we are incrementing a value in; of the + form ``fam:col``. + + :type value: int + :param value: Amount to increment the counter by. (If negative, + this is equivalent to decrement.) + + :rtype: int + :returns: Counter value after incrementing. + """ + row = self._low_level_table.row(row) + if isinstance(column, six.binary_type): + column = column.decode('utf-8') + column_family_id, column_qualifier = column.split(':') + row.increment_cell_value(column_family_id, column_qualifier, value) + # See row.commit_modifications() will return a dictionary: + # { + # u'col-fam-id': { + # b'col-name1': [ + # (b'cell-val', datetime.datetime(...)), + # ... + # ], + # ... + # }, + # } + modified_cells = row.commit_modifications() + # Get the cells in the modified column, + column_cells = modified_cells[column_family_id][column_qualifier] + # Make sure there is exactly one cell in the column. + if len(column_cells) != 1: + raise ValueError('Expected server to return one modified cell.') + column_cell = column_cells[0] + # Get the bytes value from the column and convert it to an integer. + bytes_value = column_cell[0] + int_value, = _UNPACK_I64(bytes_value) + return int_value + + def counter_dec(self, row, column, value=1): + """Atomically decrement a counter column. + + This method atomically decrements a counter column in ``row``. + If the counter column does not exist, it is automatically initialized + to ``0`` before being decremented. + + :type row: str + :param row: Row key for the row we are decrementing a counter in. + + :type column: str + :param column: Column we are decrementing a value in; of the + form ``fam:col``. + + :type value: int + :param value: Amount to decrement the counter by. (If negative, + this is equivalent to increment.) + + :rtype: int + :returns: Counter value after decrementing. + """ + return self.counter_inc(row, column, -value) + def _gc_rule_to_dict(gc_rule): """Converts garbage collection rule to dictionary if possible. diff --git a/gcloud/bigtable/happybase/test_table.py b/gcloud/bigtable/happybase/test_table.py index 395b68501b62..fb828fcf6cbb 100644 --- a/gcloud/bigtable/happybase/test_table.py +++ b/gcloud/bigtable/happybase/test_table.py @@ -121,6 +121,163 @@ def test_regions(self): with self.assertRaises(NotImplementedError): table.regions() + def test_counter_get(self): + klass = self._getTargetClass() + counter_value = 1337 + + class TableWithInc(klass): + + incremented = [] + value = counter_value + + def counter_inc(self, row, column, value=1): + self.incremented.append((row, column, value)) + self.value += value + return self.value + + name = 'table-name' + connection = None + table = TableWithInc(name, connection) + + row = 'row-key' + column = 'fam:col1' + self.assertEqual(TableWithInc.incremented, []) + result = table.counter_get(row, column) + self.assertEqual(result, counter_value) + self.assertEqual(TableWithInc.incremented, [(row, column, 0)]) + + def test_counter_dec(self): + klass = self._getTargetClass() + counter_value = 42 + + class TableWithInc(klass): + + incremented = [] + value = counter_value + + def counter_inc(self, row, column, value=1): + self.incremented.append((row, column, value)) + self.value += value + return self.value + + name = 'table-name' + connection = None + table = TableWithInc(name, connection) + + row = 'row-key' + column = 'fam:col1' + dec_value = 987 + self.assertEqual(TableWithInc.incremented, []) + result = table.counter_dec(row, column, value=dec_value) + self.assertEqual(result, counter_value - dec_value) + self.assertEqual(TableWithInc.incremented, [(row, column, -dec_value)]) + + def _counter_inc_helper(self, row, column, value, commit_result): + import six + + name = 'table-name' + connection = None + table = self._makeOne(name, connection) + # Mock the return values. + table._low_level_table = _MockLowLevelTable() + table._low_level_table.row_values[row] = _MockLowLevelRow( + row, commit_result=commit_result) + + result = table.counter_inc(row, column, value=value) + + incremented_value = value + _MockLowLevelRow.COUNTER_DEFAULT + self.assertEqual(result, incremented_value) + + # Check the row values returned. + row_obj = table._low_level_table.row_values[row] + if isinstance(column, six.binary_type): + column = column.decode('utf-8') + self.assertEqual(row_obj.counts, + {tuple(column.split(':')): incremented_value}) + + def test_counter_inc(self): + import struct + + row = 'row-key' + col_fam = u'fam' + col_qual = u'col1' + column = col_fam + u':' + col_qual + value = 42 + packed_value = struct.pack('>q', value) + fake_timestamp = None + commit_result = { + col_fam: { + col_qual: [(packed_value, fake_timestamp)], + } + } + self._counter_inc_helper(row, column, value, commit_result) + + def test_counter_inc_column_bytes(self): + import struct + + row = 'row-key' + col_fam = b'fam' + col_qual = b'col1' + column = col_fam + b':' + col_qual + value = 42 + packed_value = struct.pack('>q', value) + fake_timestamp = None + commit_result = { + col_fam.decode('utf-8'): { + col_qual.decode('utf-8'): [(packed_value, fake_timestamp)], + } + } + self._counter_inc_helper(row, column, value, commit_result) + + def test_counter_inc_bad_result(self): + row = 'row-key' + col_fam = 'fam' + col_qual = 'col1' + column = col_fam + ':' + col_qual + value = 42 + commit_result = None + with self.assertRaises(TypeError): + self._counter_inc_helper(row, column, value, commit_result) + + def test_counter_inc_result_key_error(self): + row = 'row-key' + col_fam = 'fam' + col_qual = 'col1' + column = col_fam + ':' + col_qual + value = 42 + commit_result = {} + with self.assertRaises(KeyError): + self._counter_inc_helper(row, column, value, commit_result) + + def test_counter_inc_result_nested_key_error(self): + row = 'row-key' + col_fam = 'fam' + col_qual = 'col1' + column = col_fam + ':' + col_qual + value = 42 + commit_result = {col_fam: {}} + with self.assertRaises(KeyError): + self._counter_inc_helper(row, column, value, commit_result) + + def test_counter_inc_result_non_unique_cell(self): + row = 'row-key' + col_fam = 'fam' + col_qual = 'col1' + column = col_fam + ':' + col_qual + value = 42 + fake_timestamp = None + packed_value = None + commit_result = { + col_fam: { + col_qual: [ + (packed_value, fake_timestamp), + (packed_value, fake_timestamp), + ], + } + } + with self.assertRaises(ValueError): + self._counter_inc_helper(row, column, value, commit_result) + class Test__gc_rule_to_dict(unittest2.TestCase): @@ -231,7 +388,29 @@ def __init__(self, *args, **kwargs): self.kwargs = kwargs self.list_column_families_calls = 0 self.column_families = {} + self.row_values = {} def list_column_families(self): self.list_column_families_calls += 1 return self.column_families + + def row(self, row_key): + return self.row_values[row_key] + + +class _MockLowLevelRow(object): + + COUNTER_DEFAULT = 0 + + def __init__(self, row_key, commit_result=None): + self.row_key = row_key + self.counts = {} + self.commit_result = commit_result + + def increment_cell_value(self, column_family_id, column, int_value): + count = self.counts.setdefault((column_family_id, column), + self.COUNTER_DEFAULT) + self.counts[(column_family_id, column)] = count + int_value + + def commit_modifications(self): + return self.commit_result