
Commit 3823660

tswast authored and landrito committed
Allow fetching more than the first page when max_results is set. (googleapis#3845)
* BigQuery: reproduce error fetching multiple results with the DB-API. Add a system test to call `fetchall()` when multiple rows are expected.
* BigQuery: system test to reproduce the error of only fetching the first page. This error applies to all BigQuery iterators, not just the DB-API.
* BigQuery: allow arraysize to be set after execute(). It was allowed before, but it didn't result in the correct behavior.
* max_results in the BigQuery API had a different meaning from HTTPIterator's. In BigQuery it means the page size, but in the HTTPIterator it meant "don't fetch any more pages once you have this many rows."
* Fix lint errors
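In practical terms, DB-API fetches are no longer capped at one page. A minimal sketch of the intended usage (not part of the commit; the query is the one used by the new system test, and a default-credentials Client() is assumed):

# Sketch only: demonstrates the behavior this commit fixes. fetchall() should
# return every row even when results span multiple pages, and arraysize may
# be set after execute() to control the page size of each fetch request.
from google.cloud import bigquery
from google.cloud.bigquery import dbapi

connection = dbapi.connect(bigquery.Client())  # assumes default credentials
cursor = connection.cursor()

cursor.execute('SELECT * FROM UNNEST([(1, 2), (3, 4), (5, 6)])')
cursor.arraysize = 2         # page size, not a cap on total rows
rows = cursor.fetchall()     # previously stopped after the first page
assert rows == [(1, 2), (3, 4), (5, 6)]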
1 parent 1b55c2b commit 3823660

File tree: 6 files changed (+54 −28 lines)


bigquery/google/cloud/bigquery/dbapi/cursor.py

Lines changed: 24 additions & 16 deletions
@@ -52,8 +52,7 @@ def __init__(self, connection):
         # a single row at a time.
         self.arraysize = 1
         self._query_data = None
-        self._page_token = None
-        self._has_fetched_all_rows = True
+        self._query_results = None

     def close(self):
         """No-op."""
@@ -133,9 +132,8 @@ def execute(self, operation, parameters=None, job_id=None):
         :param job_id: (Optional) The job_id to use. If not set, a job ID
             is generated at random.
         """
+        self._query_data = None
         self._query_results = None
-        self._page_token = None
-        self._has_fetched_all_rows = False
         client = self.connection._client
         if job_id is None:
             job_id = str(uuid.uuid4())
@@ -161,8 +159,7 @@ def execute(self, operation, parameters=None, job_id=None):
             raise exceptions.DatabaseError(query_job.errors)

         query_results = query_job.query_results()
-        self._query_data = iter(
-            query_results.fetch_data(max_results=self.arraysize))
+        self._query_results = query_results
         self._set_rowcount(query_results)
         self._set_description(query_results.schema)

@@ -178,6 +175,22 @@ def executemany(self, operation, seq_of_parameters):
         for parameters in seq_of_parameters:
             self.execute(operation, parameters)

+    def _try_fetch(self, size=None):
+        """Try to start fetching data, if not yet started.
+
+        Mutates self to indicate that iteration has started.
+        """
+        if self._query_results is None:
+            raise exceptions.InterfaceError(
+                'No query results: execute() must be called before fetch.')
+
+        if size is None:
+            size = self.arraysize
+
+        if self._query_data is None:
+            self._query_data = iter(
+                self._query_results.fetch_data(max_results=size))
+
     def fetchone(self):
         """Fetch a single row from the results of the last ``execute*()`` call.

@@ -188,10 +201,7 @@ def fetchone(self):
         :raises: :class:`~google.cloud.bigquery.dbapi.InterfaceError`
             if called before ``execute()``.
         """
-        if self._query_data is None:
-            raise exceptions.InterfaceError(
-                'No query results: execute() must be called before fetch.')
-
+        self._try_fetch()
         try:
             return six.next(self._query_data)
         except StopIteration:
@@ -215,17 +225,17 @@ def fetchmany(self, size=None):
         :raises: :class:`~google.cloud.bigquery.dbapi.InterfaceError`
             if called before ``execute()``.
         """
-        if self._query_data is None:
-            raise exceptions.InterfaceError(
-                'No query results: execute() must be called before fetch.')
         if size is None:
             size = self.arraysize

+        self._try_fetch(size=size)
         rows = []
+
         for row in self._query_data:
             rows.append(row)
             if len(rows) >= size:
                 break
+
         return rows

     def fetchall(self):
@@ -236,9 +246,7 @@ def fetchall(self):
         :raises: :class:`~google.cloud.bigquery.dbapi.InterfaceError`
             if called before ``execute()``.
         """
-        if self._query_data is None:
-            raise exceptions.InterfaceError(
-                'No query results: execute() must be called before fetch.')
+        self._try_fetch()
         return [row for row in self._query_data]

     def setinputsizes(self, sizes):
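The key mechanism above is that _try_fetch() builds the row iterator lazily on the first fetch call, so whatever arraysize (or fetchmany size) is in effect at that point becomes the page size, and later fetch calls keep draining the same iterator. A stripped-down stand-in for that pattern (illustrative only, not the real Cursor class):

# Illustrative stand-in for the deferred-fetch pattern used by Cursor above.
class _LazyFetcher(object):
    def __init__(self, query_results):
        self.arraysize = 1
        self._query_results = query_results
        self._query_data = None          # no iterator until the first fetch

    def _try_fetch(self, size=None):
        """Create the row iterator on first use, with the current size."""
        if size is None:
            size = self.arraysize
        if self._query_data is None:     # later calls reuse the same iterator
            self._query_data = iter(
                self._query_results.fetch_data(max_results=size))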

bigquery/google/cloud/bigquery/query.py

Lines changed: 4 additions & 3 deletions
@@ -440,6 +440,9 @@ def fetch_data(self, max_results=None, page_token=None, start_index=None,
         if timeout_ms is not None:
             params['timeoutMs'] = timeout_ms

+        if max_results is not None:
+            params['maxResults'] = max_results
+
         path = '/projects/%s/queries/%s' % (self.project, self.name)
         iterator = page_iterator.HTTPIterator(
             client=client,
@@ -448,12 +451,10 @@ def fetch_data(self, max_results=None, page_token=None, start_index=None,
             item_to_value=_item_to_row,
             items_key='rows',
             page_token=page_token,
-            max_results=max_results,
             page_start=_rows_page_start_query,
+            next_token='pageToken',
             extra_params=params)
         iterator.query_result = self
-        # Over-ride the key used to retrieve the next page token.
-        iterator._NEXT_TOKEN = 'pageToken'
         return iterator
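With max_results forwarded through extra_params as the API's maxResults field, it now acts as the page size of each getQueryResults request rather than HTTPIterator's cap on total rows, so iteration still yields every page. A hedged usage sketch (the public-data query mirrors the system test below; the table and limit are illustrative):

# Sketch only: max_results sets the size of each result page; iteration
# continues across pages until all rows have been returned.
from google.cloud import bigquery

client = bigquery.Client()
query = client.run_sync_query(
    'SELECT word FROM `bigquery-public-data.samples.shakespeare` LIMIT 1000')
query.use_legacy_sql = False
query.run()

rows = list(query.fetch_data(max_results=100))  # pages of at most 100 rows
assert len(rows) == 1000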

bigquery/google/cloud/bigquery/table.py

Lines changed: 8 additions & 4 deletions
@@ -722,6 +722,11 @@ def fetch_data(self, max_results=None, page_token=None, client=None):
         if len(self._schema) == 0:
             raise ValueError(_TABLE_HAS_NO_SCHEMA)

+        params = {}
+
+        if max_results is not None:
+            params['maxResults'] = max_results
+
         client = self._require_client(client)
         path = '%s/data' % (self.path,)
         iterator = page_iterator.HTTPIterator(
@@ -731,11 +736,10 @@
             item_to_value=_item_to_row,
             items_key='rows',
             page_token=page_token,
-            max_results=max_results,
-            page_start=_rows_page_start)
+            page_start=_rows_page_start,
+            next_token='pageToken',
+            extra_params=params)
         iterator.schema = self._schema
-        # Over-ride the key used to retrieve the next page token.
-        iterator._NEXT_TOKEN = 'pageToken'
         return iterator

     def row_from_mapping(self, mapping):

bigquery/tests/system.py

Lines changed: 11 additions & 1 deletion
@@ -747,6 +747,16 @@ def test_dbapi_w_standard_sql_types(self):
         row = Config.CURSOR.fetchone()
         self.assertIsNone(row)

+    def test_dbapi_fetchall(self):
+        query = 'SELECT * FROM UNNEST([(1, 2), (3, 4), (5, 6)])'
+
+        for arraysize in range(1, 5):
+            Config.CURSOR.execute(query)
+            self.assertEqual(Config.CURSOR.rowcount, 3, "expected 3 rows")
+            Config.CURSOR.arraysize = arraysize
+            rows = Config.CURSOR.fetchall()
+            self.assertEqual(rows, [(1, 2), (3, 4), (5, 6)])
+
     def _load_table_for_dml(self, rows, dataset_name, table_name):
         from google.cloud._testing import _NamedTemporaryFile

@@ -1084,7 +1094,7 @@ def test_large_query_w_public_data(self):
         query.use_legacy_sql = False
         query.run()

-        iterator = query.fetch_data()
+        iterator = query.fetch_data(max_results=100)
         rows = list(iterator)
         self.assertEqual(len(rows), LIMIT)

bigquery/tests/unit/test_dbapi_cursor.py

Lines changed: 1 addition & 1 deletion
@@ -141,8 +141,8 @@ def test_fetchmany_w_arraysize(self):
             (7, 8, 9),
         ]))
         cursor = connection.cursor()
-        cursor.arraysize = 2
         cursor.execute('SELECT a, b, c;')
+        cursor.arraysize = 2
         rows = cursor.fetchmany()
         self.assertEqual(len(rows), 2)
         self.assertEqual(rows[0], (1, 2, 3))

core/google/api/core/page_iterator.py

Lines changed: 6 additions & 3 deletions
@@ -275,6 +275,8 @@ class HTTPIterator(Iterator):
             signature takes the :class:`Iterator` that started the page,
             the :class:`Page` that was started and the dictionary containing
             the page response.
+        next_token (str): The name of the field used in the response for page
+            tokens.

     .. autoattribute:: pages
     """
@@ -283,13 +285,13 @@ class HTTPIterator(Iterator):
     _PAGE_TOKEN = 'pageToken'
     _MAX_RESULTS = 'maxResults'
     _NEXT_TOKEN = 'nextPageToken'
-    _RESERVED_PARAMS = frozenset([_PAGE_TOKEN, _MAX_RESULTS])
+    _RESERVED_PARAMS = frozenset([_PAGE_TOKEN])
     _HTTP_METHOD = 'GET'

     def __init__(self, client, api_request, path, item_to_value,
                  items_key=_DEFAULT_ITEMS_KEY,
                  page_token=None, max_results=None, extra_params=None,
-                 page_start=_do_nothing_page_start):
+                 page_start=_do_nothing_page_start, next_token=_NEXT_TOKEN):
         super(HTTPIterator, self).__init__(
             client, item_to_value, page_token=page_token,
             max_results=max_results)
@@ -298,6 +300,7 @@ def __init__(self, client, api_request, path, item_to_value,
         self._items_key = items_key
         self.extra_params = extra_params
         self._page_start = page_start
+        self._next_token = next_token
         # Verify inputs / provide defaults.
         if self.extra_params is None:
             self.extra_params = {}
@@ -327,7 +330,7 @@ def _next_page(self):
             items = response.get(self._items_key, ())
             page = Page(self, items, self._item_to_value)
             self._page_start(self, page, response)
-            self.next_page_token = response.get(self._NEXT_TOKEN)
+            self.next_page_token = response.get(self._next_token)
             return page
         else:
             return None
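The new next_token argument replaces the old pattern of reaching into a constructed iterator to override _NEXT_TOKEN, and dropping maxResults from _RESERVED_PARAMS is what lets callers pass it through extra_params as an ordinary request field. A rough sketch of constructing an iterator this way (client, path, and parameter values are illustrative; _item_to_value here is a trivial placeholder):

# Sketch only: an HTTPIterator for an API whose responses carry the
# continuation token under 'pageToken' instead of the default 'nextPageToken'.
from google.api.core import page_iterator
from google.cloud import bigquery

client = bigquery.Client()  # any client exposing _connection.api_request


def _item_to_value(iterator, resource):
    # Trivial conversion for this sketch; real callers build richer objects.
    return resource


iterator = page_iterator.HTTPIterator(
    client=client,
    api_request=client._connection.api_request,
    path='/projects/my-project/queries/my-job-id',  # illustrative path
    item_to_value=_item_to_value,
    items_key='rows',
    next_token='pageToken',             # new keyword added by this commit
    extra_params={'maxResults': 100})   # allowed now that it is not reserved
rows = list(iterator)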
