From 8963856bb7349ca49b35fa8164809e78e4e15507 Mon Sep 17 00:00:00 2001
From: Mariatta Wijaya
Date: Tue, 22 Nov 2022 12:58:07 -0800
Subject: [PATCH 1/3] Asynchronous batching

Call the async mutate_rows from grpc aio
Create flush and mutate coroutines
---
 google/cloud/bigtable/batcher.py              |  19 ++-
 google/cloud/bigtable/client.py               |  20 ++-
 google/cloud/bigtable/table.py                | 146 ++++++++++++++++++
 .../services/bigtable/async_client.py         |   4 +-
 4 files changed, 182 insertions(+), 7 deletions(-)

diff --git a/google/cloud/bigtable/batcher.py b/google/cloud/bigtable/batcher.py
index 3c23f4436..879fec36e 100644
--- a/google/cloud/bigtable/batcher.py
+++ b/google/cloud/bigtable/batcher.py
@@ -14,6 +14,8 @@
 
 """User friendly container for Google Cloud Bigtable MutationBatcher."""
 
+import asyncio
+
 FLUSH_COUNT = 1000
 MAX_MUTATIONS = 100000
@@ -65,6 +67,9 @@ def __init__(self, table, flush_count=FLUSH_COUNT, max_row_bytes=MAX_ROW_BYTES):
         self.max_row_bytes = max_row_bytes
 
     def mutate(self, row):
+        asyncio.run(self.async_mutate(row))
+
+    async def async_mutate(self, row):
         """Add a row to the batch. If the current batch meets one of the size
         limits, the batch is sent synchronously.
@@ -95,16 +100,19 @@ def mutate(self, row):
             )
 
         if (self.total_mutation_count + mutation_count) >= MAX_MUTATIONS:
-            self.flush()
+            await self.async_flush()
 
         self.rows.append(row)
         self.total_mutation_count += mutation_count
         self.total_size += row.get_mutations_size()
 
         if self.total_size >= self.max_row_bytes or len(self.rows) >= self.flush_count:
-            self.flush()
+            await self.async_flush()
 
     def mutate_rows(self, rows):
+        asyncio.run(self.async_mutate_rows(rows))
+
+    async def async_mutate_rows(self, rows):
         """Add multiple rows to the batch. If the current batch meets one of
         the size limits, the batch is sent synchronously.
@@ -127,9 +135,12 @@
             mutations count.
         """
         for row in rows:
-            self.mutate(row)
+            await self.async_mutate(row)
 
     def flush(self):
+        asyncio.run(self.async_flush())
+
+    async def async_flush(self):
         """Sends the current. batch to Cloud Bigtable.
         For example:
@@ -140,7 +151,7 @@ def flush(self):
         """
         if len(self.rows) != 0:
-            self.table.mutate_rows(self.rows)
+            await self.table.async_mutate_rows(self.rows)
             self.total_mutation_count = 0
             self.total_size = 0
             self.rows = []

diff --git a/google/cloud/bigtable/client.py b/google/cloud/bigtable/client.py
index c82a268c6..a0b895443 100644
--- a/google/cloud/bigtable/client.py
+++ b/google/cloud/bigtable/client.py
@@ -37,7 +37,7 @@
 from google.cloud import bigtable_v2
 from google.cloud import bigtable_admin_v2
 
-from google.cloud.bigtable_v2.services.bigtable.transports import BigtableGrpcTransport
+from google.cloud.bigtable_v2.services.bigtable.transports import BigtableGrpcTransport, BigtableGrpcAsyncIOTransport
 from google.cloud.bigtable_admin_v2.services.bigtable_instance_admin.transports import (
     BigtableInstanceAdminGrpcTransport,
 )
@@ -145,6 +145,7 @@ class Client(ClientWithProject):
     """
 
     _table_data_client = None
+    _async_table_data_client = None
     _table_admin_client = None
     _instance_admin_client = None
 
@@ -341,6 +342,23 @@ def table_data_client(self):
             self._table_data_client = klass(self)
         return self._table_data_client
 
+    @property
+    def async_table_data_client(self):
+        """only mutate_rows is supported here
+        """
+        if self._async_table_data_client is None:
+            transport = self._create_gapic_client_channel(
+                bigtable_v2.BigtableAsyncClient,
+                BigtableGrpcAsyncIOTransport,
+            )
+            klass = _create_gapic_client(
+                bigtable_v2.BigtableAsyncClient,
+                client_options=self._client_options,
+                transport=transport,
+            )
+            self._async_table_data_client = klass(self)
+        return self._async_table_data_client
+
     @property
     def table_admin_client(self):
         """Getter for the gRPC stub used for the Table Admin API.

diff --git a/google/cloud/bigtable/table.py b/google/cloud/bigtable/table.py
index 8605992ba..f4b19979c 100644
--- a/google/cloud/bigtable/table.py
+++ b/google/cloud/bigtable/table.py
@@ -27,6 +27,7 @@
 from google.api_core.gapic_v1.method import DEFAULT
 from google.api_core.retry import if_exception_type
 from google.api_core.retry import Retry
+from google.api_core.retry_async import AsyncRetry
 from google.cloud._helpers import _to_bytes  # type: ignore
 from google.cloud.bigtable.backup import Backup
 from google.cloud.bigtable.column_family import _gc_rule_from_pb
@@ -91,6 +92,13 @@ class _BigtableRetryableError(Exception):
 Used by :meth:`~google.cloud.bigtable.table.Table.mutate_rows`.
 """
 
+ASYNC_DEFAULT_RETRY = AsyncRetry(
+    predicate=if_exception_type(_BigtableRetryableError),
+    initial=1.0,
+    maximum=15.0,
+    multiplier=2.0,
+    deadline=120.0,  # 2 minutes
+)
 class TableMismatchError(ValueError):
     """Row from another table."""
@@ -677,6 +685,20 @@ def yield_rows(self, **kwargs):
         )
         return self.read_rows(**kwargs)
 
+    async def async_mutate_rows(self, rows, retry=ASYNC_DEFAULT_RETRY, timeout=DEFAULT):
+        """async mutate_rows """
+        if timeout is DEFAULT:
+            timeout = self.mutation_timeout
+
+        retryable_mutate_rows = _AsyncRetryableMutateRowsWorker(
+            self._instance._client,
+            self.name,
+            rows,
+            app_profile_id=self._app_profile_id,
+            timeout=timeout,
+        )
+        return await retryable_mutate_rows(retry=retry)
+
     def mutate_rows(self, rows, retry=DEFAULT_RETRY, timeout=DEFAULT):
         """Mutates multiple rows in bulk.
@@ -1179,6 +1201,130 @@ def _do_mutate_retryable_rows(self):
 
         return self.responses_statuses
 
+class _AsyncRetryableMutateRowsWorker(object):
+    """Retry async mutate rows"""
+
+    def __init__(self, client, table_name, rows, app_profile_id=None, timeout=None):
+        self.client = client
+        self.table_name = table_name
+        self.rows = rows
+        self.app_profile_id = app_profile_id
+        self.responses_statuses = [None] * len(self.rows)
+        self.timeout = timeout
+
+    async def __call__(self, retry=ASYNC_DEFAULT_RETRY):
+        """Attempt to mutate all rows and retry rows with transient errors.
+
+        Will retry the rows with transient errors until all rows succeed or
+        ``deadline`` specified in the `retry` is reached.
+
+        :rtype: list
+        :returns: A list of response statuses (`google.rpc.status_pb2.Status`)
+                  corresponding to success or failure of each row mutation
+                  sent. These will be in the same order as the ``rows``.
+        """
+        mutate_rows = self._do_mutate_retryable_rows
+        if retry:
+            mutate_rows = retry(self._do_mutate_retryable_rows)
+
+        try:
+            await mutate_rows()
+        except (_BigtableRetryableError, RetryError):
+            # - _BigtableRetryableError raised when no retry strategy is used
+            #   and a retryable error on a mutation occurred.
+            # - RetryError raised when retry deadline is reached.
+            # In both cases, just return current `responses_statuses`.
+            pass
+
+        return self.responses_statuses
+
+    @staticmethod
+    def _is_retryable(status):
+        return status is None or status.code in RETRYABLE_CODES
+
+    async def _do_mutate_retryable_rows(self):
+        """Mutate all the rows that are eligible for retry.
+
+        A row is eligible for retry if it has not been tried or if it resulted
+        in a transient error in a previous call.
+
+        :rtype: list
+        :return: The responses statuses, which is a list of
+                 :class:`~google.rpc.status_pb2.Status`.
+        :raises: One of the following:
+
+                 * :exc:`~.table._BigtableRetryableError` if any
+                   row returned a transient error.
+                 * :exc:`RuntimeError` if the number of responses doesn't
+                   match the number of rows that were retried
+        """
+        retryable_rows = []
+        index_into_all_rows = []
+        for index, status in enumerate(self.responses_statuses):
+            if self._is_retryable(status):
+                retryable_rows.append(self.rows[index])
+                index_into_all_rows.append(index)
+
+        if not retryable_rows:
+            # All mutations are either successful or non-retryable now.
+            return self.responses_statuses
+
+        entries = _compile_mutation_entries(self.table_name, retryable_rows)
+        data_client = self.client.async_table_data_client
+
+        kwargs = {}
+        if self.timeout is not None:
+            kwargs["timeout"] = timeout.ExponentialTimeout(deadline=self.timeout)
+
+        try:
+            responses = await data_client.mutate_rows(
+                table_name=self.table_name,
+                entries=entries,
+                app_profile_id=self.app_profile_id,
+                retry=None,
+                **kwargs
+            )
+        except RETRYABLE_MUTATION_ERRORS as exc:
+            # If an exception, considered retryable by `RETRYABLE_MUTATION_ERRORS`, is
+            # returned from the initial call, consider
+            # it to be retryable. Wrap as a Bigtable Retryable Error.
+            # For InternalServerError, it is only retriable if the message is related to RST Stream messages
+            if _retriable_internal_server_error(exc) or not isinstance(
+                exc, InternalServerError
+            ):
+                raise _BigtableRetryableError
+            else:
+                # re-raise the original exception
+                raise
+
+        num_responses = 0
+        num_retryable_responses = 0
+
+        data = await responses.read()
+
+        for entry in data.entries:
+            num_responses += 1
+            index = index_into_all_rows[entry.index]
+            self.responses_statuses[index] = entry.status
+            if self._is_retryable(entry.status):
+                num_retryable_responses += 1
+            if entry.status.code == 0:
+                self.rows[index].clear()
+
+        if len(retryable_rows) != num_responses:
+            raise RuntimeError(
+                "Unexpected number of responses",
+                num_responses,
+                "Expected",
+                len(retryable_rows),
+            )
+
+        if num_retryable_responses:
+            raise _BigtableRetryableError
+
+        return self.responses_statuses
+
+
 class ClusterState(object):
     """Representation of a Cluster State.

diff --git a/google/cloud/bigtable_v2/services/bigtable/async_client.py b/google/cloud/bigtable_v2/services/bigtable/async_client.py
index 8ab2f1348..9768a83d9 100644
--- a/google/cloud/bigtable_v2/services/bigtable/async_client.py
+++ b/google/cloud/bigtable_v2/services/bigtable/async_client.py
@@ -526,7 +526,7 @@ async def mutate_row(
         # Done; return the response.
         return response
 
-    def mutate_rows(
+    async def mutate_rows(
         self,
         request: Optional[Union[bigtable.MutateRowsRequest, dict]] = None,
         *,
@@ -626,7 +626,7 @@
         )
 
         # Send the request.
-        response = rpc(
+        response = await rpc(
             request,
             retry=retry,
             timeout=timeout,
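Patch 1 layers the synchronous entry points on top of the new coroutines: each sync method becomes a thin `asyncio.run()` wrapper around its `async_*` counterpart, and `Table.async_mutate_rows` drives the gRPC aio transport through `_AsyncRetryableMutateRowsWorker`. A minimal sketch of the intended call pattern from async user code, assuming this patch is applied; the project, instance, table, and column-family names are placeholders:

import asyncio

from google.cloud.bigtable import Client


async def write_rows():
    # Placeholder names; substitute your own project, instance, and table.
    client = Client(project="my-project")
    table = client.instance("my-instance").table("my-table")

    row = table.direct_row(b"row-key-1")
    row.set_cell("cf1", b"field", b"value")

    # One google.rpc.status_pb2.Status per row, in request order.
    statuses = await table.async_mutate_rows([row])
    for status in statuses:
        print(status.code)


asyncio.run(write_rows())

Note that `asyncio.run()` spins up and tears down a fresh event loop on every call, so the synchronous wrappers in this patch cannot be invoked from code that is already running inside a loop; such callers must await the coroutines directly.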
From 05a4bd13cc4d9effa960d06862c76661661eda40 Mon Sep 17 00:00:00 2001
From: Mariatta Wijaya
Date: Tue, 29 Nov 2022 16:18:49 -0800
Subject: [PATCH 2/3] Have both sync and async versions of batch mutations/flush

---
 google/cloud/bigtable/batcher.py | 27 ++++++++++++++++++++++++---
 google/cloud/bigtable/client.py  |  8 +++++---
 google/cloud/bigtable/table.py   |  3 ++-
 3 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/google/cloud/bigtable/batcher.py b/google/cloud/bigtable/batcher.py
index 879fec36e..559e18e15 100644
--- a/google/cloud/bigtable/batcher.py
+++ b/google/cloud/bigtable/batcher.py
@@ -67,7 +67,23 @@ def __init__(self, table, flush_count=FLUSH_COUNT, max_row_bytes=MAX_ROW_BYTES):
         self.max_row_bytes = max_row_bytes
 
     def mutate(self, row):
-        asyncio.run(self.async_mutate(row))
+        mutation_count = len(row._get_mutations())
+        if mutation_count > MAX_MUTATIONS:
+            raise MaxMutationsError(
+                "The row key {} exceeds the number of mutations {}.".format(
+                    row.row_key, mutation_count
+                )
+            )
+
+        if (self.total_mutation_count + mutation_count) >= MAX_MUTATIONS:
+            self.flush()
+
+        self.rows.append(row)
+        self.total_mutation_count += mutation_count
+        self.total_size += row.get_mutations_size()
+
+        if self.total_size >= self.max_row_bytes or len(self.rows) >= self.flush_count:
+            self.flush()
 
     async def async_mutate(self, row):
         """Add a row to the batch. If the current batch meets one of the size
@@ -110,7 +126,8 @@ async def async_mutate(self, row):
             await self.async_flush()
 
     def mutate_rows(self, rows):
-        asyncio.run(self.async_mutate_rows(rows))
+        for row in rows:
+            self.mutate(row)
 
     async def async_mutate_rows(self, rows):
         """Add multiple rows to the batch. If the current batch meets one of the size
@@ -138,7 +155,11 @@ async def async_mutate_rows(self, rows):
             await self.async_mutate(row)
 
     def flush(self):
-        asyncio.run(self.async_flush())
+        if len(self.rows) != 0:
+            self.table.mutate_rows(self.rows)
+            self.total_mutation_count = 0
+            self.total_size = 0
+            self.rows = []
 
     async def async_flush(self):
         """Sends the current. batch to Cloud Bigtable.

diff --git a/google/cloud/bigtable/client.py b/google/cloud/bigtable/client.py
index a0b895443..55b0ae684 100644
--- a/google/cloud/bigtable/client.py
+++ b/google/cloud/bigtable/client.py
@@ -37,7 +37,10 @@
 from google.cloud import bigtable_v2
 from google.cloud import bigtable_admin_v2
 
-from google.cloud.bigtable_v2.services.bigtable.transports import BigtableGrpcTransport, BigtableGrpcAsyncIOTransport
+from google.cloud.bigtable_v2.services.bigtable.transports import (
+    BigtableGrpcTransport,
+    BigtableGrpcAsyncIOTransport,
+)
 from google.cloud.bigtable_admin_v2.services.bigtable_instance_admin.transports import (
     BigtableInstanceAdminGrpcTransport,
 )
@@ -344,8 +347,7 @@ def table_data_client(self):
 
     @property
     def async_table_data_client(self):
-        """only mutate_rows is supported here
-        """
+        """only mutate_rows is supported here"""
        if self._async_table_data_client is None:
            transport = self._create_gapic_client_channel(
                bigtable_v2.BigtableAsyncClient,

diff --git a/google/cloud/bigtable/table.py b/google/cloud/bigtable/table.py
index f4b19979c..044ee0eef 100644
--- a/google/cloud/bigtable/table.py
+++ b/google/cloud/bigtable/table.py
@@ -100,6 +100,7 @@ class _BigtableRetryableError(Exception):
     deadline=120.0,  # 2 minutes
 )
+
 class TableMismatchError(ValueError):
     """Row from another table."""
@@ -686,7 +687,7 @@ def yield_rows(self, **kwargs):
         return self.read_rows(**kwargs)
 
     async def async_mutate_rows(self, rows, retry=ASYNC_DEFAULT_RETRY, timeout=DEFAULT):
-        """async mutate_rows """
+        """async mutate_rows"""
         if timeout is DEFAULT:
             timeout = self.mutation_timeout
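Patch 2 unwinds the wrapper approach inside the batcher: the synchronous methods get their own blocking implementations back, and the coroutines remain as a parallel code path. A rough sketch of the two usage modes this separation enables, reusing the `table` from the earlier sketch and assuming `rows` is a list of DirectRow objects:

import asyncio

# Synchronous path: plain blocking calls, no event loop involved.
batcher = table.mutations_batcher(flush_count=500)
for row in rows:
    batcher.mutate(row)  # may trigger a blocking flush()
batcher.flush()


# Asynchronous path: the same batcher API, awaited from a running loop.
async def batch_write(rows):
    async_batcher = table.mutations_batcher(flush_count=500)
    for row in rows:
        await async_batcher.async_mutate(row)  # may trigger async_flush()
    await async_batcher.async_flush()


asyncio.run(batch_write(rows))

Keeping the paths independent sidesteps the `asyncio.run()` pitfall noted earlier: the sync methods never create an event loop, and the async methods never block one.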
From eb12c0b6b470a9aa17229a21186223b093aeb7b3 Mon Sep 17 00:00:00 2001
From: Mariatta Wijaya
Date: Tue, 29 Nov 2022 16:22:07 -0800
Subject: [PATCH 3/3] Remove unused import

---
 google/cloud/bigtable/batcher.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/google/cloud/bigtable/batcher.py b/google/cloud/bigtable/batcher.py
index 559e18e15..9a76eba3c 100644
--- a/google/cloud/bigtable/batcher.py
+++ b/google/cloud/bigtable/batcher.py
@@ -14,8 +14,6 @@
 
 """User friendly container for Google Cloud Bigtable MutationBatcher."""
 
-import asyncio
-
 FLUSH_COUNT = 1000
 MAX_MUTATIONS = 100000
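With the series complete, the retry behavior of the async path is worth spelling out: `_AsyncRetryableMutateRowsWorker.__call__` wraps `_do_mutate_retryable_rows` in the `AsyncRetry` policy, which re-awaits the coroutine whenever it raises `_BigtableRetryableError`, backing off exponentially (1 s initial, 2x multiplier, capped at 15 s) until the 120 s deadline. A standalone sketch of the same pattern, with a hypothetical `flaky_rpc` coroutine standing in for the Bigtable call:

import asyncio
import random

from google.api_core.exceptions import RetryError
from google.api_core.retry import if_exception_type
from google.api_core.retry_async import AsyncRetry


class TransientError(Exception):
    """Stand-in for _BigtableRetryableError."""


async def flaky_rpc():
    # Hypothetical coroutine that fails transiently about half the time.
    if random.random() < 0.5:
        raise TransientError("retry me")
    return "ok"


# Same shape as ASYNC_DEFAULT_RETRY in patch 1.
retry = AsyncRetry(
    predicate=if_exception_type(TransientError),
    initial=1.0,
    maximum=15.0,
    multiplier=2.0,
    deadline=120.0,
)


async def main():
    try:
        print(await retry(flaky_rpc)())
    except RetryError:
        print("deadline exhausted without a successful attempt")


asyncio.run(main())

When the deadline lapses, `AsyncRetry` raises `RetryError`; in the worker above, `__call__` catches it and returns the per-row statuses collected so far instead of propagating the failure.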