Skip to content

ENH: Pandas backend doesn't handle udf with two parameters correctly with trailing_window #1998

@icexelloss

Description

@icexelloss

Reproduce:

import ibis
import pandas as pd
import numpy as np

from ibis.pandas.udf import udf
import ibis.expr.datatypes as dt

# In-memory pandas-backed table with columns 'a', 'b' and 'key'; every row has key 'a'.
client = ibis.pandas.connect({'table': pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6], 'key': ['a', 'a', 'a']})})
t = client.table('table')
# Trailing window with preceding=1, ordered and grouped by 'key'; this is the window that fails.
w = ibis.trailing_window(preceding=1, order_by='key', group_by='key')
# Alternative plain grouped window, left disabled.
# NOTE(review): presumably this variant does not hit the error -- confirm.
#w = ibis.window(group_by='key')


# Reduction UDF declared with two double inputs and one double output.
@udf.reduction(input_type=[dt.double, dt.double], output_type=dt.double)
def my_average(v, w):
    """Return the average of ``v`` weighted by ``w``, computed with ``np.average``.

    Note that the parameter ``w`` is distinct from the module-level window
    ``w`` defined above; inside this function it is the weights argument.
    """
    return np.average(v, weights=w)

# Add a column computed by the two-argument UDF over the window defined above.
t = t.mutate(new_col=my_average(t.a, t.b).over(w))

# Executing the expression raises the exceptions reported in this issue.
t.execute()

This throws the following exception (a TypeError, followed by a KeyError raised while handling it):

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/groupby/groupby.py in apply(self, func, *args, **kwargs)
    688             try:
--> 689                 result = self._python_apply_general(f)
    690             except Exception:

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/groupby/groupby.py in _python_apply_general(self, f)
    706         keys, values, mutated = self.grouper.apply(f, self._selected_obj,
--> 707                                                    self.axis)
    708 

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/groupby/ops.py in apply(self, f, data, axis)
    189             group_axes = _get_axes(group)
--> 190             res = f(group)
    191             if not _is_indexed_like(res, group_axes):

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/window.py in f(x, name, *args)
    797 
--> 798             return x.apply(name, *args, **kwargs)
    799 

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/window.py in apply(self, func, raw, args, kwargs)
   1702         return super(Rolling, self).apply(
-> 1703             func, raw=raw, args=args, kwargs=kwargs)
   1704 

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/window.py in apply(self, func, raw, args, kwargs)
   1011         return self._apply(f, func, args=args, kwargs=kwargs,
-> 1012                            center=False, raw=raw)
   1013 

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/window.py in _apply(self, func, name, window, center, check_minp, **kwargs)
    879                 else:
--> 880                     result = calc(values)
    881 

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/window.py in calc(x)
    873                     return func(x, window, min_periods=self.min_periods,
--> 874                                 closed=self.closed)
    875 

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/window.py in f(arg, window, min_periods, closed)
   1008                 arg, window, minp, indexi,
-> 1009                 closed, offset, func, raw, args, kwargs)
   1010 

pandas/_libs/window.pyx in pandas._libs.window.roll_generic()

~/workspace/ibis/ibis/pandas/aggcontext.py in apply(data, function, args, kwargs)
    261     ):
--> 262         return function(data, *args, **kwargs)
    263 

~/workspace/ibis/ibis/pandas/udf.py in aggregator(first, *rest, **kwargs)
    548                     )
--> 549                     return func(*args, **kwargs)
    550 

<ipython-input-57-0aeaa0468489> in my_average(v, w)
     15 def my_average(v, w):
---> 16     return np.average(v, weights=w)
     17 

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/numpy/lib/function_base.py in average(a, axis, weights, returned)
    405                 raise TypeError(
--> 406                     "Axis must be specified when shapes of a and weights "
    407                     "differ.")

TypeError: Axis must be specified when shapes of a and weights differ.

During handling of the above exception, another exception occurred:

KeyError                                  Traceback (most recent call last)
<ipython-input-57-0aeaa0468489> in <module>
     18 t = t.mutate(new_col=my_average(t.a, t.b).over(w))
     19 
---> 20 t.execute()

~/workspace/ibis/ibis/expr/types.py in execute(self, limit, params, **kwargs)
    200         from ibis.client import execute
    201 
--> 202         return execute(self, limit=limit, params=params, **kwargs)
    203 
    204     def compile(self, limit=None, params=None):

~/workspace/ibis/ibis/client.py in execute(expr, limit, params, **kwargs)
    283 def execute(expr, limit='default', params=None, **kwargs):
    284     (backend,) = validate_backends(list(find_backends(expr)))
--> 285     return backend.execute(expr, limit=limit, params=params, **kwargs)
    286 
    287 

~/workspace/ibis/ibis/pandas/client.py in execute(self, query, params, limit, **kwargs)
    370                 )
    371             )
--> 372         return execute_and_reset(query, params=params, **kwargs)
    373 
    374     def compile(self, expr, *args, **kwargs):

~/workspace/ibis/ibis/pandas/core.py in execute_and_reset(expr, params, scope, aggcontext, **kwargs)
    412     """
    413     result = execute(
--> 414         expr, params=params, scope=scope, aggcontext=aggcontext, **kwargs
    415     )
    416     if isinstance(result, pd.DataFrame):

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/multipledispatch/dispatcher.py in __call__(self, *args, **kwargs)
    276             self._cache[types] = func
    277         try:
--> 278             return func(*args, **kwargs)
    279 
    280         except MDNotImplementedError:

~/workspace/ibis/ibis/pandas/core.py in main_execute(expr, params, scope, aggcontext, **kwargs)
    369 
    370     new_scope = toolz.merge(scope, params)
--> 371     return execute_with_scope(expr, new_scope, aggcontext=aggcontext, **kwargs)
    372 
    373 

~/workspace/ibis/ibis/pandas/core.py in execute_with_scope(expr, scope, aggcontext, clients, **kwargs)
    188             **kwargs,
    189         ),
--> 190         **kwargs,
    191     )
    192 

~/workspace/ibis/ibis/pandas/core.py in execute_until_in_scope(expr, scope, aggcontext, clients, post_execute_, **kwargs)
    224         post_execute_=post_execute_,
    225         clients=clients,
--> 226         **kwargs,
    227     )
    228     new_scope = toolz.merge(

~/workspace/ibis/ibis/pandas/core.py in execute_bottom_up(expr, scope, aggcontext, post_execute_, clients, **kwargs)
    317         aggcontext=aggcontext,
    318         clients=clients,
--> 319         **kwargs,
    320     )
    321     computed = post_execute_(op, result)

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/multipledispatch/dispatcher.py in __call__(self, *args, **kwargs)
    276             self._cache[types] = func
    277         try:
--> 278             return func(*args, **kwargs)
    279 
    280         except MDNotImplementedError:

~/workspace/ibis/ibis/pandas/execution/selection.py in execute_selection_dataframe(op, data, scope, **kwargs)
    289         for selection in selections:
    290             pandas_object = compute_projection(
--> 291                 selection, op, data, scope=scope, **kwargs
    292             )
    293             data_pieces.append(pandas_object)

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/multipledispatch/dispatcher.py in __call__(self, *args, **kwargs)
    276             self._cache[types] = func
    277         try:
--> 278             return func(*args, **kwargs)
    279 
    280         except MDNotImplementedError:

~/workspace/ibis/ibis/pandas/execution/selection.py in compute_projection_column_expr(expr, parent, data, scope, **kwargs)
    113 
    114     new_scope = toolz.merge(scope, additional_scope)
--> 115     result = execute(expr, scope=new_scope, **kwargs)
    116     assert result_name is not None, 'Column selection name is None'
    117     if np.isscalar(result):

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/multipledispatch/dispatcher.py in __call__(self, *args, **kwargs)
    276             self._cache[types] = func
    277         try:
--> 278             return func(*args, **kwargs)
    279 
    280         except MDNotImplementedError:

~/workspace/ibis/ibis/pandas/core.py in main_execute(expr, params, scope, aggcontext, **kwargs)
    369 
    370     new_scope = toolz.merge(scope, params)
--> 371     return execute_with_scope(expr, new_scope, aggcontext=aggcontext, **kwargs)
    372 
    373 

~/workspace/ibis/ibis/pandas/core.py in execute_with_scope(expr, scope, aggcontext, clients, **kwargs)
    188             **kwargs,
    189         ),
--> 190         **kwargs,
    191     )
    192 

~/workspace/ibis/ibis/pandas/core.py in execute_until_in_scope(expr, scope, aggcontext, clients, post_execute_, **kwargs)
    224         post_execute_=post_execute_,
    225         clients=clients,
--> 226         **kwargs,
    227     )
    228     new_scope = toolz.merge(

~/workspace/ibis/ibis/pandas/core.py in execute_bottom_up(expr, scope, aggcontext, post_execute_, clients, **kwargs)
    317         aggcontext=aggcontext,
    318         clients=clients,
--> 319         **kwargs,
    320     )
    321     computed = post_execute_(op, result)

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/multipledispatch/dispatcher.py in __call__(self, *args, **kwargs)
    276             self._cache[types] = func
    277         try:
--> 278             return func(*args, **kwargs)
    279 
    280         except MDNotImplementedError:

~/workspace/ibis/ibis/pandas/execution/window.py in execute_window_op(op, data, window, scope, aggcontext, clients, **kwargs)
    202         aggcontext=aggcontext,
    203         clients=clients,
--> 204         **kwargs,
    205     )
    206     series = post_process(result, data, ordering_keys, grouping_keys)

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/multipledispatch/dispatcher.py in __call__(self, *args, **kwargs)
    276             self._cache[types] = func
    277         try:
--> 278             return func(*args, **kwargs)
    279 
    280         except MDNotImplementedError:

~/workspace/ibis/ibis/pandas/core.py in main_execute(expr, params, scope, aggcontext, **kwargs)
    369 
    370     new_scope = toolz.merge(scope, params)
--> 371     return execute_with_scope(expr, new_scope, aggcontext=aggcontext, **kwargs)
    372 
    373 

~/workspace/ibis/ibis/pandas/core.py in execute_with_scope(expr, scope, aggcontext, clients, **kwargs)
    188             **kwargs,
    189         ),
--> 190         **kwargs,
    191     )
    192 

~/workspace/ibis/ibis/pandas/core.py in execute_until_in_scope(expr, scope, aggcontext, clients, post_execute_, **kwargs)
    224         post_execute_=post_execute_,
    225         clients=clients,
--> 226         **kwargs,
    227     )
    228     new_scope = toolz.merge(

~/workspace/ibis/ibis/pandas/core.py in execute_bottom_up(expr, scope, aggcontext, post_execute_, clients, **kwargs)
    317         aggcontext=aggcontext,
    318         clients=clients,
--> 319         **kwargs,
    320     )
    321     computed = post_execute_(op, result)

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/multipledispatch/dispatcher.py in __call__(self, *args, **kwargs)
    276             self._cache[types] = func
    277         try:
--> 278             return func(*args, **kwargs)
    279 
    280         except MDNotImplementedError:

~/workspace/ibis/ibis/pandas/udf.py in execute_udaf_node_groupby(op, *args, **kwargs)
    549                     return func(*args, **kwargs)
    550 
--> 551                 result = aggcontext.agg(args[0], aggregator, *iters, **kwargs)
    552                 return result
    553 

~/workspace/ibis/ibis/pandas/aggcontext.py in agg(self, grouped_data, function, *args, **kwargs)
    408                 "ignore", message=".+raw=True.+", category=FutureWarning
    409             )
--> 410             result = method(windowed)
    411         index = result.index
    412         result.index = pd.MultiIndex.from_arrays(

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/window.py in apply(self, func, raw, args, kwargs)
   1701     def apply(self, func, raw=None, args=(), kwargs={}):
   1702         return super(Rolling, self).apply(
-> 1703             func, raw=raw, args=args, kwargs=kwargs)
   1704 
   1705     @Substitution(name='rolling')

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/window.py in apply(self, func, raw, args, kwargs)
   1010 
   1011         return self._apply(f, func, args=args, kwargs=kwargs,
-> 1012                            center=False, raw=raw)
   1013 
   1014     def sum(self, *args, **kwargs):

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/window.py in _apply(self, func, name, window, center, check_minp, **kwargs)
    798             return x.apply(name, *args, **kwargs)
    799 
--> 800         return self._groupby.apply(f)
    801 
    802 

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/groupby/generic.py in apply(self, func, *args, **kwargs)
    747                       examples=_apply_docs['series_examples']))
    748     def apply(self, func, *args, **kwargs):
--> 749         return super(SeriesGroupBy, self).apply(func, *args, **kwargs)
    750 
    751     @Substitution(see_also=_agg_see_also_doc,

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/groupby/groupby.py in apply(self, func, *args, **kwargs)
    699 
    700                 with _group_selection_context(self):
--> 701                     return self._python_apply_general(f)
    702 
    703         return result

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/groupby/groupby.py in _python_apply_general(self, f)
    705     def _python_apply_general(self, f):
    706         keys, values, mutated = self.grouper.apply(f, self._selected_obj,
--> 707                                                    self.axis)
    708 
    709         return self._wrap_applied_output(

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/groupby/ops.py in apply(self, f, data, axis)
    188             # group might be modified
    189             group_axes = _get_axes(group)
--> 190             res = f(group)
    191             if not _is_indexed_like(res, group_axes):
    192                 mutated = True

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/window.py in f(x, name, *args)
    796                 return getattr(x, name)(*args, **kwargs)
    797 
--> 798             return x.apply(name, *args, **kwargs)
    799 
    800         return self._groupby.apply(f)

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/window.py in apply(self, func, raw, args, kwargs)
   1701     def apply(self, func, raw=None, args=(), kwargs={}):
   1702         return super(Rolling, self).apply(
-> 1703             func, raw=raw, args=args, kwargs=kwargs)
   1704 
   1705     @Substitution(name='rolling')

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/window.py in apply(self, func, raw, args, kwargs)
   1010 
   1011         return self._apply(f, func, args=args, kwargs=kwargs,
-> 1012                            center=False, raw=raw)
   1013 
   1014     def sum(self, *args, **kwargs):

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/window.py in _apply(self, func, name, window, center, check_minp, **kwargs)
    878                     result = np.apply_along_axis(calc, self.axis, values)
    879                 else:
--> 880                     result = calc(values)
    881 
    882             if center:

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/window.py in calc(x)
    872                 def calc(x):
    873                     return func(x, window, min_periods=self.min_periods,
--> 874                                 closed=self.closed)
    875 
    876             with np.errstate(all='ignore'):

~/miniconda3/envs/ibis-dev/lib/python3.7/site-packages/pandas/core/window.py in f(arg, window, min_periods, closed)
   1007             return libwindow.roll_generic(
   1008                 arg, window, minp, indexi,
-> 1009                 closed, offset, func, raw, args, kwargs)
   1010 
   1011         return self._apply(f, func, args=args, kwargs=kwargs,

pandas/_libs/window.pyx in pandas._libs.window.roll_generic()

~/workspace/ibis/ibis/pandas/aggcontext.py in apply(data, function, args, kwargs)
    260         kwargs=kwargs if kwargs is not None else {},
    261     ):
--> 262         return function(data, *args, **kwargs)
    263 
    264     return apply

~/workspace/ibis/ibis/pandas/udf.py in aggregator(first, *rest, **kwargs)
    545                     # TODO: might be inefficient to do this on every call
    546                     args, kwargs = arguments_from_signature(
--> 547                         funcsig, first, *map(next, rest), **kwargs
    548                     )
    549                     return func(*args, **kwargs)

~/workspace/ibis/ibis/pandas/udf.py in arguments_from_signature(signature, *args, **kwargs)
     89     new_kwargs = {
     90         k: meta_kwargs[k]
---> 91         for k in remaining_parameters
     92         if k in signature.parameters
     93         if signature.parameters[k].kind

~/workspace/ibis/ibis/pandas/udf.py in <dictcomp>(.0)
     95             Parameter.KEYWORD_ONLY,
     96             Parameter.POSITIONAL_OR_KEYWORD,
---> 97             Parameter.VAR_KEYWORD,
     98         }
     99     }

KeyError: 'w'

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions