|
5 | 5 | import uuid
|
6 | 6 | import time
|
7 | 7 | import sys
|
| 8 | +import os |
| 9 | +import uuid |
8 | 10 |
|
9 | 11 | import numpy as np
|
10 | 12 |
|
11 | 13 | from distutils.version import StrictVersion
|
12 | 14 | from pandas import compat, DataFrame, concat
|
13 | 15 | from pandas.core.common import PandasError
|
14 | 16 | from pandas.compat import lzip, bytes_to_str
|
| 17 | +from google.cloud import bigquery |
15 | 18 |
|
16 | 19 |
|
17 | 20 | def _check_google_client_version():
|
@@ -767,6 +770,132 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
|
767 | 770 |
|
768 | 771 | return final_df
|
769 | 772 |
|
def from_gbq(query, project_id=None, index_col=None, col_order=None,
             private_key=None, dialect='legacy', configuration=None,
             **kwargs):
    r"""Load data from Google BigQuery using google-cloud-python.

    The main method a user calls to execute a Query in Google BigQuery
    and read results into a pandas DataFrame.

    The Google Cloud library is used.
    Documentation is available `here
    <https://googlecloudplatform.github.io/google-cloud-python/stable/>`__

    Authentication via Google Cloud can be performed a number of ways, see:
    <https://googlecloudplatform.github.io/google-cloud-python/stable/google-cloud-auth.html>
    The easiest is to download a service account JSON keyfile and point to it
    using an environment variable:
    `$ export GOOGLE_APPLICATION_CREDENTIALS="/path/to/keyfile.json"`

    Parameters
    ----------
    query : str
        SQL-Like Query to return data values
    project_id : str (optional)
        Google BigQuery Account project ID.
    index_col : str (optional)
        Name of result column to use for index in results DataFrame
    col_order : list(str) (optional)
        List of BigQuery column names in the desired order for results
        DataFrame
    private_key : str (optional)
        Path to service account private key in JSON format. If none is
        provided, will default to the GOOGLE_APPLICATION_CREDENTIALS
        environment variable or another form of authentication (see above)
    dialect : {'legacy', 'standard'}, default 'legacy'
        'legacy' : Use BigQuery's legacy SQL dialect.
        'standard' : Use BigQuery's standard SQL (beta), which is
        compliant with the SQL 2011 standard. For more information
        see `BigQuery SQL Reference
        <https://cloud.google.com/bigquery/sql-reference/>`__
    configuration : dict (optional)
        Because of current limitations
        (https://github.com/GoogleCloudPlatform/google-cloud-python/issues/2765)
        only a certain number of configuration settings are currently
        implemented. You can set them like:
        `from_gbq(q, configuration={'allow_large_results': True,
        'use_legacy_sql': False})`
        Allowable settings:
        -allow_large_results
        -create_disposition
        -default_dataset
        -destination
        -flatten_results
        -priority
        -use_query_cache
        -use_legacy_sql
        -dry_run
        -write_disposition
        -maximum_billing_tier
        -maximum_bytes_billed

    Returns
    -------
    df: DataFrame
        DataFrame representing results of query

    Raises
    ------
    ValueError
        If ``dialect`` is not one of 'legacy' or 'standard'.
    RuntimeError
        If the BigQuery job finishes with an error result.
    """
    # Validate up front instead of silently treating any non-'legacy'
    # value as standard SQL (consistent with read_gbq's behavior).
    if dialect not in ('legacy', 'standard'):
        raise ValueError("'{0}' is not valid for dialect".format(dialect))

    if private_key:
        # Point google-cloud's default-credentials discovery at the
        # keyfile.  NOTE(review): this mutates process-wide environment
        # state and affects subsequent clients in this process.
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = private_key

    def _wait_for_job(job):
        """Poll ``job`` once per second until it completes; raise on error."""
        while True:
            job.reload()  # Refreshes the state via a GET request.
            if job.state == 'DONE':
                if job.error_result:
                    raise RuntimeError(job.errors)
                return
            time.sleep(1)

    client = bigquery.Client(project=project_id)
    # Job names must be unique within the project; a random UUID is used.
    query_job = client.run_async_query(str(uuid.uuid4()), query)

    if dialect != 'legacy':
        query_job.use_legacy_sql = False

    # Apply any user-supplied job settings directly onto the job object.
    if configuration:
        for setting, value in configuration.items():
            setattr(query_job, setting, value)

    query_job.begin()
    _wait_for_job(query_job)

    query_results = query_job.results()

    # BUG FIX: the original code fetched only the first page, silently
    # truncating large result sets.  Follow page_token until exhausted.
    rows, total_rows, page_token = query_results.fetch_data()
    data = list(rows)
    while page_token is not None:
        rows, total_rows, page_token = query_results.fetch_data(
            page_token=page_token)
        data.extend(rows)

    columns = [field.name for field in query_results.schema]
    final_df = DataFrame(data=data, columns=columns)

    # Reindex the DataFrame on the provided column
    if index_col is not None:
        if index_col in final_df.columns:
            final_df.set_index(index_col, inplace=True)
        else:
            raise InvalidIndexColumn(
                'Index column "{0}" does not exist in DataFrame.'
                .format(index_col)
            )

    # Change the order of columns in the DataFrame based on provided list
    if col_order is not None:
        if sorted(col_order) == sorted(final_df.columns):
            final_df = final_df[col_order]
        else:
            raise InvalidColumnOrder(
                'Column order does not match this DataFrame.'
            )

    # cast BOOLEAN and INTEGER columns from object to bool/int
    # if they dont have any nulls
    type_map = {'BOOLEAN': bool, 'INTEGER': int}
    for field in query_results.schema:
        if field.field_type in type_map and \
                final_df[field.name].notnull().all():
            final_df[field.name] = \
                final_df[field.name].astype(type_map[field.field_type])

    return final_df
| 898 | + |
770 | 899 |
|
771 | 900 | def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
|
772 | 901 | verbose=True, reauth=False, if_exists='fail', private_key=None):
|
|
0 commit comments