169 changes: 169 additions & 0 deletions bigtable/google/cloud/bigtable/retry.py
@@ -0,0 +1,169 @@
"""Provides function wrappers that implement retrying."""
import random
import time
import six

from google.cloud._helpers import _to_bytes
from google.cloud.bigtable._generated import (
bigtable_pb2 as data_messages_v2_pb2)
from google.gax import config, errors

_MILLIS_PER_SECOND = 1000

def _has_timeout_settings(backoff_settings):
return (backoff_settings.rpc_timeout_multiplier is not None and
backoff_settings.max_rpc_timeout_millis is not None and
backoff_settings.total_timeout_millis is not None and
backoff_settings.initial_rpc_timeout_millis is not None)

class ReadRowsIterator(object):
    """Iterator over a ``ReadRows`` response stream that retries on
    transient errors, resuming the scan from the last row key delivered.
    """

    def __init__(self, client, name, start_key, end_key, filter_, limit,
                 retry_options, **kwargs):
        self.client = client
        self.retry_options = retry_options
        self.name = name
        self.start_key = start_key
        self.start_key_closed = True
        self.end_key = end_key
        self.filter_ = filter_
        self.limit = limit

        self.delay_mult = (
            retry_options.backoff_settings.retry_delay_multiplier)
        self.max_delay_millis = (
            retry_options.backoff_settings.max_retry_delay_millis)
        self.has_timeout_settings = _has_timeout_settings(
            retry_options.backoff_settings)

        if self.has_timeout_settings:
            self.timeout_mult = (
                retry_options.backoff_settings.rpc_timeout_multiplier)
            self.max_timeout = (
                retry_options.backoff_settings.max_rpc_timeout_millis /
                _MILLIS_PER_SECOND)
            self.total_timeout = (
                retry_options.backoff_settings.total_timeout_millis /
                _MILLIS_PER_SECOND)
        self.set_stream()

    def set_start_key(self, start_key):
        """Sets the row key from which a rebuilt stream should resume.

        The key is marked open so the row that was already delivered is
        not read again after a retry.
        """
        self.start_key = start_key
        self.start_key_closed = False

    def set_stream(self):
        """(Re)opens the ReadRows stream with the current row range."""
        request_pb = _create_row_request(
            self.name, start_key=self.start_key,
            start_key_closed=self.start_key_closed, end_key=self.end_key,
            filter_=self.filter_, limit=self.limit)
        self.stream = self.client._data_stub.ReadRows(request_pb)

    def next(self, *args, **kwargs):
        delay = self.retry_options.backoff_settings.initial_retry_delay_millis
        exc = errors.RetryError('Retry total timeout exceeded before any '
                                'response was received')
        if self.has_timeout_settings:
            timeout = (
                self.retry_options.backoff_settings.initial_rpc_timeout_millis /
                _MILLIS_PER_SECOND)

            now = time.time()
            deadline = now + self.total_timeout
        else:
            timeout = None
            deadline = None

        while deadline is None or now < deadline:
            try:
                return six.next(self.stream)
            except StopIteration as stop:
                raise stop
            except Exception as exception:  # pylint: disable=broad-except
                code = config.exc_to_code(exception)
                if code not in self.retry_options.retry_codes:
                    raise errors.RetryError(
                        'Exception occurred in retry method that was not'
                        ' classified as transient', exception)

                # pylint: disable=redefined-variable-type
                exc = errors.RetryError(
                    'Retry total timeout exceeded with exception', exception)

                # Sleep a random number which will, on average, equal the
                # expected delay.
                to_sleep = random.uniform(0, delay * 2)
                time.sleep(to_sleep / _MILLIS_PER_SECOND)
                delay = min(delay * self.delay_mult, self.max_delay_millis)

                if self.has_timeout_settings:
                    now = time.time()
                    timeout = min(
                        timeout * self.timeout_mult, self.max_timeout,
                        deadline - now)

                # Rebuild the stream, resuming after the last delivered key.
                self.set_stream()

        raise exc

    def __next__(self, *args, **kwargs):
        return self.next(*args, **kwargs)

    def __iter__(self):
        return self

def _create_row_request(table_name, row_key=None, start_key=None,
                        start_key_closed=True, end_key=None, filter_=None,
                        limit=None):
    """Creates a request to read rows in a table.

    :type table_name: str
    :param table_name: The name of the table to read from.

    :type row_key: bytes
    :param row_key: (Optional) The key of a specific row to read from.

    :type start_key: bytes
    :param start_key: (Optional) The beginning of a range of row keys to
                      read from. The range will include ``start_key``. If
                      left empty, will be interpreted as the empty string.

    :type start_key_closed: bool
    :param start_key_closed: (Optional) Whether the range includes
                             ``start_key`` itself. Pass ``False`` to resume
                             a scan after an already-delivered row key.

    :type end_key: bytes
    :param end_key: (Optional) The end of a range of row keys to read from.
                    The range will not include ``end_key``. If left empty,
                    will be interpreted as an infinite string.

    :type filter_: :class:`.RowFilter`
    :param filter_: (Optional) The filter to apply to the contents of the
                    specified row(s). If unset, reads the entire table.

    :type limit: int
    :param limit: (Optional) The read will terminate after committing to N
                  rows' worth of results. The default (zero) is to return
                  all results.

    :rtype: :class:`data_messages_v2_pb2.ReadRowsRequest`
    :returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs.
    :raises: :class:`ValueError <exceptions.ValueError>` if both
             ``row_key`` and one of ``start_key`` and ``end_key`` are set
    """
    request_kwargs = {'table_name': table_name}
    if (row_key is not None and
            (start_key is not None or end_key is not None)):
        raise ValueError('Row key and row range cannot be '
                         'set simultaneously')
    range_kwargs = {}
    if start_key is not None or end_key is not None:
        if start_key is not None:
            if start_key_closed:
                range_kwargs['start_key_closed'] = _to_bytes(start_key)
            else:
                range_kwargs['start_key_open'] = _to_bytes(start_key)
        if end_key is not None:
            range_kwargs['end_key_open'] = _to_bytes(end_key)
    if filter_ is not None:
        request_kwargs['filter'] = filter_.to_pb()
    if limit is not None:
        request_kwargs['rows_limit'] = limit

    message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs)

    if row_key is not None:
        message.rows.row_keys.append(_to_bytes(row_key))

    if range_kwargs:
        message.rows.row_ranges.add(**range_kwargs)

    return message
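For reference, a minimal standalone sketch of the delay schedule that `next` implements (exponential backoff with full jitter, capped at `max_retry_delay_millis`). The function name and attempt count below are illustrative only, not part of this diff:

import random

def _jittered_delays(initial_millis=10, multiplier=2, max_millis=5000,
                     attempts=6):
    """Yield sleep times in seconds, mirroring the loop in ReadRowsIterator.next."""
    delay = initial_millis
    for _ in range(attempts):
        # random.uniform(0, delay * 2) averages to `delay`, as in the PR's loop.
        yield random.uniform(0, delay * 2) / 1000.0
        delay = min(delay * multiplier, max_millis)

print(list(_jittered_delays()))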
3 changes: 3 additions & 0 deletions bigtable/google/cloud/bigtable/row_data.py
@@ -274,6 +274,9 @@ def consume_next(self):

        self._validate_chunk(chunk)

        # Record the last row key seen so a retried stream resumes after it.
        if "ReadRowsIterator" in self._response_iterator.__class__.__name__:
            self._response_iterator.set_start_key(chunk.row_key)

        if chunk.reset_row:
            row = self._row = None
            cell = self._cell = self._previous_cell = None
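The effect of this hook is easiest to see through `_create_row_request` from the new retry module: once `set_start_key` flips `start_key_closed` to `False`, the rebuilt request uses an open start key, so the row that was already delivered is not re-read. A small illustrative snippet (the table name and row key are placeholders, and it assumes this branch of the package is installed):

from google.cloud.bigtable.retry import _create_row_request

request = _create_row_request(
    'projects/p/instances/i/tables/t',
    start_key=b'row-0042', start_key_closed=False)
# The rebuilt range resumes strictly after the last delivered key:
print(request.rows.row_ranges[0].start_key_open)  # -> row-0042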
99 changes: 30 additions & 69 deletions bigtable/google/cloud/bigtable/table.py
@@ -13,8 +13,10 @@
# limitations under the License.

"""User friendly container for Google Cloud Bigtable Table."""
from __future__ import absolute_import, division

import six

from google.cloud._helpers import _to_bytes
from google.cloud.bigtable._generated import (
bigtable_pb2 as data_messages_v2_pb2)
from google.cloud.bigtable._generated import (
@@ -27,7 +29,29 @@
from google.cloud.bigtable.row import ConditionalRow
from google.cloud.bigtable.row import DirectRow
from google.cloud.bigtable.row_data import PartialRowsData

from google.gax import RetryOptions, BackoffSettings
from google.cloud.bigtable.retry import ReadRowsIterator, _create_row_request
from grpc import StatusCode

BACKOFF_SETTINGS = BackoffSettings(
    initial_retry_delay_millis=10,
    retry_delay_multiplier=2,
    max_retry_delay_millis=5000,
    initial_rpc_timeout_millis=10,
    rpc_timeout_multiplier=2,
    max_rpc_timeout_millis=1000,
    total_timeout_millis=5000,
)

RETRY_OPTIONS = RetryOptions(
    retry_codes=[
        StatusCode.DEADLINE_EXCEEDED,
        StatusCode.ABORTED,
        StatusCode.INTERNAL,
        StatusCode.UNAVAILABLE,
    ],
    backoff_settings=BACKOFF_SETTINGS,
)

class Table(object):
"""Representation of a Google Cloud Bigtable Table.
@@ -268,13 +292,11 @@ def read_rows(self, start_key=None, end_key=None, limit=None,
:returns: A :class:`.PartialRowsData` convenience wrapper for consuming
the streamed results.
"""
request_pb = _create_row_request(
self.name, start_key=start_key, end_key=end_key, filter_=filter_,
limit=limit)

client = self._instance._client
response_iterator = client._data_stub.ReadRows(request_pb)
# We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse`
return PartialRowsData(response_iterator)
retrying_iterator = ReadRowsIterator(client, self.name, start_key,
end_key, filter_, limit, RETRY_OPTIONS)
return PartialRowsData(retrying_iterator)

def sample_row_keys(self):
"""Read a sample of row keys in the table.
@@ -312,64 +334,3 @@ def sample_row_keys(self):
client = self._instance._client
response_iterator = client._data_stub.SampleRowKeys(request_pb)
return response_iterator


def _create_row_request(table_name, row_key=None, start_key=None, end_key=None,
filter_=None, limit=None):
"""Creates a request to read rows in a table.

:type table_name: str
:param table_name: The name of the table to read from.

:type row_key: bytes
:param row_key: (Optional) The key of a specific row to read from.

:type start_key: bytes
:param start_key: (Optional) The beginning of a range of row keys to
read from. The range will include ``start_key``. If
left empty, will be interpreted as the empty string.

:type end_key: bytes
:param end_key: (Optional) The end of a range of row keys to read from.
The range will not include ``end_key``. If left empty,
will be interpreted as an infinite string.

:type filter_: :class:`.RowFilter`
:param filter_: (Optional) The filter to apply to the contents of the
specified row(s). If unset, reads the entire table.

:type limit: int
:param limit: (Optional) The read will terminate after committing to N
rows' worth of results. The default (zero) is to return
all results.

:rtype: :class:`data_messages_v2_pb2.ReadRowsRequest`
:returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs.
:raises: :class:`ValueError <exceptions.ValueError>` if both
``row_key`` and one of ``start_key`` and ``end_key`` are set
"""
request_kwargs = {'table_name': table_name}
if (row_key is not None and
(start_key is not None or end_key is not None)):
raise ValueError('Row key and row range cannot be '
'set simultaneously')
range_kwargs = {}
if start_key is not None or end_key is not None:
if start_key is not None:
range_kwargs['start_key_closed'] = _to_bytes(start_key)
if end_key is not None:
range_kwargs['end_key_open'] = _to_bytes(end_key)
if filter_ is not None:
request_kwargs['filter'] = filter_.to_pb()
if limit is not None:
request_kwargs['rows_limit'] = limit

message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs)

if row_key is not None:
message.rows.row_keys.append(_to_bytes(row_key))

if range_kwargs:
message.rows.row_ranges.add(**range_kwargs)

return message
Binary file added bigtable/unit_tests/retries
Binary file not shown.
Binary file added bigtable/unit_tests/retry
Binary file not shown.
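For context, a hedged end-to-end sketch of how a caller would exercise the retrying path after this change. The project, instance, and table names are placeholders, and it assumes ambient credentials plus this branch of google-cloud-bigtable installed:

from google.cloud import bigtable

client = bigtable.Client(project='my-project')
table = client.instance('my-instance').table('my-table')

# Same public read_rows API as before; transient DEADLINE_EXCEEDED, ABORTED,
# INTERNAL, and UNAVAILABLE errors are now retried inside the iterator.
rows = table.read_rows(start_key=b'a', end_key=b'z', limit=100)
rows.consume_all()
for row_key in sorted(rows.rows):
    print(row_key)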