fix: re-enable to_csv and to_json related tests #468

Merged 4 commits on Mar 20, 2024

35 changes: 18 additions & 17 deletions tests/system/small/test_dataframe_io.py
@@ -19,7 +19,7 @@
import pyarrow as pa
import pytest

-from tests.system.utils import assert_pandas_df_equal, convert_pandas_dtypes
+from tests.system import utils

try:
import pandas_gbq # type: ignore
@@ -115,7 +115,6 @@ def test_to_pandas_batches_w_correct_dtypes(scalars_df_default_index):
pd.testing.assert_series_equal(actual, expected)


-@pytest.mark.skip(reason="Disable to unblock kokoro tests")
@pytest.mark.parametrize(
("index"),
[True, False],
@@ -150,12 +149,12 @@ def test_to_csv_index(
# read_csv will decode into bytes improperly, convert_pandas_dtypes will encode properly from string
dtype.pop("bytes_col")
gcs_df = pd.read_csv(
-path,
+utils.get_first_file_from_wildcard(path),
dtype=dtype,
date_format={"timestamp_col": "YYYY-MM-DD HH:MM:SS Z"},
index_col=index_col,
)
-convert_pandas_dtypes(gcs_df, bytes_col=True)
+utils.convert_pandas_dtypes(gcs_df, bytes_col=True)
gcs_df.index.name = scalars_df.index.name

scalars_pandas_df = scalars_pandas_df.copy()
@@ -164,7 +163,6 @@
pd.testing.assert_frame_equal(gcs_df, scalars_pandas_df)


-@pytest.mark.skip(reason="Disable to unblock kokoro tests")
def test_to_csv_tabs(
scalars_dfs: Tuple[bigframes.dataframe.DataFrame, pd.DataFrame],
gcs_folder: str,
@@ -189,13 +187,13 @@
# read_csv will decode into bytes improperly, convert_pandas_dtypes will encode properly from string
dtype.pop("bytes_col")
gcs_df = pd.read_csv(
-path,
+utils.get_first_file_from_wildcard(path),
sep="\t",
dtype=dtype,
date_format={"timestamp_col": "YYYY-MM-DD HH:MM:SS Z"},
index_col=index_col,
)
-convert_pandas_dtypes(gcs_df, bytes_col=True)
+utils.convert_pandas_dtypes(gcs_df, bytes_col=True)
gcs_df.index.name = scalars_df.index.name

scalars_pandas_df = scalars_pandas_df.copy()
@@ -229,7 +227,7 @@ def test_to_gbq_index(scalars_dfs, dataset_id, index):
else:
df_out = df_out.sort_values("rowindex_2").reset_index(drop=True)

-convert_pandas_dtypes(df_out, bytes_col=False)
+utils.convert_pandas_dtypes(df_out, bytes_col=False)
# pd.read_gbq interprets bytes_col as object, reconvert to pyarrow binary
df_out["bytes_col"] = df_out["bytes_col"].astype(pd.ArrowDtype(pa.binary()))
expected = scalars_pandas_df.copy()
@@ -415,7 +413,6 @@ def test_to_json_index_invalid_lines(
scalars_df.to_json(path, index=index)


-@pytest.mark.skip(reason="Disable to unblock kokoro tests")
@pytest.mark.parametrize(
("index"),
[True, False],
@@ -435,8 +432,12 @@ def test_to_json_index_records_orient(
""" Test the `to_json` API with `orient` is `records` and `lines` is True"""
scalars_df.to_json(path, index=index, orient="records", lines=True)

-gcs_df = pd.read_json(path, lines=True, convert_dates=["datetime_col"])
-convert_pandas_dtypes(gcs_df, bytes_col=True)
+gcs_df = pd.read_json(
+utils.get_first_file_from_wildcard(path),
+lines=True,
+convert_dates=["datetime_col"],
+)
+utils.convert_pandas_dtypes(gcs_df, bytes_col=True)
if index and scalars_df.index.name is not None:
gcs_df = gcs_df.set_index(scalars_df.index.name)

@@ -474,8 +475,8 @@ def test_to_parquet_index(scalars_dfs, gcs_folder, index):
# table.
scalars_df.to_parquet(path, index=index)

-gcs_df = pd.read_parquet(path.replace("*", "000000000000"))
-convert_pandas_dtypes(gcs_df, bytes_col=False)
+gcs_df = pd.read_parquet(utils.get_first_file_from_wildcard(path))
+utils.convert_pandas_dtypes(gcs_df, bytes_col=False)
if index and scalars_df.index.name is not None:
gcs_df = gcs_df.set_index(scalars_df.index.name)

@@ -507,7 +508,7 @@ def test_to_sql_query_unnamed_index_included(
pd_df = scalars_pandas_df_default_index.reset_index(drop=True)
roundtrip = session.read_gbq(sql, index_col=idx_ids)
roundtrip.index.names = [None]
-assert_pandas_df_equal(roundtrip.to_pandas(), pd_df, check_index_type=False)
+utils.assert_pandas_df_equal(roundtrip.to_pandas(), pd_df, check_index_type=False)


def test_to_sql_query_named_index_included(
@@ -524,7 +525,7 @@

pd_df = scalars_pandas_df_default_index.set_index("rowindex_2", drop=True)
roundtrip = session.read_gbq(sql, index_col=idx_ids)
-assert_pandas_df_equal(roundtrip.to_pandas(), pd_df)
+utils.assert_pandas_df_equal(roundtrip.to_pandas(), pd_df)


def test_to_sql_query_unnamed_index_excluded(
@@ -539,7 +540,7 @@

pd_df = scalars_pandas_df_default_index.reset_index(drop=True)
roundtrip = session.read_gbq(sql)
-assert_pandas_df_equal(
+utils.assert_pandas_df_equal(
roundtrip.to_pandas(), pd_df, check_index_type=False, ignore_order=True
)

@@ -558,6 +559,6 @@
"rowindex_2", drop=True
).reset_index(drop=True)
roundtrip = session.read_gbq(sql)
-assert_pandas_df_equal(
+utils.assert_pandas_df_equal(
roundtrip.to_pandas(), pd_df, check_index_type=False, ignore_order=True
)
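
The re-enabled tests above all share one round-trip shape: write through the BigQuery DataFrames to_* API to a wildcard GCS path, read the first exported shard back with pandas, and compare against the pandas copy after normalizing dtypes. A minimal sketch of that shape, hedged: it assumes a pandas-compatible to_csv on the BigQuery DataFrames object and a readable gs:// bucket, and the function and path names here are illustrative, not part of this PR:

import pandas as pd

def roundtrip_first_csv_shard(bf_df, gcs_folder: str) -> pd.DataFrame:
    # BigQuery extract jobs expand "*" into zero-padded shard numbers,
    # so the first file written replaces "*" with "000000000000".
    path = gcs_folder + "example*.csv"
    bf_df.to_csv(path, index=True)
    return pd.read_csv(path.replace("*", "000000000000"))
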
3 changes: 2 additions & 1 deletion tests/system/small/test_encryption.py
@@ -19,6 +19,7 @@

import bigframes
import bigframes.ml.linear_model
+from tests.system import utils


@pytest.fixture(scope="module")
@@ -160,7 +161,7 @@ def test_read_csv_gcs(
# Create a csv in gcs
write_path = gcs_folder + "test_read_csv_gcs_bigquery_engine*.csv"
read_path = (
-write_path.replace("*", "000000000000") if engine is None else write_path
+utils.get_first_file_from_wildcard(write_path) if engine is None else write_path
)
scalars_df_index.to_csv(write_path)

7 changes: 3 additions & 4 deletions tests/system/small/test_series.py
@@ -27,6 +27,7 @@
from tests.system.utils import (
assert_pandas_df_equal,
assert_series_equal,
+get_first_file_from_wildcard,
skip_legacy_pandas,
)

@@ -2390,11 +2391,10 @@ def test_to_frame(scalars_dfs):
assert_pandas_df_equal(bf_result, pd_result)


-@pytest.mark.skip(reason="Disable to unblock kokoro tests")
def test_to_json(gcs_folder, scalars_df_index, scalars_pandas_df_index):
path = gcs_folder + "test_series_to_json*.jsonl"
scalars_df_index["int64_col"].to_json(path, lines=True, orient="records")
-gcs_df = pd.read_json(path, lines=True)
+gcs_df = pd.read_json(get_first_file_from_wildcard(path), lines=True)

pd.testing.assert_series_equal(
gcs_df["int64_col"].astype(pd.Int64Dtype()),
@@ -2404,11 +2404,10 @@ def test_to_json(gcs_folder, scalars_df_index, scalars_pandas_df_index):
)


-@pytest.mark.skip(reason="Disable to unblock kokoro tests")
def test_to_csv(gcs_folder, scalars_df_index, scalars_pandas_df_index):
path = gcs_folder + "test_series_to_csv*.csv"
scalars_df_index["int64_col"].to_csv(path)
-gcs_df = pd.read_csv(path)
+gcs_df = pd.read_csv(get_first_file_from_wildcard(path))

pd.testing.assert_series_equal(
gcs_df["int64_col"].astype(pd.Int64Dtype()),
26 changes: 12 additions & 14 deletions tests/system/small/test_session.py
@@ -30,9 +30,7 @@
import bigframes.dataframe
import bigframes.dtypes
import bigframes.ml.linear_model
-from tests.system.utils import skip_legacy_pandas
-
-FIRST_FILE = "000000000000"
+from tests.system import utils


def test_read_gbq_tokyo(
@@ -435,14 +433,14 @@ def test_read_pandas_tokyo(
pd.testing.assert_frame_equal(result, expected)


-@skip_legacy_pandas
+@utils.skip_legacy_pandas
def test_read_csv_gcs_default_engine(session, scalars_dfs, gcs_folder):
scalars_df, _ = scalars_dfs
if scalars_df.index.name is not None:
path = gcs_folder + "test_read_csv_gcs_default_engine_w_index*.csv"
else:
path = gcs_folder + "test_read_csv_gcs_default_engine_wo_index*.csv"
-read_path = path.replace("*", FIRST_FILE)
+read_path = utils.get_first_file_from_wildcard(path)
scalars_df.to_csv(path, index=False)
dtype = scalars_df.dtypes.to_dict()
dtype.pop("geography_col")
@@ -492,7 +490,7 @@ def test_read_csv_gcs_bq_engine(session, scalars_dfs, gcs_folder):
pytest.param("\t", id="custom_sep"),
],
)
-@skip_legacy_pandas
+@utils.skip_legacy_pandas
def test_read_csv_local_default_engine(session, scalars_dfs, sep):
scalars_df, scalars_pandas_df = scalars_dfs
with tempfile.TemporaryDirectory() as dir:
@@ -641,15 +639,15 @@ def test_read_csv_default_engine_throws_not_implemented_error(
gcs_folder
+ "test_read_csv_gcs_default_engine_throws_not_implemented_error*.csv"
)
-read_path = path.replace("*", FIRST_FILE)
+read_path = utils.get_first_file_from_wildcard(path)
scalars_df_index.to_csv(path)
with pytest.raises(NotImplementedError, match=match):
session.read_csv(read_path, **kwargs)


def test_read_csv_gcs_default_engine_w_header(session, scalars_df_index, gcs_folder):
path = gcs_folder + "test_read_csv_gcs_default_engine_w_header*.csv"
-read_path = path.replace("*", FIRST_FILE)
+read_path = utils.get_first_file_from_wildcard(path)
scalars_df_index.to_csv(path)

# Skips header=N rows, normally considers the N+1th row as the header, but overridden by
@@ -716,7 +714,7 @@ def test_read_csv_gcs_default_engine_w_index_col_name(
session, scalars_df_default_index, gcs_folder
):
path = gcs_folder + "test_read_csv_gcs_default_engine_w_index_col_name*.csv"
-read_path = path.replace("*", FIRST_FILE)
+read_path = utils.get_first_file_from_wildcard(path)
scalars_df_default_index.to_csv(path)

df = session.read_csv(read_path, index_col="rowindex")
@@ -731,7 +729,7 @@
session, scalars_df_default_index, gcs_folder
):
path = gcs_folder + "test_read_csv_gcs_default_engine_w_index_col_index*.csv"
-read_path = path.replace("*", FIRST_FILE)
+read_path = utils.get_first_file_from_wildcard(path)
scalars_df_default_index.to_csv(path)

index_col = scalars_df_default_index.columns.to_list().index("rowindex")
@@ -790,7 +788,7 @@ def test_read_csv_local_default_engine_w_index_col_index(
def test_read_csv_gcs_w_usecols(session, scalars_df_index, gcs_folder, engine):
path = gcs_folder + "test_read_csv_gcs_w_usecols"
path = path + "_default_engine*.csv" if engine is None else path + "_bq_engine*.csv"
-read_path = path.replace("*", FIRST_FILE) if engine is None else path
+read_path = utils.get_first_file_from_wildcard(path) if engine is None else path
scalars_df_index.to_csv(path)

# df should only have 1 column which is bool_col.
@@ -902,7 +900,7 @@ def test_read_parquet_gcs(session: bigframes.Session, scalars_dfs, gcs_folder, e

# Only bigquery engine for reads supports wildcards in path name.
if engine != "bigquery":
-path = path.replace("*", "000000000000")
+path = utils.get_first_file_from_wildcard(path)

df_out = (
session.read_parquet(path, engine=engine)
@@ -1012,7 +1010,7 @@ def test_read_parquet_gcs_compression_not_supported(
def test_read_json_gcs_bq_engine(session, scalars_dfs, gcs_folder):
scalars_df, _ = scalars_dfs
path = gcs_folder + "test_read_json_gcs_bq_engine_w_index*.json"
-read_path = path.replace("*", FIRST_FILE)
+read_path = utils.get_first_file_from_wildcard(path)
scalars_df.to_json(path, index=False, lines=True, orient="records")
df = session.read_json(read_path, lines=True, orient="records", engine="bigquery")

@@ -1036,7 +1034,7 @@ def test_read_json_gcs_bq_engine(session, scalars_dfs, gcs_folder):
def test_read_json_gcs_default_engine(session, scalars_dfs, gcs_folder):
scalars_df, _ = scalars_dfs
path = gcs_folder + "test_read_json_gcs_default_engine_w_index*.json"
-read_path = path.replace("*", FIRST_FILE)
+read_path = utils.get_first_file_from_wildcard(path)
scalars_df.to_json(
path,
index=False,
4 changes: 4 additions & 0 deletions tests/system/utils.py
@@ -304,3 +304,7 @@ def delete_cloud_function(
request = functions_v2.DeleteFunctionRequest(name=full_name)
operation = functions_client.delete_function(request=request)
return operation


+def get_first_file_from_wildcard(path):
+    return path.replace("*", "000000000000")
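
For reference, the new helper centralizes the inline path.replace("*", "000000000000") calls it supersedes: a BigQuery extract job names its first output shard with a zero-padded twelve-digit index. A short usage sketch; the bucket and file names below are made up:

from tests.system.utils import get_first_file_from_wildcard

path = "gs://my-bucket/test_to_csv*.csv"  # hypothetical export path
first_shard = get_first_file_from_wildcard(path)
assert first_shard == "gs://my-bucket/test_to_csv000000000000.csv"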