Skip to content

Commit 1ecf65f

Browse files
feat: mutate rows batching (#770)
1 parent ceaf598 commit 1ecf65f

File tree

11 files changed

+2070
-115
lines changed

11 files changed

+2070
-115
lines changed

google/cloud/bigtable/_mutate_rows.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
from google.cloud.bigtable.client import Table
3232
from google.cloud.bigtable.mutations import RowMutationEntry
3333

34+
# mutate_rows requests are limited to this value
35+
MUTATE_ROWS_REQUEST_MUTATION_LIMIT = 100_000
36+
3437

3538
class _MutateRowsIncomplete(RuntimeError):
3639
"""
@@ -68,6 +71,14 @@ def __init__(
6871
- per_request_timeout: the timeoutto use for each mutate_rows attempt, in seconds.
6972
If not specified, the request will run until operation_timeout is reached.
7073
"""
74+
# check that mutations are within limits
75+
total_mutations = sum(len(entry.mutations) for entry in mutation_entries)
76+
if total_mutations > MUTATE_ROWS_REQUEST_MUTATION_LIMIT:
77+
raise ValueError(
78+
"mutate_rows requests can contain at most "
79+
f"{MUTATE_ROWS_REQUEST_MUTATION_LIMIT} mutations across "
80+
f"all entries. Found {total_mutations}."
81+
)
7182
# create partial function to pass to trigger rpc call
7283
metadata = _make_metadata(table.table_name, table.app_profile_id)
7384
self._gapic_fn = functools.partial(
@@ -119,7 +130,7 @@ async def start(self):
119130
self._handle_entry_error(idx, exc)
120131
finally:
121132
# raise exception detailing incomplete mutations
122-
all_errors = []
133+
all_errors: list[Exception] = []
123134
for idx, exc_list in self.errors.items():
124135
if len(exc_list) == 0:
125136
raise core_exceptions.ClientError(

google/cloud/bigtable/client.py

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
Any,
2121
Optional,
2222
Set,
23-
Callable,
24-
Coroutine,
2523
TYPE_CHECKING,
2624
)
2725

@@ -60,6 +58,8 @@
6058
from google.cloud.bigtable._mutate_rows import _MutateRowsOperation
6159
from google.cloud.bigtable._helpers import _make_metadata
6260
from google.cloud.bigtable._helpers import _convert_retry_deadline
61+
from google.cloud.bigtable.mutations_batcher import MutationsBatcher
62+
from google.cloud.bigtable.mutations_batcher import _MB_SIZE
6363
from google.cloud.bigtable._helpers import _attempt_timeout_generator
6464

6565
from google.cloud.bigtable.read_modify_write_rules import ReadModifyWriteRule
@@ -69,7 +69,6 @@
6969
from google.cloud.bigtable.row_filters import RowFilterChain
7070

7171
if TYPE_CHECKING:
72-
from google.cloud.bigtable.mutations_batcher import MutationsBatcher
7372
from google.cloud.bigtable import RowKeySamples
7473
from google.cloud.bigtable import ShardedQuery
7574

@@ -753,17 +752,48 @@ async def execute_rpc():
753752
)
754753
return await wrapped_fn()
755754

756-
def mutations_batcher(self, **kwargs) -> MutationsBatcher:
755+
def mutations_batcher(
756+
self,
757+
*,
758+
flush_interval: float | None = 5,
759+
flush_limit_mutation_count: int | None = 1000,
760+
flush_limit_bytes: int = 20 * _MB_SIZE,
761+
flow_control_max_mutation_count: int = 100_000,
762+
flow_control_max_bytes: int = 100 * _MB_SIZE,
763+
batch_operation_timeout: float | None = None,
764+
batch_per_request_timeout: float | None = None,
765+
) -> MutationsBatcher:
757766
"""
758767
Returns a new mutations batcher instance.
759768
760769
Can be used to iteratively add mutations that are flushed as a group,
761770
to avoid excess network calls
762771
772+
Args:
773+
- flush_interval: Automatically flush every flush_interval seconds. If None,
774+
a table default will be used
775+
- flush_limit_mutation_count: Flush immediately after flush_limit_mutation_count
776+
mutations are added across all entries. If None, this limit is ignored.
777+
- flush_limit_bytes: Flush immediately after flush_limit_bytes bytes are added.
778+
- flow_control_max_mutation_count: Maximum number of inflight mutations.
779+
- flow_control_max_bytes: Maximum number of inflight bytes.
780+
- batch_operation_timeout: timeout for each mutate_rows operation, in seconds. If None,
781+
table default_operation_timeout will be used
782+
- batch_per_request_timeout: timeout for each individual request, in seconds. If None,
783+
table default_per_request_timeout will be used
763784
Returns:
764785
- a MutationsBatcher context manager that can batch requests
765786
"""
766-
return MutationsBatcher(self, **kwargs)
787+
return MutationsBatcher(
788+
self,
789+
flush_interval=flush_interval,
790+
flush_limit_mutation_count=flush_limit_mutation_count,
791+
flush_limit_bytes=flush_limit_bytes,
792+
flow_control_max_mutation_count=flow_control_max_mutation_count,
793+
flow_control_max_bytes=flow_control_max_bytes,
794+
batch_operation_timeout=batch_operation_timeout,
795+
batch_per_request_timeout=batch_per_request_timeout,
796+
)
767797

768798
async def mutate_row(
769799
self,
@@ -861,10 +891,6 @@ async def bulk_mutate_rows(
861891
*,
862892
operation_timeout: float | None = 60,
863893
per_request_timeout: float | None = None,
864-
on_success: Callable[
865-
[int, RowMutationEntry], None | Coroutine[None, None, None]
866-
]
867-
| None = None,
868894
):
869895
"""
870896
Applies mutations for multiple rows in a single batched request.
@@ -890,9 +916,6 @@ async def bulk_mutate_rows(
890916
in seconds. If it takes longer than this time to complete, the request
891917
will be cancelled with a DeadlineExceeded exception, and a retry will
892918
be attempted if within operation_timeout budget
893-
- on_success: a callback function that will be called when each mutation
894-
entry is confirmed to be applied successfully. Will be passed the
895-
index and the entry itself.
896919
Raises:
897920
- MutationsExceptionGroup if one or more mutations fails
898921
Contains details about any failed entries in .exceptions

google/cloud/bigtable/exceptions.py

Lines changed: 90 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -85,19 +85,96 @@ def __str__(self):
8585
class MutationsExceptionGroup(BigtableExceptionGroup):
8686
"""
8787
Represents one or more exceptions that occur during a bulk mutation operation
88+
89+
Exceptions will typically be of type FailedMutationEntryError, but other exceptions may
90+
be included if they are raised during the mutation operation
8891
"""
8992

9093
@staticmethod
91-
def _format_message(excs: list[FailedMutationEntryError], total_entries: int):
92-
entry_str = "entry" if total_entries == 1 else "entries"
93-
plural_str = "" if len(excs) == 1 else "s"
94-
return f"{len(excs)} sub-exception{plural_str} (from {total_entries} {entry_str} attempted)"
94+
def _format_message(
95+
excs: list[Exception], total_entries: int, exc_count: int | None = None
96+
) -> str:
97+
"""
98+
Format a message for the exception group
99+
100+
Args:
101+
- excs: the exceptions in the group
102+
- total_entries: the total number of entries attempted, successful or not
103+
- exc_count: the number of exceptions associated with the request
104+
if None, this will be len(excs)
105+
"""
106+
exc_count = exc_count if exc_count is not None else len(excs)
107+
entry_str = "entry" if exc_count == 1 else "entries"
108+
return f"{exc_count} failed {entry_str} from {total_entries} attempted."
109+
110+
def __init__(
111+
self, excs: list[Exception], total_entries: int, message: str | None = None
112+
):
113+
"""
114+
Args:
115+
- excs: the exceptions in the group
116+
- total_entries: the total number of entries attempted, successful or not
117+
- message: the message for the exception group. If None, a default message
118+
will be generated
119+
"""
120+
message = (
121+
message
122+
if message is not None
123+
else self._format_message(excs, total_entries)
124+
)
125+
super().__init__(message, excs)
126+
self.total_entries_attempted = total_entries
95127

96-
def __init__(self, excs: list[FailedMutationEntryError], total_entries: int):
97-
super().__init__(self._format_message(excs, total_entries), excs)
128+
def __new__(
129+
cls, excs: list[Exception], total_entries: int, message: str | None = None
130+
):
131+
"""
132+
Args:
133+
- excs: the exceptions in the group
134+
- total_entries: the total number of entries attempted, successful or not
135+
- message: the message for the exception group. If None, a default message
136+
"""
137+
message = (
138+
message if message is not None else cls._format_message(excs, total_entries)
139+
)
140+
instance = super().__new__(cls, message, excs)
141+
instance.total_entries_attempted = total_entries
142+
return instance
98143

99-
def __new__(cls, excs: list[FailedMutationEntryError], total_entries: int):
100-
return super().__new__(cls, cls._format_message(excs, total_entries), excs)
144+
@classmethod
145+
def from_truncated_lists(
146+
cls,
147+
first_list: list[Exception],
148+
last_list: list[Exception],
149+
total_excs: int,
150+
entry_count: int,
151+
) -> MutationsExceptionGroup:
152+
"""
153+
Create a MutationsExceptionGroup from two lists of exceptions, representing
154+
a larger set that has been truncated. The MutationsExceptionGroup will
155+
contain the union of the two lists as sub-exceptions, and the error message
156+
describe the number of exceptions that were truncated.
157+
158+
Args:
159+
- first_list: the set of oldest exceptions to add to the ExceptionGroup
160+
- last_list: the set of newest exceptions to add to the ExceptionGroup
161+
- total_excs: the total number of exceptions associated with the request
162+
Should be len(first_list) + len(last_list) + number of dropped exceptions
163+
in the middle
164+
- entry_count: the total number of entries attempted, successful or not
165+
"""
166+
first_count, last_count = len(first_list), len(last_list)
167+
if first_count + last_count >= total_excs:
168+
# no exceptions were dropped
169+
return cls(first_list + last_list, entry_count)
170+
excs = first_list + last_list
171+
truncation_count = total_excs - (first_count + last_count)
172+
base_message = cls._format_message(excs, entry_count, total_excs)
173+
first_message = f"first {first_count}" if first_count else ""
174+
last_message = f"last {last_count}" if last_count else ""
175+
conjunction = " and " if first_message and last_message else ""
176+
message = f"{base_message} ({first_message}{conjunction}{last_message} attached as sub-exceptions; {truncation_count} truncated)"
177+
return cls(excs, entry_count, message)
101178

102179

103180
class FailedMutationEntryError(Exception):
@@ -108,14 +185,17 @@ class FailedMutationEntryError(Exception):
108185

109186
def __init__(
110187
self,
111-
failed_idx: int,
188+
failed_idx: int | None,
112189
failed_mutation_entry: "RowMutationEntry",
113190
cause: Exception,
114191
):
115192
idempotent_msg = (
116193
"idempotent" if failed_mutation_entry.is_idempotent() else "non-idempotent"
117194
)
118-
message = f"Failed {idempotent_msg} mutation entry at index {failed_idx} with cause: {cause!r}"
195+
index_msg = f" at index {failed_idx} " if failed_idx is not None else " "
196+
message = (
197+
f"Failed {idempotent_msg} mutation entry{index_msg}with cause: {cause!r}"
198+
)
119199
super().__init__(message)
120200
self.index = failed_idx
121201
self.entry = failed_mutation_entry

google/cloud/bigtable/mutations.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@
1717
import time
1818
from dataclasses import dataclass
1919
from abc import ABC, abstractmethod
20+
from sys import getsizeof
21+
22+
# mutation entries above this should be rejected
23+
from google.cloud.bigtable._mutate_rows import MUTATE_ROWS_REQUEST_MUTATION_LIMIT
24+
2025

2126
from google.cloud.bigtable.read_modify_write_rules import MAX_INCREMENT_VALUE
2227

@@ -41,6 +46,12 @@ def is_idempotent(self) -> bool:
4146
def __str__(self) -> str:
4247
return str(self._to_dict())
4348

49+
def size(self) -> int:
50+
"""
51+
Get the size of the mutation in bytes
52+
"""
53+
return getsizeof(self._to_dict())
54+
4455
@classmethod
4556
def _from_dict(cls, input_dict: dict[str, Any]) -> Mutation:
4657
instance: Mutation | None = None
@@ -195,6 +206,12 @@ def __init__(self, row_key: bytes | str, mutations: Mutation | list[Mutation]):
195206
row_key = row_key.encode("utf-8")
196207
if isinstance(mutations, Mutation):
197208
mutations = [mutations]
209+
if len(mutations) == 0:
210+
raise ValueError("mutations must not be empty")
211+
elif len(mutations) > MUTATE_ROWS_REQUEST_MUTATION_LIMIT:
212+
raise ValueError(
213+
f"entries must have <= {MUTATE_ROWS_REQUEST_MUTATION_LIMIT} mutations"
214+
)
198215
self.row_key = row_key
199216
self.mutations = tuple(mutations)
200217

@@ -208,6 +225,12 @@ def is_idempotent(self) -> bool:
208225
"""Check if the mutation is idempotent"""
209226
return all(mutation.is_idempotent() for mutation in self.mutations)
210227

228+
def size(self) -> int:
229+
"""
230+
Get the size of the mutation in bytes
231+
"""
232+
return getsizeof(self._to_dict())
233+
211234
@classmethod
212235
def _from_dict(cls, input_dict: dict[str, Any]) -> RowMutationEntry:
213236
return RowMutationEntry(

0 commit comments

Comments
 (0)