From 0a58e982ad4fca221d01b58c17ef3d5e371b7017 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Fri, 31 Jan 2025 00:32:47 +0000 Subject: [PATCH 1/7] chore: support timestamp subtractions --- bigframes/core/compile/scalar_op_compiler.py | 5 ++ bigframes/operations/__init__.py | 2 + bigframes/operations/datetime_ops.py | 19 +++++++ bigframes/series.py | 16 +++++- .../system/small/operations/test_datetimes.py | 55 +++++++++++++++++++ 5 files changed, 95 insertions(+), 2 deletions(-) diff --git a/bigframes/core/compile/scalar_op_compiler.py b/bigframes/core/compile/scalar_op_compiler.py index b42f983619..bdddd68d8c 100644 --- a/bigframes/core/compile/scalar_op_compiler.py +++ b/bigframes/core/compile/scalar_op_compiler.py @@ -737,6 +737,11 @@ def unix_millis_op_impl(x: ibis_types.TimestampValue): return unix_millis(x) +@scalar_op_compiler.register_binary_op(ops.timestamp_diff_op) +def timestamp_diff_op_impl(x: ibis_types.TimestampValue, y: ibis_types.TimestampValue): + return x.delta(y, "microsecond") + + @scalar_op_compiler.register_unary_op(ops.FloorDtOp, pass_op=True) def floor_dt_op_impl(x: ibis_types.Value, op: ops.FloorDtOp): supported_freqs = ["Y", "Q", "M", "W", "D", "h", "min", "s", "ms", "us", "ns"] diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py index d8b0447686..ce1ca407ef 100644 --- a/bigframes/operations/__init__.py +++ b/bigframes/operations/__init__.py @@ -49,6 +49,7 @@ date_op, StrftimeOp, time_op, + timestamp_diff_op, ToDatetimeOp, ToTimestampOp, UnixMicros, @@ -246,6 +247,7 @@ # Datetime ops "date_op", "time_op", + "timestamp_diff_op", "ToDatetimeOp", "ToTimestampOp", "StrftimeOp", diff --git a/bigframes/operations/datetime_ops.py b/bigframes/operations/datetime_ops.py index 5086de27d3..aec78d1f7e 100644 --- a/bigframes/operations/datetime_ops.py +++ b/bigframes/operations/datetime_ops.py @@ -107,3 +107,22 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT if input_types[0] is not dtypes.TIMESTAMP_DTYPE: raise TypeError("expected timestamp input") return dtypes.INT_DTYPE + + +@dataclasses.dataclass(frozen=True) +class TimestampDiff(base_ops.BinaryOp): + name: typing.ClassVar[str] = "timestamp_diff" + + def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType: + if input_types[0] is not input_types[1]: + raise TypeError( + f"two inputs have different types. left: {input_types[0]}, right: {input_types[1]}" + ) + + if not dtypes.is_datetime_like(input_types[0]): + raise TypeError("expected timestamp input") + + return dtypes.TIMEDETLA_DTYPE + + +timestamp_diff_op = TimestampDiff() diff --git a/bigframes/series.py b/bigframes/series.py index 706c0f4f09..11d5737013 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -33,6 +33,7 @@ import pandas.core.dtypes.common import pyarrow as pa import typing_extensions +import datetime import bigframes.core from bigframes.core import log_adapter @@ -805,10 +806,14 @@ def __rsub__(self, other: float | int | Series) -> Series: __rsub__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__rsub__) - def sub(self, other: float | int | Series) -> Series: + def sub(self, other: float | int | pandas.Timestamp | datetime.datetime | Series) -> Series: + if bigframes.dtypes.is_datetime_like(self.dtype) and _has_timestamp_type(other): + return self._apply_binary_op(other, ops.timestamp_diff_op) return self._apply_binary_op(other, ops.sub_op) - def rsub(self, other: float | int | Series) -> Series: + def rsub(self, other: float | int | pandas.Timestamp | datetime.datetime | Series) -> Series: + if bigframes.dtypes.is_datetime_like(self.dtype) and _has_timestamp_type(other): + return self._apply_binary_op(other, ops.timestamp_diff_op, reverse=True) return self._apply_binary_op(other, ops.sub_op, reverse=True) subtract = sub @@ -2080,3 +2085,10 @@ def str(self) -> strings.StringMethods: def _is_list_like(obj: typing.Any) -> typing_extensions.TypeGuard[typing.Sequence]: return pandas.api.types.is_list_like(obj) + + +def _has_timestamp_type(input: typing.Any) -> bool: + if isinstance(input, Series): + return bigframes.dtypes.is_datetime_like(input.dtype) + + return isinstance(input, (pandas.Timestamp, datetime.datetime)) diff --git a/tests/system/small/operations/test_datetimes.py b/tests/system/small/operations/test_datetimes.py index c5c649c638..1195ea3ceb 100644 --- a/tests/system/small/operations/test_datetimes.py +++ b/tests/system/small/operations/test_datetimes.py @@ -367,3 +367,58 @@ def test_dt_clip_coerce_str_timestamp(scalars_dfs): pd_result, bf_result, ) + + +@pytest.mark.parametrize("column", ["timestamp_col", "datetime_col"]) +def test_timestamp_diff_two_series(scalars_dfs, column): + bf_df, pd_df = scalars_dfs + bf_series = bf_df[column] + pd_series = pd_df[column] + + actual_result = (bf_series - bf_series).to_pandas() + + expected_result = pd_series - pd_series + assert_series_equal(actual_result, expected_result) + + +def test_timestamp_diff_two_series_with_different_types_raise_error(scalars_dfs): + df, _ = scalars_dfs + + with pytest.raises(TypeError): + (df["timestamp_col"] - df["datetime_col"]).to_pandas() + + +@pytest.mark.parametrize( + ("column", "value"), + [ + ("timestamp_col", pd.Timestamp("2025-01-01 00:00:01", tz="America/New_York")), + ("datetime_col", datetime.datetime(2025, 1, 1, 0, 0, 1)), + ], +) +def test_timestamp_diff_series_sub_literal(scalars_dfs, column, value): + bf_df, pd_df = scalars_dfs + bf_series = bf_df[column] + pd_series = pd_df[column] + + actual_result = (bf_series - value).to_pandas() + + expected_result = pd_series - value + assert_series_equal(actual_result, expected_result) + + +@pytest.mark.parametrize( + ("column", "value"), + [ + ("timestamp_col", pd.Timestamp("2025-01-01 00:00:01", tz="America/New_York")), + ("datetime_col", datetime.datetime(2025, 1, 1, 0, 0, 1)), + ], +) +def test_timestamp_diff_literal_sub_series(scalars_dfs, column, value): + bf_df, pd_df = scalars_dfs + bf_series = bf_df[column] + pd_series = pd_df[column] + + actual_result = (value - bf_series).to_pandas() + + expected_result = value - pd_series + assert_series_equal(actual_result, expected_result) From 2a32701c1b2fa8f770cbf7a2af667affa4982e5b Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Fri, 31 Jan 2025 00:36:48 +0000 Subject: [PATCH 2/7] Fix format --- bigframes/series.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/bigframes/series.py b/bigframes/series.py index 11d5737013..07c8b6ee46 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -33,7 +33,6 @@ import pandas.core.dtypes.common import pyarrow as pa import typing_extensions -import datetime import bigframes.core from bigframes.core import log_adapter @@ -806,12 +805,16 @@ def __rsub__(self, other: float | int | Series) -> Series: __rsub__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__rsub__) - def sub(self, other: float | int | pandas.Timestamp | datetime.datetime | Series) -> Series: + def sub( + self, other: float | int | pandas.Timestamp | datetime.datetime | Series + ) -> Series: if bigframes.dtypes.is_datetime_like(self.dtype) and _has_timestamp_type(other): return self._apply_binary_op(other, ops.timestamp_diff_op) return self._apply_binary_op(other, ops.sub_op) - def rsub(self, other: float | int | pandas.Timestamp | datetime.datetime | Series) -> Series: + def rsub( + self, other: float | int | pandas.Timestamp | datetime.datetime | Series + ) -> Series: if bigframes.dtypes.is_datetime_like(self.dtype) and _has_timestamp_type(other): return self._apply_binary_op(other, ops.timestamp_diff_op, reverse=True) return self._apply_binary_op(other, ops.sub_op, reverse=True) From 1e87e0cd4e54f22133504b32dd2e9f4d250d678a Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Tue, 4 Feb 2025 19:46:17 +0000 Subject: [PATCH 3/7] use tree rewrites to dispatch timestamp_diff operator --- bigframes/core/compile/compiler.py | 4 +- bigframes/core/rewrite/__init__.py | 2 + bigframes/core/rewrite/operators.py | 77 +++++++++++++++++++ bigframes/operations/__init__.py | 2 + bigframes/operations/numeric_ops.py | 5 +- bigframes/series.py | 4 - .../system/small/operations/test_datetimes.py | 26 +++++++ 7 files changed, 114 insertions(+), 6 deletions(-) create mode 100644 bigframes/core/rewrite/operators.py diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 0d047b366e..9e9cca98f9 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -57,6 +57,7 @@ def compile_sql( # TODO: get rid of output_ids arg assert len(output_ids) == len(list(node.fields)) node = set_output_names(node, output_ids) + node = nodes.bottom_up(node, rewrites.op_dynamic_dispatch) if ordered: node, limit = rewrites.pullup_limit_from_slice(node) node = nodes.bottom_up(node, rewrites.rewrite_slice) @@ -108,7 +109,8 @@ def _preprocess(self, node: nodes.BigFrameNode): @functools.lru_cache(maxsize=5000) def compile_node(self, node: nodes.BigFrameNode) -> compiled.UnorderedIR: """Compile node into CompileArrayValue. Caches result.""" - return self._compile_node(node) + # Need to dispatch op before compilation to keep it consistent with the compile_sql() call + return self._compile_node(nodes.bottom_up(node, rewrites.op_dynamic_dispatch)) @functools.singledispatchmethod def _compile_node(self, node: nodes.BigFrameNode) -> compiled.UnorderedIR: diff --git a/bigframes/core/rewrite/__init__.py b/bigframes/core/rewrite/__init__.py index 9044cb25f9..4f144042dc 100644 --- a/bigframes/core/rewrite/__init__.py +++ b/bigframes/core/rewrite/__init__.py @@ -15,6 +15,7 @@ from bigframes.core.rewrite.identifiers import remap_variables from bigframes.core.rewrite.implicit_align import try_row_join from bigframes.core.rewrite.legacy_align import legacy_join_as_projection +from bigframes.core.rewrite.operators import op_dynamic_dispatch from bigframes.core.rewrite.order import pull_up_order from bigframes.core.rewrite.slices import pullup_limit_from_slice, rewrite_slice @@ -22,6 +23,7 @@ "legacy_join_as_projection", "try_row_join", "rewrite_slice", + "op_dynamic_dispatch", "pullup_limit_from_slice", "remap_variables", "pull_up_order", diff --git a/bigframes/core/rewrite/operators.py b/bigframes/core/rewrite/operators.py new file mode 100644 index 0000000000..7e03254b55 --- /dev/null +++ b/bigframes/core/rewrite/operators.py @@ -0,0 +1,77 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import dataclasses +import functools +import typing + +from bigframes import dtypes +from bigframes import operations as ops +from bigframes.core import expression as ex +from bigframes.core import nodes, schema + + +@dataclasses.dataclass +class _TypedExpr: + expr: ex.Expression + dtype: dtypes.Dtype + + +def op_dynamic_dispatch(root: nodes.BigFrameNode) -> nodes.BigFrameNode: + if isinstance(root, nodes.ProjectionNode): + updated_assignments = tuple( + (_rewrite_expressions(expr, root.schema).expr, column_id) + for expr, column_id in root.assignments + ) + root = nodes.ProjectionNode(root.child, updated_assignments) + + return root + + +@functools.cache +def _rewrite_expressions(expr: ex.Expression, schema: schema.ArraySchema) -> _TypedExpr: + if isinstance(expr, ex.DerefOp): + return _TypedExpr(expr, schema.get_type(expr.id.sql)) + + if isinstance(expr, ex.ScalarConstantExpression): + return _TypedExpr(expr, expr.dtype) + + if isinstance(expr, ex.OpExpression): + updated_inputs = tuple( + map(lambda x: _rewrite_expressions(x, schema), expr.inputs) + ) + return _rewrite_op_expr(expr, updated_inputs) + + raise AssertionError(f"Unexpected expression type: {type(expr)}") + + +def _rewrite_op_expr( + expr: ex.OpExpression, inputs: typing.Tuple[_TypedExpr, ...] +) -> _TypedExpr: + if isinstance(expr.op, ops.SubOp): + return _rewrite_sub_op(inputs[0], inputs[1]) + + input_types = tuple(map(lambda x: x.dtype, inputs)) + return _TypedExpr(expr, expr.op.output_type(*input_types)) + + +def _rewrite_sub_op(left: _TypedExpr, right: _TypedExpr) -> _TypedExpr: + result_op: ops.BinaryOp = ops.sub_op + if dtypes.is_datetime_like(left.dtype) and dtypes.is_datetime_like(right.dtype): + result_op = ops.timestamp_diff_op + + return _TypedExpr( + result_op.as_expr(left.expr, right.expr), + result_op.output_type(left.dtype, right.dtype), + ) diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py index ce1ca407ef..ad05143757 100644 --- a/bigframes/operations/__init__.py +++ b/bigframes/operations/__init__.py @@ -126,6 +126,7 @@ sinh_op, sqrt_op, sub_op, + SubOp, tan_op, tanh_op, unsafe_pow_op, @@ -285,6 +286,7 @@ "sinh_op", "sqrt_op", "sub_op", + "SubOp", "tan_op", "tanh_op", "unsafe_pow_op", diff --git a/bigframes/operations/numeric_ops.py b/bigframes/operations/numeric_ops.py index 939330954d..7bc5276d1f 100644 --- a/bigframes/operations/numeric_ops.py +++ b/bigframes/operations/numeric_ops.py @@ -141,7 +141,10 @@ def output_type(self, *input_types): ): # Numeric subtraction return dtypes.coerce_to_common(left_type, right_type) - # TODO: Add temporal addition once delta types supported + + if dtypes.is_datetime_like(left_type) and dtypes.is_datetime_like(right_type): + return dtypes.TIMEDETLA_DTYPE + raise TypeError(f"Cannot subtract dtypes {left_type} and {right_type}") diff --git a/bigframes/series.py b/bigframes/series.py index 07c8b6ee46..9fa09896be 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -808,15 +808,11 @@ def __rsub__(self, other: float | int | Series) -> Series: def sub( self, other: float | int | pandas.Timestamp | datetime.datetime | Series ) -> Series: - if bigframes.dtypes.is_datetime_like(self.dtype) and _has_timestamp_type(other): - return self._apply_binary_op(other, ops.timestamp_diff_op) return self._apply_binary_op(other, ops.sub_op) def rsub( self, other: float | int | pandas.Timestamp | datetime.datetime | Series ) -> Series: - if bigframes.dtypes.is_datetime_like(self.dtype) and _has_timestamp_type(other): - return self._apply_binary_op(other, ops.timestamp_diff_op, reverse=True) return self._apply_binary_op(other, ops.sub_op, reverse=True) subtract = sub diff --git a/tests/system/small/operations/test_datetimes.py b/tests/system/small/operations/test_datetimes.py index 1195ea3ceb..936becff76 100644 --- a/tests/system/small/operations/test_datetimes.py +++ b/tests/system/small/operations/test_datetimes.py @@ -14,6 +14,8 @@ import datetime +import numpy +from pandas import testing import pandas as pd import pytest @@ -381,6 +383,30 @@ def test_timestamp_diff_two_series(scalars_dfs, column): assert_series_equal(actual_result, expected_result) +@pytest.mark.parametrize("column", ["timestamp_col", "datetime_col"]) +def test_timestamp_diff_two_series_with_numpy_ops(scalars_dfs, column): + bf_df, pd_df = scalars_dfs + bf_series = bf_df[column] + pd_series = pd_df[column] + + actual_result = numpy.subtract(bf_series, bf_series).to_pandas() + + expected_result = numpy.subtract(pd_series, pd_series) + assert_series_equal(actual_result, expected_result) + + +def test_timestamp_diff_two_dataframes(scalars_dfs): + columns = ["timestamp_col", "datetime_col"] + bf_df, pd_df = scalars_dfs + bf_df = bf_df[columns] + pd_df = pd_df[columns] + + actual_result = (bf_df - bf_df).to_pandas() + + expected_result = pd_df - pd_df + testing.assert_frame_equal(actual_result, expected_result) + + def test_timestamp_diff_two_series_with_different_types_raise_error(scalars_dfs): df, _ = scalars_dfs From 9e3038c2bbf2c6137a7312778f78f07f23337735 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Tue, 4 Feb 2025 20:35:22 +0000 Subject: [PATCH 4/7] add TODO for more node updates --- bigframes/core/rewrite/operators.py | 1 + 1 file changed, 1 insertion(+) diff --git a/bigframes/core/rewrite/operators.py b/bigframes/core/rewrite/operators.py index 7e03254b55..f6869ef3a5 100644 --- a/bigframes/core/rewrite/operators.py +++ b/bigframes/core/rewrite/operators.py @@ -36,6 +36,7 @@ def op_dynamic_dispatch(root: nodes.BigFrameNode) -> nodes.BigFrameNode: ) root = nodes.ProjectionNode(root.child, updated_assignments) + # TODO(b/394354614): FilterByNode and OrderNode also contain expressions. Need to update them too. return root From 6ca29eee7d89f638ebb01d640a20dae8eecaf75b Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Wed, 5 Feb 2025 01:32:05 +0000 Subject: [PATCH 5/7] polish the code and fix typos --- bigframes/core/compile/compiler.py | 5 +++-- bigframes/core/compile/ibis_types.py | 2 +- bigframes/core/rewrite/__init__.py | 4 ++-- bigframes/core/rewrite/operators.py | 6 +++++- bigframes/dtypes.py | 2 +- bigframes/operations/datetime_ops.py | 2 +- bigframes/operations/numeric_ops.py | 2 +- bigframes/operations/timedelta_ops.py | 2 +- bigframes/series.py | 15 ++------------- bigframes/session/loader.py | 4 ++-- 10 files changed, 19 insertions(+), 25 deletions(-) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index d676de79a3..51895a2362 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -58,7 +58,7 @@ def compile_sql( # TODO: get rid of output_ids arg assert len(output_ids) == len(list(node.fields)) node = set_output_names(node, output_ids) - node = nodes.bottom_up(node, rewrites.op_dynamic_dispatch) + node = nodes.top_down(node, rewrites.rewrite_timedelta_ops) if ordered: node, limit = rewrites.pullup_limit_from_slice(node) node = nodes.bottom_up(node, rewrites.rewrite_slice) @@ -101,6 +101,7 @@ def compile_raw( def _preprocess(self, node: nodes.BigFrameNode): node = nodes.bottom_up(node, rewrites.rewrite_slice) + node = nodes.top_down(node, rewrites.rewrite_timedelta_ops) node, _ = rewrites.pull_up_order( node, order_root=False, ordered_joins=self.strict ) @@ -111,7 +112,7 @@ def _preprocess(self, node: nodes.BigFrameNode): def compile_node(self, node: nodes.BigFrameNode) -> compiled.UnorderedIR: """Compile node into CompileArrayValue. Caches result.""" # Need to dispatch op before compilation to keep it consistent with the compile_sql() call - return self._compile_node(nodes.bottom_up(node, rewrites.op_dynamic_dispatch)) + return self._compile_node(node) @functools.singledispatchmethod def _compile_node(self, node: nodes.BigFrameNode) -> compiled.UnorderedIR: diff --git a/bigframes/core/compile/ibis_types.py b/bigframes/core/compile/ibis_types.py index 8a55f6775d..78c2259cf0 100644 --- a/bigframes/core/compile/ibis_types.py +++ b/bigframes/core/compile/ibis_types.py @@ -79,7 +79,7 @@ BIGFRAMES_TO_IBIS: Dict[bigframes.dtypes.Dtype, ibis_dtypes.DataType] = { pandas: ibis for ibis, pandas in BIDIRECTIONAL_MAPPINGS } -BIGFRAMES_TO_IBIS.update({bigframes.dtypes.TIMEDETLA_DTYPE: ibis_dtypes.int64}) +BIGFRAMES_TO_IBIS.update({bigframes.dtypes.TIMEDELTA_DTYPE: ibis_dtypes.int64}) IBIS_TO_BIGFRAMES: Dict[ibis_dtypes.DataType, bigframes.dtypes.Dtype] = { ibis: pandas for ibis, pandas in BIDIRECTIONAL_MAPPINGS } diff --git a/bigframes/core/rewrite/__init__.py b/bigframes/core/rewrite/__init__.py index 4f144042dc..f93186bf36 100644 --- a/bigframes/core/rewrite/__init__.py +++ b/bigframes/core/rewrite/__init__.py @@ -15,7 +15,7 @@ from bigframes.core.rewrite.identifiers import remap_variables from bigframes.core.rewrite.implicit_align import try_row_join from bigframes.core.rewrite.legacy_align import legacy_join_as_projection -from bigframes.core.rewrite.operators import op_dynamic_dispatch +from bigframes.core.rewrite.operators import rewrite_timedelta_ops from bigframes.core.rewrite.order import pull_up_order from bigframes.core.rewrite.slices import pullup_limit_from_slice, rewrite_slice @@ -23,7 +23,7 @@ "legacy_join_as_projection", "try_row_join", "rewrite_slice", - "op_dynamic_dispatch", + "rewrite_timedelta_ops", "pullup_limit_from_slice", "remap_variables", "pull_up_order", diff --git a/bigframes/core/rewrite/operators.py b/bigframes/core/rewrite/operators.py index f6869ef3a5..3145a9e9ae 100644 --- a/bigframes/core/rewrite/operators.py +++ b/bigframes/core/rewrite/operators.py @@ -28,7 +28,11 @@ class _TypedExpr: dtype: dtypes.Dtype -def op_dynamic_dispatch(root: nodes.BigFrameNode) -> nodes.BigFrameNode: +def rewrite_timedelta_ops(root: nodes.BigFrameNode) -> nodes.BigFrameNode: + """ + Rewrites expressions to properly handle timedelta values, because this type does not exist + in the SQL world. + """ if isinstance(root, nodes.ProjectionNode): updated_assignments = tuple( (_rewrite_expressions(expr, root.schema).expr, column_id) diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index 8b1ca3b0c8..0ca815c00a 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ -56,7 +56,7 @@ TIME_DTYPE = pd.ArrowDtype(pa.time64("us")) DATETIME_DTYPE = pd.ArrowDtype(pa.timestamp("us")) TIMESTAMP_DTYPE = pd.ArrowDtype(pa.timestamp("us", tz="UTC")) -TIMEDETLA_DTYPE = pd.ArrowDtype(pa.duration("us")) +TIMEDELTA_DTYPE = pd.ArrowDtype(pa.duration("us")) NUMERIC_DTYPE = pd.ArrowDtype(pa.decimal128(38, 9)) BIGNUMERIC_DTYPE = pd.ArrowDtype(pa.decimal256(76, 38)) # No arrow equivalent diff --git a/bigframes/operations/datetime_ops.py b/bigframes/operations/datetime_ops.py index aec78d1f7e..3ea4c652f1 100644 --- a/bigframes/operations/datetime_ops.py +++ b/bigframes/operations/datetime_ops.py @@ -122,7 +122,7 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT if not dtypes.is_datetime_like(input_types[0]): raise TypeError("expected timestamp input") - return dtypes.TIMEDETLA_DTYPE + return dtypes.TIMEDELTA_DTYPE timestamp_diff_op = TimestampDiff() diff --git a/bigframes/operations/numeric_ops.py b/bigframes/operations/numeric_ops.py index 7bc5276d1f..413d8d66e1 100644 --- a/bigframes/operations/numeric_ops.py +++ b/bigframes/operations/numeric_ops.py @@ -143,7 +143,7 @@ def output_type(self, *input_types): return dtypes.coerce_to_common(left_type, right_type) if dtypes.is_datetime_like(left_type) and dtypes.is_datetime_like(right_type): - return dtypes.TIMEDETLA_DTYPE + return dtypes.TIMEDELTA_DTYPE raise TypeError(f"Cannot subtract dtypes {left_type} and {right_type}") diff --git a/bigframes/operations/timedelta_ops.py b/bigframes/operations/timedelta_ops.py index 0bcd6eb08f..e212381557 100644 --- a/bigframes/operations/timedelta_ops.py +++ b/bigframes/operations/timedelta_ops.py @@ -28,4 +28,4 @@ class ToTimedeltaOp(base_ops.UnaryOp): def output_type(self, *input_types): if input_types[0] is not dtypes.INT_DTYPE: raise TypeError("expected integer input") - return dtypes.TIMEDETLA_DTYPE + return dtypes.TIMEDELTA_DTYPE diff --git a/bigframes/series.py b/bigframes/series.py index 9fa09896be..af9fce6e20 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -805,14 +805,10 @@ def __rsub__(self, other: float | int | Series) -> Series: __rsub__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__rsub__) - def sub( - self, other: float | int | pandas.Timestamp | datetime.datetime | Series - ) -> Series: + def sub(self, other) -> Series: return self._apply_binary_op(other, ops.sub_op) - def rsub( - self, other: float | int | pandas.Timestamp | datetime.datetime | Series - ) -> Series: + def rsub(self, other) -> Series: return self._apply_binary_op(other, ops.sub_op, reverse=True) subtract = sub @@ -2084,10 +2080,3 @@ def str(self) -> strings.StringMethods: def _is_list_like(obj: typing.Any) -> typing_extensions.TypeGuard[typing.Sequence]: return pandas.api.types.is_list_like(obj) - - -def _has_timestamp_type(input: typing.Any) -> bool: - if isinstance(input, Series): - return bigframes.dtypes.is_datetime_like(input.dtype) - - return isinstance(input, (pandas.Timestamp, datetime.datetime)) diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index ba693696c3..b7550583e5 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -177,7 +177,7 @@ def read_pandas_load_job( destination_table = self._bqclient.get_table(load_table_destination) col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = { - col: bigframes.dtypes.TIMEDETLA_DTYPE + col: bigframes.dtypes.TIMEDELTA_DTYPE for col in df_and_labels.timedelta_cols } array_value = core.ArrayValue.from_table( @@ -236,7 +236,7 @@ def read_pandas_streaming( ) col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = { - col: bigframes.dtypes.TIMEDETLA_DTYPE + col: bigframes.dtypes.TIMEDELTA_DTYPE for col in df_and_labels.timedelta_cols } array_value = ( From 066cef35f1142e50862810ea7d35cd90bd39ed3b Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Wed, 5 Feb 2025 02:54:30 +0000 Subject: [PATCH 6/7] fix comment --- bigframes/core/compile/compiler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 51895a2362..09e5e3c4d7 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -101,6 +101,7 @@ def compile_raw( def _preprocess(self, node: nodes.BigFrameNode): node = nodes.bottom_up(node, rewrites.rewrite_slice) + # Need to rewrite ops before compilation to keep it consistent with the compile_sql() call node = nodes.top_down(node, rewrites.rewrite_timedelta_ops) node, _ = rewrites.pull_up_order( node, order_root=False, ordered_joins=self.strict @@ -111,7 +112,6 @@ def _preprocess(self, node: nodes.BigFrameNode): @functools.lru_cache(maxsize=5000) def compile_node(self, node: nodes.BigFrameNode) -> compiled.UnorderedIR: """Compile node into CompileArrayValue. Caches result.""" - # Need to dispatch op before compilation to keep it consistent with the compile_sql() call return self._compile_node(node) @functools.singledispatchmethod From ab1e1b0bb6d6a4024514503ea7a1a0f69a898125 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Wed, 5 Feb 2025 18:48:14 +0000 Subject: [PATCH 7/7] add rewrites to compile_raw and compile_peek_sql --- bigframes/core/compile/compiler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 09e5e3c4d7..dca204401e 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -82,6 +82,7 @@ def compile_sql( def compile_peek_sql(self, node: nodes.BigFrameNode, n_rows: int) -> str: ids = [id.sql for id in node.ids] node = nodes.bottom_up(node, rewrites.rewrite_slice) + node = nodes.top_down(node, rewrites.rewrite_timedelta_ops) node, _ = rewrites.pull_up_order( node, order_root=False, ordered_joins=self.strict ) @@ -94,6 +95,7 @@ def compile_raw( str, typing.Sequence[google.cloud.bigquery.SchemaField], bf_ordering.RowOrdering ]: node = nodes.bottom_up(node, rewrites.rewrite_slice) + node = nodes.top_down(node, rewrites.rewrite_timedelta_ops) node, ordering = rewrites.pull_up_order(node, ordered_joins=self.strict) ir = self.compile_node(node) sql = ir.to_sql() @@ -101,7 +103,6 @@ def compile_raw( def _preprocess(self, node: nodes.BigFrameNode): node = nodes.bottom_up(node, rewrites.rewrite_slice) - # Need to rewrite ops before compilation to keep it consistent with the compile_sql() call node = nodes.top_down(node, rewrites.rewrite_timedelta_ops) node, _ = rewrites.pull_up_order( node, order_root=False, ordered_joins=self.strict