
feat: support dtype parameter in read_csv for bigquery engine #1749


Merged: 1 commit, May 22, 2025
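For context, this change lets `read_csv` accept a dict-like `dtype` when engine="bigquery": the named columns are cast after the BigQuery load job completes, instead of the argument being rejected outright. A minimal usage sketch (the bucket path and column names are hypothetical):

import bigframes.pandas as bpd
import pandas as pd

# Load via a BigQuery load job, then cast the listed columns.
df = bpd.read_csv(
    "gs://my-bucket/data.csv",  # hypothetical path
    engine="bigquery",
    dtype={
        "bool_col": pd.BooleanDtype(),
        "int64_col": pd.Float64Dtype(),
    },
)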
17 changes: 10 additions & 7 deletions bigframes/session/__init__.py
@@ -61,7 +61,7 @@
 import bigframes._config.bigquery_options as bigquery_options
 import bigframes.clients
 import bigframes.constants
-from bigframes.core import blocks, log_adapter
+from bigframes.core import blocks, log_adapter, utils
 import bigframes.core.pyformat

 # Even though the ibis.backends.bigquery import is unused, it's needed
@@ -1108,11 +1108,8 @@ def _read_csv_w_bigquery_engine(
         native CSV loading capabilities, making it suitable for large datasets
         that may not fit into local memory.
         """
-        if dtype is not None:
-            raise NotImplementedError(
-                f"BigQuery engine does not support the `dtype` argument."
-                f"{constants.FEEDBACK_LINK}"
-            )
+        if dtype is not None and not utils.is_dict_like(dtype):
+            raise ValueError("dtype should be a dict-like object.")

         if names is not None:
             if len(names) != len(set(names)):
@@ -1167,10 +1164,16 @@ def _read_csv_w_bigquery_engine(
             job_config.skip_leading_rows = header + 1

         table_id = self._loader.load_file(filepath_or_buffer, job_config=job_config)
-        return self._loader.read_gbq_table(
+        df = self._loader.read_gbq_table(
             table_id, index_col=index_col, columns=columns, names=names
         )
+
+        if dtype is not None:
+            for column, dtype in dtype.items():
+                if column in df.columns:
+                    df[column] = df[column].astype(dtype)
+        return df

     def read_pickle(
         self,
         filepath_or_buffer: FilePath | ReadPickleBuffer,
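Taken together, the two hunks above validate `dtype` up front and apply the casts only after the load job finishes. A standalone sketch of that post-load casting step, using plain pandas in place of a BigFrames DataFrame (`apply_dtype` is an illustrative helper, not part of the library):

import pandas as pd
from pandas.api.types import is_dict_like  # stand-in for utils.is_dict_like

def apply_dtype(df: pd.DataFrame, dtype) -> pd.DataFrame:
    # Mirror the engine's up-front validation: only dict-like specs pass.
    if dtype is not None and not is_dict_like(dtype):
        raise ValueError("dtype should be a dict-like object.")
    if dtype is not None:
        # Cast only columns that exist; unknown keys are silently skipped.
        for column, target in dtype.items():
            if column in df.columns:
                df[column] = df[column].astype(target)
    return df

frame = pd.DataFrame({"bool_col": [1, 0], "other": ["x", "y"]})
frame = apply_dtype(frame, {"bool_col": pd.BooleanDtype(), "missing": "Float64"})
print(frame.dtypes)  # bool_col becomes nullable boolean; "missing" is skipped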
7 changes: 4 additions & 3 deletions bigframes/session/loader.py
@@ -663,9 +663,10 @@ def read_gbq_table(
             renamed_cols: Dict[str, str] = {
                 col: new_name for col, new_name in zip(array_value.column_ids, names)
             }
-            index_names = [
-                renamed_cols.get(index_col, index_col) for index_col in index_cols
-            ]
+            if index_col != bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64:
+                index_names = [
+                    renamed_cols.get(index_col, index_col) for index_col in index_cols
+                ]
             value_columns = [renamed_cols.get(col, col) for col in value_columns]

         block = blocks.Block(
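The loader change guards the index-renaming step: when `index_col` is the `DefaultIndexKind.SEQUENTIAL_INT64` enum rather than a column name, there are no user-visible index columns to rename, so `index_names` is left untouched. A hedged sketch of the guard, assuming `index_cols` is empty in the sequential-index case:

import bigframes.enums

renamed_cols = {"col_0": "a", "col_1": "b"}  # illustrative rename map
index_cols: list[str] = []                   # assumed empty for a default index
index_col = bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64

if index_col != bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64:
    index_names = [renamed_cols.get(c, c) for c in index_cols]
# else: the synthetic sequential index keeps its default naming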
39 changes: 39 additions & 0 deletions tests/system/small/test_session.py
@@ -1369,6 +1369,45 @@ def test_read_csv_for_names_and_index_col(
     )


+def test_read_csv_for_dtype(session, df_and_gcs_csv_for_two_columns):
+    _, path = df_and_gcs_csv_for_two_columns
+
+    dtype = {"bool_col": pd.BooleanDtype(), "int64_col": pd.Float64Dtype()}
+    bf_df = session.read_csv(path, engine="bigquery", dtype=dtype)
+
+    # Convert default pandas dtypes to match BigQuery DataFrames dtypes.
+    pd_df = session.read_csv(path, dtype=dtype)
+
+    assert bf_df.shape == pd_df.shape
+    assert bf_df.columns.tolist() == pd_df.columns.tolist()
+
+    # BigFrames requires `sort_index()` because BigQuery doesn't preserve row IDs
+    # (b/280889935) or guarantee row ordering.
+    bf_df = bf_df.set_index("rowindex").sort_index()
+    pd_df = pd_df.set_index("rowindex")
+    pd.testing.assert_frame_equal(bf_df.to_pandas(), pd_df.to_pandas())
+
+
+def test_read_csv_for_dtype_w_names(session, df_and_gcs_csv_for_two_columns):
+    _, path = df_and_gcs_csv_for_two_columns
+
+    names = ["a", "b", "c"]
+    dtype = {"b": pd.BooleanDtype(), "c": pd.Float64Dtype()}
+    bf_df = session.read_csv(path, engine="bigquery", names=names, dtype=dtype)
+
+    # Convert default pandas dtypes to match BigQuery DataFrames dtypes.
+    pd_df = session.read_csv(path, names=names, dtype=dtype)
+
+    assert bf_df.shape == pd_df.shape
+    assert bf_df.columns.tolist() == pd_df.columns.tolist()
+
+    # BigFrames requires `sort_index()` because BigQuery doesn't preserve row IDs
+    # (b/280889935) or guarantee row ordering.
+    bf_df = bf_df.set_index("a").sort_index()
+    pd_df = pd_df.set_index("a")
+    pd.testing.assert_frame_equal(bf_df.to_pandas(), pd_df.to_pandas())
+
+
 @pytest.mark.parametrize(
     ("kwargs", "match"),
     [
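Both system tests align rows before comparing, since BigQuery neither preserves row IDs nor guarantees ordering. The same comparison pattern in miniature, with made-up frames:

import pandas as pd

bf_result = pd.DataFrame({"rowindex": [1, 0], "x": [10.0, 9.0]})  # arrives shuffled
pd_result = pd.DataFrame({"rowindex": [0, 1], "x": [9.0, 10.0]})

# Pin both frames to the same key and sort before asserting equality.
bf_result = bf_result.set_index("rowindex").sort_index()
pd_result = pd_result.set_index("rowindex")
pd.testing.assert_frame_equal(bf_result, pd_result)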
16 changes: 11 additions & 5 deletions tests/unit/session/test_session.py
@@ -108,11 +108,6 @@
 @pytest.mark.parametrize(
     ("kwargs", "match"),
     [
-        pytest.param(
-            {"engine": "bigquery", "dtype": {}},
-            "BigQuery engine does not support the `dtype` argument",
-            id="with_dtype",
-        ),
         pytest.param(
             {"engine": "bigquery", "usecols": [1, 2]},
             "BigQuery engine only supports an iterable of strings for `usecols`.",
@@ -215,6 +210,17 @@ def test_read_csv_w_bigquery_engine_raises_error_for_invalid_names(
         session.read_csv("path/to/csv.csv", engine="bigquery", names=names)


+def test_read_csv_w_bigquery_engine_raises_error_for_invalid_dtypes():
+    session = mocks.create_bigquery_session()
+
+    with pytest.raises(ValueError, match="dtype should be a dict-like object."):
+        session.read_csv(
+            "path/to/csv.csv",
+            engine="bigquery",
+            dtype=["a", "b", "c"],  # type: ignore[arg-type]
+        )
+
+
 @pytest.mark.parametrize("missing_parts_table_id", [(""), ("table")])
 def test_read_gbq_missing_parts(missing_parts_table_id):
     session = mocks.create_bigquery_session()
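The unit test exercises the new fail-fast path: a list is not dict-like, so validation raises before any load job is attempted. A quick illustration of what "dict-like" admits (assuming the engine's `utils.is_dict_like` behaves like pandas' `is_dict_like`):

import pandas as pd
from pandas.api.types import is_dict_like

print(is_dict_like({"a": "Int64"}))             # True: plain dict
print(is_dict_like(pd.Series({"a": "Int64"})))  # True: Series exposes keys()
print(is_dict_like(["a", "b", "c"]))            # False: the engine raises ValueError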