From 109d47babfb996c16bca2ecee92bff064a32e1cd Mon Sep 17 00:00:00 2001 From: Matti Remes Date: Mon, 19 Feb 2018 11:42:48 +0200 Subject: [PATCH 1/5] BUG: Add support to replace partitions in date-partitioned tables (#47) --- pandas_gbq/gbq.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index 382f276b..8db1d2e4 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -699,8 +699,9 @@ def delete_and_recreate_table(self, dataset_id, table_id, table_schema): table = _Table(self.project_id, dataset_id, private_key=self.private_key) table.delete(table_id) - table.create(table_id, table_schema) - sleep(delay) + if _Table.partition_decorator not in table_id: + table.create(table_id, table_schema) + sleep(delay) def _get_credentials_file(): @@ -1007,6 +1008,8 @@ def _generate_bq_schema(df, default_type='STRING'): class _Table(GbqConnector): + partition_decorator = '$' + def __init__(self, project_id, dataset_id, reauth=False, verbose=False, private_key=None): self.dataset_id = dataset_id @@ -1036,7 +1039,7 @@ def exists(self, table_id): except self.http_error as ex: self.process_http_error(ex) - def create(self, table_id, schema): + def create(self, table_id, schema, date_partitioned=False): """ Create a table in Google BigQuery given a table and schema Parameters @@ -1046,6 +1049,8 @@ def create(self, table_id, schema): schema : str Use the generate_bq_schema to generate your table schema from a dataframe. + date_partitioned: boolean + Whether table is to be created as a date partitioned table. """ from google.cloud.bigquery import SchemaField from google.cloud.bigquery import Table @@ -1062,6 +1067,9 @@ def create(self, table_id, schema): table_ref = self.client.dataset(self.dataset_id).table(table_id) table = Table(table_ref) + if date_partitioned or '$' in table_id: + table.partitioning_type = 'DAY' + # Manually create the schema objects, adding NULLABLE mode # as a workaround for # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4456 From 617a6960f2636dab23466972e0312fa83609ae0a Mon Sep 17 00:00:00 2001 From: Matti Remes Date: Mon, 19 Feb 2018 14:39:23 +0200 Subject: [PATCH 2/5] Add exists check for dpt table --- pandas_gbq/gbq.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index 8db1d2e4..afdb9eeb 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -699,7 +699,7 @@ def delete_and_recreate_table(self, dataset_id, table_id, table_schema): table = _Table(self.project_id, dataset_id, private_key=self.private_key) table.delete(table_id) - if _Table.partition_decorator not in table_id: + if not _Table.is_date_partitioned(table_id): table.create(table_id, table_schema) sleep(delay) @@ -1010,6 +1010,9 @@ class _Table(GbqConnector): partition_decorator = '$' + def is_date_partitioned(self, table_id): + return self.partition_decorator in table_id + def __init__(self, project_id, dataset_id, reauth=False, verbose=False, private_key=None): self.dataset_id = dataset_id @@ -1031,8 +1034,12 @@ def exists(self, table_id): from google.api_core.exceptions import NotFound table_ref = self.client.dataset(self.dataset_id).table(table_id) + try: - self.client.get_table(table_ref) + table = self.client.get_table(table_ref) + if self.is_date_partitioned(table_id): + return table.num_rows > 0 + return True except NotFound: return False @@ -1067,7 +1074,7 @@ def create(self, table_id, schema, date_partitioned=False): table_ref = self.client.dataset(self.dataset_id).table(table_id) table = Table(table_ref) - if date_partitioned or '$' in table_id: + if date_partitioned or self.is_date_partitioned(table_id): table.partitioning_type = 'DAY' # Manually create the schema objects, adding NULLABLE mode From 8f2b197a769f471c89bf2f9186f13f4b4584ac65 Mon Sep 17 00:00:00 2001 From: Matti Remes Date: Mon, 19 Feb 2018 14:43:10 +0200 Subject: [PATCH 3/5] Fix docstrings --- pandas_gbq/gbq.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index afdb9eeb..d39a7926 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -1023,7 +1023,7 @@ def exists(self, table_id): Parameters ---------- - table : str + table_id : str Name of table to be verified Returns @@ -1051,7 +1051,7 @@ def create(self, table_id, schema, date_partitioned=False): Parameters ---------- - table : str + table_id : str Name of table to be written schema : str Use the generate_bq_schema to generate your table schema from a @@ -1099,7 +1099,7 @@ def delete(self, table_id): Parameters ---------- - table : str + table_id : str Name of table to be deleted """ from google.api_core.exceptions import NotFound @@ -1178,7 +1178,7 @@ def create(self, dataset_id): Parameters ---------- - dataset : str + dataset_id : str Name of dataset to be written """ from google.cloud.bigquery import Dataset @@ -1199,7 +1199,7 @@ def delete(self, dataset_id): Parameters ---------- - dataset : str + dataset_id : str Name of dataset to be deleted """ from google.api_core.exceptions import NotFound @@ -1222,7 +1222,7 @@ def tables(self, dataset_id): Parameters ---------- - dataset : str + dataset_id : str Name of dataset to list tables for Returns From f08707702e61edc9ebe032e785bfef2f09c3ceba Mon Sep 17 00:00:00 2001 From: Matti Remes Date: Mon, 19 Feb 2018 14:43:25 +0200 Subject: [PATCH 4/5] Fix use schema from params --- pandas_gbq/gbq.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index d39a7926..a5805674 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -337,10 +337,6 @@ def get_user_account_credentials(self): This method authenticates using user credentials, either loading saved credentials from a file or by going through the OAuth flow. - Parameters - ---------- - None - Returns ------- GoogleCredentials : credentials @@ -567,7 +563,7 @@ def load_data( try: for remaining_rows in _load.load_chunks( self.client, dataframe, dataset_id, table_id, - chunksize=chunksize): + chunksize=chunksize, schema=schema): self._print("\rLoad is {0}% Complete".format( ((total_rows - remaining_rows) * 100) / total_rows)) except self.http_error as ex: From 4e0d557ecc4162844ff985f5481fa104fb25a012 Mon Sep 17 00:00:00 2001 From: Matti Remes Date: Mon, 19 Feb 2018 16:18:54 +0200 Subject: [PATCH 5/5] Fix dpt check call --- pandas_gbq/gbq.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index a5805674..2bc13ed7 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -695,7 +695,7 @@ def delete_and_recreate_table(self, dataset_id, table_id, table_schema): table = _Table(self.project_id, dataset_id, private_key=self.private_key) table.delete(table_id) - if not _Table.is_date_partitioned(table_id): + if not table.is_date_partitioned(table_id): table.create(table_id, table_schema) sleep(delay)