1717
1818import six
1919
20+ from google .cloud ._helpers import _to_bytes
2021from google .cloud .bigtable ._generated import (
2122 bigtable_pb2 as data_messages_v2_pb2 )
2223from google .cloud .bigtable ._generated import (
2930from google .cloud .bigtable .row import ConditionalRow
3031from google .cloud .bigtable .row import DirectRow
3132from google .cloud .bigtable .row_data import PartialRowsData
32- from google .gax import RetryOptions , BackoffSettings
33- from google .cloud .bigtable .retry import ReadRowsIterator , _create_row_request
34- from grpc import StatusCode
35-
36- BACKOFF_SETTINGS = BackoffSettings (
37- initial_retry_delay_millis = 10 ,
38- retry_delay_multiplier = 1.3 ,
39- max_retry_delay_millis = 30000 ,
40- initial_rpc_timeout_millis = 25 * 60 * 1000 ,
41- rpc_timeout_multiplier = 1.0 ,
42- max_rpc_timeout_millis = 25 * 60 * 1000 ,
43- total_timeout_millis = 30 * 60 * 1000
44- )
45-
46- RETRY_CODES = [
47- StatusCode .DEADLINE_EXCEEDED ,
48- StatusCode .ABORTED ,
49- StatusCode .INTERNAL ,
50- StatusCode .UNAVAILABLE
51- ]
5233
5334
5435# Maximum number of mutations in bulk (MutateRowsRequest message):
@@ -276,7 +257,7 @@ def read_row(self, row_key, filter_=None):
276257 return rows_data .rows [row_key ]
277258
278259 def read_rows (self , start_key = None , end_key = None , limit = None ,
279- filter_ = None , backoff_settings = None ):
260+ filter_ = None ):
280261 """Read rows from this table.
281262
282263 :type start_key: bytes
@@ -303,18 +284,13 @@ def read_rows(self, start_key=None, end_key=None, limit=None,
303284 :returns: A :class:`.PartialRowsData` convenience wrapper for consuming
304285 the streamed results.
305286 """
287+ request_pb = _create_row_request (
288+ self .name , start_key = start_key , end_key = end_key , filter_ = filter_ ,
289+ limit = limit )
306290 client = self ._instance ._client
307- if backoff_settings is None :
308- backoff_settings = BACKOFF_SETTINGS
309- RETRY_OPTIONS = RetryOptions (
310- retry_codes = RETRY_CODES ,
311- backoff_settings = backoff_settings
312- )
313-
314- retrying_iterator = ReadRowsIterator (client , self .name , start_key ,
315- end_key , filter_ , limit ,
316- RETRY_OPTIONS )
317- return PartialRowsData (retrying_iterator )
291+ response_iterator = client ._data_stub .ReadRows (request_pb )
292+ # We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse`
293+ return PartialRowsData (response_iterator )
318294
319295 def mutate_rows (self , rows ):
320296 """Mutates multiple rows in bulk.
@@ -383,6 +359,67 @@ def sample_row_keys(self):
383359 return response_iterator
384360
385361
362+ def _create_row_request (table_name , row_key = None , start_key = None , end_key = None ,
363+ filter_ = None , limit = None ):
364+ """Creates a request to read rows in a table.
365+
366+ :type table_name: str
367+ :param table_name: The name of the table to read from.
368+
369+ :type row_key: bytes
370+ :param row_key: (Optional) The key of a specific row to read from.
371+
372+ :type start_key: bytes
373+ :param start_key: (Optional) The beginning of a range of row keys to
374+ read from. The range will include ``start_key``. If
375+ left empty, will be interpreted as the empty string.
376+
377+ :type end_key: bytes
378+ :param end_key: (Optional) The end of a range of row keys to read from.
379+ The range will not include ``end_key``. If left empty,
380+ will be interpreted as an infinite string.
381+
382+ :type filter_: :class:`.RowFilter`
383+ :param filter_: (Optional) The filter to apply to the contents of the
384+ specified row(s). If unset, reads the entire table.
385+
386+ :type limit: int
387+ :param limit: (Optional) The read will terminate after committing to N
388+ rows' worth of results. The default (zero) is to return
389+ all results.
390+
391+ :rtype: :class:`data_messages_v2_pb2.ReadRowsRequest`
392+ :returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs.
393+ :raises: :class:`ValueError <exceptions.ValueError>` if both
394+ ``row_key`` and one of ``start_key`` and ``end_key`` are set
395+ """
396+ request_kwargs = {'table_name' : table_name }
397+ if (row_key is not None and
398+ (start_key is not None or end_key is not None )):
399+ raise ValueError ('Row key and row range cannot be '
400+ 'set simultaneously' )
401+ range_kwargs = {}
402+ if start_key is not None or end_key is not None :
403+ if start_key is not None :
404+ range_kwargs ['start_key_closed' ] = _to_bytes (start_key )
405+ if end_key is not None :
406+ range_kwargs ['end_key_open' ] = _to_bytes (end_key )
407+ if filter_ is not None :
408+ request_kwargs ['filter' ] = filter_ .to_pb ()
409+ if limit is not None :
410+ request_kwargs ['rows_limit' ] = limit
411+
412+ message = data_messages_v2_pb2 .ReadRowsRequest (** request_kwargs )
413+
414+ if row_key is not None :
415+ message .rows .row_keys .append (_to_bytes (row_key ))
416+
417+ if range_kwargs :
418+ message .rows .row_ranges .add (** range_kwargs )
419+
420+ return message
421+
422+
386423def _mutate_rows_request (table_name , rows ):
387424 """Creates a request to mutate rows in a table.
388425
0 commit comments