Skip to content

Commit 208d97c

Browse files
calpeyserlukesneeringer
authored andcommitted
RPC retries (second PR) (#3324)
1 parent cb670ec commit 208d97c

File tree

7 files changed

+520
-81
lines changed

7 files changed

+520
-81
lines changed
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
"""Provides function wrappers that implement retrying."""
2+
import random
3+
import time
4+
import six
5+
import sys
6+
7+
from google.cloud._helpers import _to_bytes
8+
from google.cloud.bigtable._generated import (
9+
bigtable_pb2 as data_messages_v2_pb2)
10+
from google.gax import config, errors
11+
from grpc import RpcError
12+
13+
14+
_MILLIS_PER_SECOND = 1000
15+
16+
17+
class ReadRowsIterator(object):
18+
"""Creates an iterator equivalent to a_iter, but that retries on certain
19+
exceptions.
20+
"""
21+
22+
def __init__(self, client, name, start_key, end_key, filter_, limit,
23+
retry_options, **kwargs):
24+
self.client = client
25+
self.retry_options = retry_options
26+
self.name = name
27+
self.start_key = start_key
28+
self.start_key_closed = True
29+
self.end_key = end_key
30+
self.filter_ = filter_
31+
self.limit = limit
32+
self.delay_mult = retry_options.backoff_settings.retry_delay_multiplier
33+
self.max_delay_millis = \
34+
retry_options.backoff_settings.max_retry_delay_millis
35+
self.timeout_mult = \
36+
retry_options.backoff_settings.rpc_timeout_multiplier
37+
self.max_timeout = \
38+
(retry_options.backoff_settings.max_rpc_timeout_millis /
39+
_MILLIS_PER_SECOND)
40+
self.total_timeout = \
41+
(retry_options.backoff_settings.total_timeout_millis /
42+
_MILLIS_PER_SECOND)
43+
self.set_stream()
44+
45+
def set_start_key(self, start_key):
46+
"""
47+
Sets the row key at which this iterator will begin reading.
48+
"""
49+
self.start_key = start_key
50+
self.start_key_closed = False
51+
52+
def set_stream(self):
53+
"""
54+
Resets the read stream by making an RPC on the 'ReadRows' endpoint.
55+
"""
56+
req_pb = _create_row_request(self.name, start_key=self.start_key,
57+
start_key_closed=self.start_key_closed,
58+
end_key=self.end_key,
59+
filter_=self.filter_, limit=self.limit)
60+
self.stream = self.client._data_stub.ReadRows(req_pb)
61+
62+
def next(self, *args, **kwargs):
63+
"""
64+
Read and return the next row from the stream.
65+
Retry on idempotent failure.
66+
"""
67+
delay = self.retry_options.backoff_settings.initial_retry_delay_millis
68+
exc = errors.RetryError('Retry total timeout exceeded before any'
69+
'response was received')
70+
timeout = (self.retry_options.backoff_settings
71+
.initial_rpc_timeout_millis /
72+
_MILLIS_PER_SECOND)
73+
74+
now = time.time()
75+
deadline = now + self.total_timeout
76+
while deadline is None or now < deadline:
77+
try:
78+
return six.next(self.stream)
79+
except StopIteration as stop:
80+
raise stop
81+
except RpcError as error: # pylint: disable=broad-except
82+
code = config.exc_to_code(error)
83+
if code not in self.retry_options.retry_codes:
84+
six.reraise(type(error), error)
85+
86+
# pylint: disable=redefined-variable-type
87+
exc = errors.RetryError(
88+
'Retry total timeout exceeded with exception', error)
89+
90+
# Sleep a random number which will, on average, equal the
91+
# expected delay.
92+
to_sleep = random.uniform(0, delay * 2)
93+
time.sleep(to_sleep / _MILLIS_PER_SECOND)
94+
delay = min(delay * self.delay_mult, self.max_delay_millis)
95+
now = time.time()
96+
timeout = min(
97+
timeout * self.timeout_mult, self.max_timeout,
98+
deadline - now)
99+
self.set_stream()
100+
101+
six.reraise(errors.RetryError, exc, sys.exc_info()[2])
102+
103+
def __next__(self, *args, **kwargs):
104+
return self.next(*args, **kwargs)
105+
106+
107+
def _create_row_request(table_name, row_key=None, start_key=None,
108+
start_key_closed=True, end_key=None, filter_=None,
109+
limit=None):
110+
"""Creates a request to read rows in a table.
111+
112+
:type table_name: str
113+
:param table_name: The name of the table to read from.
114+
115+
:type row_key: bytes
116+
:param row_key: (Optional) The key of a specific row to read from.
117+
118+
:type start_key: bytes
119+
:param start_key: (Optional) The beginning of a range of row keys to
120+
read from. The range will include ``start_key``. If
121+
left empty, will be interpreted as the empty string.
122+
123+
:type end_key: bytes
124+
:param end_key: (Optional) The end of a range of row keys to read from.
125+
The range will not include ``end_key``. If left empty,
126+
will be interpreted as an infinite string.
127+
128+
:type filter_: :class:`.RowFilter`
129+
:param filter_: (Optional) The filter to apply to the contents of the
130+
specified row(s). If unset, reads the entire table.
131+
132+
:type limit: int
133+
:param limit: (Optional) The read will terminate after committing to N
134+
rows' worth of results. The default (zero) is to return
135+
all results.
136+
137+
:rtype: :class:`data_messages_v2_pb2.ReadRowsRequest`
138+
:returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs.
139+
:raises: :class:`ValueError <exceptions.ValueError>` if both
140+
``row_key`` and one of ``start_key`` and ``end_key`` are set
141+
"""
142+
request_kwargs = {'table_name': table_name}
143+
if (row_key is not None and
144+
(start_key is not None or end_key is not None)):
145+
raise ValueError('Row key and row range cannot be '
146+
'set simultaneously')
147+
range_kwargs = {}
148+
if start_key is not None or end_key is not None:
149+
if start_key is not None:
150+
if start_key_closed:
151+
range_kwargs['start_key_closed'] = _to_bytes(start_key)
152+
else:
153+
range_kwargs['start_key_open'] = _to_bytes(start_key)
154+
if end_key is not None:
155+
range_kwargs['end_key_open'] = _to_bytes(end_key)
156+
if filter_ is not None:
157+
request_kwargs['filter'] = filter_.to_pb()
158+
if limit is not None:
159+
request_kwargs['rows_limit'] = limit
160+
161+
message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs)
162+
163+
if row_key is not None:
164+
message.rows.row_keys.append(_to_bytes(row_key))
165+
166+
if range_kwargs:
167+
message.rows.row_ranges.add(**range_kwargs)
168+
169+
return message

packages/google-cloud-bigtable/google/cloud/bigtable/row_data.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,9 @@ def consume_next(self):
274274

275275
self._validate_chunk(chunk)
276276

277+
if hasattr(self._response_iterator, 'set_start_key'):
278+
self._response_iterator.set_start_key(chunk.row_key)
279+
277280
if chunk.reset_row:
278281
row = self._row = None
279282
cell = self._cell = self._previous_cell = None

packages/google-cloud-bigtable/google/cloud/bigtable/table.py

Lines changed: 32 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import six
1919

20-
from google.cloud._helpers import _to_bytes
2120
from google.cloud.bigtable._generated import (
2221
bigtable_pb2 as data_messages_v2_pb2)
2322
from google.cloud.bigtable._generated import (
@@ -30,6 +29,26 @@
3029
from google.cloud.bigtable.row import ConditionalRow
3130
from google.cloud.bigtable.row import DirectRow
3231
from google.cloud.bigtable.row_data import PartialRowsData
32+
from google.gax import RetryOptions, BackoffSettings
33+
from google.cloud.bigtable.retry import ReadRowsIterator, _create_row_request
34+
from grpc import StatusCode
35+
36+
BACKOFF_SETTINGS = BackoffSettings(
37+
initial_retry_delay_millis=10,
38+
retry_delay_multiplier=1.3,
39+
max_retry_delay_millis=30000,
40+
initial_rpc_timeout_millis=25 * 60 * 1000,
41+
rpc_timeout_multiplier=1.0,
42+
max_rpc_timeout_millis=25 * 60 * 1000,
43+
total_timeout_millis=30 * 60 * 1000
44+
)
45+
46+
RETRY_CODES = [
47+
StatusCode.DEADLINE_EXCEEDED,
48+
StatusCode.ABORTED,
49+
StatusCode.INTERNAL,
50+
StatusCode.UNAVAILABLE
51+
]
3352

3453

3554
# Maximum number of mutations in bulk (MutateRowsRequest message):
@@ -257,7 +276,7 @@ def read_row(self, row_key, filter_=None):
257276
return rows_data.rows[row_key]
258277

259278
def read_rows(self, start_key=None, end_key=None, limit=None,
260-
filter_=None):
279+
filter_=None, backoff_settings=None):
261280
"""Read rows from this table.
262281
263282
:type start_key: bytes
@@ -284,13 +303,18 @@ def read_rows(self, start_key=None, end_key=None, limit=None,
284303
:returns: A :class:`.PartialRowsData` convenience wrapper for consuming
285304
the streamed results.
286305
"""
287-
request_pb = _create_row_request(
288-
self.name, start_key=start_key, end_key=end_key, filter_=filter_,
289-
limit=limit)
290306
client = self._instance._client
291-
response_iterator = client._data_stub.ReadRows(request_pb)
292-
# We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse`
293-
return PartialRowsData(response_iterator)
307+
if backoff_settings is None:
308+
backoff_settings = BACKOFF_SETTINGS
309+
RETRY_OPTIONS = RetryOptions(
310+
retry_codes=RETRY_CODES,
311+
backoff_settings=backoff_settings
312+
)
313+
314+
retrying_iterator = ReadRowsIterator(client, self.name, start_key,
315+
end_key, filter_, limit,
316+
RETRY_OPTIONS)
317+
return PartialRowsData(retrying_iterator)
294318

295319
def mutate_rows(self, rows):
296320
"""Mutates multiple rows in bulk.
@@ -359,67 +383,6 @@ def sample_row_keys(self):
359383
return response_iterator
360384

361385

362-
def _create_row_request(table_name, row_key=None, start_key=None, end_key=None,
363-
filter_=None, limit=None):
364-
"""Creates a request to read rows in a table.
365-
366-
:type table_name: str
367-
:param table_name: The name of the table to read from.
368-
369-
:type row_key: bytes
370-
:param row_key: (Optional) The key of a specific row to read from.
371-
372-
:type start_key: bytes
373-
:param start_key: (Optional) The beginning of a range of row keys to
374-
read from. The range will include ``start_key``. If
375-
left empty, will be interpreted as the empty string.
376-
377-
:type end_key: bytes
378-
:param end_key: (Optional) The end of a range of row keys to read from.
379-
The range will not include ``end_key``. If left empty,
380-
will be interpreted as an infinite string.
381-
382-
:type filter_: :class:`.RowFilter`
383-
:param filter_: (Optional) The filter to apply to the contents of the
384-
specified row(s). If unset, reads the entire table.
385-
386-
:type limit: int
387-
:param limit: (Optional) The read will terminate after committing to N
388-
rows' worth of results. The default (zero) is to return
389-
all results.
390-
391-
:rtype: :class:`data_messages_v2_pb2.ReadRowsRequest`
392-
:returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs.
393-
:raises: :class:`ValueError <exceptions.ValueError>` if both
394-
``row_key`` and one of ``start_key`` and ``end_key`` are set
395-
"""
396-
request_kwargs = {'table_name': table_name}
397-
if (row_key is not None and
398-
(start_key is not None or end_key is not None)):
399-
raise ValueError('Row key and row range cannot be '
400-
'set simultaneously')
401-
range_kwargs = {}
402-
if start_key is not None or end_key is not None:
403-
if start_key is not None:
404-
range_kwargs['start_key_closed'] = _to_bytes(start_key)
405-
if end_key is not None:
406-
range_kwargs['end_key_open'] = _to_bytes(end_key)
407-
if filter_ is not None:
408-
request_kwargs['filter'] = filter_.to_pb()
409-
if limit is not None:
410-
request_kwargs['rows_limit'] = limit
411-
412-
message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs)
413-
414-
if row_key is not None:
415-
message.rows.row_keys.append(_to_bytes(row_key))
416-
417-
if range_kwargs:
418-
message.rows.row_ranges.add(**range_kwargs)
419-
420-
return message
421-
422-
423386
def _mutate_rows_request(table_name, rows):
424387
"""Creates a request to mutate rows in a table.
425388
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# This retry script is processed by the retry server and the client under test.
2+
# Client tests should parse any command beginning with "CLIENT:", send the corresponding RPC
3+
# to the retry server and expect a valid response.
4+
# "EXPECT" commands indicate the call the server is expecting the client to send.
5+
#
6+
# The retry server has one table named "table" that should be used for testing.
7+
# There are three types of commands supported:
8+
# READ <comma-separated list of row ids to read>
9+
# Expect the corresponding rows to be returned with arbitrary values.
10+
# SCAN <range>... <comma separated list of row ids to expect>
11+
# Ranges are expressed as an interval with either open or closed start and end,
12+
# such as [1,3) for "1,2" or (1, 3] for "2,3".
13+
# WRITE <comma-separated list of row ids to write>
14+
# All writes should succeed eventually. Value payload is ignored.
15+
# The server writes PASS or FAIL on a line by itself to STDOUT depending on the result of the test.
16+
# All other server output should be ignored.
17+
18+
# Echo same scan back after immediate error
19+
CLIENT: SCAN [r1,r3) r1,r2
20+
EXPECT: SCAN [r1,r3)
21+
SERVER: ERROR Unavailable
22+
EXPECT: SCAN [r1,r3)
23+
SERVER: READ_RESPONSE r1,r2
24+
25+
# Retry scans with open interval starting at the least read row key.
26+
# Instead of using open intervals for retry ranges, '\x00' can be
27+
# appended to the last received row key and sent in a closed interval.
28+
CLIENT: SCAN [r1,r9) r1,r2,r3,r4,r5,r6,r7,r8
29+
EXPECT: SCAN [r1,r9)
30+
SERVER: READ_RESPONSE r1,r2,r3,r4
31+
SERVER: ERROR Unavailable
32+
EXPECT: SCAN (r4,r9)
33+
SERVER: ERROR Unavailable
34+
EXPECT: SCAN (r4,r9)
35+
SERVER: READ_RESPONSE r5,r6,r7
36+
SERVER: ERROR Unavailable
37+
EXPECT: SCAN (r7,r9)
38+
SERVER: READ_RESPONSE r8

0 commit comments

Comments
 (0)