diff --git a/doc/whats-new.rst b/doc/whats-new.rst
index 36611555a7d..5ee67efb1da 100644
--- a/doc/whats-new.rst
+++ b/doc/whats-new.rst
@@ -27,6 +27,9 @@ Breaking changes
 
 New Features
 ~~~~~~~~~~~~
 
+- :py:func:`open_dataset` and :py:func:`open_mfdataset`
+  now work with ``engine="zarr"`` (:issue:`3668`, :pull:`4003`, :pull:`4187`).
+  By `Miguel Jimenez `_ and `Wei Ji Leong `_.
 
 Bug fixes
 ~~~~~~~~~
diff --git a/xarray/backends/api.py b/xarray/backends/api.py
index 9f45474e7e7..cd1ee88f504 100644
--- a/xarray/backends/api.py
+++ b/xarray/backends/api.py
@@ -1,5 +1,6 @@
 import os.path
 import warnings
+from collections.abc import MutableMapping
 from glob import glob
 from io import BytesIO
 from numbers import Number
@@ -344,14 +345,16 @@ def open_dataset(
         If True, decode the 'coordinates' attribute to identify coordinates in
         the resulting dataset.
     engine : {"netcdf4", "scipy", "pydap", "h5netcdf", "pynio", "cfgrib", \
-        "pseudonetcdf"}, optional
+        "pseudonetcdf", "zarr"}, optional
         Engine to use when reading files. If not provided, the default engine
         is chosen based on available dependencies, with a preference for
         "netcdf4".
     chunks : int or dict, optional
-        If chunks is provided, it used to load the new dataset into dask
+        If chunks is provided, it is used to load the new dataset into dask
         arrays. ``chunks={}`` loads the dataset with dask using a single
-        chunk for all arrays.
+        chunk for all arrays. When using ``engine="zarr"``, setting
+        ``chunks='auto'`` will create dask chunks based on the variable's zarr
+        chunks.
     lock : False or lock-like, optional
         Resource lock to use when reading data from disk. Only relevant when
         using dask or another form of parallelism. By default, appropriate
@@ -413,6 +416,7 @@ def open_dataset(
         "pynio",
         "cfgrib",
         "pseudonetcdf",
+        "zarr",
     ]
     if engine not in engines:
         raise ValueError(
@@ -447,7 +451,7 @@ def open_dataset(
     if backend_kwargs is None:
         backend_kwargs = {}
 
-    def maybe_decode_store(store, lock=False):
+    def maybe_decode_store(store, chunks, lock=False):
         ds = conventions.decode_cf(
             store,
             mask_and_scale=mask_and_scale,
@@ -461,7 +465,7 @@ def maybe_decode_store(store, lock=False):
 
         _protect_dataset_variables_inplace(ds, cache)
 
-        if chunks is not None:
+        if chunks is not None and engine != "zarr":
             from dask.base import tokenize
 
             # if passed an actual file path, augment the token with
@@ -487,10 +491,40 @@ def maybe_decode_store(store, lock=False):
             )
             name_prefix = "open_dataset-%s" % token
             ds2 = ds.chunk(chunks, name_prefix=name_prefix, token=token)
-            ds2._file_obj = ds._file_obj
+
+        elif engine == "zarr":
+            # adapted from Dataset.chunk() and taken from open_zarr
+            if not (isinstance(chunks, (int, dict)) or chunks is None):
+                if chunks != "auto":
+                    raise ValueError(
+                        "chunks must be an int, dict, 'auto', or None. "
+                        "Instead found %s. " % chunks
+                    )
+
+            if chunks == "auto":
+                try:
+                    import dask.array  # noqa
+                except ImportError:
+                    chunks = None
+
+            # auto chunking needs to be here and not in ZarrStore because
+            # the variable chunks do not survive decode_cf
+            # return trivial case
+            if chunks is None:
+                return ds
+
+            if isinstance(chunks, int):
+                chunks = dict.fromkeys(ds.dims, chunks)
+
+            variables = {
+                k: store.maybe_chunk(k, v, chunks, overwrite_encoded_chunks)
+                for k, v in ds.variables.items()
+            }
+            ds2 = ds._replace(variables)
+
         else:
             ds2 = ds
-
+        ds2._file_obj = ds._file_obj
         return ds2
 
     if isinstance(filename_or_obj, Path):
@@ -499,6 +533,17 @@ def maybe_decode_store(store, lock=False):
 
     if isinstance(filename_or_obj, AbstractDataStore):
         store = filename_or_obj
+    elif isinstance(filename_or_obj, MutableMapping) and engine == "zarr":
+        # Zarr supports a wide range of access modes, but for now xarray either
+        # reads or writes from a store, never both.
+        # For open_dataset(engine="zarr"), we only read (i.e. mode="r")
+        mode = "r"
+        _backend_kwargs = backend_kwargs.copy()
+        overwrite_encoded_chunks = _backend_kwargs.pop("overwrite_encoded_chunks", None)
+        store = backends.ZarrStore.open_group(
+            filename_or_obj, mode=mode, group=group, **_backend_kwargs
+        )
+
     elif isinstance(filename_or_obj, str):
         filename_or_obj = _normalize_path(filename_or_obj)
 
@@ -526,7 +571,16 @@ def maybe_decode_store(store, lock=False):
             store = backends.CfGribDataStore(
                 filename_or_obj, lock=lock, **backend_kwargs
             )
-
+        elif engine == "zarr":
+            # on ZarrStore, mode='r', synchronizer=None, group=None,
+            # consolidated=False.
+            _backend_kwargs = backend_kwargs.copy()
+            overwrite_encoded_chunks = _backend_kwargs.pop(
+                "overwrite_encoded_chunks", None
+            )
+            store = backends.ZarrStore.open_group(
+                filename_or_obj, group=group, **_backend_kwargs
+            )
         else:
             if engine not in [None, "scipy", "h5netcdf"]:
                 raise ValueError(
@@ -542,7 +596,7 @@ def maybe_decode_store(store, lock=False):
             )
 
     with close_on_error(store):
-        ds = maybe_decode_store(store)
+        ds = maybe_decode_store(store, chunks)
 
     # Ensure source filename always stored in dataset object (GH issue #2550)
     if "source" not in ds.encoding:
@@ -794,7 +848,7 @@ def open_mfdataset(
         If provided, call this function on each dataset prior to concatenation.
         You can find the file-name from which each dataset was loaded in
        ``ds.encoding["source"]``.
-    engine : {"netcdf4", "scipy", "pydap", "h5netcdf", "pynio", "cfgrib"}, \
+    engine : {"netcdf4", "scipy", "pydap", "h5netcdf", "pynio", "cfgrib", "zarr"}, \
        optional
        Engine to use when reading files. If not provided, the default engine
        is chosen based on available dependencies, with a preference for
diff --git a/xarray/backends/zarr.py b/xarray/backends/zarr.py
index f74fddb694e..2651f3148fd 100644
--- a/xarray/backends/zarr.py
+++ b/xarray/backends/zarr.py
@@ -7,6 +7,7 @@
 from ..core.pycompat import integer_types
 from ..core.utils import FrozenDict, HiddenKeyDict
 from ..core.variable import Variable
+from .api import open_dataset
 from .common import AbstractWritableDataStore, BackendArray, _encode_variable_name
 
 # need some special secret attributes to tell us the dimensions
@@ -361,6 +362,51 @@ def encode_variable(self, variable):
     def encode_attribute(self, a):
         return encode_zarr_attr_value(a)
 
+    def get_chunk(self, name, var, chunks):
+        chunk_spec = dict(zip(var.dims, var.encoding.get("chunks")))
+
+        # Coordinate labels aren't chunked
+        if var.ndim == 1 and var.dims[0] == name:
+            return chunk_spec
+
+        if chunks == "auto":
+            return chunk_spec
+
+        for dim in var.dims:
+            if dim in chunks:
+                spec = chunks[dim]
+                if isinstance(spec, int):
+                    spec = (spec,)
+                if isinstance(spec, (tuple, list)) and chunk_spec[dim]:
+                    if any(s % chunk_spec[dim] for s in spec):
+                        warnings.warn(
+                            "Specified Dask chunks %r would "
+                            "separate Zarr chunk shape %r for "
+                            "dimension %r. This significantly "
+                            "degrades performance. Consider "
+                            "rechunking after loading instead."
+                            % (chunks[dim], chunk_spec[dim], dim),
+                            stacklevel=2,
+                        )
+                chunk_spec[dim] = chunks[dim]
+        return chunk_spec
+
+    def maybe_chunk(self, name, var, chunks, overwrite_encoded_chunks):
+        chunk_spec = self.get_chunk(name, var, chunks)
+
+        if (var.ndim > 0) and (chunk_spec is not None):
+            from dask.base import tokenize
+
+            # does this cause any data to be read?
+            token2 = tokenize(name, var._data, chunks)
+            name2 = f"xarray-{name}-{token2}"
+            var = var.chunk(chunk_spec, name=name2, lock=None)
+            if overwrite_encoded_chunks and var.chunks is not None:
+                var.encoding["chunks"] = tuple(x[0] for x in var.chunks)
+            return var
+        else:
+            return var
+
     def store(
         self,
         variables,
@@ -601,130 +647,33 @@ def open_zarr(
     ----------
     http://zarr.readthedocs.io/
     """
-    if "auto_chunk" in kwargs:
-        auto_chunk = kwargs.pop("auto_chunk")
-        if auto_chunk:
-            chunks = "auto"  # maintain backwards compatibility
-        else:
-            chunks = None
-
-        warnings.warn(
-            "auto_chunk is deprecated. Use chunks='auto' instead.",
-            FutureWarning,
-            stacklevel=2,
-        )
     if kwargs:
         raise TypeError(
             "open_zarr() got unexpected keyword arguments " + ",".join(kwargs.keys())
         )
-    if not isinstance(chunks, (int, dict)):
-        if chunks != "auto" and chunks is not None:
-            raise ValueError(
-                "chunks must be an int, dict, 'auto', or None. "
-                "Instead found %s. " % chunks
-            )
-
-    if chunks == "auto":
-        try:
-            import dask.array  # noqa
-        except ImportError:
-            chunks = None
-
-    if not decode_cf:
-        mask_and_scale = False
-        decode_times = False
-        concat_characters = False
-        decode_coords = False
-        decode_timedelta = False
-
-    def maybe_decode_store(store, lock=False):
-        ds = conventions.decode_cf(
-            store,
-            mask_and_scale=mask_and_scale,
-            decode_times=decode_times,
-            concat_characters=concat_characters,
-            decode_coords=decode_coords,
-            drop_variables=drop_variables,
-            decode_timedelta=decode_timedelta,
-            use_cftime=use_cftime,
-        )
+    backend_kwargs = {
+        "synchronizer": synchronizer,
+        "consolidated": consolidated,
+        "overwrite_encoded_chunks": overwrite_encoded_chunks,
+        "chunk_store": chunk_store,
+    }
 
-        # TODO: this is where we would apply caching
-
-        return ds
-
-    # Zarr supports a wide range of access modes, but for now xarray either
-    # reads or writes from a store, never both. For open_zarr, we only read
-    mode = "r"
-    zarr_store = ZarrStore.open_group(
-        store,
-        mode=mode,
-        synchronizer=synchronizer,
+    ds = open_dataset(
+        filename_or_obj=store,
         group=group,
-        consolidated=consolidated,
-        chunk_store=chunk_store,
+        decode_cf=decode_cf,
+        mask_and_scale=mask_and_scale,
+        decode_times=decode_times,
+        concat_characters=concat_characters,
+        decode_coords=decode_coords,
+        engine="zarr",
+        chunks=chunks,
+        drop_variables=drop_variables,
+        backend_kwargs=backend_kwargs,
+        decode_timedelta=decode_timedelta,
+        use_cftime=use_cftime,
     )
-    ds = maybe_decode_store(zarr_store)
-
-    # auto chunking needs to be here and not in ZarrStore because variable
-    # chunks do not survive decode_cf
-    # return trivial case
-    if not chunks:
-        return ds
-
-    # adapted from Dataset.Chunk()
-    if isinstance(chunks, int):
-        chunks = dict.fromkeys(ds.dims, chunks)
-
-    if isinstance(chunks, tuple) and len(chunks) == len(ds.dims):
-        chunks = dict(zip(ds.dims, chunks))
-
-    def get_chunk(name, var, chunks):
-        chunk_spec = dict(zip(var.dims, var.encoding.get("chunks")))
-
-        # Coordinate labels aren't chunked
-        if var.ndim == 1 and var.dims[0] == name:
-            return chunk_spec
-
-        if chunks == "auto":
-            return chunk_spec
-
-        for dim in var.dims:
-            if dim in chunks:
-                spec = chunks[dim]
-                if isinstance(spec, int):
-                    spec = (spec,)
-                if isinstance(spec, (tuple, list)) and chunk_spec[dim]:
-                    if any(s % chunk_spec[dim] for s in spec):
-                        warnings.warn(
-                            "Specified Dask chunks %r would "
-                            "separate Zarr chunk shape %r for "
-                            "dimension %r. This significantly "
-                            "degrades performance. Consider "
-                            "rechunking after loading instead."
-                            % (chunks[dim], chunk_spec[dim], dim),
-                            stacklevel=2,
-                        )
-                chunk_spec[dim] = chunks[dim]
-        return chunk_spec
-
-    def maybe_chunk(name, var, chunks):
-        from dask.base import tokenize
-
-        chunk_spec = get_chunk(name, var, chunks)
-
-        if (var.ndim > 0) and (chunk_spec is not None):
-            # does this cause any data to be read?
-            token2 = tokenize(name, var._data)
-            name2 = "zarr-%s" % token2
-            var = var.chunk(chunk_spec, name=name2, lock=None)
-            if overwrite_encoded_chunks and var.chunks is not None:
-                var.encoding["chunks"] = tuple(x[0] for x in var.chunks)
-            return var
-        else:
-            return var
 
-    variables = {k: maybe_chunk(k, v, chunks) for k, v in ds.variables.items()}
-    return ds._replace_vars_and_dims(variables)
+    return ds
diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py
index 33ac26cfd39..f9cc802f2c8 100644
--- a/xarray/tests/test_backends.py
+++ b/xarray/tests/test_backends.py
@@ -1555,7 +1555,7 @@ def save(self, dataset, store_target, **kwargs):
 
     @contextlib.contextmanager
     def open(self, store_target, **kwargs):
-        with xr.open_zarr(store_target, **kwargs) as ds:
+        with xr.open_dataset(store_target, engine="zarr", **kwargs) as ds:
             yield ds
 
     @contextlib.contextmanager
@@ -1565,7 +1565,7 @@ def roundtrip(
         if save_kwargs is None:
             save_kwargs = {}
         if open_kwargs is None:
-            open_kwargs = {}
+            open_kwargs = {"chunks": "auto"}
         with self.create_zarr_target() as store_target:
             self.save(data, store_target, **save_kwargs)
             with self.open(store_target, **open_kwargs) as ds:
@@ -1577,7 +1577,7 @@ def test_roundtrip_consolidated(self):
         with self.roundtrip(
             expected,
             save_kwargs={"consolidated": True},
-            open_kwargs={"consolidated": True},
+            open_kwargs={"backend_kwargs": {"consolidated": True}},
         ) as actual:
             self.check_dtypes_roundtripped(expected, actual)
             assert_identical(expected, actual)
@@ -1587,7 +1587,7 @@ def test_with_chunkstore(self):
         with self.create_zarr_target() as store_target, self.create_zarr_target() as chunk_store:
             save_kwargs = {"chunk_store": chunk_store}
             self.save(expected, store_target, **save_kwargs)
-            open_kwargs = {"chunk_store": chunk_store}
+            open_kwargs = {"backend_kwargs": {"chunk_store": chunk_store}}
             with self.open(store_target, **open_kwargs) as ds:
                 assert_equal(ds, expected)
 
@@ -1614,16 +1614,14 @@ def test_auto_chunk(self):
     def test_manual_chunk(self):
         original = create_test_data().chunk({"dim1": 3, "dim2": 4, "dim3": 3})
 
-        # All of these should return non-chunked arrays
-        NO_CHUNKS = (None, 0, {})
-        for no_chunk in NO_CHUNKS:
-            open_kwargs = {"chunks": no_chunk}
-            with self.roundtrip(original, open_kwargs=open_kwargs) as actual:
-                for k, v in actual.variables.items():
-                    # only index variables should be in memory
-                    assert v._in_memory == (k in actual.dims)
-                    # there should be no chunks
-                    assert v.chunks is None
+        # Using chunks = None should return non-chunked arrays
+        open_kwargs = {"chunks": None}
+        with self.roundtrip(original, open_kwargs=open_kwargs) as actual:
+            for k, v in actual.variables.items():
+                # only index variables should be in memory
+                assert v._in_memory == (k in actual.dims)
+                # there should be no chunks
+                assert v.chunks is None
 
         # uniform arrays
         for i in range(2, 6):
@@ -1639,7 +1637,10 @@ def test_manual_chunk(self):
         chunks = {"dim1": 2, "dim2": 3, "dim3": 5}
         rechunked = original.chunk(chunks=chunks)
 
-        open_kwargs = {"chunks": chunks, "overwrite_encoded_chunks": True}
+        open_kwargs = {
+            "chunks": chunks,
+            "backend_kwargs": {"overwrite_encoded_chunks": True},
+        }
         with self.roundtrip(original, open_kwargs=open_kwargs) as actual:
             for k, v in actual.variables.items():
                 assert v.chunks == rechunked[k].chunks
@@ -1678,7 +1679,7 @@ def test_warning_on_bad_chunks(self):
     @requires_dask
     def test_deprecate_auto_chunk(self):
         original = create_test_data().chunk()
-        with pytest.warns(FutureWarning):
+        with pytest.raises(TypeError):
             with self.roundtrip(original, open_kwargs={"auto_chunk": True}) as actual:
                 for k, v in actual.variables.items():
                     # only index variables should be in memory
@@ -1686,7 +1687,7 @@ def test_deprecate_auto_chunk(self):
                     # chunk size should be the same as original
                     assert v.chunks == original[k].chunks
 
-        with pytest.warns(FutureWarning):
+        with pytest.raises(TypeError):
             with self.roundtrip(original, open_kwargs={"auto_chunk": False}) as actual:
                 for k, v in actual.variables.items():
                     # only index variables should be in memory
@@ -1847,7 +1848,9 @@ def test_write_persistence_modes(self, group):
             ds.to_zarr(store_target, mode="w", group=group)
             ds_to_append.to_zarr(store_target, append_dim="time", group=group)
             original = xr.concat([ds, ds_to_append], dim="time")
-            actual = xr.open_zarr(store_target, group=group)
+            actual = xr.open_dataset(
+                store_target, group=group, chunks="auto", engine="zarr"
+            )
             assert_identical(original, actual)
 
     def test_compressor_encoding(self):
@@ -1938,11 +1941,11 @@ def test_check_encoding_is_consistent_after_append(self):
             encoding = {"da": {"compressor": compressor}}
             ds.to_zarr(store_target, mode="w", encoding=encoding)
             ds_to_append.to_zarr(store_target, append_dim="time")
-            actual_ds = xr.open_zarr(store_target)
+            actual_ds = xr.open_dataset(store_target, chunks="auto", engine="zarr")
             actual_encoding = actual_ds["da"].encoding["compressor"]
             assert actual_encoding.get_config() == compressor.get_config()
             assert_identical(
-                xr.open_zarr(store_target).compute(),
+                xr.open_dataset(store_target, chunks="auto", engine="zarr").compute(),
                 xr.concat([ds, ds_to_append], dim="time"),
             )
 
@@ -1957,7 +1960,9 @@ def test_append_with_new_variable(self):
             ds_with_new_var.to_zarr(store_target, mode="a")
             combined = xr.concat([ds, ds_to_append], dim="time")
             combined["new_var"] = ds_with_new_var["new_var"]
-            assert_identical(combined, xr.open_zarr(store_target))
+            assert_identical(
+                combined, xr.open_dataset(store_target, chunks="auto", engine="zarr")
+            )
 
     @requires_dask
     def test_to_zarr_compute_false_roundtrip(self):
@@ -2567,7 +2572,7 @@ def test_write_inconsistent_chunks(self):
         assert actual["y"].encoding["chunksizes"] == (100, 50)
 
 
-@pytest.fixture(params=["scipy", "netcdf4", "h5netcdf", "pynio"])
+@pytest.fixture(params=["scipy", "netcdf4", "h5netcdf", "pynio", "zarr"])
 def readengine(request):
     return request.param
 
@@ -2627,7 +2632,10 @@ def test_open_mfdataset_manyfiles(
     # split into multiple sets of temp files
     for ii in original.x.values:
         subds = original.isel(x=slice(ii, ii + 1))
-        subds.to_netcdf(tmpfiles[ii], engine=writeengine)
+        if writeengine != "zarr":
+            subds.to_netcdf(tmpfiles[ii], engine=writeengine)
+        else:  # if writeengine == "zarr":
+            subds.to_zarr(store=tmpfiles[ii])
 
     # check that calculation on opened datasets works properly
     with open_mfdataset(
@@ -2636,7 +2644,7 @@ def test_open_mfdataset_manyfiles(
         concat_dim="x",
         engine=readengine,
         parallel=parallel,
-        chunks=chunks,
+        chunks=chunks if (not chunks and readengine != "zarr") else "auto",
     ) as actual:
 
         # check that using open_mfdataset returns dask arrays for variables
diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py
index 8011171d223..7886e9fd0d4 100644
--- a/xarray/tests/test_distributed.py
+++ b/xarray/tests/test_distributed.py
@@ -135,8 +135,8 @@ def test_dask_distributed_read_netcdf_integration_test(
 def test_dask_distributed_zarr_integration_test(loop, consolidated, compute):
     if consolidated:
         pytest.importorskip("zarr", minversion="2.2.1.dev2")
-        write_kwargs = dict(consolidated=True)
-        read_kwargs = dict(consolidated=True)
+        write_kwargs = {"consolidated": True}
+        read_kwargs = {"backend_kwargs": {"consolidated": True}}
     else:
         write_kwargs = read_kwargs = {}
     chunks = {"dim1": 4, "dim2": 3, "dim3": 5}
@@ -151,7 +151,9 @@ def test_dask_distributed_zarr_integration_test(loop, consolidated, compute):
     )
     if not compute:
         maybe_futures.compute()
-    with xr.open_zarr(filename, **read_kwargs) as restored:
+    with xr.open_dataset(
+        filename, chunks="auto", engine="zarr", **read_kwargs
+    ) as restored:
        assert isinstance(restored.var1.data, da.Array)
        computed = restored.compute()
        assert_allclose(original, computed)
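
Example usage (not part of the patch): a minimal sketch of the code path added above, assuming zarr and dask are installed. The store path, dataset contents, and chunk layout here are illustrative only and are not taken from the PR.

    import xarray as xr

    # Write a small dataset to a local zarr store, then reopen it through the
    # generic open_dataset() entry point using the new engine="zarr" option.
    ds = xr.Dataset({"foo": ("x", list(range(10)))})
    ds.to_zarr("example.zarr", mode="w")

    # chunks="auto" asks the zarr backend to build dask chunks from each
    # variable's encoded zarr chunks (see get_chunk/maybe_chunk above).
    reopened = xr.open_dataset("example.zarr", engine="zarr", chunks="auto")

    # Store-specific options such as consolidated metadata are passed through
    # backend_kwargs rather than as top-level keyword arguments, e.g.:
    # xr.open_dataset(
    #     "example.zarr", engine="zarr", backend_kwargs={"consolidated": True}
    # )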