From 0a5e9d1b9e0c7b57055c836d975a8b7732f53b79 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Mon, 27 Feb 2023 15:44:30 +0100 Subject: [PATCH 1/2] merge consecutive reads in shards --- zarr/_storage/v3_storage_transformers.py | 45 +++++++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/zarr/_storage/v3_storage_transformers.py b/zarr/_storage/v3_storage_transformers.py index 3675d42c38..b7acb1282b 100644 --- a/zarr/_storage/v3_storage_transformers.py +++ b/zarr/_storage/v3_storage_transformers.py @@ -340,13 +340,56 @@ def get_partial_values(self, key_ranges): ) else: # pragma: no cover transformed_key_ranges.append((key, range_)) - values = self.inner_store.get_partial_values(transformed_key_ranges) + merged_key_ranges, reverse_lookup = self._merge_ranges(transformed_key_ranges) + merged_values = self.inner_store.get_partial_values(merged_key_ranges) + values = [merged_values[i][slice_] for i, slice_ in reverse_lookup] for i in none_indices: values.insert(i, None) return values else: return StoreV3.get_partial_values(self, key_ranges) + def _merge_ranges(self, key_ranges): + merged_key_ranges = [] + reverse_lookup = [] + mergable_ranges_per_key = {} + for i, (key, _range) in enumerate(key_ranges): + if self._is_data_key(key): + ranges = mergable_ranges_per_key.setdefault(key, []) + ranges.append((i, _range)) + reverse_lookup.append(None) # placeholder + else: + reverse_lookup.append((len(merged_key_ranges), slice(0, None))) + merged_key_ranges.append((key, _range)) + for key, indexed_ranges in mergable_ranges_per_key.items(): + reverse_slices = [] + current_start, current_length = (None, None) + for i, (range_start, range_length) in sorted(indexed_ranges, key=lambda x: x[1]): + # range_start and range_length are positive integers + if current_start is None: + current_start = range_start + current_length = range_length + if range_start > current_start + current_length: + # merging not possible, write out previous range and reset: + merged_key_ranges.append((key, (current_start, current_length))) + current_start = range_start + current_length = range_length + else: + # merge with previous ranges + current_length = max( + current_length, + range_start + range_length - current_start + ) + relative_start = range_start - current_start + relative_end = relative_start + range_length + reverse_lookup[i] = ( + len(merged_key_ranges), + slice(relative_start, relative_end) + ) + # write out last range + merged_key_ranges.append((key, (current_start, current_length))) + return merged_key_ranges, reverse_lookup + def supports_efficient_set_partial_values(self): return False From a98c7b804d220732b32c72135a717884dfbbcefa Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Mon, 27 Feb 2023 15:50:41 +0100 Subject: [PATCH 2/2] rm unused line --- zarr/_storage/v3_storage_transformers.py | 1 - 1 file changed, 1 deletion(-) diff --git a/zarr/_storage/v3_storage_transformers.py b/zarr/_storage/v3_storage_transformers.py index b7acb1282b..22a024bdaa 100644 --- a/zarr/_storage/v3_storage_transformers.py +++ b/zarr/_storage/v3_storage_transformers.py @@ -362,7 +362,6 @@ def _merge_ranges(self, key_ranges): reverse_lookup.append((len(merged_key_ranges), slice(0, None))) merged_key_ranges.append((key, _range)) for key, indexed_ranges in mergable_ranges_per_key.items(): - reverse_slices = [] current_start, current_length = (None, None) for i, (range_start, range_length) in sorted(indexed_ranges, key=lambda x: x[1]): # range_start and range_length are positive integers