diff --git a/.gitignore b/.gitignore
index e507b7e8..6dd9fa31 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,8 @@
 *.egg
 /.env/
 /.vscode/
+/.idea/
+/.mypy_cache/
 *.pyc
 .cache/
 *.iml
diff --git a/dev_requirements.txt b/dev_requirements.txt
index b32e352b..ff12c824 100644
--- a/dev_requirements.txt
+++ b/dev_requirements.txt
@@ -1,6 +1,7 @@
 sqlalchemy>=1.1.9
-google-cloud-bigquery>=1.6.0
+google-cloud-bigquery>=1.12.0
 future==0.16.0
+packaging>=20.0
 
 pytest==3.2.2
 pytz==2017.2
\ No newline at end of file
diff --git a/pybigquery/parse_url.py b/pybigquery/parse_url.py
index ab30849f..7b0f0d17 100644
--- a/pybigquery/parse_url.py
+++ b/pybigquery/parse_url.py
@@ -34,6 +34,7 @@ def parse_url(url):
     dataset_id = url.database or None
     arraysize = None
     credentials_path = None
+    use_bqstorage_api = None
 
     # location
     if 'location' in query:
@@ -51,14 +52,22 @@ def parse_url(url):
         except ValueError:
             raise ValueError("invalid int in url query arraysize: " + str_arraysize)
 
+    # use_bqstorage_api
+    if 'use_bqstorage_api' in query:
+        str_use_bqstorage_api = query.pop('use_bqstorage_api')
+        try:
+            use_bqstorage_api = parse_boolean(str_use_bqstorage_api)
+        except ValueError:
+            raise ValueError("invalid boolean in url query for use_bqstorage_api: " + str_use_bqstorage_api)
+
     # if only these "non-config" values were present, the dict will now be empty
     if not query:
         # if a dataset_id exists, we need to return a job_config that isn't None
         # so it can be updated with a dataset reference from the client
         if dataset_id:
-            return project_id, location, dataset_id, arraysize, credentials_path, QueryJobConfig()
+            return project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, QueryJobConfig()
         else:
-            return project_id, location, dataset_id, arraysize, credentials_path, None
+            return project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, None
 
     job_config = QueryJobConfig()
 
@@ -170,4 +179,4 @@ def parse_url(url):
         except AttributeError:
             raise ValueError("invalid write_disposition in url query: " + query['write_disposition'])
 
-    return project_id, location, dataset_id, arraysize, credentials_path, job_config
+    return project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, job_config
diff --git a/pybigquery/sqlalchemy_bigquery.py b/pybigquery/sqlalchemy_bigquery.py
index c0755234..fd73ab79 100644
--- a/pybigquery/sqlalchemy_bigquery.py
+++ b/pybigquery/sqlalchemy_bigquery.py
@@ -3,12 +3,21 @@
 from __future__ import absolute_import
 from __future__ import unicode_literals
 
+import re
+import warnings
+from packaging.version import Version, parse as parse_version
+
 from google import auth
 from google.cloud import bigquery
-from google.cloud.bigquery import dbapi, QueryJobConfig
+
+try:
+    from google.cloud import bigquery_storage_v1
+except ImportError:
+    pass
+
+from google.cloud.bigquery import dbapi
 from google.cloud.bigquery.schema import SchemaField
-from google.cloud.bigquery.table import EncryptionConfiguration
-from google.cloud.bigquery.dataset import DatasetReference
+from google.cloud.bigquery.table import EncryptionConfiguration, TableReference
 from google.oauth2 import service_account
 from google.api_core.exceptions import NotFound
 from sqlalchemy.exc import NoSuchTableError
@@ -18,7 +27,6 @@
 from sqlalchemy.engine.base import Engine
 from sqlalchemy.sql.schema import Column
 from sqlalchemy.sql import elements
-import re
 
 from .parse_url import parse_url
 
@@ -294,53 +302,67 @@ def _add_default_dataset_to_job_config(job_config, project_id,
                                             dataset_id):
         job_config.default_dataset = '{}.{}'.format(project_id, dataset_id)
 
+    @staticmethod
+    def create_credentials(credentials_path, credentials_info):
+        """
+        Create a service account credentials object if possible.
+        Using a file is preferred over using info;
+        return None if neither is available.
+        """
+        credentials = None
-    def _create_client_from_credentials(self, credentials, default_query_job_config, project_id):
-        if project_id is None:
-            project_id = credentials.project_id
+        if credentials_path:
+            credentials = service_account.Credentials.from_service_account_file(credentials_path)
 
-        scopes = (
+        elif credentials_info:
+            credentials = service_account.Credentials.from_service_account_info(credentials_info)
+
+        if credentials:
+            scopes = (
             'https://www.googleapis.com/auth/bigquery',
             'https://www.googleapis.com/auth/cloud-platform',
             'https://www.googleapis.com/auth/drive'
         )
-        credentials = credentials.with_scopes(scopes)
-
-        self._add_default_dataset_to_job_config(default_query_job_config, project_id, self.dataset_id)
+            credentials = credentials.with_scopes(scopes)
 
-        return bigquery.Client(
-            project=project_id,
-            credentials=credentials,
-            location=self.location,
-            default_query_job_config=default_query_job_config,
-        )
+        return credentials
 
     def create_connect_args(self, url):
-        project_id, location, dataset_id, arraysize, credentials_path, default_query_job_config = parse_url(url)
+        project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, default_query_job_config = parse_url(url)
 
         self.arraysize = self.arraysize or arraysize
         self.location = location or self.location
         self.credentials_path = credentials_path or self.credentials_path
         self.dataset_id = dataset_id
 
-        if self.credentials_path:
-            credentials = service_account.Credentials.from_service_account_file(self.credentials_path)
-            client = self._create_client_from_credentials(credentials, default_query_job_config, project_id)
+        credentials = self.create_credentials(self.credentials_path, self.credentials_info)
 
-        elif self.credentials_info:
-            credentials = service_account.Credentials.from_service_account_info(self.credentials_info)
-            client = self._create_client_from_credentials(credentials, default_query_job_config, project_id)
+        # Use the credentials project as a default if no project was specified
+        if (credentials is not None) and (project_id is None):
+            project_id = credentials.project_id
 
-        else:
-            self._add_default_dataset_to_job_config(default_query_job_config, project_id, dataset_id)
+        self._add_default_dataset_to_job_config(default_query_job_config, project_id, dataset_id)
 
-            client = bigquery.Client(
-                project=project_id,
-                location=self.location,
-                default_query_job_config=default_query_job_config
-            )
+        client = bigquery.Client(
+            project=project_id,
+            credentials=credentials,
+            location=self.location,
+            default_query_job_config=default_query_job_config
+        )
+
+        clients = [client]
+
+        if use_bqstorage_api:
+            if parse_version(bigquery.__version__) >= Version("1.26.0"):
+                try:
+                    storage_client = bigquery_storage_v1.BigQueryReadClient(credentials=credentials)
+                    clients.append(storage_client)
+                except NameError:
+                    warnings.warn("It is not possible to use the bqstorage api without installing the bqstorage extra requirement")
+            else:
+                warnings.warn('It is not possible to use the bqstorage api with google-cloud-bigquery < 1.26.0')
 
-        return ([client], {})
+        return (clients, {})
 
     def _json_deserializer(self, row):
         """JSON deserializer for RECORD types.
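Note: SQLAlchemy hands the (args, kwargs) pair returned by create_connect_args() to the DBAPI module's connect() as connect(*args, **kwargs); for this dialect that is google.cloud.bigquery.dbapi.connect(client=None, bqstorage_client=None), so returning the clients list forwards the optional storage client positionally. A minimal sketch of the equivalent direct call, assuming the bqstorage extra is installed and 'some-project' as a placeholder project id:

    # Roughly what SQLAlchemy does with the (clients, {}) tuple above.
    from google.cloud import bigquery, bigquery_storage_v1
    from google.cloud.bigquery import dbapi

    client = bigquery.Client(project='some-project')          # placeholder project
    storage_client = bigquery_storage_v1.BigQueryReadClient()
    connection = dbapi.connect(client, storage_client)        # (client, bqstorage_client)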
diff --git a/setup.py b/setup.py
index e7a3e90f..d4f49c3b 100644
--- a/setup.py
+++ b/setup.py
@@ -24,10 +24,18 @@ def readme():
         "Topic :: Database :: Front-Ends"
     ],
     install_requires=[
+        'google-cloud-bigquery>=1.12.0',
+        'packaging>=20.0',
         'sqlalchemy>=1.1.9',
-        'google-cloud-bigquery>=1.6.0',
         'future',
     ],
+    extras_require={
+        "bqstorage": [
+            "google-cloud-bigquery-storage >= 1.0.0, <2.0.0dev",
+            "grpcio >= 1.8.2, < 2.0dev",
+            "pyarrow>=0.16.0, < 2.0dev",
+        ]
+    },
     tests_require=[
         'pytz'
     ],
diff --git a/test/test_parse_url.py b/test/test_parse_url.py
index 034dbcbc..cc59ba2f 100644
--- a/test/test_parse_url.py
+++ b/test/test_parse_url.py
@@ -31,6 +31,7 @@ def url_with_everything():
         '?credentials_path=/some/path/to.json'
         '&location=some-location'
         '&arraysize=1000'
+        '&use_bqstorage_api=True'
         '&clustering_fields=a,b,c'
         '&create_disposition=CREATE_IF_NEEDED'
         '&destination=different-project.different-dataset.table'
@@ -46,12 +47,13 @@ def url_with_everything():
 
 
 def test_basic(url_with_everything):
-    project_id, location, dataset_id, arraysize, credentials_path, job_config = parse_url(url_with_everything)
+    project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, job_config = parse_url(url_with_everything)
 
     assert project_id == 'some-project'
     assert location == 'some-location'
     assert dataset_id == 'some-dataset'
     assert arraysize == 1000
+    assert use_bqstorage_api is True
     assert credentials_path == '/some/path/to.json'
     assert isinstance(job_config, QueryJobConfig)
 
@@ -69,7 +71,7 @@ def test_basic(url_with_everything):
     ('write_disposition', 'WRITE_APPEND'),
 ])
 def test_all_values(url_with_everything, param, value):
-    job_config = parse_url(url_with_everything)[5]
+    job_config = parse_url(url_with_everything)[6]
     config_value = getattr(job_config, param)
 
     if callable(value):
@@ -85,6 +87,7 @@ def test_all_values(url_with_everything, param, value):
 
 @pytest.mark.parametrize("param, value", [
     ('arraysize', 'not-int'),
+    ('use_bqstorage_api', 'not-bool'),
     ('create_disposition', 'not-attribute'),
     ('destination', 'not.fully-qualified'),
     ('dry_run', 'not-bool'),
@@ -108,24 +111,26 @@ def test_empty_url():
     assert value is None
 
 def test_empty_with_non_config():
-    url = parse_url(make_url('bigquery:///?location=some-location&arraysize=1000&credentials_path=/some/path/to.json'))
-    project_id, location, dataset_id, arraysize, credentials_path, job_config = url
+    url = parse_url(make_url('bigquery:///?location=some-location&arraysize=1000&use_bqstorage_api=False&credentials_path=/some/path/to.json'))
+    project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, job_config = url
 
     assert project_id is None
     assert location == 'some-location'
     assert dataset_id is None
     assert arraysize == 1000
+    assert use_bqstorage_api is False
     assert credentials_path == '/some/path/to.json'
     assert job_config is None
 
 def test_only_dataset():
     url = parse_url(make_url('bigquery:///some-dataset'))
-    project_id, location, dataset_id, arraysize, credentials_path, job_config = url
+    project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, job_config = url
 
     assert project_id is None
     assert location is None
     assert dataset_id == 'some-dataset'
     assert arraysize is None
+    assert use_bqstorage_api is None
     assert credentials_path is None
     assert isinstance(job_config, QueryJobConfig)
     # we can't actually test that the dataset is on the job_config,
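Note: with this patch applied, the new flag can be set directly in the connection URL. A minimal usage sketch ('some-project' and 'some-dataset' are placeholders; assumes the extra was installed, e.g. pip install pybigquery[bqstorage]):

    from sqlalchemy import create_engine

    # use_bqstorage_api goes through parse_boolean(), so an unparseable
    # value raises ValueError; the tests above use 'True'/'False'.
    engine = create_engine(
        'bigquery://some-project/some-dataset?use_bqstorage_api=True'
    )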