From 4b26301265737a1c9e3529b80098618810b426b0 Mon Sep 17 00:00:00 2001 From: Jeff Reback Date: Tue, 12 Dec 2017 18:17:48 -0500 Subject: [PATCH 1/3] CLN: factor apply out of frame.py --- pandas/core/apply.py | 301 +++++++++++++++++++++++++++++ pandas/core/frame.py | 313 +++++++------------------------ pandas/tests/frame/test_apply.py | 6 +- 3 files changed, 372 insertions(+), 248 deletions(-) create mode 100644 pandas/core/apply.py diff --git a/pandas/core/apply.py b/pandas/core/apply.py new file mode 100644 index 0000000000000..2f43087f7dff9 --- /dev/null +++ b/pandas/core/apply.py @@ -0,0 +1,301 @@ +import numpy as np +from pandas import compat +from pandas._libs import lib +from pandas.core.dtypes.common import ( + is_extension_type, + is_sequence) + +from pandas.io.formats.printing import pprint_thing + + +def frame_apply(obj, func, axis=0, broadcast=False, + raw=False, reduce=None, args=(), **kwds): + """ construct and return a row or column based frame apply object """ + + axis = obj._get_axis_number(axis) + if axis == 0: + klass = FrameRowApply + elif axis == 1: + klass = FrameColumnApply + + return klass(obj, func, broadcast=broadcast, + raw=raw, reduce=reduce, args=args, kwds=kwds) + + +class FrameApply(object): + + def __init__(self, obj, func, broadcast, raw, reduce, args, kwds): + self.obj = obj + self.broadcast = broadcast + self.raw = raw + self.reduce = reduce + self.args = args + + self.ignore_failures = kwds.pop('ignore_failures', False) + self.kwds = kwds + + # curry if needed + if kwds or args and not isinstance(func, np.ufunc): + def f(x): + return func(x, *args, **kwds) + else: + f = func + + self.f = f + + @property + def columns(self): + return self.obj.columns + + @property + def index(self): + return self.obj.index + + @property + def values(self): + return self.obj.values + + @property + def agg_axis(self): + return self.obj._get_agg_axis(self.axis) + + def get_result(self): + """ compute the results """ + + # all empty + if len(self.columns) == 0 and len(self.index) == 0: + return self.apply_empty_result() + + # string dispatch + if isinstance(self.f, compat.string_types): + if self.axis: + self.kwds['axis'] = self.axis + return getattr(self.obj, self.f)(*self.args, **self.kwds) + + # ufunc + elif isinstance(self.f, np.ufunc): + with np.errstate(all='ignore'): + results = self.f(self.values) + return self.obj._constructor(data=results, index=self.index, + columns=self.columns, copy=False) + + # broadcasting + if self.broadcast: + return self.apply_broadcast() + + # one axis empty + if not all(self.obj.shape): + return self.apply_empty_result() + + # raw + if self.raw and not self.obj._is_mixed_type: + return self.apply_raw() + + return self.apply_standard() + + def apply_empty_result(self): + from pandas import Series + reduce = self.reduce + + if reduce is None: + reduce = False + + EMPTY_SERIES = Series([]) + try: + r = self.f(EMPTY_SERIES, *self.args, **self.kwds) + reduce = not isinstance(r, Series) + except Exception: + pass + + if reduce: + return Series(np.nan, index=self.agg_axis) + else: + return self.obj.copy() + + def apply_raw(self): + try: + result = lib.reduce(self.values, self.f, axis=self.axis) + except Exception: + result = np.apply_along_axis(self.f, self.axis, self.values) + + # TODO: mixed type case + from pandas import DataFrame, Series + if result.ndim == 2: + return DataFrame(result, index=self.index, columns=self.columns) + else: + return Series(result, index=self.agg_axis) + + def apply_standard(self): + from pandas import Series + + reduce = self.reduce + if reduce is None: + reduce = True + + # try to reduce first (by default) + # this only matters if the reduction in values is of different dtype + # e.g. if we want to apply to a SparseFrame, then can't directly reduce + if reduce: + values = self.values + + # we cannot reduce using non-numpy dtypes, + # as demonstrated in gh-12244 + if not is_extension_type(values): + + # Create a dummy Series from an empty array + index = self.obj._get_axis(self.axis) + empty_arr = np.empty(len(index), dtype=values.dtype) + + dummy = Series(empty_arr, index=index, dtype=values.dtype) + + try: + labels = self.agg_axis + result = lib.reduce(values, self.f, + axis=self.axis, + dummy=dummy, + labels=labels) + return Series(result, index=labels) + except Exception: + pass + + # compute the result using the series generator + results, res_index, res_columns = self._apply_series_generator() + + # wrap results + return self.wrap_results(results, res_index, res_columns) + + def _apply_series_generator(self): + series_gen = self.series_generator + res_index = self.result_index + res_columns = self.result_columns + + i = None + keys = [] + results = {} + if self.ignore_failures: + successes = [] + for i, v in enumerate(series_gen): + try: + results[i] = self.f(v) + keys.append(v.name) + successes.append(i) + except Exception: + pass + + # so will work with MultiIndex + if len(successes) < len(res_index): + res_index = res_index.take(successes) + + else: + try: + for i, v in enumerate(series_gen): + results[i] = self.f(v) + keys.append(v.name) + except Exception as e: + if hasattr(e, 'args'): + + # make sure i is defined + if i is not None: + k = res_index[i] + e.args = e.args + ('occurred at index %s' % + pprint_thing(k), ) + raise + + return results, res_index, res_columns + + def wrap_results(self, results, res_index, res_columns): + from pandas import Series + + if len(results) > 0 and is_sequence(results[0]): + if not isinstance(results[0], Series): + index = res_columns + else: + index = None + + result = self.obj._constructor(data=results, index=index) + result.columns = res_index + + if self.axis == 1: + result = result.T + result = result._convert( + datetime=True, timedelta=True, copy=False) + + else: + + result = Series(results) + result.index = res_index + + return result + + def _apply_broadcast(self, target): + result_values = np.empty_like(target.values) + columns = target.columns + for i, col in enumerate(columns): + result_values[:, i] = self.f(target[col]) + + result = self.obj._constructor(result_values, index=target.index, + columns=target.columns) + return result + + +class FrameRowApply(FrameApply): + axis = 0 + + def get_result(self): + + # dispatch to agg + if isinstance(self.f, (list, dict)): + return self.obj.aggregate(self.f, axis=self.axis, + *self.args, **self.kwds) + + return super(FrameRowApply, self).get_result() + + def apply_broadcast(self): + return self._apply_broadcast(self.obj) + + @property + def series_generator(self): + return (self.obj._ixs(i, axis=1) + for i in range(len(self.columns))) + + @property + def result_index(self): + return self.columns + + @property + def result_columns(self): + return self.index + + +class FrameColumnApply(FrameApply): + axis = 1 + + def __init__(self, obj, func, broadcast, raw, reduce, args, kwds): + super(FrameColumnApply, self).__init__(obj, func, broadcast, + raw, reduce, args, kwds) + + # skip if we are mixed datelike and trying reduce across axes + # GH6125 + if self.reduce: + if self.obj._is_mixed_type and self.obj._is_datelike_mixed_type: + self.reduce = False + + def apply_broadcast(self): + return self._apply_broadcast(self.obj.T).T + + @property + def series_generator(self): + from pandas import Series + dtype = object if self.obj._is_mixed_type else None + return (Series._from_array(arr, index=self.columns, name=name, + dtype=dtype) + for i, (arr, name) in enumerate(zip(self.values, + self.index))) + + @property + def result_index(self): + return self.index + + @property + def result_columns(self): + return self.columns diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 5f323d0f040bc..e15e0310edb07 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -4808,256 +4808,79 @@ def aggregate(self, func, axis=0, *args, **kwargs): agg = aggregate - def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, - args=(), **kwds): - """ - Applies function along input axis of DataFrame. - - Objects passed to functions are Series objects having index - either the DataFrame's index (axis=0) or the columns (axis=1). - Return type depends on whether passed function aggregates, or the - reduce argument if the DataFrame is empty. - - Parameters - ---------- - func : function - Function to apply to each column/row - axis : {0 or 'index', 1 or 'columns'}, default 0 - * 0 or 'index': apply function to each column - * 1 or 'columns': apply function to each row - broadcast : boolean, default False - For aggregation functions, return object of same size with values - propagated - raw : boolean, default False - If False, convert each row or column into a Series. If raw=True the - passed function will receive ndarray objects instead. If you are - just applying a NumPy reduction function this will achieve much - better performance - reduce : boolean or None, default None - Try to apply reduction procedures. If the DataFrame is empty, - apply will use reduce to determine whether the result should be a - Series or a DataFrame. If reduce is None (the default), apply's - return value will be guessed by calling func an empty Series (note: - while guessing, exceptions raised by func will be ignored). If - reduce is True a Series will always be returned, and if False a - DataFrame will always be returned. - args : tuple - Positional arguments to pass to function in addition to the - array/series - Additional keyword arguments will be passed as keywords to the function - - Notes - ----- - In the current implementation apply calls func twice on the - first column/row to decide whether it can take a fast or slow - code path. This can lead to unexpected behavior if func has - side-effects, as they will take effect twice for the first - column/row. - - Examples - -------- - >>> df.apply(numpy.sqrt) # returns DataFrame - >>> df.apply(numpy.sum, axis=0) # equiv to df.sum(0) - >>> df.apply(numpy.sum, axis=1) # equiv to df.sum(1) - - See also - -------- - DataFrame.applymap: For elementwise operations - DataFrame.aggregate: only perform aggregating type operations - DataFrame.transform: only perform transformating type operations - - Returns - ------- - applied : Series or DataFrame - """ - axis = self._get_axis_number(axis) - ignore_failures = kwds.pop('ignore_failures', False) - - # dispatch to agg - if axis == 0 and isinstance(func, (list, dict)): - return self.aggregate(func, axis=axis, *args, **kwds) - - if len(self.columns) == 0 and len(self.index) == 0: - return self._apply_empty_result(func, axis, reduce, *args, **kwds) - - # if we are a string, try to dispatch - if isinstance(func, compat.string_types): - if axis: - kwds['axis'] = axis - return getattr(self, func)(*args, **kwds) - - if kwds or args and not isinstance(func, np.ufunc): - def f(x): - return func(x, *args, **kwds) - else: - f = func - - if isinstance(f, np.ufunc): - with np.errstate(all='ignore'): - results = f(self.values) - return self._constructor(data=results, index=self.index, - columns=self.columns, copy=False) - else: - if not broadcast: - if not all(self.shape): - return self._apply_empty_result(func, axis, reduce, *args, - **kwds) - - if raw and not self._is_mixed_type: - return self._apply_raw(f, axis) - else: - if reduce is None: - reduce = True - return self._apply_standard( - f, axis, - reduce=reduce, - ignore_failures=ignore_failures) - else: - return self._apply_broadcast(f, axis) + _shared_docs['apply'] = (""" + Applies function along input axis of DataFrame. - def _apply_empty_result(self, func, axis, reduce, *args, **kwds): - if reduce is None: - reduce = False - try: - reduce = not isinstance(func(_EMPTY_SERIES, *args, **kwds), - Series) - except Exception: - pass + Objects passed to functions are Series objects having index + either the DataFrame's index (axis=0) or the columns (axis=1). + Return type depends on whether passed function aggregates, or the + reduce argument if the DataFrame is empty. - if reduce: - return Series(np.nan, index=self._get_agg_axis(axis)) - else: - return self.copy() - - def _apply_raw(self, func, axis): - try: - result = lib.reduce(self.values, func, axis=axis) - except Exception: - result = np.apply_along_axis(func, axis, self.values) - - # TODO: mixed type case - if result.ndim == 2: - return DataFrame(result, index=self.index, columns=self.columns) - else: - return Series(result, index=self._get_agg_axis(axis)) - - def _apply_standard(self, func, axis, ignore_failures=False, reduce=True): - - # skip if we are mixed datelike and trying reduce across axes - # GH6125 - if (reduce and axis == 1 and self._is_mixed_type and - self._is_datelike_mixed_type): - reduce = False - - # try to reduce first (by default) - # this only matters if the reduction in values is of different dtype - # e.g. if we want to apply to a SparseFrame, then can't directly reduce - if reduce: - values = self.values - - # we cannot reduce using non-numpy dtypes, - # as demonstrated in gh-12244 - if not is_extension_type(values): - # Create a dummy Series from an empty array - index = self._get_axis(axis) - empty_arr = np.empty(len(index), dtype=values.dtype) - dummy = Series(empty_arr, index=self._get_axis(axis), - dtype=values.dtype) - - try: - labels = self._get_agg_axis(axis) - result = lib.reduce(values, func, axis=axis, dummy=dummy, - labels=labels) - return Series(result, index=labels) - except Exception: - pass - - dtype = object if self._is_mixed_type else None - if axis == 0: - series_gen = (self._ixs(i, axis=1) - for i in range(len(self.columns))) - res_index = self.columns - res_columns = self.index - elif axis == 1: - res_index = self.index - res_columns = self.columns - values = self.values - series_gen = (Series._from_array(arr, index=res_columns, name=name, - dtype=dtype) - for i, (arr, name) in enumerate(zip(values, - res_index))) - else: # pragma : no cover - raise AssertionError('Axis must be 0 or 1, got %s' % str(axis)) - - i = None - keys = [] - results = {} - if ignore_failures: - successes = [] - for i, v in enumerate(series_gen): - try: - results[i] = func(v) - keys.append(v.name) - successes.append(i) - except Exception: - pass - # so will work with MultiIndex - if len(successes) < len(res_index): - res_index = res_index.take(successes) - else: - try: - for i, v in enumerate(series_gen): - results[i] = func(v) - keys.append(v.name) - except Exception as e: - if hasattr(e, 'args'): - # make sure i is defined - if i is not None: - k = res_index[i] - e.args = e.args + ('occurred at index %s' % - pprint_thing(k), ) - raise - - if len(results) > 0 and is_sequence(results[0]): - if not isinstance(results[0], Series): - index = res_columns - else: - index = None - - result = self._constructor(data=results, index=index) - result.columns = res_index - - if axis == 1: - result = result.T - result = result._convert(datetime=True, timedelta=True, copy=False) - - else: - - result = Series(results) - result.index = res_index - - return result - - def _apply_broadcast(self, func, axis): - if axis == 0: - target = self - elif axis == 1: - target = self.T - else: # pragma: no cover - raise AssertionError('Axis must be 0 or 1, got %s' % axis) + Parameters + ---------- + func : function + Function to apply to each column/row + axis : {0 or 'index', 1 or 'columns'}, default 0 + * 0 or 'index': apply function to each column + * 1 or 'columns': apply function to each row + broadcast : boolean, default False + For aggregation functions, return object of same size with values + propagated + raw : boolean, default False + If False, convert each row or column into a Series. If raw=True the + passed function will receive ndarray objects instead. If you are + just applying a NumPy reduction function this will achieve much + better performance + reduce : boolean or None, default None + Try to apply reduction procedures. If the DataFrame is empty, + apply will use reduce to determine whether the result should be a + Series or a DataFrame. If reduce is None (the default), apply's + return value will be guessed by calling func an empty Series (note: + while guessing, exceptions raised by func will be ignored). If + reduce is True a Series will always be returned, and if False a + DataFrame will always be returned. + args : tuple + Positional arguments to pass to function in addition to the + array/series + Additional keyword arguments will be passed as keywords to the function + + Notes + ----- + In the current implementation apply calls func twice on the + first column/row to decide whether it can take a fast or slow + code path. This can lead to unexpected behavior if func has + side-effects, as they will take effect twice for the first + column/row. - result_values = np.empty_like(target.values) - columns = target.columns - for i, col in enumerate(columns): - result_values[:, i] = func(target[col]) + Examples + -------- + >>> df.apply(numpy.sqrt) # returns DataFrame + >>> df.apply(numpy.sum, axis=0) # equiv to df.sum(0) + >>> df.apply(numpy.sum, axis=1) # equiv to df.sum(1) - result = self._constructor(result_values, index=target.index, - columns=target.columns) + See also + -------- + DataFrame.applymap: For elementwise operations + DataFrame.aggregate: only perform aggregating type operations + DataFrame.transform: only perform transformating type operations - if axis == 1: - result = result.T + Returns + ------- + applied : Series or DataFrame + """) - return result + @Appender(_shared_docs['apply']) + def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, + args=(), **kwds): + from pandas.core.apply import frame_apply + op = frame_apply(self, + func=func, + axis=axis, + broadcast=broadcast, + raw=raw, + reduce=reduce, + args=args, **kwds) + return op.get_result() def applymap(self, func): """ @@ -6189,8 +6012,6 @@ def isin(self, values): ops.add_flex_arithmetic_methods(DataFrame, **ops.frame_flex_funcs) ops.add_special_arithmetic_methods(DataFrame, **ops.frame_special_funcs) -_EMPTY_SERIES = Series([]) - def _arrays_to_mgr(arrays, arr_names, index, columns, dtype=None): """ diff --git a/pandas/tests/frame/test_apply.py b/pandas/tests/frame/test_apply.py index ab2e810d77634..65dd166e1f6a8 100644 --- a/pandas/tests/frame/test_apply.py +++ b/pandas/tests/frame/test_apply.py @@ -13,6 +13,7 @@ Timestamp, compat) import pandas as pd from pandas.core.dtypes.dtypes import CategoricalDtype +from pandas.core.apply import frame_apply from pandas.util.testing import (assert_series_equal, assert_frame_equal) import pandas.util.testing as tm @@ -153,8 +154,9 @@ def test_apply_axis1(self): assert tapplied[d] == np.mean(self.frame.xs(d)) def test_apply_ignore_failures(self): - result = self.mixed_frame._apply_standard(np.mean, 0, - ignore_failures=True) + result = frame_apply(self.mixed_frame, + np.mean, 0, + ignore_failures=True).apply_standard() expected = self.mixed_frame._get_numeric_data().apply(np.mean) assert_series_equal(result, expected) From ad12c643c4d9c3ce9ea1e6c539077d58fe9c7b4e Mon Sep 17 00:00:00 2001 From: Jeff Reback Date: Tue, 12 Dec 2017 21:26:11 -0500 Subject: [PATCH 2/3] fix doc --- pandas/core/frame.py | 113 +++++++++++++++++++++---------------------- 1 file changed, 55 insertions(+), 58 deletions(-) diff --git a/pandas/core/frame.py b/pandas/core/frame.py index e15e0310edb07..753c623b2de4c 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -4808,70 +4808,67 @@ def aggregate(self, func, axis=0, *args, **kwargs): agg = aggregate - _shared_docs['apply'] = (""" - Applies function along input axis of DataFrame. + def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, + args=(), **kwds): + """Applies function along input axis of DataFrame. - Objects passed to functions are Series objects having index - either the DataFrame's index (axis=0) or the columns (axis=1). - Return type depends on whether passed function aggregates, or the - reduce argument if the DataFrame is empty. + Objects passed to functions are Series objects having index + either the DataFrame's index (axis=0) or the columns (axis=1). + Return type depends on whether passed function aggregates, or the + reduce argument if the DataFrame is empty. - Parameters - ---------- - func : function - Function to apply to each column/row - axis : {0 or 'index', 1 or 'columns'}, default 0 - * 0 or 'index': apply function to each column - * 1 or 'columns': apply function to each row - broadcast : boolean, default False - For aggregation functions, return object of same size with values - propagated - raw : boolean, default False - If False, convert each row or column into a Series. If raw=True the - passed function will receive ndarray objects instead. If you are - just applying a NumPy reduction function this will achieve much - better performance - reduce : boolean or None, default None - Try to apply reduction procedures. If the DataFrame is empty, - apply will use reduce to determine whether the result should be a - Series or a DataFrame. If reduce is None (the default), apply's - return value will be guessed by calling func an empty Series (note: - while guessing, exceptions raised by func will be ignored). If - reduce is True a Series will always be returned, and if False a - DataFrame will always be returned. - args : tuple - Positional arguments to pass to function in addition to the - array/series - Additional keyword arguments will be passed as keywords to the function - - Notes - ----- - In the current implementation apply calls func twice on the - first column/row to decide whether it can take a fast or slow - code path. This can lead to unexpected behavior if func has - side-effects, as they will take effect twice for the first - column/row. + Parameters + ---------- + func : function + Function to apply to each column/row + axis : {0 or 'index', 1 or 'columns'}, default 0 + * 0 or 'index': apply function to each column + * 1 or 'columns': apply function to each row + broadcast : boolean, default False + For aggregation functions, return object of same size with values + propagated + raw : boolean, default False + If False, convert each row or column into a Series. If raw=True the + passed function will receive ndarray objects instead. If you are + just applying a NumPy reduction function this will achieve much + better performance + reduce : boolean or None, default None + Try to apply reduction procedures. If the DataFrame is empty, + apply will use reduce to determine whether the result should be a + Series or a DataFrame. If reduce is None (the default), apply's + return value will be guessed by calling func an empty Series (note: + while guessing, exceptions raised by func will be ignored). If + reduce is True a Series will always be returned, and if False a + DataFrame will always be returned. + args : tuple + Positional arguments to pass to function in addition to the + array/series + Additional keyword arguments will be passed as keywords to the function - Examples - -------- - >>> df.apply(numpy.sqrt) # returns DataFrame - >>> df.apply(numpy.sum, axis=0) # equiv to df.sum(0) - >>> df.apply(numpy.sum, axis=1) # equiv to df.sum(1) + Notes + ----- + In the current implementation apply calls func twice on the + first column/row to decide whether it can take a fast or slow + code path. This can lead to unexpected behavior if func has + side-effects, as they will take effect twice for the first + column/row. - See also - -------- - DataFrame.applymap: For elementwise operations - DataFrame.aggregate: only perform aggregating type operations - DataFrame.transform: only perform transformating type operations + Examples + -------- + >>> df.apply(numpy.sqrt) # returns DataFrame + >>> df.apply(numpy.sum, axis=0) # equiv to df.sum(0) + >>> df.apply(numpy.sum, axis=1) # equiv to df.sum(1) - Returns - ------- - applied : Series or DataFrame - """) + See also + -------- + DataFrame.applymap: For elementwise operations + DataFrame.aggregate: only perform aggregating type operations + DataFrame.transform: only perform transformating type operations - @Appender(_shared_docs['apply']) - def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, - args=(), **kwds): + Returns + ------- + applied : Series or DataFrame + """ from pandas.core.apply import frame_apply op = frame_apply(self, func=func, From 0f5e6ae7c77447cf919ab47573897567ae560c06 Mon Sep 17 00:00:00 2001 From: Jeff Reback Date: Wed, 13 Dec 2017 09:00:18 -0500 Subject: [PATCH 3/3] sparse --- pandas/core/sparse/frame.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/pandas/core/sparse/frame.py b/pandas/core/sparse/frame.py index 36a18d8f8b4a0..05f39a8caa6f6 100644 --- a/pandas/core/sparse/frame.py +++ b/pandas/core/sparse/frame.py @@ -861,11 +861,17 @@ def apply(self, func, axis=0, broadcast=False, reduce=False): new_series, index=self.index, columns=self.columns, default_fill_value=self._default_fill_value, default_kind=self._default_kind).__finalize__(self) - else: - if not broadcast: - return self._apply_standard(func, axis, reduce=reduce) - else: - return self._apply_broadcast(func, axis) + + from pandas.core.apply import frame_apply + op = frame_apply(self, + func=func, + axis=axis, + reduce=reduce) + + if broadcast: + return op.apply_broadcast() + + return op.apply_standard() def applymap(self, func): """