diff --git a/ci/requirements-2.7.pip b/ci/requirements-2.7.pip index cf8e6b8b3d3a6..ff1978a8d45ed 100644 --- a/ci/requirements-2.7.pip +++ b/ci/requirements-2.7.pip @@ -1 +1,3 @@ blosc +httplib2 +google-api-python-client == 1.2 diff --git a/ci/requirements-2.7.txt b/ci/requirements-2.7.txt index 951c8798bef15..d6a1e2d362330 100644 --- a/ci/requirements-2.7.txt +++ b/ci/requirements-2.7.txt @@ -20,6 +20,4 @@ patsy pymysql=0.6.3 html5lib=1.0b2 beautiful-soup=4.2.1 -httplib2=0.8 python-gflags=2.0 -google-api-python-client=1.2 diff --git a/ci/requirements-2.7_SLOW.txt b/ci/requirements-2.7_SLOW.txt index 19686ccb56922..1a56434c62f86 100644 --- a/ci/requirements-2.7_SLOW.txt +++ b/ci/requirements-2.7_SLOW.txt @@ -20,6 +20,4 @@ psycopg2 pymysql html5lib beautiful-soup -httplib2 python-gflags -google-api-python-client diff --git a/ci/requirements-3.4.pip b/ci/requirements-3.4.pip index cf8e6b8b3d3a6..47a049aac7632 100644 --- a/ci/requirements-3.4.pip +++ b/ci/requirements-3.4.pip @@ -1 +1,3 @@ blosc +httplib2 +google-api-python-client diff --git a/doc/source/api.rst b/doc/source/api.rst index 915727817bb7b..01634d7c02481 100644 --- a/doc/source/api.rst +++ b/doc/source/api.rst @@ -110,6 +110,10 @@ Google BigQuery read_gbq to_gbq + generate_bq_schema + create_table + delete_table + table_exists .. currentmodule:: pandas diff --git a/doc/source/io.rst b/doc/source/io.rst index f95fdd502d306..5ad9af310225d 100644 --- a/doc/source/io.rst +++ b/doc/source/io.rst @@ -3951,14 +3951,35 @@ The :mod:`pandas.io.gbq` module provides a wrapper for Google's BigQuery analytics web service to simplify retrieving results from BigQuery tables using SQL-like queries. Result sets are parsed into a pandas DataFrame with a shape and data types derived from the source table. -Additionally, DataFrames can be appended to existing BigQuery tables if -the destination table is the same shape as the DataFrame. +Additionally, DataFrames can be inserted into new BigQuery tables or appended +to existing tables. -For specifics on the service itself, see `here `__ +.. warning:: + + To use this module, you will need a valid BigQuery account. Refer to the + `BigQuery Documentation `__ for details on the service itself. + +The key functions are: -As an example, suppose you want to load all data from an existing BigQuery -table : `test_dataset.test_table` into a DataFrame using the :func:`~pandas.io.read_gbq` -function. +.. currentmodule:: pandas.io.gbq + +.. autosummary:: + :toctree: generated/ + + read_gbq + to_gbq + generate_bq_schema + create_table + delete_table + table_exists + +.. currentmodule:: pandas + +Querying +'''''''' + +Suppose you want to load all data from an existing BigQuery table : `test_dataset.test_table` +into a DataFrame using the :func:`~pandas.io.gbq.read_gbq` function. .. code-block:: python @@ -3966,14 +3987,14 @@ function. # Can be found in the Google web console projectid = "xxxxxxxx" - data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table', project_id = projectid) + data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table', projectid) You will then be authenticated to the specified BigQuery account via Google's Oauth2 mechanism. In general, this is as simple as following the prompts in a browser window which will be opened for you. Should the browser not be available, or fail to launch, a code will be provided to complete the process manually. Additional information on the authentication mechanism can be found -`here `__ +`here `__. 
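If credentials for more than one Google account have been cached on the machine, the
``reauth`` argument of :func:`~pandas.io.gbq.read_gbq` forces the OAuth2 flow to run
again so a different account can be authorized. A minimal sketch, reusing the
placeholder project id from the example above:

.. code-block:: python

    import pandas as pd

    # Placeholder; can be found in the Google web console
    projectid = "xxxxxxxx"

    # Force re-authentication, e.g. to switch to a different Google account
    data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table',
                             projectid, reauth=True)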
You can define which column from BigQuery to use as an index in the destination DataFrame as well as a preferred column order as follows: @@ -3982,56 +4003,167 @@ destination DataFrame as well as a preferred column order as follows: data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table', index_col='index_column_name', - col_order=['col1', 'col2', 'col3'], project_id = projectid) - -Finally, you can append data to a BigQuery table from a pandas DataFrame -using the :func:`~pandas.io.to_gbq` function. This function uses the -Google streaming API which requires that your destination table exists in -BigQuery. Given the BigQuery table already exists, your DataFrame should -match the destination table in column order, structure, and data types. -DataFrame indexes are not supported. By default, rows are streamed to -BigQuery in chunks of 10,000 rows, but you can pass other chuck values -via the ``chunksize`` argument. You can also see the progess of your -post via the ``verbose`` flag which defaults to ``True``. The http -response code of Google BigQuery can be successful (200) even if the -append failed. For this reason, if there is a failure to append to the -table, the complete error response from BigQuery is returned which -can be quite long given it provides a status for each row. You may want -to start with smaller chunks to test that the size and types of your -dataframe match your destination table to make debugging simpler. + col_order=['col1', 'col2', 'col3'], projectid) + +.. note:: + + You can find your project id in the `BigQuery management console `__. + + +.. note:: + + You can toggle the verbose output via the ``verbose`` flag which defaults to ``True``. + +Writing DataFrames +'''''''''''''''''' + +Assume we want to write a DataFrame ``df`` into a BigQuery table using :func:`~pandas.DataFrame.to_gbq`. + +.. ipython:: python + + df = pd.DataFrame({'my_string': list('abc'), + 'my_int64': list(range(1, 4)), + 'my_float64': np.arange(4.0, 7.0), + 'my_bool1': [True, False, True], + 'my_bool2': [False, True, False], + 'my_dates': pd.date_range('now', periods=3)}) + + df + df.dtypes .. code-block:: python - df = pandas.DataFrame({'string_col_name' : ['hello'], - 'integer_col_name' : [1], - 'boolean_col_name' : [True]}) - df.to_gbq('my_dataset.my_table', project_id = projectid) + df.to_gbq('my_dataset.my_table', projectid) + +.. note:: + + If the destination table does not exist, a new table will be created. The + destination dataset id must already exist in order for a new table to be created. + +The ``if_exists`` argument can be used to dictate whether to ``'fail'``, ``'replace'`` +or ``'append'`` if the destination table already exists. The default value is ``'fail'``. + +For example, assume that ``if_exists`` is set to ``'fail'``. The following snippet will raise +a ``TableCreationError`` if the destination table already exists. + +.. code-block:: python -The BigQuery SQL query language has some oddities, see `here `__ + df.to_gbq('my_dataset.my_table', projectid, if_exists='fail') -While BigQuery uses SQL-like syntax, it has some important differences -from traditional databases both in functionality, API limitations (size and -quantity of queries or uploads), and how Google charges for use of the service. -You should refer to Google documentation often as the service seems to -be changing and evolving. BiqQuery is best for analyzing large sets of -data quickly, but it is not a direct replacement for a transactional database. +.. 
note:: -You can access the management console to determine project id's by: - + If the ``if_exists`` argument is set to ``'append'``, the destination dataframe will + be written to the table using the defined table schema and column types. The + dataframe must match the destination table in column order, structure, and + data types. + If the ``if_exists`` argument is set to ``'replace'``, and the existing table has a + different schema, a delay of 2 minutes will be forced to ensure that the new schema + has propagated in the Google environment. See + `Google BigQuery issue 191 `__. -As of 0.15.2, the gbq module has a function ``generate_bq_schema`` which -will produce the dictionary representation of the schema. +Writing large DataFrames can result in errors due to size limitations being exceeded. +This can be avoided by setting the ``chunksize`` argument when calling :func:`~pandas.DataFrame.to_gbq`. +For example, the following writes ``df`` to a BigQuery table in batches of 10000 rows at a time: .. code-block:: python - df = pandas.DataFrame({'A': [1.0]}) - gbq.generate_bq_schema(df, default_type='STRING') + df.to_gbq('my_dataset.my_table', projectid, chunksize=10000) -.. warning:: +You can also see the progress of your post via the ``verbose`` flag which defaults to ``True``. +For example: + +.. code-block:: python + + In [8]: df.to_gbq('my_dataset.my_table', projectid, chunksize=10000, verbose=True) + + Streaming Insert is 10% Complete + Streaming Insert is 20% Complete + Streaming Insert is 30% Complete + Streaming Insert is 40% Complete + Streaming Insert is 50% Complete + Streaming Insert is 60% Complete + Streaming Insert is 70% Complete + Streaming Insert is 80% Complete + Streaming Insert is 90% Complete + Streaming Insert is 100% Complete + +.. note:: + + If an error occurs while streaming data to BigQuery, see + `Troubleshooting BigQuery Errors `__. + +.. note:: + + The BigQuery SQL query language has some oddities, see the + `BigQuery Query Reference Documentation `__. + +.. note:: + + While BigQuery uses SQL-like syntax, it has some important differences from traditional + databases both in functionality, API limitations (size and quantity of queries or uploads), + and how Google charges for use of the service. You should refer to `Google BigQuery documentation `__ + often as the service seems to be changing and evolving. BiqQuery is best for analyzing large + sets of data quickly, but it is not a direct replacement for a transactional database. + + +Creating BigQuery Tables +'''''''''''''''''''''''' + +As of 0.17.0, the gbq module has a function :func:`~pandas.io.gbq.create_table` which allows users +to create a table in BigQuery. The only requirement is that the dataset must already exist. +The schema may be generated from a pandas DataFrame using the :func:`~pandas.io.gbq.generate_bq_schema` function below. + +For example: + +.. code-block:: python + + gbq.create_table('my_dataset.my_table', schema, projectid) + +As of 0.15.2, the gbq module has a function :func:`~pandas.io.gbq.generate_bq_schema` which will +produce the dictionary representation schema of the specified pandas DataFrame. + +.. 
code-block:: python + + In [10]: gbq.generate_bq_schema(df, default_type='STRING') + + Out[10]: {'fields': [{'name': 'my_bool1', 'type': 'BOOLEAN'}, + {'name': 'my_bool2', 'type': 'BOOLEAN'}, + {'name': 'my_dates', 'type': 'TIMESTAMP'}, + {'name': 'my_float64', 'type': 'FLOAT'}, + {'name': 'my_int64', 'type': 'INTEGER'}, + {'name': 'my_string', 'type': 'STRING'}]} + +Deleting BigQuery Tables +'''''''''''''''''''''''' + +As of 0.17.0, the gbq module has a function :func:`~pandas.io.gbq.delete_table` which allows users to delete a table +in Google BigQuery. + +For example: + +.. code-block:: python + + gbq.delete_table('my_dataset.my_table', projectid) + +The following function can be used to check whether a table exists prior to calling ``table_exists``: + +:func:`~pandas.io.gbq.table_exists`. + +The return value will be of type boolean. + +For example: + +.. code-block:: python + + In [12]: gbq.table_exists('my_dataset.my_table', projectid) + Out[12]: True + +.. note:: - To use this module, you will need a valid BigQuery account. See - for details on the - service. + If you delete and re-create a BigQuery table with the same name, but different table schema, + you must wait 2 minutes before streaming data into the table. As a workaround, consider creating + the new table with a different name. Refer to + `Google BigQuery issue 191 `__. .. _io.stata: diff --git a/doc/source/whatsnew/v0.17.0.txt b/doc/source/whatsnew/v0.17.0.txt index 3b3bf8cffe41b..0a2407ed7b16e 100644 --- a/doc/source/whatsnew/v0.17.0.txt +++ b/doc/source/whatsnew/v0.17.0.txt @@ -319,6 +319,15 @@ has been changed to make this keyword unnecessary - the change is shown below. Excel files saved in version 0.16.2 or prior that had index names will still able to be read in, but the ``has_index_names`` argument must specified to ``True``. +.. _whatsnew_0170.gbq: + +Google BigQuery Enhancements +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +- Added ability to automatically create a table using the :func:`pandas.io.gbq.to_gbq` function if destination table does not exist. (:issue:`8325`). +- Added ability to replace an existing table and schema when calling the :func:`pandas.io.gbq.to_gbq` function via the ``if_exists`` argument. See the :ref:`docs ` for more details (:issue:`8325`). +- Added the following functions to the gbq module: :func:`pandas.io.gbq.table_exists`, :func:`pandas.io.gbq.create_table`, and :func:`pandas.io.gbq.delete_table`. See the :ref:`docs ` for more details (:issue:`8325`). +- ``InvalidColumnOrder`` and ``InvalidPageToken`` in the gbq module will raise ``ValueError`` instead of ``IOError``. + .. _whatsnew_0170.enhancements.other: Other enhancements @@ -1137,3 +1146,4 @@ Bug Fixes - Bug in ``DatetimeIndex`` cannot infer negative freq (:issue:`11018`) - Remove use of some deprecated numpy comparison operations, mainly in tests. 
(:issue:`10569`) - Bug in ``Index`` dtype may not applied properly (:issue:`11017`) +- Bug in ``io.gbq`` when testing for minimum google api client version (:issue:`10652`) diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 77b8c4cf35aad..e58bd1f2fa0ff 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -811,20 +811,12 @@ def to_dict(self, orient='dict'): else: raise ValueError("orient '%s' not understood" % orient) - def to_gbq(self, destination_table, project_id=None, chunksize=10000, - verbose=True, reauth=False): + def to_gbq(self, destination_table, project_id, chunksize=10000, + verbose=True, reauth=False, if_exists='fail'): """Write a DataFrame to a Google BigQuery table. THIS IS AN EXPERIMENTAL LIBRARY - If the table exists, the dataframe will be written to the table using - the defined table schema and column types. For simplicity, this method - uses the Google BigQuery streaming API. The to_gbq method chunks data - into a default chunk size of 10,000. Failures return the complete error - response which can be quite long depending on the size of the insert. - There are several important limitations of the Google streaming API - which are `here `__ - Parameters ---------- dataframe : DataFrame @@ -840,13 +832,18 @@ def to_gbq(self, destination_table, project_id=None, chunksize=10000, reauth : boolean (default False) Force Google BigQuery to reauthenticate the user. This is useful if multiple accounts are used. + if_exists : {'fail', 'replace', 'append'}, default 'fail' + 'fail': If table exists, do nothing. + 'replace': If table exists, drop it, recreate it, and insert data. + 'append': If table exists, insert data. Create if does not exist. + .. versionadded:: 0.17.0 """ from pandas.io import gbq return gbq.to_gbq(self, destination_table, project_id=project_id, chunksize=chunksize, verbose=verbose, - reauth=reauth) + reauth=reauth, if_exists=if_exists) @classmethod def from_records(cls, data, index=None, exclude=None, columns=None, diff --git a/pandas/io/gbq.py b/pandas/io/gbq.py index 1dff195e4b54f..37e7cb944814a 100644 --- a/pandas/io/gbq.py +++ b/pandas/io/gbq.py @@ -7,7 +7,7 @@ import numpy as np -from distutils.version import LooseVersion +from distutils.version import StrictVersion from pandas import compat from pandas.core.api import DataFrame from pandas.tools.merge import concat @@ -26,76 +26,101 @@ def _check_google_client_version(): _GOOGLE_API_CLIENT_VERSION = pkg_resources.get_distribution('google-api-python-client').version - if LooseVersion(_GOOGLE_API_CLIENT_VERSION) < '1.2.0': + if StrictVersion(_GOOGLE_API_CLIENT_VERSION) < StrictVersion('1.2.0'): raise ImportError("pandas requires google-api-python-client >= 1.2.0 for Google " "BigQuery support, current version " + _GOOGLE_API_CLIENT_VERSION) logger = logging.getLogger('pandas.io.gbq') logger.setLevel(logging.ERROR) -class InvalidPageToken(PandasError, IOError): + +class AccessDenied(PandasError, ValueError): """ - Raised when Google BigQuery fails to return, - or returns a duplicate page token. + Raised when invalid credentials are provided, or tokens have expired. """ pass -class InvalidQueryException(PandasError, IOError): + +class GenericGBQException(PandasError, ValueError): """ - Raised when a malformed query is given to read_gbq. + Raised when an unrecognized Google API Error occurs. """ pass -class AccessDeniedException(PandasError, IOError): + +class InvalidColumnOrder(PandasError, ValueError): """ - Raised when invalid credentials are provided, or tokens have expired. 
+ Raised when the provided column order for output + results DataFrame does not match the schema + returned by BigQuery. """ pass -class NotFoundException(PandasError, IOError): + +class InvalidPageToken(PandasError, ValueError): """ - Raised when the project_id/table provided in the query could not be found. + Raised when Google BigQuery fails to return, + or returns a duplicate page token. """ pass -class TermsOfServiceNotAcceptedException(PandasError, IOError): + + +class InvalidSchema(PandasError, ValueError): """ - Raised when the terms of service were not accepted or have been unaccepted. + Raised when the provided DataFrame does + not match the schema of the destination + table in BigQuery. """ pass -class UnknownGBQException(PandasError, IOError): + +class NotFoundException(PandasError, ValueError): """ - Raised when an unrecognized Google API Error occurs. + Raised when the project_id, table or dataset provided in the query could not be found. """ pass -class InvalidColumnOrder(PandasError, IOError): +class StreamingInsertError(PandasError, ValueError): """ - Raised when the provided column order for output - results DataFrame does not match the schema - returned by BigQuery. + Raised when BigQuery reports a streaming insert error. + For more information see `Streaming Data Into BigQuery `__ + """ + + +class TableCreationError(PandasError, ValueError): + """ + Raised when the create table method fails """ pass + class GbqConnector(object): def __init__(self, project_id, reauth=False): + self.test_google_api_imports() + self.project_id = project_id + self.reauth = reauth + self.credentials = self.get_credentials() + self.service = self.get_service(self.credentials) - self.project_id = project_id - self.reauth = reauth - self.credentials = self.get_credentials() - self.service = self.get_service(self.credentials) - - def get_credentials(self): + def test_google_api_imports(self): try: + import httplib2 + from apiclient.discovery import build + from apiclient.errors import HttpError + from oauth2client.client import AccessTokenRefreshError from oauth2client.client import OAuth2WebServerFlow from oauth2client.file import Storage from oauth2client.tools import run_flow, argparser + except ImportError as e: + raise ImportError("Missing module required for Google BigQuery support: {0}".format(str(e))) - except ImportError: - raise ImportError('Could not import Google API Client.') + def get_credentials(self): + from oauth2client.client import OAuth2WebServerFlow + from oauth2client.file import Storage + from oauth2client.tools import run_flow, argparser _check_google_client_version() @@ -113,17 +138,8 @@ def get_credentials(self): return credentials def get_service(self, credentials): - try: - import httplib2 - - except ImportError: - raise ImportError("pandas requires httplib2 for Google BigQuery support") - - try: - from apiclient.discovery import build - - except ImportError: - raise ImportError('Could not import Google API Client.') + import httplib2 + from apiclient.discovery import build _check_google_client_version() @@ -133,13 +149,41 @@ def get_service(self, credentials): return bigquery_service - def run_query(self, query): - try: - from apiclient.errors import HttpError - from oauth2client.client import AccessTokenRefreshError + def process_http_error(self, ex): + # See `BigQuery Troubleshooting Errors `__ + + status = json.loads(ex.content)['error'] + errors = status.get('errors', None) - except ImportError: - raise ImportError('Could not import Google API Client.') + if 
errors: + for error in errors: + reason = error['reason'] + message = error['message'] + + raise GenericGBQException("Reason: {0}, Message: {1}".format(reason, message)) + + raise GenericGBQException(errors) + + def process_insert_errors(self, insert_errors, verbose): + for insert_error in insert_errors: + row = insert_error['index'] + errors = insert_error.get('errors', None) + for error in errors: + reason = error['reason'] + message = error['message'] + error_message = 'Error at Row: {0}, Reason: {1}, Message: {2}'.format(row, reason, message) + + # Report all error messages if verbose is set + if verbose: + print(error_message) + else: + raise StreamingInsertError(error_message + '\nEnable verbose logging to see all errors') + + raise StreamingInsertError + + def run_query(self, query, verbose=True): + from apiclient.errors import HttpError + from oauth2client.client import AccessTokenRefreshError _check_google_client_version() @@ -148,122 +192,201 @@ def run_query(self, query): 'configuration': { 'query': { 'query': query - #'allowLargeResults', 'createDisposition', 'preserveNulls', destinationTable, useQueryCache + # 'allowLargeResults', 'createDisposition', 'preserveNulls', destinationTable, useQueryCache } } } try: - query_reply = job_collection.insert(projectId=self.project_id, - body=job_data).execute() - status = query_reply['status'] + query_reply = job_collection.insert(projectId=self.project_id, body=job_data).execute() except AccessTokenRefreshError: - raise AccessDeniedException("The credentials have been revoked or expired, please re-run" - "the application to re-authorize") + raise AccessDenied("The credentials have been revoked or expired, please re-run the application " + "to re-authorize") except HttpError as ex: - status = json.loads(ex.content)['error'] - - - errors = status.get('errors', None) - - if errors: - reasons = [error['reason'] for error in errors] - if 'accessDenied' in reasons: - raise AccessDeniedException - if 'invalidQuery' in reasons: - raise InvalidQueryException - if 'notFound' in reasons: - raise NotFoundException - if 'termsOfServiceNotAccepted' in reasons: - raise TermsOfServiceNotAcceptedException - else: - raise UnknownGBQException(errors) + self.process_http_error(ex) job_reference = query_reply['jobReference'] - while(not query_reply.get('jobComplete', False)): - print('Job not yet complete...') - query_reply = job_collection.getQueryResults( - projectId=job_reference['projectId'], - jobId=job_reference['jobId']).execute() + while not query_reply.get('jobComplete', False): + if verbose: + print('Waiting for job to complete...') + try: + query_reply = job_collection.getQueryResults(projectId=job_reference['projectId'], + jobId=job_reference['jobId']).execute() + except HttpError as ex: + self.process_http_error(ex) total_rows = int(query_reply['totalRows']) result_pages = list() seen_page_tokens = list() current_row = 0 - #Only read schema on first page + # Only read schema on first page schema = query_reply['schema'] # Loop through each page of data - while('rows' in query_reply and current_row < total_rows): + while 'rows' in query_reply and current_row < total_rows: page = query_reply['rows'] result_pages.append(page) current_row += len(page) page_token = query_reply.get('pageToken', None) if not page_token and current_row < total_rows: - raise InvalidPageToken("Required pageToken was missing. Recieved {0} of {1} rows".format(current_row,total_rows)) + raise InvalidPageToken("Required pageToken was missing. 
Recieved {0} of {1} rows".format(current_row, total_rows)) elif page_token in seen_page_tokens: raise InvalidPageToken("A duplicate pageToken was returned") seen_page_tokens.append(page_token) - query_reply = job_collection.getQueryResults( - projectId = job_reference['projectId'], - jobId = job_reference['jobId'], - pageToken = page_token).execute() - if (current_row < total_rows): + try: + query_reply = job_collection.getQueryResults( + projectId=job_reference['projectId'], + jobId=job_reference['jobId'], + pageToken=page_token).execute() + except HttpError as ex: + self.process_http_error(ex) + + if current_row < total_rows: raise InvalidPageToken() return schema, result_pages def load_data(self, dataframe, dataset_id, table_id, chunksize, verbose): + from apiclient.errors import HttpError + job_id = uuid.uuid4().hex rows = [] remaining_rows = len(dataframe) if verbose: total_rows = remaining_rows - sys.stdout.write("\n\n") - sys.stdout.flush() + print("\n\n") for index, row in dataframe.reset_index(drop=True).iterrows(): row_dict = dict() - row_dict['json'] = json.loads(row.to_json(force_ascii = False, - date_unit = 's', - date_format = 'iso')) + row_dict['json'] = json.loads(row.to_json(force_ascii=False, + date_unit='s', + date_format='iso')) row_dict['insertId'] = job_id + str(index) rows.append(row_dict) remaining_rows -= 1 if (len(rows) % chunksize == 0) or (remaining_rows == 0): if verbose: - sys.stdout.write("\rStreaming Insert is {0}% Complete".format(((total_rows - remaining_rows) * 100) / total_rows)) - sys.stdout.flush() + print("\rStreaming Insert is {0}% Complete".format(((total_rows - remaining_rows) * 100) / total_rows)) body = {'rows': rows} - response = self.service.tabledata().insertAll( - projectId = self.project_id, - datasetId = dataset_id, - tableId = table_id, - body = body).execute() - if 'insertErrors' in response: - raise UnknownGBQException(response) - - sleep(1) # Maintains the inserts "per second" rate per API + + try: + response = self.service.tabledata().insertAll( + projectId = self.project_id, + datasetId = dataset_id, + tableId = table_id, + body = body).execute() + except HttpError as ex: + self.process_http_error(ex) + + # For streaming inserts, even if you receive a success HTTP response code, you'll need to check the + # insertErrors property of the response to determine if the row insertions were successful, because + # it's possible that BigQuery was only partially successful at inserting the rows. 
+ # See the `Success HTTP Response Codes `__ + # section + + insert_errors = response.get('insertErrors', None) + if insert_errors: + self.process_insert_errors(insert_errors, verbose) + + sleep(1) # Maintains the inserts "per second" rate per API rows = [] if verbose: - sys.stdout.write("\n") - sys.stdout.flush() + print("\n") + + def table_exists(self, dataset_id, table_id): + from apiclient.errors import HttpError + + try: + self.service.tables().get( + projectId=self.project_id, + datasetId=dataset_id, + tableId=table_id).execute() + return True + except HttpError as ex: + if ex.resp.status == 404: + return False + else: + self.process_http_error(ex) + + def verify_schema(self, dataset_id, table_id, schema): + from apiclient.errors import HttpError + + try: + return (self.service.tables().get( + projectId=self.project_id, + datasetId=dataset_id, + tableId=table_id + ).execute()['schema']) == schema + + except HttpError as ex: + self.process_http_error(ex) + + def create_table(self, dataset_id, table_id, schema): + from apiclient.errors import HttpError + + body = { + 'schema': schema, + 'tableReference': { + 'tableId': table_id, + 'projectId': self.project_id, + 'datasetId': dataset_id + } + } + + try: + self.service.tables().insert( + projectId=self.project_id, + datasetId=dataset_id, + body=body + ).execute() + except HttpError as ex: + self.process_http_error(ex) + + def delete_table(self, dataset_id, table_id): + from apiclient.errors import HttpError + + try: + self.service.tables().delete( + datasetId=dataset_id, + projectId=self.project_id, + tableId=table_id + ).execute() + + except HttpError as ex: + self.process_http_error(ex) + + def delete_and_recreate_table(self, dataset_id, table_id, table_schema, verbose): + delay = 0 + + # Changes to table schema may take up to 2 minutes as of May 2015 + # See `Issue 191 `__ + # Compare previous schema with new schema to determine if there should be a 120 second delay + + if not self.verify_schema(dataset_id, table_id, table_schema): + if verbose: + print('The existing table has a different schema. Please wait 2 minutes. See Google BigQuery issue #191') + delay = 120 + + self.delete_table(dataset_id, table_id) + self.create_table(dataset_id, table_id, table_schema) + + sleep(delay) + def _parse_data(schema, rows): # see: http://pandas.pydata.org/pandas-docs/dev/missing_data.html#missing-data-casting-rules-and-indexing dtype_map = {'INTEGER': np.dtype(float), 'FLOAT': np.dtype(float), - 'TIMESTAMP': 'M8[ns]'} # This seems to be buggy without - # nanosecond indicator + 'TIMESTAMP': 'M8[ns]'} # This seems to be buggy without nanosecond indicator fields = schema['fields'] col_types = [field['type'] for field in fields] @@ -281,6 +404,7 @@ def _parse_data(schema, rows): return DataFrame(page_array, columns=col_names) + def _parse_entry(field_value, field_type): if field_value is None or field_value == 'null': return None @@ -294,7 +418,7 @@ def _parse_entry(field_value, field_type): return field_value -def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=False): +def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=False, verbose=True): """Load data from Google BigQuery. THIS IS AN EXPERIMENTAL LIBRARY @@ -319,6 +443,8 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=Fals reauth : boolean (default False) Force Google BigQuery to reauthenticate the user. This is useful if multiple accounts are used. 
+ verbose : boolean (default True) + Verbose output Returns ------- @@ -327,12 +453,11 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=Fals """ - if not project_id: raise TypeError("Missing required parameter: project_id") - connector = GbqConnector(project_id, reauth = reauth) - schema, pages = connector.run_query(query) + connector = GbqConnector(project_id, reauth=reauth) + schema, pages = connector.run_query(query, verbose=verbose) dataframe_list = [] while len(pages) > 0: page = pages.pop() @@ -346,7 +471,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=Fals # Reindex the DataFrame on the provided column if index_col is not None: if index_col in final_df.columns: - final_df.set_index(index_col, inplace = True) + final_df.set_index(index_col, inplace=True) else: raise InvalidColumnOrder( 'Index column "{0}" does not exist in DataFrame.' @@ -368,21 +493,13 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=Fals final_df._data = final_df._data.downcast(dtypes='infer') return final_df -def to_gbq(dataframe, destination_table, project_id=None, chunksize=10000, - verbose=True, reauth=False): + +def to_gbq(dataframe, destination_table, project_id, chunksize=10000, + verbose=True, reauth=False, if_exists='fail'): """Write a DataFrame to a Google BigQuery table. THIS IS AN EXPERIMENTAL LIBRARY - If the table exists, the dataframe will be written to the table using - the defined table schema and column types. For simplicity, this method - uses the Google BigQuery streaming API. The to_gbq method chunks data - into a default chunk size of 10,000. Failures return the complete error - response which can be quite long depending on the size of the insert. - There are several important limitations of the Google streaming API - which are detailed at: - https://developers.google.com/bigquery/streaming-data-into-bigquery. - Parameters ---------- dataframe : DataFrame @@ -398,22 +515,41 @@ def to_gbq(dataframe, destination_table, project_id=None, chunksize=10000, reauth : boolean (default False) Force Google BigQuery to reauthenticate the user. This is useful if multiple accounts are used. - + if_exists : {'fail', 'replace', 'append'}, default 'fail' + 'fail': If table exists, do nothing. + 'replace': If table exists, drop it, recreate it, and insert data. + 'append': If table exists, insert data. Create if does not exist. """ - if not project_id: - raise TypeError("Missing required parameter: project_id") + if if_exists not in ('fail', 'replace', 'append'): + raise ValueError("'{0}' is not valid for if_exists".format(if_exists)) - if not '.' in destination_table: + if '.' not in destination_table: raise NotFoundException("Invalid Table Name. Should be of the form 'datasetId.tableId' ") - connector = GbqConnector(project_id, reauth = reauth) - dataset_id, table_id = destination_table.rsplit('.',1) + connector = GbqConnector(project_id, reauth=reauth) + dataset_id, table_id = destination_table.rsplit('.', 1) + + table_schema = generate_bq_schema(dataframe) + + # If table exists, check if_exists parameter + if connector.table_exists(dataset_id, table_id): + if if_exists == 'fail': + raise TableCreationError("Could not create the table because it already exists. 
" + "Change the if_exists parameter to append or replace data.") + elif if_exists == 'replace': + connector.delete_and_recreate_table(dataset_id, table_id, table_schema, verbose) + elif if_exists == 'append': + if not connector.verify_schema(dataset_id, table_id, table_schema): + raise InvalidSchema("The schema of the destination table does not match") + else: + connector.create_table(dataset_id, table_id, table_schema) connector.load_data(dataframe, dataset_id, table_id, chunksize, verbose) + def generate_bq_schema(df, default_type='STRING'): - """ Given a passed df, generate the associated big query schema. + """ Given a passed df, generate the associated Google BigQuery schema. Parameters ---------- @@ -439,3 +575,76 @@ def generate_bq_schema(df, default_type='STRING'): 'type': type_mapping.get(dtype.kind, default_type)}) return {'fields': fields} + + +def table_exists(table, project_id): + """ Check if a table exists in Google BigQuery given a table and project id + + .. versionadded:: 0.17.0 + + Parameters + ---------- + table : str + Name of table to be verified, in the form 'dataset.tablename' + project_id : str + Google BigQuery Account project ID. + + Returns + ------- + boolean + true if table exists, otherwise false + """ + + if '.' not in table: + raise NotFoundException("Invalid Table Name. Should be of the form 'datasetId.tableId' ") + + connector = GbqConnector(project_id) + dataset_id, table_id = table.rsplit('.', 1) + + return connector.table_exists(dataset_id, table_id) + + +def create_table(table, schema, project_id): + """ Create a table in Google BigQuery given a table, schema and project id + + .. versionadded:: 0.17.0 + + Parameters + ---------- + table : str + Name of table to be written, in the form 'dataset.tablename' + schema : str + Use the generate_bq_schema to generate your table schema from a dataframe. + project_id : str + Google BigQuery Account project ID. + """ + + if table_exists(table, project_id): + raise TableCreationError("The table could not be created because it already exists") + + connector = GbqConnector(project_id) + dataset_id, table_id = table.rsplit('.', 1) + + return connector.create_table(dataset_id, table_id, schema) + + +def delete_table(table, project_id): + """ Delete a table in Google BigQuery given a table and project id + + .. versionadded:: 0.17.0 + + Parameters + ---------- + table : str + Name of table to be written, in the form 'dataset.tablename' + project_id : str + Google BigQuery Account project ID. 
+ """ + + if not table_exists(table, project_id): + raise NotFoundException("Table does not exist") + + connector = GbqConnector(project_id) + dataset_id, table_id = table.rsplit('.', 1) + + return connector.delete_table(dataset_id, table_id) diff --git a/pandas/io/tests/test_gbq.py b/pandas/io/tests/test_gbq.py index f04eeb03f790e..990050b8ac544 100644 --- a/pandas/io/tests/test_gbq.py +++ b/pandas/io/tests/test_gbq.py @@ -1,5 +1,5 @@ import ast -import datetime +from datetime import datetime import json import nose import os @@ -12,7 +12,7 @@ import numpy as np -from distutils.version import LooseVersion +from distutils.version import StrictVersion from pandas import compat from pandas import NaT @@ -31,13 +31,15 @@ _HTTPLIB2_INSTALLED = False _SETUPTOOLS_INSTALLED = False + def missing_bq(): try: - subprocess.call('bq') + subprocess.call(['bq', 'ls']) return False except OSError: return True + def _test_imports(): if not compat.PY3: @@ -63,7 +65,7 @@ def _test_imports(): _GOOGLE_API_CLIENT_INSTALLED=True _GOOGLE_API_CLIENT_VERSION = pkg_resources.get_distribution('google-api-python-client').version - if LooseVersion(_GOOGLE_API_CLIENT_VERSION) >= '1.2.0': + if StrictVersion(_GOOGLE_API_CLIENT_VERSION) >= StrictVersion('1.2.0'): _GOOGLE_API_CLIENT_VALID_VERSION = True except ImportError: @@ -75,7 +77,6 @@ def _test_imports(): _HTTPLIB2_INSTALLED = True except ImportError: _HTTPLIB2_INSTALLED = False - if compat.PY3: raise NotImplementedError("Google's libraries do not support Python 3 yet") @@ -93,12 +94,24 @@ def _test_imports(): if not _HTTPLIB2_INSTALLED: raise ImportError("pandas requires httplib2 for Google BigQuery support") + def test_requirements(): try: _test_imports() except (ImportError, NotImplementedError) as import_exception: raise nose.SkipTest(import_exception) + +def make_mixed_dataframe_v2(test_size): + # create df to test for all BQ datatypes except RECORD + bools = np.random.randint(2, size=(1,test_size)).astype(bool) + flts = np.random.randn(1, test_size) + ints = np.random.randint(1, 10, size=(1,test_size)) + strs = np.random.randint(1, 10, size=(1,test_size)).astype(str) + times = [datetime.now(pytz.timezone('US/Arizona')) for t in xrange(test_size)] + return DataFrame({'bools': bools[0], 'flts': flts[0], 'ints': ints[0], 'strs': strs[0], 'times': times[0]}, index=range(test_size)) + + class TestGBQConnectorIntegration(tm.TestCase): def setUp(self): test_requirements() @@ -128,6 +141,7 @@ def test_should_be_able_to_get_results_from_query(self): schema, pages = self.sut.run_query('SELECT 1') self.assertTrue(pages is not None) + class TestReadGBQUnitTests(tm.TestCase): def setUp(self): test_requirements() @@ -165,90 +179,116 @@ def test_read_gbq_with_no_project_id_given_should_fail(self): gbq.read_gbq('SELECT "1" as NUMBER_1') def test_that_parse_data_works_properly(self): - test_schema = {'fields': [{'mode': 'NULLABLE', - 'name': 'VALID_STRING', - 'type': 'STRING'}]} + test_schema = {'fields': [{'mode': 'NULLABLE', 'name': 'VALID_STRING', 'type': 'STRING'}]} test_page = [{'f': [{'v': 'PI'}]}] test_output = gbq._parse_data(test_schema, test_page) - correct_output = DataFrame({'VALID_STRING' : ['PI']}) + correct_output = DataFrame({'VALID_STRING': ['PI']}) tm.assert_frame_equal(test_output, correct_output) + class TestReadGBQIntegration(tm.TestCase): - def setUp(self): + @classmethod + def setUpClass(cls): + # - GLOBAL CLASS FIXTURES - + # put here any instruction you want to execute only *ONCE* *BEFORE* executing *ALL* tests + # described below. 
+ test_requirements() if not PROJECT_ID: raise nose.SkipTest("Cannot run integration tests without a project id") + if missing_bq(): + raise nose.SkipTest("Cannot run read_gbq tests without bq command line client") + + subprocess.call(['bq', 'mk', PROJECT_ID + ':pydata_pandas_bq_testing']) + + def setUp(self): + # - PER-TEST FIXTURES - + # put here any instruction you want to be run *BEFORE* *EVERY* test is executed. + pass + + @classmethod + def tearDownClass(cls): + # - GLOBAL CLASS FIXTURES - + # put here any instruction you want to execute only *ONCE* *AFTER* executing all tests. + subprocess.call(['bq', 'rm', '-f', PROJECT_ID + ':pydata_pandas_bq_testing']) + + def tearDown(self): + # - PER-TEST FIXTURES - + # put here any instructions you want to be run *AFTER* *EVERY* test is executed. + if gbq.table_exists('pydata_pandas_bq_testing.new_test', PROJECT_ID): + subprocess.call(['bq', 'rm', '-f', PROJECT_ID + ':pydata_pandas_bq_testing.new_test']) + def test_should_properly_handle_valid_strings(self): query = 'SELECT "PI" as VALID_STRING' df = gbq.read_gbq(query, project_id=PROJECT_ID) - tm.assert_frame_equal(df, DataFrame({'VALID_STRING' : ['PI']})) + tm.assert_frame_equal(df, DataFrame({'VALID_STRING': ['PI']})) def test_should_properly_handle_empty_strings(self): query = 'SELECT "" as EMPTY_STRING' df = gbq.read_gbq(query, project_id=PROJECT_ID) - tm.assert_frame_equal(df, DataFrame({'EMPTY_STRING' : [""]})) + tm.assert_frame_equal(df, DataFrame({'EMPTY_STRING': [""]})) def test_should_properly_handle_null_strings(self): query = 'SELECT STRING(NULL) as NULL_STRING' df = gbq.read_gbq(query, project_id=PROJECT_ID) - tm.assert_frame_equal(df, DataFrame({'NULL_STRING' : [None]})) + tm.assert_frame_equal(df, DataFrame({'NULL_STRING': [None]})) def test_should_properly_handle_valid_integers(self): query = 'SELECT INTEGER(3) as VALID_INTEGER' df = gbq.read_gbq(query, project_id=PROJECT_ID) - tm.assert_frame_equal(df, DataFrame({'VALID_INTEGER' : [3]})) + tm.assert_frame_equal(df, DataFrame({'VALID_INTEGER': [3]})) def test_should_properly_handle_null_integers(self): query = 'SELECT INTEGER(NULL) as NULL_INTEGER' df = gbq.read_gbq(query, project_id=PROJECT_ID) - tm.assert_frame_equal(df, DataFrame({'NULL_INTEGER' : [np.nan]})) + tm.assert_frame_equal(df, DataFrame({'NULL_INTEGER': [np.nan]})) def test_should_properly_handle_valid_floats(self): query = 'SELECT PI() as VALID_FLOAT' df = gbq.read_gbq(query, project_id=PROJECT_ID) - tm.assert_frame_equal(df, DataFrame({'VALID_FLOAT' : [3.141592653589793]})) + tm.assert_frame_equal(df, DataFrame({'VALID_FLOAT': [3.141592653589793]})) def test_should_properly_handle_null_floats(self): query = 'SELECT FLOAT(NULL) as NULL_FLOAT' df = gbq.read_gbq(query, project_id=PROJECT_ID) - tm.assert_frame_equal(df, DataFrame({'NULL_FLOAT' : [np.nan]})) + tm.assert_frame_equal(df, DataFrame({'NULL_FLOAT': [np.nan]})) def test_should_properly_handle_timestamp_unix_epoch(self): query = 'SELECT TIMESTAMP("1970-01-01 00:00:00") as UNIX_EPOCH' df = gbq.read_gbq(query, project_id=PROJECT_ID) - tm.assert_frame_equal(df, DataFrame({'UNIX_EPOCH' : [np.datetime64('1970-01-01T00:00:00.000000Z')]})) + tm.assert_frame_equal(df, DataFrame({'UNIX_EPOCH': [np.datetime64('1970-01-01T00:00:00.000000Z')]})) def test_should_properly_handle_arbitrary_timestamp(self): query = 'SELECT TIMESTAMP("2004-09-15 05:00:00") as VALID_TIMESTAMP' df = gbq.read_gbq(query, project_id=PROJECT_ID) - tm.assert_frame_equal(df, DataFrame({'VALID_TIMESTAMP' : 
[np.datetime64('2004-09-15T05:00:00.000000Z')]})) + tm.assert_frame_equal(df, DataFrame({'VALID_TIMESTAMP': [np.datetime64('2004-09-15T05:00:00.000000Z')]})) def test_should_properly_handle_null_timestamp(self): query = 'SELECT TIMESTAMP(NULL) as NULL_TIMESTAMP' df = gbq.read_gbq(query, project_id=PROJECT_ID) - tm.assert_frame_equal(df, DataFrame({'NULL_TIMESTAMP' :[NaT]})) + tm.assert_frame_equal(df, DataFrame({'NULL_TIMESTAMP': [NaT]})) def test_should_properly_handle_true_boolean(self): query = 'SELECT BOOLEAN(TRUE) as TRUE_BOOLEAN' df = gbq.read_gbq(query, project_id=PROJECT_ID) - tm.assert_frame_equal(df, DataFrame({'TRUE_BOOLEAN' : [True]})) + tm.assert_frame_equal(df, DataFrame({'TRUE_BOOLEAN': [True]})) def test_should_properly_handle_false_boolean(self): query = 'SELECT BOOLEAN(FALSE) as FALSE_BOOLEAN' df = gbq.read_gbq(query, project_id=PROJECT_ID) - tm.assert_frame_equal(df, DataFrame({'FALSE_BOOLEAN' : [False]})) + tm.assert_frame_equal(df, DataFrame({'FALSE_BOOLEAN': [False]})) def test_should_properly_handle_null_boolean(self): query = 'SELECT BOOLEAN(NULL) as NULL_BOOLEAN' df = gbq.read_gbq(query, project_id=PROJECT_ID) - tm.assert_frame_equal(df, DataFrame({'NULL_BOOLEAN' : [None]})) + tm.assert_frame_equal(df, DataFrame({'NULL_BOOLEAN': [None]})) def test_unicode_string_conversion_and_normalization(self): correct_test_datatype = DataFrame( - {'UNICODE_STRING' : [u("\xe9\xfc")]} + {'UNICODE_STRING': [u("\xe9\xfc")]} ) query = 'SELECT "\xc3\xa9\xc3\xbc" as UNICODE_STRING' @@ -259,35 +299,35 @@ def test_unicode_string_conversion_and_normalization(self): def test_index_column(self): query = "SELECT 'a' as STRING_1, 'b' as STRING_2" result_frame = gbq.read_gbq(query, project_id=PROJECT_ID, index_col="STRING_1") - correct_frame = DataFrame({'STRING_1' : ['a'], 'STRING_2' : ['b']}).set_index("STRING_1") + correct_frame = DataFrame({'STRING_1': ['a'], 'STRING_2': ['b']}).set_index("STRING_1") tm.assert_equal(result_frame.index.name, correct_frame.index.name) def test_column_order(self): query = "SELECT 'a' as STRING_1, 'b' as STRING_2, 'c' as STRING_3" col_order = ['STRING_3', 'STRING_1', 'STRING_2'] result_frame = gbq.read_gbq(query, project_id=PROJECT_ID, col_order=col_order) - correct_frame = DataFrame({'STRING_1' : ['a'], 'STRING_2' : ['b'], 'STRING_3' : ['c']})[col_order] + correct_frame = DataFrame({'STRING_1': ['a'], 'STRING_2': ['b'], 'STRING_3': ['c']})[col_order] tm.assert_frame_equal(result_frame, correct_frame) def test_column_order_plus_index(self): query = "SELECT 'a' as STRING_1, 'b' as STRING_2, 'c' as STRING_3" col_order = ['STRING_3', 'STRING_2'] result_frame = gbq.read_gbq(query, project_id=PROJECT_ID, index_col='STRING_1', col_order=col_order) - correct_frame = DataFrame({'STRING_1' : ['a'], 'STRING_2' : ['b'], 'STRING_3' : ['c']}) + correct_frame = DataFrame({'STRING_1': ['a'], 'STRING_2': ['b'], 'STRING_3': ['c']}) correct_frame.set_index('STRING_1', inplace=True) correct_frame = correct_frame[col_order] tm.assert_frame_equal(result_frame, correct_frame) def test_malformed_query(self): - with tm.assertRaises(gbq.InvalidQueryException): + with tm.assertRaises(gbq.GenericGBQException): gbq.read_gbq("SELCET * FORM [publicdata:samples.shakespeare]", project_id=PROJECT_ID) def test_bad_project_id(self): - with tm.assertRaises(gbq.NotFoundException): + with tm.assertRaises(gbq.GenericGBQException): gbq.read_gbq("SELECT 1", project_id='001') def test_bad_table_name(self): - with tm.assertRaises(gbq.NotFoundException): + with tm.assertRaises(gbq.GenericGBQException): 
gbq.read_gbq("SELECT * FROM [publicdata:samples.nope]", project_id=PROJECT_ID) def test_download_dataset_larger_than_200k_rows(self): @@ -304,52 +344,128 @@ def test_zero_rows(self): class TestToGBQIntegration(tm.TestCase): - # This class requires bq.py to be installed for setup/teardown. - # It will also need to be preconfigured with a default dataset, - # so, be sure to `bq init` in terminal before running. + # Changes to BigQuery table schema may take up to 2 minutes as of May 2015 + # As a workaround to this issue, each test should use a unique table name. + # Make sure to modify the for loop range in the tearDownClass when a new test is added + # See `Issue 191 `__ + + @classmethod + def setUpClass(cls): + # - GLOBAL CLASS FIXTURES - + # put here any instruction you want to execute only *ONCE* *BEFORE* executing *ALL* tests + # described below. - def setUp(self): test_requirements() if not PROJECT_ID: raise nose.SkipTest("Cannot run integration tests without a project id") + if missing_bq(): raise nose.SkipTest("Cannot run to_gbq tests without bq command line client") + subprocess.call(['bq', 'mk', PROJECT_ID + ':pydata_pandas_bq_testing']) + + def setUp(self): + # - PER-TEST FIXTURES - + # put here any instruction you want to be run *BEFORE* *EVERY* test is executed. + pass + @classmethod - def setUpClass(cls): - if PROJECT_ID and not missing_bq(): - subprocess.call(['bq','mk','pydata_pandas_bq_testing']) - subprocess.call(['bq','mk','pydata_pandas_bq_testing.new_test','bools:BOOLEAN,flts:FLOAT,ints:INTEGER,strs:STRING,times:TIMESTAMP']) + def tearDownClass(cls): + # - GLOBAL CLASS FIXTURES - + # put here any instruction you want to execute only *ONCE* *AFTER* executing all tests. + + for i in range(1, 8): + if gbq.table_exists('pydata_pandas_bq_testing.new_test' + str(i), PROJECT_ID): + subprocess.call(['bq', 'rm', '-f', PROJECT_ID + ':pydata_pandas_bq_testing.new_test' + str(i)]) + + subprocess.call(['bq', 'rm', '-f', PROJECT_ID + ':pydata_pandas_bq_testing']) + + def tearDown(self): + # - PER-TEST FIXTURES - + # put here any instructions you want to be run *AFTER* *EVERY* test is executed. + pass def test_upload_data(self): + table_name = 'new_test1' + test_size = 1000001 - #create df to test for all BQ datatypes except RECORD - bools = np.random.randint(2, size=(1,test_size)).astype(bool) - flts = np.random.randn(1,test_size) - ints = np.random.randint(1,10, size=(1,test_size)) - strs = np.random.randint(1,10, size=(1,test_size)).astype(str) - times = [datetime.datetime.now(pytz.timezone('US/Arizona')) for t in xrange(test_size)] - df = DataFrame({'bools':bools[0], 'flts':flts[0], 'ints':ints[0], 'strs':strs[0], 'times':times[0]}, index=range(test_size)) - gbq.to_gbq(df,"pydata_pandas_bq_testing.new_test", project_id=PROJECT_ID, chunksize=10000) - sleep(60) # <- Curses Google!!! - - result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM pydata_pandas_bq_testing.new_test", project_id=PROJECT_ID) + df = make_mixed_dataframe_v2(test_size) + + gbq.to_gbq(df, "pydata_pandas_bq_testing." + table_name, PROJECT_ID, chunksize=10000) + + sleep(60) # <- Curses Google!!! + + result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM pydata_pandas_bq_testing." + table_name, project_id=PROJECT_ID) self.assertEqual(result['NUM_ROWS'][0], test_size) + def test_upload_data_if_table_exists_fail(self): + table_name = 'new_test2' + + test_size = 10 + df = make_mixed_dataframe_v2(test_size) + + gbq.create_table('pydata_pandas_bq_testing.' 
+ table_name, gbq.generate_bq_schema(df), PROJECT_ID) + + # Test the default value of if_exists is 'fail' + with tm.assertRaises(gbq.TableCreationError): + gbq.to_gbq(df, "pydata_pandas_bq_testing." + table_name, PROJECT_ID) + + # Test the if_exists parameter with value 'fail' + with tm.assertRaises(gbq.TableCreationError): + gbq.to_gbq(df, "pydata_pandas_bq_testing." + table_name, PROJECT_ID, if_exists='fail') + + def test_upload_data_if_table_exists_append(self): + table_name = 'new_test3' + + test_size = 10 + df = make_mixed_dataframe_v2(test_size) + df_different_schema = tm.makeMixedDataFrame() + + # Initialize table with sample data + gbq.to_gbq(df, "pydata_pandas_bq_testing." + table_name, PROJECT_ID, chunksize=10000) + + # Test the if_exists parameter with value 'append' + gbq.to_gbq(df, "pydata_pandas_bq_testing." + table_name, PROJECT_ID, if_exists='append') + + sleep(60) # <- Curses Google!!! + + result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM pydata_pandas_bq_testing." + table_name, project_id=PROJECT_ID) + self.assertEqual(result['NUM_ROWS'][0], test_size * 2) + + # Try inserting with a different schema, confirm failure + with tm.assertRaises(gbq.InvalidSchema): + gbq.to_gbq(df_different_schema, "pydata_pandas_bq_testing." + table_name, PROJECT_ID, if_exists='append') + + def test_upload_data_if_table_exists_replace(self): + table_name = 'new_test4' + + test_size = 10 + df = make_mixed_dataframe_v2(test_size) + df_different_schema = tm.makeMixedDataFrame() + + # Initialize table with sample data + gbq.to_gbq(df, "pydata_pandas_bq_testing." + table_name, PROJECT_ID, chunksize=10000) + + # Test the if_exists parameter with the value 'replace'. + gbq.to_gbq(df_different_schema, "pydata_pandas_bq_testing." + table_name, PROJECT_ID, if_exists='replace') + + sleep(60) # <- Curses Google!!! + + result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM pydata_pandas_bq_testing." + table_name, project_id=PROJECT_ID) + self.assertEqual(result['NUM_ROWS'][0], 5) + def test_google_upload_errors_should_raise_exception(self): - test_timestamp = datetime.datetime.now(pytz.timezone('US/Arizona')) - bad_df = DataFrame( {'bools': [False, False], - 'flts': [0.0,1.0], - 'ints': [0,'1'], - 'strs': ['a', 1], - 'times': [test_timestamp, test_timestamp] - }, index=range(2)) - with tm.assertRaises(gbq.UnknownGBQException): - gbq.to_gbq(bad_df, 'pydata_pandas_bq_testing.new_test', project_id = PROJECT_ID) + table_name = 'new_test5' - def test_generate_bq_schema(self): + test_timestamp = datetime.now(pytz.timezone('US/Arizona')) + bad_df = DataFrame({'bools': [False, False], 'flts': [0.0, 1.0], 'ints': [0, '1'], 'strs': ['a', 1], + 'times': [test_timestamp, test_timestamp]}, index=range(2)) + + with tm.assertRaises(gbq.StreamingInsertError): + gbq.to_gbq(bad_df, 'pydata_pandas_bq_testing.' 
+ table_name, PROJECT_ID, verbose=True) + def test_generate_bq_schema(self): df = tm.makeMixedDataFrame() schema = gbq.generate_bq_schema(df) @@ -360,13 +476,41 @@ def test_generate_bq_schema(self): self.assertEqual(schema, test_schema) - @classmethod - def tearDownClass(cls): - if PROJECT_ID and not missing_bq(): - subprocess.call(['bq','rm','-f','pydata_pandas_bq_testing.new_test']) - subprocess.call(['bq','rm','-f','pydata_pandas_bq_testing']) + def test_create_bq_table(self): + table_name = 'new_test6' + + test_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'}, {'name': 'B', 'type': 'FLOAT'}, + {'name': 'C', 'type': 'STRING'}, {'name': 'D', 'type': 'TIMESTAMP'}]} + + gbq.create_table('pydata_pandas_bq_testing.' + table_name, test_schema, PROJECT_ID) + + self.assertTrue(gbq.table_exists('pydata_pandas_bq_testing.' + table_name, PROJECT_ID), 'Expected table to exist') + + def test_table_does_not_exist(self): + table_name = 'new_test7' + self.assertTrue(not gbq.table_exists('pydata_pandas_bq_testing.' + table_name, PROJECT_ID), + 'Expected table not to exist') + + def test_delete_bq_table(self): + table_name = 'new_test8' + + test_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'}, {'name': 'B', 'type': 'FLOAT'}, + {'name': 'C', 'type': 'STRING'}, {'name': 'D', 'type': 'TIMESTAMP'}]} + + gbq.create_table('pydata_pandas_bq_testing.' + table_name, test_schema, PROJECT_ID) + + gbq.delete_table('pydata_pandas_bq_testing.' + table_name, PROJECT_ID) + + self.assertTrue(not gbq.table_exists('pydata_pandas_bq_testing.' + table_name, PROJECT_ID), + 'Expected table not to exist') + + def test_upload_data_dataset_not_found(self): + test_size = 10 + df = make_mixed_dataframe_v2(test_size) + + with tm.assertRaises(gbq.GenericGBQException): + gbq.create_table('pydata_pandas_bq_testing2.new_test', gbq.generate_bq_schema(df), PROJECT_ID) if __name__ == '__main__': nose.runmodule(argv=[__file__, '-vvs', '-x', '--pdb', '--pdb-failure'], exit=False) -
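Taken together, the changes above expose a small table-management API alongside
``to_gbq``. The following is a minimal, untested sketch of how the pieces are intended
to fit together, assuming a valid BigQuery account, a placeholder project id, and an
existing ``my_dataset`` dataset (only the dataset must pre-exist; the table itself is
created here):

.. code-block:: python

    import numpy as np
    import pandas as pd
    from pandas.io import gbq

    projectid = "xxxxxxxx"               # placeholder project id
    destination = "my_dataset.my_table"  # the dataset must already exist

    df = pd.DataFrame({'my_string': list('abc'),
                       'my_int64': list(range(1, 4)),
                       'my_float64': np.arange(4.0, 7.0),
                       'my_bool1': [True, False, True],
                       'my_bool2': [False, True, False],
                       'my_dates': pd.date_range('now', periods=3)})

    # Explicitly create the table from a schema derived from the DataFrame.
    # (to_gbq would also create it implicitly when it does not exist.)
    if not gbq.table_exists(destination, projectid):
        gbq.create_table(destination, gbq.generate_bq_schema(df), projectid)

    # Append to the existing table; 'append' verifies that the DataFrame schema
    # matches the destination schema and raises InvalidSchema otherwise.
    df.to_gbq(destination, projectid, chunksize=10000, if_exists='append')

    # 'replace' deletes and recreates the table, waiting 2 minutes if the new
    # schema differs; the default 'fail' raises TableCreationError instead.
    df.to_gbq(destination, projectid, if_exists='replace')

    # Remove the table again.
    gbq.delete_table(destination, projectid)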