diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index 010eb96f75..00fd5ccc67 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -155,7 +155,13 @@ def __init__(
         self._transpose_cache: Optional[Block] = transpose_cache
 
     @classmethod
-    def from_local(cls, data: pd.DataFrame, session: bigframes.Session) -> Block:
+    def from_local(
+        cls,
+        data: pd.DataFrame,
+        session: bigframes.Session,
+        *,
+        cache_transpose: bool = True,
+    ) -> Block:
         # Assumes caller has already converted datatypes to bigframes ones.
         pd_data = data
         column_labels = pd_data.columns
@@ -169,12 +175,21 @@ def from_local(cls, data: pd.DataFrame, session: bigframes.Session) -> Block:
         pd_data = pd_data.reset_index(names=index_ids)
         as_pyarrow = pa.Table.from_pandas(pd_data, preserve_index=False)
         array_value = core.ArrayValue.from_pyarrow(as_pyarrow, session=session)
-        return cls(
+        block = cls(
             array_value,
             column_labels=column_labels,
             index_columns=index_ids,
             index_labels=index_labels,
         )
+        if cache_transpose:
+            try:
+                # This cache will help when aligning on axis=1.
+                block = block.with_transpose_cache(
+                    cls.from_local(data.T, session, cache_transpose=False)
+                )
+            except Exception:
+                pass
+        return block
 
     @property
     def index(self) -> BlockIndexProperties:
@@ -724,12 +739,18 @@ def with_column_labels(
                 f"The column labels size `{len(label_list)} ` should equal to the value"
                 + f"columns size: {len(self.value_columns)}."
             )
-        return Block(
+        block = Block(
             self._expr,
             index_columns=self.index_columns,
             column_labels=label_list,
             index_labels=self.index.names,
         )
+        singleton_label = list(value)[0] if len(list(value)) == 1 else None
+        if singleton_label is not None and self._transpose_cache is not None:
+            new_cache, label_id = self._transpose_cache.create_constant(singleton_label)
+            new_cache = new_cache.set_index([label_id])
+            block = block.with_transpose_cache(new_cache)
+        return block
 
     def with_transpose_cache(self, transposed: Block):
         return Block(
@@ -1935,6 +1956,153 @@ def merge(
         expr = joined_expr.promote_offsets(offset_index_id)
         return Block(expr, index_columns=[offset_index_id], column_labels=labels)
 
+    def _align_both_axes(
+        self, other: Block, how: str
+    ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
+        # Join rows
+        aligned_block, (get_column_left, get_column_right) = self.join(other, how=how)
+        # Join the column schemas.
+        # Indexers will be None for an exact match.
+        if self.column_labels.equals(other.column_labels):
+            columns, lcol_indexer, rcol_indexer = self.column_labels, None, None
+        else:
+            columns, lcol_indexer, rcol_indexer = self.column_labels.join(
+                other.column_labels, how="outer", return_indexers=True
+            )
+        lcol_indexer = (
+            lcol_indexer if (lcol_indexer is not None) else range(len(columns))
+        )
+        rcol_indexer = (
+            rcol_indexer if (rcol_indexer is not None) else range(len(columns))
+        )
+
+        left_input_lookup = (
+            lambda index: ex.free_var(get_column_left[self.value_columns[index]])
+            if index != -1
+            else ex.const(None)
+        )
+        right_input_lookup = (
+            lambda index: ex.free_var(get_column_right[other.value_columns[index]])
+            if index != -1
+            else ex.const(None)
+        )
+
+        left_inputs = [left_input_lookup(i) for i in lcol_indexer]
+        right_inputs = [right_input_lookup(i) for i in rcol_indexer]
+        return aligned_block, columns, tuple(zip(left_inputs, right_inputs))
+
+    def _align_axis_0(
+        self, other: Block, how: str
+    ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
+        assert len(other.value_columns) == 1
+        aligned_block, (get_column_left, get_column_right) = self.join(other, how=how)
+
+        series_column_id = other.value_columns[0]
+        inputs = tuple(
+            (
+                ex.free_var(get_column_left[col]),
+                ex.free_var(get_column_right[series_column_id]),
+            )
+            for col in self.value_columns
+        )
+        return aligned_block, self.column_labels, inputs
+
+    def _align_series_block_axis_1(
+        self, other: Block, how: str
+    ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
+        assert len(other.value_columns) == 1
+        if other._transpose_cache is None:
+            raise ValueError(
+                "Wrong align method; this approach requires a transpose cache"
+            )
+
+        # Join rows
+        aligned_block, (get_column_left, get_column_right) = join_with_single_row(
+            self, other.transpose()
+        )
+        # Join the column schemas.
+        # Indexers will be None for an exact match.
+        if self.column_labels.equals(other.transpose().column_labels):
+            columns, lcol_indexer, rcol_indexer = self.column_labels, None, None
+        else:
+            columns, lcol_indexer, rcol_indexer = self.column_labels.join(
+                other.transpose().column_labels, how=how, return_indexers=True
+            )
+        lcol_indexer = (
+            lcol_indexer if (lcol_indexer is not None) else range(len(columns))
+        )
+        rcol_indexer = (
+            rcol_indexer if (rcol_indexer is not None) else range(len(columns))
+        )
+
+        left_input_lookup = (
+            lambda index: ex.free_var(get_column_left[self.value_columns[index]])
+            if index != -1
+            else ex.const(None)
+        )
+        right_input_lookup = (
+            lambda index: ex.free_var(
+                get_column_right[other.transpose().value_columns[index]]
+            )
+            if index != -1
+            else ex.const(None)
+        )
+
+        left_inputs = [left_input_lookup(i) for i in lcol_indexer]
+        right_inputs = [right_input_lookup(i) for i in rcol_indexer]
+        return aligned_block, columns, tuple(zip(left_inputs, right_inputs))
+
+    def _align_pd_series_axis_1(
+        self, other: pd.Series, how: str
+    ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
+        if self.column_labels.equals(other.index):
+            columns, lcol_indexer, rcol_indexer = self.column_labels, None, None
+        else:
+            if not (self.column_labels.is_unique and other.index.is_unique):
+                raise ValueError("Cannot align non-unique indices")
+            columns, lcol_indexer, rcol_indexer = self.column_labels.join(
+                other.index, how=how, return_indexers=True
+            )
+        lcol_indexer = (
+            lcol_indexer if (lcol_indexer is not None) else range(len(columns))
+        )
+        rcol_indexer = (
+            rcol_indexer if (rcol_indexer is not None) else range(len(columns))
+        )
+
+        left_input_lookup = (
+            lambda index: ex.free_var(self.value_columns[index])
+            if index != -1
+            else ex.const(None)
+        )
+        right_input_lookup = (
+            lambda index: ex.const(other.iloc[index]) if index != -1 else ex.const(None)
+        )
+
+        left_inputs = [left_input_lookup(i) for i in lcol_indexer]
+        right_inputs = [right_input_lookup(i) for i in rcol_indexer]
+        return self, columns, tuple(zip(left_inputs, right_inputs))
+
+    def _apply_binop(
+        self,
+        op: ops.BinaryOp,
+        inputs: Sequence[Tuple[ex.Expression, ex.Expression]],
+        labels: pd.Index,
+        reverse: bool = False,
+    ) -> Block:
+        block = self
+        binop_result_ids = []
+        for left_input, right_input in inputs:
+            expr = (
+                op.as_expr(right_input, left_input)
+                if reverse
+                else op.as_expr(left_input, right_input)
+            )
+            block, result_col_id = block.project_expr(expr)
+            binop_result_ids.append(result_col_id)
+
+        return block.select_columns(binop_result_ids).with_column_labels(labels)
+
     def join(
         self,
         other: Block,
@@ -2256,15 +2424,6 @@ def column_ids(self) -> Sequence[str]:
         """Column(s) to use as row labels."""
         return self._block._index_columns
 
-    def __repr__(self) -> str:
-        """Converts an Index to a string."""
-        # TODO(swast): Add a timeout here? If the query is taking a long time,
-        # maybe we just print the job metadata that we have so far?
-        # TODO(swast): Avoid downloading the whole index by using job
-        # metadata, like we do with DataFrame.
-        preview = self.to_pandas()
-        return repr(preview)
-
     def to_pandas(self) -> pd.Index:
         """Executes deferred operations and downloads the results."""
         if len(self.column_ids) == 0:
@@ -2359,6 +2518,61 @@ def join_indexless(
     )
 
 
+def join_with_single_row(
+    left: Block,
+    single_row_block: Block,
+) -> Tuple[Block, Tuple[Mapping[str, str], Mapping[str, str]],]:
+    """
+    Special join case where the right side is a single-row block.
+    This property is not validated; the caller is responsible for not passing a multi-row block.
+    Preserves the index of the left block, ignoring the label of the other.
+    """
+    left_expr = left.expr
+    # Ignore the right side's index columns by dropping them.
+    right_expr = single_row_block.expr.select_columns(single_row_block.value_columns)
+    left_mappings = [
+        join_defs.JoinColumnMapping(
+            source_table=join_defs.JoinSide.LEFT,
+            source_id=id,
+            destination_id=guid.generate_guid(),
+        )
+        for id in left_expr.column_ids
+    ]
+    right_mappings = [
+        join_defs.JoinColumnMapping(
+            source_table=join_defs.JoinSide.RIGHT,
+            source_id=id,
+            destination_id=guid.generate_guid(),
+        )
+        for id in right_expr.column_ids  # index columns already dropped
+    ]
+
+    join_def = join_defs.JoinDefinition(
+        conditions=(),
+        mappings=(*left_mappings, *right_mappings),
+        type="cross",
+    )
+    combined_expr = left_expr.join(
+        right_expr,
+        join_def=join_def,
+    )
+    get_column_left = join_def.get_left_mapping()
+    get_column_right = join_def.get_right_mapping()
+    # Drop the original indices from each side and use the combination generated by the join.
+    index_cols_post_join = [get_column_left[id] for id in left.index_columns]
+
+    block = Block(
+        combined_expr,
+        index_columns=index_cols_post_join,
+        column_labels=left.column_labels.append(single_row_block.column_labels),
+        index_labels=[left.index.name],
+    )
+    return (
+        block,
+        (get_column_left, get_column_right),
+    )
+
+
 def join_mono_indexed(
     left: Block,
     right: Block,
@@ -2546,7 +2760,7 @@ def coalesce_columns(
 ) -> Tuple[core.ArrayValue, Sequence[str]]:
     result_ids = []
     for left_id, right_id in zip(left_ids, right_ids):
-        if how == "left" or how == "inner":
+        if how == "left" or how == "inner" or how == "cross":
             result_ids.append(left_id)
             expr = expr.drop_columns([right_id])
         elif how == "right":
diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py
index 6e3a91cc1c..ee120635d3 100644
--- a/bigframes/core/groupby/__init__.py
+++ b/bigframes/core/groupby/__init__.py
@@ -560,7 +560,7 @@ def size(self) -> series.Series:
             by_column_ids=self._by_col_ids,
             dropna=self._dropna,
         )
-        return series.Series(agg_block, name=self._value_name)
+        return series.Series(agg_block.with_column_labels([self._value_name]))
 
     def skew(self, *args, **kwargs) -> series.Series:
         block = block_ops.skew(self._block, [self._value_column], self._by_col_ids)
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index a349ea8f6b..e404e439ab 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -714,13 +714,9 @@ def _apply_binop(
                 DataFrame(other), op, how=how, reverse=reverse
             )
         elif utils.get_axis_number(axis) == 0:
-            bf_series = bigframes.core.convert.to_bf_series(
-                other, self.index, self._session
-            )
-            return self._apply_series_binop_axis_0(bf_series, op, how, reverse)
+            return self._apply_series_binop_axis_0(other, op, how, reverse)
         elif utils.get_axis_number(axis) == 1:
-            pd_series = bigframes.core.convert.to_pd_series(other, self.columns)
-            return self._apply_series_binop_axis_1(pd_series, op, how, reverse)
+            return self._apply_series_binop_axis_1(other, op, how, reverse)
         raise NotImplementedError(
             f"binary operation is not implemented on the second operand of type {type(other).__name__}."
f"{constants.FEEDBACK_LINK}" @@ -745,89 +741,49 @@ def _apply_scalar_binop( def _apply_series_binop_axis_0( self, - other: bigframes.series.Series, + other, op: ops.BinaryOp, how: str = "outer", reverse: bool = False, ) -> DataFrame: - block, (get_column_left, get_column_right) = self._block.join( - other._block, how=how + bf_series = bigframes.core.convert.to_bf_series( + other, self.index, self._session ) - - series_column_id = other._value_column - series_col = get_column_right[series_column_id] - for column_id, label in zip( - self._block.value_columns, self._block.column_labels - ): - self_col = get_column_left[column_id] - expr = ( - op.as_expr(series_col, self_col) - if reverse - else op.as_expr(self_col, series_col) - ) - block, _ = block.project_expr(expr, label) - block = block.drop_columns([get_column_left[column_id]]) - - block = block.drop_columns([series_col]) - block = block.with_index_labels(self._block.index.names) - return DataFrame(block) + aligned_block, columns, expr_pairs = self._block._align_axis_0( + bf_series._block, how=how + ) + result = aligned_block._apply_binop( + op, inputs=expr_pairs, labels=columns, reverse=reverse + ) + return DataFrame(result) def _apply_series_binop_axis_1( self, - other: pandas.Series, + other, op: ops.BinaryOp, how: str = "outer", reverse: bool = False, ) -> DataFrame: - # Somewhat different alignment than df-df so separate codepath for now. - if self.columns.equals(other.index): - columns, lcol_indexer, rcol_indexer = self.columns, None, None + """Align dataframe with pandas series by inlining series values as literals.""" + # If we already know the transposed schema (from the transpose cache), we don't need to materialize rows from other + # Instead, can fully defer execution (as a cross-join) + if ( + isinstance(other, bigframes.series.Series) + and other._block._transpose_cache is not None + ): + aligned_block, columns, expr_pairs = self._block._align_series_block_axis_1( + other._block, how=how + ) else: - if not (self.columns.is_unique and other.index.is_unique): - raise ValueError("Cannot align non-unique indices") - columns, lcol_indexer, rcol_indexer = self.columns.join( - other.index, how=how, return_indexers=True + # Fallback path, materialize `other` locally + pd_series = bigframes.core.convert.to_pd_series(other, self.columns) + aligned_block, columns, expr_pairs = self._block._align_pd_series_axis_1( + pd_series, how=how ) - - binop_result_ids = [] - - column_indices = zip( - lcol_indexer if (lcol_indexer is not None) else range(len(columns)), - rcol_indexer if (rcol_indexer is not None) else range(len(columns)), + result = aligned_block._apply_binop( + op, inputs=expr_pairs, labels=columns, reverse=reverse ) - - block = self._block - for left_index, right_index in column_indices: - if left_index >= 0 and right_index >= 0: # -1 indices indicate missing - self_col_id = self._block.value_columns[left_index] - other_scalar = other.iloc[right_index] - expr = ( - op.as_expr(ex.const(other_scalar), self_col_id) - if reverse - else op.as_expr(self_col_id, ex.const(other_scalar)) - ) - elif left_index >= 0: - self_col_id = self._block.value_columns[left_index] - expr = ( - op.as_expr(ex.const(None), self_col_id) - if reverse - else op.as_expr(self_col_id, ex.const(None)) - ) - elif right_index >= 0: - other_scalar = other.iloc[right_index] - expr = ( - op.as_expr(ex.const(other_scalar), ex.const(None)) - if reverse - else op.as_expr(ex.const(None), ex.const(other_scalar)) - ) - else: - # Should not be possible - raise 
ValueError("No right or left index.") - block, result_col_id = block.project_expr(expr) - binop_result_ids.append(result_col_id) - - block = block.select_columns(binop_result_ids) - return DataFrame(block.with_column_labels(columns)) + return DataFrame(result) def _apply_dataframe_binop( self, @@ -836,57 +792,13 @@ def _apply_dataframe_binop( how: str = "outer", reverse: bool = False, ) -> DataFrame: - # Join rows - block, (get_column_left, get_column_right) = self._block.join( + aligned_block, columns, expr_pairs = self._block._align_both_axes( other._block, how=how ) - # join columns schema - # indexers will be none for exact match - if self.columns.equals(other.columns): - columns, lcol_indexer, rcol_indexer = self.columns, None, None - else: - columns, lcol_indexer, rcol_indexer = self.columns.join( - other.columns, how=how, return_indexers=True - ) - - binop_result_ids = [] - - column_indices = zip( - lcol_indexer if (lcol_indexer is not None) else range(len(columns)), - rcol_indexer if (rcol_indexer is not None) else range(len(columns)), + result = aligned_block._apply_binop( + op, inputs=expr_pairs, labels=columns, reverse=reverse ) - - for left_index, right_index in column_indices: - if left_index >= 0 and right_index >= 0: # -1 indices indicate missing - self_col_id = get_column_left[self._block.value_columns[left_index]] - other_col_id = get_column_right[other._block.value_columns[right_index]] - expr = ( - op.as_expr(other_col_id, self_col_id) - if reverse - else op.as_expr(self_col_id, other_col_id) - ) - elif left_index >= 0: - self_col_id = get_column_left[self._block.value_columns[left_index]] - expr = ( - op.as_expr(ex.const(None), self_col_id) - if reverse - else op.as_expr(self_col_id, ex.const(None)) - ) - elif right_index >= 0: - other_col_id = get_column_right[other._block.value_columns[right_index]] - expr = ( - op.as_expr(other_col_id, ex.const(None)) - if reverse - else op.as_expr(ex.const(None), other_col_id) - ) - else: - # Should not be possible - raise ValueError("No right or left index.") - block, result_col_id = block.project_expr(expr) - binop_result_ids.append(result_col_id) - - block = block.select_columns(binop_result_ids).with_column_labels(columns) - return DataFrame(block) + return DataFrame(result) def eq(self, other: typing.Any, axis: str | int = "columns") -> DataFrame: return self._apply_binop(other, ops.eq_op, axis=axis) diff --git a/bigframes/operations/base.py b/bigframes/operations/base.py index 49ef7f76ee..f339345971 100644 --- a/bigframes/operations/base.py +++ b/bigframes/operations/base.py @@ -70,27 +70,35 @@ def __init__( ) block: typing.Optional[blocks.Block] = None + if (name is not None) and not isinstance(name, typing.Hashable): + raise ValueError( + f"BigQuery DataFrames only supports hashable series names. {constants.FEEDBACK_LINK}" + ) if copy is not None and not copy: raise ValueError( f"Series constructor only supports copy=True. 
                 f"Series constructor only supports copy=True. {constants.FEEDBACK_LINK}"
             )
         if isinstance(data, blocks.Block):
+            # Constructing from a block is for internal use only. Callers shouldn't pass other parameters; the block encompasses all state.
             assert len(data.value_columns) == 1
             assert len(data.column_labels) == 1
             assert index is None
+            assert name is None
+            assert dtype is None
             block = data
 
         # interpret these cases as both index and data
-        elif (
-            isinstance(data, SeriesMethods)
-            or isinstance(data, pd.Series)
-            or pd.api.types.is_dict_like(data)
-        ):
-            if isinstance(data, pd.Series):
-                data = read_pandas_func(data)
-            elif pd.api.types.is_dict_like(data):
-                data = read_pandas_func(pd.Series(data, dtype=dtype))  # type: ignore
-                dtype = None
+        elif isinstance(data, bigframes.pandas.Series) or pd.api.types.is_dict_like(
+            data
+        ):  # includes pd.Series
+            if isinstance(data, bigframes.pandas.Series):
+                data = data.copy()
+                if name is not None:
+                    data.name = name
+                if dtype is not None:
+                    data = data.astype(dtype)
+            else:  # local dict-like data
+                data = read_pandas_func(pd.Series(data, name=name, dtype=dtype))  # type: ignore
             data_block = data._block
             if index is not None:  # reindex
@@ -103,10 +111,8 @@
 
         # list-like data that will get default index
         elif isinstance(data, indexes.Index) or pd.api.types.is_list_like(data):
-            data = indexes.Index(data, dtype=dtype, session=session)
-            dtype = (
-                None  # set to none as it has already been applied, avoid re-cast later
-            )
+            data = indexes.Index(data, dtype=dtype, name=name, session=session)
+            # dtype and name were already applied above, so avoid re-casting later.
             if data.nlevels != 1:
                 raise NotImplementedError("Cannot interpret multi-index as Series.")
             # Reset index to promote index columns to value columns, set default index
@@ -135,20 +141,9 @@
                 dtype=bigframes.dtypes.INT_DTYPE,
             )
             block, _ = bf_index._block.create_constant(data, dtype)
-            dtype = None
             block = block.with_column_labels([name])
 
         assert block is not None
-        if name:
-            if not isinstance(name, typing.Hashable):
-                raise ValueError(
-                    f"BigQuery DataFrames only supports hashable series names. {constants.FEEDBACK_LINK}"
-                )
-            block = block.with_column_labels([name])
-        if dtype:
-            block = block.multi_apply_unary_op(
-                block.value_columns, ops.AsTypeOp(to_type=dtype)
-            )
         self._block: blocks.Block = block
 
     @property
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index c8601c101e..2919b2d77f 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -294,6 +294,7 @@ def __init__(
         # performance logging
         self._bytes_processed_sum = 0
         self._slot_millis_sum = 0
+        self._execution_count = 0
 
     @property
     def bqclient(self):
@@ -362,6 +363,10 @@ def _add_slot_millis(self, amount: int):
         """Increment slot_millis_sum by amount."""
         self._slot_millis_sum += amount
 
+    def _add_execution(self, amount: int = 1):
+        """Increment execution_count by amount."""
+        self._execution_count += amount
+
     def __hash__(self):
         # Stable hash needed to use in expression tree
         return hash(str(self._anonymous_dataset))
@@ -442,6 +447,7 @@ def _query_to_destination(
         configuration: dict = {"query": {"useQueryCache": True}},
         do_clustering=True,
     ) -> Tuple[Optional[bigquery.TableReference], bigquery.QueryJob]:
+        self._add_execution(1)
         # If a dry_run indicates this is not a query type job, then don't
         # bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement.
        dry_run_config = bigquery.QueryJobConfig()
@@ -1035,7 +1041,7 @@ def read_pandas(
         # Try to handle non-dataframe pandas objects as well
         if isinstance(pandas_dataframe, pandas.Series):
             bf_df = self._read_pandas(pandas.DataFrame(pandas_dataframe), "read_pandas")
-            bf_series = typing.cast(series.Series, bf_df[bf_df.columns[0]])
+            bf_series = series.Series(bf_df._block)
             # wrapping into df can set name to 0 so reset to original object name
             bf_series.name = pandas_dataframe.name
             return bf_series
@@ -1080,9 +1086,8 @@ def _read_pandas_inline(
             return None
 
         try:
-            inline_df = dataframe.DataFrame(
-                blocks.Block.from_local(pandas_dataframe, self)
-            )
+            local_block = blocks.Block.from_local(pandas_dataframe, self)
+            inline_df = dataframe.DataFrame(local_block)
         except pa.ArrowInvalid as e:
             raise pa.ArrowInvalid(
                 f"Could not convert with a BigQuery type: `{e}`. "
@@ -1969,6 +1974,8 @@ def _execute(
         dry_run=False,
         col_id_overrides: Mapping[str, str] = {},
     ) -> tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
+        if not dry_run:
+            self._add_execution(1)
         sql = self._to_sql(
             array_value, sorted=sorted, col_id_overrides=col_id_overrides
         )  # type:ignore
diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py
index e0f4793943..eae25bb027 100644
--- a/tests/system/small/test_dataframe.py
+++ b/tests/system/small/test_dataframe.py
@@ -2130,14 +2130,12 @@ def test_series_binop_axis_index(
         (pd.Index([1000, 2000, 3000])),
         (bf_indexes.Index([1000, 2000, 3000])),
         (pd.Series((1000, 2000), index=["int64_too", "float64_col"])),
-        (series.Series((1000, 2000), index=["int64_too", "float64_col"])),
     ],
     ids=[
         "tuple",
         "pd_index",
         "bf_index",
         "pd_series",
-        "bf_series",
     ],
 )
 def test_listlike_binop_axis_1(scalars_dfs, input):
@@ -2153,6 +2151,26 @@
     assert_pandas_df_equal(bf_result, pd_result, check_dtype=False)
 
 
+def test_binop_with_self_aggregate(session, scalars_dfs):
+    scalars_df, scalars_pandas_df = scalars_dfs
+
+    df_columns = ["int64_col", "float64_col", "int64_too"]
+
+    # Ensure that this takes the optimized single-query path by counting executions
+    execution_count_before = session._execution_count
+    bf_df = scalars_df[df_columns]
+    bf_result = (bf_df - bf_df.mean()).to_pandas()
+    execution_count_after = session._execution_count
+
+    pd_df = scalars_pandas_df[df_columns]
+    pd_result = pd_df - pd_df.mean()
+
+    executions = execution_count_after - execution_count_before
+
+    assert executions == 1
+    assert_pandas_df_equal(bf_result, pd_result, check_dtype=False)
+
+
 @pytest.mark.parametrize(
     ("left_labels", "right_labels"),
     [