From 5e0b2cc9183c627f0b8820e176753fe32dc4d4bb Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 12 Sep 2021 14:05:50 -0700 Subject: [PATCH 01/21] Add mean kernel --- pandas/core/numba_kernels.py | 114 +++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 pandas/core/numba_kernels.py diff --git a/pandas/core/numba_kernels.py b/pandas/core/numba_kernels.py new file mode 100644 index 0000000000000..b560c86f67ecd --- /dev/null +++ b/pandas/core/numba_kernels.py @@ -0,0 +1,114 @@ +""" +Numba aggregation kernels that can be shared by +* Dataframe / Series +* groupby +* rolling / expanding + +Mirrors pandas/_libs/window/aggregation.pyx +""" + +import numba +import numpy as np + + +@numba.jit(nopython=True, nogil=True, parallel=True) +def is_monotonic_increasing(bounds): + n = len(bounds) + if n == 1: + return bounds[0] == bounds[0] + elif n < 2: + return True + prev = bounds[0] + for i in range(1, n): + cur = bounds[i] + if cur < prev: + return False + return True + + +@numba.jit(nopython=True, nogil=True, parallel=True) +def add_mean(val, nobs, sum_x, neg_ct, compensation): + if not np.isnan(val): + nobs += 1 + y = val - compensation + t = sum_x + y + compensation = t - sum_x - y + sum_x = t + if val < 0: + neg_ct += 1 + return nobs, sum_x, neg_ct, compensation + + +@numba.jit(nopython=True, nogil=True, parallel=True) +def remove_mean(val, nobs, sum_x, neg_ct, compensation): + if not np.isnan(val): + nobs -= 1 + y = -val - compensation + t = sum_x + y + compensation = t - sum_x - y + sum_x = t + if val < 0: + neg_ct -= 1 + return nobs, sum_x, neg_ct, compensation + + +@numba.jit(nopython=True, nogil=True, parallel=True) +def sliding_mean( + values: np.array, + start: np.array, + end: np.array, + min_periods: int, +): + N = len(start) + nobs = 0.0 + sum_x = 0.0 + neg_ct = 0 + compensation_add = 0.0 + compensation_remove = 0.0 + + is_monotonic_increasing_bounds = is_monotonic_increasing( + start + ) and is_monotonic_increasing(end) + + output = np.empty(N, dtype=np.float64) + + for i in range(N): + s = start[i] + e = end[i] + if i == 0 or not is_monotonic_increasing_bounds: + for j in range(s, e): + val = values[j] + nobs, sum_x, neg_ct, compensation_add = add_mean( + val, nobs, sum_x, neg_ct, compensation_add + ) + else: + for j in range(start[i - 1], s): + val = values[j] + nobs, sum_x, neg_ct, compensation_remove = remove_mean( + val, nobs, sum_x, neg_ct, compensation_remove + ) + + for j in range(end[i - 1], e): + val = values[j] + nobs, sum_x, neg_ct, compensation_add = add_mean( + val, nobs, sum_x, neg_ct, compensation_add + ) + + if nobs >= min_periods and nobs > 0: + result = sum_x / nobs + if neg_ct == 0 and result < 0: + result = 0 + elif neg_ct == nobs and result > 0: + result = 0 + else: + result = np.nan + + output[i] = result + + if not is_monotonic_increasing_bounds: + nobs = 0.0 + sum_x = 0.0 + neg_ct = 0 + compensation_remove = 0.0 + + return output From 4f2d2983fa63253485aa7c1b384e39a5ac41faf5 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 12 Sep 2021 20:48:03 -0700 Subject: [PATCH 02/21] Add a shared executer function --- pandas/core/numba_/__init__.py | 0 pandas/core/numba_/executor.py | 65 +++++++++++++++++++ .../{numba_kernels.py => numba_/kernels.py} | 2 +- 3 files changed, 66 insertions(+), 1 deletion(-) create mode 100644 pandas/core/numba_/__init__.py create mode 100644 pandas/core/numba_/executor.py rename pandas/core/{numba_kernels.py => numba_/kernels.py} (98%) diff --git a/pandas/core/numba_/__init__.py b/pandas/core/numba_/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/pandas/core/numba_/executor.py b/pandas/core/numba_/executor.py new file mode 100644 index 0000000000000..908a42e777231 --- /dev/null +++ b/pandas/core/numba_/executor.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +from typing import ( + Any, + Callable, +) + +import numpy as np + +from pandas._typing import Scalar +from pandas.compat._optional import import_optional_dependency + +from pandas.core.util.numba_ import ( + NUMBA_FUNC_CACHE, + get_jit_arguments, +) + + +def generate_shared_aggregator( + kwargs: dict[str, Any], + func: Callable[..., Scalar], + engine_kwargs: dict[str, bool] | None, + cache_key_str: str, +): + """ + Generate a numba function that loops over the columns 2D object and applies + a 1D numba kernel over each column. + + Parameters + ---------- + kwargs : dict + **kwargs to be passed into the function + func : function + function to be applied to each window and will be JITed + engine_kwargs : dict + dictionary of arguments to be passed into numba.jit + cache_key_str: str + string to access the compiled function of the form + _ e.g. rolling_mean, groupby_mean + + Returns + ------- + Numba function + """ + nopython, nogil, parallel = get_jit_arguments(engine_kwargs, kwargs) + + cache_key = (func, cache_key_str) + if cache_key in NUMBA_FUNC_CACHE: + return NUMBA_FUNC_CACHE[cache_key] + + numba = import_optional_dependency("numba") + + @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel) + def column_looper( + values, + start, + end, + min_periods, + ): + result = np.empty((len(start), values.shape[1])) + for i in numba.prange(values.shape[1]): + result[:, i] = func(values[:, i], start, end, min_periods) + return result + + return column_looper diff --git a/pandas/core/numba_kernels.py b/pandas/core/numba_/kernels.py similarity index 98% rename from pandas/core/numba_kernels.py rename to pandas/core/numba_/kernels.py index b560c86f67ecd..532fb6400f4bf 100644 --- a/pandas/core/numba_kernels.py +++ b/pandas/core/numba_/kernels.py @@ -1,5 +1,5 @@ """ -Numba aggregation kernels that can be shared by +Numba 1D aggregation kernels that can be shared by * Dataframe / Series * groupby * rolling / expanding From 8ada0be8d20363c5a5406d95a020f50729a7916d Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 12 Sep 2021 21:20:04 -0700 Subject: [PATCH 03/21] Add stub of a numba apply function --- pandas/core/window/rolling.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index f6e991c7d7cd2..8c517987d5a6e 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -69,6 +69,7 @@ PeriodIndex, TimedeltaIndex, ) +from pandas.core.numba_ import executor from pandas.core.reshape.concat import concat from pandas.core.util.numba_ import ( NUMBA_FUNC_CACHE, @@ -567,6 +568,38 @@ def calc(x): else: return self._apply_tablewise(homogeneous_func, name) + def _numba_apply( + self, + func: Callable[..., Any], + engine_kwargs: dict[str, bool], + numba_cache_key: str, + numba_args: tuple[Any, ...] = (), + **kwargs, + ): + window_indexer = self._get_window_indexer() + min_periods = ( + self.min_periods + if self.min_periods is not None + else window_indexer.window_size + ) + obj = self._create_data(self._selected_obj) + if self.axis == 1: + obj = obj.T + values = self._prep_values(obj) + start, end = window_indexer.get_window_bounds( + num_values=len(values), + min_periods=min_periods, + center=self.center, + closed=self.closed, + ) + aggregator = executor.generate_shared_aggregator( + kwargs, func, engine_kwargs, numba_cache_key + ) + result = aggregator(values, start, end, min_periods, *numba_args) + result = result.T if self.axis == 1 else result + out = obj._constructor(result, index=obj.index, columns=obj.columns) + return self._resolve_output(out, obj) + def aggregate(self, func, *args, **kwargs): result = ResamplerWindowApply(self, func, args=args, kwargs=kwargs).agg() if result is None: From f201dbd68562303ce367a8c039f97b9656a9b9eb Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 12 Sep 2021 21:45:15 -0700 Subject: [PATCH 04/21] Hook in numba apply to mean --- pandas/core/window/rolling.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 8c517987d5a6e..2ad048e4e794c 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -1355,15 +1355,16 @@ def mean( if maybe_use_numba(engine): if self.method == "table": func = generate_manual_numpy_nan_agg_with_axis(np.nanmean) + return self.apply( + func, + raw=True, + engine=engine, + engine_kwargs=engine_kwargs, + ) else: - func = np.nanmean + from pandas.core.numba_.kernels import sliding_mean - return self.apply( - func, - raw=True, - engine=engine, - engine_kwargs=engine_kwargs, - ) + return self._numba_apply(sliding_mean, engine_kwargs, "rolling_mean") window_func = window_aggregations.roll_mean return self._apply(window_func, name="mean", **kwargs) From 9ec1ef097edc99bcd5be361020baf4894997f3af Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Thu, 16 Sep 2021 11:07:09 -0700 Subject: [PATCH 05/21] Fix caching tests, don't parallelize when ineffective --- pandas/core/numba_/kernels.py | 9 ++++----- pandas/core/window/rolling.py | 2 +- pandas/tests/window/test_numba.py | 15 ++++++++++----- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/pandas/core/numba_/kernels.py b/pandas/core/numba_/kernels.py index 532fb6400f4bf..a8837fb80441c 100644 --- a/pandas/core/numba_/kernels.py +++ b/pandas/core/numba_/kernels.py @@ -6,12 +6,11 @@ Mirrors pandas/_libs/window/aggregation.pyx """ - import numba import numpy as np -@numba.jit(nopython=True, nogil=True, parallel=True) +@numba.jit(nopython=True, nogil=True, parallel=False) def is_monotonic_increasing(bounds): n = len(bounds) if n == 1: @@ -26,7 +25,7 @@ def is_monotonic_increasing(bounds): return True -@numba.jit(nopython=True, nogil=True, parallel=True) +@numba.jit(nopython=True, nogil=True, parallel=False) def add_mean(val, nobs, sum_x, neg_ct, compensation): if not np.isnan(val): nobs += 1 @@ -39,7 +38,7 @@ def add_mean(val, nobs, sum_x, neg_ct, compensation): return nobs, sum_x, neg_ct, compensation -@numba.jit(nopython=True, nogil=True, parallel=True) +@numba.jit(nopython=True, nogil=True, parallel=False) def remove_mean(val, nobs, sum_x, neg_ct, compensation): if not np.isnan(val): nobs -= 1 @@ -52,7 +51,7 @@ def remove_mean(val, nobs, sum_x, neg_ct, compensation): return nobs, sum_x, neg_ct, compensation -@numba.jit(nopython=True, nogil=True, parallel=True) +@numba.jit(nopython=True, nogil=True, parallel=False) def sliding_mean( values: np.array, start: np.array, diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 996ebcfaaf2b7..1d8e86eb87746 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -586,7 +586,7 @@ def _numba_apply( obj = self._create_data(self._selected_obj) if self.axis == 1: obj = obj.T - values = self._prep_values(obj) + values = self._prep_values(obj.to_numpy()) start, end = window_indexer.get_window_bounds( num_values=len(values), min_periods=min_periods, diff --git a/pandas/tests/window/test_numba.py b/pandas/tests/window/test_numba.py index 1086857f38b62..7fb6d8855e278 100644 --- a/pandas/tests/window/test_numba.py +++ b/pandas/tests/window/test_numba.py @@ -57,7 +57,11 @@ def test_numba_vs_cython_rolling_methods( expected = getattr(roll, method)(engine="cython") # Check the cache - assert (getattr(np, f"nan{method}"), "Rolling_apply_single") in NUMBA_FUNC_CACHE + if method != "mean": + assert ( + getattr(np, f"nan{method}"), + "Rolling_apply_single", + ) in NUMBA_FUNC_CACHE tm.assert_frame_equal(result, expected) @@ -75,10 +79,11 @@ def test_numba_vs_cython_expanding_methods( expected = getattr(expand, method)(engine="cython") # Check the cache - assert ( - getattr(np, f"nan{method}"), - "Expanding_apply_single", - ) in NUMBA_FUNC_CACHE + if method != "mean": + assert ( + getattr(np, f"nan{method}"), + "Expanding_apply_single", + ) in NUMBA_FUNC_CACHE tm.assert_frame_equal(result, expected) From 81326229b4cddba9545216c5bb8444e7a9741f71 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Thu, 16 Sep 2021 14:44:21 -0700 Subject: [PATCH 06/21] Add whatsnew and fix caching --- doc/source/whatsnew/v1.4.0.rst | 2 +- pandas/core/window/rolling.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/doc/source/whatsnew/v1.4.0.rst b/doc/source/whatsnew/v1.4.0.rst index eb1bc270a7d1b..5fd39b7ae791c 100644 --- a/doc/source/whatsnew/v1.4.0.rst +++ b/doc/source/whatsnew/v1.4.0.rst @@ -354,7 +354,7 @@ Performance improvements - Performance improvement in indexing with a :class:`MultiIndex` indexer on another :class:`MultiIndex` (:issue:43370`) - Performance improvement in :meth:`GroupBy.quantile` (:issue:`43469`) - :meth:`SparseArray.min` and :meth:`SparseArray.max` no longer require converting to a dense array (:issue:`43526`) -- +- Performance improvement in :meth:`.Rolling.mean` and :meth:`.Expanding.mean` with ``engine="numba"`` (:issue:``) .. --------------------------------------------------------------------------- diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 1d8e86eb87746..daca900bbceff 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -573,7 +573,7 @@ def _numba_apply( self, func: Callable[..., Any], engine_kwargs: dict[str, bool], - numba_cache_key: str, + numba_cache_key_str: str, numba_args: tuple[Any, ...] = (), **kwargs, ): @@ -594,9 +594,10 @@ def _numba_apply( closed=self.closed, ) aggregator = executor.generate_shared_aggregator( - kwargs, func, engine_kwargs, numba_cache_key + kwargs, func, engine_kwargs, numba_cache_key_str ) result = aggregator(values, start, end, min_periods, *numba_args) + NUMBA_FUNC_CACHE[(func, numba_cache_key_str)] = aggregator result = result.T if self.axis == 1 else result out = obj._constructor(result, index=obj.index, columns=obj.columns) return self._resolve_output(out, obj) From d9b39bda9f1b3b78b3d3ac5eaddf192c28d3dba3 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Thu, 16 Sep 2021 14:55:38 -0700 Subject: [PATCH 07/21] add PR number --- doc/source/whatsnew/v1.4.0.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/whatsnew/v1.4.0.rst b/doc/source/whatsnew/v1.4.0.rst index 5fd39b7ae791c..8cd67b25d0077 100644 --- a/doc/source/whatsnew/v1.4.0.rst +++ b/doc/source/whatsnew/v1.4.0.rst @@ -354,7 +354,7 @@ Performance improvements - Performance improvement in indexing with a :class:`MultiIndex` indexer on another :class:`MultiIndex` (:issue:43370`) - Performance improvement in :meth:`GroupBy.quantile` (:issue:`43469`) - :meth:`SparseArray.min` and :meth:`SparseArray.max` no longer require converting to a dense array (:issue:`43526`) -- Performance improvement in :meth:`.Rolling.mean` and :meth:`.Expanding.mean` with ``engine="numba"`` (:issue:``) +- Performance improvement in :meth:`.Rolling.mean` and :meth:`.Expanding.mean` with ``engine="numba"`` (:issue:`43612`) .. --------------------------------------------------------------------------- From 9ddf423c42541fb05d17c52fa53bb89587814086 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Thu, 16 Sep 2021 15:36:18 -0700 Subject: [PATCH 08/21] Make _numba private --- pandas/core/{numba_ => _numba}/__init__.py | 0 pandas/core/{numba_ => _numba}/executor.py | 0 pandas/core/{numba_ => _numba}/kernels.py | 0 pandas/core/window/rolling.py | 4 ++-- 4 files changed, 2 insertions(+), 2 deletions(-) rename pandas/core/{numba_ => _numba}/__init__.py (100%) rename pandas/core/{numba_ => _numba}/executor.py (100%) rename pandas/core/{numba_ => _numba}/kernels.py (100%) diff --git a/pandas/core/numba_/__init__.py b/pandas/core/_numba/__init__.py similarity index 100% rename from pandas/core/numba_/__init__.py rename to pandas/core/_numba/__init__.py diff --git a/pandas/core/numba_/executor.py b/pandas/core/_numba/executor.py similarity index 100% rename from pandas/core/numba_/executor.py rename to pandas/core/_numba/executor.py diff --git a/pandas/core/numba_/kernels.py b/pandas/core/_numba/kernels.py similarity index 100% rename from pandas/core/numba_/kernels.py rename to pandas/core/_numba/kernels.py diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index daca900bbceff..b3ae50e8ef8d1 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -49,6 +49,7 @@ ) from pandas.core.dtypes.missing import notna +from pandas.core._numba import executor from pandas.core.algorithms import factorize from pandas.core.apply import ResamplerWindowApply from pandas.core.arrays import ExtensionArray @@ -70,7 +71,6 @@ PeriodIndex, TimedeltaIndex, ) -from pandas.core.numba_ import executor from pandas.core.reshape.concat import concat from pandas.core.util.numba_ import ( NUMBA_FUNC_CACHE, @@ -1364,7 +1364,7 @@ def mean( engine_kwargs=engine_kwargs, ) else: - from pandas.core.numba_.kernels import sliding_mean + from pandas.core._numba.kernels import sliding_mean return self._numba_apply(sliding_mean, engine_kwargs, "rolling_mean") window_func = window_aggregations.roll_mean From 68524fdf00e048db6d40acd917e2809fad1ce7e9 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Thu, 16 Sep 2021 15:42:20 -0700 Subject: [PATCH 09/21] Switch args --- pandas/core/_numba/executor.py | 6 +++--- pandas/core/window/rolling.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pandas/core/_numba/executor.py b/pandas/core/_numba/executor.py index 908a42e777231..8b81a8813bd28 100644 --- a/pandas/core/_numba/executor.py +++ b/pandas/core/_numba/executor.py @@ -17,8 +17,8 @@ def generate_shared_aggregator( - kwargs: dict[str, Any], func: Callable[..., Scalar], + kwargs: dict[str, Any], engine_kwargs: dict[str, bool] | None, cache_key_str: str, ): @@ -28,10 +28,10 @@ def generate_shared_aggregator( Parameters ---------- - kwargs : dict - **kwargs to be passed into the function func : function function to be applied to each window and will be JITed + kwargs : dict + **kwargs to be passed into the function engine_kwargs : dict dictionary of arguments to be passed into numba.jit cache_key_str: str diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index b3ae50e8ef8d1..7f52b4b3bd64d 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -594,7 +594,7 @@ def _numba_apply( closed=self.closed, ) aggregator = executor.generate_shared_aggregator( - kwargs, func, engine_kwargs, numba_cache_key_str + func, kwargs, engine_kwargs, numba_cache_key_str ) result = aggregator(values, start, end, min_periods, *numba_args) NUMBA_FUNC_CACHE[(func, numba_cache_key_str)] = aggregator From 2d19aa067f9dfdcb872d75f7db505c66be5803b3 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Fri, 17 Sep 2021 11:28:34 -0700 Subject: [PATCH 10/21] Fix typing --- pandas/core/_numba/kernels.py | 6 +++--- pandas/core/window/rolling.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pandas/core/_numba/kernels.py b/pandas/core/_numba/kernels.py index a8837fb80441c..31420c171a794 100644 --- a/pandas/core/_numba/kernels.py +++ b/pandas/core/_numba/kernels.py @@ -53,9 +53,9 @@ def remove_mean(val, nobs, sum_x, neg_ct, compensation): @numba.jit(nopython=True, nogil=True, parallel=False) def sliding_mean( - values: np.array, - start: np.array, - end: np.array, + values: np.ndarray, + start: np.ndarray, + end: np.ndarray, min_periods: int, ): N = len(start) diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 7f52b4b3bd64d..bd9d207729877 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -572,8 +572,8 @@ def calc(x): def _numba_apply( self, func: Callable[..., Any], - engine_kwargs: dict[str, bool], numba_cache_key_str: str, + engine_kwargs: dict[str, bool] | None = None, numba_args: tuple[Any, ...] = (), **kwargs, ): @@ -1366,7 +1366,7 @@ def mean( else: from pandas.core._numba.kernels import sliding_mean - return self._numba_apply(sliding_mean, engine_kwargs, "rolling_mean") + return self._numba_apply(sliding_mean, "rolling_mean", engine_kwargs) window_func = window_aggregations.roll_mean return self._apply(window_func, name="mean", **kwargs) From 705bb8c5993b0540f440f9528850bdf206f0d956 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Fri, 17 Sep 2021 11:32:13 -0700 Subject: [PATCH 11/21] Tighten docstring --- pandas/core/_numba/executor.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pandas/core/_numba/executor.py b/pandas/core/_numba/executor.py index 8b81a8813bd28..53a3c0fd971a3 100644 --- a/pandas/core/_numba/executor.py +++ b/pandas/core/_numba/executor.py @@ -23,15 +23,16 @@ def generate_shared_aggregator( cache_key_str: str, ): """ - Generate a numba function that loops over the columns 2D object and applies + Generate a Numba function that loops over the columns 2D object and applies a 1D numba kernel over each column. Parameters ---------- func : function - function to be applied to each window and will be JITed + aggregation function to be applied to each column kwargs : dict - **kwargs to be passed into the function + **kwargs to be passed into the function. Should be unused as not + supported by Numba engine_kwargs : dict dictionary of arguments to be passed into numba.jit cache_key_str: str From 36227008c4c7dcf132f6832e8852094fe1b3282a Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Fri, 17 Sep 2021 14:46:28 -0700 Subject: [PATCH 12/21] Keep kernels in their own directory --- pandas/core/_numba/kernels/__init__.py | 1 + pandas/core/_numba/{kernels.py => kernels/mean_.py} | 0 2 files changed, 1 insertion(+) create mode 100644 pandas/core/_numba/kernels/__init__.py rename pandas/core/_numba/{kernels.py => kernels/mean_.py} (100%) diff --git a/pandas/core/_numba/kernels/__init__.py b/pandas/core/_numba/kernels/__init__.py new file mode 100644 index 0000000000000..f7870a07cd503 --- /dev/null +++ b/pandas/core/_numba/kernels/__init__.py @@ -0,0 +1 @@ +from pandas.core._numba.kernels.mean_ import sliding_mean # noqa:F401 diff --git a/pandas/core/_numba/kernels.py b/pandas/core/_numba/kernels/mean_.py similarity index 100% rename from pandas/core/_numba/kernels.py rename to pandas/core/_numba/kernels/mean_.py From 0fa551a57370f8020b9368b8ccf7b3d2d18fd238 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Fri, 17 Sep 2021 14:51:06 -0700 Subject: [PATCH 13/21] Add some typing --- pandas/core/_numba/kernels/mean_.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/pandas/core/_numba/kernels/mean_.py b/pandas/core/_numba/kernels/mean_.py index 31420c171a794..84589c73ca001 100644 --- a/pandas/core/_numba/kernels/mean_.py +++ b/pandas/core/_numba/kernels/mean_.py @@ -6,12 +6,14 @@ Mirrors pandas/_libs/window/aggregation.pyx """ +from __future__ import annotations + import numba import numpy as np @numba.jit(nopython=True, nogil=True, parallel=False) -def is_monotonic_increasing(bounds): +def is_monotonic_increasing(bounds: np.ndarray) -> bool: n = len(bounds) if n == 1: return bounds[0] == bounds[0] @@ -26,7 +28,9 @@ def is_monotonic_increasing(bounds): @numba.jit(nopython=True, nogil=True, parallel=False) -def add_mean(val, nobs, sum_x, neg_ct, compensation): +def add_mean( + val: float, nobs: float, sum_x: float, neg_ct: int, compensation: float +) -> tuple[float, float, int, float]: if not np.isnan(val): nobs += 1 y = val - compensation @@ -39,7 +43,9 @@ def add_mean(val, nobs, sum_x, neg_ct, compensation): @numba.jit(nopython=True, nogil=True, parallel=False) -def remove_mean(val, nobs, sum_x, neg_ct, compensation): +def remove_mean( + val: float, nobs: float, sum_x: float, neg_ct: int, compensation: float +) -> tuple[float, float, int, float]: if not np.isnan(val): nobs -= 1 y = -val - compensation @@ -57,7 +63,7 @@ def sliding_mean( start: np.ndarray, end: np.ndarray, min_periods: int, -): +) -> np.ndarray: N = len(start) nobs = 0.0 sum_x = 0.0 From 675b5a160f9c48ba5a83cfaa51a9155a9d5b9742 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Fri, 17 Sep 2021 17:04:20 -0700 Subject: [PATCH 14/21] Add Series test cases --- pandas/core/window/rolling.py | 11 +++++++++-- pandas/tests/window/test_numba.py | 17 +++++++++-------- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index bd9d207729877..e6fd8b3714e2d 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -587,6 +587,8 @@ def _numba_apply( if self.axis == 1: obj = obj.T values = self._prep_values(obj.to_numpy()) + if values.ndim == 1: + values = values.reshape(-1, 1) start, end = window_indexer.get_window_bounds( num_values=len(values), min_periods=min_periods, @@ -599,8 +601,13 @@ def _numba_apply( result = aggregator(values, start, end, min_periods, *numba_args) NUMBA_FUNC_CACHE[(func, numba_cache_key_str)] = aggregator result = result.T if self.axis == 1 else result - out = obj._constructor(result, index=obj.index, columns=obj.columns) - return self._resolve_output(out, obj) + if obj.ndim == 1: + result = result.squeeze() + out = obj._constructor(result, index=obj.index, name=obj.name) + return out + else: + out = obj._constructor(result, index=obj.index, columns=obj.columns) + return self._resolve_output(out, obj) def aggregate(self, func, *args, **kwargs): result = ResamplerWindowApply(self, func, args=args, kwargs=kwargs).agg() diff --git a/pandas/tests/window/test_numba.py b/pandas/tests/window/test_numba.py index 7fb6d8855e278..0cee21e8d6818 100644 --- a/pandas/tests/window/test_numba.py +++ b/pandas/tests/window/test_numba.py @@ -43,16 +43,16 @@ def f(x, *args): ) tm.assert_series_equal(result, expected) + @pytest.mark.parametrize("data", [DataFrame(np.eye(5)), Series(range(5))]) def test_numba_vs_cython_rolling_methods( - self, nogil, parallel, nopython, arithmetic_numba_supported_operators + self, data, nogil, parallel, nopython, arithmetic_numba_supported_operators ): method = arithmetic_numba_supported_operators engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} - df = DataFrame(np.eye(5)) - roll = df.rolling(2) + roll = data.rolling(2) result = getattr(roll, method)(engine="numba", engine_kwargs=engine_kwargs) expected = getattr(roll, method)(engine="cython") @@ -63,18 +63,19 @@ def test_numba_vs_cython_rolling_methods( "Rolling_apply_single", ) in NUMBA_FUNC_CACHE - tm.assert_frame_equal(result, expected) + tm.assert_equal(result, expected) + @pytest.mark.parametrize("data", [DataFrame(np.eye(5)), Series(range(5))]) def test_numba_vs_cython_expanding_methods( - self, nogil, parallel, nopython, arithmetic_numba_supported_operators + self, data, nogil, parallel, nopython, arithmetic_numba_supported_operators ): method = arithmetic_numba_supported_operators engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} - df = DataFrame(np.eye(5)) - expand = df.expanding() + data = DataFrame(np.eye(5)) + expand = data.expanding() result = getattr(expand, method)(engine="numba", engine_kwargs=engine_kwargs) expected = getattr(expand, method)(engine="cython") @@ -85,7 +86,7 @@ def test_numba_vs_cython_expanding_methods( "Expanding_apply_single", ) in NUMBA_FUNC_CACHE - tm.assert_frame_equal(result, expected) + tm.assert_equal(result, expected) @pytest.mark.parametrize("jit", [True, False]) def test_cache_apply(self, jit, nogil, parallel, nopython): From a169423ad9313b17b5f5d4e01c052e098cebc3ab Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Fri, 17 Sep 2021 17:08:24 -0700 Subject: [PATCH 15/21] Type column looper --- pandas/core/_numba/executor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pandas/core/_numba/executor.py b/pandas/core/_numba/executor.py index 53a3c0fd971a3..9dab8a59b4c5f 100644 --- a/pandas/core/_numba/executor.py +++ b/pandas/core/_numba/executor.py @@ -53,10 +53,10 @@ def generate_shared_aggregator( @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel) def column_looper( - values, - start, - end, - min_periods, + values: np.ndarray, + start: np.ndarray, + end: np.ndarray, + min_periods: int, ): result = np.empty((len(start), values.shape[1])) for i in numba.prange(values.shape[1]): From 2842199c6038560ac8c2e1fdcd48d5561e50f046 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Fri, 17 Sep 2021 17:09:37 -0700 Subject: [PATCH 16/21] Add name --- pandas/tests/window/test_numba.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pandas/tests/window/test_numba.py b/pandas/tests/window/test_numba.py index 0cee21e8d6818..af2ca7270c982 100644 --- a/pandas/tests/window/test_numba.py +++ b/pandas/tests/window/test_numba.py @@ -43,7 +43,9 @@ def f(x, *args): ) tm.assert_series_equal(result, expected) - @pytest.mark.parametrize("data", [DataFrame(np.eye(5)), Series(range(5))]) + @pytest.mark.parametrize( + "data", [DataFrame(np.eye(5)), Series(range(5), name="foo")] + ) def test_numba_vs_cython_rolling_methods( self, data, nogil, parallel, nopython, arithmetic_numba_supported_operators ): From 46f1b6b51d080ade30abbcbb0a3e84fd1165b1b1 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 19 Sep 2021 11:27:13 -0700 Subject: [PATCH 17/21] Add __all__ --- pandas/core/_numba/kernels/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pandas/core/_numba/kernels/__init__.py b/pandas/core/_numba/kernels/__init__.py index f7870a07cd503..eb43de1e0d979 100644 --- a/pandas/core/_numba/kernels/__init__.py +++ b/pandas/core/_numba/kernels/__init__.py @@ -1 +1,3 @@ -from pandas.core._numba.kernels.mean_ import sliding_mean # noqa:F401 +from pandas.core._numba.kernels.mean_ import sliding_mean + +__all__ = ["sliding_mean"] From c4bd78c6f1276c4de34ce00468a84271d2b01205 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Mon, 20 Sep 2021 10:07:37 -0700 Subject: [PATCH 18/21] Add dtype to empty result --- pandas/core/_numba/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/core/_numba/executor.py b/pandas/core/_numba/executor.py index 9dab8a59b4c5f..c73f501d9a1aa 100644 --- a/pandas/core/_numba/executor.py +++ b/pandas/core/_numba/executor.py @@ -58,7 +58,7 @@ def column_looper( end: np.ndarray, min_periods: int, ): - result = np.empty((len(start), values.shape[1])) + result = np.empty((len(start), values.shape[1]), dtype=np.float64) for i in numba.prange(values.shape[1]): result[:, i] = func(values[:, i], start, end, min_periods) return result From 8801cf96bf7275d8d5323cc919cc441d8c947967 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Mon, 20 Sep 2021 10:13:20 -0700 Subject: [PATCH 19/21] Change nobs to int --- pandas/core/_numba/kernels/mean_.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pandas/core/_numba/kernels/mean_.py b/pandas/core/_numba/kernels/mean_.py index 84589c73ca001..7cc6a6c2668da 100644 --- a/pandas/core/_numba/kernels/mean_.py +++ b/pandas/core/_numba/kernels/mean_.py @@ -29,8 +29,8 @@ def is_monotonic_increasing(bounds: np.ndarray) -> bool: @numba.jit(nopython=True, nogil=True, parallel=False) def add_mean( - val: float, nobs: float, sum_x: float, neg_ct: int, compensation: float -) -> tuple[float, float, int, float]: + val: float, nobs: int, sum_x: float, neg_ct: int, compensation: float +) -> tuple[int, float, int, float]: if not np.isnan(val): nobs += 1 y = val - compensation @@ -44,8 +44,8 @@ def add_mean( @numba.jit(nopython=True, nogil=True, parallel=False) def remove_mean( - val: float, nobs: float, sum_x: float, neg_ct: int, compensation: float -) -> tuple[float, float, int, float]: + val: float, nobs: int, sum_x: float, neg_ct: int, compensation: float +) -> tuple[int, float, int, float]: if not np.isnan(val): nobs -= 1 y = -val - compensation @@ -65,7 +65,7 @@ def sliding_mean( min_periods: int, ) -> np.ndarray: N = len(start) - nobs = 0.0 + nobs = 0 sum_x = 0.0 neg_ct = 0 compensation_add = 0.0 From ce6aafb69db62bbe7e791b0b88ac113b38c0174b Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Mon, 20 Sep 2021 10:20:26 -0700 Subject: [PATCH 20/21] Simplify monitonically increasing for bounds --- pandas/core/_numba/kernels/mean_.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pandas/core/_numba/kernels/mean_.py b/pandas/core/_numba/kernels/mean_.py index 7cc6a6c2668da..32ea505513ed0 100644 --- a/pandas/core/_numba/kernels/mean_.py +++ b/pandas/core/_numba/kernels/mean_.py @@ -14,16 +14,16 @@ @numba.jit(nopython=True, nogil=True, parallel=False) def is_monotonic_increasing(bounds: np.ndarray) -> bool: + """Check if int64 values are monotonically increasing.""" n = len(bounds) - if n == 1: - return bounds[0] == bounds[0] - elif n < 2: + if n < 2: return True prev = bounds[0] for i in range(1, n): cur = bounds[i] if cur < prev: return False + prev = cur return True @@ -111,7 +111,7 @@ def sliding_mean( output[i] = result if not is_monotonic_increasing_bounds: - nobs = 0.0 + nobs = 0 sum_x = 0.0 neg_ct = 0 compensation_remove = 0.0 From fd595c5642a8fea4cd04d2fb87614085d4ba2e0d Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Wed, 22 Sep 2021 23:06:45 -0700 Subject: [PATCH 21/21] Remove unused kwargs and args --- pandas/core/_numba/executor.py | 11 ++--------- pandas/core/window/rolling.py | 6 ++---- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/pandas/core/_numba/executor.py b/pandas/core/_numba/executor.py index c73f501d9a1aa..c666bb1a0ad4b 100644 --- a/pandas/core/_numba/executor.py +++ b/pandas/core/_numba/executor.py @@ -1,9 +1,6 @@ from __future__ import annotations -from typing import ( - Any, - Callable, -) +from typing import Callable import numpy as np @@ -18,7 +15,6 @@ def generate_shared_aggregator( func: Callable[..., Scalar], - kwargs: dict[str, Any], engine_kwargs: dict[str, bool] | None, cache_key_str: str, ): @@ -30,9 +26,6 @@ def generate_shared_aggregator( ---------- func : function aggregation function to be applied to each column - kwargs : dict - **kwargs to be passed into the function. Should be unused as not - supported by Numba engine_kwargs : dict dictionary of arguments to be passed into numba.jit cache_key_str: str @@ -43,7 +36,7 @@ def generate_shared_aggregator( ------- Numba function """ - nopython, nogil, parallel = get_jit_arguments(engine_kwargs, kwargs) + nopython, nogil, parallel = get_jit_arguments(engine_kwargs, None) cache_key = (func, cache_key_str) if cache_key in NUMBA_FUNC_CACHE: diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index a69eab70baf4f..2060f2d701276 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -582,8 +582,6 @@ def _numba_apply( func: Callable[..., Any], numba_cache_key_str: str, engine_kwargs: dict[str, bool] | None = None, - numba_args: tuple[Any, ...] = (), - **kwargs, ): window_indexer = self._get_window_indexer() min_periods = ( @@ -604,9 +602,9 @@ def _numba_apply( closed=self.closed, ) aggregator = executor.generate_shared_aggregator( - func, kwargs, engine_kwargs, numba_cache_key_str + func, engine_kwargs, numba_cache_key_str ) - result = aggregator(values, start, end, min_periods, *numba_args) + result = aggregator(values, start, end, min_periods) NUMBA_FUNC_CACHE[(func, numba_cache_key_str)] = aggregator result = result.T if self.axis == 1 else result if obj.ndim == 1: