
Commit 12b2171

Support both v1 stable and beta1 BQ Storage client
1 parent 7d7dc73 commit 12b2171
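
With this change, the pandas and DB API helpers accept either generation of the BigQuery Storage client. A minimal usage sketch, assuming default application credentials and that the v1 stable client class is named `BigQueryReadClient` (an assumption; the v1beta1 usage mirrors the system tests added in this commit):

    from google.cloud import bigquery
    from google.cloud import bigquery_storage_v1
    from google.cloud import bigquery_storage_v1beta1

    client = bigquery.Client()
    query = "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 100"

    # Either client generation can now be handed to to_dataframe(); the class
    # name BigQueryReadClient for the v1 stable surface is assumed here.
    v1_client = bigquery_storage_v1.BigQueryReadClient()
    beta_client = bigquery_storage_v1beta1.BigQueryStorageClient()

    for bqstorage_client in (v1_client, beta_client):
        df = client.query(query).result().to_dataframe(bqstorage_client)
        print(len(df))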


6 files changed, +466 -48 lines changed


google/cloud/bigquery/_pandas_helpers.py

Lines changed: 50 additions & 15 deletions
@@ -577,7 +577,19 @@ def _bqstorage_page_to_dataframe(column_names, dtypes, page):
 def _download_table_bqstorage_stream(
     download_state, bqstorage_client, session, stream, worker_queue, page_to_item
 ):
-    rowstream = bqstorage_client.read_rows(stream.name).rows(session)
+    # Passing a BQ Storage client in implies that the BigQuery Storage library
+    # is available and can be imported.
+    from google.cloud import bigquery_storage_v1beta1
+
+    # We want to preserve compatibility with the v1beta1 BQ Storage clients,
+    # thus adjust how the rowstream is constructed, if needed.
+    # The assumption is that the caller provides a BQ Storage `session` that is
+    # compatible with the version of the BQ Storage client passed in.
+    if isinstance(bqstorage_client, bigquery_storage_v1beta1.BigQueryStorageClient):
+        position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
+        rowstream = bqstorage_client.read_rows(position).rows(session)
+    else:
+        rowstream = bqstorage_client.read_rows(stream.name).rows(session)

     for page in rowstream.pages:
         if download_state.done:
@@ -609,29 +621,52 @@ def _download_table_bqstorage(
     page_to_item=None,
 ):
     """Use (faster, but billable) BQ Storage API to construct DataFrame."""
+
+    # Passing a BQ Storage client in implies that the BigQuery Storage library
+    # is available and can be imported.
+    from google.cloud import bigquery_storage_v1
+    from google.cloud import bigquery_storage_v1beta1
+
     if "$" in table.table_id:
         raise ValueError(
             "Reading from a specific partition is not currently supported."
         )
     if "@" in table.table_id:
         raise ValueError("Reading from a specific snapshot is not currently supported.")

-    requested_session = bigquery_storage_v1.types.ReadSession(
-        table=table.to_bqstorage(),
-        data_format=bigquery_storage_v1.enums.DataFormat.ARROW,
-    )
-
-    if selected_fields is not None:
-        for field in selected_fields:
-            requested_session.read_options.selected_fields.append(field.name)
-
     requested_streams = 1 if preserve_order else 0

-    session = bqstorage_client.create_read_session(
-        parent="projects/{}".format(project_id),
-        read_session=requested_session,
-        max_stream_count=requested_streams,
-    )
+    # We want to preserve compatibility with the v1beta1 BQ Storage clients,
+    # thus adjust the session creation if needed.
+    if isinstance(bqstorage_client, bigquery_storage_v1beta1.BigQueryStorageClient):
+        read_options = bigquery_storage_v1beta1.types.TableReadOptions()
+
+        if selected_fields is not None:
+            for field in selected_fields:
+                read_options.selected_fields.append(field.name)
+
+        session = bqstorage_client.create_read_session(
+            table.to_bqstorage(v1beta1=True),
+            "projects/{}".format(project_id),
+            format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
+            read_options=read_options,
+            requested_streams=requested_streams,
+        )
+    else:
+        requested_session = bigquery_storage_v1.types.ReadSession(
+            table=table.to_bqstorage(),
+            data_format=bigquery_storage_v1.enums.DataFormat.ARROW,
+        )
+        if selected_fields is not None:
+            for field in selected_fields:
+                requested_session.read_options.selected_fields.append(field.name)
+
+        session = bqstorage_client.create_read_session(
+            parent="projects/{}".format(project_id),
+            read_session=requested_session,
+            max_stream_count=requested_streams,
+        )
+
     _LOGGER.debug(
         "Started reading table '{}.{}.{}' with BQ Storage API session '{}'.".format(
             table.project, table.dataset_id, table.table_id, session.name
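
A note on the design choice visible in this hunk: the two client generations expose differently shaped `create_read_session()` calls. The `v1beta1` client takes the table reference, parent project, data format, read options, and requested stream count as separate arguments, while the stable `v1` client expects a single `ReadSession` proto plus `parent` and `max_stream_count`. Dispatching on `isinstance(bqstorage_client, bigquery_storage_v1beta1.BigQueryStorageClient)` keeps both call shapes behind the same private helper, and the caller stays responsible for passing a `session` that matches the client version.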

google/cloud/bigquery/dbapi/cursor.py

Lines changed: 33 additions & 12 deletions
@@ -268,28 +268,49 @@ def _bqstorage_fetch(self, bqstorage_client):
             A sequence of rows, represented as dictionaries.
         """
         # Hitting this code path with a BQ Storage client instance implies that
-        # bigquery_storage_v1 can indeed be imported here without errors.
+        # bigquery_storage_v1* can indeed be imported here without errors.
         from google.cloud import bigquery_storage_v1
+        from google.cloud import bigquery_storage_v1beta1

         table_reference = self._query_job.destination

-        requested_session = bigquery_storage_v1.types.ReadSession(
-            table=table_reference.to_bqstorage(),
-            data_format=bigquery_storage_v1.enums.DataFormat.AVRO,
+        is_v1beta1_client = isinstance(
+            bqstorage_client, bigquery_storage_v1beta1.BigQueryStorageClient
         )

-        read_session = bqstorage_client.create_read_session(
-            parent="projects/{}".format(table_reference.project),
-            read_session=requested_session,
-            # a single stream only, as DB API is not well-suited for multithreading
-            max_stream_count=1,
-        )
+        # We want to preserve compatibility with the v1beta1 BQ Storage clients,
+        # thus adjust the session creation if needed.
+        if is_v1beta1_client:
+            read_session = bqstorage_client.create_read_session(
+                table_reference.to_bqstorage(v1beta1=True),
+                "projects/{}".format(table_reference.project),
+                # a single stream only, as DB API is not well-suited for multithreading
+                requested_streams=1,
+            )
+        else:
+            requested_session = bigquery_storage_v1.types.ReadSession(
+                table=table_reference.to_bqstorage(),
+                data_format=bigquery_storage_v1.enums.DataFormat.AVRO,
+            )
+            read_session = bqstorage_client.create_read_session(
+                parent="projects/{}".format(table_reference.project),
+                read_session=requested_session,
+                # a single stream only, as DB API is not well-suited for multithreading
+                max_stream_count=1,
+            )

         if not read_session.streams:
             return iter([])  # empty table, nothing to read

-        stream_name = read_session.streams[0].name
-        read_rows_stream = bqstorage_client.read_rows(stream_name)
+        if is_v1beta1_client:
+            read_position = bigquery_storage_v1beta1.types.StreamPosition(
+                stream=read_session.streams[0],
+            )
+            read_rows_stream = bqstorage_client.read_rows(read_position)
+        else:
+            stream_name = read_session.streams[0].name
+            read_rows_stream = bqstorage_client.read_rows(stream_name)
+
         rows_iterable = read_rows_stream.rows(read_session)
         return rows_iterable
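
A minimal usage sketch for this DB API path, mirroring the v1beta1 system test added further down in this commit; the query and credential handling are placeholders, and the cursor can still fall back to the default client for small result sets:

    from google.cloud import bigquery
    from google.cloud import bigquery_storage_v1beta1
    from google.cloud.bigquery import dbapi

    client = bigquery.Client()
    bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
        credentials=client._credentials  # reuse the main client's credentials, as the tests do
    )

    # The DB API connection now accepts the legacy beta client as well.
    cursor = dbapi.connect(client, bqstorage_client).cursor()
    cursor.execute("SELECT 1 AS x")
    print(cursor.fetchone())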

google/cloud/bigquery/table.py

Lines changed: 51 additions & 11 deletions
@@ -25,6 +25,12 @@

 import six

+try:
+    # Needed for the to_bqstorage() method.
+    from google.cloud import bigquery_storage_v1beta1
+except ImportError:  # pragma: NO COVER
+    bigquery_storage_v1beta1 = None
+
 try:
     import pandas
 except ImportError:  # pragma: NO COVER
@@ -221,7 +227,7 @@ def to_api_repr(self):
             "tableId": self._table_id,
         }

-    def to_bqstorage(self):
+    def to_bqstorage(self, v1beta1=False):
         """Construct a BigQuery Storage API representation of this table.

         Install the ``google-cloud-bigquery-storage`` package to use this
@@ -235,15 +241,37 @@ def to_bqstorage(self):
             :class:`google.cloud.bigquery_storage_v1.types.ReadSession.TableModifiers`
             to select a specific snapshot to read from.

+        Args:
+            v1beta1 (Optional[bool]):
+                If :data:`True`, return representation compatible with BigQuery
+                Storage ``v1beta1`` version. Defaults to :data:`False`.
+
         Returns:
-            str: A reference to this table in the BigQuery Storage API.
+            Union[str, google.cloud.bigquery_storage_v1beta1.types.TableReference]:
+                A reference to this table in the BigQuery Storage API.
+
+        Raises:
+            ValueError:
+                If ``v1beta1`` compatibility is requested, but the
+                :mod:`google.cloud.bigquery_storage_v1beta1` module cannot be imported.
         """
+        if v1beta1 and bigquery_storage_v1beta1 is None:
+            raise ValueError(_NO_BQSTORAGE_ERROR)
+
         table_id, _, _ = self._table_id.partition("@")
         table_id, _, _ = table_id.partition("$")

-        table_ref = "projects/{}/datasets/{}/tables/{}".format(
-            self._project, self._dataset_id, table_id,
-        )
+        if v1beta1:
+            table_ref = bigquery_storage_v1beta1.types.TableReference(
+                project_id=self._project,
+                dataset_id=self._dataset_id,
+                table_id=table_id,
+            )
+        else:
+            table_ref = "projects/{}/datasets/{}/tables/{}".format(
+                self._project, self._dataset_id, table_id,
+            )
+
         return table_ref

     def _key(self):
@@ -849,13 +877,19 @@ def to_api_repr(self):
         """
         return copy.deepcopy(self._properties)

-    def to_bqstorage(self):
+    def to_bqstorage(self, v1beta1=False):
         """Construct a BigQuery Storage API representation of this table.

+        Args:
+            v1beta1 (Optional[bool]):
+                If :data:`True`, return representation compatible with BigQuery
+                Storage ``v1beta1`` version. Defaults to :data:`False`.
+
         Returns:
-            str: A reference to this table in the BigQuery Storage API.
+            Union[str, google.cloud.bigquery_storage_v1beta1.types.TableReference]:
+                A reference to this table in the BigQuery Storage API.
         """
-        return self.reference.to_bqstorage()
+        return self.reference.to_bqstorage(v1beta1=v1beta1)

     def _build_resource(self, filter_fields):
         """Generate a resource for ``update``."""
@@ -1063,13 +1097,19 @@ def from_string(cls, full_table_id):
             {"tableReference": TableReference.from_string(full_table_id).to_api_repr()}
         )

-    def to_bqstorage(self):
+    def to_bqstorage(self, v1beta1=False):
         """Construct a BigQuery Storage API representation of this table.

+        Args:
+            v1beta1 (Optional[bool]):
+                If :data:`True`, return representation compatible with BigQuery
+                Storage ``v1beta1`` version. Defaults to :data:`False`.
+
         Returns:
-            str: A reference to this table in the BigQuery Storage API.
+            Union[str, google.cloud.bigquery_storage_v1beta1.types.TableReference]:
+                A reference to this table in the BigQuery Storage API.
         """
-        return self.reference.to_bqstorage()
+        return self.reference.to_bqstorage(v1beta1=v1beta1)


 def _row_from_mapping(mapping, schema):
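
A short sketch of the two return shapes of `to_bqstorage()` after this change; the project, dataset, and table names are placeholders:

    from google.cloud import bigquery

    table_ref = bigquery.TableReference.from_string("my-project.my_dataset.my_table")

    # Default (v1 stable): a fully qualified table path string.
    table_ref.to_bqstorage()
    # -> "projects/my-project/datasets/my_dataset/tables/my_table"

    # v1beta1 compatibility: a bigquery_storage_v1beta1.types.TableReference proto;
    # raises ValueError if google-cloud-bigquery-storage cannot be imported.
    table_ref.to_bqstorage(v1beta1=True)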

tests/system.py

Lines changed: 106 additions & 0 deletions
@@ -34,8 +34,10 @@

 try:
     from google.cloud import bigquery_storage_v1
+    from google.cloud import bigquery_storage_v1beta1
 except ImportError:  # pragma: NO COVER
     bigquery_storage_v1 = None
+    bigquery_storage_v1beta1 = None

 try:
     import fastavro  # to parse BQ storage client results
@@ -1648,6 +1650,56 @@ def test_dbapi_fetch_w_bqstorage_client_large_result_set(self):
         ]
         self.assertEqual(fetched_data, expected_data)

+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    @unittest.skipIf(fastavro is None, "Requires `fastavro`")
+    def test_dbapi_fetch_w_bqstorage_client_v1beta1_large_result_set(self):
+        bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
+            credentials=Config.CLIENT._credentials
+        )
+        cursor = dbapi.connect(Config.CLIENT, bqstorage_client).cursor()
+
+        # Pick a large enough LIMIT value to ensure that the fallback to the
+        # default client is not needed due to the result set being too small
+        # (a known issue that causes problems when reading such result sets with
+        # the BQ Storage client).
+        cursor.execute(
+            """
+            SELECT id, `by`, time_ts
+            FROM `bigquery-public-data.hacker_news.comments`
+            ORDER BY `id` ASC
+            LIMIT 100000
+            """
+        )
+
+        result_rows = [cursor.fetchone(), cursor.fetchone(), cursor.fetchone()]
+
+        field_name = operator.itemgetter(0)
+        fetched_data = [sorted(row.items(), key=field_name) for row in result_rows]
+
+        # Since the DB API is not thread safe, only a single result stream should be
+        # requested by the BQ Storage client, meaning that results should arrive
+        # in sorted order.
+        expected_data = [
+            [
+                ("by", "sama"),
+                ("id", 15),
+                ("time_ts", datetime.datetime(2006, 10, 9, 19, 51, 1, tzinfo=UTC)),
+            ],
+            [
+                ("by", "pg"),
+                ("id", 17),
+                ("time_ts", datetime.datetime(2006, 10, 9, 19, 52, 45, tzinfo=UTC)),
+            ],
+            [
+                ("by", "pg"),
+                ("id", 22),
+                ("time_ts", datetime.datetime(2006, 10, 10, 2, 18, 22, tzinfo=UTC)),
+            ],
+        ]
+        self.assertEqual(fetched_data, expected_data)
+
     @unittest.skipIf(
         bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`"
     )
@@ -2135,6 +2187,60 @@ def test_query_results_to_dataframe_w_bqstorage(self):
                 if not row[col] is None:
                     self.assertIsInstance(row[col], exp_datatypes[col])

+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    def test_query_results_to_dataframe_w_bqstorage_v1beta1(self):
+        dest_dataset = self.temp_dataset(_make_dataset_id("bqstorage_to_dataframe_"))
+        dest_ref = dest_dataset.table("query_results")
+
+        query = """
+            SELECT id, author, time_ts, dead
+            FROM `bigquery-public-data.hacker_news.comments`
+            LIMIT 10
+        """
+
+        bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
+            credentials=Config.CLIENT._credentials
+        )
+
+        job_configs = (
+            # There is a known issue reading small anonymous query result
+            # tables with the BQ Storage API. Writing to a destination
+            # table works around this issue.
+            bigquery.QueryJobConfig(
+                destination=dest_ref, write_disposition="WRITE_TRUNCATE"
+            ),
+            # Check that the client is able to work around the issue with
+            # reading small anonymous query result tables by falling back to
+            # the tabledata.list API.
+            None,
+        )
+
+        for job_config in job_configs:
+            df = (
+                Config.CLIENT.query(query, job_config=job_config)
+                .result()
+                .to_dataframe(bqstorage_client)
+            )
+
+            self.assertIsInstance(df, pandas.DataFrame)
+            self.assertEqual(len(df), 10)  # verify the number of rows
+            column_names = ["id", "author", "time_ts", "dead"]
+            self.assertEqual(list(df), column_names)
+            exp_datatypes = {
+                "id": int,
+                "author": six.text_type,
+                "time_ts": pandas.Timestamp,
+                "dead": bool,
+            }
+            for index, row in df.iterrows():
+                for col in column_names:
+                    # all the schema fields are nullable, so None is acceptable
+                    if not row[col] is None:
+                        self.assertIsInstance(row[col], exp_datatypes[col])
+
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     def test_insert_rows_from_dataframe(self):
         SF = bigquery.SchemaField
