From 2d26f4f4652e60a1008f6528c915cbbf68c19339 Mon Sep 17 00:00:00 2001 From: Cal Peyser Date: Wed, 26 Apr 2017 13:31:37 -0400 Subject: [PATCH 01/12] implement retries for read_rows --- bigtable/google/cloud/bigtable/retry.py | 178 +++++++++++++++++++++ bigtable/google/cloud/bigtable/row_data.py | 3 + bigtable/google/cloud/bigtable/table.py | 97 ++++------- bigtable/tests/retry_test_script.txt | 38 +++++ bigtable/tests/system.py | 70 ++++++++ bigtable/tests/unit/test_table.py | 14 +- 6 files changed, 325 insertions(+), 75 deletions(-) create mode 100644 bigtable/google/cloud/bigtable/retry.py create mode 100644 bigtable/tests/retry_test_script.txt diff --git a/bigtable/google/cloud/bigtable/retry.py b/bigtable/google/cloud/bigtable/retry.py new file mode 100644 index 000000000000..845ac130c83e --- /dev/null +++ b/bigtable/google/cloud/bigtable/retry.py @@ -0,0 +1,178 @@ +"""Provides function wrappers that implement retrying.""" +import random +import time +import six + +from google.cloud._helpers import _to_bytes +from google.cloud.bigtable._generated import ( + bigtable_pb2 as data_messages_v2_pb2) +from google.gax import config, errors +from grpc import RpcError + +_MILLIS_PER_SECOND = 1000 + +def _has_timeout_settings(backoff_settings): + return (backoff_settings.rpc_timeout_multiplier is not None and + backoff_settings.max_rpc_timeout_millis is not None and + backoff_settings.total_timeout_millis is not None and + backoff_settings.initial_rpc_timeout_millis is not None) + +class ReadRowsIterator(): + """Creates an iterator equivalent to a_iter, but that retries on certain + exceptions. 
+ """ + + def __init__(self, client, name, start_key, end_key, filter_, limit, + retry_options, **kwargs): + self.client = client + self.retry_options = retry_options + self.name = name + self.start_key = start_key + self.start_key_closed = True + self.end_key = end_key + self.filter_ = filter_ + self.limit = limit + + self.delay_mult = retry_options.backoff_settings.retry_delay_multiplier + self.max_delay_millis = retry_options.backoff_settings.max_retry_delay_millis + self.has_timeout_settings = _has_timeout_settings(retry_options.backoff_settings) + + if self.has_timeout_settings: + self.timeout_mult = retry_options.backoff_settings.rpc_timeout_multiplier + self.max_timeout = (retry_options.backoff_settings.max_rpc_timeout_millis / _MILLIS_PER_SECOND) + self.total_timeout = (retry_options.backoff_settings.total_timeout_millis / _MILLIS_PER_SECOND) + self.set_stream() + + def set_start_key(self, start_key): + """ + Sets the row key at which this iterator will begin reading. + """ + self.start_key = start_key + self.start_key_closed = False + + def set_stream(self): + """ + Resets the read stream by making an RPC on the 'ReadRows' endpoint. + """ + request_pb = _create_row_request( + self.name, start_key=self.start_key, + start_key_closed=self.start_key_closed, end_key=self.end_key, + filter_= self.filter_, limit=self.limit) + self.stream = self.client._data_stub.ReadRows(request_pb) + + def next(self, *args, **kwargs): + """ + Read and return the next row from the stream. Retry on idempotent failure. 
+ """ + delay = self.retry_options.backoff_settings.initial_retry_delay_millis + exc = errors.RetryError('Retry total timeout exceeded before any' + 'response was received') + if self.has_timeout_settings: + timeout = ( + self.retry_options.backoff_settings.initial_rpc_timeout_millis / + _MILLIS_PER_SECOND) + + now = time.time() + deadline = now + self.total_timeout + else: + timeout = None + deadline = None + + while deadline is None or now < deadline: + try: + return six.next(self.stream) + except StopIteration as stop: + raise stop + except RpcError as error: # pylint: disable=broad-except + code = config.exc_to_code(error) + if code not in self.retry_options.retry_codes: + six.reraise(errors.RetryError, + errors.RetryError(str(error))) + + # pylint: disable=redefined-variable-type + exc = errors.RetryError( + 'Retry total timeout exceeded with exception', error) + + # Sleep a random number which will, on average, equal the + # expected delay. + to_sleep = random.uniform(0, delay * 2) + time.sleep(to_sleep / _MILLIS_PER_SECOND) + delay = min(delay * self.delay_mult, self.max_delay_millis) + + if self.has_timeout_settings: + now = time.time() + timeout = min( + timeout * self.timeout_mult, self.max_timeout, deadline - now) + self.set_stream() + + six.reraise(errors.RetryError, exc) + + def __next__(self, *args, **kwargs): + return self.next(*args, **kwargs) + + def __iter__(self): + return self + +def _create_row_request(table_name, row_key=None, start_key=None, + start_key_closed=True, end_key=None, filter_=None, + limit=None): + """Creates a request to read rows in a table. + + :type table_name: str + :param table_name: The name of the table to read from. + + :type row_key: bytes + :param row_key: (Optional) The key of a specific row to read from. + + :type start_key: bytes + :param start_key: (Optional) The beginning of a range of row keys to + read from. The range will include ``start_key``. If + left empty, will be interpreted as the empty string. 
+ + :type end_key: bytes + :param end_key: (Optional) The end of a range of row keys to read from. + The range will not include ``end_key``. If left empty, + will be interpreted as an infinite string. + + :type filter_: :class:`.RowFilter` + :param filter_: (Optional) The filter to apply to the contents of the + specified row(s). If unset, reads the entire table. + + :type limit: int + :param limit: (Optional) The read will terminate after committing to N + rows' worth of results. The default (zero) is to return + all results. + + :rtype: :class:`data_messages_v2_pb2.ReadRowsRequest` + :returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs. + :raises: :class:`ValueError ` if both + ``row_key`` and one of ``start_key`` and ``end_key`` are set + """ + request_kwargs = {'table_name': table_name} + if (row_key is not None and + (start_key is not None or end_key is not None)): + raise ValueError('Row key and row range cannot be ' + 'set simultaneously') + range_kwargs = {} + if start_key is not None or end_key is not None: + if start_key is not None: + if start_key_closed: + range_kwargs['start_key_closed'] = _to_bytes(start_key) + else: + range_kwargs['start_key_open'] = _to_bytes(start_key) + if end_key is not None: + range_kwargs['end_key_open'] = _to_bytes(end_key) + if filter_ is not None: + request_kwargs['filter'] = filter_.to_pb() + if limit is not None: + request_kwargs['rows_limit'] = limit + + message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs) + + if row_key is not None: + message.rows.row_keys.append(_to_bytes(row_key)) + + if range_kwargs: + message.rows.row_ranges.add(**range_kwargs) + + return message \ No newline at end of file diff --git a/bigtable/google/cloud/bigtable/row_data.py b/bigtable/google/cloud/bigtable/row_data.py index 60fc1f0ef1e8..d3c70d431e29 100644 --- a/bigtable/google/cloud/bigtable/row_data.py +++ b/bigtable/google/cloud/bigtable/row_data.py @@ -274,6 +274,9 @@ def consume_next(self): 
self._validate_chunk(chunk) + if ("ReadRowsIterator" in self._response_iterator.__class__.__name__): + self._response_iterator.set_start_key(chunk.row_key) + if chunk.reset_row: row = self._row = None cell = self._cell = self._previous_cell = None diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index 3fbd198d6b65..8177f1e1e1d9 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -14,6 +14,7 @@ """User friendly container for Google Cloud Bigtable Table.""" +from __future__ import absolute_import from google.cloud._helpers import _to_bytes from google.cloud.bigtable._generated import ( bigtable_pb2 as data_messages_v2_pb2) @@ -27,7 +28,31 @@ from google.cloud.bigtable.row import ConditionalRow from google.cloud.bigtable.row import DirectRow from google.cloud.bigtable.row_data import PartialRowsData - +from google.gax import RetryOptions, BackoffSettings +from google.cloud.bigtable.retry import ReadRowsIterator, _create_row_request +from grpc import StatusCode + +import six + +BACKOFF_SETTINGS = BackoffSettings( + initial_retry_delay_millis = 10, + retry_delay_multiplier = 1.3, + max_retry_delay_millis = 30000, + initial_rpc_timeout_millis = 25 * 60 * 1000, + rpc_timeout_multiplier = 1.0, + max_rpc_timeout_millis = 25 * 60 * 1000, + total_timeout_millis = 30 * 60 * 1000 +) + +RETRY_OPTIONS = RetryOptions( + retry_codes = [ + StatusCode.DEADLINE_EXCEEDED, + StatusCode.ABORTED, + StatusCode.INTERNAL, + StatusCode.UNAVAILABLE + ], + backoff_settings = BACKOFF_SETTINGS +) class Table(object): """Representation of a Google Cloud Bigtable Table. @@ -268,13 +293,10 @@ def read_rows(self, start_key=None, end_key=None, limit=None, :returns: A :class:`.PartialRowsData` convenience wrapper for consuming the streamed results. 
""" - request_pb = _create_row_request( - self.name, start_key=start_key, end_key=end_key, filter_=filter_, - limit=limit) client = self._instance._client - response_iterator = client._data_stub.ReadRows(request_pb) - # We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse` - return PartialRowsData(response_iterator) + retrying_iterator = ReadRowsIterator(client, self.name, start_key, + end_key, filter_, limit, RETRY_OPTIONS) + return PartialRowsData(retrying_iterator) def sample_row_keys(self): """Read a sample of row keys in the table. @@ -312,64 +334,3 @@ def sample_row_keys(self): client = self._instance._client response_iterator = client._data_stub.SampleRowKeys(request_pb) return response_iterator - - -def _create_row_request(table_name, row_key=None, start_key=None, end_key=None, - filter_=None, limit=None): - """Creates a request to read rows in a table. - - :type table_name: str - :param table_name: The name of the table to read from. - - :type row_key: bytes - :param row_key: (Optional) The key of a specific row to read from. - - :type start_key: bytes - :param start_key: (Optional) The beginning of a range of row keys to - read from. The range will include ``start_key``. If - left empty, will be interpreted as the empty string. - - :type end_key: bytes - :param end_key: (Optional) The end of a range of row keys to read from. - The range will not include ``end_key``. If left empty, - will be interpreted as an infinite string. - - :type filter_: :class:`.RowFilter` - :param filter_: (Optional) The filter to apply to the contents of the - specified row(s). If unset, reads the entire table. - - :type limit: int - :param limit: (Optional) The read will terminate after committing to N - rows' worth of results. The default (zero) is to return - all results. - - :rtype: :class:`data_messages_v2_pb2.ReadRowsRequest` - :returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs. 
- :raises: :class:`ValueError ` if both - ``row_key`` and one of ``start_key`` and ``end_key`` are set - """ - request_kwargs = {'table_name': table_name} - if (row_key is not None and - (start_key is not None or end_key is not None)): - raise ValueError('Row key and row range cannot be ' - 'set simultaneously') - range_kwargs = {} - if start_key is not None or end_key is not None: - if start_key is not None: - range_kwargs['start_key_closed'] = _to_bytes(start_key) - if end_key is not None: - range_kwargs['end_key_open'] = _to_bytes(end_key) - if filter_ is not None: - request_kwargs['filter'] = filter_.to_pb() - if limit is not None: - request_kwargs['rows_limit'] = limit - - message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs) - - if row_key is not None: - message.rows.row_keys.append(_to_bytes(row_key)) - - if range_kwargs: - message.rows.row_ranges.add(**range_kwargs) - - return message diff --git a/bigtable/tests/retry_test_script.txt b/bigtable/tests/retry_test_script.txt new file mode 100644 index 000000000000..863662e897ba --- /dev/null +++ b/bigtable/tests/retry_test_script.txt @@ -0,0 +1,38 @@ +# This retry script is processed by the retry server and the client under test. +# Client tests should parse any command beginning with "CLIENT:", send the corresponding RPC +# to the retry server and expect a valid response. +# "EXPECT" commands indicate the call the server is expecting the client to send. +# +# The retry server has one table named "table" that should be used for testing. +# There are three types of commands supported: +# READ +# Expect the corresponding rows to be returned with arbitrary values. +# SCAN ... +# Ranges are expressed as an interval with either open or closed start and end, +# such as [1,3) for "1,2" or (1, 3] for "2,3". +# WRITE +# All writes should succeed eventually. Value payload is ignored. +# The server writes PASS or FAIL on a line by itself to STDOUT depending on the result of the test. 
+# All other server output should be ignored. + +# Echo same scan back after immediate error +CLIENT: SCAN [r1,r3) r1,r2 +EXPECT: SCAN [r1,r3) +SERVER: ERROR Unavailable +EXPECT: SCAN [r1,r3) +SERVER: READ_RESPONSE r1,r2 + +# Retry scans with open interval starting at the least read row key. +# Instead of using open intervals for retry ranges, '\x00' can be +# appended to the last received row key and sent in a closed interval. +CLIENT: SCAN [r1,r9) r1,r2,r3,r4,r5,r6,r7,r8 +EXPECT: SCAN [r1,r9) +SERVER: READ_RESPONSE r1,r2,r3,r4 +SERVER: ERROR Unavailable +EXPECT: SCAN (r4,r9) +SERVER: ERROR Unavailable +EXPECT: SCAN (r4,r9) +SERVER: READ_RESPONSE r5,r6,r7 +SERVER: ERROR Unavailable +EXPECT: SCAN (r7,r9) +SERVER: READ_RESPONSE r8 diff --git a/bigtable/tests/system.py b/bigtable/tests/system.py index faed85fdb302..3772aca721a1 100644 --- a/bigtable/tests/system.py +++ b/bigtable/tests/system.py @@ -295,6 +295,76 @@ def test_delete_column_family(self): # Make sure we have successfully deleted it. 
self.assertEqual(temp_table.list_column_families(), {}) + def test_retry(self): + import subprocess, os, stat, platform + from google.cloud.bigtable.client import Client + from google.cloud.bigtable.instance import Instance + from google.cloud.bigtable.table import Table + + # import for urlopen based on version + try: + # python 3 + from urllib.request import urlopen + except ImportError: + # python 2 + from urllib2 import urlopen + + + TEST_SCRIPT = 'tests/retry_test_script.txt' + SERVER_NAME = 'retry_server' + SERVER_ZIP = SERVER_NAME + ".tar.gz" + + def process_scan(table, range, ids): + range_chunks = range.split(",") + range_open = range_chunks[0].lstrip("[") + range_close = range_chunks[1].rstrip(")") + rows = table.read_rows(range_open, range_close) + rows.consume_all() + + # Download server + MOCK_SERVER_URLS = { + 'Linux': 'https://storage.googleapis.com/cloud-bigtable-test/retries/retry_server_linux.tar.gz', + 'Darwin': 'https://storage.googleapis.com/cloud-bigtable-test/retries/retry_server_mac.tar.gz', + } + + test_platform = platform.system() + if (test_platform not in MOCK_SERVER_URLS): + self.fail("Retry server not available for platform " + test_platform) + + mock_server_download = urlopen(MOCK_SERVER_URLS[test_platform]).read() + mock_server_file = open(SERVER_ZIP, 'wb') + mock_server_file.write(mock_server_download) + + # Unzip server + subprocess.call(['tar', 'zxvf', SERVER_ZIP, '-C', '.']) + + # Connect to server + server = subprocess.Popen( + ['./' + SERVER_NAME, '--script=' + TEST_SCRIPT], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + ) + + (endpoint, port) = server.stdout.readline().rstrip("\n").split(":") + os.environ["BIGTABLE_EMULATOR_HOST"] = endpoint + ":" + port + client = Client(project="client", admin=True) + instance = Instance("instance", client) + table = instance.table("table") + + # Run test, line by line + script = open(TEST_SCRIPT, 'r') + for line in script.readlines(): + if line.startswith("CLIENT:"): + chunks = 
line.split(" ") + op = chunks[1] + if (op != "SCAN"): + self.fail("Script contained " + op + " operation. Only \'SCAN\' is supported.") + else: + process_scan(table, chunks[2], chunks[3]) + + # Clean up + server.kill() + os.remove(SERVER_ZIP) + os.remove(SERVER_NAME) class TestDataAPI(unittest.TestCase): diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index 63844f5d48b7..51adcef6ed17 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -352,7 +352,8 @@ def test_read_rows(self): from google.cloud._testing import _Monkey from tests.unit._testing import _FakeStub from google.cloud.bigtable.row_data import PartialRowsData - from google.cloud.bigtable import table as MUT + from google.cloud.bigtable import retry as MUT + from google.cloud.bigtable.retry import ReadRowsIterator client = _Client() instance = _Instance(self.INSTANCE_NAME, client=client) @@ -372,20 +373,18 @@ def mock_create_row_request(table_name, **kwargs): # Patch the stub used by the API method. client._data_stub = stub = _FakeStub(response_iterator) - # Create expected_result. - expected_result = PartialRowsData(response_iterator) - - # Perform the method and check the result. start_key = b'start-key' end_key = b'end-key' filter_obj = object() limit = 22 - with _Monkey(MUT, _create_row_request=mock_create_row_request): + with _Monkey(MUT, _create_row_request=mock_create_row_request): + # Perform the method and check the result. 
result = table.read_rows( start_key=start_key, end_key=end_key, filter_=filter_obj, limit=limit) - self.assertEqual(result, expected_result) + self.assertIsInstance(result._response_iterator, ReadRowsIterator) + self.assertEqual(result._response_iterator.client, client) self.assertEqual(stub.method_calls, [( 'ReadRows', (request_pb,), @@ -396,6 +395,7 @@ def mock_create_row_request(table_name, **kwargs): 'end_key': end_key, 'filter_': filter_obj, 'limit': limit, + 'start_key_closed': True, } self.assertEqual(mock_created, [(table.name, created_kwargs)]) From d25cafa21a379d9f5aef48b278c587a3e1404577 Mon Sep 17 00:00:00 2001 From: Cal Peyser Date: Thu, 27 Apr 2017 10:50:10 -0400 Subject: [PATCH 02/12] fix linter errors --- bigtable/google/cloud/bigtable/retry.py | 51 ++++++++++++++-------- bigtable/google/cloud/bigtable/row_data.py | 3 +- bigtable/google/cloud/bigtable/table.py | 27 ++++++------ 3 files changed, 47 insertions(+), 34 deletions(-) diff --git a/bigtable/google/cloud/bigtable/retry.py b/bigtable/google/cloud/bigtable/retry.py index 845ac130c83e..98c876797b08 100644 --- a/bigtable/google/cloud/bigtable/retry.py +++ b/bigtable/google/cloud/bigtable/retry.py @@ -9,21 +9,24 @@ from google.gax import config, errors from grpc import RpcError + _MILLIS_PER_SECOND = 1000 + def _has_timeout_settings(backoff_settings): return (backoff_settings.rpc_timeout_multiplier is not None and backoff_settings.max_rpc_timeout_millis is not None and backoff_settings.total_timeout_millis is not None and backoff_settings.initial_rpc_timeout_millis is not None) + class ReadRowsIterator(): """Creates an iterator equivalent to a_iter, but that retries on certain exceptions. 
""" def __init__(self, client, name, start_key, end_key, filter_, limit, - retry_options, **kwargs): + retry_options, **kwargs): self.client = client self.retry_options = retry_options self.name = name @@ -34,13 +37,20 @@ def __init__(self, client, name, start_key, end_key, filter_, limit, self.limit = limit self.delay_mult = retry_options.backoff_settings.retry_delay_multiplier - self.max_delay_millis = retry_options.backoff_settings.max_retry_delay_millis - self.has_timeout_settings = _has_timeout_settings(retry_options.backoff_settings) + self.max_delay_millis = \ + retry_options.backoff_settings.max_retry_delay_millis + self.has_timeout_settings = \ + _has_timeout_settings(retry_options.backoff_settings) if self.has_timeout_settings: - self.timeout_mult = retry_options.backoff_settings.rpc_timeout_multiplier - self.max_timeout = (retry_options.backoff_settings.max_rpc_timeout_millis / _MILLIS_PER_SECOND) - self.total_timeout = (retry_options.backoff_settings.total_timeout_millis / _MILLIS_PER_SECOND) + self.timeout_mult = \ + retry_options.backoff_settings.rpc_timeout_multiplier + self.max_timeout = \ + (retry_options.backoff_settings.max_rpc_timeout_millis / + _MILLIS_PER_SECOND) + self.total_timeout = \ + (retry_options.backoff_settings.total_timeout_millis / + _MILLIS_PER_SECOND) self.set_stream() def set_start_key(self, start_key): @@ -54,23 +64,24 @@ def set_stream(self): """ Resets the read stream by making an RPC on the 'ReadRows' endpoint. 
""" - request_pb = _create_row_request( - self.name, start_key=self.start_key, - start_key_closed=self.start_key_closed, end_key=self.end_key, - filter_= self.filter_, limit=self.limit) - self.stream = self.client._data_stub.ReadRows(request_pb) + req_pb = _create_row_request(self.name, start_key=self.start_key, + start_key_closed=self.start_key_closed, + end_key=self.end_key, + filter_=self.filter_, limit=self.limit) + self.stream = self.client._data_stub.ReadRows(req_pb) def next(self, *args, **kwargs): """ - Read and return the next row from the stream. Retry on idempotent failure. + Read and return the next row from the stream. + Retry on idempotent failure. """ delay = self.retry_options.backoff_settings.initial_retry_delay_millis exc = errors.RetryError('Retry total timeout exceeded before any' 'response was received') if self.has_timeout_settings: - timeout = ( - self.retry_options.backoff_settings.initial_rpc_timeout_millis / - _MILLIS_PER_SECOND) + timeout = (self.retry_options.backoff_settings + .initial_rpc_timeout_millis / + _MILLIS_PER_SECOND) now = time.time() deadline = now + self.total_timeout @@ -86,8 +97,8 @@ def next(self, *args, **kwargs): except RpcError as error: # pylint: disable=broad-except code = config.exc_to_code(error) if code not in self.retry_options.retry_codes: - six.reraise(errors.RetryError, - errors.RetryError(str(error))) + six.reraise(errors.RetryError, + errors.RetryError(str(error))) # pylint: disable=redefined-variable-type exc = errors.RetryError( @@ -102,7 +113,8 @@ def next(self, *args, **kwargs): if self.has_timeout_settings: now = time.time() timeout = min( - timeout * self.timeout_mult, self.max_timeout, deadline - now) + timeout * self.timeout_mult, self.max_timeout, + deadline - now) self.set_stream() six.reraise(errors.RetryError, exc) @@ -113,6 +125,7 @@ def __next__(self, *args, **kwargs): def __iter__(self): return self + def _create_row_request(table_name, row_key=None, start_key=None, start_key_closed=True, 
end_key=None, filter_=None, limit=None): @@ -175,4 +188,4 @@ def _create_row_request(table_name, row_key=None, start_key=None, if range_kwargs: message.rows.row_ranges.add(**range_kwargs) - return message \ No newline at end of file + return message diff --git a/bigtable/google/cloud/bigtable/row_data.py b/bigtable/google/cloud/bigtable/row_data.py index d3c70d431e29..e1b1acd8c643 100644 --- a/bigtable/google/cloud/bigtable/row_data.py +++ b/bigtable/google/cloud/bigtable/row_data.py @@ -274,7 +274,8 @@ def consume_next(self): self._validate_chunk(chunk) - if ("ReadRowsIterator" in self._response_iterator.__class__.__name__): + if ("ReadRowsIterator" in + self._response_iterator.__class__.__name__): self._response_iterator.set_start_key(chunk.row_key) if chunk.reset_row: diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index 8177f1e1e1d9..f196adf6bc80 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -15,7 +15,6 @@ """User friendly container for Google Cloud Bigtable Table.""" from __future__ import absolute_import -from google.cloud._helpers import _to_bytes from google.cloud.bigtable._generated import ( bigtable_pb2 as data_messages_v2_pb2) from google.cloud.bigtable._generated import ( @@ -32,28 +31,27 @@ from google.cloud.bigtable.retry import ReadRowsIterator, _create_row_request from grpc import StatusCode -import six - BACKOFF_SETTINGS = BackoffSettings( - initial_retry_delay_millis = 10, - retry_delay_multiplier = 1.3, - max_retry_delay_millis = 30000, - initial_rpc_timeout_millis = 25 * 60 * 1000, - rpc_timeout_multiplier = 1.0, - max_rpc_timeout_millis = 25 * 60 * 1000, - total_timeout_millis = 30 * 60 * 1000 + initial_retry_delay_millis=10, + retry_delay_multiplier=1.3, + max_retry_delay_millis=30000, + initial_rpc_timeout_millis=25 * 60 * 1000, + rpc_timeout_multiplier=1.0, + max_rpc_timeout_millis=25 * 60 * 1000, + total_timeout_millis=30 * 60 * 1000 ) - + 
RETRY_OPTIONS = RetryOptions( - retry_codes = [ + retry_codes=[ StatusCode.DEADLINE_EXCEEDED, StatusCode.ABORTED, StatusCode.INTERNAL, StatusCode.UNAVAILABLE ], - backoff_settings = BACKOFF_SETTINGS + backoff_settings=BACKOFF_SETTINGS ) + class Table(object): """Representation of a Google Cloud Bigtable Table. @@ -295,7 +293,8 @@ def read_rows(self, start_key=None, end_key=None, limit=None, """ client = self._instance._client retrying_iterator = ReadRowsIterator(client, self.name, start_key, - end_key, filter_, limit, RETRY_OPTIONS) + end_key, filter_, limit, + RETRY_OPTIONS) return PartialRowsData(retrying_iterator) def sample_row_keys(self): From 5a8fdc0404d213bdc31bc838ee53dde24400066d Mon Sep 17 00:00:00 2001 From: Cal Peyser Date: Mon, 8 May 2017 15:02:56 -0400 Subject: [PATCH 03/12] add some tests for read_rows retries --- bigtable/google/cloud/bigtable/retry.py | 55 ++++-------- bigtable/google/cloud/bigtable/table.py | 24 +++--- bigtable/tests/unit/_testing.py | 27 +++++- bigtable/tests/unit/test_table.py | 110 +++++++++++++++++++++++- 4 files changed, 166 insertions(+), 50 deletions(-) diff --git a/bigtable/google/cloud/bigtable/retry.py b/bigtable/google/cloud/bigtable/retry.py index 98c876797b08..d3435e6a56d2 100644 --- a/bigtable/google/cloud/bigtable/retry.py +++ b/bigtable/google/cloud/bigtable/retry.py @@ -13,13 +13,6 @@ _MILLIS_PER_SECOND = 1000 -def _has_timeout_settings(backoff_settings): - return (backoff_settings.rpc_timeout_multiplier is not None and - backoff_settings.max_rpc_timeout_millis is not None and - backoff_settings.total_timeout_millis is not None and - backoff_settings.initial_rpc_timeout_millis is not None) - - class ReadRowsIterator(): """Creates an iterator equivalent to a_iter, but that retries on certain exceptions. 
@@ -35,23 +28,18 @@ def __init__(self, client, name, start_key, end_key, filter_, limit, self.end_key = end_key self.filter_ = filter_ self.limit = limit - self.delay_mult = retry_options.backoff_settings.retry_delay_multiplier self.max_delay_millis = \ retry_options.backoff_settings.max_retry_delay_millis - self.has_timeout_settings = \ - _has_timeout_settings(retry_options.backoff_settings) - - if self.has_timeout_settings: - self.timeout_mult = \ - retry_options.backoff_settings.rpc_timeout_multiplier - self.max_timeout = \ - (retry_options.backoff_settings.max_rpc_timeout_millis / - _MILLIS_PER_SECOND) - self.total_timeout = \ - (retry_options.backoff_settings.total_timeout_millis / - _MILLIS_PER_SECOND) - self.set_stream() + self.timeout_mult = \ + retry_options.backoff_settings.rpc_timeout_multiplier + self.max_timeout = \ + (retry_options.backoff_settings.max_rpc_timeout_millis / + _MILLIS_PER_SECOND) + self.total_timeout = \ + (retry_options.backoff_settings.total_timeout_millis / + _MILLIS_PER_SECOND) + self.set_stream() def set_start_key(self, start_key): """ @@ -78,17 +66,12 @@ def next(self, *args, **kwargs): delay = self.retry_options.backoff_settings.initial_retry_delay_millis exc = errors.RetryError('Retry total timeout exceeded before any' 'response was received') - if self.has_timeout_settings: - timeout = (self.retry_options.backoff_settings - .initial_rpc_timeout_millis / - _MILLIS_PER_SECOND) - - now = time.time() - deadline = now + self.total_timeout - else: - timeout = None - deadline = None + timeout = (self.retry_options.backoff_settings + .initial_rpc_timeout_millis / + _MILLIS_PER_SECOND) + now = time.time() + deadline = now + self.total_timeout while deadline is None or now < deadline: try: return six.next(self.stream) @@ -109,12 +92,10 @@ def next(self, *args, **kwargs): to_sleep = random.uniform(0, delay * 2) time.sleep(to_sleep / _MILLIS_PER_SECOND) delay = min(delay * self.delay_mult, self.max_delay_millis) - - if 
self.has_timeout_settings: - now = time.time() - timeout = min( - timeout * self.timeout_mult, self.max_timeout, - deadline - now) + now = time.time() + timeout = min( + timeout * self.timeout_mult, self.max_timeout, + deadline - now) self.set_stream() six.reraise(errors.RetryError, exc) diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index f196adf6bc80..70bee43ca576 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -41,15 +41,12 @@ total_timeout_millis=30 * 60 * 1000 ) -RETRY_OPTIONS = RetryOptions( - retry_codes=[ - StatusCode.DEADLINE_EXCEEDED, - StatusCode.ABORTED, - StatusCode.INTERNAL, - StatusCode.UNAVAILABLE - ], - backoff_settings=BACKOFF_SETTINGS -) +RETRY_CODES = [ + StatusCode.DEADLINE_EXCEEDED, + StatusCode.ABORTED, + StatusCode.INTERNAL, + StatusCode.UNAVAILABLE +] class Table(object): @@ -264,7 +261,7 @@ def read_row(self, row_key, filter_=None): return rows_data.rows[row_key] def read_rows(self, start_key=None, end_key=None, limit=None, - filter_=None): + filter_=None, backoff_settings=None): """Read rows from this table. :type start_key: bytes @@ -292,6 +289,13 @@ def read_rows(self, start_key=None, end_key=None, limit=None, the streamed results. 
""" client = self._instance._client + if backoff_settings is None: + backoff_settings = BACKOFF_SETTINGS + RETRY_OPTIONS = RetryOptions( + retry_codes=RETRY_CODES, + backoff_settings=backoff_settings + ) + retrying_iterator = ReadRowsIterator(client, self.name, start_key, end_key, filter_, limit, RETRY_OPTIONS) diff --git a/bigtable/tests/unit/_testing.py b/bigtable/tests/unit/_testing.py index e67af6a1498c..7587c66c133b 100644 --- a/bigtable/tests/unit/_testing.py +++ b/bigtable/tests/unit/_testing.py @@ -14,7 +14,6 @@ """Mocks used to emulate gRPC generated objects.""" - class _FakeStub(object): """Acts as a gPRC stub.""" @@ -27,6 +26,16 @@ def __getattr__(self, name): # since __getattribute__ will handle them. return _MethodMock(name, self) +class _CustomFakeStub(object): + """Acts as a gRPC stub. Generates a result using an injected callable.""" + def __init__(self, result_callable): + self.result_callable = result_callable + self.method_calls = [] + + def __getattr__(self, name): + # We need not worry about attributes set in constructor + # since __getattribute__ will handle them. + return _CustomMethodMock(name, self) class _MethodMock(object): """Mock for API method attached to a gRPC stub. @@ -42,5 +51,19 @@ def __call__(self, *args, **kwargs): """Sync method meant to mock a gRPC stub request.""" self._stub.method_calls.append((self._name, args, kwargs)) curr_result, self._stub.results = (self._stub.results[0], - self._stub.results[1:]) + self._stub.results[1:]) return curr_result + +class _CustomMethodMock(object): + """ + Same as _MethodMock, but backed by an injected callable. 
+ """ + + def __init__(self, name, stub): + self._name = name + self._stub = stub + + def __call__(self, *args, **kwargs): + """Sync method meant to mock a gRPC stub request.""" + self._stub.method_calls.append((self._name, args, kwargs)) + return self._stub.result_callable() diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index 51adcef6ed17..860b005d7601 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -377,7 +377,7 @@ def mock_create_row_request(table_name, **kwargs): end_key = b'end-key' filter_obj = object() limit = 22 - with _Monkey(MUT, _create_row_request=mock_create_row_request): + with _Monkey(MUT, _create_row_request=mock_create_row_request): # Perform the method and check the result. result = table.read_rows( start_key=start_key, end_key=end_key, filter_=filter_obj, @@ -399,6 +399,114 @@ def mock_create_row_request(table_name, **kwargs): } self.assertEqual(mock_created, [(table.name, created_kwargs)]) + def test_read_rows_one_chunk(self): + from google.cloud._testing import _Monkey + from tests.unit._testing import _FakeStub + from google.cloud.bigtable import retry as MUT + from google.cloud.bigtable.retry import ReadRowsIterator + from google.cloud.bigtable.row_data import Cell + from google.cloud.bigtable.row_data import PartialRowsData + + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + table = self._make_one(self.TABLE_ID, instance) + + # Create request_pb + request_pb = object() # Returned by our mock. 
+ mock_created = [] + + def mock_create_row_request(table_name, **kwargs): + mock_created.append((table_name, kwargs)) + return request_pb + + # Create response_iterator + chunk = _ReadRowsResponseCellChunkPB( + row_key=self.ROW_KEY, + family_name=self.FAMILY_NAME, + qualifier=self.QUALIFIER, + timestamp_micros=self.TIMESTAMP_MICROS, + value=self.VALUE, + commit_row=True, + ) + response_pb = _ReadRowsResponsePB(chunks=[chunk]) + response_iterator = iter([response_pb]) + + # Patch the stub used by the API method. + client._data_stub = stub = _FakeStub(response_iterator) + + start_key = b'start-key' + end_key = b'end-key' + filter_obj = object() + limit = 22 + with _Monkey(MUT, _create_row_request=mock_create_row_request): + # Perform the method and check the result. + result = table.read_rows( + start_key=start_key, end_key=end_key, filter_=filter_obj, + limit=limit) + result.consume_next() + + def test_read_rows_retry_timeout(self): + from google.cloud._testing import _Monkey + from tests.unit._testing import _CustomFakeStub + from google.cloud.bigtable.row_data import PartialRowsData + from google.cloud.bigtable import retry as MUT + from google.cloud.bigtable.retry import ReadRowsIterator + from google.gax import BackoffSettings + from google.gax.errors import RetryError + from grpc import StatusCode, RpcError + import time + + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + table = self._make_one(self.TABLE_ID, instance) + + # Create request_pb + request_pb = object() # Returned by our mock. + mock_created = [] + + def mock_create_row_request(table_name, **kwargs): + mock_created.append((table_name, kwargs)) + return request_pb + + # Create a slow response iterator to cause a timeout + class MockTimeoutError(RpcError): + def code(self): + return StatusCode.DEADLINE_EXCEEDED + + def _wait_then_raise(): + time.sleep(0.5) + raise MockTimeoutError() + + # Patch the stub used by the API method. 
The stub should create a new + # slow_iterator every time it is queried. + def make_slow_iterator(): + return (_wait_then_raise() for i in range(10)) + client._data_stub = stub = _CustomFakeStub(make_slow_iterator) + + # Set to timeout before RPC completes + test_backoff_settings = BackoffSettings( + initial_retry_delay_millis=10, + retry_delay_multiplier=1.3, + max_retry_delay_millis=30000, + initial_rpc_timeout_millis=1000, + rpc_timeout_multiplier=1.0, + max_rpc_timeout_millis=25 * 60 * 1000, + total_timeout_millis=1000 + ) + + start_key = b'start-key' + end_key = b'end-key' + filter_obj = object() + limit = 22 + with _Monkey(MUT, _create_row_request=mock_create_row_request): + # Verify that a RetryError is thrown on read. + result = table.read_rows( + start_key=start_key, end_key=end_key, filter_=filter_obj, + limit=limit, backoff_settings=test_backoff_settings) + with self.assertRaises(RetryError): + result.consume_next() + + def test_sample_row_keys(self): from tests.unit._testing import _FakeStub From 537e8b684157d88742bc13887712f2ab77ac8931 Mon Sep 17 00:00:00 2001 From: Cal Peyser Date: Wed, 10 May 2017 19:32:52 -0400 Subject: [PATCH 04/12] add more test coverage for read_rows retries --- bigtable/google/cloud/bigtable/retry.py | 3 +- bigtable/tests/unit/test_table.py | 50 ++++++++++++++++++++++++- 2 files changed, 50 insertions(+), 3 deletions(-) diff --git a/bigtable/google/cloud/bigtable/retry.py b/bigtable/google/cloud/bigtable/retry.py index d3435e6a56d2..187069969d77 100644 --- a/bigtable/google/cloud/bigtable/retry.py +++ b/bigtable/google/cloud/bigtable/retry.py @@ -80,8 +80,7 @@ def next(self, *args, **kwargs): except RpcError as error: # pylint: disable=broad-except code = config.exc_to_code(error) if code not in self.retry_options.retry_codes: - six.reraise(errors.RetryError, - errors.RetryError(str(error))) + six.reraise(type(error), error) # pylint: disable=redefined-variable-type exc = errors.RetryError( diff --git
a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index 860b005d7601..c64ed95f500d 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -443,7 +443,7 @@ def mock_create_row_request(table_name, **kwargs): result = table.read_rows( start_key=start_key, end_key=end_key, filter_=filter_obj, limit=limit) - result.consume_next() + result.consume_all() def test_read_rows_retry_timeout(self): from google.cloud._testing import _Monkey @@ -506,6 +506,54 @@ def make_slow_iterator(): with self.assertRaises(RetryError): result.consume_next() + def test_read_rows_non_idempotent_error_throws(self): + from google.cloud._testing import _Monkey + from tests.unit._testing import _CustomFakeStub + from google.cloud.bigtable.row_data import PartialRowsData + from google.cloud.bigtable import retry as MUT + from google.cloud.bigtable.retry import ReadRowsIterator + from google.gax import BackoffSettings + from google.gax.errors import RetryError + from grpc import StatusCode, RpcError + import time + + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + table = self._make_one(self.TABLE_ID, instance) + + # Create request_pb + request_pb = object() # Returned by our mock. + mock_created = [] + + def mock_create_row_request(table_name, **kwargs): + mock_created.append((table_name, kwargs)) + return request_pb + + # Create response iterator that raises a non-idempotent exception + class MockNonIdempotentError(RpcError): + def code(self): + return StatusCode.RESOURCE_EXHAUSTED + + def _raise(): + raise MockNonIdempotentError() + + # Patch the stub used by the API method. The stub should create a new + # raising iterator every time it is queried.
+ def make_raising_iterator(): + return (_raise() for i in range(10)) + client._data_stub = stub = _CustomFakeStub(make_raising_iterator) + + start_key = b'start-key' + end_key = b'end-key' + filter_obj = object() + limit = 22 + with _Monkey(MUT, _create_row_request=mock_create_row_request): + # Verify that the non-retryable error propagates unchanged on read. + result = table.read_rows( + start_key=start_key, end_key=end_key, filter_=filter_obj, + limit=limit) + with self.assertRaises(MockNonIdempotentError): + result.consume_next() def test_sample_row_keys(self): from tests.unit._testing import _FakeStub From bddf67343065880dfe1158712a778dc739589a42 Mon Sep 17 00:00:00 2001 From: Cal Peyser Date: Wed, 10 May 2017 19:47:25 -0400 Subject: [PATCH 05/12] correct import for _create_row_request in test_table.py --- bigtable/tests/unit/test_table.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index c64ed95f500d..5e4b8aaff3ee 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -588,7 +588,7 @@ class Test__create_row_request(unittest.TestCase): def _call_fut(self, table_name, row_key=None, start_key=None, end_key=None, filter_=None, limit=None): - from google.cloud.bigtable.table import _create_row_request + from google.cloud.bigtable.retry import _create_row_request return _create_row_request( table_name, row_key=row_key, start_key=start_key, end_key=end_key, From 9688b3cbebeb58e6faf84b730129d8f82ff167d2 Mon Sep 17 00:00:00 2001 From: Cal Peyser Date: Wed, 10 May 2017 19:58:58 -0400 Subject: [PATCH 06/12] add test coverage for start_key_closed in read_rows --- bigtable/tests/unit/test_table.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index 5e4b8aaff3ee..369050776e57 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@
-587,12 +587,12 @@ def test_sample_row_keys(self): class Test__create_row_request(unittest.TestCase): def _call_fut(self, table_name, row_key=None, start_key=None, end_key=None, - filter_=None, limit=None): + start_key_closed=True, filter_=None, limit=None): from google.cloud.bigtable.retry import _create_row_request return _create_row_request( table_name, row_key=row_key, start_key=start_key, end_key=end_key, - filter_=filter_, limit=limit) + start_key_closed=start_key_closed, filter_=filter_, limit=limit) def test_table_name_only(self): table_name = 'table_name' @@ -615,7 +615,7 @@ def test_row_key(self): expected_result.rows.row_keys.append(row_key) self.assertEqual(result, expected_result) - def test_row_range_start_key(self): + def test_row_range_start_key_closed(self): table_name = 'table_name' start_key = b'start_key' result = self._call_fut(table_name, start_key=start_key) @@ -623,6 +623,15 @@ def test_row_range_start_key(self): expected_result.rows.row_ranges.add(start_key_closed=start_key) self.assertEqual(result, expected_result) + def test_row_range_start_key_open(self): + table_name = 'table_name' + start_key = b'start_key' + result = self._call_fut(table_name, start_key=start_key, + start_key_closed=False) + expected_result = _ReadRowsRequestPB(table_name=table_name) + expected_result.rows.row_ranges.add(start_key_open=start_key) + self.assertEqual(result, expected_result) + def test_row_range_end_key(self): table_name = 'table_name' end_key = b'end_key' From c4bad6ef887cccd09eb112cbde8226c2a610544d Mon Sep 17 00:00:00 2001 From: Cal Peyser Date: Wed, 10 May 2017 20:08:32 -0400 Subject: [PATCH 07/12] remove redundant ReadRowsIterator#__iter__ --- bigtable/google/cloud/bigtable/retry.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/bigtable/google/cloud/bigtable/retry.py b/bigtable/google/cloud/bigtable/retry.py index 187069969d77..379cb8393f84 100644 --- a/bigtable/google/cloud/bigtable/retry.py +++ b/bigtable/google/cloud/bigtable/retry.py @@ 
-102,10 +102,6 @@ def next(self, *args, **kwargs): def __next__(self, *args, **kwargs): return self.next(*args, **kwargs) - def __iter__(self): - return self - - def _create_row_request(table_name, row_key=None, start_key=None, start_key_closed=True, end_key=None, filter_=None, limit=None): From 68ef3af514a9b8787a17118d0e263fea6334390f Mon Sep 17 00:00:00 2001 From: Cal Peyser Date: Wed, 10 May 2017 20:19:28 -0400 Subject: [PATCH 08/12] add newline in retry.py for linter --- bigtable/google/cloud/bigtable/retry.py | 1 + 1 file changed, 1 insertion(+) diff --git a/bigtable/google/cloud/bigtable/retry.py b/bigtable/google/cloud/bigtable/retry.py index 379cb8393f84..c208afeee5b9 100644 --- a/bigtable/google/cloud/bigtable/retry.py +++ b/bigtable/google/cloud/bigtable/retry.py @@ -102,6 +102,7 @@ def next(self, *args, **kwargs): def __next__(self, *args, **kwargs): return self.next(*args, **kwargs) + def _create_row_request(table_name, row_key=None, start_key=None, start_key_closed=True, end_key=None, filter_=None, limit=None): From d0853127146bfd5a8c5eced435ea21f30fe7a4ac Mon Sep 17 00:00:00 2001 From: Cal Peyser Date: Fri, 9 Jun 2017 17:29:44 -0400 Subject: [PATCH 09/12] check test outcome --- bigtable/tests/system.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/bigtable/tests/system.py b/bigtable/tests/system.py index 3772aca721a1..785df5ccd9db 100644 --- a/bigtable/tests/system.py +++ b/bigtable/tests/system.py @@ -328,8 +328,8 @@ def process_scan(table, range, ids): } test_platform = platform.system() - if (test_platform not in MOCK_SERVER_URLS): - self.fail("Retry server not available for platform " + test_platform) + if test_platform not in MOCK_SERVER_URLS: + self.skip('Retry server not available for platform {0}.'.format(test_platform)) mock_server_download = urlopen(MOCK_SERVER_URLS[test_platform]).read() mock_server_file = open(SERVER_ZIP, 'wb') @@ -338,10 +338,11 @@ def process_scan(table, range, ids): # Unzip 
server subprocess.call(['tar', 'zxvf', SERVER_ZIP, '-C', '.']) - # Connect to server + # Connect to server server = subprocess.Popen( ['./' + SERVER_NAME, '--script=' + TEST_SCRIPT], stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, ) (endpoint, port) = server.stdout.readline().rstrip("\n").split(":") @@ -356,13 +357,20 @@ def process_scan(table, range, ids): if line.startswith("CLIENT:"): chunks = line.split(" ") op = chunks[1] - if (op != "SCAN"): - self.fail("Script contained " + op + " operation. Only \'SCAN\' is supported.") - else: - process_scan(table, chunks[2], chunks[3]) + process_scan(table, chunks[2], chunks[3]) - # Clean up + # Check that the test passed server.kill() + server_stdout_lines = [] + while True: + line = server.stdout.readline() + if line != '': + server_stdout_lines.append(line) + else: + break + self.assertEqual(server_stdout_lines[-1], "PASS\n") + + # Clean up os.remove(SERVER_ZIP) os.remove(SERVER_NAME) From 70d920fb325828a8d0217ee932f7c8ad6f49bdc4 Mon Sep 17 00:00:00 2001 From: Cal Peyser Date: Fri, 9 Jun 2017 17:41:53 -0400 Subject: [PATCH 10/12] address comments --- bigtable/google/cloud/bigtable/retry.py | 5 +++-- bigtable/google/cloud/bigtable/row_data.py | 3 +-- bigtable/tests/system.py | 12 ++++++------ bigtable/tests/unit/test_table.py | 4 ++-- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/bigtable/google/cloud/bigtable/retry.py b/bigtable/google/cloud/bigtable/retry.py index c208afeee5b9..f20419ce4f8e 100644 --- a/bigtable/google/cloud/bigtable/retry.py +++ b/bigtable/google/cloud/bigtable/retry.py @@ -2,6 +2,7 @@ import random import time import six +import sys from google.cloud._helpers import _to_bytes from google.cloud.bigtable._generated import ( @@ -13,7 +14,7 @@ _MILLIS_PER_SECOND = 1000 -class ReadRowsIterator(): +class ReadRowsIterator(object): """Creates an iterator equivalent to a_iter, but that retries on certain exceptions. 
""" @@ -97,7 +98,7 @@ def next(self, *args, **kwargs): deadline - now) self.set_stream() - six.reraise(errors.RetryError, exc) + six.reraise(errors.RetryError, exc, sys.exc_info()[2]) def __next__(self, *args, **kwargs): return self.next(*args, **kwargs) diff --git a/bigtable/google/cloud/bigtable/row_data.py b/bigtable/google/cloud/bigtable/row_data.py index e1b1acd8c643..73451faaecd4 100644 --- a/bigtable/google/cloud/bigtable/row_data.py +++ b/bigtable/google/cloud/bigtable/row_data.py @@ -274,8 +274,7 @@ def consume_next(self): self._validate_chunk(chunk) - if ("ReadRowsIterator" in - self._response_iterator.__class__.__name__): + if hasattr(self._response_iterator, 'set_start_key'): self._response_iterator.set_start_key(chunk.row_key) if chunk.reset_row: diff --git a/bigtable/tests/system.py b/bigtable/tests/system.py index 785df5ccd9db..cbc9b488edec 100644 --- a/bigtable/tests/system.py +++ b/bigtable/tests/system.py @@ -352,12 +352,12 @@ def process_scan(table, range, ids): table = instance.table("table") # Run test, line by line - script = open(TEST_SCRIPT, 'r') - for line in script.readlines(): - if line.startswith("CLIENT:"): - chunks = line.split(" ") - op = chunks[1] - process_scan(table, chunks[2], chunks[3]) + with open(TEST_SCRIPT, 'r') as script: + for line in script.readlines(): + if line.startswith("CLIENT:"): + chunks = line.split(" ") + op = chunks[1] + process_scan(table, chunks[2], chunks[3]) # Check that the test passed server.kill() diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index 369050776e57..05c765594e62 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -474,7 +474,7 @@ def code(self): return StatusCode.DEADLINE_EXCEEDED def _wait_then_raise(): - time.sleep(0.5) + time.sleep(0.1) raise MockTimeoutError() # Patch the stub used by the API method. 
The stub should create a new @@ -486,7 +486,7 @@ def make_slow_iterator(): # Set to timeout before RPC completes test_backoff_settings = BackoffSettings( initial_retry_delay_millis=10, - retry_delay_multiplier=1.3, + retry_delay_multiplier=0.3, max_retry_delay_millis=30000, initial_rpc_timeout_millis=1000, rpc_timeout_multiplier=1.0, From 7cffbe085aa607d802573e33ad65c0a18b220579 Mon Sep 17 00:00:00 2001 From: calpeyser Date: Fri, 14 Jul 2017 09:58:20 -0400 Subject: [PATCH 11/12] remove extra import --- bigtable/google/cloud/bigtable/table.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index 771799891a76..e176834ccc18 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -17,7 +17,6 @@ import six -from __future__ import absolute_import from google.cloud.bigtable._generated import ( bigtable_pb2 as data_messages_v2_pb2) from google.cloud.bigtable._generated import ( @@ -444,4 +443,4 @@ def _check_row_type(row): """ if not isinstance(row, DirectRow): raise TypeError('Bulk processing can not be applied for ' - 'conditional or append mutations.') \ No newline at end of file + 'conditional or append mutations.') From 19079c3243d1193ce8938fda60127e7e958d1801 Mon Sep 17 00:00:00 2001 From: calpeyser Date: Fri, 14 Jul 2017 10:03:29 -0400 Subject: [PATCH 12/12] adding newline in bigtable/table.py to satisfy linter --- bigtable/google/cloud/bigtable/table.py | 1 + 1 file changed, 1 insertion(+) diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index e176834ccc18..3ed2d20ea975 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -382,6 +382,7 @@ def sample_row_keys(self): response_iterator = client._data_stub.SampleRowKeys(request_pb) return response_iterator + def _mutate_rows_request(table_name, rows): """Creates a request to 
mutate rows in a table.