Skip to content

feat: add DataFrames.corr() method #379

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 43 additions & 3 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ def __init__(
):
"""Construct a block object, will create default index if no index columns specified."""
index_columns = list(index_columns)
if index_labels:
if index_labels is not None:
index_labels = list(index_labels)
if len(index_labels) != len(index_columns):
raise ValueError(
"'index_columns' and 'index_labels' must have equal length"
f"'index_columns' (size {len(index_columns)}) and 'index_labels' (size {len(index_labels)}) must have equal length"
)
if len(index_columns) == 0:
new_index_col_id = guid.generate_guid()
Expand Down Expand Up @@ -1089,6 +1089,46 @@ def summarize(
labels = self._get_labels_for_columns(column_ids)
return Block(expr, column_labels=labels, index_columns=[label_col_id])

def corr(self):
    """Build a block that computes the pairwise correlation of the value columns.

    A ``CorrOp`` binary aggregation is created for every ordered
    (left, right) pair of value columns. The aggregated expression is then
    unpivoted: its output columns are sliced into consecutive groups of
    ``len(self.value_columns)``, each group feeding one unpivoted column,
    with the rows labelled by the labels of the value columns.

    Returns:
        Block: a block whose column labels are this block's column labels
        and whose index columns are freshly generated ids named after
        ``self.column_labels.names``.
    """
    value_cols = self.value_columns
    n_cols = len(value_cols)

    # Left-major ordering: all pairs for the first left column come first.
    # The slicing below relies on this ordering (and presumably on
    # ``aggregate`` emitting its outputs in the order given — verify).
    pairwise_aggs = []
    for lhs in value_cols:
        for rhs in value_cols:
            corr_expr = ex.BinaryAggregation(
                agg_ops.CorrOp(), ex.free_var(lhs), ex.free_var(rhs)
            )
            pairwise_aggs.append((corr_expr, f"{lhs}-{rhs}"))
    aggregated = self.expr.aggregate(pairwise_aggs)

    # One new index column per level of the column labels.
    index_col_ids = [
        guid.generate_guid() for _ in range(self.column_labels.nlevels)
    ]

    # Each unpivoted column gets a fresh id and the run of ``n_cols``
    # aggregate outputs starting at offset ``n_cols * pos``.
    unpivot_columns = tuple(
        (
            guid.generate_guid(),
            tuple(aggregated.column_ids[n_cols * pos : n_cols * (pos + 1)]),
        )
        for pos in range(n_cols)
    )
    row_labels = self._get_labels_for_columns(value_cols)

    unpivoted = aggregated.unpivot(
        row_labels=row_labels,
        index_col_ids=index_col_ids,
        unpivot_columns=unpivot_columns,
    )

    return Block(
        unpivoted,
        column_labels=self.column_labels,
        index_columns=index_col_ids,
        index_labels=self.column_labels.names,
    )

def _standard_stats(self, column_id) -> typing.Sequence[agg_ops.UnaryAggregateOp]:
"""
Gets a standard set of stats to preemptively fetch for a column if
Expand Down Expand Up @@ -1889,7 +1929,7 @@ def to_pandas(self) -> pd.Index:
df = expr.session._rows_to_dataframe(results, dtypes)
df = df.set_index(index_columns)
index = df.index
index.names = list(self._block._index_labels)
index.names = list(self._block._index_labels) # type:ignore
return index

def resolve_level(self, level: LevelsType) -> typing.Sequence[str]:
Expand Down
21 changes: 21 additions & 0 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,27 @@ def combine(
def combine_first(self, other: DataFrame):
    # Delegates to the shared DataFrame binary-op helper using
    # ``ops.fillna_op`` as the element-wise operation.
    combined = self._apply_dataframe_binop(other, ops.fillna_op)
    return combined

def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFrame:
    # NOTE(review): no docstring here on purpose — the other concrete
    # methods in this file carry none, and the user-facing documentation
    # presumably comes from the vendored pandas ``DataFrame.corr``; confirm
    # before adding one.
    #
    # Computes the pairwise Pearson correlation of the columns.
    # Raises NotImplementedError for any method other than "pearson", for a
    # truthy ``min_periods``, and when more than ``max_columns`` columns
    # would take part in the computation.
    max_columns = 30

    if method != "pearson":
        raise NotImplementedError(
            f"Only Pearson correlation is currently supported. {constants.FEEDBACK_LINK}"
        )
    if min_periods:
        raise NotImplementedError(
            f"min_periods not yet supported. {constants.FEEDBACK_LINK}"
        )

    # Select the participating columns first: with numeric_only=True the
    # non-numeric columns are dropped; otherwise their presence is an error.
    if numeric_only:
        frame = self._drop_non_numeric()
    else:
        frame = self._raise_on_non_numeric("corr")

    # The block-level corr builds one aggregation per ordered column pair,
    # so the width limit applies to the columns actually correlated, not to
    # columns that were just dropped. The message now matches the guard,
    # which accepts exactly ``max_columns`` columns.
    column_count = len(frame.columns)
    if column_count > max_columns:
        raise NotImplementedError(
            f"Only works with dataframes containing at most {max_columns} columns. "
            f"Current: {column_count}. {constants.FEEDBACK_LINK}"
        )

    return DataFrame(frame._block.corr())

def to_pandas(
self,
max_download_size: Optional[int] = None,
Expand Down
40 changes: 40 additions & 0 deletions tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1783,6 +1783,46 @@ def test_combine_first(
pd.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False)


@pytest.mark.parametrize(
    ("columns", "numeric_only"),
    [
        (["bool_col", "int64_col", "float64_col"], True),
        (["bool_col", "int64_col", "float64_col"], False),
        (["bool_col", "int64_col", "float64_col", "string_col"], True),
        # A string column without numeric_only is expected to be rejected.
        pytest.param(
            ["bool_col", "int64_col", "float64_col", "string_col"],
            False,
            marks=pytest.mark.xfail(
                raises=NotImplementedError,
            ),
        ),
    ],
)
def test_corr_w_numeric_only(scalars_dfs, columns, numeric_only):
    """DataFrame.corr matches pandas for each columns / numeric_only combination."""
    bigframes_df, pandas_df = scalars_dfs

    bigframes_corr = bigframes_df[columns].corr(numeric_only=numeric_only)
    actual = bigframes_corr.to_pandas()
    expected = pandas_df[columns].corr(numeric_only=numeric_only)

    # BigFrames and pandas differ in their data type handling:
    # - Column types: BigFrames uses Float64, pandas uses float64.
    # - Index types: BigFrames uses string, pandas uses object.
    # Hence dtype and index-type checks are disabled.
    pd.testing.assert_frame_equal(
        actual,
        expected,
        check_dtype=False,
        check_index_type=False,
    )


def test_corr_w_invalid_parameters(scalars_dfs):
    """DataFrame.corr raises NotImplementedError for unsupported arguments."""
    bigframes_df, _ = scalars_dfs
    subset = bigframes_df[["int64_too", "int64_col", "float64_col"]]

    # Checked in the same order as before: unsupported method first,
    # then a non-default min_periods.
    unsupported_kwargs = (
        {"method": "kendall"},
        {"min_periods": 1},
    )
    for kwargs in unsupported_kwargs:
        with pytest.raises(NotImplementedError):
            subset.corr(**kwargs)


@pytest.mark.parametrize(
("op"),
[
Expand Down
21 changes: 21 additions & 0 deletions tests/system/small/test_multiindex.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,27 @@ def test_column_multi_index_w_na_stack(scalars_df_index, scalars_pandas_df_index
pandas.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False)


def test_corr_w_multi_index(scalars_df_index, scalars_pandas_df_index):
    """DataFrame.corr matches pandas when the columns are a two-level MultiIndex."""
    selected = ["int64_too", "float64_col", "int64_col"]
    column_index = pandas.MultiIndex.from_tuples([("a", 1), ("b", 2), ("b", 2)])

    bigframes_df = scalars_df_index[selected].copy()
    bigframes_df.columns = column_index

    pandas_df = scalars_pandas_df_index[selected].copy()
    pandas_df.columns = column_index

    actual = bigframes_df.corr(numeric_only=True).to_pandas()
    expected = pandas_df.corr(numeric_only=True)

    # BigFrames and pandas differ in their data type handling:
    # - Column types: BigFrames uses Float64, pandas uses float64.
    # - Index types: BigFrames uses string, pandas uses object.
    # Hence dtype and index-type checks are disabled.
    pandas.testing.assert_frame_equal(
        actual,
        expected,
        check_dtype=False,
        check_index_type=False,
    )


@pytest.mark.parametrize(
("index_names",),
[
Expand Down
34 changes: 34 additions & 0 deletions third_party/bigframes_vendored/pandas/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -2805,6 +2805,40 @@ def combine_first(self, other) -> DataFrame:
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFrame:
    """
    Compute pairwise correlation of columns, excluding NA/null values.

    **Examples:**

        >>> import bigframes.pandas as bpd
        >>> bpd.options.display.progress_bar = None

        >>> df = bpd.DataFrame({'A': [1, 2, 3],
        ...                    'B': [400, 500, 600],
        ...                    'C': [0.8, 0.4, 0.9]})
        >>> df.corr(numeric_only=True)
                  A         B         C
        A       1.0       1.0  0.188982
        B       1.0       1.0  0.188982
        C  0.188982  0.188982       1.0
        <BLANKLINE>
        [3 rows x 3 columns]

    Args:
        method (string, default "pearson"):
            Correlation method to use - currently only "pearson" is supported.
        min_periods (int, default None):
            The minimum number of observations needed to return a result. Non-default values
            are not yet supported, so a result will be returned for at least two observations.
        numeric_only (bool, default False):
            Include only float, int, boolean, decimal data.

    Returns:
        DataFrame: Correlation matrix.
    """
    # The signature defaults mirror the documented defaults above, so the
    # abstract declaration agrees with its own docstring and doctest call.
    raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def update(
self, other, join: str = "left", overwrite: bool = True, filter_func=None
) -> DataFrame:
Expand Down
2 changes: 1 addition & 1 deletion third_party/bigframes_vendored/pandas/core/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ def corr(self, other, method="pearson", min_periods=None) -> float:
float: Will return NaN if there are fewer than two numeric pairs, either series has a
variance or covariance of zero, or any input value is infinite.
"""
raise NotImplementedError("abstract method")
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def cov(
self,
Expand Down