From c7ba0eb79c3351cb7e03b741e7612322ccb8760f Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Mon, 10 Aug 2020 23:54:23 -0700 Subject: [PATCH 01/12] Roll back groupby agg changes --- pandas/core/groupby/generic.py | 14 ++---------- pandas/core/groupby/groupby.py | 27 ++++++---------------- pandas/core/groupby/ops.py | 42 ++++------------------------------ 3 files changed, 13 insertions(+), 70 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index 53242c0332a8c..c8ecb1a8a4883 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -252,16 +252,11 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs) return getattr(self, cyfunc)() if self.grouper.nkeys > 1: - return self._python_agg_general( - func, *args, engine=engine, engine_kwargs=engine_kwargs, **kwargs - ) + return self._python_agg_general(func, *args, **kwargs) try: - return self._python_agg_general( - func, *args, engine=engine, engine_kwargs=engine_kwargs, **kwargs - ) + return self._python_agg_general(func, *args, **kwargs) except (ValueError, KeyError): - # Do not catch Numba errors here, we want to raise and not fall back. # TODO: KeyError is raised in _python_agg_general, # see see test_groupby.test_basic result = self._aggregate_named(func, *args, **kwargs) @@ -935,11 +930,6 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs) relabeling, func, columns, order = reconstruct_func(func, **kwargs) - if maybe_use_numba(engine): - return self._python_agg_general( - func, *args, engine=engine, engine_kwargs=engine_kwargs, **kwargs - ) - result, how = self._aggregate(func, *args, **kwargs) if how is None: return result diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index 4597afeeaddbf..4b5bfe9b84862 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -1051,12 +1051,9 @@ def _cython_agg_general( return self._wrap_aggregated_output(output) - def _python_agg_general( - self, func, *args, engine="cython", engine_kwargs=None, **kwargs - ): + def _python_agg_general(self, func, *args, **kwargs): func = self._is_builtin_func(func) - if engine != "numba": - f = lambda x: func(x, *args, **kwargs) + f = lambda x: func(x, *args, **kwargs) # iterate through "columns" ex exclusions to populate output dict output: Dict[base.OutputKey, np.ndarray] = {} @@ -1067,21 +1064,11 @@ def _python_agg_general( # agg_series below assumes ngroups > 0 continue - if maybe_use_numba(engine): - result, counts = self.grouper.agg_series( - obj, - func, - *args, - engine=engine, - engine_kwargs=engine_kwargs, - **kwargs, - ) - else: - try: - # if this function is invalid for this dtype, we will ignore it. - result, counts = self.grouper.agg_series(obj, f) - except TypeError: - continue + try: + # if this function is invalid for this dtype, we will ignore it. + result, counts = self.grouper.agg_series(obj, f) + except TypeError: + continue assert result is not None key = base.OutputKey(label=name, position=idx) diff --git a/pandas/core/groupby/ops.py b/pandas/core/groupby/ops.py index 64eb413fe78fa..8b35e38d0ea25 100644 --- a/pandas/core/groupby/ops.py +++ b/pandas/core/groupby/ops.py @@ -610,21 +610,11 @@ def _transform( return result def agg_series( - self, - obj: Series, - func: F, - *args, - engine: str = "cython", - engine_kwargs=None, - **kwargs, + self, obj: Series, func: F, *args, **kwargs, ): # Caller is responsible for checking ngroups != 0 assert self.ngroups != 0 - if maybe_use_numba(engine): - return self._aggregate_series_pure_python( - obj, func, *args, engine=engine, engine_kwargs=engine_kwargs, **kwargs - ) if len(obj) == 0: # SeriesGrouper would raise if we were to call _aggregate_series_fast return self._aggregate_series_pure_python(obj, func) @@ -670,20 +660,8 @@ def _aggregate_series_fast(self, obj: Series, func: F): return result, counts def _aggregate_series_pure_python( - self, - obj: Series, - func: F, - *args, - engine: str = "cython", - engine_kwargs=None, - **kwargs, + self, obj: Series, func: F, *args, **kwargs, ): - - if maybe_use_numba(engine): - numba_func, cache_key = generate_numba_func( - func, engine_kwargs, kwargs, "groupby_agg" - ) - group_index, _, ngroups = self.group_info counts = np.zeros(ngroups, dtype=int) @@ -692,13 +670,7 @@ def _aggregate_series_pure_python( splitter = get_splitter(obj, group_index, ngroups, axis=0) for label, group in splitter: - if maybe_use_numba(engine): - values, index = split_for_numba(group) - res = numba_func(values, index, *args) - if cache_key not in NUMBA_FUNC_CACHE: - NUMBA_FUNC_CACHE[cache_key] = numba_func - else: - res = func(group, *args, **kwargs) + res = func(group, *args, **kwargs) if result is None: if isinstance(res, (Series, Index, np.ndarray)): @@ -876,13 +848,7 @@ def groupings(self) -> "List[grouper.Grouping]": ] def agg_series( - self, - obj: Series, - func: F, - *args, - engine: str = "cython", - engine_kwargs=None, - **kwargs, + self, obj: Series, func: F, *args, **kwargs, ): # Caller is responsible for checking ngroups != 0 assert self.ngroups != 0 From 2d799849c88ad0199f3ccc3553a01b80b144eaee Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Thu, 13 Aug 2020 00:21:03 -0700 Subject: [PATCH 02/12] Add aggragate_with_numba --- pandas/core/groupby/generic.py | 26 ++++++++++++ pandas/core/groupby/groupby.py | 38 +++++++++++++++-- pandas/core/groupby/numba_.py | 75 ++++++++++++++++++++++++++++++++++ 3 files changed, 136 insertions(+), 3 deletions(-) create mode 100644 pandas/core/groupby/numba_.py diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index ae0197561ed17..3d012817d639a 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -70,6 +70,7 @@ GroupBy, _agg_template, _apply_docs, + _group_selection_context, _transform_template, get_groupby, ) @@ -230,6 +231,18 @@ def apply(self, func, *args, **kwargs): ) def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs): + if maybe_use_numba(engine): + if not callable(func): + raise ValueError( + "Numba engine can only be used with a single function." + ) + with _group_selection_context(self): + data = self._selected_obj + result, index = self._aggregate_with_numba( + data, 1, func, *args, engine_kwargs=None, **kwargs + ) + return self.obj._constructor(result, index=index, name=data.name) + relabeling = func is None columns = None if relabeling: @@ -928,6 +941,19 @@ class DataFrameGroupBy(GroupBy[DataFrame]): ) def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs): + if maybe_use_numba(engine): + if not callable(func): + raise ValueError( + "Numba engine can only be used with a single function." + ) + with _group_selection_context(self): + data = self._selected_obj + num_labels = len(data.columns) + result, index = self._aggregate_with_numba( + func, num_labels, *args, engine_kwargs=None, **kwargs + ) + return self.obj._constructor(result, index=index, columns=data.columns) + relabeling, func, columns, order = reconstruct_func(func, **kwargs) result, how = self._aggregate(func, *args, **kwargs) diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index 4b5bfe9b84862..d2bff0a06ece0 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -34,7 +34,7 @@ class providing the base-class of operations. from pandas._config.config import option_context -from pandas._libs import Timestamp +from pandas._libs import Timestamp, lib import pandas._libs.groupby as libgroupby from pandas._typing import F, FrameOrSeries, FrameOrSeriesUnion, Scalar from pandas.compat.numpy import function as nv @@ -61,11 +61,11 @@ class providing the base-class of operations. import pandas.core.common as com from pandas.core.frame import DataFrame from pandas.core.generic import NDFrame -from pandas.core.groupby import base, ops +from pandas.core.groupby import base, numba_, ops from pandas.core.indexes.api import CategoricalIndex, Index, MultiIndex from pandas.core.series import Series from pandas.core.sorting import get_group_index_sorter -from pandas.core.util.numba_ import maybe_use_numba +from pandas.core.util.numba_ import NUMBA_FUNC_CACHE _common_see_also = """ See Also @@ -1051,6 +1051,38 @@ def _cython_agg_general( return self._wrap_aggregated_output(output) + def _aggregate_with_numba( + self, data, num_labels, func, *args, engine_kwargs=None, **kwargs + ): + group_keys = self.grouper._get_group_keys() + labels, _, n_groups = self.grouper.group_info + sorted_index = get_group_index_sorter(labels, n_groups) + sorted_labels = algorithms.take_nd(labels, sorted_index, allow_fill=False) + sorted_data = data.take(sorted_index, axis=self.axis) + starts, ends = lib.generate_slices(sorted_labels, n_groups) + cache_key = (func, "groupby_agg") + if cache_key in NUMBA_FUNC_CACHE: + # Return an already compiled version of roll_apply if available + apply_func = NUMBA_FUNC_CACHE[cache_key] + else: + apply_func = numba_.generate_numba_apply_func( + tuple(args), kwargs, func, engine_kwargs + ) + result = apply_func( + sorted_data.to_numpy(), + sorted_index, + starts, + ends, + len(group_keys), + num_labels, + ) + + if self.grouper.nkeys > 1: + index = MultiIndex.from_tuples(group_keys, names=self.grouper.names) + else: + index = Index(group_keys, name=self.grouper.names[0]) + return result, index + def _python_agg_general(self, func, *args, **kwargs): func = self._is_builtin_func(func) f = lambda x: func(x, *args, **kwargs) diff --git a/pandas/core/groupby/numba_.py b/pandas/core/groupby/numba_.py new file mode 100644 index 0000000000000..746776ff9e393 --- /dev/null +++ b/pandas/core/groupby/numba_.py @@ -0,0 +1,75 @@ +from typing import Any, Callable, Dict, Optional, Tuple + +import numpy as np + +from pandas._typing import Scalar +from pandas.compat._optional import import_optional_dependency + +from pandas.core.util.numba_ import ( + check_kwargs_and_nopython, + get_jit_arguments, + jit_user_function, +) + + +def generate_numba_apply_func( + args: Tuple, + kwargs: Dict[str, Any], + func: Callable[..., Scalar], + engine_kwargs: Optional[Dict[str, bool]], +): + """ + Generate a numba jitted apply function specified by values from engine_kwargs. + + 1. jit the user's function + 2. Return a rolling apply function with the jitted function inline + + Configurations specified in engine_kwargs apply to both the user's + function _AND_ the rolling apply function. + + Parameters + ---------- + args : tuple + *args to be passed into the function + kwargs : dict + **kwargs to be passed into the function + func : function + function to be applied to each window and will be JITed + engine_kwargs : dict + dictionary of arguments to be passed into numba.jit + + Returns + ------- + Numba function + """ + nopython, nogil, parallel = get_jit_arguments(engine_kwargs) + + check_kwargs_and_nopython(kwargs, nopython) + + numba_func = jit_user_function(func, nopython, nogil, parallel) + + numba = import_optional_dependency("numba") + + if parallel: + loop_range = numba.prange + else: + loop_range = range + + @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel) + def group_apply( + values: np.ndarray, + index: np.ndarray, + begin: np.ndarray, + end: np.ndarray, + num_groups: int, + num_columns: int, + ) -> np.ndarray: + result = np.empty((num_groups, num_columns)) + for i in loop_range(num_groups): + group_index = index[begin[i] : end[i]] + for j in loop_range(num_columns): + group = values[begin[i] : end[i], j] + result[i, j] = numba_func(group, group_index, *args) + return result + + return group_apply From d3498168e5f549870518484ff326e82cad599c44 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 16 Aug 2020 00:05:01 -0700 Subject: [PATCH 03/12] Fix cases where operation on Series inputs --- pandas/core/groupby/generic.py | 6 +++--- pandas/core/groupby/groupby.py | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index e9adee00454e7..9007e772438f3 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -239,9 +239,9 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs) with _group_selection_context(self): data = self._selected_obj result, index = self._aggregate_with_numba( - data, 1, func, *args, engine_kwargs=None, **kwargs + data, 1, func, *args, engine_kwargs=engine_kwargs, **kwargs ) - return self.obj._constructor(result, index=index, name=data.name) + return self.obj._constructor(result.ravel(), index=index, name=data.name) relabeling = func is None columns = None @@ -954,7 +954,7 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs) data = self._selected_obj num_labels = len(data.columns) result, index = self._aggregate_with_numba( - func, num_labels, *args, engine_kwargs=None, **kwargs + data, num_labels, func, *args, engine_kwargs=engine_kwargs, **kwargs ) return self.obj._constructor(result, index=index, columns=data.columns) diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index bf379f2df7e17..644fe6b3b4387 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -1071,7 +1071,8 @@ def _aggregate_with_numba( tuple(args), kwargs, func, engine_kwargs ) result = apply_func( - sorted_data.to_numpy(), + # Numba agg routines operates on 2D values; reshape Series-based data + sorted_data.to_numpy().reshape(len(sorted_data), 1), sorted_index, starts, ends, From 6292d7583ba70f649d87759a08bf4f1f834e9ba6 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 16 Aug 2020 00:19:59 -0700 Subject: [PATCH 04/12] Simplify case, handle Series correctly --- pandas/core/groupby/generic.py | 5 ++--- pandas/core/groupby/groupby.py | 14 +++----------- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index 9007e772438f3..d6e9abfaa0f61 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -239,7 +239,7 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs) with _group_selection_context(self): data = self._selected_obj result, index = self._aggregate_with_numba( - data, 1, func, *args, engine_kwargs=engine_kwargs, **kwargs + data.to_frame(), func, *args, engine_kwargs=engine_kwargs, **kwargs ) return self.obj._constructor(result.ravel(), index=index, name=data.name) @@ -952,9 +952,8 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs) ) with _group_selection_context(self): data = self._selected_obj - num_labels = len(data.columns) result, index = self._aggregate_with_numba( - data, num_labels, func, *args, engine_kwargs=engine_kwargs, **kwargs + data, func, *args, engine_kwargs=engine_kwargs, **kwargs ) return self.obj._constructor(result, index=index, columns=data.columns) diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index 644fe6b3b4387..10132dafe5fb5 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -1053,14 +1053,12 @@ def _cython_agg_general( return self._wrap_aggregated_output(output, index=self.grouper.result_index) - def _aggregate_with_numba( - self, data, num_labels, func, *args, engine_kwargs=None, **kwargs - ): + def _aggregate_with_numba(self, data, func, *args, engine_kwargs=None, **kwargs): group_keys = self.grouper._get_group_keys() labels, _, n_groups = self.grouper.group_info sorted_index = get_group_index_sorter(labels, n_groups) sorted_labels = algorithms.take_nd(labels, sorted_index, allow_fill=False) - sorted_data = data.take(sorted_index, axis=self.axis) + sorted_data = data.take(sorted_index, axis=self.axis).to_numpy() starts, ends = lib.generate_slices(sorted_labels, n_groups) cache_key = (func, "groupby_agg") if cache_key in NUMBA_FUNC_CACHE: @@ -1071,13 +1069,7 @@ def _aggregate_with_numba( tuple(args), kwargs, func, engine_kwargs ) result = apply_func( - # Numba agg routines operates on 2D values; reshape Series-based data - sorted_data.to_numpy().reshape(len(sorted_data), 1), - sorted_index, - starts, - ends, - len(group_keys), - num_labels, + sorted_data, sorted_index, starts, ends, len(group_keys), len(data.columns), ) if self.grouper.nkeys > 1: From 66edc214374094ea2718d03adf12b184de087145 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 16 Aug 2020 19:40:07 -0700 Subject: [PATCH 05/12] Ensure function is being cached, validate the udf signature for groupby agg --- pandas/core/groupby/groupby.py | 8 +++++--- pandas/core/groupby/numba_.py | 5 ++++- pandas/core/util/numba_.py | 2 +- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index 10132dafe5fb5..d2edd6dab0a89 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -1063,14 +1063,16 @@ def _aggregate_with_numba(self, data, func, *args, engine_kwargs=None, **kwargs) cache_key = (func, "groupby_agg") if cache_key in NUMBA_FUNC_CACHE: # Return an already compiled version of roll_apply if available - apply_func = NUMBA_FUNC_CACHE[cache_key] + numba_agg_func = NUMBA_FUNC_CACHE[cache_key] else: - apply_func = numba_.generate_numba_apply_func( + numba_agg_func = numba_.generate_numba_agg_func( tuple(args), kwargs, func, engine_kwargs ) - result = apply_func( + result = numba_agg_func( sorted_data, sorted_index, starts, ends, len(group_keys), len(data.columns), ) + if cache_key not in NUMBA_FUNC_CACHE: + NUMBA_FUNC_CACHE[cache_key] = numba_agg_func if self.grouper.nkeys > 1: index = MultiIndex.from_tuples(group_keys, names=self.grouper.names) diff --git a/pandas/core/groupby/numba_.py b/pandas/core/groupby/numba_.py index 746776ff9e393..a34b512a71ffd 100644 --- a/pandas/core/groupby/numba_.py +++ b/pandas/core/groupby/numba_.py @@ -9,10 +9,11 @@ check_kwargs_and_nopython, get_jit_arguments, jit_user_function, + validate_udf, ) -def generate_numba_apply_func( +def generate_numba_agg_func( args: Tuple, kwargs: Dict[str, Any], func: Callable[..., Scalar], @@ -46,6 +47,8 @@ def generate_numba_apply_func( check_kwargs_and_nopython(kwargs, nopython) + validate_udf(func) + numba_func = jit_user_function(func, nopython, nogil, parallel) numba = import_optional_dependency("numba") diff --git a/pandas/core/util/numba_.py b/pandas/core/util/numba_.py index c9b7943478cdd..51d9eaaa8eebc 100644 --- a/pandas/core/util/numba_.py +++ b/pandas/core/util/numba_.py @@ -149,7 +149,7 @@ def split_for_numba(arg: FrameOrSeries) -> Tuple[np.ndarray, np.ndarray]: def validate_udf(func: Callable) -> None: """ - Validate user defined function for ops when using Numba. + Validate user defined function for ops when using Numba with groupby ops. The first signature arguments should include: From 608c9557ada87829a19b8616b464ee2f5482665e Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 16 Aug 2020 21:05:51 -0700 Subject: [PATCH 06/12] Move some functionality to groupby/numba_.py --- pandas/core/groupby/generic.py | 8 +-- pandas/core/groupby/numba_.py | 102 +++++++++++++++++++++++++++++++-- pandas/core/groupby/ops.py | 6 -- pandas/core/util/numba_.py | 93 ------------------------------ 4 files changed, 100 insertions(+), 109 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index d6e9abfaa0f61..b7a9ea99f8aaa 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -74,16 +74,12 @@ _transform_template, get_groupby, ) +from pandas.core.groupby.numba_ import generate_numba_func, split_for_numba from pandas.core.indexes.api import Index, MultiIndex, all_indexes_same import pandas.core.indexes.base as ibase from pandas.core.internals import BlockManager, make_block from pandas.core.series import Series -from pandas.core.util.numba_ import ( - NUMBA_FUNC_CACHE, - generate_numba_func, - maybe_use_numba, - split_for_numba, -) +from pandas.core.util.numba_ import NUMBA_FUNC_CACHE, maybe_use_numba from pandas.plotting import boxplot_frame_groupby diff --git a/pandas/core/groupby/numba_.py b/pandas/core/groupby/numba_.py index a34b512a71ffd..bf5ff3e5bb5d9 100644 --- a/pandas/core/groupby/numba_.py +++ b/pandas/core/groupby/numba_.py @@ -1,18 +1,112 @@ +"""Common utilities for Numba operations with groupby ops""" +import inspect from typing import Any, Callable, Dict, Optional, Tuple import numpy as np -from pandas._typing import Scalar +from pandas._typing import FrameOrSeries, Scalar from pandas.compat._optional import import_optional_dependency from pandas.core.util.numba_ import ( + NUMBA_FUNC_CACHE, + NumbaUtilError, check_kwargs_and_nopython, get_jit_arguments, jit_user_function, - validate_udf, ) +def split_for_numba(arg: FrameOrSeries) -> Tuple[np.ndarray, np.ndarray]: + """ + Split pandas object into its components as numpy arrays for numba functions. + + Parameters + ---------- + arg : Series or DataFrame + + Returns + ------- + (ndarray, ndarray) + values, index + """ + return arg.to_numpy(), arg.index.to_numpy() + + +def validate_udf(func: Callable) -> None: + """ + Validate user defined function for ops when using Numba with groupby ops. + + The first signature arguments should include: + + def f(values, index, ...): + ... + + Parameters + ---------- + func : function, default False + user defined function + + Returns + ------- + None + + Raises + ------ + NumbaUtilError + """ + udf_signature = list(inspect.signature(func).parameters.keys()) + expected_args = ["values", "index"] + min_number_args = len(expected_args) + if ( + len(udf_signature) < min_number_args + or udf_signature[:min_number_args] != expected_args + ): + raise NumbaUtilError( + f"The first {min_number_args} arguments to {func.__name__} must be " + f"{expected_args}" + ) + + +def generate_numba_func( + func: Callable, + engine_kwargs: Optional[Dict[str, bool]], + kwargs: dict, + cache_key_str: str, +) -> Tuple[Callable, Tuple[Callable, str]]: + """ + Return a JITed function and cache key for the NUMBA_FUNC_CACHE + + This _may_ be specific to groupby (as it's only used there currently). + + Parameters + ---------- + func : function + user defined function + engine_kwargs : dict or None + numba.jit arguments + kwargs : dict + kwargs for func + cache_key_str : str + string representing the second part of the cache key tuple + + Returns + ------- + (JITed function, cache key) + + Raises + ------ + NumbaUtilError + """ + nopython, nogil, parallel = get_jit_arguments(engine_kwargs) + check_kwargs_and_nopython(kwargs, nopython) + validate_udf(func) + cache_key = (func, cache_key_str) + numba_func = NUMBA_FUNC_CACHE.get( + cache_key, jit_user_function(func, nopython, nogil, parallel) + ) + return numba_func, cache_key + + def generate_numba_agg_func( args: Tuple, kwargs: Dict[str, Any], @@ -20,10 +114,10 @@ def generate_numba_agg_func( engine_kwargs: Optional[Dict[str, bool]], ): """ - Generate a numba jitted apply function specified by values from engine_kwargs. + Generate a numba jitted agg function specified by values from engine_kwargs. 1. jit the user's function - 2. Return a rolling apply function with the jitted function inline + 2. Return a groupby agg function with the jitted function inline Configurations specified in engine_kwargs apply to both the user's function _AND_ the rolling apply function. diff --git a/pandas/core/groupby/ops.py b/pandas/core/groupby/ops.py index 8b35e38d0ea25..c6171a55359fe 100644 --- a/pandas/core/groupby/ops.py +++ b/pandas/core/groupby/ops.py @@ -55,12 +55,6 @@ get_group_index_sorter, get_indexer_dict, ) -from pandas.core.util.numba_ import ( - NUMBA_FUNC_CACHE, - generate_numba_func, - maybe_use_numba, - split_for_numba, -) class BaseGrouper: diff --git a/pandas/core/util/numba_.py b/pandas/core/util/numba_.py index 51d9eaaa8eebc..b951cd4f0cc2a 100644 --- a/pandas/core/util/numba_.py +++ b/pandas/core/util/numba_.py @@ -1,12 +1,10 @@ """Common utilities for Numba operations""" from distutils.version import LooseVersion -import inspect import types from typing import Callable, Dict, Optional, Tuple import numpy as np -from pandas._typing import FrameOrSeries from pandas.compat._optional import import_optional_dependency from pandas.errors import NumbaUtilError @@ -129,94 +127,3 @@ def impl(data, *_args): return impl return numba_func - - -def split_for_numba(arg: FrameOrSeries) -> Tuple[np.ndarray, np.ndarray]: - """ - Split pandas object into its components as numpy arrays for numba functions. - - Parameters - ---------- - arg : Series or DataFrame - - Returns - ------- - (ndarray, ndarray) - values, index - """ - return arg.to_numpy(), arg.index.to_numpy() - - -def validate_udf(func: Callable) -> None: - """ - Validate user defined function for ops when using Numba with groupby ops. - - The first signature arguments should include: - - def f(values, index, ...): - ... - - Parameters - ---------- - func : function, default False - user defined function - - Returns - ------- - None - - Raises - ------ - NumbaUtilError - """ - udf_signature = list(inspect.signature(func).parameters.keys()) - expected_args = ["values", "index"] - min_number_args = len(expected_args) - if ( - len(udf_signature) < min_number_args - or udf_signature[:min_number_args] != expected_args - ): - raise NumbaUtilError( - f"The first {min_number_args} arguments to {func.__name__} must be " - f"{expected_args}" - ) - - -def generate_numba_func( - func: Callable, - engine_kwargs: Optional[Dict[str, bool]], - kwargs: dict, - cache_key_str: str, -) -> Tuple[Callable, Tuple[Callable, str]]: - """ - Return a JITed function and cache key for the NUMBA_FUNC_CACHE - - This _may_ be specific to groupby (as it's only used there currently). - - Parameters - ---------- - func : function - user defined function - engine_kwargs : dict or None - numba.jit arguments - kwargs : dict - kwargs for func - cache_key_str : str - string representing the second part of the cache key tuple - - Returns - ------- - (JITed function, cache key) - - Raises - ------ - NumbaUtilError - """ - nopython, nogil, parallel = get_jit_arguments(engine_kwargs) - check_kwargs_and_nopython(kwargs, nopython) - validate_udf(func) - cache_key = (func, cache_key_str) - numba_func = NUMBA_FUNC_CACHE.get( - cache_key, jit_user_function(func, nopython, nogil, parallel) - ) - return numba_func, cache_key From cd0ed3f9792f4d0b008473b1db4b1498b42d05f0 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 16 Aug 2020 22:22:47 -0700 Subject: [PATCH 07/12] Change ValueError to NotImplementedError --- pandas/core/groupby/generic.py | 4 ++-- pandas/tests/groupby/aggregate/test_numba.py | 24 +++++++++++++++++++- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index b7a9ea99f8aaa..7da46b9bc858f 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -229,7 +229,7 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs) if maybe_use_numba(engine): if not callable(func): - raise ValueError( + raise NotImplementedError( "Numba engine can only be used with a single function." ) with _group_selection_context(self): @@ -943,7 +943,7 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs) if maybe_use_numba(engine): if not callable(func): - raise ValueError( + raise NotImplementedError( "Numba engine can only be used with a single function." ) with _group_selection_context(self): diff --git a/pandas/tests/groupby/aggregate/test_numba.py b/pandas/tests/groupby/aggregate/test_numba.py index 690694b0e66f5..29e65e938f6f9 100644 --- a/pandas/tests/groupby/aggregate/test_numba.py +++ b/pandas/tests/groupby/aggregate/test_numba.py @@ -4,7 +4,7 @@ from pandas.errors import NumbaUtilError import pandas.util._test_decorators as td -from pandas import DataFrame, option_context +from pandas import DataFrame, NamedAgg, option_context import pandas._testing as tm from pandas.core.util.numba_ import NUMBA_FUNC_CACHE @@ -128,3 +128,25 @@ def func_1(values, index): with option_context("compute.use_numba", True): result = grouped.agg(func_1, engine=None) tm.assert_frame_equal(expected, result) + + +@td.skip_if_no("numba", "0.46.0") +@pytest.mark.parametrize( + "agg_func", + [ + ["min", "max"], + "min", + {"B": ["min", "max"], "C": "sum"}, + NamedAgg(column="B", aggfunc="min"), + ], +) +def test_multifunc_notimplimented(agg_func): + data = DataFrame( + {0: ["a", "a", "b", "b", "a"], 1: [1.0, 2.0, 3.0, 4.0, 5.0]}, columns=[0, 1], + ) + grouped = data.groupby(0) + with pytest.raises(NotImplementedError, match="Numba engine can"): + grouped.agg(agg_func, engine="numba") + + with pytest.raises(NotImplementedError, match="Numba engine can"): + grouped[1].agg(agg_func, engine="numba") From 3d2f9551b4b4be9434632734296d7d29d52171f3 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 16 Aug 2020 22:24:50 -0700 Subject: [PATCH 08/12] Comment that it's only 1 function that is supported --- pandas/core/groupby/groupby.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index d2edd6dab0a89..361598f884702 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -384,7 +384,8 @@ class providing the base-class of operations. - dict of axis labels -> functions, function names or list of such. Can also accept a Numba JIT function with - ``engine='numba'`` specified. + ``engine='numba'`` specified. Only passing a single function is supported + with this engine. If the ``'numba'`` engine is chosen, the function must be a user defined function with ``values`` and ``index`` as the From b4d8dab386df31f6037a6a352e97e086ce5374db Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 16 Aug 2020 22:48:07 -0700 Subject: [PATCH 09/12] Add whatsnew --- doc/source/whatsnew/v1.2.0.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index 42f95d88d74ac..835ccb19b2335 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -151,7 +151,7 @@ Deprecations Performance improvements ~~~~~~~~~~~~~~~~~~~~~~~~ -- +- Performance improvement in :meth:`GroupBy.agg` with the ``numba`` engine (:issue:``) - .. --------------------------------------------------------------------------- From 8f5e9db22d642c5b6f0eb4584864a73d8edab892 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Tue, 18 Aug 2020 22:52:26 -0700 Subject: [PATCH 10/12] Add issue number and correct typing --- doc/source/whatsnew/v1.2.0.rst | 2 +- pandas/core/groupby/numba_.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index 835ccb19b2335..910a8181283cd 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -151,7 +151,7 @@ Deprecations Performance improvements ~~~~~~~~~~~~~~~~~~~~~~~~ -- Performance improvement in :meth:`GroupBy.agg` with the ``numba`` engine (:issue:``) +- Performance improvement in :meth:`GroupBy.agg` with the ``numba`` engine (:issue:`35759`) - .. --------------------------------------------------------------------------- diff --git a/pandas/core/groupby/numba_.py b/pandas/core/groupby/numba_.py index bf5ff3e5bb5d9..aebe60f797fcd 100644 --- a/pandas/core/groupby/numba_.py +++ b/pandas/core/groupby/numba_.py @@ -112,7 +112,7 @@ def generate_numba_agg_func( kwargs: Dict[str, Any], func: Callable[..., Scalar], engine_kwargs: Optional[Dict[str, bool]], -): +) -> Callable[[np.ndarray, np.ndarray, np.ndarray, np.ndarray, int, int], np.ndarray]: """ Generate a numba jitted agg function specified by values from engine_kwargs. From 09c43092bbec1f8140e21dc27eb675e2facb930e Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Wed, 19 Aug 2020 22:32:41 -0700 Subject: [PATCH 11/12] Add docstring for _aggregate_with_numba --- pandas/core/groupby/groupby.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index 361598f884702..9ac2d5ea99c71 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -1055,6 +1055,13 @@ def _cython_agg_general( return self._wrap_aggregated_output(output, index=self.grouper.result_index) def _aggregate_with_numba(self, data, func, *args, engine_kwargs=None, **kwargs): + """ + Perform groupby aggregation routine with the numba engine. + + This routine mimics the data splitting routine of the DataSplitter class + to generate the indices of each group in the sorted data and then passes the data + and indices into a Numba jitted function. + """ group_keys = self.grouper._get_group_keys() labels, _, n_groups = self.grouper.group_info sorted_index = get_group_index_sorter(labels, n_groups) From 7234f7e8c40458e6ab7660432b4892b085206210 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Wed, 19 Aug 2020 22:35:08 -0700 Subject: [PATCH 12/12] Lint --- pandas/core/groupby/groupby.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index 9ac2d5ea99c71..f96b488fb8d0d 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -1059,8 +1059,8 @@ def _aggregate_with_numba(self, data, func, *args, engine_kwargs=None, **kwargs) Perform groupby aggregation routine with the numba engine. This routine mimics the data splitting routine of the DataSplitter class - to generate the indices of each group in the sorted data and then passes the data - and indices into a Numba jitted function. + to generate the indices of each group in the sorted data and then passes the + data and indices into a Numba jitted function. """ group_keys = self.grouper._get_group_keys() labels, _, n_groups = self.grouper.group_info