Skip to content

Commit 306ccdb

Browse files
authored
PERF: ExpandingGroupby (#39664)
1 parent e889b94 commit 306ccdb

File tree

4 files changed

+165
-131
lines changed

4 files changed

+165
-131
lines changed

asv_bench/benchmarks/rolling.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,19 @@ def time_rolling_multiindex_creation(self):
255255

256256
class GroupbyEWM:
257257

258+
params = ["var", "std", "cov", "corr"]
259+
param_names = ["method"]
260+
261+
def setup(self, method):
262+
df = pd.DataFrame({"A": range(50), "B": range(50)})
263+
self.gb_ewm = df.groupby("A").ewm(com=1.0)
264+
265+
def time_groupby_method(self, method):
266+
getattr(self.gb_ewm, method)()
267+
268+
269+
class GroupbyEWMEngine:
270+
258271
params = ["cython", "numba"]
259272
param_names = ["engine"]
260273

doc/source/whatsnew/v1.3.0.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ Performance improvements
254254
- Performance improvement in :meth:`core.window.rolling.Rolling.corr` and :meth:`core.window.rolling.Rolling.cov` (:issue:`39388`)
255255
- Performance improvement in :meth:`core.window.rolling.RollingGroupby.corr`, :meth:`core.window.expanding.ExpandingGroupby.corr`, :meth:`core.window.expanding.ExpandingGroupby.corr` and :meth:`core.window.expanding.ExpandingGroupby.cov` (:issue:`39591`)
256256
- Performance improvement in :func:`unique` for object data type (:issue:`37615`)
257+
- Performance improvement in :class:`core.window.rolling.ExpandingGroupby` aggregation methods (:issue:`39664`)
257258

258259
.. ---------------------------------------------------------------------------
259260

pandas/_libs/window/aggregations.pyx

Lines changed: 122 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -1547,48 +1547,59 @@ def ewma(float64_t[:] vals, int64_t[:] start, int64_t[:] end, int minp,
15471547
"""
15481548

15491549
cdef:
1550-
Py_ssize_t i, nobs, N = len(vals)
1551-
ndarray[float64_t] output = np.empty(N, dtype=float)
1550+
Py_ssize_t i, j, s, e, nobs, win_size, N = len(vals), M = len(start)
1551+
float64_t[:] sub_vals
1552+
ndarray[float64_t] sub_output, output = np.empty(N, dtype=float)
15521553
float64_t alpha, old_wt_factor, new_wt, weighted_avg, old_wt, cur
15531554
bint is_observation
15541555

15551556
if N == 0:
15561557
return output
15571558

15581559
alpha = 1. / (1. + com)
1559-
old_wt_factor = 1. - alpha
1560-
new_wt = 1. if adjust else alpha
15611560

1562-
weighted_avg = vals[0]
1563-
is_observation = weighted_avg == weighted_avg
1564-
nobs = int(is_observation)
1565-
output[0] = weighted_avg if nobs >= minp else NaN
1566-
old_wt = 1.
1561+
for j in range(M):
1562+
s = start[j]
1563+
e = end[j]
1564+
sub_vals = vals[s:e]
1565+
win_size = len(sub_vals)
1566+
sub_output = np.empty(win_size, dtype=float)
1567+
1568+
old_wt_factor = 1. - alpha
1569+
new_wt = 1. if adjust else alpha
1570+
1571+
weighted_avg = sub_vals[0]
1572+
is_observation = weighted_avg == weighted_avg
1573+
nobs = int(is_observation)
1574+
sub_output[0] = weighted_avg if nobs >= minp else NaN
1575+
old_wt = 1.
1576+
1577+
with nogil:
1578+
for i in range(1, win_size):
1579+
cur = sub_vals[i]
1580+
is_observation = cur == cur
1581+
nobs += is_observation
1582+
if weighted_avg == weighted_avg:
1583+
1584+
if is_observation or not ignore_na:
1585+
1586+
old_wt *= old_wt_factor
1587+
if is_observation:
1588+
1589+
# avoid numerical errors on constant series
1590+
if weighted_avg != cur:
1591+
weighted_avg = ((old_wt * weighted_avg) +
1592+
(new_wt * cur)) / (old_wt + new_wt)
1593+
if adjust:
1594+
old_wt += new_wt
1595+
else:
1596+
old_wt = 1.
1597+
elif is_observation:
1598+
weighted_avg = cur
15671599

1568-
with nogil:
1569-
for i in range(1, N):
1570-
cur = vals[i]
1571-
is_observation = cur == cur
1572-
nobs += is_observation
1573-
if weighted_avg == weighted_avg:
1574-
1575-
if is_observation or not ignore_na:
1576-
1577-
old_wt *= old_wt_factor
1578-
if is_observation:
1579-
1580-
# avoid numerical errors on constant series
1581-
if weighted_avg != cur:
1582-
weighted_avg = ((old_wt * weighted_avg) +
1583-
(new_wt * cur)) / (old_wt + new_wt)
1584-
if adjust:
1585-
old_wt += new_wt
1586-
else:
1587-
old_wt = 1.
1588-
elif is_observation:
1589-
weighted_avg = cur
1600+
sub_output[i] = weighted_avg if nobs >= minp else NaN
15901601

1591-
output[i] = weighted_avg if nobs >= minp else NaN
1602+
output[s:e] = sub_output
15921603

15931604
return output
15941605

@@ -1620,88 +1631,99 @@ def ewmcov(float64_t[:] input_x, int64_t[:] start, int64_t[:] end, int minp,
16201631
"""
16211632

16221633
cdef:
1623-
Py_ssize_t i, nobs, N = len(input_x), M = len(input_y)
1634+
Py_ssize_t i, j, s, e, win_size, nobs
1635+
Py_ssize_t N = len(input_x), M = len(input_y), L = len(start)
16241636
float64_t alpha, old_wt_factor, new_wt, mean_x, mean_y, cov
16251637
float64_t sum_wt, sum_wt2, old_wt, cur_x, cur_y, old_mean_x, old_mean_y
16261638
float64_t numerator, denominator
1627-
ndarray[float64_t] output
1639+
float64_t[:] sub_x_vals, sub_y_vals
1640+
ndarray[float64_t] sub_out, output = np.empty(N, dtype=float)
16281641
bint is_observation
16291642

16301643
if M != N:
16311644
raise ValueError(f"arrays are of different lengths ({N} and {M})")
16321645

1633-
output = np.empty(N, dtype=float)
16341646
if N == 0:
16351647
return output
16361648

16371649
alpha = 1. / (1. + com)
1638-
old_wt_factor = 1. - alpha
1639-
new_wt = 1. if adjust else alpha
1640-
1641-
mean_x = input_x[0]
1642-
mean_y = input_y[0]
1643-
is_observation = (mean_x == mean_x) and (mean_y == mean_y)
1644-
nobs = int(is_observation)
1645-
if not is_observation:
1646-
mean_x = NaN
1647-
mean_y = NaN
1648-
output[0] = (0. if bias else NaN) if nobs >= minp else NaN
1649-
cov = 0.
1650-
sum_wt = 1.
1651-
sum_wt2 = 1.
1652-
old_wt = 1.
1653-
1654-
with nogil:
16551650

1656-
for i in range(1, N):
1657-
cur_x = input_x[i]
1658-
cur_y = input_y[i]
1659-
is_observation = (cur_x == cur_x) and (cur_y == cur_y)
1660-
nobs += is_observation
1661-
if mean_x == mean_x:
1662-
if is_observation or not ignore_na:
1663-
sum_wt *= old_wt_factor
1664-
sum_wt2 *= (old_wt_factor * old_wt_factor)
1665-
old_wt *= old_wt_factor
1666-
if is_observation:
1667-
old_mean_x = mean_x
1668-
old_mean_y = mean_y
1669-
1670-
# avoid numerical errors on constant series
1671-
if mean_x != cur_x:
1672-
mean_x = ((old_wt * old_mean_x) +
1673-
(new_wt * cur_x)) / (old_wt + new_wt)
1674-
1675-
# avoid numerical errors on constant series
1676-
if mean_y != cur_y:
1677-
mean_y = ((old_wt * old_mean_y) +
1678-
(new_wt * cur_y)) / (old_wt + new_wt)
1679-
cov = ((old_wt * (cov + ((old_mean_x - mean_x) *
1680-
(old_mean_y - mean_y)))) +
1681-
(new_wt * ((cur_x - mean_x) *
1682-
(cur_y - mean_y)))) / (old_wt + new_wt)
1683-
sum_wt += new_wt
1684-
sum_wt2 += (new_wt * new_wt)
1685-
old_wt += new_wt
1686-
if not adjust:
1687-
sum_wt /= old_wt
1688-
sum_wt2 /= (old_wt * old_wt)
1689-
old_wt = 1.
1690-
elif is_observation:
1691-
mean_x = cur_x
1692-
mean_y = cur_y
1693-
1694-
if nobs >= minp:
1695-
if not bias:
1696-
numerator = sum_wt * sum_wt
1697-
denominator = numerator - sum_wt2
1698-
if denominator > 0:
1699-
output[i] = (numerator / denominator) * cov
1651+
for j in range(L):
1652+
s = start[j]
1653+
e = end[j]
1654+
sub_x_vals = input_x[s:e]
1655+
sub_y_vals = input_y[s:e]
1656+
win_size = len(sub_x_vals)
1657+
sub_out = np.empty(win_size, dtype=float)
1658+
1659+
old_wt_factor = 1. - alpha
1660+
new_wt = 1. if adjust else alpha
1661+
1662+
mean_x = sub_x_vals[0]
1663+
mean_y = sub_y_vals[0]
1664+
is_observation = (mean_x == mean_x) and (mean_y == mean_y)
1665+
nobs = int(is_observation)
1666+
if not is_observation:
1667+
mean_x = NaN
1668+
mean_y = NaN
1669+
sub_out[0] = (0. if bias else NaN) if nobs >= minp else NaN
1670+
cov = 0.
1671+
sum_wt = 1.
1672+
sum_wt2 = 1.
1673+
old_wt = 1.
1674+
1675+
with nogil:
1676+
for i in range(1, win_size):
1677+
cur_x = sub_x_vals[i]
1678+
cur_y = sub_y_vals[i]
1679+
is_observation = (cur_x == cur_x) and (cur_y == cur_y)
1680+
nobs += is_observation
1681+
if mean_x == mean_x:
1682+
if is_observation or not ignore_na:
1683+
sum_wt *= old_wt_factor
1684+
sum_wt2 *= (old_wt_factor * old_wt_factor)
1685+
old_wt *= old_wt_factor
1686+
if is_observation:
1687+
old_mean_x = mean_x
1688+
old_mean_y = mean_y
1689+
1690+
# avoid numerical errors on constant series
1691+
if mean_x != cur_x:
1692+
mean_x = ((old_wt * old_mean_x) +
1693+
(new_wt * cur_x)) / (old_wt + new_wt)
1694+
1695+
# avoid numerical errors on constant series
1696+
if mean_y != cur_y:
1697+
mean_y = ((old_wt * old_mean_y) +
1698+
(new_wt * cur_y)) / (old_wt + new_wt)
1699+
cov = ((old_wt * (cov + ((old_mean_x - mean_x) *
1700+
(old_mean_y - mean_y)))) +
1701+
(new_wt * ((cur_x - mean_x) *
1702+
(cur_y - mean_y)))) / (old_wt + new_wt)
1703+
sum_wt += new_wt
1704+
sum_wt2 += (new_wt * new_wt)
1705+
old_wt += new_wt
1706+
if not adjust:
1707+
sum_wt /= old_wt
1708+
sum_wt2 /= (old_wt * old_wt)
1709+
old_wt = 1.
1710+
elif is_observation:
1711+
mean_x = cur_x
1712+
mean_y = cur_y
1713+
1714+
if nobs >= minp:
1715+
if not bias:
1716+
numerator = sum_wt * sum_wt
1717+
denominator = numerator - sum_wt2
1718+
if denominator > 0:
1719+
sub_out[i] = (numerator / denominator) * cov
1720+
else:
1721+
sub_out[i] = NaN
17001722
else:
1701-
output[i] = NaN
1723+
sub_out[i] = cov
17021724
else:
1703-
output[i] = cov
1704-
else:
1705-
output[i] = NaN
1725+
sub_out[i] = NaN
1726+
1727+
output[s:e] = sub_out
17061728

17071729
return output

pandas/core/window/ewm.py

Lines changed: 29 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -71,22 +71,6 @@ def get_center_of_mass(
7171
return float(comass)
7272

7373

74-
def dispatch(name: str, *args, **kwargs):
75-
"""
76-
Dispatch to groupby apply.
77-
"""
78-
79-
def outer(self, *args, **kwargs):
80-
def f(x):
81-
x = self._shallow_copy(x, groupby=self._groupby)
82-
return getattr(x, name)(*args, **kwargs)
83-
84-
return self._groupby.apply(f)
85-
86-
outer.__name__ = name
87-
return outer
88-
89-
9074
class ExponentialMovingWindow(BaseWindow):
9175
r"""
9276
Provide exponential weighted (EW) functions.
@@ -457,10 +441,22 @@ def cov(
457441
def cov_func(x, y):
458442
x_array = self._prep_values(x)
459443
y_array = self._prep_values(y)
444+
window_indexer = self._get_window_indexer()
445+
min_periods = (
446+
self.min_periods
447+
if self.min_periods is not None
448+
else window_indexer.window_size
449+
)
450+
start, end = window_indexer.get_window_bounds(
451+
num_values=len(x_array),
452+
min_periods=min_periods,
453+
center=self.center,
454+
closed=self.closed,
455+
)
460456
result = window_aggregations.ewmcov(
461457
x_array,
462-
np.array([0], dtype=np.int64),
463-
np.array([0], dtype=np.int64),
458+
start,
459+
end,
464460
self.min_periods,
465461
y_array,
466462
self.com,
@@ -509,12 +505,24 @@ def corr(
509505
def cov_func(x, y):
510506
x_array = self._prep_values(x)
511507
y_array = self._prep_values(y)
508+
window_indexer = self._get_window_indexer()
509+
min_periods = (
510+
self.min_periods
511+
if self.min_periods is not None
512+
else window_indexer.window_size
513+
)
514+
start, end = window_indexer.get_window_bounds(
515+
num_values=len(x_array),
516+
min_periods=min_periods,
517+
center=self.center,
518+
closed=self.closed,
519+
)
512520

513521
def _cov(X, Y):
514522
return window_aggregations.ewmcov(
515523
X,
516-
np.array([0], dtype=np.int64),
517-
np.array([0], dtype=np.int64),
524+
start,
525+
end,
518526
self.min_periods,
519527
Y,
520528
self.com,
@@ -556,11 +564,6 @@ def _get_window_indexer(self) -> GroupbyIndexer:
556564
)
557565
return window_indexer
558566

559-
var = dispatch("var", bias=False)
560-
std = dispatch("std", bias=False)
561-
cov = dispatch("cov", other=None, pairwise=None, bias=False)
562-
corr = dispatch("corr", other=None, pairwise=None)
563-
564567
def mean(self, engine=None, engine_kwargs=None):
565568
"""
566569
Parameters
@@ -602,11 +605,6 @@ def mean(self, engine=None, engine_kwargs=None):
602605
elif engine in ("cython", None):
603606
if engine_kwargs is not None:
604607
raise ValueError("cython engine does not accept engine_kwargs")
605-
606-
def f(x):
607-
x = self._shallow_copy(x, groupby=self._groupby)
608-
return x.mean()
609-
610-
return self._groupby.apply(f)
608+
return super().mean()
611609
else:
612610
raise ValueError("engine must be either 'numba' or 'cython'")

0 commit comments

Comments
 (0)