
Commit cc10cc5

Remove caching logic from xarray.Variable (#1128)
* Disable all caching on xarray.Variable

  This is a follow-up to generalize the changes from #1024:

  - Caching and copy-on-write behavior has been moved to separate array
    classes that are explicitly used in `open_dataset` to wrap arrays loaded
    from disk (if `cache=True`).
  - Dask specific logic has been removed from the caching/loading logic on
    `xarray.Variable`.
  - Pickle no longer caches automatically under any circumstances.

  Still needs tests for the `cache` argument to `open_dataset`, but everything
  else seems to be working.

* Fixes for test failures
* Fix IndexVariable.load
* Made DataStores pickle-able
* Add dask.distributed test
* Fix failing Python 2 tests
* Fix failing test on Windows
* Alternative fix for windows issue
* yet another attempt to fix windows tests
* a different windows fix
* yet another attempt to fix test on windows
* another attempt at fixing windows
* Skip remaining failing test on windows only
* Allow file cleanup failures on windows
1 parent b21948d commit cc10cc5

22 files changed: +500 -182 lines

ci/requirements-py27-cdat+pynio.yml

Lines changed: 1 addition & 0 deletions
@@ -5,6 +5,7 @@ dependencies:
   - python=2.7
   - cdat-lite
   - dask
+  - distributed
   - pytest
   - numpy
   - pandas>=0.15.0

ci/requirements-py27-netcdf4-dev.yml

Lines changed: 1 addition & 0 deletions
@@ -3,6 +3,7 @@ dependencies:
   - python=2.7
   - cython
   - dask
+  - distributed
   - h5py
   - pytest
   - numpy

ci/requirements-py27-pydap.yml

Lines changed: 1 addition & 0 deletions
@@ -2,6 +2,7 @@ name: test_env
 dependencies:
   - python=2.7
   - dask
+  - distributed
   - h5py
   - netcdf4
   - pytest

ci/requirements-py35.yml

Lines changed: 1 addition & 0 deletions
@@ -3,6 +3,7 @@ dependencies:
   - python=3.5
   - cython
   - dask
+  - distributed
   - h5py
   - matplotlib
   - netcdf4

doc/whats-new.rst

Lines changed: 14 additions & 6 deletions
@@ -25,12 +25,20 @@ Breaking changes
   merges will now succeed in cases that previously raised
   ``xarray.MergeError``. Set ``compat='broadcast_equals'`` to restore the
   previous default.
-- Pickling an xarray object based on the dask backend, or reading its
-  :py:meth:`values` property, won't automatically convert the array from dask
-  to numpy in the original object anymore.
-  If a dask object is used as a coord of a :py:class:`~xarray.DataArray` or
-  :py:class:`~xarray.Dataset`, its values are eagerly computed and cached,
-  but only if it's used to index a dim (e.g. it's used for alignment).
+- Reading :py:attr:`~DataArray.values` no longer always caches values in a NumPy
+  array :issue:`1128`. Caching of ``.values`` on variables read from netCDF
+  files on disk is still the default when :py:func:`open_dataset` is called with
+  ``cache=True``.
+  By `Guido Imperiale <https://github.com/crusaderky>`_ and
+  `Stephan Hoyer <https://github.com/shoyer>`_.
+- Pickling a ``Dataset`` or ``DataArray`` linked to a file on disk no longer
+  caches its values into memory before pickling :issue:`1128`. Instead, pickle
+  stores file paths and restores objects by reopening file references. This
+  enables preliminary, experimental use of xarray for opening files with
+  `dask.distributed <https://distributed.readthedocs.io>`_.
+  By `Stephan Hoyer <https://github.com/shoyer>`_.
+- Coordinates used to index a dimension are now loaded eagerly into
+  :py:class:`pandas.Index` objects, instead of loading the values lazily.
   By `Guido Imperiale <https://github.com/crusaderky>`_.
 
 Deprecations
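
As a concrete illustration of the dask.distributed workflow the release note above refers to, here is a hedged sketch; the file name, chunk sizes, and cluster setup are hypothetical, and workers must be able to read the same file:

import xarray as xr
from distributed import Client

client = Client()  # local scheduler plus workers, for demonstration only

# Opening with chunks defers reads to dask; because pickling a file-backed
# Dataset now reopens the file instead of loading it, the resulting tasks
# can be shipped to worker processes.
ds = xr.open_dataset('example.nc', chunks={'time': 10})

# With the client active as dask's default scheduler, this evaluates the
# lazily-built graph on the workers.
result = ds.mean().load()
print(result)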

xarray/backends/api.py

Lines changed: 59 additions & 7 deletions
@@ -13,13 +13,15 @@
 
 from .. import backends, conventions
 from .common import ArrayWriter
+from ..core import indexing
 from ..core.combine import auto_combine
 from ..core.utils import close_on_error, is_remote_uri
 from ..core.pycompat import basestring
 
 DATAARRAY_NAME = '__xarray_dataarray_name__'
 DATAARRAY_VARIABLE = '__xarray_dataarray_variable__'
 
+
 def _get_default_engine(path, allow_remote=False):
     if allow_remote and is_remote_uri(path):  # pragma: no cover
         try:
@@ -46,6 +48,13 @@ def _get_default_engine(path, allow_remote=False):
     return engine
 
 
+def _normalize_path(path):
+    if is_remote_uri(path):
+        return path
+    else:
+        return os.path.abspath(os.path.expanduser(path))
+
+
 _global_lock = threading.Lock()
 
 
@@ -117,10 +126,20 @@ def check_attr(name, value):
             check_attr(k, v)
 
 
+def _protect_dataset_variables_inplace(dataset, cache):
+    for name, variable in dataset.variables.items():
+        if name not in variable.dims:
+            # no need to protect IndexVariable objects
+            data = indexing.CopyOnWriteArray(variable._data)
+            if cache:
+                data = indexing.MemoryCachedArray(data)
+            variable.data = data
+
+
 def open_dataset(filename_or_obj, group=None, decode_cf=True,
                  mask_and_scale=True, decode_times=True,
                  concat_characters=True, decode_coords=True, engine=None,
-                 chunks=None, lock=None, drop_variables=None):
+                 chunks=None, lock=None, cache=None, drop_variables=None):
     """Load and decode a dataset from a file or file-like object.
 
     Parameters
@@ -162,14 +181,22 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
         'netcdf4'.
     chunks : int or dict, optional
         If chunks is provided, it used to load the new dataset into dask
-        arrays. This is an experimental feature; see the documentation for more
-        details.
+        arrays. ``chunks={}`` loads the dataset with dask using a single
+        chunk for all arrays. This is an experimental feature; see the
+        documentation for more details.
     lock : False, True or threading.Lock, optional
         If chunks is provided, this argument is passed on to
         :py:func:`dask.array.from_array`. By default, a per-variable lock is
         used when reading data from netCDF files with the netcdf4 and h5netcdf
         engines to avoid issues with concurrent access when using dask's
         multithreaded backend.
+    cache : bool, optional
+        If True, cache data loaded from the underlying datastore in memory as
+        NumPy arrays when accessed to avoid reading from the underlying data-
+        store multiple times. Defaults to True unless you specify the `chunks`
+        argument to use dask, in which case it defaults to False. Does not
+        change the behavior of coordinates corresponding to dimensions, which
+        always load their data from disk into a ``pandas.Index``.
     drop_variables: string or iterable, optional
         A variable or list of variables to exclude from being parsed from the
         dataset. This may be useful to drop variables with problems or
@@ -190,12 +217,17 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
         concat_characters = False
         decode_coords = False
 
+    if cache is None:
+        cache = chunks is None
+
     def maybe_decode_store(store, lock=False):
         ds = conventions.decode_cf(
             store, mask_and_scale=mask_and_scale, decode_times=decode_times,
             concat_characters=concat_characters, decode_coords=decode_coords,
             drop_variables=drop_variables)
 
+        _protect_dataset_variables_inplace(ds, cache)
+
         if chunks is not None:
             try:
                 from dask.base import tokenize
@@ -226,6 +258,17 @@ def maybe_decode_store(store, lock=False):
     if isinstance(filename_or_obj, backends.AbstractDataStore):
         store = filename_or_obj
     elif isinstance(filename_or_obj, basestring):
+
+        if (isinstance(filename_or_obj, bytes) and
+                filename_or_obj.startswith(b'\x89HDF')):
+            raise ValueError('cannot read netCDF4/HDF5 file images')
+        elif (isinstance(filename_or_obj, bytes) and
+                filename_or_obj.startswith(b'CDF')):
+            # netCDF3 file images are handled by scipy
+            pass
+        elif isinstance(filename_or_obj, basestring):
+            filename_or_obj = _normalize_path(filename_or_obj)
+
         if filename_or_obj.endswith('.gz'):
             if engine is not None and engine != 'scipy':
                 raise ValueError('can only read gzipped netCDF files with '
@@ -274,7 +317,7 @@ def maybe_decode_store(store, lock=False):
 def open_dataarray(filename_or_obj, group=None, decode_cf=True,
                    mask_and_scale=True, decode_times=True,
                    concat_characters=True, decode_coords=True, engine=None,
-                   chunks=None, lock=None, drop_variables=None):
+                   chunks=None, lock=None, cache=None, drop_variables=None):
     """
     Opens an DataArray from a netCDF file containing a single data variable.
 
@@ -328,6 +371,13 @@ def open_dataarray(filename_or_obj, group=None, decode_cf=True,
         used when reading data from netCDF files with the netcdf4 and h5netcdf
         engines to avoid issues with concurrent access when using dask's
         multithreaded backend.
+    cache : bool, optional
+        If True, cache data loaded from the underlying datastore in memory as
+        NumPy arrays when accessed to avoid reading from the underlying data-
+        store multiple times. Defaults to True unless you specify the `chunks`
+        argument to use dask, in which case it defaults to False. Does not
+        change the behavior of coordinates corresponding to dimensions, which
+        always load their data from disk into a ``pandas.Index``.
     drop_variables: string or iterable, optional
         A variable or list of variables to exclude from being parsed from the
         dataset. This may be useful to drop variables with problems or
@@ -349,7 +399,7 @@ def open_dataarray(filename_or_obj, group=None, decode_cf=True,
     dataset = open_dataset(filename_or_obj, group, decode_cf,
                            mask_and_scale, decode_times,
                            concat_characters, decode_coords, engine,
-                           chunks, lock, drop_variables)
+                           chunks, lock, cache, drop_variables)
 
     if len(dataset.data_vars) != 1:
         raise ValueError('Given file dataset contains more than one data '
@@ -494,8 +544,10 @@ def to_netcdf(dataset, path=None, mode='w', format=None, group=None,
         raise ValueError('invalid engine for creating bytes with '
                          'to_netcdf: %r. Only the default engine '
                          "or engine='scipy' is supported" % engine)
-    elif engine is None:
-        engine = _get_default_engine(path)
+    else:
+        if engine is None:
+            engine = _get_default_engine(path)
+        path = _normalize_path(path)
 
     # validate Dataset keys, DataArray names, and attr keys/values
     _validate_dataset_names(dataset)
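
To clarify what ``_protect_dataset_variables_inplace`` layers onto each on-disk variable, here is a self-contained conceptual sketch of the two wrappers; the class names below are illustrative stand-ins, not xarray's actual ``indexing.CopyOnWriteArray`` and ``indexing.MemoryCachedArray`` implementations:

import numpy as np

class CopyOnWriteArraySketch(object):
    """Protects the wrapped source from in-place modification."""
    def __init__(self, array):
        self.array = array
        self._copied = False

    def __getitem__(self, key):
        return np.asarray(self.array)[key]

    def __setitem__(self, key, value):
        if not self._copied:
            self.array = np.array(self.array)  # detach from the source first
            self._copied = True
        self.array[key] = value

class MemoryCachedArraySketch(object):
    """Caches the wrapped array as a NumPy array on first access."""
    def __init__(self, array):
        self.array = array

    def __getitem__(self, key):
        if not isinstance(self.array, np.ndarray):
            self.array = np.asarray(self.array[...])  # load and cache once
        return self.array[key]

source = np.arange(10)  # stands in for a lazily indexed on-disk array
data = MemoryCachedArraySketch(CopyOnWriteArraySketch(source))
print(data[3:6])  # first access loads and caches; later reads hit memory

In ``open_dataset`` the copy-on-write layer is always applied to non-index variables, and the memory cache is added on top only when ``cache=True``.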

xarray/backends/common.py

Lines changed: 19 additions & 0 deletions
@@ -235,3 +235,22 @@ def store(self, variables, attributes, check_encoding_set=frozenset()):
         cf_variables, cf_attrs = cf_encoder(variables, attributes)
         AbstractWritableDataStore.store(self, cf_variables, cf_attrs,
                                         check_encoding_set)
+
+
+class DataStorePickleMixin(object):
+    """Subclasses must define `ds`, `_opener` and `_mode` attributes.
+
+    Do not subclass this class: it is not part of xarray's external API.
+    """
+
+    def __getstate__(self):
+        state = self.__dict__.copy()
+        del state['ds']
+        if self._mode == 'w':
+            # file has already been created, don't override when restoring
+            state['_mode'] = 'a'
+        return state
+
+    def __setstate__(self, state):
+        self.__dict__.update(state)
+        self.ds = self._opener(mode=self._mode)
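
A minimal standalone sketch of the contract this mixin expects from its subclasses: drop the open handle from the pickled state and rebuild it from a stored opener. The names below are hypothetical, not xarray's real store classes:

import functools
import pickle
import tempfile

class FileBackedStoreSketch(object):
    """Illustrative file-backed object following the pickle-mixin pattern."""
    def __init__(self, filename, mode='r'):
        self._opener = functools.partial(open, filename)
        self._mode = mode
        self.ds = self._opener(mode=self._mode)  # open handle, not picklable

    def __getstate__(self):
        state = self.__dict__.copy()
        del state['ds']               # drop the unpicklable handle
        if self._mode == 'w':
            state['_mode'] = 'a'      # don't truncate the file when restoring
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self.ds = self._opener(mode=self._mode)  # reopen on unpickle

with tempfile.NamedTemporaryFile('w', suffix='.txt', delete=False) as f:
    f.write('hello')
    path = f.name

store = FileBackedStoreSketch(path)
restored = pickle.loads(pickle.dumps(store))
print(restored.ds.read())  # 'hello', read from the freshly reopened handle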

xarray/backends/h5netcdf_.py

Lines changed: 17 additions & 9 deletions
@@ -8,7 +8,7 @@
 from ..core.utils import FrozenOrderedDict, close_on_error, Frozen
 from ..core.pycompat import iteritems, bytes_type, unicode_type, OrderedDict
 
-from .common import WritableCFDataStore
+from .common import WritableCFDataStore, DataStorePickleMixin
 from .netCDF4_ import (_nc4_group, _nc4_values_and_dtype, _extract_nc4_encoding,
                        BaseNetCDF4Array)
 
@@ -37,24 +37,32 @@ def _read_attributes(h5netcdf_var):
                            lsd_okay=False, backend='h5netcdf')
 
 
-class H5NetCDFStore(WritableCFDataStore):
+def _open_h5netcdf_group(filename, mode, group):
+    import h5netcdf.legacyapi
+    ds = h5netcdf.legacyapi.Dataset(filename, mode=mode)
+    with close_on_error(ds):
+        return _nc4_group(ds, group, mode)
+
+
+class H5NetCDFStore(WritableCFDataStore, DataStorePickleMixin):
     """Store for reading and writing data via h5netcdf
     """
     def __init__(self, filename, mode='r', format=None, group=None,
                  writer=None):
-        import h5netcdf.legacyapi
         if format not in [None, 'NETCDF4']:
             raise ValueError('invalid format for h5netcdf backend')
-        ds = h5netcdf.legacyapi.Dataset(filename, mode=mode)
-        with close_on_error(ds):
-            self.ds = _nc4_group(ds, group, mode)
+        opener = functools.partial(_open_h5netcdf_group, filename, mode=mode,
+                                   group=group)
+        self.ds = opener()
         self.format = format
+        self._opener = opener
         self._filename = filename
+        self._mode = mode
         super(H5NetCDFStore, self).__init__(writer)
 
-    def open_store_variable(self, var):
+    def open_store_variable(self, name, var):
         dimensions = var.dimensions
-        data = indexing.LazilyIndexedArray(BaseNetCDF4Array(var))
+        data = indexing.LazilyIndexedArray(BaseNetCDF4Array(name, self))
         attrs = _read_attributes(var)
 
         # netCDF4 specific encoding
@@ -69,7 +77,7 @@ def open_store_variable(self, var):
         return Variable(dimensions, data, attrs, encoding)
 
     def get_variables(self):
-        return FrozenOrderedDict((k, self.open_store_variable(v))
+        return FrozenOrderedDict((k, self.open_store_variable(k, v))
                                  for k, v in iteritems(self.ds.variables))
 
     def get_attrs(self):
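
The reason ``open_store_variable`` now receives the variable ``name`` and the store itself: lazy arrays can resolve the underlying variable through ``store.ds`` on every access instead of holding a direct handle, so they stay valid after ``DataStorePickleMixin`` reopens ``ds`` on unpickling. A conceptual sketch with made-up names (this is not xarray's ``BaseNetCDF4Array``):

class LazyStoreArraySketch(object):
    """Resolves the backing variable through the store on each access."""
    def __init__(self, name, store):
        self.name = name
        self.store = store

    def __getitem__(self, key):
        # re-resolved every time, so a reopened store.ds is picked up
        return self.store.ds[self.name][key]

class DictStoreSketch(object):
    """Stands in for a data store whose ds may be replaced wholesale."""
    def __init__(self):
        self.ds = {'temperature': [280.0, 281.5, 279.9]}

store = DictStoreSketch()
arr = LazyStoreArraySketch('temperature', store)
print(arr[0])                          # 280.0
store.ds = {'temperature': [300.0]}    # simulate the store being reopened
print(arr[0])                          # 300.0, still resolves via the live store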
