Skip to content

Commit b8b631e (parent: cec8c86)

BUG: Add support to replace partitions in date-partitioned tables (#47)

File tree

3 files changed: +99 additions, -10 deletions

docs/source/changelog.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ Changelog
66

77
- Resolve issue where the optional ``--noauth_local_webserver`` command line argument would not be propagated during the authentication process. (:issue:`35`)
88
- Drop support for Python 3.4 (:issue:`40`)
9+
- Add support to replace partitions in date-partitioned tables (:issue:`47`)
910

1011
0.1.6 / 2017-05-03
1112
------------------

pandas_gbq/gbq.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -840,17 +840,27 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
840840
"already exists. "
841841
"Change the if_exists parameter to "
842842
"append or replace data.")
843-
elif if_exists == 'replace':
844-
connector.delete_and_recreate_table(
845-
dataset_id, table_id, table_schema)
846-
elif if_exists == 'append':
843+
else:
844+
delay = 0
847845
if not connector.verify_schema(dataset_id, table_id, table_schema):
848-
raise InvalidSchema("Please verify that the structure and "
849-
"data types in the DataFrame match the "
850-
"schema of the destination table.")
846+
if if_exists == 'append' or table.partition_decorator in table_id:
847+
raise InvalidSchema("Please verify that the structure and "
848+
"data types in the DataFrame match the "
849+
"schema of the destination table.")
850+
elif if_exists == 'replace':
851+
table._print('The existing table has a different schema. Please '
852+
'wait 2 minutes. See Google BigQuery issue #191')
853+
delay = 120
854+
if if_exists == 'replace':
855+
table.delete(table_id)
856+
if table.partition_decorator not in table_id:
857+
table.create(table_id, table_schema)
858+
sleep(delay)
859+
851860
else:
861+
if table.partition_decorator in table_id:
862+
raise TableCreationError("Cannot create a partition without the main table.")
852863
table.create(table_id, table_schema)
853-
854864
connector.load_data(dataframe, dataset_id, table_id, chunksize)
855865

856866

@@ -893,6 +903,8 @@ def _generate_bq_schema(df, default_type='STRING'):
893903

894904
class _Table(GbqConnector):
895905

906+
partition_decorator = '$'
907+
896908
def __init__(self, project_id, dataset_id, reauth=False, verbose=False,
897909
private_key=None):
898910
try:

pandas_gbq/tests/test_gbq.py

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222

2323
TABLE_ID = 'new_test'
24+
DPT_TABLE_ID = 'dpt_test'
2425

2526

2627
_IMPORTS = False
@@ -407,7 +408,7 @@ def test_should_return_bigquery_strings_as_python_strings(self):
407408
def test_to_gbq_should_fail_if_invalid_table_name_passed(self):
408409
with tm.assertRaises(gbq.NotFoundException):
409410
gbq.to_gbq(DataFrame(), 'invalid_table_name', project_id="1234")
410-
411+
411412
def test_to_gbq_with_no_project_id_given_should_fail(self):
412413
with tm.assertRaises(TypeError):
413414
gbq.to_gbq(DataFrame(), 'dataset.tablename')
@@ -996,6 +997,8 @@ def setup_method(self, method):
996997
private_key=_get_private_key_path())
997998
self.destination_table = "{0}{1}.{2}".format(self.dataset_prefix, "1",
998999
TABLE_ID)
1000+
self.destination_date_partitioned_table = "{0}{1}.{2}".format(self.dataset_prefix, "1",
1001+
DPT_TABLE_ID)
9991002
self.dataset.create(self.dataset_prefix + "1")
10001003

10011004
@classmethod
@@ -1094,6 +1097,79 @@ def test_upload_data_if_table_exists_replace(self):
10941097
private_key=_get_private_key_path())
10951098
assert result['num_rows'][0] == 5
10961099

1100+
def test_upload_data_if_table_exists_replace_dpt_partition(self):
1101+
# Issue #47; tests that 'replace' is done by the subsequent call
1102+
test_dpt_suffix = "20170101"
1103+
test_size = 10
1104+
df = make_mixed_dataframe_v2(test_size)
1105+
df_different_schema = tm.makeMixedDataFrame()
1106+
1107+
dpt_partition = self.destination_date_partitioned_table + '$' + test_dpt_suffix
1108+
1109+
gbq.to_gbq(df, dpt_partition, _get_project_id(),
1110+
chunksize=10000, private_key=_get_private_key_path())
1111+
1112+
gbq.to_gbq(df_different_schema, dpt_partition,
1113+
_get_project_id(), if_exists='replace',
1114+
private_key=_get_private_key_path())
1115+
1116+
sleep(30)
1117+
1118+
# Test whole table
1119+
result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1120+
.format(self.destination_date_partitioned_table),
1121+
project_id=_get_project_id(),
1122+
private_key=_get_private_key_path())
1123+
assert result0['num_rows'][0] == 5
1124+
1125+
# Test destination partition
1126+
result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1127+
.format(dpt_partition),
1128+
project_id=_get_project_id(),
1129+
private_key=_get_private_key_path())
1130+
assert result1['num_rows'][0] == 5
1131+
1132+
def test_upload_data_if_table_exists_append_dpt_partition(self):
1133+
# Issue #47; tests that 'append' appends to an existing partition
1134+
test_dpt_suffix = "20170101"
1135+
test_size = 10
1136+
df = make_mixed_dataframe_v2(test_size)
1137+
1138+
dpt_partition = self.destination_date_partitioned_table + '$' + test_dpt_suffix
1139+
1140+
result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1141+
.format(dpt_partition),
1142+
project_id=_get_project_id(),
1143+
private_key=_get_private_key_path())
1144+
assert result0['num_rows'][0] == 5
1145+
1146+
gbq.to_gbq(df, dpt_partition,
1147+
_get_project_id(), if_exists='append',
1148+
private_key=_get_private_key_path())
1149+
1150+
result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1151+
.format(dpt_partition),
1152+
project_id=_get_project_id(),
1153+
private_key=_get_private_key_path())
1154+
1155+
assert result1['num_rows'][0] == 15
1156+
1157+
sleep(30)
1158+
1159+
# Test whole table
1160+
result0 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1161+
.format(self.destination_date_partitioned_table),
1162+
project_id=_get_project_id(),
1163+
private_key=_get_private_key_path())
1164+
assert result0['num_rows'][0] == 5
1165+
1166+
# Test destination partition
1167+
result1 = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
1168+
.format(dpt_partition),
1169+
project_id=_get_project_id(),
1170+
private_key=_get_private_key_path())
1171+
assert result1['num_rows'][0] == 10
1172+
10971173
def test_upload_data_if_table_exists_raises_value_error(self):
10981174
test_id = "4"
10991175
test_size = 10
@@ -1117,7 +1193,7 @@ def test_google_upload_errors_should_raise_exception(self):
11171193
with tm.assertRaises(gbq.StreamingInsertError):
11181194
gbq.to_gbq(bad_df, self.destination_table + test_id,
11191195
_get_project_id(), private_key=_get_private_key_path())
1120-
1196+
11211197
def test_generate_schema(self):
11221198
df = tm.makeMixedDataFrame()
11231199
schema = gbq._generate_bq_schema(df)

0 commit comments