Skip to content

Commit 96c2777

Browse files
committed
BUG: Add support to replace partitions in date-partitioned tables (#47)
1 parent 79c9067 commit 96c2777

File tree

3 files changed

+145
-31
lines changed

3 files changed

+145
-31
lines changed

docs/source/changelog.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ Changelog
1414
- Use the `google-auth <https://google-auth.readthedocs.io/en/latest/>`__ library for authentication because ``oauth2client`` is deprecated. (:issue:`39`)
1515
- :func:`read_gbq` now has an ``auth_local_webserver`` boolean argument for controlling whether to use web server or console flow when getting user credentials. Replaces the ``--noauth_local_webserver`` command line argument. (:issue:`35`)
1616
- :func:`read_gbq` now displays the BigQuery Job ID and standard price in verbose output. (:issue:`70` and :issue:`71`)
17+
- Add support to replace partitions in `date-partitioned tables <https://cloud.google.com/bigquery/docs/partitioned-tables>`__. The target partition must be specified using the partition decorator (``$``) in the table name. (:issue:`47`)
1718

1819
0.1.6 / 2017-05-03
1920
------------------

pandas_gbq/gbq.py

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -769,26 +769,6 @@ def schema_is_subset(self, dataset_id, table_id, schema):
769769

770770
return all(field in fields_remote for field in fields_local)
771771

772-
def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
773-
delay = 0
774-
775-
# Changes to table schema may take up to 2 minutes as of May 2015 See
776-
# `Issue 191
777-
# <https://code.google.com/p/google-bigquery/issues/detail?id=191>`__
778-
# Compare previous schema with new schema to determine if there should
779-
# be a 120 second delay
780-
781-
if not self.verify_schema(dataset_id, table_id, table_schema):
782-
self._print('The existing table has a different schema. Please '
783-
'wait 2 minutes. See Google BigQuery issue #191')
784-
delay = 120
785-
786-
table = _Table(self.project_id, dataset_id,
787-
private_key=self.private_key)
788-
table.delete(table_id)
789-
table.create(table_id, table_schema)
790-
sleep(delay)
791-
792772

793773
def _parse_data(schema, rows):
794774
# see:
@@ -1053,19 +1033,33 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
10531033
"already exists. "
10541034
"Change the if_exists parameter to "
10551035
"append or replace data.")
1056-
elif if_exists == 'replace':
1057-
connector.delete_and_recreate_table(
1058-
dataset_id, table_id, table_schema)
1059-
elif if_exists == 'append':
1036+
else:
1037+
delay = 0
10601038
if not connector.schema_is_subset(dataset_id,
10611039
table_id,
10621040
table_schema):
1063-
raise InvalidSchema("Please verify that the structure and "
1064-
"data types in the DataFrame match the "
1065-
"schema of the destination table.")
1066-
else:
1067-
table.create(table_id, table_schema)
1041+
if if_exists == 'append' \
1042+
or table.partition_decorator in table_id:
1043+
raise InvalidSchema("Please verify that the structure "
1044+
"and data types in the DataFrame "
1045+
"match the schema of the destination "
1046+
"table.")
1047+
elif if_exists == 'replace':
1048+
table._print('The existing table has a different schema. '
1049+
'Please wait 2 minutes. See Google BigQuery '
1050+
'issue #191')
1051+
delay = 120
1052+
if if_exists == 'replace':
1053+
table.delete(table_id)
1054+
if table.partition_decorator not in table_id:
1055+
table.create(table_id, table_schema)
1056+
sleep(delay)
10681057

1058+
else:
1059+
is_dpt = table.partition_decorator in table_id
1060+
table.create(table_id.split('$')[0],
1061+
table_schema,
1062+
date_partitioned=is_dpt)
10691063
connector.load_data(dataframe, dataset_id, table_id, chunksize)
10701064

10711065

@@ -1108,6 +1102,8 @@ def _generate_bq_schema(df, default_type='STRING'):
11081102

11091103
class _Table(GbqConnector):
11101104

1105+
partition_decorator = '$'
1106+
11111107
def __init__(self, project_id, dataset_id, reauth=False, verbose=False,
11121108
private_key=None):
11131109
try:
@@ -1144,7 +1140,7 @@ def exists(self, table_id):
11441140
else:
11451141
self.process_http_error(ex)
11461142

1147-
def create(self, table_id, schema):
1143+
def create(self, table_id, schema, date_partitioned=False):
11481144
""" Create a table in Google BigQuery given a table and schema
11491145
11501146
Parameters
@@ -1154,6 +1150,8 @@ def create(self, table_id, schema):
11541150
schema : str
11551151
Use the generate_bq_schema to generate your table schema from a
11561152
dataframe.
1153+
date_partitioned: boolean
1154+
Whether table is to be created as a date partitioned table.
11571155
"""
11581156

11591157
if self.exists(table_id):
@@ -1173,6 +1171,9 @@ def create(self, table_id, schema):
11731171
'datasetId': self.dataset_id
11741172
}
11751173
}
1174+
if date_partitioned:
1175+
# The only type supported is DAY
1176+
body.update({'timePartitioning': {'type': 'DAY'}})
11761177

11771178
try:
11781179
self.service.tables().insert(

pandas_gbq/tests/test_gbq.py

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import pytest
22

33
import re
4-
from datetime import datetime
4+
from datetime import datetime, timedelta
55
import pytz
66
from time import sleep
77
import os
@@ -20,6 +20,7 @@
2020

2121

2222
TABLE_ID = 'new_test'
23+
DPT_TABLE_ID = 'dpt_test'
2324

2425

2526
def _skip_if_no_project_id():
@@ -946,6 +947,8 @@ def setup_method(self, method):
946947
private_key=_get_private_key_path())
947948
self.destination_table = "{0}{1}.{2}".format(self.dataset_prefix, "1",
948949
TABLE_ID)
950+
self.destination_dpt = "{0}{1}.{2}".format(self.dataset_prefix, "1",
951+
DPT_TABLE_ID)
949952
self.dataset.create(self.dataset_prefix + "1")
950953

951954
@classmethod
@@ -1080,6 +1083,115 @@ def test_upload_data_if_table_exists_replace(self):
10801083
private_key=_get_private_key_path())
10811084
assert result['num_rows'][0] == 5
10821085

1086+
def test_upload_data_if_table_exists_replace_dpt_partition(self):
1087+
# Issue #47; tests that 'replace' is done by the subsequent call
1088+
test_dpt_suffix = datetime.now().strftime('%Y%m%d')
1089+
test_size = 10
1090+
df = make_mixed_dataframe_v2(test_size)
1091+
df_different_schema = tm.makeMixedDataFrame()
1092+
1093+
dpt_partition = self.destination_dpt + '$' + test_dpt_suffix
1094+
self.table.create(DPT_TABLE_ID,
1095+
gbq._generate_bq_schema(df),
1096+
date_partitioned=True)
1097+
1098+
with pytest.raises(gbq.InvalidSchema):
1099+
gbq.to_gbq(df_different_schema, dpt_partition,
1100+
_get_project_id(), if_exists='replace',
1101+
private_key=_get_private_key_path())
1102+
1103+
gbq.to_gbq(df, dpt_partition, _get_project_id(),
1104+
private_key=_get_private_key_path(),
1105+
if_exists='replace')
1106+
1107+
# Test partition
1108+
result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1109+
.format(self.destination_dpt),
1110+
project_id=_get_project_id(),
1111+
private_key=_get_private_key_path())
1112+
assert result0['num_rows'][0] == 10
1113+
1114+
# Test whole table
1115+
result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1116+
.format(self.destination_dpt),
1117+
project_id=_get_project_id(),
1118+
private_key=_get_private_key_path())
1119+
assert result1['num_rows'][0] == 10
1120+
1121+
self.table.delete(DPT_TABLE_ID)
1122+
1123+
def test_upload_data_if_table_exists_append_dpt_partition(self):
1124+
# Issue #47; tests that 'append' appends to an existing partition
1125+
test_dpt_suffix = (datetime.now() +
1126+
timedelta(days=1)).strftime('%Y%m%d')
1127+
test_size = 10
1128+
df = make_mixed_dataframe_v2(test_size)
1129+
1130+
self.table.create(DPT_TABLE_ID,
1131+
gbq._generate_bq_schema(df),
1132+
date_partitioned=True)
1133+
1134+
dpt_partition = self.destination_dpt + '$' + test_dpt_suffix
1135+
1136+
result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1137+
.format(self.destination_dpt),
1138+
project_id=_get_project_id(),
1139+
private_key=_get_private_key_path())
1140+
1141+
assert result0['num_rows'][0] == 0
1142+
1143+
gbq.to_gbq(df, dpt_partition,
1144+
_get_project_id(), if_exists='append',
1145+
private_key=_get_private_key_path())
1146+
1147+
result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1148+
.format(dpt_partition),
1149+
project_id=_get_project_id(),
1150+
private_key=_get_private_key_path())
1151+
1152+
assert result1['num_rows'][0] == 10
1153+
1154+
gbq.to_gbq(df.head(), dpt_partition,
1155+
_get_project_id(), if_exists='append',
1156+
private_key=_get_private_key_path())
1157+
1158+
# Test destination partition
1159+
result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1160+
.format(dpt_partition),
1161+
project_id=_get_project_id(),
1162+
private_key=_get_private_key_path())
1163+
1164+
assert result1['num_rows'][0] == 15
1165+
1166+
def test_table_creation_error_raised_when_dpt_exists(self):
1167+
test_dpt_suffix = datetime.now().strftime('%Y%m%d')
1168+
test_size = 10
1169+
df = make_mixed_dataframe_v2(test_size)
1170+
table_name = 'foobar'
1171+
self.table.create(table_name,
1172+
gbq._generate_bq_schema(df),
1173+
date_partitioned=True)
1174+
dpt_partition = table_name + '$' + test_dpt_suffix
1175+
with pytest.raises(gbq.TableCreationError):
1176+
gbq.to_gbq(df,
1177+
self.table.dataset_id + '.' + dpt_partition,
1178+
project_id=_get_project_id(),
1179+
private_key=_get_private_key_path())
1180+
self.table.delete(table_name)
1181+
sleep(30)
1182+
1183+
def test_table_created_from_dpt_suffixed_id(self):
1184+
test_dpt_suffix = datetime.now().strftime('%Y%m%d')
1185+
test_size = 10
1186+
df = make_mixed_dataframe_v2(test_size)
1187+
table_name = 'foobar'
1188+
dpt_partition = table_name + '$' + test_dpt_suffix
1189+
gbq.to_gbq(df,
1190+
self.table.dataset_id + '.' + dpt_partition,
1191+
project_id=_get_project_id(),
1192+
private_key=_get_private_key_path())
1193+
self.table.delete(table_name)
1194+
10831195
def test_upload_data_if_table_exists_raises_value_error(self):
10841196
test_id = "4"
10851197
test_size = 10

0 commit comments

Comments
 (0)