Skip to content

Commit 3e627ce

Browse files
committed
Raise errors from mutations.
1 parent 25c6265 commit 3e627ce

File tree

1 file changed

+63
-7
lines changed

1 file changed

+63
-7
lines changed

google/cloud/bigtable/batcher.py

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,22 @@
1818
import concurrent.futures
1919
import atexit
2020

21+
from google.cloud.bigtable.error import Status
22+
2123
FLUSH_COUNT = 100
2224
MAX_MUTATIONS = 100000
2325
MAX_ROW_BYTES = 20 * 1024 * 1024 # 20MB
24-
MAX_BATCH_SIZE = 100 * 1024 * 1024
26+
MAX_MUTATIONS_SIZE = 100 * 1024 * 1024 # 100MB
27+
28+
29+
class MutationsBatchError(Exception):
30+
"""Error in the batch request"""
31+
32+
def __init__(self, status_codes):
33+
self.status_codes = status_codes
34+
self.message = "Errors in batch mutations."
35+
super().__init__(self.message)
36+
2537

2638
class MaxMutationsError(ValueError):
2739
"""The number of mutations for bulk request is too big."""
@@ -53,6 +65,7 @@ def put(self, item, block=True, timeout=None):
5365
"""Insert an item to the queue. Recalculate queue size."""
5466

5567
mutation_count = len(item._get_mutations())
68+
mutation_size = item.get_mutations_size()
5669

5770
if mutation_count > MAX_MUTATIONS:
5871
raise MaxMutationsError(
@@ -61,6 +74,12 @@ def put(self, item, block=True, timeout=None):
6174
)
6275
)
6376

77+
if mutation_size > MAX_MUTATIONS_SIZE:
78+
raise MaxMutationsError(
79+
"The row key {} exceeds the size of mutations {}.".format(
80+
item.row_key, mutation_size
81+
)
82+
)
6483

6584
self._queue.put(item, block=block, timeout=timeout)
6685

@@ -188,6 +207,8 @@ def mutate(self, row):
188207
match the number of rows that were retried
189208
* :exc:`.batcher.MaxMutationsError` if any row exceeds max
190209
mutations count.
210+
* :exc:`.batcherMutationsBatchError` if there's any error in the
211+
mutations.
191212
"""
192213
self._rows.put(row)
193214

@@ -216,6 +237,8 @@ def mutate_rows(self, rows):
216237
match the number of rows that were retried
217238
* :exc:`.batcher.MaxMutationsError` if any row exceeds max
218239
mutations count.
240+
* :exc:`.batcherMutationsBatchError` if there's any error in the
241+
mutations.
219242
"""
220243
for row in rows:
221244
self.mutate(row)
@@ -229,31 +252,64 @@ def flush(self):
229252
:end-before: [END bigtable_api_batcher_flush]
230253
:dedent: 4
231254
255+
raises:
256+
* :exc:`.batcherMutationsBatchError` if there's any error in the
257+
mutations.
232258
"""
233259
rows_to_flush = []
234260
while not self._rows.empty():
235261
rows_to_flush.append(self._rows.get())
236-
self.flush_rows(rows_to_flush)
262+
response = self.flush_rows(rows_to_flush)
263+
return response
237264

238265
def flush_async(self):
239-
"""Sends the current batch to Cloud Bigtable asynchronously."""
266+
"""Sends the current batch to Cloud Bigtable asynchronously.
267+
268+
raises:
269+
* :exc:`.batcherMutationsBatchError` if there's any error in the
270+
mutations.
271+
"""
240272

241273
rows_to_flush = []
242274
while not self._rows.empty():
243275
rows_to_flush.append(self._rows.get())
244-
self._executor.submit(self.flush_rows, rows_to_flush)
276+
future = self._executor.submit(self.flush_rows, rows_to_flush)
277+
# catch the exceptions in the mutation
278+
exc = future.exception()
279+
if exc:
280+
raise exc
281+
else:
282+
result = future.result()
283+
return result
245284

246285
def flush_rows(self, rows_to_flush=None):
247-
"""Mutate the specified rows."""
286+
"""Mutate the specified rows.
287+
288+
raises:
289+
* :exc:`.batcherMutationsBatchError` if there's any error in the
290+
mutations.
291+
"""
292+
response = []
248293
if len(rows_to_flush) > 0:
249-
self.table.mutate_rows(rows_to_flush)
294+
# returns a list of error codes
295+
response = self.table.mutate_rows(rows_to_flush)
296+
if any(isinstance(result, Status) for result in response):
297+
raise MutationsBatchError(status_codes=response)
298+
return response
250299

251300
def __exit__(self, exc_type, exc_value, exc_traceback):
252301
"""Clean up resources. Flush and shutdown the ThreadPoolExecutor."""
253302
self.close()
254303

255304
def close(self):
256-
"""Clean up resources. Flush and shutdown the ThreadPoolExecutor."""
305+
"""Clean up resources. Flush and shutdown the ThreadPoolExecutor.
306+
Any errors will be raised.
307+
308+
raises:
309+
* :exc:`.batcherMutationsBatchError` if there's any error in the
310+
mutations.
311+
312+
"""
257313
self._is_open = False
258314
self.flush()
259315
self._executor.shutdown(wait=True)

0 commit comments

Comments
 (0)