Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions gcloud/bigtable/happybase/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import datetime
import warnings

import six

from gcloud._helpers import _datetime_from_microseconds
from gcloud.bigtable.row import TimestampRange

Expand All @@ -33,6 +35,14 @@
class Batch(object):
"""Batch class for accumulating mutations.

.. note::

When using a batch with ``transaction=False`` as a context manager
(i.e. in a ``with`` statement), mutations will still be sent as
row mutations even if the context manager exits with an error.
This behavior is in place to match the behavior in the HappyBase
HBase / Thrift implementation.

:type table: :class:`Table <gcloud.bigtable.happybase.table.Table>`
:param table: The table where mutations will be applied.

Expand Down Expand Up @@ -105,6 +115,62 @@ def send(self):
self._row_map.clear()
self._mutation_count = 0

def _try_send(self):
    """Flush the batch once enough mutations have accumulated.

    Calls :meth:`send` when a (truthy) batch size is configured and the
    pending mutation count has reached or passed it. Without a batch size
    this is a no-op.
    """
    batch_size = self._batch_size
    if not batch_size:
        # No limit configured: mutations are only sent explicitly.
        return

    if self._mutation_count >= batch_size:
        self.send()

def _get_row(self, row_key):
    """Return the row object that accumulates mutations for ``row_key``.

    Rows are cached on the batch in ``_row_map``. On a cache miss a new
    row is created from the low-level table and stored before being
    returned.

    :type row_key: str
    :param row_key: The row key for a row stored in the map.

    :rtype: :class:`Row <gcloud.bigtable.row.Row>`
    :returns: The newly created or stored row that will hold mutations.
    """
    row_map = self._row_map
    if row_key in row_map:
        return row_map[row_key]

    # Cache miss: build the row via the wrapped low-level table.
    low_level_table = self._table._low_level_table
    row_map[row_key] = low_level_table.row(row_key)
    return row_map[row_key]

def put(self, row, data, wal=_WAL_SENTINEL):
    """Insert data into a row in the table owned by this batch.

    All column names are parsed and validated before any state on the
    batch is touched, so a rejected ``put`` neither registers a row nor
    records a partial set of cells.

    :type row: str
    :param row: The row key where the mutation will be "put".

    :type data: dict
    :param data: Dictionary containing the data to be inserted. The keys
                 are column names (of the form ``fam:col``) and the values
                 are strings (bytes) to be stored in those columns.

    :type wal: object
    :param wal: Unused parameter (to over-ride the default on the
                instance). Provided for compatibility with HappyBase, but
                irrelevant for Cloud Bigtable since it does not have a
                Write Ahead Log.

    :raises: :class:`ValueError <exceptions.ValueError>` if any key of
             ``data`` is not a valid ``fam:col`` column name.
    """
    if wal is not _WAL_SENTINEL:
        _WARN(_WAL_WARNING)

    # Make sure all the keys are valid before beginning to add mutations
    # (and before a row gets created for them).
    column_names = list(six.iterkeys(data))
    column_pairs = _get_column_pairs(column_names,
                                     require_qualifier=True)

    row_object = self._get_row(row)
    # Pair every parsed column with the *original* key. Rebuilding the
    # key as ``family + ':' + qualifier`` does not round-trip for keys
    # that the parser normalizes (bytes keys, a trailing colon) and
    # would fail the lookup with a ``KeyError``.
    for column_name, column_pair in zip(column_names, column_pairs):
        column_family_id, column_qualifier = column_pair
        row_object.set_cell(column_family_id, column_qualifier,
                            data[column_name], timestamp=self._timestamp)

    self._mutation_count += len(data)
    self._try_send()

def __enter__(self):
    """Start a ``with`` block; the batch itself is the bound target.

    Nothing needs to be prepared on entry.
    """
    return self
Expand Down Expand Up @@ -133,3 +199,54 @@ def __exit__(self, exc_type, exc_value, traceback):
# NOTE: For non-transactional batches, this will even commit mutations

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

# if an error occurred during the context manager.
self.send()


def _get_column_pairs(columns, require_qualifier=False):
    """Turns a list of column or column families into parsed pairs.

    Turns a column family (``fam`` or ``fam:``) into a pair such
    as ``['fam', None]`` and turns a column (``fam:col``) into
    ``['fam', 'col']``.

    :type columns: iterable
    :param columns: Iterable containing column names (as
                    strings). Each column name can be either

                      * an entire column family: ``fam`` or ``fam:``
                      * a single column: ``fam:col``

    :type require_qualifier: bool
    :param require_qualifier: Boolean indicating if the columns should
                              all have a qualifier or not.

    :rtype: list
    :returns: List of pairs, where the first element in each pair is the
              column family and the second is the column qualifier
              (or :data:`None`).
    :raises: :class:`ValueError <exceptions.ValueError>` if any of the columns
             are not of the expected format.
             :class:`ValueError <exceptions.ValueError>` if
             ``require_qualifier`` is :data:`True` and one of the values is
             for an entire column family
    """
    column_pairs = []
    for column in columns:
        # ``bytes`` is the type ``six.binary_type`` refers to on both
        # Python 2 (where it aliases ``str``) and Python 3.
        if isinstance(column, bytes):
            column = column.decode('utf-8')
        # Remove one trailing colon (i.e. for standalone column family).
        if column.endswith(u':'):
            column = column[:-1]

        num_colons = column.count(u':')
        if num_colons > 1:
            # Report the offending column, as the other error does.
            raise ValueError('Column contains the : separator more than '
                             'once', column)

        if num_colons == 1:
            column_pairs.append(column.split(u':'))
        elif require_qualifier:
            # column is a bare column family.
            raise ValueError('column does not contain a qualifier',
                             column)
        else:
            column_pairs.append([column, None])

    return column_pairs
228 changes: 228 additions & 0 deletions gcloud/bigtable/happybase/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,172 @@ def test_send(self):
self.assertEqual(batch._mutation_count, 0)
self.assertEqual(row_map, {})

def test__try_send_no_batch_size(self):
    # With no batch size configured, ``_try_send`` must not call ``send``.
    class BatchWithSend(_SendMixin, self._getTargetClass()):
        pass

    batch = BatchWithSend(object())

    self.assertEqual(batch._batch_size, None)
    self.assertFalse(batch._send_called)
    batch._try_send()
    self.assertFalse(batch._send_called)

def test__try_send_too_few_mutations(self):
    # A mutation count below the batch size must not trigger ``send``.
    class BatchWithSend(_SendMixin, self._getTargetClass()):
        pass

    batch_size = 10
    batch = BatchWithSend(object(), batch_size=batch_size)
    self.assertEqual(batch._batch_size, batch_size)
    self.assertFalse(batch._send_called)

    mutation_count = 2
    self.assertTrue(mutation_count < batch_size)
    batch._mutation_count = mutation_count

    batch._try_send()
    self.assertFalse(batch._send_called)

def test__try_send_actual_send(self):
    # A mutation count above the batch size must trigger ``send``.
    class BatchWithSend(_SendMixin, self._getTargetClass()):
        pass

    batch_size = 10
    batch = BatchWithSend(object(), batch_size=batch_size)
    self.assertEqual(batch._batch_size, batch_size)
    self.assertFalse(batch._send_called)

    mutation_count = 12
    self.assertTrue(mutation_count > batch_size)
    batch._mutation_count = mutation_count

    batch._try_send()
    self.assertTrue(batch._send_called)

def test__get_row_exists(self):
    # A row already stored in the row map is handed back unchanged.
    batch = self._makeOne(object())

    row_key = 'row-key'
    cached_row = object()
    batch._row_map[row_key] = cached_row

    self.assertEqual(batch._get_row(row_key), cached_row)

def test__get_row_create_new(self):
    # Build a batch on top of a mock table wrapping a mock low-level
    # table, so that row creation can be captured.
    low_level_table = _MockLowLevelTable()
    batch = self._makeOne(_MockTable(low_level_table))

    # Nothing is cached yet.
    self.assertEqual(batch._row_map, {})

    # The object the mock low-level table will hand out as "the row".
    mock_row = object()
    low_level_table.mock_row = mock_row

    # Fetching an unknown key creates the row via the low-level table.
    row_key = 'row-key'
    self.assertEqual(batch._get_row(row_key), mock_row)

    # Exactly one row was requested, and it is now cached on the batch.
    self.assertEqual(low_level_table.rows_made, [row_key])
    self.assertEqual(batch._row_map, {row_key: mock_row})

def test_put_bad_wal(self):
    from gcloud._testing import _Monkey
    from gcloud.bigtable.happybase import batch as MUT

    warned = []

    def mock_warn(message):
        warned.append(message)
        # Raise an exception so we don't have to execute the
        # remainder of ``put``.
        raise RuntimeError('No need to execute the rest.')

    batch = self._makeOne(object())

    # Any value other than the sentinel counts as an explicit ``wal``.
    wal = None
    self.assertNotEqual(wal, MUT._WAL_SENTINEL)

    with _Monkey(MUT, _WARN=mock_warn):
        with self.assertRaises(RuntimeError):
            batch.put('row-key', {}, wal=wal)

    self.assertEqual(warned, [MUT._WAL_WARNING])

def test_put(self):
    import operator

    batch = self._makeOne(object())
    timestamp = object()
    batch._timestamp = timestamp

    row_key = 'row-key'
    row = _MockRow()
    batch._row_map[row_key] = row

    # (family, qualifier, value) for each cell that gets written.
    cell1 = ('cf1', 'qual1', 'value1')
    cell2 = ('cf2', 'qual2', 'value2')
    data = {}
    for family, qualifier, value in (cell1, cell2):
        data[family + ':' + qualifier] = value

    self.assertEqual(batch._mutation_count, 0)
    self.assertEqual(row.set_cell_calls, [])
    batch.put(row_key, data)
    self.assertEqual(batch._mutation_count, 2)

    # The calls follow the iteration order of the keys of ``data``,
    # which is not guaranteed, so sort by positional args first.
    ordered_calls = sorted(row.set_cell_calls,
                           key=operator.itemgetter(0))
    expected_kwargs = {'timestamp': timestamp}
    self.assertEqual(ordered_calls, [
        (cell1, expected_kwargs),
        (cell2, expected_kwargs),
    ])

def test_put_call_try_send(self):
    # Subclass that only counts how often ``_try_send`` is invoked.
    class CallTrySend(self._getTargetClass()):

        try_send_calls = 0

        def _try_send(self):
            self.try_send_calls += 1

    batch = CallTrySend(object())

    row_key = 'row-key'
    batch._row_map[row_key] = _MockRow()

    self.assertEqual(batch._mutation_count, 0)
    self.assertEqual(batch.try_send_calls, 0)
    # Empty data: no mutations are added, but ``_try_send`` still runs.
    batch.put(row_key, data={})
    self.assertEqual(batch._mutation_count, 0)
    self.assertEqual(batch.try_send_calls, 1)

def test_context_manager(self):
klass = self._getTargetClass()

Expand Down Expand Up @@ -174,6 +340,45 @@ class BatchWithSend(_SendMixin, klass):
self.assertTrue(batch._send_called)


class Test__get_column_pairs(unittest2.TestCase):

    def _callFUT(self, *args, **kwargs):
        # Import lazily so a broken module only fails the tests using it.
        from gcloud.bigtable.happybase import batch as MUT
        return MUT._get_column_pairs(*args, **kwargs)

    def test_it(self):
        # Each input column next to the pair it is expected to parse to.
        cases = [
            (b'cf1', ['cf1', None]),
            (u'cf2:', ['cf2', None]),
            ('cf3::', ['cf3', '']),
            ('cf3:name1', ['cf3', 'name1']),
            ('cf3:name2', ['cf3', 'name2']),
        ]
        columns = [column for column, _ in cases]
        expected_result = [pair for _, pair in cases]
        self.assertEqual(self._callFUT(columns), expected_result)

    def test_bad_column(self):
        # Two separators left after stripping is not a valid column.
        self.assertRaises(ValueError, self._callFUT, ['a:b:c'])

    def test_bad_column_type(self):
        # Non-string entries fail on the string methods used for parsing.
        self.assertRaises(AttributeError, self._callFUT, [None])

    def test_bad_columns_var(self):
        # ``None`` is not iterable.
        self.assertRaises(TypeError, self._callFUT, None)

    def test_column_family_with_require_qualifier(self):
        # A bare column family is rejected when a qualifier is required.
        self.assertRaises(ValueError, self._callFUT, ['a:'],
                          require_qualifier=True)


class _MockRowMap(dict):

clear_count = 0
Expand All @@ -187,6 +392,29 @@ class _MockRow(object):

def __init__(self):
    # One ``(args, kwargs)`` entry per ``set_cell`` call.
    self.set_cell_calls = []
    # Number of times ``commit`` has been called.
    self.commits = 0

def commit(self):
    # Only count the call; nothing is actually sent.
    self.commits = self.commits + 1

def set_cell(self, *args, **kwargs):
    # Record the call exactly as received so tests can inspect it later.
    recorded_call = (args, kwargs)
    self.set_cell_calls.append(recorded_call)


class _MockTable(object):
    # Stand-in for the HappyBase table: it only carries the low-level
    # table that the batch reaches through ``_low_level_table``.

    def __init__(self, low_level_table):
        self._low_level_table = low_level_table


class _MockLowLevelTable(object):
    # Stand-in for the low-level table: it remembers how it was built
    # and which rows were requested, and hands out a preset row object.

    def __init__(self, *args, **kwargs):
        # Object returned by every ``row`` call; tests overwrite it.
        self.mock_row = None
        # Row keys passed to ``row``, in call order.
        self.rows_made = []
        self.args = args
        self.kwargs = kwargs

    def row(self, row_key):
        requested = self.rows_made
        requested.append(row_key)
        return self.mock_row