feat: add WriteDisposition to replace "if_exists='fail'/'replace'/'append'" behavior #581
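In short, this PR maps the existing `if_exists` argument onto BigQuery's `WriteDisposition` job setting, so the load job itself enforces the behavior instead of pandas-gbq deleting and recreating tables by hand. A minimal sketch of the new mapping as seen from the caller's side (the dataset, table, and project IDs below are placeholders):

```python
import pandas as pd
import pandas_gbq

# The mapping introduced in to_gbq: each if_exists value becomes a
# BigQuery write disposition on the load job.
DISPOSITIONS = {
    "fail": "WRITE_EMPTY",        # load only if the table holds no data
    "replace": "WRITE_TRUNCATE",  # overwrite any existing rows
    "append": "WRITE_APPEND",     # add rows to the existing table
}

df = pd.DataFrame({"name": ["a", "b"], "value": [1, 2]})

# Placeholder IDs; if_exists="append" is translated internally to
# write_disposition="WRITE_APPEND" before the load job is submitted.
pandas_gbq.to_gbq(
    df,
    "my_dataset.my_table",
    project_id="my-project",
    if_exists="append",
)
```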


Closed
wants to merge 33 commits

Commits (33)
70927c5
feat: add WriteDisposition to to_gbq
aribray Oct 21, 2022
15de9f3
pass test_load unit tests
aribray Oct 25, 2022
c44f1a4
nox -s blacken
aribray Oct 25, 2022
bc8faf5
test write_disposition parameter
aribray Oct 25, 2022
c3ed7a7
refactor tests for WriteDisposition
aribray Oct 26, 2022
936113e
remove unused import
aribray Oct 26, 2022
8613458
update minimum google-auth version to 2.13.0
aribray Oct 26, 2022
bc09929
update constraints-3.7.txt
aribray Oct 26, 2022
f7394bb
bump google-api-core version to 2.10.2 and google-auth-oauthlib to 0.7.0
aribray Oct 26, 2022
0d45665
remove version constraints from google-api-core
aribray Oct 26, 2022
0e4be09
bump google-cloud-bigquery version to 3.3.5, google-cloud-bigquery-st…
aribray Oct 26, 2022
57c0bf9
bump pandas version, bump db-dtypes version
aribray Oct 26, 2022
16411c9
fix pandas version
aribray Oct 26, 2022
cb651d2
resolve dependency conflicts
aribray Oct 26, 2022
81736e2
bump dbtypes version
aribray Oct 26, 2022
a006ac0
bump circleci pandas version
aribray Oct 26, 2022
1fa654e
rename conda requirements file
aribray Oct 26, 2022
0407ae5
reset circleci config pandas version
aribray Oct 26, 2022
3680702
reset pandas version
aribray Oct 26, 2022
846a44d
readjust constraints-3.7 versions
aribray Oct 26, 2022
f44704b
test adding to kokoro
aribray Oct 27, 2022
f3ced37
test removing circleci
aribray Oct 27, 2022
ad45c6d
test removing circleci
aribray Oct 27, 2022
6c771d9
Revert "test removing circleci"
aribray Oct 27, 2022
5f23bd0
add system tests to github workflow
aribray Oct 27, 2022
0a96c98
refactor to_gbq to map 'if_exists' to write_disposition
aribray Oct 27, 2022
8f92fef
fix docstring
aribray Oct 27, 2022
77b2d94
fix docstring
aribray Oct 27, 2022
7b3efc1
fix conda requirements
aribray Oct 27, 2022
3fc748d
remove system tests from github workflow
aribray Oct 27, 2022
0bdef40
adjust circle ci dependencies
aribray Oct 27, 2022
972ce57
drop versions for circleci build
aribray Oct 27, 2022
18b0273
add circleci fixture back to conftest.py
aribray Oct 27, 2022
107 changes: 44 additions & 63 deletions pandas_gbq/gbq.py
@@ -20,10 +20,7 @@
if typing.TYPE_CHECKING: # pragma: NO COVER
import pandas

from pandas_gbq.exceptions import (
AccessDenied,
GenericGBQException,
)
from pandas_gbq.exceptions import AccessDenied, GenericGBQException
from pandas_gbq.features import FEATURES
import pandas_gbq.schema
import pandas_gbq.timestamp
@@ -116,20 +113,12 @@ class InvalidSchema(ValueError):
table in BigQuery.
"""

def __init__(
self, message: str, local_schema: Dict[str, Any], remote_schema: Dict[str, Any]
):
super().__init__(message)
self._local_schema = local_schema
self._remote_schema = remote_schema

@property
def local_schema(self) -> Dict[str, Any]:
return self._local_schema
def __init__(self, message: str):
self._message = message

@property
def remote_schema(self) -> Dict[str, Any]:
return self._remote_schema
def message(self) -> str:
return self._message


class NotFoundException(ValueError):
@@ -155,7 +144,12 @@ class TableCreationError(ValueError):
Raised when the create table method fails
"""

pass
def __init__(self, message: str):
self._message = message

@property
def message(self) -> str:
return self._message


class Context(object):
@@ -290,9 +284,11 @@ def __init__(
global context
from google.api_core.exceptions import GoogleAPIError
from google.api_core.exceptions import ClientError
from google.api_core.exceptions import BadRequest

from pandas_gbq import auth

self.http_error = (ClientError, GoogleAPIError)
self.http_error = (ClientError, GoogleAPIError, BadRequest)
self.project_id = project_id
self.location = location
self.reauth = reauth
@@ -379,11 +375,16 @@ def get_client(self):
def process_http_error(ex):
# See `BigQuery Troubleshooting Errors
# <https://cloud.google.com/bigquery/troubleshooting-errors>`__

if "cancelled" in ex.message:
raise QueryTimeout("Reason: {0}".format(ex))

raise GenericGBQException("Reason: {0}".format(ex))
elif "Provided Schema does not match" in ex.message:
error_message = ex.errors[0]["message"]
raise InvalidSchema(f"Reason: {error_message}")
elif "Already Exists: Table" in ex.message:
error_message = ex.errors[0]["message"]
raise TableCreationError(f"Reason: {error_message}")
else:
raise GenericGBQException("Reason: {0}".format(ex))

def download_table(
self,
@@ -577,6 +578,7 @@ def load_data(
self,
dataframe,
destination_table_ref,
write_disposition,
chunksize=None,
schema=None,
progress_bar=True,
@@ -586,7 +588,6 @@
from pandas_gbq import load

total_rows = len(dataframe)

try:
chunks = load.load_chunks(
self.client,
@@ -596,6 +597,7 @@
schema=schema,
location=self.location,
api_method=api_method,
write_disposition=write_disposition,
billing_project=billing_project,
)
if progress_bar and tqdm:
@@ -609,11 +611,6 @@
except self.http_error as ex:
self.process_http_error(ex)

def delete_and_recreate_table(self, project_id, dataset_id, table_id, table_schema):
table = _Table(project_id, dataset_id, credentials=self.credentials)
table.delete(table_id)
table.create(table_id, table_schema)


def _bqschema_to_nullsafe_dtypes(schema_fields):
"""Specify explicit dtypes based on BigQuery schema.
@@ -975,11 +972,9 @@ def to_gbq(
):
"""Write a DataFrame to a Google BigQuery table.

The main method a user calls to export pandas DataFrame contents to
Google BigQuery table.
The main method a user calls to export pandas DataFrame contents to Google BigQuery table.

This method uses the Google Cloud client library to make requests to
Google BigQuery, documented `here
This method uses the Google Cloud client library to make requests to Google BigQuery, documented `here
<https://googleapis.dev/python/bigquery/latest/index.html>`__.

See the :ref:`How to authenticate with Google BigQuery <authentication>`
@@ -1114,15 +1109,21 @@ def to_gbq(
stacklevel=2,
)

if if_exists not in ("fail", "replace", "append"):
raise ValueError("'{0}' is not valid for if_exists".format(if_exists))

if "." not in destination_table:
raise NotFoundException(
"Invalid Table Name. Should be of the form 'datasetId.tableId' or "
"'projectId.datasetId.tableId'"
)

if if_exists not in ("fail", "replace", "append"):
raise ValueError("'{0}' is not valid for if_exists".format(if_exists))

if_exists_list = ["fail", "replace", "append"]
dispositions = ["WRITE_EMPTY", "WRITE_TRUNCATE", "WRITE_APPEND"]
dispositions_dict = dict(zip(if_exists_list, dispositions))

write_disposition = dispositions_dict[if_exists]

connector = GbqConnector(
project_id,
reauth=reauth,
@@ -1142,17 +1143,20 @@
table_id = destination_table_ref.table_id

default_schema = _generate_bq_schema(dataframe)
# If table_schema isn't provided, we'll create one for you
if not table_schema:
table_schema = default_schema
# If table_schema is provided, we'll update the default_schema to the provided table_schema
else:
table_schema = pandas_gbq.schema.update_schema(
default_schema, dict(fields=table_schema)
)

# If table exists, check if_exists parameter
try:
# Try to get the table
table = bqclient.get_table(destination_table_ref)
except google_exceptions.NotFound:
# If the table doesn't already exist, create it
table_connector = _Table(
project_id_table,
dataset_id,
@@ -1161,34 +1165,12 @@
)
table_connector.create(table_id, table_schema)
else:
# Convert original schema (the schema that already exists) to pandas-gbq API format
original_schema = pandas_gbq.schema.to_pandas_gbq(table.schema)

if if_exists == "fail":
raise TableCreationError(
"Could not create the table because it "
"already exists. "
"Change the if_exists parameter to "
"'append' or 'replace' data."
)
elif if_exists == "replace":
connector.delete_and_recreate_table(
project_id_table, dataset_id, table_id, table_schema
)
else:
if not pandas_gbq.schema.schema_is_subset(original_schema, table_schema):
raise InvalidSchema(
"Please verify that the structure and "
"data types in the DataFrame match the "
"schema of the destination table.",
table_schema,
original_schema,
)

# Update the local `table_schema` so mode (NULLABLE/REQUIRED)
# matches. See: https://github.com/pydata/pandas-gbq/issues/315
table_schema = pandas_gbq.schema.update_schema(
table_schema, original_schema
)
# Update the local `table_schema` so mode (NULLABLE/REQUIRED)
# matches. See: https://github.com/pydata/pandas-gbq/issues/315
table_schema = pandas_gbq.schema.update_schema(table_schema, original_schema)

if dataframe.empty:
# Create the table (if needed), but don't try to run a load job with an
@@ -1198,6 +1180,7 @@
connector.load_data(
dataframe,
destination_table_ref,
write_disposition=write_disposition,
chunksize=chunksize,
schema=table_schema,
progress_bar=progress_bar,
@@ -1294,7 +1277,6 @@ def exists(self, table_id):

def create(self, table_id, schema):
"""Create a table in Google BigQuery given a table and schema

Parameters
----------
table : str
@@ -1332,7 +1314,6 @@ def create(self, table_id, schema):

def delete(self, table_id):
"""Delete a table in Google BigQuery

Parameters
----------
table : str
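With the eager existence check removed from `to_gbq`, an `if_exists="fail"` conflict now surfaces as a BigQuery API error that `process_http_error` remaps. A sketch of how callers might handle the remapped exceptions, assuming they remain importable from `pandas_gbq.gbq` (table and project IDs are placeholders):

```python
import pandas as pd
import pandas_gbq
from pandas_gbq.gbq import InvalidSchema, TableCreationError

df = pd.DataFrame({"name": ["a"], "value": [1]})

try:
    # if_exists="fail" becomes WRITE_EMPTY, so BigQuery itself rejects
    # the load when the destination table already contains data.
    pandas_gbq.to_gbq(
        df, "my_dataset.my_table", project_id="my-project", if_exists="fail"
    )
except TableCreationError as exc:
    # Raised when the API reports "Already Exists: Table ...".
    print("table already exists:", exc.message)
except InvalidSchema as exc:
    # Raised when the API reports "Provided Schema does not match ...".
    print("schema mismatch:", exc.message)
```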
20 changes: 13 additions & 7 deletions pandas_gbq/load.py
@@ -113,20 +113,19 @@ def load_parquet(
client: bigquery.Client,
dataframe: pandas.DataFrame,
destination_table_ref: bigquery.TableReference,
write_disposition: str,
location: Optional[str],
schema: Optional[Dict[str, Any]],
billing_project: Optional[str] = None,
):
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = "WRITE_APPEND"
job_config.create_disposition = "CREATE_NEVER"
job_config.write_disposition = write_disposition
job_config.source_format = "PARQUET"

if schema is not None:
schema = pandas_gbq.schema.remove_policy_tags(schema)
job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema)
dataframe = cast_dataframe_for_parquet(dataframe, schema)

try:
client.load_table_from_dataframe(
dataframe,
@@ -143,13 +142,14 @@

def load_csv(
dataframe: pandas.DataFrame,
write_disposition: str,
chunksize: Optional[int],
bq_schema: Optional[List[bigquery.SchemaField]],
load_chunk: Callable,
):
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = "WRITE_APPEND"
job_config.create_disposition = "CREATE_NEVER"

job_config.write_disposition = write_disposition
job_config.source_format = "CSV"
job_config.allow_quoted_newlines = True

@@ -167,6 +167,7 @@ def load_csv_from_dataframe(
client: bigquery.Client,
dataframe: pandas.DataFrame,
destination_table_ref: bigquery.TableReference,
write_disposition: str,
location: Optional[str],
chunksize: Optional[int],
schema: Optional[Dict[str, Any]],
@@ -187,13 +188,14 @@ def load_chunk(chunk, job_config):
project=billing_project,
).result()

return load_csv(dataframe, chunksize, bq_schema, load_chunk)
return load_csv(dataframe, write_disposition, chunksize, bq_schema, load_chunk)


def load_csv_from_file(
client: bigquery.Client,
dataframe: pandas.DataFrame,
destination_table_ref: bigquery.TableReference,
write_disposition: str,
location: Optional[str],
chunksize: Optional[int],
schema: Optional[Dict[str, Any]],
@@ -223,7 +225,7 @@ def load_chunk(chunk, job_config):
finally:
chunk_buffer.close()

return load_csv(dataframe, chunksize, bq_schema, load_chunk)
return load_csv(dataframe, write_disposition, chunksize, bq_schema, load_chunk)


def load_chunks(
@@ -234,13 +236,15 @@
schema=None,
location=None,
api_method="load_parquet",
write_disposition="WRITE_EMPTY",
billing_project: Optional[str] = None,
):
if api_method == "load_parquet":
load_parquet(
client,
dataframe,
destination_table_ref,
write_disposition,
location,
schema,
billing_project=billing_project,
@@ -253,6 +257,7 @@
client,
dataframe,
destination_table_ref,
write_disposition,
location,
chunksize,
schema,
@@ -263,6 +268,7 @@
client,
dataframe,
destination_table_ref,
write_disposition,
location,
chunksize,
schema,
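For reference, the loaders now thread the caller's disposition into the load job configuration rather than hard-coding `WRITE_APPEND` (and `CREATE_NEVER` is no longer forced). A standalone sketch of the equivalent google-cloud-bigquery call, not the library code verbatim; the project, dataset, and table IDs are placeholders:

```python
import pandas as pd
from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # placeholder project
table_ref = bigquery.TableReference.from_string(
    "my-project.my_dataset.my_table"  # placeholder table
)

job_config = bigquery.LoadJobConfig()
job_config.source_format = "PARQUET"
# The disposition is now whatever the caller mapped from if_exists:
# "WRITE_EMPTY", "WRITE_TRUNCATE", or "WRITE_APPEND".
job_config.write_disposition = "WRITE_TRUNCATE"

df = pd.DataFrame({"name": ["a"], "value": [1]})
client.load_table_from_dataframe(df, table_ref, job_config=job_config).result()
```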
4 changes: 2 additions & 2 deletions samples/snippets/requirements.txt
@@ -1,5 +1,5 @@
google-cloud-bigquery-storage==2.15.0
google-cloud-bigquery==3.3.2
google-cloud-bigquery-storage==2.16.2
google-cloud-bigquery==3.3.5
pandas-gbq==0.17.8
pandas===1.3.5; python_version == '3.7'
pandas==1.4.4; python_version >= '3.8'
Expand Down
6 changes: 3 additions & 3 deletions setup.py
@@ -23,9 +23,9 @@
release_status = "Development Status :: 4 - Beta"
dependencies = [
"setuptools",
"db-dtypes >=0.3.1,<2.0.0",
"db-dtypes >=1.0.4,<2.0.0",
"numpy >=1.16.6",
"pandas >=0.24.2",
"pandas >=1.1.4",
"pyarrow >=3.0.0, <10.0dev",
"pydata-google-auth",
# Note: google-api-core and google-auth are also included via transitive
@@ -39,7 +39,7 @@
# https://github.com/googleapis/python-bigquery-pandas/issues/365
# Exclude 2.4.* because it has a bug where waiting for the query can hang
# indefinitely. https://github.com/pydata/pandas-gbq/issues/343
"google-cloud-bigquery >=1.27.2,<4.0.0dev,!=2.4.*",
"google-cloud-bigquery >=3.3.5,<4.0.0dev,!=2.4.*",
"google-cloud-bigquery-storage >=1.1.0,<3.0.0dev",
]
extras = {
Expand Down
14 changes: 7 additions & 7 deletions testing/constraints-3.7.txt
@@ -5,14 +5,14 @@
#
# e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev",
# Then this file should have foo==1.14.0
db-dtypes==0.3.1
google-api-core==1.31.5
google-auth==1.25.0
google-auth-oauthlib==0.0.1
google-cloud-bigquery==1.27.2
google-cloud-bigquery-storage==1.1.0
db-dtypes==1.0.4
google-api-core==2.10.2
google-auth==2.13.0
google-auth-oauthlib==0.7.0
google-cloud-bigquery==3.3.5
google-cloud-bigquery-storage==2.16.2
numpy==1.16.6
pandas==0.24.2
pandas==1.1.4
pyarrow==3.0.0
pydata-google-auth==0.1.2
tqdm==4.23.0