diff --git a/bigtable/google/cloud/bigtable/row.py b/bigtable/google/cloud/bigtable/row.py index da9678cdf892..0f113a9722e2 100644 --- a/bigtable/google/cloud/bigtable/row.py +++ b/bigtable/google/cloud/bigtable/row.py @@ -1,4 +1,4 @@ -# Copyright 2015 Google LLC +# Copyright 2015 Google Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,6 +19,8 @@ import six +import concurrent.futures + from google.cloud._helpers import _datetime_from_microseconds from google.cloud._helpers import _microseconds_from_datetime from google.cloud._helpers import _to_bytes @@ -26,6 +28,8 @@ data_pb2 as data_v2_pb2) from google.cloud.bigtable._generated import ( bigtable_pb2 as messages_v2_pb2) +from google.api_core import retry +from google.cloud import exceptions _PACK_I64 = struct.Struct('>q').pack @@ -236,6 +240,26 @@ def _delete_cells(self, column_family_id, columns, time_range=None, mutations_list.extend(to_append) +class RetryCommit: + + def __init__(self, table, request_pb): + """ + + :param table: + :param request_pb: + """ + self.table = table + self.request_pb = request_pb + + def __call__(self): + """ + + :return: raise exception for MutateRow + """ + client = self.table._instance._client + client._data_stub.MutateRow(self.request_pb) + + class DirectRow(_SetDeleteRow): """Google Cloud Bigtable Row for sending "direct" mutations. @@ -269,6 +293,7 @@ class DirectRow(_SetDeleteRow): def __init__(self, row_key, table): super(DirectRow, self).__init__(row_key, table) self._pb_mutations = [] + self.request_pb = None def _get_mutations(self, state): # pylint: disable=unused-argument """Gets the list of mutations for a given state. @@ -412,9 +437,19 @@ def commit(self): row_key=self._row_key, mutations=mutations_list, ) - # We expect a `google.protobuf.empty_pb2.Empty` - client = self._table._instance._client - client._data_stub.MutateRow(request_pb) + + retry_commit = RetryCommit(self._table, request_pb) + retry_ = retry.Retry( + predicate=retry.if_exception_type(exceptions.GrpcRendezvous), + deadline=30) + + try: + retry_(retry_commit)() + except exceptions.RetryError: + raise concurrent.futures.TimeoutError( + 'Operation did not complete within the designated ' + 'timeout.') + self.clear() def clear(self): diff --git a/core/google/cloud/exceptions.py b/core/google/cloud/exceptions.py index 36ee6d14fcab..ddfe7ec09cb3 100644 --- a/core/google/cloud/exceptions.py +++ b/core/google/cloud/exceptions.py @@ -55,5 +55,6 @@ BadGateway = exceptions.BadGateway ServiceUnavailable = exceptions.ServiceUnavailable GatewayTimeout = exceptions.GatewayTimeout +RetryError = exceptions.RetryError from_http_status = exceptions.from_http_status from_http_response = exceptions.from_http_response