deps: support pandas 2.2 #492

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged · 16 commits · Mar 22, 2024
28 changes: 18 additions & 10 deletions bigframes/core/joins/merge.py
@@ -18,15 +18,18 @@

from __future__ import annotations

import typing
from typing import Literal, Optional

from bigframes.dataframe import DataFrame
from bigframes.series import Series
# Avoid circular imports.
if typing.TYPE_CHECKING:
import bigframes.dataframe
Contributor:
I like disambiguating "DataFrame..." should we just always qualify it?

Collaborator (Author):
Yes we should. I have a bug open to do that. Issue 296390934. Fully qualified names will also help with docs rendering.

import bigframes.series


def merge(
left: DataFrame,
right: DataFrame,
left: bigframes.dataframe.DataFrame,
right: bigframes.dataframe.DataFrame,
how: Literal[
"inner",
"left",
@@ -40,7 +43,7 @@ def merge(
right_on: Optional[str] = None,
sort: bool = False,
suffixes: tuple[str, str] = ("_x", "_y"),
) -> DataFrame:
) -> bigframes.dataframe.DataFrame:
left = _validate_operand(left)
right = _validate_operand(right)

@@ -55,14 +58,19 @@
)


def _validate_operand(obj: DataFrame | Series) -> DataFrame:
if isinstance(obj, DataFrame):
def _validate_operand(
obj: bigframes.dataframe.DataFrame | bigframes.series.Series,
) -> bigframes.dataframe.DataFrame:
import bigframes.dataframe
import bigframes.series

if isinstance(obj, bigframes.dataframe.DataFrame):
return obj
elif isinstance(obj, Series):
elif isinstance(obj, bigframes.series.Series):
if obj.name is None:
raise ValueError("Cannot merge a Series without a name")
raise ValueError("Cannot merge a bigframes.series.Series without a name")
return obj.to_frame()
else:
raise TypeError(
f"Can only merge Series or DataFrame objects, a {type(obj)} was passed"
f"Can only merge bigframes.series.Series or bigframes.dataframe.DataFrame objects, a {type(obj)} was passed"
)
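
The imports above follow the standard recipe for breaking an import cycle: the names are imported under `typing.TYPE_CHECKING` so annotations type-check, and the real imports are deferred into the function bodies, which is what actually avoids the cycle at runtime. A minimal sketch of the same technique, with a hypothetical module `pkg.frame` standing in for bigframes.dataframe:

from __future__ import annotations

import typing

# Evaluated only by type checkers, never at runtime, so importing here
# cannot trigger the circular import.
if typing.TYPE_CHECKING:
    import pkg.frame


def validate(obj: pkg.frame.Frame) -> pkg.frame.Frame:
    # Deferred runtime import: by the time this function is called,
    # both modules have finished initializing.
    import pkg.frame

    if not isinstance(obj, pkg.frame.Frame):
        raise TypeError(f"expected pkg.frame.Frame, got {type(obj)}")
    return obj

With `from __future__ import annotations`, the signature's annotations are stored as strings, so they never need `pkg.frame` at import time.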
13 changes: 10 additions & 3 deletions bigframes/core/utils.py
@@ -70,7 +70,9 @@ def split_index(


def get_standardized_ids(
col_labels: Iterable[Hashable], idx_labels: Iterable[Hashable] = ()
col_labels: Iterable[Hashable],
idx_labels: Iterable[Hashable] = (),
strict: bool = False,
) -> tuple[list[str], list[str]]:
"""Get stardardized column ids as column_ids_list, index_ids_list.
The standardized_column_id must be valid BQ SQL schema column names, can only be string type and unique.
@@ -84,11 +86,15 @@
Tuple of (standardized_column_ids, standardized_index_ids)
"""
col_ids = [
UNNAMED_COLUMN_ID if col_label is None else label_to_identifier(col_label)
UNNAMED_COLUMN_ID
if col_label is None
else label_to_identifier(col_label, strict=strict)
for col_label in col_labels
]
idx_ids = [
UNNAMED_INDEX_ID if idx_label is None else label_to_identifier(idx_label)
UNNAMED_INDEX_ID
if idx_label is None
else label_to_identifier(idx_label, strict=strict)
for idx_label in idx_labels
]

@@ -107,6 +113,7 @@ def label_to_identifier(label: typing.Hashable, strict: bool = False) -> str:
# Column values will be loaded as null if the column name has spaces.
# https://github.com/googleapis/python-bigquery/issues/1566
identifier = str(label).replace(" ", "_")

if strict:
identifier = re.sub(r"[^a-zA-Z0-9_]", "", identifier)
if not identifier:
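
Read together with the hunk above, `strict=True` tightens the sanitizer from "replace spaces" to "keep only letters, digits, and underscores". A self-contained sketch of just the fragment shown in this diff (the real function continues past the truncated hunk, e.g. to repair empty results):

import re
import typing


def label_to_identifier(label: typing.Hashable, strict: bool = False) -> str:
    # Column values load as null if the parquet column name has spaces:
    # https://github.com/googleapis/python-bigquery/issues/1566
    identifier = str(label).replace(" ", "_")
    if strict:
        # Drop anything that is not a valid BigQuery identifier character.
        identifier = re.sub(r"[^a-zA-Z0-9_]", "", identifier)
    return identifier


print(label_to_identifier("total cost ($)", strict=True))  # total_cost_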
2 changes: 1 addition & 1 deletion bigframes/dataframe.py
@@ -70,10 +70,10 @@
if typing.TYPE_CHECKING:
import bigframes.session

SingleItemValue = Union[bigframes.series.Series, int, float, Callable]

LevelType = typing.Hashable
LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]
SingleItemValue = Union[bigframes.series.Series, int, float, Callable]

ERROR_IO_ONLY_GS_PATHS = f"Only Google Cloud Storage (gs://...) paths are supported. {constants.FEEDBACK_LINK}"
ERROR_IO_REQUIRES_WILDCARD = (
30 changes: 25 additions & 5 deletions bigframes/session/__init__.py
@@ -84,7 +84,6 @@
import bigframes.core.ordering as order
import bigframes.core.traversal as traversals
import bigframes.core.utils as utils
import bigframes.dataframe as dataframe
import bigframes.dtypes
import bigframes.formatting_helpers as formatting_helpers
from bigframes.functions.remote_function import read_gbq_function as bigframes_rgf
@@ -93,6 +92,10 @@
import bigframes.session.clients
import bigframes.version

# Avoid circular imports.
if typing.TYPE_CHECKING:
import bigframes.dataframe as dataframe

_BIGFRAMES_DEFAULT_CONNECTION_ID = "bigframes-default-connection"

_MAX_CLUSTER_COLUMNS = 4
@@ -557,6 +560,8 @@ def _read_gbq_query(
api_name: str = "read_gbq_query",
use_cache: Optional[bool] = None,
) -> dataframe.DataFrame:
import bigframes.dataframe as dataframe

configuration = _transform_read_gbq_configuration(configuration)

if "query" not in configuration:
@@ -754,6 +759,8 @@ def _read_gbq_table(
api_name: str,
use_cache: bool = True,
) -> dataframe.DataFrame:
import bigframes.dataframe as dataframe

if max_results and max_results <= 0:
raise ValueError("`max_results` should be a positive number.")

@@ -989,6 +996,8 @@ def read_pandas(self, pandas_dataframe: pandas.DataFrame) -> dataframe.DataFrame
def _read_pandas(
self, pandas_dataframe: pandas.DataFrame, api_name: str
) -> dataframe.DataFrame:
import bigframes.dataframe as dataframe

if isinstance(pandas_dataframe, dataframe.DataFrame):
raise ValueError(
"read_pandas() expects a pandas.DataFrame, but got a "
@@ -1003,6 +1012,8 @@ def _read_pandas_inline(
def _read_pandas_inline(
self, pandas_dataframe: pandas.DataFrame
) -> Optional[dataframe.DataFrame]:
import bigframes.dataframe as dataframe

if pandas_dataframe.size > MAX_INLINE_DF_SIZE:
return None

@@ -1024,11 +1035,20 @@ def _read_pandas_inline(
def _read_pandas_load_job(
self, pandas_dataframe: pandas.DataFrame, api_name: str
) -> dataframe.DataFrame:
import bigframes.dataframe as dataframe

col_index = pandas_dataframe.columns.copy()
col_labels, idx_labels = (
pandas_dataframe.columns.to_list(),
col_index.to_list(),
pandas_dataframe.index.names,
)
new_col_ids, new_idx_ids = utils.get_standardized_ids(col_labels, idx_labels)
new_col_ids, new_idx_ids = utils.get_standardized_ids(
col_labels,
idx_labels,
# Loading parquet files into BigQuery with special column names
# is only supported under an allowlist.
strict=True,
)

# Add order column to pandas DataFrame to preserve order in BigQuery
ordering_col = "rowid"
@@ -1047,7 +1067,7 @@ def _read_pandas_load_job(

# Specify the datetime dtypes, which is auto-detected as timestamp types.
schema: list[bigquery.SchemaField] = []
for column, dtype in zip(pandas_dataframe.columns, pandas_dataframe.dtypes):
for column, dtype in zip(new_col_ids, pandas_dataframe.dtypes):
if dtype == "timestamp[us][pyarrow]":
schema.append(
bigquery.SchemaField(column, bigquery.enums.SqlTypeNames.DATETIME)
@@ -1101,7 +1121,7 @@ def _read_pandas_load_job(
block = blocks.Block(
array_value,
index_columns=new_idx_ids,
column_labels=col_labels,
column_labels=col_index,
index_labels=idx_labels,
)
return dataframe.DataFrame(block)
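
The two related changes above fit together: `_read_pandas_load_job` now loads the parquet file under ids sanitized with `strict=True` (per the new comment, BigQuery only accepts special characters in parquet column names under an allowlist), while `column_labels=col_index` re-attaches the original, unsanitized labels to the resulting Block. A rough sketch of that rename-then-load idea; the ids and path below are illustrative, not the actual implementation:

import pandas as pd

df = pd.DataFrame({"total cost ($)": [1, 2], "region": ["a", "b"]})

original_labels = df.columns.copy()   # kept aside, restored on the Block
safe_ids = ["total_cost_", "region"]  # e.g. get_standardized_ids(..., strict=True)

# Load under sanitized names; the original labels never reach BigQuery.
df.set_axis(safe_ids, axis="columns").to_parquet("/tmp/upload.parquet")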
3 changes: 1 addition & 2 deletions bigframes/session/_io/bigquery.py
@@ -29,7 +29,6 @@
import bigframes
from bigframes.core import log_adapter
import bigframes.formatting_helpers as formatting_helpers
import bigframes.session._io.bigquery as bigframes_io

IO_ORDERING_ID = "bqdf_row_nums"
MAX_LABELS_COUNT = 64
@@ -226,7 +225,7 @@ def start_query_with_client(
Starts query job and waits for results.
"""
api_methods = log_adapter.get_and_reset_api_methods()
job_config.labels = bigframes_io.create_job_configs_labels(
job_config.labels = create_job_configs_labels(
job_configs_labels=job_config.labels, api_methods=api_methods
)

37 changes: 14 additions & 23 deletions notebooks/visualization/bq_dataframes_covid_line_graphs.ipynb

Large diffs are not rendered by default.

15 changes: 2 additions & 13 deletions noxfile.py
@@ -556,22 +556,11 @@ def prerelease(session: nox.sessions.Session, tests_path):
"--prefer-binary",
"--pre",
"--upgrade",
# TODO(shobs): Remove excluding version 2.1.4 after
# https://github.com/pandas-dev/pandas/issues/56463 is resolved.
#
# TODO(shobs): Remove excluding version 2.2.0rc0 after
# https://github.com/pandas-dev/pandas/issues/56646 and
# https://github.com/pandas-dev/pandas/issues/56651 are resolved.
#
# TODO(shobs): Remove excluding version 2.2.0 after
# https://github.com/googleapis/python-bigquery-dataframes/issues/341
# https://github.com/googleapis/python-bigquery-dataframes/issues/337
# are resolved
#
# We exclude each version individually so that we can continue to test
# some prerelease packages. See:
# https://github.com/googleapis/python-bigquery-dataframes/pull/268#discussion_r1423205172
"pandas!=2.1.4, !=2.2.0rc0, !=2.2.0, !=2.2.1",
# "pandas!=2.1.4, !=2.2.0rc0, !=2.2.0, !=2.2.1",
"pandas",
)
already_installed.add("pandas")

2 changes: 1 addition & 1 deletion setup.py
@@ -46,7 +46,7 @@
"google-cloud-storage >=2.0.0",
"ibis-framework[bigquery] >=8.0.0,<9.0.0dev",
# TODO: Relax upper bound once we have fixed `system_prerelease` tests.
"pandas >=1.5.0,<2.1.4",
"pandas >=1.5.0",
"pydata-google-auth >=1.8.2",
"requests >=2.27.1",
"scikit-learn >=1.2.2",
1 change: 1 addition & 0 deletions tests/system/conftest.py
@@ -35,6 +35,7 @@
import test_utils.prefixer

import bigframes
import bigframes.dataframe
import tests.system.utils

# Use this to control the number of cloud functions being deleted in a single
22 changes: 14 additions & 8 deletions tests/system/large/test_remote_function.py
@@ -118,6 +118,7 @@ def bq_cf_connection() -> str:
def test_remote_function_multiply_with_ibis(
session,
scalars_table_id,
bigquery_client,
ibis_client,
dataset_id,
bq_cf_connection,
@@ -134,20 +135,22 @@
def multiply(x, y):
return x * y

project_id, dataset_name, table_name = scalars_table_id.split(".")
_, dataset_name, table_name = scalars_table_id.split(".")
if not ibis_client.dataset:
ibis_client.dataset = dataset_name

col_name = "int64_col"
table = ibis_client.tables[table_name]
table = table.filter(table[col_name].notnull()).order_by("rowindex").head(10)
pandas_df_orig = table.execute()
sql = table.compile()
pandas_df_orig = bigquery_client.query(sql).to_dataframe()

col = table[col_name]
col_2x = multiply(col, 2).name("int64_col_2x")
col_square = multiply(col, col).name("int64_col_square")
table = table.mutate([col_2x, col_square])
pandas_df_new = table.execute()
sql = table.compile()
pandas_df_new = bigquery_client.query(sql).to_dataframe()

pandas.testing.assert_series_equal(
pandas_df_orig[col_name] * 2,
@@ -163,14 +166,15 @@ def multiply(x, y):
finally:
# clean up the gcp assets created for the remote function
cleanup_remote_function_assets(
session.bqclient, session.cloudfunctionsclient, multiply
bigquery_client, session.cloudfunctionsclient, multiply
)


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_stringify_with_ibis(
session,
scalars_table_id,
bigquery_client,
ibis_client,
dataset_id,
bq_cf_connection,
@@ -187,19 +191,21 @@
def stringify(x):
return f"I got {x}"

project_id, dataset_name, table_name = scalars_table_id.split(".")
_, dataset_name, table_name = scalars_table_id.split(".")
if not ibis_client.dataset:
ibis_client.dataset = dataset_name

col_name = "int64_col"
table = ibis_client.tables[table_name]
table = table.filter(table[col_name].notnull()).order_by("rowindex").head(10)
pandas_df_orig = table.execute()
sql = table.compile()
pandas_df_orig = bigquery_client.query(sql).to_dataframe()

col = table[col_name]
col_2x = stringify(col).name("int64_str_col")
table = table.mutate([col_2x])
pandas_df_new = table.execute()
sql = table.compile()
pandas_df_new = bigquery_client.query(sql).to_dataframe()

pandas.testing.assert_series_equal(
pandas_df_orig[col_name].apply(lambda x: f"I got {x}"),
@@ -209,7 +215,7 @@ def stringify(x):
finally:
# clean up the gcp assets created for the remote function
cleanup_remote_function_assets(
session.bqclient, session.cloudfunctionsclient, stringify
bigquery_client, session.cloudfunctionsclient, stringify
)


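
Both tests previously materialized results with `table.execute()`; they now compile the ibis expression to SQL and run it through the BigQuery client directly, so the tests no longer depend on ibis' own pandas conversion path. A sketch of the pattern, with a hypothetical project, dataset, and table name:

import ibis
from google.cloud import bigquery

bigquery_client = bigquery.Client()
ibis_client = ibis.bigquery.connect(project_id="my-project", dataset_id="my_dataset")

table = ibis_client.tables["scalars"]  # hypothetical table
expr = table.filter(table["int64_col"].notnull()).head(10)

sql = expr.compile()                            # render the expression to SQL
df = bigquery_client.query(sql).to_dataframe()  # execute outside of ibis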
4 changes: 3 additions & 1 deletion tests/system/small/test_dataframe.py
@@ -2500,6 +2500,8 @@ def test_df_pivot(scalars_dfs, values, index, columns):
pd_result = scalars_pandas_df.pivot(values=values, index=index, columns=columns)

# Pandas produces NaN, where bq dataframes produces pd.NA
bf_result = bf_result.fillna(float("nan"))
pd_result = pd_result.fillna(float("nan"))
pd.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False)


@@ -4026,7 +4028,7 @@ def test_to_pandas_downsampling_option_override(session):

total_memory_bytes = df.memory_usage(deep=True).sum()
total_memory_mb = total_memory_bytes / (1024 * 1024)
assert total_memory_mb == pytest.approx(download_size, rel=0.3)
assert total_memory_mb == pytest.approx(download_size, rel=0.5)


def test_to_gbq_and_create_dataset(session, scalars_df_index, dataset_id_not_created):
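
The `fillna(float("nan"))` calls in the pivot test above normalize missing values on both sides before comparing: pandas' pivot leaves NaN where BigQuery DataFrames produces pd.NA, and `assert_frame_equal` distinguishes the two even with `check_dtype=False`. A small illustration of the normalization:

import pandas as pd

bf_like = pd.DataFrame({"a": pd.array([1.0, pd.NA], dtype="Float64")})
pd_like = pd.DataFrame({"a": [1.0, float("nan")]})

# Replace pd.NA with NaN on both sides so only the values are compared.
bf_norm = bf_like.fillna(float("nan"))
pd_norm = pd_like.fillna(float("nan"))

pd.testing.assert_frame_equal(bf_norm, pd_norm, check_dtype=False)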
4 changes: 3 additions & 1 deletion tests/system/small/test_groupby.py
@@ -228,7 +228,9 @@ def test_dataframe_groupby_multi_sum(
(lambda x: x.cumsum(numeric_only=True)),
(lambda x: x.cummax(numeric_only=True)),
(lambda x: x.cummin(numeric_only=True)),
(lambda x: x.cumprod()),
# pandas 2.2 uses floating point for cumulative product even for
# integer inputs.
(lambda x: x.cumprod().astype("Float64")),
(lambda x: x.shift(periods=2)),
],
ids=[
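
The new cast in the cumprod case tracks the pandas 2.2 behavior change called out in the comment: cumulative products now come back as floating point even for integer inputs, so casting both the bigframes and pandas results to Float64 keeps the comparison version-independent. Roughly:

import pandas as pd

df = pd.DataFrame(
    {"key": ["a", "a", "b"], "val": pd.array([2, 3, 4], dtype="Int64")}
)

# The explicit cast makes the expected dtype the same on pandas <2.2 and >=2.2.
result = df.groupby("key").cumprod().astype("Float64")
print(result["val"].tolist())  # [2.0, 6.0, 4.0]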
3 changes: 2 additions & 1 deletion tests/system/small/test_multiindex.py
@@ -45,8 +45,9 @@ def test_read_pandas_multi_index_axes():
[[1, 2], [3, 4]], index=index, columns=columns, dtype=pandas.Int64Dtype()
)
bf_df = bpd.DataFrame(pandas_df)
bf_df_computed = bf_df.to_pandas()

pandas.testing.assert_frame_equal(bf_df.to_pandas(), pandas_df)
pandas.testing.assert_frame_equal(bf_df_computed, pandas_df)


# Row Multi-index tests