Skip to content

apply_ufunc: Add meta kwarg + bump dask to 2.2 #3660

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jan 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ci/requirements/py36-min-all-deps.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ dependencies:
- cfgrib=0.9
- cftime=1.0
- coveralls
- dask=1.2
- distributed=1.27
- dask=2.2
- distributed=2.2
- flake8
- h5netcdf=0.7
- h5py=2.9 # Policy allows for 2.10, but it's a conflict-fest
Expand Down
7 changes: 6 additions & 1 deletion doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ v0.15.0 (unreleased)

Breaking changes
~~~~~~~~~~~~~~~~
- Bumped minimum ``dask`` version to 2.2.
- Remove ``compat`` and ``encoding`` kwargs from ``DataArray``, which
have been deprecated since 0.12. (:pull:`3650`).
Instead, specify the encoding when writing to disk or set
Expand Down Expand Up @@ -50,6 +51,8 @@ New Features
- Added the ``count`` reduction method to both :py:class:`~core.rolling.DatasetCoarsen`
and :py:class:`~core.rolling.DataArrayCoarsen` objects. (:pull:`3500`)
By `Deepak Cherian <https://github.com/dcherian>`_
- Add ``meta`` kwarg to :py:func:`~xarray.apply_ufunc`; this is passed on to
:py:meth:`dask.array.blockwise`. (:pull:`3660`) By `Deepak Cherian <https://github.com/dcherian>`_.
- Add `attrs_file` option in :py:func:`~xarray.open_mfdataset` to choose the
source file for global attributes in a multi-file dataset (:issue:`2382`,
:pull:`3498`) by `Julien Seguinot <https://github.com/juseg>_`.
Expand All @@ -63,7 +66,9 @@ New Features

Bug fixes
~~~~~~~~~

- Applying a user-defined function that adds new dimensions using :py:func:`apply_ufunc`
and ``vectorize=True`` now works with ``dask > 2.0``. (:issue:`3574`, :pull:`3660`).
By `Deepak Cherian <https://github.com/dcherian>`_.
- Fix :py:meth:`xarray.combine_by_coords` to allow for combining incomplete
hypercubes of Datasets (:issue:`3648`). By `Ian Bolliger
<https://github.com/bolliger32>`_.
Expand Down
23 changes: 22 additions & 1 deletion xarray/core/computation.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ def apply_variable_ufunc(
output_dtypes=None,
output_sizes=None,
keep_attrs=False,
meta=None,
):
"""Apply a ndarray level function over Variable and/or ndarray objects.
"""
Expand Down Expand Up @@ -590,6 +591,7 @@ def func(*arrays):
signature,
output_dtypes,
output_sizes,
meta,
)

elif dask == "allowed":
Expand Down Expand Up @@ -648,7 +650,14 @@ def func(*arrays):


def _apply_blockwise(
func, args, input_dims, output_dims, signature, output_dtypes, output_sizes=None
func,
args,
input_dims,
output_dims,
signature,
output_dtypes,
output_sizes=None,
meta=None,
):
import dask.array

Expand Down Expand Up @@ -720,6 +729,7 @@ def _apply_blockwise(
dtype=dtype,
concatenate=True,
new_axes=output_sizes,
meta=meta,
)


Expand Down Expand Up @@ -761,6 +771,7 @@ def apply_ufunc(
dask: str = "forbidden",
output_dtypes: Sequence = None,
output_sizes: Mapping[Any, int] = None,
meta: Any = None,
) -> Any:
"""Apply a vectorized function for unlabeled arrays on xarray objects.

Expand Down Expand Up @@ -857,6 +868,9 @@ def apply_ufunc(
Optional mapping from dimension names to sizes for outputs. Only used
if dask='parallelized' and new dimensions (not found on inputs) appear
on outputs.
meta : optional
Size-0 object representing the type of array wrapped by dask array. Passed on to
``dask.array.blockwise``.

Returns
-------
Expand Down Expand Up @@ -990,6 +1004,11 @@ def earth_mover_distance(first_samples,
func = functools.partial(func, **kwargs)

if vectorize:
if meta is None:
# set meta=np.ndarray by default for numpy vectorized functions
# work around dask bug computing meta with vectorized functions: GH5642
meta = np.ndarray

if signature.all_core_dims:
func = np.vectorize(
func, otypes=output_dtypes, signature=signature.to_gufunc_string()
Expand All @@ -1006,6 +1025,7 @@ def earth_mover_distance(first_samples,
dask=dask,
output_dtypes=output_dtypes,
output_sizes=output_sizes,
meta=meta,
)

if any(isinstance(a, GroupBy) for a in args):
Expand All @@ -1020,6 +1040,7 @@ def earth_mover_distance(first_samples,
dataset_fill_value=dataset_fill_value,
keep_attrs=keep_attrs,
dask=dask,
meta=meta,
)
return apply_groupby_func(this_apply, *args)
elif any(is_dict_like(a) for a in args):
Expand Down
12 changes: 10 additions & 2 deletions xarray/tests/test_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from xarray.core import indexing
from xarray.core.options import set_options
from xarray.core.pycompat import dask_array_type
from xarray.tests import mock
from xarray.tests import LooseVersion, mock

from . import (
arm_xfail,
Expand Down Expand Up @@ -76,9 +76,14 @@
pass

try:
import dask
import dask.array as da

dask_version = dask.__version__
except ImportError:
pass
# needed for xfailed tests when dask < 2.4.0
# remove when min dask > 2.4.0
dask_version = "10.0"

ON_WINDOWS = sys.platform == "win32"

Expand Down Expand Up @@ -1723,6 +1728,7 @@ def test_hidden_zarr_keys(self):
with xr.decode_cf(store):
pass

@pytest.mark.skipif(LooseVersion(dask_version) < "2.4", reason="dask GH5334")
def test_write_persistence_modes(self):
original = create_test_data()

Expand Down Expand Up @@ -1787,6 +1793,7 @@ def test_encoding_kwarg_fixed_width_string(self):
def test_dataset_caching(self):
super().test_dataset_caching()

@pytest.mark.skipif(LooseVersion(dask_version) < "2.4", reason="dask GH5334")
def test_append_write(self):
ds, ds_to_append, _ = create_append_test_data()
with self.create_zarr_target() as store_target:
Expand Down Expand Up @@ -1863,6 +1870,7 @@ def test_check_encoding_is_consistent_after_append(self):
xr.concat([ds, ds_to_append], dim="time"),
)

@pytest.mark.skipif(LooseVersion(dask_version) < "2.4", reason="dask GH5334")
def test_append_with_new_variable(self):

ds, ds_to_append, ds_with_new_var = create_append_test_data()
Expand Down
18 changes: 18 additions & 0 deletions xarray/tests/test_computation.py
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,24 @@ def test_vectorize_dask():
assert_identical(expected, actual)


@requires_dask
def test_vectorize_dask_new_output_dims():
# regression test for GH3574
data_array = xr.DataArray([[0, 1, 2], [1, 2, 3]], dims=("x", "y"))
func = lambda x: x[np.newaxis, ...]
expected = data_array.expand_dims("z")
actual = apply_ufunc(
func,
data_array.chunk({"x": 1}),
output_core_dims=[["z"]],
vectorize=True,
dask="parallelized",
output_dtypes=[float],
output_sizes={"z": 1},
).transpose(*expected.dims)
assert_identical(expected, actual)


def test_output_wrong_number():
variable = xr.Variable("x", np.arange(10))

Expand Down
13 changes: 13 additions & 0 deletions xarray/tests/test_sparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,3 +873,16 @@ def test_dask_token():
t5 = dask.base.tokenize(ac + 1)
assert t4 != t5
assert isinstance(ac.data._meta, sparse.COO)


@requires_dask
def test_apply_ufunc_meta_to_blockwise():
da = xr.DataArray(np.zeros((2, 3)), dims=["x", "y"]).chunk({"x": 2, "y": 1})
sparse_meta = sparse.COO.from_numpy(np.zeros((0, 0)))

# if dask computed meta, it would be np.ndarray
expected = xr.apply_ufunc(
lambda x: x, da, dask="parallelized", output_dtypes=[da.dtype], meta=sparse_meta
).data._meta

assert_sparse_equal(expected, sparse_meta)