diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 34df7231cc..993f2caa47 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -102,11 +102,11 @@ def __init__( ): """Construct a block object, will create default index if no index columns specified.""" index_columns = list(index_columns) - if index_labels: + if index_labels is not None: index_labels = list(index_labels) if len(index_labels) != len(index_columns): raise ValueError( - "'index_columns' and 'index_labels' must have equal length" + f"'index_columns' (size {len(index_columns)}) and 'index_labels' (size {len(index_labels)}) must have equal length" ) if len(index_columns) == 0: new_index_col_id = guid.generate_guid() @@ -1089,6 +1089,46 @@ def summarize( labels = self._get_labels_for_columns(column_ids) return Block(expr, column_labels=labels, index_columns=[label_col_id]) + def corr(self): + """Returns a block object to compute the self-correlation on this block.""" + aggregations = [ + ( + ex.BinaryAggregation( + agg_ops.CorrOp(), ex.free_var(left_col), ex.free_var(right_col) + ), + f"{left_col}-{right_col}", + ) + for left_col in self.value_columns + for right_col in self.value_columns + ] + expr = self.expr.aggregate(aggregations) + + index_col_ids = [ + guid.generate_guid() for i in range(self.column_labels.nlevels) + ] + input_count = len(self.value_columns) + unpivot_columns = tuple( + ( + guid.generate_guid(), + tuple(expr.column_ids[input_count * i : input_count * (i + 1)]), + ) + for i in range(input_count) + ) + labels = self._get_labels_for_columns(self.value_columns) + + expr = expr.unpivot( + row_labels=labels, + index_col_ids=index_col_ids, + unpivot_columns=unpivot_columns, + ) + + return Block( + expr, + column_labels=self.column_labels, + index_columns=index_col_ids, + index_labels=self.column_labels.names, + ) + def _standard_stats(self, column_id) -> typing.Sequence[agg_ops.UnaryAggregateOp]: """ Gets a standard set of stats to preemptively fetch for a column if @@ -1889,7 +1929,7 @@ def to_pandas(self) -> pd.Index: df = expr.session._rows_to_dataframe(results, dtypes) df = df.set_index(index_columns) index = df.index - index.names = list(self._block._index_labels) + index.names = list(self._block._index_labels) # type:ignore return index def resolve_level(self, level: LevelsType) -> typing.Sequence[str]: diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 9db567a497..ccbf68ebb5 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1017,6 +1017,27 @@ def combine( def combine_first(self, other: DataFrame): return self._apply_dataframe_binop(other, ops.fillna_op) + def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFrame: + if method != "pearson": + raise NotImplementedError( + f"Only Pearson correlation is currently supported. {constants.FEEDBACK_LINK}" + ) + if min_periods: + raise NotImplementedError( + f"min_periods not yet supported. {constants.FEEDBACK_LINK}" + ) + if len(self.columns) > 30: + raise NotImplementedError( + f"Only work with dataframes containing fewer than 30 columns. Current: {len(self.columns)}. {constants.FEEDBACK_LINK}" + ) + + if not numeric_only: + frame = self._raise_on_non_numeric("corr") + else: + frame = self._drop_non_numeric() + + return DataFrame(frame._block.corr()) + def to_pandas( self, max_download_size: Optional[int] = None, diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 3d31253021..8f75534fc6 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -1783,6 +1783,46 @@ def test_combine_first( pd.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False) +@pytest.mark.parametrize( + ("columns", "numeric_only"), + [ + (["bool_col", "int64_col", "float64_col"], True), + (["bool_col", "int64_col", "float64_col"], False), + (["bool_col", "int64_col", "float64_col", "string_col"], True), + pytest.param( + ["bool_col", "int64_col", "float64_col", "string_col"], + False, + marks=pytest.mark.xfail( + raises=NotImplementedError, + ), + ), + ], +) +def test_corr_w_numeric_only(scalars_dfs, columns, numeric_only): + scalars_df, scalars_pandas_df = scalars_dfs + + bf_result = scalars_df[columns].corr(numeric_only=numeric_only).to_pandas() + pd_result = scalars_pandas_df[columns].corr(numeric_only=numeric_only) + + # BigFrames and Pandas differ in their data type handling: + # - Column types: BigFrames uses Float64, Pandas uses float64. + # - Index types: BigFrames uses strign, Pandas uses object. + pd.testing.assert_frame_equal( + bf_result, pd_result, check_dtype=False, check_index_type=False + ) + + +def test_corr_w_invalid_parameters(scalars_dfs): + columns = ["int64_too", "int64_col", "float64_col"] + scalars_df, _ = scalars_dfs + + with pytest.raises(NotImplementedError): + scalars_df[columns].corr(method="kendall") + + with pytest.raises(NotImplementedError): + scalars_df[columns].corr(min_periods=1) + + @pytest.mark.parametrize( ("op"), [ diff --git a/tests/system/small/test_multiindex.py b/tests/system/small/test_multiindex.py index c5e8b45b8e..e0b9164315 100644 --- a/tests/system/small/test_multiindex.py +++ b/tests/system/small/test_multiindex.py @@ -880,6 +880,27 @@ def test_column_multi_index_w_na_stack(scalars_df_index, scalars_pandas_df_index pandas.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False) +def test_corr_w_multi_index(scalars_df_index, scalars_pandas_df_index): + columns = ["int64_too", "float64_col", "int64_col"] + multi_columns = pandas.MultiIndex.from_tuples(zip(["a", "b", "b"], [1, 2, 2])) + + bf = scalars_df_index[columns].copy() + bf.columns = multi_columns + + pd_df = scalars_pandas_df_index[columns].copy() + pd_df.columns = multi_columns + + bf_result = bf.corr(numeric_only=True).to_pandas() + pd_result = pd_df.corr(numeric_only=True) + + # BigFrames and Pandas differ in their data type handling: + # - Column types: BigFrames uses Float64, Pandas uses float64. + # - Index types: BigFrames uses strign, Pandas uses object. + pandas.testing.assert_frame_equal( + bf_result, pd_result, check_dtype=False, check_index_type=False + ) + + @pytest.mark.parametrize( ("index_names",), [ diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 05f4167838..84d2aa7fcb 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -2805,6 +2805,40 @@ def combine_first(self, other) -> DataFrame: """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def corr(self, method, min_periods, numeric_only) -> DataFrame: + """ + Compute pairwise correlation of columns, excluding NA/null values. + + **Examples:** + + >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None + + >>> df = bpd.DataFrame({'A': [1, 2, 3], + ... 'B': [400, 500, 600], + ... 'C': [0.8, 0.4, 0.9]}) + >>> df.corr(numeric_only=True) + A B C + A 1.0 1.0 0.188982 + B 1.0 1.0 0.188982 + C 0.188982 0.188982 1.0 + + [3 rows x 3 columns] + + Args: + method (string, default "pearson"): + Correlation method to use - currently only "pearson" is supported. + min_periods (int, default None): + The minimum number of observations needed to return a result. Non-default values + are not yet supported, so a result will be returned for at least two observations. + numeric_only(bool, default False): + Include only float, int, boolean, decimal data. + + Returns: + DataFrame: Correlation matrix. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def update( self, other, join: str = "left", overwrite: bool = True, filter_func=None ) -> DataFrame: diff --git a/third_party/bigframes_vendored/pandas/core/series.py b/third_party/bigframes_vendored/pandas/core/series.py index b203471606..6c01a6dd0c 100644 --- a/third_party/bigframes_vendored/pandas/core/series.py +++ b/third_party/bigframes_vendored/pandas/core/series.py @@ -840,7 +840,7 @@ def corr(self, other, method="pearson", min_periods=None) -> float: float: Will return NaN if there are fewer than two numeric pairs, either series has a variance or covariance of zero, or any input value is infinite. """ - raise NotImplementedError("abstract method") + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) def cov( self,