From 240882a9610477e15bea341de3ad04aa060baf77 Mon Sep 17 00:00:00 2001
From: Chelsea Lin
Date: Fri, 15 Mar 2024 22:07:25 +0000
Subject: [PATCH] fix: series.(to_csv|to_json) leverages bq export

---
 bigframes/dataframe.py                        | 12 ++---
 bigframes/series.py                           | 20 ++++---
 tests/system/small/test_series.py             | 28 +++++++---
 .../bigframes_vendored/pandas/core/generic.py |  4 +-
 .../bigframes_vendored/pandas/core/series.py  | 53 -------------------
 5 files changed, 40 insertions(+), 77 deletions(-)

diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 5dae7a82f9..e8328b6047 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -2588,16 +2588,16 @@ def to_json(
         if "*" not in path_or_buf:
             raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD)

-        if lines is True and orient != "records":
-            raise ValueError(
-                "'lines' keyword is only valid when 'orient' is 'records'."
-            )
-
         # TODO(ashleyxu) Support lines=False for small tables with arrays and TO_JSON_STRING.
         # See: https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions#to_json_string
         if lines is False:
             raise NotImplementedError(
-                f"Only newline delimited JSON format is supported. {constants.FEEDBACK_LINK}"
+                f"Only newline-delimited JSON is supported. Add `lines=True` to your function call. {constants.FEEDBACK_LINK}"
             )

+        if lines is True and orient != "records":
+            raise ValueError(
+                "'lines' keyword is only valid when 'orient' is 'records'."
+            )
+
         result_table = self._run_io_query(
diff --git a/bigframes/series.py b/bigframes/series.py
index ef2feb4f92..86afdd047c 100644
--- a/bigframes/series.py
+++ b/bigframes/series.py
@@ -1390,9 +1390,10 @@ def to_frame(self, name: blocks.Label = None) -> bigframes.dataframe.DataFrame:
         )
         return bigframes.dataframe.DataFrame(block)

-    def to_csv(self, path_or_buf=None, **kwargs) -> typing.Optional[str]:
-        # TODO(b/280651142): Implement version that leverages bq export native csv support to bypass local pandas step.
-        return self.to_pandas().to_csv(path_or_buf, **kwargs)
+    def to_csv(
+        self, path_or_buf: str, sep=",", *, header: bool = True, index: bool = True
+    ) -> None:
+        return self.to_frame().to_csv(path_or_buf, sep=sep, header=header, index=index)

     def to_dict(self, into: type[dict] = dict) -> typing.Mapping:
         return typing.cast(dict, self.to_pandas().to_dict(into))  # type: ignore
@@ -1402,14 +1403,17 @@ def to_excel(self, excel_writer, sheet_name="Sheet1", **kwargs) -> None:

     def to_json(
         self,
-        path_or_buf=None,
+        path_or_buf: str,
         orient: typing.Literal[
             "split", "records", "index", "columns", "values", "table"
         ] = "columns",
-        **kwargs,
-    ) -> typing.Optional[str]:
-        # TODO(b/280651142): Implement version that leverages bq export native csv support to bypass local pandas step.
-        return self.to_pandas().to_json(path_or_buf, **kwargs)
+        *,
+        lines: bool = False,
+        index: bool = True,
+    ) -> None:
+        return self.to_frame().to_json(
+            path_or_buf=path_or_buf, orient=orient, lines=lines, index=index
+        )

     def to_latex(
         self, buf=None, columns=None, header=True, index=True, **kwargs
diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py
index e22037a1ce..584dc21956 100644
--- a/tests/system/small/test_series.py
+++ b/tests/system/small/test_series.py
@@ -2384,18 +2384,30 @@ def test_to_frame(scalars_dfs):
     assert_pandas_df_equal(bf_result, pd_result)


-def test_to_json(scalars_df_index, scalars_pandas_df_index):
-    bf_result = scalars_df_index["int64_col"].to_json()
-    pd_result = scalars_pandas_df_index["int64_col"].to_json()
+def test_to_json(gcs_folder, scalars_df_index, scalars_pandas_df_index):
+    path = gcs_folder + "test_series_to_json*.jsonl"
+    scalars_df_index["int64_col"].to_json(path, lines=True, orient="records")
+    # BigQuery replaces the "*" wildcard with zero-padded shard numbers;
+    # the first (and here only) exported file is 000000000000.
+    gcs_df = pd.read_json(path.replace("*", "000000000000"), lines=True)

-    assert bf_result == pd_result
+    pd.testing.assert_series_equal(
+        gcs_df["int64_col"].astype(pd.Int64Dtype()),
+        scalars_pandas_df_index["int64_col"],
+        check_dtype=False,
+        check_index=False,
+    )


-def test_to_csv(scalars_df_index, scalars_pandas_df_index):
-    bf_result = scalars_df_index["int64_col"].to_csv()
-    pd_result = scalars_pandas_df_index["int64_col"].to_csv()
+def test_to_csv(gcs_folder, scalars_df_index, scalars_pandas_df_index):
+    path = gcs_folder + "test_series_to_csv*.csv"
+    scalars_df_index["int64_col"].to_csv(path)
+    gcs_df = pd.read_csv(path.replace("*", "000000000000"))

-    assert bf_result == pd_result
+    pd.testing.assert_series_equal(
+        gcs_df["int64_col"].astype(pd.Int64Dtype()),
+        scalars_pandas_df_index["int64_col"],
+        check_dtype=False,
+        check_index=False,
+    )


 def test_to_latex(scalars_df_index, scalars_pandas_df_index):
diff --git a/third_party/bigframes_vendored/pandas/core/generic.py b/third_party/bigframes_vendored/pandas/core/generic.py
index 7f8e1f7b53..9358dca17b 100644
--- a/third_party/bigframes_vendored/pandas/core/generic.py
+++ b/third_party/bigframes_vendored/pandas/core/generic.py
@@ -183,7 +183,7 @@ def to_json(
         *,
         index: bool = True,
         lines: bool = False,
-    ) -> str | None:
+    ) -> None:
         """Convert the object to a JSON string, written to Cloud Storage.

         Note NaN's and None will be converted to null and datetime objects
@@ -241,7 +241,7 @@ def to_json(
         """
         raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

-    def to_csv(self, path_or_buf: str, *, index: bool = True) -> str | None:
+    def to_csv(self, path_or_buf: str, *, index: bool = True) -> None:
         """Write object to a comma-separated values (csv) file on Cloud Storage.

         Args:
diff --git a/third_party/bigframes_vendored/pandas/core/series.py b/third_party/bigframes_vendored/pandas/core/series.py
index beaf8aedb1..2d32ca9929 100644
--- a/third_party/bigframes_vendored/pandas/core/series.py
+++ b/third_party/bigframes_vendored/pandas/core/series.py
@@ -535,59 +535,6 @@ def to_xarray(self):
         """
         raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

-    def to_json(
-        self,
-        path_or_buf=None,
-        orient: Literal[
-            "split", "records", "index", "columns", "values", "table"
-        ] = "columns",
-        **kwarg,
-    ) -> str | None:
-        """
-        Convert the object to a JSON string.
-
-        Note NaN's and None will be converted to null and datetime objects
-        will be converted to UNIX timestamps.
-
-        Args:
-            path_or_buf (str, path object, file-like object, or None, default None):
-                String, path object (implementing os.PathLike[str]), or file-like
-                object implementing a write() function. If None, the result is
-                returned as a string.
-            orient ({"split", "records", "index", "columns", "values", "table"}, default "columns"):
-                Indication of expected JSON string format.
-                'split' : dict like {{'index' -> [index], 'columns' -> [columns],'data' -> [values]}}
-                'records' : list like [{{column -> value}}, ... , {{column -> value}}]
-                'index' : dict like {{index -> {{column -> value}}}}
-                'columns' : dict like {{column -> {{index -> value}}}}
-                'values' : just the values array
-                'table' : dict like {{'schema': {{schema}}, 'data': {{data}}}}
-                    Describing the data, where data component is like ``orient='records'``.
-
-        Returns:
-            None or str: If path_or_buf is None, returns the resulting json format as a
-            string. Otherwise returns None.
-        """
-        raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
-
-    def to_csv(self, path_or_buf: str, *, index: bool = True) -> str | None:
-        """
-        Write object to a comma-separated values (csv) file.
-
-        Args:
-            path_or_buf (str, path object, file-like object, or None, default None):
-                String, path object (implementing os.PathLike[str]), or file-like
-                object implementing a write() function. If None, the result is
-                returned as a string. If a non-binary file object is passed, it should
-                be opened with `newline=''`, disabling universal newlines. If a binary
-                file object is passed, `mode` might need to contain a `'b'`.
-
-        Returns:
-            None or str: If path_or_buf is None, returns the resulting csv format
-            as a string. Otherwise returns None.
-        """
-        raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
-
     def agg(self, func):
         """
         Aggregate using one or more operations over the specified axis.
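
A minimal usage sketch of the behavior this patch introduces: Series.to_csv and
Series.to_json now wrap the Series in a one-column DataFrame via to_frame() and
reuse the DataFrame exporters, which run a BigQuery export job straight to Cloud
Storage instead of round-tripping through local pandas. The destination must be
a gs:// URI containing a "*" wildcard, and JSON output is newline-delimited
records only. The public table, column, and bucket below are illustrative
placeholders, not part of the patch:

    import bigframes.pandas as bpd

    # Any readable table works; the penguins public dataset is just an example.
    s = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")["body_mass_g"]

    # The export runs as a BigQuery extract job, so the path needs a "*"
    # wildcard: large results may be sharded across multiple objects.
    s.to_csv("gs://my-bucket/penguins/body_mass_*.csv")

    # Only newline-delimited JSON is supported, so lines=True must be paired
    # with orient="records", mirroring the checks in DataFrame.to_json.
    s.to_json("gs://my-bucket/penguins/body_mass_*.jsonl", orient="records", lines=True)

Routing the Series exporters through DataFrame keeps the wildcard and
lines/orient validation in one place instead of duplicating it in series.py.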