1111
1212from zarr .attrs import Attributes
1313from zarr .codecs import AsType , get_codec
14- from zarr .errors import ArrayNotFoundError , ReadOnlyError
15- from zarr .indexing import (BasicIndexer , CoordinateIndexer , MaskIndexer ,
16- OIndex , OrthogonalIndexer , VIndex , check_fields ,
17- check_no_multi_fields , ensure_tuple ,
18- err_too_many_indices , is_contiguous_selection ,
19- is_scalar , pop_fields )
14+ from zarr .errors import ArrayNotFoundError , ReadOnlyError , ArrayIndexError
15+ from zarr .indexing import (
16+ BasicIndexer ,
17+ CoordinateIndexer ,
18+ MaskIndexer ,
19+ OIndex ,
20+ OrthogonalIndexer ,
21+ VIndex ,
22+ PartialChunkIterator ,
23+ check_fields ,
24+ check_no_multi_fields ,
25+ ensure_tuple ,
26+ err_too_many_indices ,
27+ is_contiguous_selection ,
28+ is_scalar ,
29+ pop_fields ,
30+ )
2031from zarr .meta import decode_array_metadata , encode_array_metadata
2132from zarr .storage import array_meta_key , attrs_key , getsize , listdir
22- from zarr .util import (InfoReporter , check_array_shape , human_readable_size ,
23- is_total_slice , nolock , normalize_chunks ,
24- normalize_resize_args , normalize_shape ,
25- normalize_storage_path )
33+ from zarr .util import (
34+ InfoReporter ,
35+ check_array_shape ,
36+ human_readable_size ,
37+ is_total_slice ,
38+ nolock ,
39+ normalize_chunks ,
40+ normalize_resize_args ,
41+ normalize_shape ,
42+ normalize_storage_path ,
43+ PartialReadBuffer ,
44+ )
2645
2746
2847# noinspection PyUnresolvedReferences
29- class Array ( object ) :
48+ class Array :
3049 """Instantiate an array from an initialized store.
3150
3251 Parameters
@@ -51,6 +70,12 @@ class Array(object):
5170 If True (default), user attributes will be cached for attribute read
5271 operations. If False, user attributes are reloaded from the store prior
5372 to all attribute read operations.
73+ partial_decompress : bool, optional
74+         If True and while the chunk_store is an FSStore and the compression used
75+ is Blosc, when getting data from the array chunks will be partially
76+ read and decompressed when possible.
77+
78+ .. versionadded:: 2.7
5479
5580 Attributes
5681 ----------
@@ -102,8 +127,17 @@ class Array(object):
102127
103128 """
104129
105- def __init__ (self , store , path = None , read_only = False , chunk_store = None ,
106- synchronizer = None , cache_metadata = True , cache_attrs = True ):
130+ def __init__ (
131+ self ,
132+ store ,
133+ path = None ,
134+ read_only = False ,
135+ chunk_store = None ,
136+ synchronizer = None ,
137+ cache_metadata = True ,
138+ cache_attrs = True ,
139+ partial_decompress = False ,
140+ ):
107141 # N.B., expect at this point store is fully initialized with all
108142 # configuration metadata fully specified and normalized
109143
@@ -118,6 +152,7 @@ def __init__(self, store, path=None, read_only=False, chunk_store=None,
118152 self ._synchronizer = synchronizer
119153 self ._cache_metadata = cache_metadata
120154 self ._is_view = False
155+ self ._partial_decompress = partial_decompress
121156
122157 # initialize metadata
123158 self ._load_metadata ()
@@ -1112,7 +1147,7 @@ def __setitem__(self, selection, value):
11121147
11131148 See Also
11141149 --------
1115- get_basic_selection , set_basic_selection, get_mask_selection, set_mask_selection,
1150+         get_basic_selection, set_basic_selection, get_mask_selection, set_mask_selection,
11161151 get_coordinate_selection, set_coordinate_selection, get_orthogonal_selection,
11171152 set_orthogonal_selection, vindex, oindex, __getitem__
11181153
@@ -1580,8 +1615,17 @@ def _set_selection(self, indexer, value, fields=None):
15801615 self ._chunk_setitems (lchunk_coords , lchunk_selection , chunk_values ,
15811616 fields = fields )
15821617
1583- def _process_chunk (self , out , cdata , chunk_selection , drop_axes ,
1584- out_is_ndarray , fields , out_selection ):
1618+ def _process_chunk (
1619+ self ,
1620+ out ,
1621+ cdata ,
1622+ chunk_selection ,
1623+ drop_axes ,
1624+ out_is_ndarray ,
1625+ fields ,
1626+ out_selection ,
1627+ partial_read_decode = False ,
1628+ ):
15851629 """Take binary data from storage and fill output array"""
15861630 if (out_is_ndarray and
15871631 not fields and
@@ -1604,8 +1648,9 @@ def _process_chunk(self, out, cdata, chunk_selection, drop_axes,
16041648 # optimization: we want the whole chunk, and the destination is
16051649 # contiguous, so we can decompress directly from the chunk
16061650 # into the destination array
1607-
16081651 if self ._compressor :
1652+ if isinstance (cdata , PartialReadBuffer ):
1653+ cdata = cdata .read_full ()
16091654 self ._compressor .decode (cdata , dest )
16101655 else :
16111656 chunk = ensure_ndarray (cdata ).view (self ._dtype )
@@ -1614,6 +1659,33 @@ def _process_chunk(self, out, cdata, chunk_selection, drop_axes,
16141659 return
16151660
16161661 # decode chunk
1662+ try :
1663+ if partial_read_decode :
1664+ cdata .prepare_chunk ()
1665+ # size of chunk
1666+ tmp = np .empty (self ._chunks , dtype = self .dtype )
1667+ index_selection = PartialChunkIterator (chunk_selection , self .chunks )
1668+ for start , nitems , partial_out_selection in index_selection :
1669+ expected_shape = [
1670+ len (
1671+ range (* partial_out_selection [i ].indices (self .chunks [0 ] + 1 ))
1672+ )
1673+ if i < len (partial_out_selection )
1674+ else dim
1675+ for i , dim in enumerate (self .chunks )
1676+ ]
1677+ cdata .read_part (start , nitems )
1678+ chunk_partial = self ._decode_chunk (
1679+ cdata .buff ,
1680+ start = start ,
1681+ nitems = nitems ,
1682+ expected_shape = expected_shape ,
1683+ )
1684+ tmp [partial_out_selection ] = chunk_partial
1685+ out [out_selection ] = tmp [chunk_selection ]
1686+ return
1687+ except ArrayIndexError :
1688+ cdata = cdata .read_full ()
16171689 chunk = self ._decode_chunk (cdata )
16181690
16191691 # select data from chunk
@@ -1688,11 +1760,36 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
16881760 out_is_ndarray = False
16891761
16901762 ckeys = [self ._chunk_key (ch ) for ch in lchunk_coords ]
1691- cdatas = self .chunk_store .getitems (ckeys , on_error = "omit" )
1763+ if (
1764+ self ._partial_decompress
1765+ and self ._compressor
1766+ and self ._compressor .codec_id == "blosc"
1767+ and hasattr (self ._compressor , "decode_partial" )
1768+ and not fields
1769+ and self .dtype != object
1770+ and hasattr (self .chunk_store , "getitems" )
1771+ ):
1772+ partial_read_decode = True
1773+ cdatas = {
1774+ ckey : PartialReadBuffer (ckey , self .chunk_store )
1775+ for ckey in ckeys
1776+ if ckey in self .chunk_store
1777+ }
1778+ else :
1779+ partial_read_decode = False
1780+ cdatas = self .chunk_store .getitems (ckeys , on_error = "omit" )
16921781 for ckey , chunk_select , out_select in zip (ckeys , lchunk_selection , lout_selection ):
16931782 if ckey in cdatas :
1694- self ._process_chunk (out , cdatas [ckey ], chunk_select , drop_axes ,
1695- out_is_ndarray , fields , out_select )
1783+ self ._process_chunk (
1784+ out ,
1785+ cdatas [ckey ],
1786+ chunk_select ,
1787+ drop_axes ,
1788+ out_is_ndarray ,
1789+ fields ,
1790+ out_select ,
1791+ partial_read_decode = partial_read_decode ,
1792+ )
16961793 else :
16971794 # check exception type
16981795 if self ._fill_value is not None :
@@ -1706,7 +1803,8 @@ def _chunk_setitems(self, lchunk_coords, lchunk_selection, values, fields=None):
17061803 ckeys = [self ._chunk_key (co ) for co in lchunk_coords ]
17071804 cdatas = [self ._process_for_setitem (key , sel , val , fields = fields )
17081805 for key , sel , val in zip (ckeys , lchunk_selection , values )]
1709- self .chunk_store .setitems ({k : v for k , v in zip (ckeys , cdatas )})
1806+ values = {k : v for k , v in zip (ckeys , cdatas )}
1807+ self .chunk_store .setitems (values )
17101808
17111809 def _chunk_setitem (self , chunk_coords , chunk_selection , value , fields = None ):
17121810 """Replace part or whole of a chunk.
@@ -1800,11 +1898,17 @@ def _process_for_setitem(self, ckey, chunk_selection, value, fields=None):
18001898 def _chunk_key (self , chunk_coords ):
18011899 return self ._key_prefix + '.' .join (map (str , chunk_coords ))
18021900
1803- def _decode_chunk (self , cdata ):
1804-
1901+ def _decode_chunk (self , cdata , start = None , nitems = None , expected_shape = None ):
18051902 # decompress
18061903 if self ._compressor :
1807- chunk = self ._compressor .decode (cdata )
1904+ # only decode requested items
1905+ if (
1906+ all ([x is not None for x in [start , nitems ]])
1907+ and self ._compressor .codec_id == "blosc"
1908+ ) and hasattr (self ._compressor , "decode_partial" ):
1909+ chunk = self ._compressor .decode_partial (cdata , start , nitems )
1910+ else :
1911+ chunk = self ._compressor .decode (cdata )
18081912 else :
18091913 chunk = cdata
18101914
@@ -1829,7 +1933,7 @@ def _decode_chunk(self, cdata):
18291933
18301934 # ensure correct chunk shape
18311935 chunk = chunk .reshape (- 1 , order = 'A' )
1832- chunk = chunk .reshape (self ._chunks , order = self ._order )
1936+ chunk = chunk .reshape (expected_shape or self ._chunks , order = self ._order )
18331937
18341938 return chunk
18351939
0 commit comments