Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 39 additions & 4 deletions bigtable/google/cloud/bigtable/row.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2015 Google LLC
# Copyright 2015 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -19,13 +19,17 @@

import six

import concurrent.futures

from google.cloud._helpers import _datetime_from_microseconds
from google.cloud._helpers import _microseconds_from_datetime
from google.cloud._helpers import _to_bytes
from google.cloud.bigtable._generated import (
data_pb2 as data_v2_pb2)
from google.cloud.bigtable._generated import (
bigtable_pb2 as messages_v2_pb2)
from google.api_core import retry
from google.cloud import exceptions


_PACK_I64 = struct.Struct('>q').pack
Expand Down Expand Up @@ -236,6 +240,26 @@ def _delete_cells(self, column_family_id, columns, time_range=None,
mutations_list.extend(to_append)


class RetryCommit:

def __init__(self, table, request_pb):
"""

:param table:
:param request_pb:
"""
self.table = table
self.request_pb = request_pb

def __call__(self):
"""

:return: raise exception for MutateRow
"""
client = self.table._instance._client
client._data_stub.MutateRow(self.request_pb)


class DirectRow(_SetDeleteRow):
"""Google Cloud Bigtable Row for sending "direct" mutations.

Expand Down Expand Up @@ -269,6 +293,7 @@ class DirectRow(_SetDeleteRow):
def __init__(self, row_key, table):
super(DirectRow, self).__init__(row_key, table)
self._pb_mutations = []
self.request_pb = None

def _get_mutations(self, state): # pylint: disable=unused-argument
"""Gets the list of mutations for a given state.
Expand Down Expand Up @@ -412,9 +437,19 @@ def commit(self):
row_key=self._row_key,
mutations=mutations_list,
)
# We expect a `google.protobuf.empty_pb2.Empty`
client = self._table._instance._client
client._data_stub.MutateRow(request_pb)

retry_commit = RetryCommit(self._table, request_pb)
retry_ = retry.Retry(
predicate=retry.if_exception_type(exceptions.GrpcRendezvous),
deadline=30)

try:
retry_(retry_commit)()
except exceptions.RetryError:
raise concurrent.futures.TimeoutError(
'Operation did not complete within the designated '
'timeout.')

self.clear()

def clear(self):
Expand Down
1 change: 1 addition & 0 deletions core/google/cloud/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,6 @@
BadGateway = exceptions.BadGateway
ServiceUnavailable = exceptions.ServiceUnavailable
GatewayTimeout = exceptions.GatewayTimeout
RetryError = exceptions.RetryError
from_http_status = exceptions.from_http_status
from_http_response = exceptions.from_http_response