Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions gcloud/bigtable/happybase/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
"""Google Cloud Bigtable HappyBase table module."""


import struct

import six

from gcloud._helpers import _total_seconds
Expand All @@ -24,6 +26,7 @@
from gcloud.bigtable.table import Table as _LowLevelTable


_UNPACK_I64 = struct.Struct('>q').unpack
_SIMPLE_GC_RULES = (MaxAgeGCRule, MaxVersionsGCRule)


Expand Down Expand Up @@ -127,6 +130,102 @@ def regions(self):
raise NotImplementedError('The Cloud Bigtable API does not have a '
'concept of splitting a table into regions.')

def counter_get(self, row, column):
"""Retrieve the current value of a counter column.

This method retrieves the current value of a counter column. If the
counter column does not exist, this function initializes it to ``0``.

.. note::

Application code should **never** store a counter value directly;
use the atomic :meth:`counter_inc` and :meth:`counter_dec` methods
for that.

:type row: str
:param row: Row key for the row we are getting a counter from.

:type column: str
:param column: Column we are ``get``-ing from; of the form ``fam:col``.

:rtype: int
:returns: Counter value (after initializing / incrementing by 0).
"""
# Don't query directly, but increment with value=0 so that the counter
# is correctly initialized if didn't exist yet.
return self.counter_inc(row, column, value=0)

def counter_inc(self, row, column, value=1):
"""Atomically increment a counter column.

This method atomically increments a counter column in ``row``.
If the counter column does not exist, it is automatically initialized
to ``0`` before being incremented.

:type row: str
:param row: Row key for the row we are incrementing a counter in.

:type column: str
:param column: Column we are incrementing a value in; of the
form ``fam:col``.

:type value: int
:param value: Amount to increment the counter by. (If negative,
this is equivalent to decrement.)

:rtype: int
:returns: Counter value after incrementing.
"""
row = self._low_level_table.row(row)
if isinstance(column, six.binary_type):
column = column.decode('utf-8')

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

column_family_id, column_qualifier = column.split(':')
row.increment_cell_value(column_family_id, column_qualifier, value)
# See row.commit_modifications() will return a dictionary:
# {
# u'col-fam-id': {
# b'col-name1': [
# (b'cell-val', datetime.datetime(...)),
# ...
# ],
# ...
# },
# }
modified_cells = row.commit_modifications()
# Get the cells in the modified column,
column_cells = modified_cells[column_family_id][column_qualifier]
# Make sure there is exactly one cell in the column.
if len(column_cells) != 1:
raise ValueError('Expected server to return one modified cell.')
column_cell = column_cells[0]
# Get the bytes value from the column and convert it to an integer.
bytes_value = column_cell[0]
int_value, = _UNPACK_I64(bytes_value)
return int_value

def counter_dec(self, row, column, value=1):
"""Atomically decrement a counter column.

This method atomically decrements a counter column in ``row``.
If the counter column does not exist, it is automatically initialized
to ``0`` before being decremented.

:type row: str
:param row: Row key for the row we are decrementing a counter in.

:type column: str
:param column: Column we are decrementing a value in; of the
form ``fam:col``.

:type value: int
:param value: Amount to decrement the counter by. (If negative,
this is equivalent to increment.)

:rtype: int
:returns: Counter value after decrementing.
"""
return self.counter_inc(row, column, -value)


def _gc_rule_to_dict(gc_rule):
"""Converts garbage collection rule to dictionary if possible.
Expand Down
179 changes: 179 additions & 0 deletions gcloud/bigtable/happybase/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,163 @@ def test_regions(self):
with self.assertRaises(NotImplementedError):
table.regions()

def test_counter_get(self):
klass = self._getTargetClass()
counter_value = 1337

class TableWithInc(klass):

incremented = []
value = counter_value

def counter_inc(self, row, column, value=1):
self.incremented.append((row, column, value))
self.value += value
return self.value

name = 'table-name'
connection = None
table = TableWithInc(name, connection)

row = 'row-key'
column = 'fam:col1'
self.assertEqual(TableWithInc.incremented, [])
result = table.counter_get(row, column)
self.assertEqual(result, counter_value)
self.assertEqual(TableWithInc.incremented, [(row, column, 0)])

def test_counter_dec(self):
klass = self._getTargetClass()
counter_value = 42

class TableWithInc(klass):

incremented = []
value = counter_value

def counter_inc(self, row, column, value=1):
self.incremented.append((row, column, value))
self.value += value
return self.value

name = 'table-name'
connection = None
table = TableWithInc(name, connection)

row = 'row-key'
column = 'fam:col1'
dec_value = 987
self.assertEqual(TableWithInc.incremented, [])
result = table.counter_dec(row, column, value=dec_value)
self.assertEqual(result, counter_value - dec_value)
self.assertEqual(TableWithInc.incremented, [(row, column, -dec_value)])

def _counter_inc_helper(self, row, column, value, commit_result):
import six

name = 'table-name'
connection = None
table = self._makeOne(name, connection)
# Mock the return values.
table._low_level_table = _MockLowLevelTable()
table._low_level_table.row_values[row] = _MockLowLevelRow(
row, commit_result=commit_result)

result = table.counter_inc(row, column, value=value)

incremented_value = value + _MockLowLevelRow.COUNTER_DEFAULT
self.assertEqual(result, incremented_value)

# Check the row values returned.
row_obj = table._low_level_table.row_values[row]
if isinstance(column, six.binary_type):
column = column.decode('utf-8')
self.assertEqual(row_obj.counts,
{tuple(column.split(':')): incremented_value})

def test_counter_inc(self):
import struct

row = 'row-key'
col_fam = u'fam'
col_qual = u'col1'
column = col_fam + u':' + col_qual
value = 42
packed_value = struct.pack('>q', value)
fake_timestamp = None
commit_result = {
col_fam: {
col_qual: [(packed_value, fake_timestamp)],
}
}
self._counter_inc_helper(row, column, value, commit_result)

def test_counter_inc_column_bytes(self):
import struct

row = 'row-key'
col_fam = b'fam'
col_qual = b'col1'
column = col_fam + b':' + col_qual
value = 42
packed_value = struct.pack('>q', value)
fake_timestamp = None
commit_result = {
col_fam.decode('utf-8'): {
col_qual.decode('utf-8'): [(packed_value, fake_timestamp)],
}
}
self._counter_inc_helper(row, column, value, commit_result)

def test_counter_inc_bad_result(self):
row = 'row-key'
col_fam = 'fam'
col_qual = 'col1'
column = col_fam + ':' + col_qual
value = 42
commit_result = None
with self.assertRaises(TypeError):
self._counter_inc_helper(row, column, value, commit_result)

def test_counter_inc_result_key_error(self):
row = 'row-key'
col_fam = 'fam'
col_qual = 'col1'
column = col_fam + ':' + col_qual
value = 42
commit_result = {}
with self.assertRaises(KeyError):
self._counter_inc_helper(row, column, value, commit_result)

def test_counter_inc_result_nested_key_error(self):
row = 'row-key'
col_fam = 'fam'
col_qual = 'col1'
column = col_fam + ':' + col_qual
value = 42
commit_result = {col_fam: {}}
with self.assertRaises(KeyError):
self._counter_inc_helper(row, column, value, commit_result)

def test_counter_inc_result_non_unique_cell(self):
row = 'row-key'
col_fam = 'fam'
col_qual = 'col1'
column = col_fam + ':' + col_qual
value = 42
fake_timestamp = None
packed_value = None
commit_result = {
col_fam: {
col_qual: [
(packed_value, fake_timestamp),
(packed_value, fake_timestamp),
],
}
}
with self.assertRaises(ValueError):
self._counter_inc_helper(row, column, value, commit_result)


class Test__gc_rule_to_dict(unittest2.TestCase):

Expand Down Expand Up @@ -231,7 +388,29 @@ def __init__(self, *args, **kwargs):
self.kwargs = kwargs
self.list_column_families_calls = 0
self.column_families = {}
self.row_values = {}

def list_column_families(self):
self.list_column_families_calls += 1
return self.column_families

def row(self, row_key):
return self.row_values[row_key]


class _MockLowLevelRow(object):

COUNTER_DEFAULT = 0

def __init__(self, row_key, commit_result=None):
self.row_key = row_key
self.counts = {}
self.commit_result = commit_result

def increment_cell_value(self, column_family_id, column, int_value):
count = self.counts.setdefault((column_family_id, column),
self.COUNTER_DEFAULT)
self.counts[(column_family_id, column)] = count + int_value

def commit_modifications(self):
return self.commit_result