diff --git a/doc/source/io.rst b/doc/source/io.rst
index 9dfe241062952..fad20e0a18659 100644
--- a/doc/source/io.rst
+++ b/doc/source/io.rst
@@ -4649,6 +4649,22 @@ destination DataFrame as well as a preferred column order as follows:
                               index_col='index_column_name',
                               col_order=['col1', 'col2', 'col3'], projectid)
+
+Starting with 0.20.0, you can pass a query configuration as a parameter to use additional options for your job.
+For more information about query configuration parameters, see
+`here <https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query>`__.
+
+.. code-block:: python
+
+   configuration = {
+       'query': {
+           "useQueryCache": False
+       }
+   }
+   data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table',
+                            configuration=configuration,
+                            project_id=projectid)
+
 .. note::
 
    You can find your project id in the `Google developers console
    <https://console.developers.google.com>`__.
diff --git a/doc/source/whatsnew/v0.20.0.txt b/doc/source/whatsnew/v0.20.0.txt
index 0873e4b34b0b1..118ac4da01031 100644
--- a/doc/source/whatsnew/v0.20.0.txt
+++ b/doc/source/whatsnew/v0.20.0.txt
@@ -100,6 +100,8 @@ Other enhancements
 - ``pd.read_excel`` now preserves sheet order when using ``sheetname=None`` (:issue:`9930`)
 - Multiple offset aliases with decimal points are now supported (e.g. '0.5min' is parsed as '30s') (:issue:`8419`)
 
+- The ``pd.read_gbq`` method now allows query configuration preferences (:issue:`14742`)
+
 - New ``UnsortedIndexError`` (subclass of ``KeyError``) raised when indexing/slicing into an
   unsorted MultiIndex (:issue:`11897`). This allows differentiation between errors due to
   lack of sorting or an incorrect key. See :ref:`here <advanced.unsorted>`
diff --git a/pandas/io/gbq.py b/pandas/io/gbq.py
index 8038cc500f6cd..966f53e9d75ef 100644
--- a/pandas/io/gbq.py
+++ b/pandas/io/gbq.py
@@ -375,7 +375,7 @@ def process_insert_errors(self, insert_errors):
 
         raise StreamingInsertError
 
-    def run_query(self, query):
+    def run_query(self, query, **kwargs):
         try:
             from googleapiclient.errors import HttpError
         except:
@@ -385,16 +385,33 @@ def run_query(self, query):
         _check_google_client_version()
 
         job_collection = self.service.jobs()
-        job_data = {
-            'configuration': {
-                'query': {
-                    'query': query,
-                    'useLegacySql': self.dialect == 'legacy'
-                    # 'allowLargeResults', 'createDisposition',
-                    # 'preserveNulls', destinationTable, useQueryCache
-                }
+
+        job_config = {
+            'query': {
+                'query': query,
+                'useLegacySql': self.dialect == 'legacy'
+                # 'allowLargeResults', 'createDisposition',
+                # 'preserveNulls', destinationTable, useQueryCache
             }
         }
+        config = kwargs.get('configuration')
+        if config is not None:
+            if len(config) != 1:
+                raise ValueError("Exactly one job type must be specified, "
+                                 "but got: {}".format(','.join(config.keys())))
+            if 'query' in config:
+                if 'query' in config['query'] and query is not None:
+                    raise ValueError("The query statement can't be "
+                                     "specified both inside the config "
+                                     "and as a parameter")
+
+                job_config['query'].update(config['query'])
+            else:
+                raise ValueError("Only 'query' job type is supported")
+
+        job_data = {
+            'configuration': job_config
+        }
 
         self._start_timer()
         try:
@@ -622,8 +639,9 @@ def _parse_entry(field_value, field_type):
 
 
 def read_gbq(query, project_id=None, index_col=None, col_order=None,
-             reauth=False, verbose=True, private_key=None, dialect='legacy'):
-    """Load data from Google BigQuery.
+             reauth=False, verbose=True, private_key=None, dialect='legacy',
+             **kwargs):
+    r"""Load data from Google BigQuery.
 
     THIS IS AN EXPERIMENTAL LIBRARY
 
@@ -682,6 +700,17 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
 
         .. versionadded:: 0.19.0
 
+    **kwargs : Arbitrary keyword arguments
+        configuration (dict): query configuration parameters for job processing.
+        For example:
+
+            configuration = {'query': {'useQueryCache': False}}
+
+        For more information see `BigQuery SQL Reference
+        <https://cloud.google.com/bigquery/sql-reference/>`__
+
+        .. versionadded:: 0.20.0
+
     Returns
     -------
     df: DataFrame
@@ -698,7 +727,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
 
     connector = GbqConnector(project_id, reauth=reauth, verbose=verbose,
                              private_key=private_key, dialect=dialect)
-    schema, pages = connector.run_query(query)
+    schema, pages = connector.run_query(query, **kwargs)
     dataframe_list = []
     while len(pages) > 0:
         page = pages.pop()
diff --git a/pandas/io/tests/test_gbq.py b/pandas/io/tests/test_gbq.py
index 28820fd71af27..ae829f8e0a878 100644
--- a/pandas/io/tests/test_gbq.py
+++ b/pandas/io/tests/test_gbq.py
@@ -711,6 +711,91 @@ def test_invalid_option_for_sql_dialect(self):
             gbq.read_gbq(sql_statement, project_id=_get_project_id(),
                          dialect='standard',
                          private_key=_get_private_key_path())
+    def test_query_with_parameters(self):
+        sql_statement = "SELECT @param1 + @param2 as VALID_RESULT"
+        config = {
+            'query': {
+                "useLegacySql": False,
+                "parameterMode": "named",
+                "queryParameters": [
+                    {
+                        "name": "param1",
+                        "parameterType": {
+                            "type": "INTEGER"
+                        },
+                        "parameterValue": {
+                            "value": 1
+                        }
+                    },
+                    {
+                        "name": "param2",
+                        "parameterType": {
+                            "type": "INTEGER"
+                        },
+                        "parameterValue": {
+                            "value": 2
+                        }
+                    }
+                ]
+            }
+        }
+        # Test that a query that relies on parameters fails
+        # when parameters are not supplied via configuration
+        with tm.assertRaises(ValueError):
+            gbq.read_gbq(sql_statement, project_id=_get_project_id(),
+                         private_key=_get_private_key_path())
+
+        # Test that the query succeeds when the correct
+        # query parameters are supplied via 'configuration'
+        df = gbq.read_gbq(sql_statement, project_id=_get_project_id(),
+                          private_key=_get_private_key_path(),
+                          configuration=config)
+        tm.assert_frame_equal(df, DataFrame({'VALID_RESULT': [3]}))
+
+    def test_query_inside_configuration(self):
+        query_no_use = 'SELECT "PI_WRONG" as VALID_STRING'
+        query = 'SELECT "PI" as VALID_STRING'
+        config = {
+            'query': {
+                "query": query,
+                "useQueryCache": False,
+            }
+        }
+        # Test that the query can't be passed both inside the
+        # config and as a parameter
+        with tm.assertRaises(ValueError):
+            gbq.read_gbq(query_no_use, project_id=_get_project_id(),
+                         private_key=_get_private_key_path(),
+                         configuration=config)
+
+        df = gbq.read_gbq(None, project_id=_get_project_id(),
+                          private_key=_get_private_key_path(),
+                          configuration=config)
+        tm.assert_frame_equal(df, DataFrame({'VALID_STRING': ['PI']}))
+
+    def test_configuration_without_query(self):
+        sql_statement = 'SELECT 1'
+        config = {
+            'copy': {
+                "sourceTable": {
+                    "projectId": _get_project_id(),
+                    "datasetId": "publicdata:samples",
+                    "tableId": "wikipedia"
+                },
+                "destinationTable": {
+                    "projectId": _get_project_id(),
+                    "datasetId": "publicdata:samples",
+                    "tableId": "wikipedia_copied"
+                },
+            }
+        }
+        # Test that only the 'query' job type is supported,
+        # not 'copy', 'load' or 'extract'
+        with tm.assertRaises(ValueError):
+            gbq.read_gbq(sql_statement, project_id=_get_project_id(),
+                         private_key=_get_private_key_path(),
+                         configuration=config)
+
 
 class TestToGBQIntegration(tm.TestCase):
     # Changes to BigQuery table schema may take up to 2 minutes as of May 2015
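
A minimal usage sketch of the new ``configuration`` keyword introduced by this patch (the project id below is a placeholder and working Google credentials are assumed; neither is part of the patch):

    import pandas as pd

    # Placeholder project id -- substitute your own.
    project_id = 'my-project'

    # Pass the statement as the first argument and tune the job through the
    # 'query' configuration, mirroring the io.rst example above.
    configuration = {'query': {'useQueryCache': False}}
    df = pd.read_gbq('SELECT 1 AS x',
                     project_id=project_id,
                     configuration=configuration)

    # Alternatively, embed the statement inside the configuration and pass
    # query=None, the mode exercised by test_query_inside_configuration.
    configuration = {'query': {'query': 'SELECT 1 AS x',
                               'useQueryCache': False}}
    df = pd.read_gbq(None, project_id=project_id,
                     configuration=configuration)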