Skip to content

KeyError when using distributed scheduler with __array_function__ #5224

@gforsyth

Description

@gforsyth

The _get_computation_codes introduced in #5001 causes errors if relying on numpy ducktyping on a dask.dataframe

What happened: I get a KeyError when I use numpy.where on a column of a dask.dataframe

What you expected to happen: Forced evaluation of column

Minimal Complete Verifiable Example:
No issue (without distributed scheduler)

In [1]: import dask.datasets
In [2]: ddf = dask.datasets.timeseries(seed=123)
In [3]: import numpy
In [4]: numpy.where(ddf.id > 1000)
Out[4]: (array([      1,       3,       6, ..., 2591988, 2591990, 2591997]),)

with distributed scheduler

In [1]: from distributed.client import Client
In [2]: import numpy
In [3]: import dask.datasets
In [4]: client = Client()
In [5]: ddf = dask.datasets.timeseries(seed=123)
In [6]: numpy.where(ddf.id > 1000)
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-6-6492628d3a98> in <module>
----> 1 numpy.where(ddf.id > 1000)

<__array_function__ internals> in where(*args, **kwargs)

~/miniforge3/envs/dasklatest/lib/python3.8/site-packages/dask/dataframe/core.py in __array__(self, dtype, **kwargs)
    413 
    414     def __array__(self, dtype=None, **kwargs):
--> 415         self._computed = self.compute()
    416         x = np.array(self._computed)
    417         return x

~/miniforge3/envs/dasklatest/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    284         dask.base.compute
    285         """
--> 286         (result,) = compute(self, traverse=False, **kwargs)
    287         return result
    288 

~/miniforge3/envs/dasklatest/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    566         postcomputes.append(x.__dask_postcompute__())
    567 
--> 568     results = schedule(dsk, keys, **kwargs)
    569     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    570 

~/miniforge3/envs/dasklatest/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2650         Client.compute : Compute asynchronous collections
   2651         """
-> 2652         futures = self._graph_to_futures(
   2653             dsk,
   2654             keys=set(flatten([keys])),

~/miniforge3/envs/dasklatest/lib/python3.8/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, workers, allow_other_workers, priority, user_priority, resources, retries, fifo_timeout, actors)
   2589                     "fifo_timeout": fifo_timeout,
   2590                     "actors": actors,
-> 2591                     "code": self._get_computation_code(),
   2592                 }
   2593             )

~/miniforge3/envs/dasklatest/lib/python3.8/site-packages/distributed/client.py in _get_computation_code()
   2524             breakpoint()
   2525             if pattern is None or (
-> 2526                 not pattern.match(fr.f_globals["__name__"])
   2527                 and fr.f_code.co_name not in ("<listcomp>", "<dictcomp>")
   2528             ):

KeyError: '__name__'

Anything else we need to know?:

Environment:

  • Dask version: 2021.8.0
  • Python version: 3.8.10
  • Operating System: OSX
  • Install method (conda, pip, source): conda

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions