 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import argparse
+
 
 def main(dataset_id="query_results"):
+    print("Creating clients.")
+    bqclient, bqstorageclient, project_id = create_clients()
+    print("\n\nReading a table:")
+    table_to_dataframe(bqclient, bqstorageclient)
+    print("\n\nReading query results:")
+    query_to_dataframe(bqclient, bqstorageclient, dataset_id)
+    print("\n\nReading a table, using the BQ Storage API directly:")
+    session_to_dataframe(bqstorageclient, project_id)
+
+
+def create_clients():
     # [START bigquerystorage_pandas_create_client]
     import google.auth
     from google.cloud import bigquery
@@ -31,20 +44,47 @@ def main(dataset_id="query_results"):
     bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
         credentials=credentials)
     # [END bigquerystorage_pandas_create_client]
+    return bqclient, bqstorageclient, project_id
+
+
+def table_to_dataframe(bqclient, bqstorageclient):
+    from google.cloud import bigquery
 
     # [START bigquerystorage_pandas_read_table]
     # Download a table.
-    table = bqclient.get_table(
+    table = bigquery.TableReference.from_string(
         "bigquery-public-data.utility_us.country_code_iso"
     )
-    rows = bqclient.list_rows(table)
+    rows = bqclient.list_rows(
+        table,
+        selected_fields=[
+            bigquery.SchemaField("country_name", "STRING"),
+            bigquery.SchemaField("fips_code", "STRING"),
+        ]
+    )
     dataframe = rows.to_dataframe(bqstorageclient)
     print(dataframe.head())
     # [END bigquerystorage_pandas_read_table]
 
-    dataset = bqclient.dataset(dataset_id)
+
+def query_to_dataframe(bqclient, bqstorageclient, dataset_id):
+    from google.cloud import bigquery
 
     # [START bigquerystorage_pandas_read_query_results]
+    import uuid
+
+    # Due to a known issue in the BigQuery Storage API (TODO: link to
+    # public issue), small query result sets cannot be downloaded. To
+    # work around this issue, write results to a destination table.
+
+    # TODO: Set dataset_id to a dataset that will store temporary query
+    # results. Set the default table expiration time to ensure data is
+    # deleted after the results have been downloaded.
+    # dataset_id = "temporary_dataset_for_query_results"
+    dataset = bqclient.dataset(dataset_id)
+    table_id = "queryresults_" + uuid.uuid4().hex
+    table = dataset.table(table_id)
+
     # Download query results.
     query_string = """
 SELECT
@@ -57,10 +97,7 @@ def main(dataset_id="query_results"):
 ORDER BY view_count DESC
 """
     query_config = bigquery.QueryJobConfig(
-        # Due to a known issue in the BigQuery Storage API (TODO: link to
-        # public issue), small result sets cannot be downloaded. To workaround
-        # this issue, write your results to a destination table.
-        destination=dataset.table('query_results_table'),
+        destination=table,
         write_disposition="WRITE_TRUNCATE"
     )
 
@@ -72,14 +109,28 @@ def main(dataset_id="query_results"):
     print(dataframe.head())
     # [END bigquerystorage_pandas_read_query_results]
 
+
+def session_to_dataframe(bqstorageclient, project_id):
+    from google.cloud import bigquery_storage_v1beta1
+
     # [START bigquerystorage_pandas_read_session]
     table = bigquery_storage_v1beta1.types.TableReference()
     table.project_id = "bigquery-public-data"
-    table.dataset_id = "utility_us"
-    table.table_id = "country_code_iso"
+    table.dataset_id = "new_york_trees"
+    table.table_id = "tree_species"
+
+    # Specify read options to select columns to read. If no read options are
+    # specified, the whole table is read.
+    read_options = bigquery_storage_v1beta1.types.TableReadOptions()
+    read_options.selected_fields.append("species_common_name")
+    read_options.selected_fields.append("fall_color")
 
     parent = "projects/{}".format(project_id)
-    session = bqstorageclient.create_read_session(table, parent)
+    session = bqstorageclient.create_read_session(
+        table,
+        parent,
+        read_options=read_options
+    )
 
     # Don't try to read from an empty table.
     if len(session.streams) == 0:
@@ -99,4 +150,7 @@ def main(dataset_id="query_results"):
 
 
 if __name__ == "__main__":
-    main()
+    parser = argparse.ArgumentParser()
+    parser.add_argument('dataset_id')
+    args = parser.parse_args()
+    main(args.dataset_id)