From 8a49056044ac0cae5c1188d52fe6984944baf27e Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Thu, 18 Feb 2016 12:23:23 -0800 Subject: [PATCH] Implementing Bigtable PartialRowData.update_from_read_rows(). --- gcloud/bigtable/row_data.py | 38 +++++++++++++++ gcloud/bigtable/test_row_data.py | 83 ++++++++++++++++++++++++++++++++ 2 files changed, 121 insertions(+) diff --git a/gcloud/bigtable/row_data.py b/gcloud/bigtable/row_data.py index 5a635f1cf7af..66044244442d 100644 --- a/gcloud/bigtable/row_data.py +++ b/gcloud/bigtable/row_data.py @@ -208,6 +208,44 @@ def _handle_row_contents(self, chunk): column_cells = column_family_dict.setdefault(column_name, []) column_cells.extend(cells) + def update_from_read_rows(self, read_rows_response_pb): + """Updates the current row from a ``ReadRows`` response. + + :type read_rows_response_pb: + :class:`._generated.bigtable_service_messages_pb2.ReadRowsResponse` + :param read_rows_response_pb: A response streamed back as part of a + ``ReadRows`` request. + + :raises: :class:`ValueError ` if the current + partial row has already been committed, if the row key on the + response doesn't match the current one or if there is a chunk + encountered with an unexpected ``ONEOF`` protobuf property. + """ + if self._committed: + raise ValueError('The row has been committed') + + if read_rows_response_pb.row_key != self.row_key: + raise ValueError('Response row key (%r) does not match current ' + 'one (%r).' % (read_rows_response_pb.row_key, + self.row_key)) + + last_chunk_index = len(read_rows_response_pb.chunks) - 1 + for index, chunk in enumerate(read_rows_response_pb.chunks): + chunk_property = chunk.WhichOneof('chunk') + if chunk_property == 'row_contents': + self._handle_row_contents(chunk) + elif chunk_property == 'reset_row': + self._handle_reset_row(chunk) + elif chunk_property == 'commit_row': + self._handle_commit_row(chunk, index, last_chunk_index) + else: + # NOTE: This includes chunk_property == None since we always + # want a value to be set + raise ValueError('Unexpected chunk property: %s' % ( + chunk_property,)) + + self._chunks_encountered = True + class PartialRowsData(object): """Convenience wrapper for consuming a ``ReadRows`` streaming response. diff --git a/gcloud/bigtable/test_row_data.py b/gcloud/bigtable/test_row_data.py index 6b3d21739598..62698d19d5d0 100644 --- a/gcloud/bigtable/test_row_data.py +++ b/gcloud/bigtable/test_row_data.py @@ -296,6 +296,89 @@ def test__handle_row_contents(self): } self.assertEqual(partial_row_data.cells, expected_cells) + def test_update_from_read_rows(self): + from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2 + from gcloud.bigtable._generated import ( + bigtable_service_messages_pb2 as messages_pb2) + + row_key = b'row-key' + partial_row_data = self._makeOne(row_key) + + # Set-up chunk1, some data that will be reset by chunk2. + ignored_family_name = u'ignore-name' + row_contents = data_pb2.Family(name=ignored_family_name) + chunk1 = messages_pb2.ReadRowsResponse.Chunk(row_contents=row_contents) + + # Set-up chunk2, a reset row. + chunk2 = messages_pb2.ReadRowsResponse.Chunk(reset_row=True) + + # Set-up chunk3, a column family with no columns. + family_name = u'name' + row_contents = data_pb2.Family(name=family_name) + chunk3 = messages_pb2.ReadRowsResponse.Chunk(row_contents=row_contents) + + # Set-up chunk4, a commit row. + chunk4 = messages_pb2.ReadRowsResponse.Chunk(commit_row=True) + + # Prepare request and make sure PartialRowData is empty before. + read_rows_response_pb = messages_pb2.ReadRowsResponse( + row_key=row_key, chunks=[chunk1, chunk2, chunk3, chunk4]) + self.assertEqual(partial_row_data.cells, {}) + self.assertFalse(partial_row_data.committed) + self.assertFalse(partial_row_data._chunks_encountered) + + # Parse the response and make sure the cells took place. + partial_row_data.update_from_read_rows(read_rows_response_pb) + self.assertEqual(partial_row_data.cells, {family_name: {}}) + self.assertFalse(ignored_family_name in partial_row_data.cells) + self.assertTrue(partial_row_data.committed) + self.assertTrue(partial_row_data._chunks_encountered) + + def test_update_from_read_rows_while_committed(self): + partial_row_data = self._makeOne(None) + partial_row_data._committed = True + self.assertFalse(partial_row_data._chunks_encountered) + + with self.assertRaises(ValueError): + partial_row_data.update_from_read_rows(None) + + self.assertFalse(partial_row_data._chunks_encountered) + + def test_update_from_read_rows_row_key_disagree(self): + from gcloud.bigtable._generated import ( + bigtable_service_messages_pb2 as messages_pb2) + + row_key1 = b'row-key1' + row_key2 = b'row-key2' + partial_row_data = self._makeOne(row_key1) + self.assertFalse(partial_row_data._chunks_encountered) + + self.assertNotEqual(row_key1, row_key2) + read_rows_response_pb = messages_pb2.ReadRowsResponse(row_key=row_key2) + with self.assertRaises(ValueError): + partial_row_data.update_from_read_rows(read_rows_response_pb) + + self.assertFalse(partial_row_data._chunks_encountered) + + def test_update_from_read_rows_empty_chunk(self): + from gcloud.bigtable._generated import ( + bigtable_service_messages_pb2 as messages_pb2) + + row_key = b'row-key' + partial_row_data = self._makeOne(row_key) + self.assertFalse(partial_row_data._chunks_encountered) + + chunk = messages_pb2.ReadRowsResponse.Chunk() + read_rows_response_pb = messages_pb2.ReadRowsResponse( + row_key=row_key, chunks=[chunk]) + + # This makes it an "empty" chunk. + self.assertEqual(chunk.WhichOneof('chunk'), None) + with self.assertRaises(ValueError): + partial_row_data.update_from_read_rows(read_rows_response_pb) + + self.assertFalse(partial_row_data._chunks_encountered) + class TestPartialRowsData(unittest2.TestCase):