From 730571a3a23d7216d32373619db34cd80ec698d7 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Mon, 9 Nov 2020 09:55:58 -0800 Subject: [PATCH 01/31] Add class to enable groupby ewm --- pandas/core/window/ewm.py | 39 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 9f7040943d9a3..adb58bb64f524 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -14,8 +14,9 @@ from pandas.core.dtypes.common import is_datetime64_ns_dtype import pandas.core.common as common +from pandas.core.util.numba_ import NUMBA_FUNC_CACHE, maybe_use_numba from pandas.core.window.common import _doc_template, _shared_docs, zsqrt -from pandas.core.window.rolling import BaseWindow, flex_binary_moment +from pandas.core.window.rolling import _dispatch, BaseWindow, BaseWindowGroupby, flex_binary_moment if TYPE_CHECKING: from pandas import Series @@ -485,3 +486,39 @@ def _cov(x, y): return flex_binary_moment( self._selected_obj, other._selected_obj, _get_corr, pairwise=bool(pairwise) ) + +def _dispatch(name: str, *args, **kwargs): + """ + Dispatch to groupby apply. + """ + + def outer(self, *args, **kwargs): + def f(x): + x = self._shallow_copy(x, groupby=self._groupby) + return getattr(x, name)(*args, **kwargs) + + return self._groupby.apply(f) + + outer.__name__ = name + return outer + +class ExponentialMovingWindowGroupby(BaseWindowGroupby, ExponentialMovingWindow): + """ + Provide an ewm groupby implementation. 
+ """ + + var = _dispatch("var", bias=False) + std = _dispatch("std", bias=False) + cov = _dispatch("cov", other=None, pairwise=None, bias=False) + + def mean(self, engine=None, engine_kwargs=None): + if maybe_use_numba(engine): + pass + else: + def f(x): + x = self._shallow_copy(x, groupby=self._groupby) + return x.mean() + + return self._groupby.apply(f) + + From af1d0cf2967e1973cc1765e08bba663b8611b32d Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Tue, 10 Nov 2020 17:38:57 -0800 Subject: [PATCH 02/31] Add stock function for numba groupby ewma --- pandas/core/window/ewm.py | 14 ++++-- pandas/core/window/numba_.py | 97 ++++++++++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+), 4 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index adb58bb64f524..92912655af4b4 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -16,7 +16,12 @@ import pandas.core.common as common from pandas.core.util.numba_ import NUMBA_FUNC_CACHE, maybe_use_numba from pandas.core.window.common import _doc_template, _shared_docs, zsqrt -from pandas.core.window.rolling import _dispatch, BaseWindow, BaseWindowGroupby, flex_binary_moment +from pandas.core.window.rolling import ( + _dispatch, + BaseWindow, + BaseWindowGroupby, + flex_binary_moment, +) if TYPE_CHECKING: from pandas import Series @@ -487,6 +492,7 @@ def _cov(x, y): self._selected_obj, other._selected_obj, _get_corr, pairwise=bool(pairwise) ) + def _dispatch(name: str, *args, **kwargs): """ Dispatch to groupby apply. @@ -502,9 +508,10 @@ def f(x): outer.__name__ = name return outer + class ExponentialMovingWindowGroupby(BaseWindowGroupby, ExponentialMovingWindow): """ - Provide an ewm groupby implementation. + Provide an exponential moving window groupby implementation. 
""" var = _dispatch("var", bias=False) @@ -515,10 +522,9 @@ def mean(self, engine=None, engine_kwargs=None): if maybe_use_numba(engine): pass else: + def f(x): x = self._shallow_copy(x, groupby=self._groupby) return x.mean() return self._groupby.apply(f) - - diff --git a/pandas/core/window/numba_.py b/pandas/core/window/numba_.py index c4858b6e5a4ab..90f376820c7ce 100644 --- a/pandas/core/window/numba_.py +++ b/pandas/core/window/numba_.py @@ -72,3 +72,100 @@ def roll_apply( return result return roll_apply + + +def generate_numba_groupby_ewma_func( + args: Tuple, + kwargs: Dict[str, Any], + func: Callable[..., Scalar], + engine_kwargs: Optional[Dict[str, bool]], +): + """ + Generate a numba jitted groupby ewma function specified by values from engine_kwargs. + + 1. jit the user's function + 2. Return a rolling apply function with the jitted function inline + + Configurations specified in engine_kwargs apply to both the user's + function _AND_ the groupby ewma function. + + Parameters + ---------- + args : tuple + *args to be passed into the function + kwargs : dict + **kwargs to be passed into the function + func : function + function to be applied to each window and will be JITed + engine_kwargs : dict + dictionary of arguments to be passed into numba.jit + + Returns + ------- + Numba function + """ + nopython, nogil, parallel = get_jit_arguments(engine_kwargs, kwargs) + + cache_key = (func, "groupby_ewma") + if cache_key in NUMBA_FUNC_CACHE: + return NUMBA_FUNC_CACHE[cache_key] + + numba_func = jit_user_function(func, nopython, nogil, parallel) + numba = import_optional_dependency("numba") + if parallel: + loop_range = numba.prange + else: + loop_range = range + + @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel) + def groupby_ewma( + values: np.ndarray, + begin: np.ndarray, + end: np.ndarray, + minimum_periods: int, + com: float, + adjust: bool, + ignore_na: bool, + ) -> np.ndarray: + # TODO (MATT): values should be in groupby sorted order + result = 
np.empty(len(values)) + alpha = 1.0 / (1.0 + com) + for i in loop_range(len(result)): + start = begin[i] + stop = end[i] + window = values[start:stop] + + old_wt_factor = 1.0 - alpha + new_wt = 1.0 if adjust else alpha + + weighted_avg = window[0] + nobs = int(not np.isnan(weighted_avg)) + output[0] = weighted_avg if nobs >= minimum_periods else np.nan + old_wt = 1.0 + + for j in range(1, len(window)): + cur = window[j] + is_observation = not np.isnan(cur) + nobs += is_observation + if not np.isnan(weighted_avg): + + if is_observation or not ignore_na: + + old_wt *= old_wt_factor + if is_observation: + + # avoid numerical errors on constant series + if weighted_avg != cur: + weighted_avg = ( + (old_wt * weighted_avg) + (new_wt * cur) + ) / (old_wt + new_wt) + if adjust: + old_wt += new_wt + else: + old_wt = 1.0 + elif is_observation: + weighted_avg = cur + + output[i] = weighted_avg if nobs >= minimum_periods else np.nan + + return groupby_ewma From 44239059d36642211c0a15c2f9b2c182587cd962 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Fri, 13 Nov 2020 18:30:28 -0800 Subject: [PATCH 03/31] Finish jitted function for groupby_ewma --- pandas/core/window/ewm.py | 18 ++---------------- pandas/core/window/numba_.py | 24 ++++++++++++------------ 2 files changed, 14 insertions(+), 28 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 92912655af4b4..eb51483737920 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -16,6 +16,7 @@ import pandas.core.common as common from pandas.core.util.numba_ import NUMBA_FUNC_CACHE, maybe_use_numba from pandas.core.window.common import _doc_template, _shared_docs, zsqrt +from pandas.core.window.numba_ import generate_numba_groupby_ewma_func from pandas.core.window.rolling import ( _dispatch, BaseWindow, @@ -493,22 +494,6 @@ def _cov(x, y): ) -def _dispatch(name: str, *args, **kwargs): - """ - Dispatch to groupby apply. 
- """ - - def outer(self, *args, **kwargs): - def f(x): - x = self._shallow_copy(x, groupby=self._groupby) - return getattr(x, name)(*args, **kwargs) - - return self._groupby.apply(f) - - outer.__name__ = name - return outer - - class ExponentialMovingWindowGroupby(BaseWindowGroupby, ExponentialMovingWindow): """ Provide an exponential moving window groupby implementation. @@ -520,6 +505,7 @@ class ExponentialMovingWindowGroupby(BaseWindowGroupby, ExponentialMovingWindow) def mean(self, engine=None, engine_kwargs=None): if maybe_use_numba(engine): + generate_numba_groupby_ewma_func pass else: diff --git a/pandas/core/window/numba_.py b/pandas/core/window/numba_.py index 90f376820c7ce..c40c7c48a9bc5 100644 --- a/pandas/core/window/numba_.py +++ b/pandas/core/window/numba_.py @@ -75,8 +75,6 @@ def roll_apply( def generate_numba_groupby_ewma_func( - args: Tuple, - kwargs: Dict[str, Any], func: Callable[..., Scalar], engine_kwargs: Optional[Dict[str, bool]], ): @@ -91,10 +89,6 @@ def generate_numba_groupby_ewma_func( Parameters ---------- - args : tuple - *args to be passed into the function - kwargs : dict - **kwargs to be passed into the function func : function function to be applied to each window and will be JITed engine_kwargs : dict @@ -104,13 +98,13 @@ def generate_numba_groupby_ewma_func( ------- Numba function """ - nopython, nogil, parallel = get_jit_arguments(engine_kwargs, kwargs) + nopython, nogil, parallel = get_jit_arguments(engine_kwargs) + # TODO: what should func be? 
cache_key = (func, "groupby_ewma") if cache_key in NUMBA_FUNC_CACHE: return NUMBA_FUNC_CACHE[cache_key] - numba_func = jit_user_function(func, nopython, nogil, parallel) numba = import_optional_dependency("numba") if parallel: loop_range = numba.prange @@ -128,19 +122,20 @@ def groupby_ewma( ignore_na: bool, ) -> np.ndarray: # TODO (MATT): values should be in groupby sorted order - result = np.empty(len(values)) + results = [] alpha = 1.0 / (1.0 + com) - for i in loop_range(len(result)): + for i in loop_range(len(begin)): start = begin[i] stop = end[i] window = values[start:stop] + sub_result = np.np.empty(len(window)) old_wt_factor = 1.0 - alpha new_wt = 1.0 if adjust else alpha weighted_avg = window[0] nobs = int(not np.isnan(weighted_avg)) - output[0] = weighted_avg if nobs >= minimum_periods else np.nan + sub_result[0] = weighted_avg if nobs >= minimum_periods else np.nan old_wt = 1.0 for j in range(1, len(window)): @@ -166,6 +161,11 @@ def groupby_ewma( elif is_observation: weighted_avg = cur - output[i] = weighted_avg if nobs >= minimum_periods else np.nan + sub_result[i] = weighted_avg if nobs >= minimum_periods else np.nan + + results.append(sub_result) + + result = np.concatenate(results) + return result return groupby_ewma From 332b750eb04aa9e65fa5d992f9bd3f145576caec Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Fri, 13 Nov 2020 19:01:19 -0800 Subject: [PATCH 04/31] Add EWMA indexer, and adjust generated function --- pandas/core/window/ewm.py | 29 ++++++++++++++++++++++++++--- pandas/core/window/indexers.py | 15 +++++++++++++++ pandas/core/window/numba_.py | 26 ++++++++++---------------- 3 files changed, 51 insertions(+), 19 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index eb51483737920..8c1bb4f995d41 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -1,7 +1,7 @@ import datetime from functools import partial from textwrap import dedent -from typing import TYPE_CHECKING, Optional, Union 
+from typing import TYPE_CHECKING, Any, Callable, Optional, Union import numpy as np @@ -16,6 +16,7 @@ import pandas.core.common as common from pandas.core.util.numba_ import NUMBA_FUNC_CACHE, maybe_use_numba from pandas.core.window.common import _doc_template, _shared_docs, zsqrt +from pandas.core.window.indexers import ExponentialMovingWindowIndexer, GroupbyIndexer from pandas.core.window.numba_ import generate_numba_groupby_ewma_func from pandas.core.window.rolling import ( _dispatch, @@ -499,14 +500,36 @@ class ExponentialMovingWindowGroupby(BaseWindowGroupby, ExponentialMovingWindow) Provide an exponential moving window groupby implementation. """ + def _get_window_indexer(self) -> GroupbyIndexer: + """ + Return an indexer class that will compute the window start and end bounds + + Returns + ------- + GroupbyIndexer + """ + window_indexer = GroupbyIndexer( + groupby_indicies=self._groupby.indices, + window_indexer=ExponentialMovingWindowIndexer, + ) + return window_indexer + var = _dispatch("var", bias=False) std = _dispatch("std", bias=False) cov = _dispatch("cov", other=None, pairwise=None, bias=False) def mean(self, engine=None, engine_kwargs=None): if maybe_use_numba(engine): - generate_numba_groupby_ewma_func - pass + groupby_ewma_func = generate_numba_groupby_ewma_func( + engine_kwargs, + self.com, + self.adjust, + self.ignore_na, + ) + return self._apply( + groupby_ewma_func, + use_numba_cache=maybe_use_numba(engine), + ) else: def f(x): diff --git a/pandas/core/window/indexers.py b/pandas/core/window/indexers.py index a8229257bb7bb..a3b9695d777d9 100644 --- a/pandas/core/window/indexers.py +++ b/pandas/core/window/indexers.py @@ -344,3 +344,18 @@ def get_window_bounds( start = np.concatenate(start_arrays) end = np.concatenate(end_arrays) return start, end + + +class ExponentialMovingWindowIndexer(BaseIndexer): + """Calculate ewm window bounds (the entire window)""" + + @Appender(get_window_bounds_doc) + def get_window_bounds( + self, + num_values: int 
= 0, + min_periods: Optional[int] = None, + center: Optional[bool] = None, + closed: Optional[str] = None, + ) -> Tuple[np.ndarray, np.ndarray]: + + return np.array([0], dtype=np.int64), np.array([num_values], dtype=np.int64) diff --git a/pandas/core/window/numba_.py b/pandas/core/window/numba_.py index c40c7c48a9bc5..10122dfa7ea62 100644 --- a/pandas/core/window/numba_.py +++ b/pandas/core/window/numba_.py @@ -75,24 +75,21 @@ def roll_apply( def generate_numba_groupby_ewma_func( - func: Callable[..., Scalar], engine_kwargs: Optional[Dict[str, bool]], + com: float, + adjust: bool, + ignore_na: bool, ): """ Generate a numba jitted groupby ewma function specified by values from engine_kwargs. - 1. jit the user's function - 2. Return a rolling apply function with the jitted function inline - - Configurations specified in engine_kwargs apply to both the user's - function _AND_ the groupby ewma function. - Parameters ---------- - func : function - function to be applied to each window and will be JITed engine_kwargs : dict dictionary of arguments to be passed into numba.jit + com : float + adjust : bool + ignore_na : bool Returns ------- @@ -100,8 +97,9 @@ def generate_numba_groupby_ewma_func( """ nopython, nogil, parallel = get_jit_arguments(engine_kwargs) - # TODO: what should func be? 
- cache_key = (func, "groupby_ewma") + # We really just want to check that "groupby_ewma" is in the cache, but this + # cache contains UDFs that are passed from the user + cache_key = (lambda x: x, "groupby_ewma") if cache_key in NUMBA_FUNC_CACHE: return NUMBA_FUNC_CACHE[cache_key] @@ -117,11 +115,7 @@ def groupby_ewma( begin: np.ndarray, end: np.ndarray, minimum_periods: int, - com: float, - adjust: bool, - ignore_na: bool, ) -> np.ndarray: - # TODO (MATT): values should be in groupby sorted order results = [] alpha = 1.0 / (1.0 + com) for i in loop_range(len(begin)): @@ -161,7 +155,7 @@ def groupby_ewma( elif is_observation: weighted_avg = cur - sub_result[i] = weighted_avg if nobs >= minimum_periods else np.nan + sub_result[j] = weighted_avg if nobs >= minimum_periods else np.nan results.append(sub_result) From 1880f2d3221d070c1051ff8d087fe590ef06df16 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Fri, 13 Nov 2020 19:07:00 -0800 Subject: [PATCH 05/31] push a dummy function --- pandas/core/window/ewm.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 8c1bb4f995d41..56a81a4806402 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -529,6 +529,9 @@ def mean(self, engine=None, engine_kwargs=None): return self._apply( groupby_ewma_func, use_numba_cache=maybe_use_numba(engine), + # Dummy function; we really just want to check "groupby_ewma" + # in the cache + original_func=lambda x: x, ) else: From 2b181c840198fb78fe0e87badad8ed07bd27dc08 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Fri, 13 Nov 2020 19:23:51 -0800 Subject: [PATCH 06/31] make cache in rolling reuseable by groupby ewma --- pandas/core/window/ewm.py | 4 +--- pandas/core/window/numba_.py | 7 ++++--- pandas/core/window/rolling.py | 4 ++-- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 56a81a4806402..c45fa326c7067 100644 --- 
a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -529,9 +529,7 @@ def mean(self, engine=None, engine_kwargs=None): return self._apply( groupby_ewma_func, use_numba_cache=maybe_use_numba(engine), - # Dummy function; we really just want to check "groupby_ewma" - # in the cache - original_func=lambda x: x, + numba_cache_key=(groupby_ewma_func, "groupby_ewma"), ) else: diff --git a/pandas/core/window/numba_.py b/pandas/core/window/numba_.py index 10122dfa7ea62..c79c3cf25d036 100644 --- a/pandas/core/window/numba_.py +++ b/pandas/core/window/numba_.py @@ -75,6 +75,7 @@ def roll_apply( def generate_numba_groupby_ewma_func( + func, engine_kwargs: Optional[Dict[str, bool]], com: float, adjust: bool, @@ -85,6 +86,8 @@ def generate_numba_groupby_ewma_func( Parameters ---------- + func : callable + The jitted groupby ewma func engine_kwargs : dict dictionary of arguments to be passed into numba.jit com : float @@ -97,9 +100,7 @@ def generate_numba_groupby_ewma_func( """ nopython, nogil, parallel = get_jit_arguments(engine_kwargs) - # We really just want to check that "groupby_ewma" is in the cache, but this - # cache contains UDFs that are passed from the user - cache_key = (lambda x: x, "groupby_ewma") + cache_key = (func, "groupby_ewma") if cache_key in NUMBA_FUNC_CACHE: return NUMBA_FUNC_CACHE[cache_key] diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 5d561c84ab462..61546542bf2ef 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -457,7 +457,7 @@ def calc(x): result = np.asarray(result) if use_numba_cache: - NUMBA_FUNC_CACHE[(kwargs["original_func"], "rolling_apply")] = func + NUMBA_FUNC_CACHE[kwargs["numba_cache_key"]] = func return result @@ -1306,7 +1306,7 @@ def apply( return self._apply( apply_func, use_numba_cache=maybe_use_numba(engine), - original_func=func, + numba_cache_key=(func, "rolling_apply"), args=args, kwargs=kwargs, ) From 78718daa79e6670d096c23ed586a35e9e5883bb1 Mon Sep 17 
00:00:00 2001 From: Matt Roeschke Date: Sat, 14 Nov 2020 14:41:18 -0800 Subject: [PATCH 07/31] Add initial tests, fix caching function --- pandas/core/window/ewm.py | 8 ++++++-- pandas/core/window/numba_.py | 5 +---- pandas/tests/window/conftest.py | 12 ++++++++++++ pandas/tests/window/test_numba.py | 29 +++++++++++++++++++++++++++-- 4 files changed, 46 insertions(+), 8 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 9587f5af9b51c..357c5a47e20a6 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -529,12 +529,16 @@ def mean(self, engine=None, engine_kwargs=None): return self._apply( groupby_ewma_func, use_numba_cache=maybe_use_numba(engine), - numba_cache_key=(groupby_ewma_func, "groupby_ewma"), + numba_cache_key=(lambda x: x, "groupby_ewma"), ) - else: + elif engine in ("cython", None): + if engine_kwargs is not None: + raise ValueError("cython engine does not accept engine_kwargs") def f(x): x = self._shallow_copy(x, groupby=self._groupby) return x.mean() return self._groupby.apply(f) + else: + raise ValueError("engine must be either 'numba' or 'cython'") diff --git a/pandas/core/window/numba_.py b/pandas/core/window/numba_.py index c79c3cf25d036..61bedd18b7330 100644 --- a/pandas/core/window/numba_.py +++ b/pandas/core/window/numba_.py @@ -75,7 +75,6 @@ def roll_apply( def generate_numba_groupby_ewma_func( - func, engine_kwargs: Optional[Dict[str, bool]], com: float, adjust: bool, @@ -86,8 +85,6 @@ def generate_numba_groupby_ewma_func( Parameters ---------- - func : callable - The jitted groupby ewma func engine_kwargs : dict dictionary of arguments to be passed into numba.jit com : float @@ -100,7 +97,7 @@ def generate_numba_groupby_ewma_func( """ nopython, nogil, parallel = get_jit_arguments(engine_kwargs) - cache_key = (func, "groupby_ewma") + cache_key = (lambda x: x, "groupby_ewma") if cache_key in NUMBA_FUNC_CACHE: return NUMBA_FUNC_CACHE[cache_key] diff --git a/pandas/tests/window/conftest.py 
b/pandas/tests/window/conftest.py index 1780925202593..69f70db16458b 100644 --- a/pandas/tests/window/conftest.py +++ b/pandas/tests/window/conftest.py @@ -74,6 +74,18 @@ def nopython(request): return request.param +@pytest.fixture(params=[True, False]) +def adjust(request): + """adjust keyword argument for ewm""" + return request.param + + +@pytest.fixture(params=[True, False]) +def ignore(request): + """ignore keyword argument for ewm""" + return request.param + + @pytest.fixture( params=[ pytest.param( diff --git a/pandas/tests/window/test_numba.py b/pandas/tests/window/test_numba.py index 35bdb972a7bc0..e75d7e2e5e186 100644 --- a/pandas/tests/window/test_numba.py +++ b/pandas/tests/window/test_numba.py @@ -3,7 +3,7 @@ import pandas.util._test_decorators as td -from pandas import Series, option_context +from pandas import DataFrame, Series, option_context import pandas._testing as tm from pandas.core.util.numba_ import NUMBA_FUNC_CACHE @@ -11,7 +11,7 @@ @td.skip_if_no("numba", "0.46.0") @pytest.mark.filterwarnings("ignore:\\nThe keyword argument") # Filter warnings when parallel=True and the function can't be parallelized by Numba -class TestApply: +class TestRollingApply: @pytest.mark.parametrize("jit", [True, False]) def test_numba_vs_cython(self, jit, nogil, parallel, nopython, center): def f(x, *args): @@ -77,6 +77,31 @@ def func_2(x): tm.assert_series_equal(result, expected) +@td.skip_if_no("numba", "0.46.0") +class TestGroupbyEWMMean: + def test_invalid_engine(self): + df = DataFrame({"A": ["a", "b", "a", "b"], "B": range(4)}) + with pytest.raises(ValueError, match="engine must be either"): + df.groupby("A").ewm(com=1.0).mean(engine="foo") + + def test_invalid_engine_kwargs(self): + df = DataFrame({"A": ["a", "b", "a", "b"], "B": range(4)}) + with pytest.raises(ValueError, match="cython engine does not"): + df.groupby("A").ewm(com=1.0).mean( + engine="cython", engine_kwargs={"nopython": True} + ) + + def test_cython_vs_numba(self, nogil, parallel, 
nopython, ignore, adjust): + df = DataFrame({"A": ["a", "b", "a", "b"], "B": range(4)}) + gb_ewm = df.groupby("A").ewm(com=1.0, adjust=adjust, ignore=ignore) + + engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} + result = gb_ewm.mean(engine="numba", engine_kwargs=engine_kwargs) + expected = gb_ewm.mean(engine="cython") + + tm.assert_frame_equal(result, expected) + + @td.skip_if_no("numba", "0.46.0") def test_use_global_config(): def f(x): From 722f7d38235be4511fff5ebe7c0d5b61216a08ff Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sat, 14 Nov 2020 15:41:59 -0800 Subject: [PATCH 08/31] Add the appropriate method on groupby --- pandas/core/groupby/groupby.py | 10 ++++++++++ pandas/core/window/__init__.py | 5 ++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index ec96a0d502d3f..d69417bec9bc8 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -1859,6 +1859,16 @@ def expanding(self, *args, **kwargs): return ExpandingGroupby(self, *args, **kwargs) + @Substitution(name="groupby") + @Appender(_common_see_also) + def ewm(self, *args, **kwargs): + """ + Return an ewm grouper, providing ewm functionality per group. + """ + from pandas.core.window import ExponentialMovingWindowGroupby + + return ExponentialMovingWindowGroupby(self, *args, **kwargs) + def _fill(self, direction, limit=None): """ Shared function for `pad` and `backfill` to call Cython method. 
diff --git a/pandas/core/window/__init__.py b/pandas/core/window/__init__.py index 304c61ac0e489..ebcc0d3a2da0c 100644 --- a/pandas/core/window/__init__.py +++ b/pandas/core/window/__init__.py @@ -1,3 +1,6 @@ -from pandas.core.window.ewm import ExponentialMovingWindow # noqa:F401 +from pandas.core.window.ewm import ( + ExponentialMovingWindow, + ExponentialMovingWindowGroupby, +) # noqa:F401 from pandas.core.window.expanding import Expanding, ExpandingGroupby # noqa:F401 from pandas.core.window.rolling import Rolling, RollingGroupby, Window # noqa:F401 From 207014385cd31a51130d1e5f17abb3203699ac13 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sat, 14 Nov 2020 16:19:20 -0800 Subject: [PATCH 09/31] Fix conftest and unused params --- pandas/core/window/rolling.py | 2 -- pandas/tests/window/conftest.py | 4 ++-- pandas/tests/window/test_numba.py | 4 ++-- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index f9a7e85173cbc..4696960b1d5c1 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -1307,8 +1307,6 @@ def apply( apply_func, use_numba_cache=maybe_use_numba(engine), numba_cache_key=(func, "rolling_apply"), - args=args, - kwargs=kwargs, ) def _generate_cython_apply_func( diff --git a/pandas/tests/window/conftest.py b/pandas/tests/window/conftest.py index 69f70db16458b..a803ce716eb05 100644 --- a/pandas/tests/window/conftest.py +++ b/pandas/tests/window/conftest.py @@ -81,8 +81,8 @@ def adjust(request): @pytest.fixture(params=[True, False]) -def ignore(request): - """ignore keyword argument for ewm""" +def ignore_na(request): + """ignore_na keyword argument for ewm""" return request.param diff --git a/pandas/tests/window/test_numba.py b/pandas/tests/window/test_numba.py index e75d7e2e5e186..3dd09bc4b752a 100644 --- a/pandas/tests/window/test_numba.py +++ b/pandas/tests/window/test_numba.py @@ -91,9 +91,9 @@ def test_invalid_engine_kwargs(self): engine="cython", 
engine_kwargs={"nopython": True} ) - def test_cython_vs_numba(self, nogil, parallel, nopython, ignore, adjust): + def test_cython_vs_numba(self, nogil, parallel, nopython, ignore_na, adjust): df = DataFrame({"A": ["a", "b", "a", "b"], "B": range(4)}) - gb_ewm = df.groupby("A").ewm(com=1.0, adjust=adjust, ignore=ignore) + gb_ewm = df.groupby("A").ewm(com=1.0, adjust=adjust, ignore_na=ignore_na) engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} result = gb_ewm.mean(engine="numba", engine_kwargs=engine_kwargs) From 2af5304c68434289dcee9e542a1dc3e709b92c8d Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sat, 14 Nov 2020 17:23:47 -0800 Subject: [PATCH 10/31] Make ewm reuse BaseWindow._apply --- pandas/_libs/window/aggregations.pyx | 17 ++++++---- pandas/core/window/ewm.py | 49 +++++++++++++++++++++------- 2 files changed, 48 insertions(+), 18 deletions(-) diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index 4de7a5860c465..c726979f1a42e 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -1496,8 +1496,8 @@ def roll_weighted_var(float64_t[:] values, float64_t[:] weights, # ---------------------------------------------------------------------- # Exponentially weighted moving average -def ewma_time(const float64_t[:] vals, int minp, ndarray[int64_t] times, - int64_t halflife): +def ewma_time(const float64_t[:] vals, int64_t[:] start, int64_t[:] end, + int minp, ndarray[int64_t] times, int64_t halflife): """ Compute exponentially-weighted moving average using halflife and time distances. 
@@ -1505,6 +1505,8 @@ def ewma_time(const float64_t[:] vals, int minp, ndarray[int64_t] times, Parameters ---------- vals : ndarray[float_64] + start: ndarray (int64 type) + end: ndarray (int64 type) minp : int times : ndarray[int64] halflife : int64 @@ -1552,17 +1554,20 @@ def ewma_time(const float64_t[:] vals, int minp, ndarray[int64_t] times, return output -def ewma(float64_t[:] vals, float64_t com, bint adjust, bint ignore_na, int minp): +def ewma(float64_t[:] vals, int64_t[:] start, int64_t[:] end, int minp, + float64_t com, bint adjust, bint ignore_na): """ Compute exponentially-weighted moving average using center-of-mass. Parameters ---------- vals : ndarray (float64 type) + start: ndarray (int64 type) + end: ndarray (int64 type) + minp : int com : float64 adjust : int ignore_na : bool - minp : int Returns ------- @@ -1620,8 +1625,8 @@ def ewma(float64_t[:] vals, float64_t com, bint adjust, bint ignore_na, int minp # Exponentially weighted moving covariance -def ewmcov(float64_t[:] input_x, float64_t[:] input_y, - float64_t com, bint adjust, bint ignore_na, int minp, bint bias): +def ewmcov(float64_t[:] input_x, int64_t[:] start, int64_t[:] end, int minp, + float64_t[:] input_y, float64_t com, bint adjust, bint ignore_na, bint bias): """ Compute exponentially-weighted moving variance using center-of-mass. 
diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 357c5a47e20a6..0a6b246247ee4 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -1,7 +1,7 @@ import datetime from functools import partial from textwrap import dedent -from typing import TYPE_CHECKING, Any, Callable, Optional, Union +from typing import TYPE_CHECKING, Optional, Union import numpy as np @@ -14,9 +14,13 @@ from pandas.core.dtypes.common import is_datetime64_ns_dtype import pandas.core.common as common -from pandas.core.util.numba_ import NUMBA_FUNC_CACHE, maybe_use_numba +from pandas.core.util.numba_ import maybe_use_numba from pandas.core.window.common import _doc_template, _shared_docs, zsqrt -from pandas.core.window.indexers import ExponentialMovingWindowIndexer, GroupbyIndexer +from pandas.core.window.indexers import ( + BaseIndexer, + ExponentialMovingWindowIndexer, + GroupbyIndexer, +) from pandas.core.window.numba_ import generate_numba_groupby_ewma_func from pandas.core.window.rolling import ( _dispatch, @@ -228,6 +232,7 @@ def __init__( axis: int = 0, times: Optional[Union[str, np.ndarray, FrameOrSeries]] = None, ): + super().__init__(obj=obj, min_periods=min_periods, axis=axis) self.com: Optional[float] self.obj = obj self.min_periods = max(int(min_periods), 1) @@ -268,6 +273,12 @@ def __init__( def _constructor(self): return ExponentialMovingWindow + def _get_window_indexer(self) -> BaseIndexer: + """ + Return an indexer class that will compute the window start and end bounds + """ + return ExponentialMovingWindowIndexer() + _agg_see_also_doc = dedent( """ See Also @@ -344,7 +355,6 @@ def mean(self, *args, **kwargs): window_func = self._get_roll_func("ewma_time") window_func = partial( window_func, - minp=self.min_periods, times=self.times, halflife=self.halflife, ) @@ -355,7 +365,6 @@ def mean(self, *args, **kwargs): com=self.com, adjust=self.adjust, ignore_na=self.ignore_na, - minp=self.min_periods, ) return self._apply(window_func) @@ -379,13 
+388,19 @@ def var(self, bias: bool = False, *args, **kwargs): Exponential weighted moving variance. """ nv.validate_window_func("var", args, kwargs) + window_func = self._get_roll_func("ewmcov") + window_func = partial( + window_func, + com=self.com, + adjust=self.adjust, + ignore_na=self.ignore_na, + bias=bias, + ) - def f(arg): - return window_aggregations.ewmcov( - arg, arg, self.com, self.adjust, self.ignore_na, self.min_periods, bias - ) + def var_func(values, begin, end, min_periods): + return window_func(values, begin, end, min_periods, values) - return self._apply(f) + return self._apply(var_func) @Substitution(name="ewm", func_name="cov") @Appender(_doc_template) @@ -427,11 +442,13 @@ def _get_cov(X, Y): Y = self._shallow_copy(Y) cov = window_aggregations.ewmcov( X._prep_values(), + np.array([0], dtype=np.int64), + np.array([0], dtype=np.int64), + self.min_periods, Y._prep_values(), self.com, self.adjust, self.ignore_na, - self.min_periods, bias, ) return wrap_result(X, cov) @@ -478,7 +495,15 @@ def _get_corr(X, Y): def _cov(x, y): return window_aggregations.ewmcov( - x, y, self.com, self.adjust, self.ignore_na, self.min_periods, 1 + x, + np.array([0], dtype=np.int64), + np.array([0], dtype=np.int64), + self.min_periods, + y, + self.com, + self.adjust, + self.ignore_na, + 1, ) x_values = X._prep_values() From 868d736561456c5311ed6e91f76713b321fb53b2 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sat, 14 Nov 2020 17:26:13 -0800 Subject: [PATCH 11/31] Remove redefined EWM._apply --- pandas/core/window/ewm.py | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 0a6b246247ee4..ee418dbf69da5 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -318,27 +318,6 @@ def aggregate(self, func, *args, **kwargs): agg = aggregate - def _apply(self, func): - """ - Rolling statistical measure using supplied function. 
Designed to be - used with passed-in Cython array-based functions. - - Parameters - ---------- - func : str/callable to apply - - Returns - ------- - y : same type as input argument - """ - - def homogeneous_func(values: np.ndarray): - if values.size == 0: - return values.copy() - return np.apply_along_axis(func, self.axis, values) - - return self._apply_blockwise(homogeneous_func) - @Substitution(name="ewm", func_name="mean") @Appender(_doc_template) def mean(self, *args, **kwargs): From da838b3447ce98e48398ce73f17b6b216c3d9911 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sat, 14 Nov 2020 17:53:16 -0800 Subject: [PATCH 12/31] Cant use super because of np.ndarray copies --- pandas/core/window/ewm.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index ee418dbf69da5..9bbe01efb6cfc 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -232,7 +232,6 @@ def __init__( axis: int = 0, times: Optional[Union[str, np.ndarray, FrameOrSeries]] = None, ): - super().__init__(obj=obj, min_periods=min_periods, axis=axis) self.com: Optional[float] self.obj = obj self.min_periods = max(int(min_periods), 1) @@ -240,6 +239,8 @@ def __init__( self.ignore_na = ignore_na self.axis = axis self.on = None + self.center = False + self.closed = None if times is not None: if isinstance(times, str): times = self._selected_obj[times] From 0bf6a8074af9e2e0f9535f5d612e1f6a92b4565f Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 15 Nov 2020 12:53:01 -0800 Subject: [PATCH 13/31] Fix bugs in groupby ewma, add kwargs for groupby ewma --- pandas/core/window/ewm.py | 1 + pandas/core/window/numba_.py | 7 +++---- pandas/core/window/rolling.py | 1 + 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 9bbe01efb6cfc..27947abc78f56 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -231,6 +231,7 @@ def 
__init__( ignore_na: bool = False, axis: int = 0, times: Optional[Union[str, np.ndarray, FrameOrSeries]] = None, + **kwargs, ): self.com: Optional[float] self.obj = obj diff --git a/pandas/core/window/numba_.py b/pandas/core/window/numba_.py index 61bedd18b7330..368261fdc2df4 100644 --- a/pandas/core/window/numba_.py +++ b/pandas/core/window/numba_.py @@ -114,13 +114,13 @@ def groupby_ewma( end: np.ndarray, minimum_periods: int, ) -> np.ndarray: - results = [] + result = np.empty(len(values)) alpha = 1.0 / (1.0 + com) for i in loop_range(len(begin)): start = begin[i] stop = end[i] window = values[start:stop] - sub_result = np.np.empty(len(window)) + sub_result = np.empty(len(window)) old_wt_factor = 1.0 - alpha new_wt = 1.0 if adjust else alpha @@ -155,9 +155,8 @@ def groupby_ewma( sub_result[j] = weighted_avg if nobs >= minimum_periods else np.nan - results.append(sub_result) + result[start:stop] = sub_result - result = np.concatenate(results) return result return groupby_ewma diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 4696960b1d5c1..8cfcc83b7d28f 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -447,6 +447,7 @@ def calc(x): center=self.center, closed=self.closed, ) + breakpoint() return func(x, start, end, min_periods) with np.errstate(all="ignore"): From 19dfd3efcd2017248163e3fc982c0cb1aaabf220 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 15 Nov 2020 13:02:02 -0800 Subject: [PATCH 14/31] Remove breakpoint --- pandas/core/window/rolling.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 8cfcc83b7d28f..4696960b1d5c1 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -447,7 +447,6 @@ def calc(x): center=self.center, closed=self.closed, ) - breakpoint() return func(x, start, end, min_periods) with np.errstate(all="ignore"): From ecbbe7683eb21a5428d9a7b146ab99bd24940f8f Mon Sep 17 
00:00:00 2001 From: Matt Roeschke Date: Sun, 15 Nov 2020 13:14:21 -0800 Subject: [PATCH 15/31] Add more direct import --- pandas/core/window/ewm.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 27947abc78f56..c0d99da2b3276 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -15,7 +15,7 @@ import pandas.core.common as common from pandas.core.util.numba_ import maybe_use_numba -from pandas.core.window.common import _doc_template, _shared_docs, zsqrt +from pandas.core.window.common import _doc_template, _shared_docs, flex_binary_moment, zsqrt from pandas.core.window.indexers import ( BaseIndexer, ExponentialMovingWindowIndexer, @@ -26,7 +26,6 @@ _dispatch, BaseWindow, BaseWindowGroupby, - flex_binary_moment, ) if TYPE_CHECKING: From 2219df6e69ab1193b10401a5430e3192a7279e55 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 15 Nov 2020 13:25:57 -0800 Subject: [PATCH 16/31] Add docstring --- pandas/core/window/ewm.py | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index c0d99da2b3276..524e727e6fb9c 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -15,7 +15,12 @@ import pandas.core.common as common from pandas.core.util.numba_ import maybe_use_numba -from pandas.core.window.common import _doc_template, _shared_docs, flex_binary_moment, zsqrt +from pandas.core.window.common import ( + _doc_template, + _shared_docs, + flex_binary_moment, + zsqrt, +) from pandas.core.window.indexers import ( BaseIndexer, ExponentialMovingWindowIndexer, @@ -524,6 +529,32 @@ def _get_window_indexer(self) -> GroupbyIndexer: cov = _dispatch("cov", other=None, pairwise=None, bias=False) def mean(self, engine=None, engine_kwargs=None): + """ + Parameters + ---------- + engine : str, default None + * ``'cython'`` : Runs mean through C-extensions from cython. 
+ * ``'numba'`` : Runs mean through JIT compiled code from numba. + Only available when ``raw`` is set to ``True``. + * ``None`` : Defaults to ``'cython'`` or globally setting ``compute.use_numba`` + + .. versionadded:: 1.2.0 + + engine_kwargs : dict, default None + * For ``'cython'`` engine, there are no accepted ``engine_kwargs`` + * For ``'numba'`` engine, the engine can accept ``nopython``, ``nogil`` + and ``parallel`` dictionary keys. The values must either be ``True`` or + ``False``. The default ``engine_kwargs`` for the ``'numba'`` engine is + ``{'nopython': True, 'nogil': False, 'parallel': False}`` and will be + applied to both the ``func`` and the ``apply`` rolling aggregation. + + .. versionadded:: 1.2.0 + + Returns + ------- + Series or DataFrame + Return type is determined by the caller. + """ if maybe_use_numba(engine): groupby_ewma_func = generate_numba_groupby_ewma_func( engine_kwargs, From 0feb852c4ad78e8c106c001e8e047e1b36af7241 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 15 Nov 2020 13:45:20 -0800 Subject: [PATCH 17/31] Add corr and tests --- pandas/core/window/ewm.py | 1 + pandas/tests/window/test_grouper.py | 50 +++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 524e727e6fb9c..b7bf5a3dc1dd1 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -527,6 +527,7 @@ def _get_window_indexer(self) -> GroupbyIndexer: var = _dispatch("var", bias=False) std = _dispatch("std", bias=False) cov = _dispatch("cov", other=None, pairwise=None, bias=False) + corr = _dispatch("corr", other=None, pairwise=None) def mean(self, engine=None, engine_kwargs=None): """ diff --git a/pandas/tests/window/test_grouper.py b/pandas/tests/window/test_grouper.py index 65906df819054..b36083514efdb 100644 --- a/pandas/tests/window/test_grouper.py +++ b/pandas/tests/window/test_grouper.py @@ -631,3 +631,53 @@ def 
test_groupby_rolling_index_level_and_column_label(self): ), ) tm.assert_frame_equal(result, expected) + + +class TestEWM: + @pytest.mark.parametrize( + "method, expected_data", + [ + ["mean", [0.0, 0.6667, 1.428571, 2.266667]], + ["std", [np.nan, 0.707107, 0.963624, 1.177164]], + ["var", [np.nan, 0.5, 0.963624, 1.177164]], + ], + ) + def test_methods(self, method, expected_data): + # GH 16037 + df = DataFrame({"A": ["a"] * 4, "B": range(4)}) + result = getattr(df.groupby("A").ewm(com=1.0), method)() + expected = DataFrame( + {"B": expected_data}, + index=MultiIndex.from_tuples( + [ + ("a", 0), + ("a", 1), + ("a", 2), + ("a", 3), + ], + names=["A", None], + ), + ) + tm.assert_frame_equal(result, expected) + + @pytest.mark.parametrize( + "method, expected_data", + [["corr", [np.nan, 1.0, 1.0, 1]], ["cov", [np.nan, 0.5, 0.928571, 1.385714]]], + ) + def test_pairwise_methods(self, method, expected_data): + # GH 16037 + df = DataFrame({"A": ["a"] * 4, "B": range(4)}) + result = getattr(df.groupby("A").ewm(com=1.0), method)() + expected = DataFrame( + {"B": expected_data}, + index=MultiIndex.from_tuples( + [ + ("a", 0, "B"), + ("a", 1, "B"), + ("a", 2, "B"), + ("a", 3, "B"), + ], + names=["A", None, None], + ), + ) + tm.assert_frame_equal(result, expected) From 7de126f69b59578f65472568eecaf8686ccd99c9 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 15 Nov 2020 14:01:04 -0800 Subject: [PATCH 18/31] Format tests and add whatsnew --- doc/source/whatsnew/v1.2.0.rst | 15 +++++++++++++++ pandas/tests/window/test_grouper.py | 4 ++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index 28f7df98cb86b..d0a24016bae21 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -204,6 +204,21 @@ example where the index name is preserved: The same is true for :class:`MultiIndex`, but the logic is applied separately on a level-by-level basis. +.. 
_whatsnew_120.groupby_ewm: + +Groupby supports EWM operations directly +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +:class:`DataFrameGroupBy` now supports exponentially weighted window operations directly (:issue:`16037`). + +.. ipython:: python + + df = pd.DataFrame({'A': ['a', 'b', 'a', 'b'], 'B': range(4)}) + df + df.groupby('A').ewm(com=1.0).mean() + +Additionally ``mean`` supports execution via Numba with the ``engine`` and ``engine_kwargs`` arguments. + .. _whatsnew_120.enhancements.other: Other enhancements diff --git a/pandas/tests/window/test_grouper.py b/pandas/tests/window/test_grouper.py index b36083514efdb..2a2da9968205b 100644 --- a/pandas/tests/window/test_grouper.py +++ b/pandas/tests/window/test_grouper.py @@ -637,9 +637,9 @@ class TestEWM: @pytest.mark.parametrize( "method, expected_data", [ - ["mean", [0.0, 0.6667, 1.428571, 2.266667]], + ["mean", [0.0, 0.6666666666666666, 1.4285714285714286, 2.2666666666666666]], ["std", [np.nan, 0.707107, 0.963624, 1.177164]], - ["var", [np.nan, 0.5, 0.963624, 1.177164]], + ["var", [np.nan, 0.5, 0.9285714285714286, 1.3857142857142857]], ], ) def test_methods(self, method, expected_data): From e3822ba8141a899c735d290c289234413ac21db5 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 15 Nov 2020 14:03:12 -0800 Subject: [PATCH 19/31] clarify whatsnew --- doc/source/whatsnew/v1.2.0.rst | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index d0a24016bae21..c626fa2ababb3 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -217,7 +217,9 @@ Groupby supports EWM operations directly df df.groupby('A').ewm(com=1.0).mean() -Additionally ``mean`` supports execution via Numba with the ``engine`` and ``engine_kwargs`` arguments. +Additionally ``mean`` supports execution via `Numba <https://numba.pydata.org/>`__ with +the ``engine`` and ``engine_kwargs`` arguments.
Numba must be installed as an optional dependency +to use this feature. .. _whatsnew_120.enhancements.other: From d03dfdd3ed7684e1514d5ffca4f975661c618216 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 15 Nov 2020 14:08:19 -0800 Subject: [PATCH 20/31] Change groupby support --- doc/source/user_guide/window.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/user_guide/window.rst b/doc/source/user_guide/window.rst index 47ef1e9c8c4d7..05f8be091fa25 100644 --- a/doc/source/user_guide/window.rst +++ b/doc/source/user_guide/window.rst @@ -43,7 +43,7 @@ Concept Method Returned Object Rolling window ``rolling`` ``Rolling`` Yes Yes Weighted window ``rolling`` ``Window`` No No Expanding window ``expanding`` ``Expanding`` No Yes -Exponentially Weighted window ``ewm`` ``ExponentialMovingWindow`` No No +Exponentially Weighted window ``ewm`` ``ExponentialMovingWindow`` No Yes (as of version 1.2) ============================= ================= =========================== =========================== ======================== As noted above, some operations support specifying a window based on a time offset: From fdc4d66efe185189760e25796513718298c357a7 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 15 Nov 2020 14:08:58 -0800 Subject: [PATCH 21/31] Rename grouper -> groupby --- pandas/tests/window/{test_grouper.py => test_groupby.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename pandas/tests/window/{test_grouper.py => test_groupby.py} (100%) diff --git a/pandas/tests/window/test_grouper.py b/pandas/tests/window/test_groupby.py similarity index 100% rename from pandas/tests/window/test_grouper.py rename to pandas/tests/window/test_groupby.py From 27955a1b27b4729c78621167c442d01c6bf01296 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 15 Nov 2020 14:12:11 -0800 Subject: [PATCH 22/31] isort --- pandas/core/window/__init__.py | 4 ++-- pandas/core/window/ewm.py | 6 +----- 2 files changed, 3 insertions(+), 7 
deletions(-) diff --git a/pandas/core/window/__init__.py b/pandas/core/window/__init__.py index ebcc0d3a2da0c..b3d0820fee4da 100644 --- a/pandas/core/window/__init__.py +++ b/pandas/core/window/__init__.py @@ -1,6 +1,6 @@ -from pandas.core.window.ewm import ( +from pandas.core.window.ewm import ( # noqa:F401 ExponentialMovingWindow, ExponentialMovingWindowGroupby, -) # noqa:F401 +) from pandas.core.window.expanding import Expanding, ExpandingGroupby # noqa:F401 from pandas.core.window.rolling import Rolling, RollingGroupby, Window # noqa:F401 diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index b7bf5a3dc1dd1..77f2bbd1b4412 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -27,11 +27,7 @@ GroupbyIndexer, ) from pandas.core.window.numba_ import generate_numba_groupby_ewma_func -from pandas.core.window.rolling import ( - _dispatch, - BaseWindow, - BaseWindowGroupby, -) +from pandas.core.window.rolling import BaseWindow, BaseWindowGroupby, _dispatch if TYPE_CHECKING: from pandas import Series From 08ca227f60bc5d2a49624937268749aace772c53 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 15 Nov 2020 14:15:58 -0800 Subject: [PATCH 23/31] Make agg docs more consistent --- pandas/_libs/window/aggregations.pyx | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index c726979f1a42e..04826f912c813 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -1505,8 +1505,8 @@ def ewma_time(const float64_t[:] vals, int64_t[:] start, int64_t[:] end, Parameters ---------- vals : ndarray[float_64] - start: ndarray (int64 type) - end: ndarray (int64 type) + start: ndarray[int_64] + end: ndarray[int_64] minp : int times : ndarray[int64] halflife : int64 @@ -1633,11 +1633,13 @@ def ewmcov(float64_t[:] input_x, int64_t[:] start, int64_t[:] end, int minp, Parameters ---------- input_x : ndarray (float64 
type) + start: ndarray (int64 type) + end: ndarray (int64 type) + minp : int input_y : ndarray (float64 type) com : float64 adjust : int ignore_na : bool - minp : int bias : int Returns From e862d4f509d257107ecf92f9f2a5f30d24bfd20f Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 15 Nov 2020 14:17:32 -0800 Subject: [PATCH 24/31] lint --- pandas/core/window/ewm.py | 3 ++- pandas/core/window/numba_.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 77f2bbd1b4412..4df308de07159 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -533,7 +533,8 @@ def mean(self, engine=None, engine_kwargs=None): * ``'cython'`` : Runs mean through C-extensions from cython. * ``'numba'`` : Runs mean through JIT compiled code from numba. Only available when ``raw`` is set to ``True``. - * ``None`` : Defaults to ``'cython'`` or globally setting ``compute.use_numba`` + * ``None`` : Defaults to ``'cython'`` or globally setting + ``compute.use_numba`` .. versionadded:: 1.2.0 diff --git a/pandas/core/window/numba_.py b/pandas/core/window/numba_.py index 368261fdc2df4..274586e1745b5 100644 --- a/pandas/core/window/numba_.py +++ b/pandas/core/window/numba_.py @@ -81,7 +81,8 @@ def generate_numba_groupby_ewma_func( ignore_na: bool, ): """ - Generate a numba jitted groupby ewma function specified by values from engine_kwargs. + Generate a numba jitted groupby ewma function specified by values + from engine_kwargs. 
Parameters ---------- From 33b81ba61961c992265bd7724d3e13ab4c980d94 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 15 Nov 2020 14:28:21 -0800 Subject: [PATCH 25/31] Deprivatize helper method --- pandas/core/window/ewm.py | 10 +++++----- pandas/core/window/rolling.py | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 4df308de07159..bbffed1bed20d 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -27,7 +27,7 @@ GroupbyIndexer, ) from pandas.core.window.numba_ import generate_numba_groupby_ewma_func -from pandas.core.window.rolling import BaseWindow, BaseWindowGroupby, _dispatch +from pandas.core.window.rolling import BaseWindow, BaseWindowGroupby, dispatch if TYPE_CHECKING: from pandas import Series @@ -520,10 +520,10 @@ def _get_window_indexer(self) -> GroupbyIndexer: ) return window_indexer - var = _dispatch("var", bias=False) - std = _dispatch("std", bias=False) - cov = _dispatch("cov", other=None, pairwise=None, bias=False) - corr = _dispatch("corr", other=None, pairwise=None) + var = dispatch("var", bias=False) + std = dispatch("std", bias=False) + cov = dispatch("cov", other=None, pairwise=None, bias=False) + corr = dispatch("corr", other=None, pairwise=None) def mean(self, engine=None, engine_kwargs=None): """ diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 4696960b1d5c1..fb3bb09fcdbaa 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -715,7 +715,7 @@ def aggregate(self, func, *args, **kwargs): ) -def _dispatch(name: str, *args, **kwargs): +def dispatch(name: str, *args, **kwargs): """ Dispatch to groupby apply. 
""" @@ -746,8 +746,8 @@ def __init__(self, obj, *args, **kwargs): self._groupby.grouper.mutated = True super().__init__(obj, *args, **kwargs) - corr = _dispatch("corr", other=None, pairwise=None) - cov = _dispatch("cov", other=None, pairwise=None) + corr = dispatch("corr", other=None, pairwise=None) + cov = dispatch("cov", other=None, pairwise=None) def _apply( self, From a7af894346c9096ea81a4bd1484e472321155683 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 15 Nov 2020 15:07:39 -0800 Subject: [PATCH 26/31] Change com for typing, asv bench --- asv_bench/benchmarks/rolling.py | 13 +++++++++++++ pandas/core/window/ewm.py | 3 +-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/asv_bench/benchmarks/rolling.py b/asv_bench/benchmarks/rolling.py index 226b225b47591..79a33c437ea5c 100644 --- a/asv_bench/benchmarks/rolling.py +++ b/asv_bench/benchmarks/rolling.py @@ -225,4 +225,17 @@ def time_rolling_offset(self, method): getattr(self.groupby_roll_offset, method)() +class GroupbyEWM: + + params = ["cython", "numba"] + param_names = ["engine"] + + def setup(self, engine): + df = pd.DataFrame({"A": range(50), "B": range(50)}) + self.gb_ewm = df.groupby("A").ewm(com=1.0) + + def time_groupby_mean(self, engine): + self.gb_ewm.mean(engine=engine) + + from .pandas_vb_common import setup # noqa: F401 isort:skip diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index bbffed1bed20d..07d2c365dec46 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -233,7 +233,6 @@ def __init__( times: Optional[Union[str, np.ndarray, FrameOrSeries]] = None, **kwargs, ): - self.com: Optional[float] self.obj = obj self.min_periods = max(int(min_periods), 1) self.adjust = adjust @@ -260,7 +259,7 @@ def __init__( if common.count_not_none(com, span, alpha) > 0: self.com = get_center_of_mass(com, span, None, alpha) else: - self.com = None + self.com = 0.0 else: if halflife is not None and isinstance(halflife, (str, datetime.timedelta)): raise 
ValueError( From 20f09085c2aadd4b0926d7c657a17329d13276b2 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 15 Nov 2020 17:42:02 -0800 Subject: [PATCH 27/31] Use numba_cache_key instead of 2 variables --- pandas/core/window/ewm.py | 1 - pandas/core/window/rolling.py | 20 ++++++++++---------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 07d2c365dec46..cb34b66c7feba 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -561,7 +561,6 @@ def mean(self, engine=None, engine_kwargs=None): ) return self._apply( groupby_ewma_func, - use_numba_cache=maybe_use_numba(engine), numba_cache_key=(lambda x: x, "groupby_ewma"), ) elif engine in ("cython", None): diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index fb3bb09fcdbaa..611f265ccb225 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -405,7 +405,7 @@ def _apply( self, func: Callable[..., Any], name: Optional[str] = None, - use_numba_cache: bool = False, + numba_cache_key: Optional[Tuple[Callable, str]] = None, **kwargs, ): """ @@ -417,9 +417,8 @@ def _apply( ---------- func : callable function to apply name : str, - use_numba_cache : bool - whether to cache a numba compiled function. 
Only available for numba - enabled methods (so far only apply) + numba_cache_key : tuple + caching key to be used to store a compiled numba func **kwargs additional arguments for rolling function and window function @@ -456,8 +455,8 @@ def calc(x): result = calc(values) result = np.asarray(result) - if use_numba_cache: - NUMBA_FUNC_CACHE[kwargs["numba_cache_key"]] = func + if numba_cache_key is not None: + NUMBA_FUNC_CACHE[numba_cache_key] = func return result @@ -753,13 +752,13 @@ def _apply( self, func: Callable[..., Any], name: Optional[str] = None, - use_numba_cache: bool = False, + numba_cache_key: Optional[Tuple[Callable, str]] = None, **kwargs, ) -> FrameOrSeries: result = super()._apply( func, name, - use_numba_cache, + numba_cache_key, **kwargs, ) # Reconstruct the resulting MultiIndex from tuples @@ -1292,10 +1291,12 @@ def apply( if not is_bool(raw): raise ValueError("raw parameter must be `True` or `False`") + numba_cache_key = None if maybe_use_numba(engine): if raw is False: raise ValueError("raw must be `True` when using the numba engine") apply_func = generate_numba_apply_func(args, kwargs, func, engine_kwargs) + numba_cache_key = (func, "rolling_apply") elif engine in ("cython", None): if engine_kwargs is not None: raise ValueError("cython engine does not accept engine_kwargs") @@ -1305,8 +1306,7 @@ def apply( return self._apply( apply_func, - use_numba_cache=maybe_use_numba(engine), - numba_cache_key=(func, "rolling_apply"), + numba_cache_key=numba_cache_key, ) def _generate_cython_apply_func( From 0009dfacf0bb23c75687d3550a6966507ad5fd43 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 15 Nov 2020 17:51:39 -0800 Subject: [PATCH 28/31] Mypy --- pandas/core/window/rolling.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 611f265ccb225..e74ae5311125e 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -1037,7 +1037,7 @@ 
def _apply( self, func: Callable[[np.ndarray, int, int], np.ndarray], name: Optional[str] = None, - use_numba_cache: bool = False, + numba_cache_key: Optional[Tuple[Callable, str]] = None, **kwargs, ): """ @@ -1049,9 +1049,8 @@ def _apply( ---------- func : callable function to apply name : str, - use_numba_cache : bool - whether to cache a numba compiled function. Only available for numba - enabled methods (so far only apply) + numba_cache_key : tuple + unused **kwargs additional arguments for scipy windows if necessary From 208e2d8b938d552f29393158d3831326afff1283 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 15 Nov 2020 18:02:33 -0800 Subject: [PATCH 29/31] Remove copy paste error --- pandas/core/window/ewm.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index cb34b66c7feba..f8237a436f436 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -542,8 +542,7 @@ def mean(self, engine=None, engine_kwargs=None): * For ``'numba'`` engine, the engine can accept ``nopython``, ``nogil`` and ``parallel`` dictionary keys. The values must either be ``True`` or ``False``. The default ``engine_kwargs`` for the ``'numba'`` engine is - ``{'nopython': True, 'nogil': False, 'parallel': False}`` and will be - applied to both the ``func`` and the ``apply`` rolling aggregation. + ``{'nopython': True, 'nogil': False, 'parallel': False}``. .. 
versionadded:: 1.2.0 From 45e1ba471653bef268720ac56480cf0628827e8e Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 15 Nov 2020 19:10:52 -0800 Subject: [PATCH 30/31] Fix groupby tests for new ewm method --- pandas/core/groupby/base.py | 1 + pandas/tests/groupby/test_allowlist.py | 1 + 2 files changed, 2 insertions(+) diff --git a/pandas/core/groupby/base.py b/pandas/core/groupby/base.py index f205226c03a53..7dc0db35bf8fe 100644 --- a/pandas/core/groupby/base.py +++ b/pandas/core/groupby/base.py @@ -192,6 +192,7 @@ def _gotitem(self, key, ndim, subset=None): "describe", "dtypes", "expanding", + "ewm", "filter", "get_group", "groups", diff --git a/pandas/tests/groupby/test_allowlist.py b/pandas/tests/groupby/test_allowlist.py index 4a735fc7bb686..34729c771eac9 100644 --- a/pandas/tests/groupby/test_allowlist.py +++ b/pandas/tests/groupby/test_allowlist.py @@ -329,6 +329,7 @@ def test_tab_completion(mframe): "expanding", "pipe", "sample", + "ewm", } assert results == expected From 3baa13e7cadbdf725d10058589fce6814f5f615d Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Tue, 17 Nov 2020 09:57:09 -0800 Subject: [PATCH 31/31] Add tests comparing to groupby.apply --- pandas/tests/window/test_groupby.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pandas/tests/window/test_groupby.py b/pandas/tests/window/test_groupby.py index 2a2da9968205b..c4de112bd6dc0 100644 --- a/pandas/tests/window/test_groupby.py +++ b/pandas/tests/window/test_groupby.py @@ -660,6 +660,10 @@ def test_methods(self, method, expected_data): ) tm.assert_frame_equal(result, expected) + expected = df.groupby("A").apply(lambda x: getattr(x.ewm(com=1.0), method)()) + # There may be a bug in the above statement; not returning the correct index + tm.assert_frame_equal(result.reset_index(drop=True), expected) + @pytest.mark.parametrize( "method, expected_data", [["corr", [np.nan, 1.0, 1.0, 1]], ["cov", [np.nan, 0.5, 0.928571, 1.385714]]], @@ -681,3 +685,6 @@ def test_pairwise_methods(self, 
method, expected_data): ), ) tm.assert_frame_equal(result, expected) + + expected = df.groupby("A").apply(lambda x: getattr(x.ewm(com=1.0), method)()) + tm.assert_frame_equal(result, expected)