Skip to content

Commit 4954cd7

Browse files
tseaver authored and landrito committed
Make 'QueryResponse.fetch_data' return an iterator. (googleapis#3484)
Add a system test which exercises it. Update snippets to match the new usage. Closes googleapis#2840.
1 parent e99fc55 commit 4954cd7

File tree

6 files changed

+121
-82
lines changed

6 files changed

+121
-82
lines changed

bigquery/google/cloud/bigquery/_helpers.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -678,3 +678,44 @@ def __set__(self, instance, value):
678678
raise ValueError(
679679
"query parameters must be derived from AbstractQueryParameter")
680680
instance._query_parameters = tuple(value)
681+
682+
683+
def _item_to_row(iterator, resource):
684+
"""Convert a JSON row to the native object.
685+
686+
.. note::
687+
688+
This assumes that the ``schema`` attribute has been
689+
added to the iterator after being created, which
690+
should be done by the caller.
691+
692+
:type iterator: :class:`~google.cloud.iterator.Iterator`
693+
:param iterator: The iterator that is currently in use.
694+
695+
:type resource: dict
696+
:param resource: An item to be converted to a row.
697+
698+
:rtype: tuple
699+
:returns: The next row in the page.
700+
"""
701+
return _row_from_json(resource, iterator.schema)
702+
703+
704+
# pylint: disable=unused-argument
705+
def _rows_page_start(iterator, page, response):
706+
"""Grab total rows when :class:`~google.cloud.iterator.Page` starts.
707+
708+
:type iterator: :class:`~google.cloud.iterator.Iterator`
709+
:param iterator: The iterator that is currently in use.
710+
711+
:type page: :class:`~google.cloud.iterator.Page`
712+
:param page: The page that was just created.
713+
714+
:type response: dict
715+
:param response: The JSON API response for a page of rows in a table.
716+
"""
717+
total_rows = response.get('totalRows')
718+
if total_rows is not None:
719+
total_rows = int(total_rows)
720+
iterator.total_rows = total_rows
721+
# pylint: enable=unused-argument

bigquery/google/cloud/bigquery/query.py

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616

1717
import six
1818

19+
from google.cloud.iterator import HTTPIterator
1920
from google.cloud.bigquery._helpers import _TypedProperty
2021
from google.cloud.bigquery._helpers import _rows_from_json
2122
from google.cloud.bigquery.dataset import Dataset
2223
from google.cloud.bigquery.job import QueryJob
2324
from google.cloud.bigquery.table import _parse_schema_resource
2425
from google.cloud.bigquery._helpers import QueryParametersProperty
2526
from google.cloud.bigquery._helpers import UDFResourcesProperty
27+
from google.cloud.bigquery._helpers import _item_to_row
28+
from google.cloud.bigquery._helpers import _rows_page_start
2629

2730

2831
class _SyncQueryConfiguration(object):
@@ -426,28 +429,44 @@ def fetch_data(self, max_results=None, page_token=None, start_index=None,
426429
client = self._require_client(client)
427430
params = {}
428431

429-
if max_results is not None:
430-
params['maxResults'] = max_results
431-
432-
if page_token is not None:
433-
params['pageToken'] = page_token
434-
435432
if start_index is not None:
436433
params['startIndex'] = start_index
437434

438435
if timeout_ms is not None:
439436
params['timeoutMs'] = timeout_ms
440437

441438
path = '/projects/%s/queries/%s' % (self.project, self.name)
442-
response = client._connection.api_request(method='GET',
443-
path=path,
444-
query_params=params)
445-
self._set_properties(response)
439+
iterator = HTTPIterator(client=client, path=path,
440+
item_to_value=_item_to_row,
441+
items_key='rows',
442+
page_token=page_token,
443+
max_results=max_results,
444+
page_start=_rows_page_start_query,
445+
extra_params=params)
446+
iterator.query_result = self
447+
# Override the key used to retrieve the next page token.
448+
iterator._NEXT_TOKEN = 'pageToken'
449+
return iterator
446450

447-
total_rows = response.get('totalRows')
448-
if total_rows is not None:
449-
total_rows = int(total_rows)
450-
page_token = response.get('pageToken')
451-
rows_data = _rows_from_json(response.get('rows', ()), self.schema)
452451

453-
return rows_data, total_rows, page_token
452+
def _rows_page_start_query(iterator, page, response):
453+
"""Update query response when :class:`~google.cloud.iterator.Page` starts.
454+
455+
.. note::
456+
457+
This assumes that the ``query_result`` attribute has been
458+
added to the iterator after being created, which
459+
should be done by the caller.
460+
461+
:type iterator: :class:`~google.cloud.iterator.Iterator`
462+
:param iterator: The iterator that is currently in use.
463+
464+
:type page: :class:`~google.cloud.iterator.Page`
465+
:param page: The page that was just created.
466+
467+
:type response: dict
468+
:param response: The JSON API response for a page of rows in a table.
469+
"""
470+
iterator.query_result._set_properties(response)
471+
iterator.schema = iterator.query_result.schema
472+
_rows_page_start(iterator, page, response)

bigquery/google/cloud/bigquery/table.py

Lines changed: 2 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
from google.cloud.streaming.transfer import RESUMABLE_UPLOAD
3333
from google.cloud.streaming.transfer import Upload
3434
from google.cloud.bigquery.schema import SchemaField
35-
from google.cloud.bigquery._helpers import _row_from_json
35+
from google.cloud.bigquery._helpers import _item_to_row
36+
from google.cloud.bigquery._helpers import _rows_page_start
3637
from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW
3738

3839

@@ -1076,47 +1077,6 @@ def _build_schema_resource(fields):
10761077
return infos
10771078

10781079

1079-
def _item_to_row(iterator, resource):
1080-
"""Convert a JSON row to the native object.
1081-
1082-
.. note::
1083-
1084-
This assumes that the ``schema`` attribute has been
1085-
added to the iterator after being created, which
1086-
should be done by the caller.
1087-
1088-
:type iterator: :class:`~google.cloud.iterator.Iterator`
1089-
:param iterator: The iterator that is currently in use.
1090-
1091-
:type resource: dict
1092-
:param resource: An item to be converted to a row.
1093-
1094-
:rtype: tuple
1095-
:returns: The next row in the page.
1096-
"""
1097-
return _row_from_json(resource, iterator.schema)
1098-
1099-
1100-
# pylint: disable=unused-argument
1101-
def _rows_page_start(iterator, page, response):
1102-
"""Grab total rows after a :class:`~google.cloud.iterator.Page` started.
1103-
1104-
:type iterator: :class:`~google.cloud.iterator.Iterator`
1105-
:param iterator: The iterator that is currently in use.
1106-
1107-
:type page: :class:`~google.cloud.iterator.Page`
1108-
:param page: The page that was just created.
1109-
1110-
:type response: dict
1111-
:param response: The JSON API response for a page of rows in a table.
1112-
"""
1113-
total_rows = response.get('totalRows')
1114-
if total_rows is not None:
1115-
total_rows = int(total_rows)
1116-
iterator.total_rows = total_rows
1117-
# pylint: enable=unused-argument
1118-
1119-
11201080
class _UploadConfig(object):
11211081
"""Faux message FBO apitools' 'configure_request'."""
11221082
accept = ['*/*']

bigquery/tests/system.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -830,6 +830,23 @@ def test_dump_table_w_public_data(self):
830830
table.reload()
831831
self._fetch_single_page(table)
832832

833+
def test_large_query_w_public_data(self):
834+
PUBLIC = 'bigquery-public-data'
835+
DATASET_NAME = 'samples'
836+
TABLE_NAME = 'natality'
837+
LIMIT = 1000
838+
SQL = 'SELECT * from `{}.{}.{}` LIMIT {}'.format(
839+
PUBLIC, DATASET_NAME, TABLE_NAME, LIMIT)
840+
841+
dataset = Config.CLIENT.dataset(DATASET_NAME, project=PUBLIC)
842+
query = Config.CLIENT.run_sync_query(SQL)
843+
query.use_legacy_sql = False
844+
query.run()
845+
846+
iterator = query.fetch_data()
847+
rows = list(iterator)
848+
self.assertEqual(len(rows), LIMIT)
849+
833850
def test_insert_nested_nested(self):
834851
# See #2951
835852
SF = bigquery.SchemaField

bigquery/tests/unit/test_query.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,8 @@ def test_fetch_data_query_not_yet_run(self):
654654
self.assertRaises(ValueError, query.fetch_data)
655655

656656
def test_fetch_data_w_bound_client(self):
657+
import six
658+
657659
PATH = 'projects/%s/queries/%s' % (self.PROJECT, self.JOB_NAME)
658660
BEFORE = self._makeResource(complete=False)
659661
AFTER = self._makeResource(complete=True)
@@ -665,7 +667,11 @@ def test_fetch_data_w_bound_client(self):
665667
query._set_properties(BEFORE)
666668
self.assertFalse(query.complete)
667669

668-
rows, total_rows, page_token = query.fetch_data()
670+
iterator = query.fetch_data()
671+
page = six.next(iterator.pages)
672+
rows = list(page)
673+
total_rows = iterator.total_rows
674+
page_token = iterator.next_page_token
669675

670676
self.assertTrue(query.complete)
671677
self.assertEqual(len(rows), 4)
@@ -682,6 +688,8 @@ def test_fetch_data_w_bound_client(self):
682688
self.assertEqual(req['path'], '/%s' % PATH)
683689

684690
def test_fetch_data_w_alternate_client(self):
691+
import six
692+
685693
PATH = 'projects/%s/queries/%s' % (self.PROJECT, self.JOB_NAME)
686694
MAX = 10
687695
TOKEN = 'TOKEN'
@@ -698,9 +706,13 @@ def test_fetch_data_w_alternate_client(self):
698706
query._set_properties(BEFORE)
699707
self.assertFalse(query.complete)
700708

701-
rows, total_rows, page_token = query.fetch_data(
709+
iterator = query.fetch_data(
702710
client=client2, max_results=MAX, page_token=TOKEN,
703711
start_index=START, timeout_ms=TIMEOUT)
712+
page = six.next(iterator.pages)
713+
rows = list(page)
714+
total_rows = iterator.total_rows
715+
page_token = iterator.next_page_token
704716

705717
self.assertTrue(query.complete)
706718
self.assertEqual(len(rows), 4)

docs/bigquery/snippets.py

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -520,8 +520,8 @@ def client_run_sync_query_paged(client, _):
520520

521521
all_rows = []
522522

523-
def do_something_with(rows):
524-
all_rows.extend(rows)
523+
def do_something_with(row):
524+
all_rows.append(row)
525525

526526
# [START client_run_sync_query_paged]
527527
query = client.run_sync_query(LIMITED)
@@ -534,18 +534,12 @@ def do_something_with(rows):
534534
assert len(query.rows) == PAGE_SIZE
535535
assert [field.name for field in query.schema] == ['name']
536536

537-
rows = query.rows
538-
token = query.page_token
539-
540-
while True:
541-
do_something_with(rows)
542-
if token is None:
543-
break
544-
rows, total_count, token = query.fetch_data(
545-
page_token=token) # API request
537+
iterator = query.fetch_data() # API request(s) during iteration
538+
for row in iterator:
539+
do_something_with(row)
546540
# [END client_run_sync_query_paged]
547541

548-
assert total_count == LIMIT
542+
assert iterator.total_rows == LIMIT
549543
assert len(all_rows) == LIMIT
550544

551545

@@ -556,8 +550,8 @@ def client_run_sync_query_timeout(client, _):
556550

557551
all_rows = []
558552

559-
def do_something_with(rows):
560-
all_rows.extend(rows)
553+
def do_something_with(row):
554+
all_rows.append(row)
561555

562556
# [START client_run_sync_query_timeout]
563557
query = client.run_sync_query(QUERY)
@@ -578,16 +572,12 @@ def do_something_with(rows):
578572

579573
assert job.state == u'DONE'
580574

581-
rows, total_count, token = query.fetch_data() # API request
582-
while True:
583-
do_something_with(rows)
584-
if token is None:
585-
break
586-
rows, total_count, token = query.fetch_data(
587-
page_token=token) # API request
575+
iterator = query.fetch_data() # API request(s) during iteration
576+
for row in iterator:
577+
do_something_with(row)
588578
# [END client_run_sync_query_timeout]
589579

590-
assert len(all_rows) == total_count
580+
assert len(all_rows) == iterator.total_rows
591581

592582

593583
def _find_examples():

0 commit comments

Comments
 (0)