
Commit cd44040

feat: add support for BQ storage client to DB API

1 parent 018cea1

1 file changed (+77 −0 lines)

google/cloud/bigquery/dbapi/cursor.py

Lines changed: 77 additions & 0 deletions
@@ -21,13 +21,19 @@
 except ImportError:  # Python 2.7
     import collections as collections_abc
 
+import logging
+
 import six
 
+from google.cloud import bigquery
 from google.cloud.bigquery import job
 from google.cloud.bigquery.dbapi import _helpers
 from google.cloud.bigquery.dbapi import exceptions
 import google.cloud.exceptions
 
+
+_LOGGER = logging.getLogger(__name__)
+
 # Per PEP 249: A 7-item sequence containing information describing one result
 # column. The first two items (name and type_code) are mandatory, the other
 # five are optional and are set to None if no meaningful values can be
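The new module-level _LOGGER only emits at DEBUG level, so the fallback from the BQ Storage API to tabledata.list (added further down) is silent by default. A minimal sketch of how a caller could surface that message; the setup below is illustrative and not part of this commit, with the logger name inferred from the file path google/cloud/bigquery/dbapi/cursor.py:

import logging

# The cursor module logs its fallback at DEBUG level; opt in to see it.
logging.basicConfig()
logging.getLogger("google.cloud.bigquery.dbapi.cursor").setLevel(logging.DEBUG)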
@@ -192,11 +198,55 @@ def executemany(self, operation, seq_of_parameters):
         for parameters in seq_of_parameters:
             self.execute(operation, parameters)
 
+    def _bqstorage_fetch(self, bqstorage_client, size):
+        """TODO: docstring
+        """
+        # TODO: add assumption that query job exists check has been performed
+
+        # Since a BQ storage client instance is passed in, it means that
+        # the bigquery_storage_v1beta1 library is available (no ImportError).
+        from google.cloud import bigquery_storage_v1beta1
+
+        destination = self._query_job.destination
+        table_reference = bigquery.table.TableReference(
+            bigquery.dataset.DatasetReference(
+                destination.project, destination.dataset_id,
+            ),
+            destination.table_id,
+        )
+
+        # TODO: same checks as in _pandas_helpers._download_table_bqstorage()?
+        # (reading from partitions and snapshots)
+
+        read_session = bqstorage_client.create_read_session(
+            table_reference.to_bqstorage(),
+            "projects/{}".format(destination.project),
+            # a single stream only, as the DB API is not well-suited
+            # for multithreading
+            requested_streams=1,
+        )
+
+        if not read_session.streams:
+            return iter([])  # empty table, nothing to read
+
+        read_position = bigquery_storage_v1beta1.types.StreamPosition(
+            stream=read_session.streams[0],
+        )
+        read_rows_stream = bqstorage_client.read_rows(read_position)
+        rows_iterable = read_rows_stream.rows(read_session)
+        return rows_iterable
+
     def _try_fetch(self, size=None):
         """Try to start fetching data, if not yet started.
 
         Mutates self to indicate that iteration has started.
         """
+        # TODO: fetch results... using bq_storage_client...
+        # else: fall back to the default client
+        # (mention in docstring higher levels up?)
+
+        # TODO: add this known issue to docstring? just as in to_dataframe*()
+        # (about small result sets)
+
         if self._query_job is None:
             raise exceptions.InterfaceError(
                 "No query results: execute() must be called before fetch."
@@ -212,6 +262,33 @@ def _try_fetch(self, size=None):
 
         if self._query_data is None:
             client = self.connection._client
+            bqstorage_client = self.connection._bqstorage_client
+
+            if bqstorage_client:
+
+                # TODO: Are we supposed to read something here in order to
+                # detect errors and fall back to the tabledata.list API?
+                try:
+                    rows_iterable = self._bqstorage_fetch(bqstorage_client, size)
+                    self._query_data = iter(rows_iterable)
+                    return
+                except google.api_core.exceptions.Forbidden:
+                    # Don't hide errors such as insufficient permissions to
+                    # create a read session, or the API is not enabled. Both
+                    # of those are clearly problems if the developer has
+                    # explicitly asked for BigQuery Storage API support.
+                    raise
+                except google.api_core.exceptions.GoogleAPICallError:
+                    # There is a known issue with reading from small anonymous
+                    # query results tables. Rather than throw those errors,
+                    # silence them and try reading the data again with the
+                    # tabledata.list API.
+                    _LOGGER.debug(
+                        "Error fetching data with BQ storage client, falling "
+                        "back to tabledata.list API."
+                    )
+
             rows_iter = client.list_rows(
                 self._query_job.destination,
                 selected_fields=self._query_job._query_results.schema,
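Taken together: when the connection carries a BQ storage client, the cursor tries the Storage API first, re-raises permission and API-enablement errors, and silently falls back to tabledata.list for other call errors. One caveat: the except clauses reference google.api_core.exceptions, which this diff does not import directly; this appears to resolve only because import google.cloud.exceptions pulls in google.api_core.exceptions as a side effect, so an explicit import would be more robust. A usage sketch, assuming the companion Connection changes (not in this file) let dbapi.connect() accept and store the storage client:

from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1
from google.cloud.bigquery import dbapi

client = bigquery.Client()
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

# Assumption: connect() forwards bqstorage_client to the Connection, which
# exposes it as connection._bqstorage_client (the attribute read above).
connection = dbapi.connect(client, bqstorage_client)
cursor = connection.cursor()

cursor.execute(
    "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
)
print(cursor.fetchall())  # rows arrive via the BQ Storage API when possible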
