PERF: GroupBy.count #43730

Merged: 1 commit, Sep 25, 2021
20 changes: 12 additions & 8 deletions pandas/core/groupby/generic.py
@@ -21,6 +21,7 @@
     Mapping,
     TypeVar,
     Union,
+    cast,
 )
 import warnings
@@ -30,7 +31,9 @@
 from pandas._typing import (
     ArrayLike,
     FrameOrSeries,
+    Manager,
     Manager2D,
+    SingleManager,
 )
 from pandas.util._decorators import (
     Appender,
@@ -80,7 +83,6 @@
     Index,
     MultiIndex,
     all_indexes_same,
-    default_index,
 )
 from pandas.core.series import Series
 from pandas.core.util.numba_ import maybe_use_numba
@@ -159,19 +161,21 @@ def pinner(cls):
 class SeriesGroupBy(GroupBy[Series]):
     _apply_allowlist = base.series_apply_allowlist
 
-    def _wrap_agged_manager(self, mgr: Manager2D) -> Series:
-        single = mgr.iget(0)
+    def _wrap_agged_manager(self, mgr: Manager) -> Series:
+        if mgr.ndim == 1:
+            mgr = cast(SingleManager, mgr)
+            single = mgr
+        else:
+            mgr = cast(Manager2D, mgr)
+            single = mgr.iget(0)
         ser = self.obj._constructor(single, name=self.obj.name)
         # NB: caller is responsible for setting ser.index
         return ser
 
-    def _get_data_to_aggregate(self) -> Manager2D:
+    def _get_data_to_aggregate(self) -> SingleManager:
         ser = self._obj_with_exclusions
         single = ser._mgr
-        columns = default_index(1)
-        # Much faster than using ser.to_frame() since we avoid inferring columns
-        # from scalar
-        return single.to_2d_mgr(columns)
+        return single
 
     def _iterate_slices(self) -> Iterable[Series]:
         yield self._selected_obj
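The payoff of these two changes: SeriesGroupBy aggregations now hand the Series' own 1D manager to the block-wise path instead of wrapping it in a throwaway one-column 2D manager on every call. A quick way to exercise the fast path (a benchmarking sketch with assumed data, not timings from this PR):

```python
import numpy as np
import pandas as pd

# Many groups over a plain float Series: manager-construction overhead
# is a visible fraction of the work, which is exactly what skipping the
# Series -> one-column-2D-manager round trip removes.
N = 1_000_000
ser = pd.Series(np.random.randn(N))
keys = np.random.randint(0, N // 10, N)

# e.g. %timeit ser.groupby(keys).count()
result = ser.groupby(keys).count()
assert result.sum() == N  # ser has no NaNs, so every row is counted
```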
15 changes: 11 additions & 4 deletions pandas/core/groupby/groupby.py
@@ -1745,6 +1745,8 @@ def count(self) -> Series | DataFrame:
         ids, _, ngroups = self.grouper.group_info
         mask = ids != -1
 
+        is_series = data.ndim == 1
+
         def hfunc(bvalues: ArrayLike) -> ArrayLike:
             # TODO(2DEA): reshape would not be necessary with 2D EAs
             if bvalues.ndim == 1:
@@ -1754,6 +1756,10 @@ def hfunc(bvalues: ArrayLike) -> ArrayLike:
             masked = mask & ~isna(bvalues)
 
             counted = lib.count_level_2d(masked, labels=ids, max_bin=ngroups, axis=1)
+            if is_series:
+                assert counted.ndim == 2
+                assert counted.shape[0] == 1
+                return counted[0]
             return counted
 
         new_mgr = data.grouped_reduce(hfunc)
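lib.count_level_2d is a Cython helper; conceptually it is a masked bincount per row of the 2D mask. A rough NumPy equivalent of what it computes for axis=1 (illustrative names and signature, not the real Cython interface):

```python
import numpy as np

def count_level_2d_sketch(masked: np.ndarray, labels: np.ndarray, max_bin: int) -> np.ndarray:
    """masked: (ncols, nvalues) bool, True where the value is non-NA and
    its group label is valid. Returns (ncols, max_bin) per-group counts."""
    ncols = masked.shape[0]
    out = np.zeros((ncols, max_bin), dtype=np.int64)
    for i in range(ncols):
        # Count only the unmasked positions, binned by group label.
        out[i] = np.bincount(labels[masked[i]], minlength=max_bin)
    return out
```

For the Series case the mask has a single row, so the new `counted[0]` branch returns the 1D counts directly and grouped_reduce can rewrap them without a 2D detour.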
@@ -2702,7 +2708,7 @@ def blk_func(values: ArrayLike) -> ArrayLike:
         mgr = self._get_data_to_aggregate()
 
         res_mgr = mgr.grouped_reduce(blk_func, ignore_failures=True)
-        if len(res_mgr.items) != len(mgr.items):
+        if not is_ser and len(res_mgr.items) != len(mgr.items):
             warnings.warn(
                 "Dropping invalid columns in "
                 f"{type(self).__name__}.quantile is deprecated. "
@@ -3134,14 +3140,15 @@ def blk_func(values: ArrayLike) -> ArrayLike:
         obj = self._obj_with_exclusions
 
         # Operate block-wise instead of column-by-column
-        orig_ndim = obj.ndim
+        is_ser = obj.ndim == 1
         mgr = self._get_data_to_aggregate()
 
         if numeric_only:
             mgr = mgr.get_numeric_data()
 
         res_mgr = mgr.grouped_reduce(blk_func, ignore_failures=True)
-        if len(res_mgr.items) != len(mgr.items):
+
+        if not is_ser and len(res_mgr.items) != len(mgr.items):
             howstr = how.replace("group_", "")
             warnings.warn(
                 "Dropping invalid columns in "
@@ -3162,7 +3169,7 @@ def blk_func(values: ArrayLike) -> ArrayLike:
             # We should never get here
             raise TypeError("All columns were dropped in grouped_reduce")
 
-        if orig_ndim == 1:
+        if is_ser:
             out = self._wrap_agged_manager(res_mgr)
             out.index = self.grouper.result_index
         else:
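Why the is_ser guards: for a 1D manager, `.items` is the Series' index rather than a set of columns, so comparing `len(res_mgr.items)` to `len(mgr.items)` would compare the number of groups to the number of rows and warn spuriously on nearly every Series reduction. A minimal illustration of the case the guard protects (assuming post-PR behavior):

```python
import pandas as pd

ser = pd.Series([1.0, 2.0, 3.0, 4.0], index=list("aabb"))
# Reduces 4 rows to 2 groups. Under the old column-count check this
# length mismatch would have looked like "columns were dropped" and
# raised the deprecation warning; with the is_ser guard it stays silent.
res = ser.groupby(level=0).quantile(0.5)
print(res)
```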
23 changes: 22 additions & 1 deletion pandas/core/internals/base.py
@@ -10,6 +10,7 @@
 )
 
 from pandas._typing import (
+    ArrayLike,
     DtypeObj,
     Shape,
 )
@@ -18,7 +19,10 @@
 from pandas.core.dtypes.cast import find_common_type
 
 from pandas.core.base import PandasObject
-from pandas.core.indexes.api import Index
+from pandas.core.indexes.api import (
+    Index,
+    default_index,
+)
 
 T = TypeVar("T", bound="DataManager")
 
@@ -171,6 +175,23 @@ def setitem_inplace(self, indexer, value) -> None:
         """
         self.array[indexer] = value
 
+    def grouped_reduce(self, func, ignore_failures: bool = False):
+        """
+        ignore_failures : bool, default False
+            Not used; for compatibility with ArrayManager/BlockManager.
+        """
+
+        arr = self.array
+        res = func(arr)
+        index = default_index(len(res))
+
+        mgr = type(self).from_array(res, index)
+        return mgr
+
+    @classmethod
+    def from_array(cls, arr: ArrayLike, index: Index):
+        raise AbstractMethodError(cls)
+
 
 def interleaved_dtype(dtypes: list[DtypeObj]) -> DtypeObj | None:
     """
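This base-class hook is what lets the 1D manager participate in grouped_reduce at all: apply the reducer to the single array, then rewrap the result with a default integer index (the caller sets the real index afterward). A self-contained sketch of the same idea, using toy names rather than the pandas internals API:

```python
import numpy as np

class SingleManagerSketch:
    """Toy stand-in for a 1D manager holding exactly one array."""

    def __init__(self, array: np.ndarray):
        self.array = array

    def grouped_reduce(self, func):
        # One array in, one array out: no column iteration needed.
        return SingleManagerSketch(np.asarray(func(self.array)))

values = np.array([1.0, np.nan, 3.0, 4.0])
labels = np.array([0, 0, 1, 1])
counts = SingleManagerSketch(values).grouped_reduce(
    lambda arr: np.bincount(labels[~np.isnan(arr)], minlength=2)
)
print(counts.array)  # [1 2]: non-NA count per group
```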
9 changes: 4 additions & 5 deletions pandas/tests/resample/test_resample_api.py
@@ -350,11 +350,10 @@ def test_agg():
     expected = pd.concat([a_mean, a_std, b_mean, b_std], axis=1)
     expected.columns = pd.MultiIndex.from_product([["A", "B"], ["mean", "std"]])
     for t in cases:
-        warn = FutureWarning if t in cases[1:3] else None
-        with tm.assert_produces_warning(
-            warn, match="Dropping invalid columns", check_stacklevel=False
-        ):
-            # .var on dt64 column raises and is dropped
+        with tm.assert_produces_warning(None):
+            # .var on dt64 column raises and is dropped, but the path in core.apply
+            # that it goes through will still suppress a TypeError even
+            # once the deprecations in the groupby code are enforced
             result = t.aggregate([np.mean, np.std])
         tm.assert_frame_equal(result, expected)
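For reference, `tm.assert_produces_warning(None)` fails the enclosing test if anything inside the block emits a warning, so the updated test now asserts that this resample aggregation runs clean. A minimal sketch of the pattern (hypothetical toy data, not the test's fixtures):

```python
import pandas as pd
import pandas._testing as tm

ser = pd.Series([1.0, 2.0, 3.0, 4.0])
with tm.assert_produces_warning(None):
    # Raises AssertionError if this aggregation warns.
    result = ser.groupby([0, 0, 1, 1]).agg(["mean", "std"])
```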