Skip to content

Commit bb49935

Browse files
committed
Implementing HappyBase Table atomic counter helpers.
1 parent 115263b commit bb49935

File tree

2 files changed

+278
-0
lines changed

2 files changed

+278
-0
lines changed

gcloud/bigtable/happybase/table.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
"""Google Cloud Bigtable HappyBase table module."""
1616

1717

18+
import struct
19+
1820
import six
1921

2022
from gcloud._helpers import _total_seconds
@@ -24,6 +26,7 @@
2426
from gcloud.bigtable.table import Table as _LowLevelTable
2527

2628

29+
_UNPACK_I64 = struct.Struct('>q').unpack
2730
_SIMPLE_GC_RULES = (MaxAgeGCRule, MaxVersionsGCRule)
2831

2932

@@ -127,6 +130,102 @@ def regions(self):
127130
raise NotImplementedError('The Cloud Bigtable API does not have a '
128131
'concept of splitting a table into regions.')
129132

133+
def counter_get(self, row, column):
134+
"""Retrieve the current value of a counter column.
135+
136+
This method retrieves the current value of a counter column. If the
137+
counter column does not exist, this function initializes it to ``0``.
138+
139+
.. note::
140+
141+
Application code should **never** store a counter value directly;
142+
use the atomic :meth:`counter_inc` and :meth:`counter_dec` methods
143+
for that.
144+
145+
:type row: str
146+
:param row: Row key for the row we are getting a counter from.
147+
148+
:type column: str
149+
:param column: Column we are ``get``-ing from; of the form ``fam:col``.
150+
151+
:rtype: int
152+
:returns: Counter value (after initializing / incrementing by 0).
153+
"""
154+
# Don't query directly, but increment with value=0 so that the counter
155+
# is correctly initialized if didn't exist yet.
156+
return self.counter_inc(row, column, value=0)
157+
158+
def counter_inc(self, row, column, value=1):
159+
"""Atomically increment a counter column.
160+
161+
This method atomically increments a counter column in ``row``.
162+
If the counter column does not exist, it is automatically initialized
163+
to ``0`` before being incremented.
164+
165+
:type row: str
166+
:param row: Row key for the row we are incrementing a counter in.
167+
168+
:type column: str
169+
:param column: Column we are incrementing a value in; of the
170+
form ``fam:col``.
171+
172+
:type value: int
173+
:param value: Amount to increment the counter by. (If negative,
174+
this is equivalent to decrement.)
175+
176+
:rtype: int
177+
:returns: Counter value after incrementing.
178+
"""
179+
row = self._low_level_table.row(row)
180+
if isinstance(column, six.binary_type):
181+
column = column.decode('utf-8')
182+
column_family_id, column_qualifier = column.split(':')
183+
row.increment_cell_value(column_family_id, column_qualifier, value)
184+
# See row.commit_modifications() will return a dictionary:
185+
# {
186+
# u'col-fam-id': {
187+
# b'col-name1': [
188+
# (b'cell-val', datetime.datetime(...)),
189+
# ...
190+
# ],
191+
# ...
192+
# },
193+
# }
194+
modified_cells = row.commit_modifications()
195+
# Get the cells in the modified column,
196+
column_cells = modified_cells[column_family_id][column_qualifier]
197+
# Make sure there is exactly one cell in the column.
198+
if len(column_cells) != 1:
199+
raise ValueError('Expected server to return one modified cell.')
200+
column_cell = column_cells[0]
201+
# Get the bytes value from the column and convert it to an integer.
202+
bytes_value = column_cell[0]
203+
int_value, = _UNPACK_I64(bytes_value)
204+
return int_value
205+
206+
def counter_dec(self, row, column, value=1):
207+
"""Atomically decrement a counter column.
208+
209+
This method atomically decrements a counter column in ``row``.
210+
If the counter column does not exist, it is automatically initialized
211+
to ``0`` before being decremented.
212+
213+
:type row: str
214+
:param row: Row key for the row we are decrementing a counter in.
215+
216+
:type column: str
217+
:param column: Column we are decrementing a value in; of the
218+
form ``fam:col``.
219+
220+
:type value: int
221+
:param value: Amount to decrement the counter by. (If negative,
222+
this is equivalent to increment.)
223+
224+
:rtype: int
225+
:returns: Counter value after decrementing.
226+
"""
227+
return self.counter_inc(row, column, -value)
228+
130229

131230
def _gc_rule_to_dict(gc_rule):
132231
"""Converts garbage collection rule to dictionary if possible.

gcloud/bigtable/happybase/test_table.py

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,163 @@ def test_regions(self):
121121
with self.assertRaises(NotImplementedError):
122122
table.regions()
123123

124+
def test_counter_get(self):
125+
klass = self._getTargetClass()
126+
counter_value = 1337
127+
128+
class TableWithInc(klass):
129+
130+
incremented = []
131+
value = counter_value
132+
133+
def counter_inc(self, row, column, value=1):
134+
self.incremented.append((row, column, value))
135+
self.value += value
136+
return self.value
137+
138+
name = 'table-name'
139+
connection = None
140+
table = TableWithInc(name, connection)
141+
142+
row = 'row-key'
143+
column = 'fam:col1'
144+
self.assertEqual(TableWithInc.incremented, [])
145+
result = table.counter_get(row, column)
146+
self.assertEqual(result, counter_value)
147+
self.assertEqual(TableWithInc.incremented, [(row, column, 0)])
148+
149+
def test_counter_dec(self):
150+
klass = self._getTargetClass()
151+
counter_value = 42
152+
153+
class TableWithInc(klass):
154+
155+
incremented = []
156+
value = counter_value
157+
158+
def counter_inc(self, row, column, value=1):
159+
self.incremented.append((row, column, value))
160+
self.value += value
161+
return self.value
162+
163+
name = 'table-name'
164+
connection = None
165+
table = TableWithInc(name, connection)
166+
167+
row = 'row-key'
168+
column = 'fam:col1'
169+
dec_value = 987
170+
self.assertEqual(TableWithInc.incremented, [])
171+
result = table.counter_dec(row, column, value=dec_value)
172+
self.assertEqual(result, counter_value - dec_value)
173+
self.assertEqual(TableWithInc.incremented, [(row, column, -dec_value)])
174+
175+
def _counter_inc_helper(self, row, column, value, commit_result):
176+
import six
177+
178+
name = 'table-name'
179+
connection = None
180+
table = self._makeOne(name, connection)
181+
# Mock the return values.
182+
table._low_level_table = _MockLowLevelTable()
183+
table._low_level_table.row_values[row] = _MockLowLevelRow(
184+
row, commit_result=commit_result)
185+
186+
result = table.counter_inc(row, column, value=value)
187+
188+
incremented_value = value + _MockLowLevelRow.COUNTER_DEFAULT
189+
self.assertEqual(result, incremented_value)
190+
191+
# Check the row values returned.
192+
row_obj = table._low_level_table.row_values[row]
193+
if isinstance(column, six.binary_type):
194+
column = column.decode('utf-8')
195+
self.assertEqual(row_obj.counts,
196+
{tuple(column.split(':')): incremented_value})
197+
198+
def test_counter_inc(self):
199+
import struct
200+
201+
row = 'row-key'
202+
col_fam = u'fam'
203+
col_qual = u'col1'
204+
column = col_fam + u':' + col_qual
205+
value = 42
206+
packed_value = struct.pack('>q', value)
207+
fake_timestamp = None
208+
commit_result = {
209+
col_fam: {
210+
col_qual: [(packed_value, fake_timestamp)],
211+
}
212+
}
213+
self._counter_inc_helper(row, column, value, commit_result)
214+
215+
def test_counter_inc_column_bytes(self):
216+
import struct
217+
218+
row = 'row-key'
219+
col_fam = b'fam'
220+
col_qual = b'col1'
221+
column = col_fam + b':' + col_qual
222+
value = 42
223+
packed_value = struct.pack('>q', value)
224+
fake_timestamp = None
225+
commit_result = {
226+
col_fam.decode('utf-8'): {
227+
col_qual.decode('utf-8'): [(packed_value, fake_timestamp)],
228+
}
229+
}
230+
self._counter_inc_helper(row, column, value, commit_result)
231+
232+
def test_counter_inc_bad_result(self):
233+
row = 'row-key'
234+
col_fam = 'fam'
235+
col_qual = 'col1'
236+
column = col_fam + ':' + col_qual
237+
value = 42
238+
commit_result = None
239+
with self.assertRaises(TypeError):
240+
self._counter_inc_helper(row, column, value, commit_result)
241+
242+
def test_counter_inc_result_key_error(self):
243+
row = 'row-key'
244+
col_fam = 'fam'
245+
col_qual = 'col1'
246+
column = col_fam + ':' + col_qual
247+
value = 42
248+
commit_result = {}
249+
with self.assertRaises(KeyError):
250+
self._counter_inc_helper(row, column, value, commit_result)
251+
252+
def test_counter_inc_result_nested_key_error(self):
253+
row = 'row-key'
254+
col_fam = 'fam'
255+
col_qual = 'col1'
256+
column = col_fam + ':' + col_qual
257+
value = 42
258+
commit_result = {col_fam: {}}
259+
with self.assertRaises(KeyError):
260+
self._counter_inc_helper(row, column, value, commit_result)
261+
262+
def test_counter_inc_result_non_unique_cell(self):
263+
row = 'row-key'
264+
col_fam = 'fam'
265+
col_qual = 'col1'
266+
column = col_fam + ':' + col_qual
267+
value = 42
268+
fake_timestamp = None
269+
packed_value = None
270+
commit_result = {
271+
col_fam: {
272+
col_qual: [
273+
(packed_value, fake_timestamp),
274+
(packed_value, fake_timestamp),
275+
],
276+
}
277+
}
278+
with self.assertRaises(ValueError):
279+
self._counter_inc_helper(row, column, value, commit_result)
280+
124281

125282
class Test__gc_rule_to_dict(unittest2.TestCase):
126283

@@ -231,7 +388,29 @@ def __init__(self, *args, **kwargs):
231388
self.kwargs = kwargs
232389
self.list_column_families_calls = 0
233390
self.column_families = {}
391+
self.row_values = {}
234392

235393
def list_column_families(self):
236394
self.list_column_families_calls += 1
237395
return self.column_families
396+
397+
def row(self, row_key):
398+
return self.row_values[row_key]
399+
400+
401+
class _MockLowLevelRow(object):
402+
403+
COUNTER_DEFAULT = 0
404+
405+
def __init__(self, row_key, commit_result=None):
406+
self.row_key = row_key
407+
self.counts = {}
408+
self.commit_result = commit_result
409+
410+
def increment_cell_value(self, column_family_id, column, int_value):
411+
count = self.counts.setdefault((column_family_id, column),
412+
self.COUNTER_DEFAULT)
413+
self.counts[(column_family_id, column)] = count + int_value
414+
415+
def commit_modifications(self):
416+
return self.commit_result

0 commit comments

Comments
 (0)