From da5d6a8761da9a3b78597f5b1bde690e14d829ae Mon Sep 17 00:00:00 2001 From: terraputix Date: Tue, 15 Apr 2025 22:40:21 +0200 Subject: [PATCH 1/7] WIP: omfiles support for kerchunk --- kerchunk/open_meteo.py | 291 +++++++++++++++++++++++++++++++++++++++ pyproject.toml | 1 + tests/test_open_meteo.py | 128 +++++++++++++++++ 3 files changed, 420 insertions(+) create mode 100644 kerchunk/open_meteo.py create mode 100644 tests/test_open_meteo.py diff --git a/kerchunk/open_meteo.py b/kerchunk/open_meteo.py new file mode 100644 index 00000000..2e74b258 --- /dev/null +++ b/kerchunk/open_meteo.py @@ -0,0 +1,291 @@ +import base64 +import json +import os +import math +import logging +import pathlib +from typing import Union, BinaryIO + +import fsspec.core +from fsspec.implementations.reference import LazyReferenceMapper +import numpy as np +import zarr +import numcodecs + +from .utils import ( + _encode_for_JSON, + encode_fill_value, + dict_to_store, + translate_refs_serializable, +) + +try: + import omfiles + from omfiles.omfiles_numcodecs import PyPforDelta2dSerializer, PyPforDelta2d +except ModuleNotFoundError: # pragma: no cover + raise ImportError( + "omfiles is required for kerchunking Open-Meteo files. Please install with " + "`pip install omfiles`." + ) + +logger = logging.getLogger("kerchunk.open_meteo") + +class Reshape(numcodecs.abc.Codec): + """Codec to reshape data between encoding and decoding. + + This codec reshapes the data to a specific shape before passing it to the next + filter in the pipeline, which is particularly useful for filters like Delta2D + that expect 2D data. + + Parameters + ---------- + shape : tuple + Shape to reshape the data to during decoding + """ + + codec_id = 'reshape' + + def __init__(self, shape): + self.shape = tuple(shape) + + def encode(self, buf): + # For encoding, we flatten back to 1D + arr = numcodecs.compat.ensure_ndarray(buf) + return arr.reshape(-1) + + def decode(self, buf, out=None): + print("Reshape decode") + # Reshape to the specified 2D shape for delta2d + arr = numcodecs.compat.ensure_ndarray(buf) + + # Check if total elements match + expected_size = np.prod(self.shape) + if arr.size != expected_size: + raise ValueError(f"Buffer size {arr.size} doesn't match expected size {expected_size}") + + # Reshape + arr = arr.reshape(self.shape) + print("arr.shape", arr.shape) + return arr + + def get_config(self): + return {'id': self.codec_id, 'shape': self.shape} + + def __repr__(self): + return f'{type(self).__name__}(shape={self.shape!r})' + +class Delta2D(numcodecs.abc.Codec): + """Codec to encode data as the difference between adjacent rows in a 2D array.""" + + codec_id = 'delta2d' + + def __init__(self, dtype, astype=None): + self.dtype = np.dtype(dtype) + if astype is None: + self.astype = self.dtype + else: + self.astype = np.dtype(astype) + if self.dtype == np.dtype(object) or self.astype == np.dtype(object): + raise ValueError('object arrays are not supported') + + def encode(self, buf): + arr = numcodecs.compat.ensure_ndarray(buf).view(self.dtype) + if arr.ndim != 2: + raise ValueError("Delta2D only works with 2D arrays") + enc = arr.astype(self.astype, copy=True) + if enc.shape[0] > 1: + for d0 in range(enc.shape[0]-1, 0, -1): + enc[d0] -= enc[d0-1] + return enc + + def decode(self, buf, out=None): + print("Delta2D decode") + print("buf.shape", buf.shape) + enc = numcodecs.compat.ensure_ndarray(buf).view(self.astype) + if enc.ndim != 2: + raise ValueError("Delta2D only works with 2D arrays") + if out is not None: + dec = 
out.view(self.dtype) + if dec.shape != enc.shape: + raise ValueError("Output array has wrong shape") + else: + dec = np.empty_like(enc, dtype=self.dtype) + dec[0] = enc[0] + if enc.shape[0] > 1: + for d0 in range(1, enc.shape[0]): + dec[d0] = enc[d0] + dec[d0-1] + return dec if out is None else out + + def get_config(self): + return {'id': self.codec_id, 'dtype': self.dtype.str, 'astype': self.astype.str} + + def __repr__(self): + r = f'{type(self).__name__}(dtype={self.dtype.str!r}' + if self.astype != self.dtype: + r += f', astype={self.astype.str!r}' + r += ')' + return r + +# Register codecs +numcodecs.register_codec(PyPforDelta2d, "pfor") +print(PyPforDelta2d.__dict__) +numcodecs.register_codec(PyPforDelta2dSerializer, "pfor_serializer") +numcodecs.register_codec(Delta2D, "delta2d") +numcodecs.register_codec(Reshape, "reshape") +print(numcodecs.registry.codec_registry) + +# codec = numcodecs.get_codec({"id": "pfor_serializer"}) +# print(codec) + + +class SingleOmToZarr: + """Translate a .om file into Zarr metadata""" + + def __init__( + self, + om_file, + url=None, + spec=1, + inline_threshold=500, + storage_options=None + ): + # Initialize a reader for your om file + if isinstance(om_file, (pathlib.Path, str)): + fs, path = fsspec.core.url_to_fs(om_file, **(storage_options or {})) + self.input_file = fs.open(path, "rb") + url = om_file + self.reader = omfiles.OmFilePyReader(self.input_file) + elif isinstance(om_file, io.IOBase): + self.input_file = om_file + self.reader = omfiles.OmFilePyReader(self.input_file) + else: + raise ValueError("type of input `om_file` not recognised") + + self.url = url if url else om_file + self.spec = spec + self.inline = inline_threshold + self.store_dict = {} + self.store = dict_to_store(self.store_dict) + self.name = "data" # FIXME: This should be the name from om-variable + + def translate(self): + """Main method to create the kerchunk references""" + # 1. Extract metadata about shape, dtype, chunks, etc. + shape = self.reader.shape + dtype = self.reader.dtype + chunks = self.reader.chunk_dimensions + scale_factor = self.reader.scale_factor + add_offset = self.reader.add_offset + lut = self.reader.get_complete_lut() + + # Get dimension names if available, otherwise use defaults + dim_names = getattr(self.reader, "dimension_names", ["x", "y", "time"]) + + # Calculate number of chunks in each dimension + chunks_per_dim = [math.ceil(s/c) for s, c in zip(shape, chunks)] + + # 2. 
Create Zarr array metadata (.zarray) + blocksize = chunks[0] * chunks[1] * chunks[2] if len(chunks) >= 3 else chunks[0] * chunks[1] + + zarray = { + "zarr_format": 2, + "shape": shape, + "chunks": chunks, + "dtype": str(dtype), + "compressor": {"id": "pfor", "length": blocksize}, # As main compressor + "fill_value": None, + "order": "C", + "filters": [ + {"id": "fixedscaleoffset", "scale": scale_factor, "offset": add_offset, "dtype": "f4", "astype": "i2"}, + {"id": "delta2d", "dtype": " 0 and chunk_size < self.inline: + # Read the chunk data and inline it + self.input_file.seek(lut[chunk_idx]) + data = self.input_file.read(chunk_size) + try: + # Try to decode as ASCII + self.store_dict[key] = data.decode('ascii') + except UnicodeDecodeError: + # If not ASCII, encode as base64 + self.store_dict[key] = b"base64:" + base64.b64encode(data) + else: + # Otherwise store as reference + self.store_dict[key] = [self.url, lut[chunk_idx], chunk_size] + + # Convert to proper format for return + if self.spec < 1: + print("self.spec < 1") + return self.store + else: + print("translate_refs_serializable") + translate_refs_serializable(self.store_dict) + store = _encode_for_JSON(self.store_dict) + return {"version": 1, "refs": store} + + def _get_chunk_coords(self, idx, chunks_per_dim): + """Convert linear chunk index to multidimensional coordinates + + Parameters + ---------- + idx : int + Linear chunk index + chunks_per_dim : list + Number of chunks in each dimension + + Returns + ------- + list + Chunk coordinates in multidimensional space + """ + coords = [] + remaining = idx + + # Start from the fastest-changing dimension (C-order) + for chunks_in_dim in reversed(chunks_per_dim): + coords.insert(0, remaining % chunks_in_dim) + remaining //= chunks_in_dim + + return coords + + def _get_file_size(self): + """Get the total file size""" + current_pos = self.input_file.tell() + self.input_file.seek(0, os.SEEK_END) + size = self.input_file.tell() + self.input_file.seek(current_pos) + return size + + def close(self): + """Close the reader""" + if hasattr(self, 'reader'): + self.reader.close() diff --git a/pyproject.toml b/pyproject.toml index b383f04e..68c9f3f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,7 @@ dev = [ "scipy", "netcdf4", "pytest-subtests", + "omfiles @ file:../omfilesrspy" ] [project.urls] diff --git a/tests/test_open_meteo.py b/tests/test_open_meteo.py new file mode 100644 index 00000000..c63f4a23 --- /dev/null +++ b/tests/test_open_meteo.py @@ -0,0 +1,128 @@ +import os +import json +import pytest +import fsspec +import numpy as np +import zarr +import numcodecs + +import omfiles +from omfiles.omfiles_numcodecs import PyPforDelta2dSerializer, PyPforDelta2d + +from kerchunk.utils import fs_as_store, refs_as_store +from kerchunk.df import refs_to_dataframe +from kerchunk.open_meteo import SingleOmToZarr, Delta2D + +# Register codecs needed by our pipeline +numcodecs.register_codec(PyPforDelta2d, "pfor") +numcodecs.register_codec(PyPforDelta2dSerializer, "pfor_serializer") +numcodecs.register_codec(Delta2D) + +# Path to test file - adjust as needed +test_file = "tests/rh_icon_chunk1914.om" +# Skip test if file doesn't exist +pytestmark = pytest.mark.skipif( + not os.path.exists(test_file), reason=f"Test file {test_file} not found" +) + +def test_single_om_to_zarr(): + """Test creating references for a single OM file and reading via Zarr""" + # Create references using SingleOmToZarr + om_to_zarr = SingleOmToZarr(test_file, inline_threshold=0) # No inlining for testing + 
references = om_to_zarr.translate() + om_to_zarr.close() + + print("type(references)", type(references)) + print("references.keys", references.keys()) + + # Optionally save references to a file for inspection + with open("om_refs.json", "w") as f: + json.dump(references, f) + + output_path = "om_refs_parquet/" # This will be a directory + refs_to_dataframe( + fo="om_refs.json", # References dict + url=output_path, # Output URL + record_size=100000, # Records per file, adjust as needed + categorical_threshold=10 # URL encoding efficiency + ) + + # Create a filesystem from the references + fs = fsspec.filesystem("reference", fo=references) + store = fs_as_store(fs) + + # Open with zarr + group = zarr.open(store, zarr_format=2) + z = group["data"] # Here we just use a dummy data name we have hardcoded in SingleOmToZarr + + # Verify basic metadata matches original file + reader = omfiles.OmFilePyReader(test_file) + assert list(z.shape) == reader.shape, f"Shape mismatch: {z.shape} vs {reader.shape}" + assert str(z.dtype) == str(reader.dtype), f"Dtype mismatch: {z.dtype} vs {reader.dtype}" + assert list(z.chunks) == reader.chunk_dimensions, f"Chunks mismatch: {z.chunks} vs {reader.chunk_dimensions}" + + # Test retrieving a specific chunk (same chunk as in your example) + chunk_index = (5, 5, ...) + + # Get direct chunk data + direct_data = reader[chunk_index] + + # Now get the same chunk via kerchunk/zarr + # Get the data via zarr + try: + print("z", z) + zarr_data = z[chunk_index] + print("zarr_data", zarr_data) + + # Compare the data + print(f"Direct data shape: {direct_data.shape}, Zarr data shape: {zarr_data.shape}") + print(f"Direct data min/max: {np.min(direct_data)}/{np.max(direct_data)}") + print(f"Zarr data min/max: {np.min(zarr_data)}/{np.max(zarr_data)}") + + # Assert data is equivalent + np.testing.assert_allclose(zarr_data, direct_data, rtol=1e-5) + + print("✓ Data from kerchunk matches direct access!") + except Exception as e: + pytest.fail(f"Failed to read data through kerchunk: {e}") + + # Clean up + reader.close() + + +# def test_om_to_xarray(): +# """Test opening kerchunked OM file with xarray""" +# import xarray as xr + +# # Create references using SingleOmToZarr +# om_to_zarr = SingleOmToZarr(test_file) +# references = om_to_zarr.translate() +# om_to_zarr.close() + +# # Open with xarray +# store = refs_as_store(references) + +# try: +# # Open dataset with xarray using zarr engine +# ds = xr.open_zarr(store, consolidated=False) + +# # Basic validation +# reader = omfiles.OmFilePyReader(test_file) +# assert ds.dims == dict(zip(["time", "y", "x"], reader.shape)) + +# # Get some data to verify decompression pipeline +# data_sample = ds.isel(time=0, y=0, x=slice(0, 5)).data +# assert data_sample.shape == (5,) + +# print(f"Successfully opened dataset with xarray: {ds}") +# print(f"Sample data: {data_sample}") + +# reader.close() +# except Exception as e: +# pytest.fail(f"Failed to open with xarray: {e}") + + +if __name__ == "__main__": + # Run tests directly if file is executed + test_single_om_to_zarr() + # test_om_to_xarray() From 7306ad89b4992694e5d83f7012afcb2f05b91486 Mon Sep 17 00:00:00 2001 From: terraputix Date: Wed, 16 Apr 2025 12:58:07 +0200 Subject: [PATCH 2/7] WIP: MultiZarrToZarr test --- kerchunk/open_meteo.py | 181 +++++++++++++++++++++++++++++++++++++-- tests/test_open_meteo.py | 80 ++++++++++++++--- 2 files changed, 242 insertions(+), 19 deletions(-) diff --git a/kerchunk/open_meteo.py b/kerchunk/open_meteo.py index 2e74b258..383a957e 100644 --- 
a/kerchunk/open_meteo.py +++ b/kerchunk/open_meteo.py @@ -1,14 +1,18 @@ import base64 import json import os +import io import math import logging import pathlib from typing import Union, BinaryIO +from dataclasses import dataclass +from enum import Enum +import numpy as np +import datetime import fsspec.core from fsspec.implementations.reference import LazyReferenceMapper -import numpy as np import zarr import numcodecs @@ -30,6 +34,45 @@ logger = logging.getLogger("kerchunk.open_meteo") +class SupportedDomain(Enum): + dwd_icon = "dwd_icon" + + # Potentially all this information should be available in the + # meta.json file under /data/{domain}/{static}/meta.json + def s3_chunk_size(self) -> int: + """Defines how many timesteps in native model-dt are in the om-files on AWS S3""" + if self == SupportedDomain.dwd_icon: + return 253 + else: + raise ValueError(f"Unsupported domain {self}") + + + +@dataclass +class StartLengthStepSize: + start: datetime.datetime + length: int + step_dt: int # in seconds + + +def chunk_number_to_start_time(domain: SupportedDomain, chunk_no: int) -> StartLengthStepSize: + meta_file = f"openmeteo/data/{domain.value}/static/meta.json" + # Load metadata from S3 + fs = fsspec.filesystem(protocol="s3", anon=True) + with fs.open(meta_file, mode="r") as f: + metadata = json.load(f) + + dt_seconds = metadata["temporal_resolution_seconds"] + om_file_length = domain.s3_chunk_size() + + seconds_since_epoch = chunk_no * om_file_length * dt_seconds + + epoch = datetime.datetime(1970, 1, 1) + chunk_start = epoch + datetime.timedelta(seconds=seconds_since_epoch) + print("chunk_start", chunk_start) + return StartLengthStepSize(start=chunk_start, length=om_file_length, step_dt=dt_seconds) + + class Reshape(numcodecs.abc.Codec): """Codec to reshape data between encoding and decoding. @@ -54,7 +97,7 @@ def encode(self, buf): return arr.reshape(-1) def decode(self, buf, out=None): - print("Reshape decode") + print(f"Reshape decode to {self.shape}") # Reshape to the specified 2D shape for delta2d arr = numcodecs.compat.ensure_ndarray(buf) @@ -128,12 +171,10 @@ def __repr__(self): # Register codecs numcodecs.register_codec(PyPforDelta2d, "pfor") -print(PyPforDelta2d.__dict__) -numcodecs.register_codec(PyPforDelta2dSerializer, "pfor_serializer") numcodecs.register_codec(Delta2D, "delta2d") numcodecs.register_codec(Reshape, "reshape") -print(numcodecs.registry.codec_registry) +# print(numcodecs.registry.codec_registry) # codec = numcodecs.get_codec({"id": "pfor_serializer"}) # print(codec) @@ -147,7 +188,11 @@ def __init__( url=None, spec=1, inline_threshold=500, - storage_options=None + storage_options=None, + chunk_no=None, + domain=None, + reference_time=None, + time_step=3600, ): # Initialize a reader for your om file if isinstance(om_file, (pathlib.Path, str)): @@ -168,6 +213,15 @@ def __init__( self.store = dict_to_store(self.store_dict) self.name = "data" # FIXME: This should be the name from om-variable + if domain is not None and chunk_no is not None: + start_step = chunk_number_to_start_time(domain=domain, chunk_no=chunk_no) + # Store time parameters + self.reference_time = start_step.start + self.time_step = start_step.step_dt + else: + self.reference_time = None + self.time_step = None + def translate(self): """Main method to create the kerchunk references""" # 1. Extract metadata about shape, dtype, chunks, etc. 
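# A minimal sketch (not part of the patch) of the chunk-number-to-start-time arithmetic that
# chunk_number_to_start_time() and the domain/chunk_no branch above implement. The 253 timesteps
# per om-file comes from SupportedDomain.s3_chunk_size() for dwd_icon; the 3600 s resolution is an
# assumed example for the temporal_resolution_seconds value read from the domain's meta.json.
import datetime

chunk_no = 1914                      # om-file index, matching the test files used below
om_file_length = 253                 # timesteps per om-file (s3_chunk_size for dwd_icon)
dt_seconds = 3600                    # temporal_resolution_seconds from meta.json (assumed value)

seconds_since_epoch = chunk_no * om_file_length * dt_seconds
chunk_start = datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=seconds_since_epoch)
# chunk_start is what SingleOmToZarr stores as reference_time; dt_seconds becomes time_step,
# which the time-coordinate handling later in translate() uses to build the "time" array.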
@@ -179,6 +233,7 @@ def translate(self): lut = self.reader.get_complete_lut() # Get dimension names if available, otherwise use defaults + # FIXME: Currently we don't have dimension names exposed by the reader (or even necessarily in the file) dim_names = getattr(self.reader, "dimension_names", ["x", "y", "time"]) # Calculate number of chunks in each dimension @@ -207,7 +262,7 @@ def translate(self): self.store_dict[".zgroup"] = json.dumps({"zarr_format": 2}) self.store_dict[f"{self.name}/.zarray"] = json.dumps(zarray) self.store_dict[f"{self.name}/.zattrs"] = json.dumps({ - "_ARRAY_DIMENSIONS": list(dim_names), + "_ARRAY_DIMENSIONS": dim_names, "scale_factor": scale_factor, "add_offset": add_offset }) @@ -242,6 +297,17 @@ def translate(self): # Otherwise store as reference self.store_dict[key] = [self.url, lut[chunk_idx], chunk_size] + # 5. Create coordinate arrays. TODO: This needs to be improved + # Add coordinate arrays for ALL dimensions + for i, dim_name in enumerate(dim_names): + dim_size = shape[i] + if dim_name == "time": + # Special handling for time dimension + self._add_time_coordinate(dim_size, i) + else: + # Add generic coordinate for other dimensions + self._add_coordinate_array(dim_name, dim_size) + # Convert to proper format for return if self.spec < 1: print("self.spec < 1") @@ -252,6 +318,107 @@ def translate(self): store = _encode_for_JSON(self.store_dict) return {"version": 1, "refs": store} + def _add_coordinate_array(self, dim_name, dim_size): + """Add a coordinate array for a generic dimension""" + + # Create coordinate values (just indices) + coord_values = np.arange(dim_size, dtype='f4') + + # Create coordinate array metadata + coord_zarray = { + "zarr_format": 2, + "shape": [dim_size], + "chunks": [dim_size], + "dtype": " 0: + print(f"First timestamp: {time_values[0]} hours since 1970-01-01, Last: {time_values[-1]}") + def _get_chunk_coords(self, idx, chunks_per_dim): """Convert linear chunk index to multidimensional coordinates diff --git a/tests/test_open_meteo.py b/tests/test_open_meteo.py index c63f4a23..abb1f62d 100644 --- a/tests/test_open_meteo.py +++ b/tests/test_open_meteo.py @@ -9,32 +9,27 @@ import omfiles from omfiles.omfiles_numcodecs import PyPforDelta2dSerializer, PyPforDelta2d -from kerchunk.utils import fs_as_store, refs_as_store +from kerchunk.utils import fs_as_store from kerchunk.df import refs_to_dataframe -from kerchunk.open_meteo import SingleOmToZarr, Delta2D +from kerchunk.open_meteo import SingleOmToZarr, Delta2D, SupportedDomain +from kerchunk.combine import MultiZarrToZarr # Register codecs needed by our pipeline numcodecs.register_codec(PyPforDelta2d, "pfor") numcodecs.register_codec(PyPforDelta2dSerializer, "pfor_serializer") numcodecs.register_codec(Delta2D) -# Path to test file - adjust as needed -test_file = "tests/rh_icon_chunk1914.om" -# Skip test if file doesn't exist -pytestmark = pytest.mark.skipif( - not os.path.exists(test_file), reason=f"Test file {test_file} not found" -) def test_single_om_to_zarr(): + # Path to test file - adjust as needed + chunk_no=1914 + test_file = f"tests/rh_icon_chunk{chunk_no}.om" """Test creating references for a single OM file and reading via Zarr""" # Create references using SingleOmToZarr - om_to_zarr = SingleOmToZarr(test_file, inline_threshold=0) # No inlining for testing + om_to_zarr = SingleOmToZarr(test_file, inline_threshold=0, domain=SupportedDomain.dwd_icon, chunk_no=chunk_no) # No inlining for testing references = om_to_zarr.translate() om_to_zarr.close() - print("type(references)", 
type(references)) - print("references.keys", references.keys()) - # Optionally save references to a file for inspection with open("om_refs.json", "w") as f: json.dump(references, f) @@ -53,14 +48,22 @@ def test_single_om_to_zarr(): # Open with zarr group = zarr.open(store, zarr_format=2) + print("group['time']", group["time"][:]) z = group["data"] # Here we just use a dummy data name we have hardcoded in SingleOmToZarr + print("z.shape", z.shape) + # Verify basic metadata matches original file reader = omfiles.OmFilePyReader(test_file) assert list(z.shape) == reader.shape, f"Shape mismatch: {z.shape} vs {reader.shape}" assert str(z.dtype) == str(reader.dtype), f"Dtype mismatch: {z.dtype} vs {reader.dtype}" assert list(z.chunks) == reader.chunk_dimensions, f"Chunks mismatch: {z.chunks} vs {reader.chunk_dimensions}" + # TODO: Using the following chunk_index leads to a double free / corruption error! + # Most likely, because zarr and open-meteo treat partial chunks differently. + # om-files encode partial chunks with a reduced dimension, while zarr most likely expects a full block of data? + # chunk_index = (slice(0, 100), 2878, ...) + # Test retrieving a specific chunk (same chunk as in your example) chunk_index = (5, 5, ...) @@ -89,6 +92,58 @@ def test_single_om_to_zarr(): # Clean up reader.close() +def test_multizarr_to_zarr(): + """Test combining two OM files into a single kerchunked Zarr reference""" + chunk_no1=1914 + chunk_no2=1915 + file1 = f"tests/rh_icon_chunk{chunk_no1}.om" + file2 = f"tests/rh_icon_chunk{chunk_no2}.om" + assert os.path.exists(file1), f"{file1} not found" + assert os.path.exists(file2), f"{file2} not found" + refs1 = SingleOmToZarr(file1, inline_threshold=0, domain=SupportedDomain.dwd_icon, chunk_no=chunk_no1).translate() + refs2 = SingleOmToZarr(file2, inline_threshold=0, domain=SupportedDomain.dwd_icon, chunk_no=chunk_no2).translate() + + # Combine using MultiZarrToZarr + mzz = MultiZarrToZarr( + [refs1, refs2], + concat_dims=["time"], + coo_map={"time": "data:time"}, + identical_dims=["y", "x"], + remote_protocol=None, + remote_options=None, + target_options=None, + inline_threshold=0, + out=None, + ) + combined_refs = mzz.translate() + + # Open with zarr + fs = fsspec.filesystem("reference", fo=combined_refs) + store = fs_as_store(fs) + group = zarr.open(store, zarr_format=2) + z = group["data"] + + # Open both original files for comparison + reader1 = omfiles.OmFilePyReader(file1) + reader2 = omfiles.OmFilePyReader(file2) + + # Check that the combined shape is the sum along the time axis + expected_shape = list(reader1.shape) + expected_shape[2] += reader2.shape[2] + assert list(z.shape) == expected_shape, f"Combined shape mismatch: {z.shape} vs {expected_shape}" + + # Check that the first part matches file1 and the second part matches file2 + # (Assume 3D: time, y, x) + slc1 = (5, 5, slice(0, reader1.shape[2])) + slc2 = (5, 5, slice(reader1.shape[2], expected_shape[2])) + np.testing.assert_allclose(z[slc1], reader1[slc1], rtol=1e-5) + np.testing.assert_allclose(z[slc2], reader2[slc1], rtol=1e-5) + + print("✓ MultiZarrToZarr combined data matches originals!") + + reader1.close() + reader2.close() + # def test_om_to_xarray(): # """Test opening kerchunked OM file with xarray""" @@ -125,4 +180,5 @@ def test_single_om_to_zarr(): if __name__ == "__main__": # Run tests directly if file is executed test_single_om_to_zarr() + test_multizarr_to_zarr() # test_om_to_xarray() From e061bda9e27cca002ab737c7286ad93b6d2324c4 Mon Sep 17 00:00:00 2001 From: terraputix Date: 
Wed, 16 Apr 2025 15:58:58 +0200 Subject: [PATCH 3/7] cleanup and proper offset calculation at end of lut --- kerchunk/open_meteo.py | 93 +++++++++++----------------------------- pyproject.toml | 2 +- tests/test_open_meteo.py | 42 +++++++++--------- 3 files changed, 48 insertions(+), 89 deletions(-) diff --git a/kerchunk/open_meteo.py b/kerchunk/open_meteo.py index 383a957e..94d96010 100644 --- a/kerchunk/open_meteo.py +++ b/kerchunk/open_meteo.py @@ -1,31 +1,29 @@ import base64 -import json -import os +import datetime import io -import math +import json import logging +import math +import os import pathlib -from typing import Union, BinaryIO from dataclasses import dataclass - from enum import Enum -import numpy as np -import datetime + import fsspec.core -from fsspec.implementations.reference import LazyReferenceMapper -import zarr import numcodecs +import numcodecs.abc +import numcodecs.compat +import numpy as np from .utils import ( _encode_for_JSON, - encode_fill_value, dict_to_store, translate_refs_serializable, ) try: import omfiles - from omfiles.omfiles_numcodecs import PyPforDelta2dSerializer, PyPforDelta2d + from omfiles.omfiles_numcodecs import PyPforDelta2d except ModuleNotFoundError: # pragma: no cover raise ImportError( "omfiles is required for kerchunking Open-Meteo files. Please install with " @@ -47,7 +45,6 @@ def s3_chunk_size(self) -> int: raise ValueError(f"Unsupported domain {self}") - @dataclass class StartLengthStepSize: start: datetime.datetime @@ -204,7 +201,7 @@ def __init__( self.input_file = om_file self.reader = omfiles.OmFilePyReader(self.input_file) else: - raise ValueError("type of input `om_file` not recognised") + raise ValueError("type of input `om_file` not recognized") self.url = url if url else om_file self.spec = spec @@ -268,16 +265,13 @@ def translate(self): }) # 4. Add chunk references - for chunk_idx in range(len(lut)): + for chunk_idx in range(len(lut) - 1): # Calculate chunk coordinates (i,j,k) from linear index chunk_coords = self._get_chunk_coords(chunk_idx, chunks_per_dim) - # Calculate chunk size - if chunk_idx < len(lut) - 1: - chunk_size = lut[chunk_idx + 1] - lut[chunk_idx] - else: - # For last chunk, get file size - chunk_size = self._get_file_size() - lut[chunk_idx] + # Calculate chunk size. + # Loop index is defined so this is safe! 
+ chunk_size = lut[chunk_idx + 1] - lut[chunk_idx] # Add to references key = self.name + "/" + ".".join(map(str, chunk_coords)) @@ -305,8 +299,8 @@ def translate(self): # Special handling for time dimension self._add_time_coordinate(dim_size, i) else: - # Add generic coordinate for other dimensions - self._add_coordinate_array(dim_name, dim_size) + print(f"No coordinates for dimension {dim_name}") + continue # Convert to proper format for return if self.spec < 1: @@ -318,63 +312,28 @@ def translate(self): store = _encode_for_JSON(self.store_dict) return {"version": 1, "refs": store} - def _add_coordinate_array(self, dim_name, dim_size): - """Add a coordinate array for a generic dimension""" - - # Create coordinate values (just indices) - coord_values = np.arange(dim_size, dtype='f4') - - # Create coordinate array metadata - coord_zarray = { - "zarr_format": 2, - "shape": [dim_size], - "chunks": [dim_size], - "dtype": " 0: - print(f"First timestamp: {time_values[0]} hours since 1970-01-01, Last: {time_values[-1]}") + print(f"First timestamp: {time_values[0]} seconds since 1970-01-01, Last: {time_values[-1]}") def _get_chunk_coords(self, idx, chunks_per_dim): """Convert linear chunk index to multidimensional coordinates diff --git a/pyproject.toml b/pyproject.toml index 68c9f3f6..b2e91b61 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,7 +50,7 @@ dev = [ "scipy", "netcdf4", "pytest-subtests", - "omfiles @ file:../omfilesrspy" + "omfiles @ git+https://github.com/open-meteo/python-omfiles.git@codecs" ] [project.urls] diff --git a/tests/test_open_meteo.py b/tests/test_open_meteo.py index abb1f62d..68ba5ccb 100644 --- a/tests/test_open_meteo.py +++ b/tests/test_open_meteo.py @@ -1,23 +1,15 @@ import os -import json -import pytest + import fsspec import numpy as np -import zarr -import numcodecs - import omfiles -from omfiles.omfiles_numcodecs import PyPforDelta2dSerializer, PyPforDelta2d +import pytest +import zarr -from kerchunk.utils import fs_as_store -from kerchunk.df import refs_to_dataframe -from kerchunk.open_meteo import SingleOmToZarr, Delta2D, SupportedDomain from kerchunk.combine import MultiZarrToZarr - -# Register codecs needed by our pipeline -numcodecs.register_codec(PyPforDelta2d, "pfor") -numcodecs.register_codec(PyPforDelta2dSerializer, "pfor_serializer") -numcodecs.register_codec(Delta2D) +from kerchunk.df import refs_to_dataframe +from kerchunk.open_meteo import SingleOmToZarr, SupportedDomain +from kerchunk.utils import fs_as_store def test_single_om_to_zarr(): @@ -30,14 +22,14 @@ def test_single_om_to_zarr(): references = om_to_zarr.translate() om_to_zarr.close() - # Optionally save references to a file for inspection - with open("om_refs.json", "w") as f: - json.dump(references, f) + # Save references to json if desired. These are veryyy big... 
+ # with open("om_refs.json", "w") as f: + # json.dump(references, f) - output_path = "om_refs_parquet/" # This will be a directory + # Save references to parquet refs_to_dataframe( - fo="om_refs.json", # References dict - url=output_path, # Output URL + fo=references, # References dict + url="om_refs_parquet/", # Output directory record_size=100000, # Records per file, adjust as needed categorical_threshold=10 # URL encoding efficiency ) @@ -87,7 +79,7 @@ def test_single_om_to_zarr(): print("✓ Data from kerchunk matches direct access!") except Exception as e: - pytest.fail(f"Failed to read data through kerchunk: {e}") + pytest.fail(f"Failed to read kerchunked data through zarr: {e}") # Clean up reader.close() @@ -117,6 +109,14 @@ def test_multizarr_to_zarr(): ) combined_refs = mzz.translate() + # Save references to parquet + refs_to_dataframe( + fo=combined_refs, # References dict + url="om_refs_mzz_parquet/", # Output directory + record_size=100000, # Records per file, adjust as needed + categorical_threshold=10 # URL encoding efficiency + ) + # Open with zarr fs = fsspec.filesystem("reference", fo=combined_refs) store = fs_as_store(fs) From d4957eedef643d3da617adfb656c5a2837ff6a26 Mon Sep 17 00:00:00 2001 From: terraputix Date: Mon, 21 Apr 2025 17:20:24 +0200 Subject: [PATCH 4/7] renamed Codec to TurboPfor --- kerchunk/open_meteo.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kerchunk/open_meteo.py b/kerchunk/open_meteo.py index 94d96010..9a1bc5fb 100644 --- a/kerchunk/open_meteo.py +++ b/kerchunk/open_meteo.py @@ -23,7 +23,7 @@ try: import omfiles - from omfiles.omfiles_numcodecs import PyPforDelta2d + from omfiles.omfiles_numcodecs import TurboPfor except ModuleNotFoundError: # pragma: no cover raise ImportError( "omfiles is required for kerchunking Open-Meteo files. Please install with " @@ -167,7 +167,7 @@ def __repr__(self): return r # Register codecs -numcodecs.register_codec(PyPforDelta2d, "pfor") +numcodecs.register_codec(TurboPfor, "pfor") numcodecs.register_codec(Delta2D, "delta2d") numcodecs.register_codec(Reshape, "reshape") From be65ad911836cedbf1ce7bb932efd8b2244a51a7 Mon Sep 17 00:00:00 2001 From: terraputix Date: Tue, 22 Apr 2025 09:19:02 +0200 Subject: [PATCH 5/7] small codec improvement while trying to solve partial chunk issue --- kerchunk/open_meteo.py | 10 ++-------- tests/test_open_meteo.py | 9 ++++++--- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/kerchunk/open_meteo.py b/kerchunk/open_meteo.py index 9a1bc5fb..9b3f869f 100644 --- a/kerchunk/open_meteo.py +++ b/kerchunk/open_meteo.py @@ -23,7 +23,6 @@ try: import omfiles - from omfiles.omfiles_numcodecs import TurboPfor except ModuleNotFoundError: # pragma: no cover raise ImportError( "omfiles is required for kerchunking Open-Meteo files. 
Please install with " @@ -167,14 +166,10 @@ def __repr__(self): return r # Register codecs -numcodecs.register_codec(TurboPfor, "pfor") +# NOTE: TurboPfor is register as `turbo_pfor` by omfiles already numcodecs.register_codec(Delta2D, "delta2d") numcodecs.register_codec(Reshape, "reshape") -# print(numcodecs.registry.codec_registry) -# codec = numcodecs.get_codec({"id": "pfor_serializer"}) -# print(codec) - class SingleOmToZarr: """Translate a .om file into Zarr metadata""" @@ -244,14 +239,13 @@ def translate(self): "shape": shape, "chunks": chunks, "dtype": str(dtype), - "compressor": {"id": "pfor", "length": blocksize}, # As main compressor + "compressor": {"id": "turbo_pfor", "chunk_elements": blocksize}, # As main compressor "fill_value": None, "order": "C", "filters": [ {"id": "fixedscaleoffset", "scale": scale_factor, "offset": add_offset, "dtype": "f4", "astype": "i2"}, {"id": "delta2d", "dtype": " Date: Wed, 20 Aug 2025 10:26:11 +0200 Subject: [PATCH 6/7] fix open-meteo kerchunk test --- kerchunk/open_meteo.py | 7 ++++--- pyproject.toml | 2 +- tests/test_open_meteo.py | 12 ++++++------ 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/kerchunk/open_meteo.py b/kerchunk/open_meteo.py index 9b3f869f..5cf59112 100644 --- a/kerchunk/open_meteo.py +++ b/kerchunk/open_meteo.py @@ -23,6 +23,7 @@ try: import omfiles + import omfiles._numcodecs except ModuleNotFoundError: # pragma: no cover raise ImportError( "omfiles is required for kerchunking Open-Meteo files. Please install with " @@ -191,10 +192,10 @@ def __init__( fs, path = fsspec.core.url_to_fs(om_file, **(storage_options or {})) self.input_file = fs.open(path, "rb") url = om_file - self.reader = omfiles.OmFilePyReader(self.input_file) + self.reader = omfiles.OmFileReader(self.input_file) elif isinstance(om_file, io.IOBase): self.input_file = om_file - self.reader = omfiles.OmFilePyReader(self.input_file) + self.reader = omfiles.OmFileReader(self.input_file) else: raise ValueError("type of input `om_file` not recognized") @@ -219,7 +220,7 @@ def translate(self): # 1. Extract metadata about shape, dtype, chunks, etc. 
shape = self.reader.shape dtype = self.reader.dtype - chunks = self.reader.chunk_dimensions + chunks = self.reader.chunks scale_factor = self.reader.scale_factor add_offset = self.reader.add_offset lut = self.reader.get_complete_lut() diff --git a/pyproject.toml b/pyproject.toml index b2e91b61..71d550a2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,7 +50,7 @@ dev = [ "scipy", "netcdf4", "pytest-subtests", - "omfiles @ git+https://github.com/open-meteo/python-omfiles.git@codecs" + "omfiles @ git+https://github.com/open-meteo/python-omfiles.git@get-complete-lut" ] [project.urls] diff --git a/tests/test_open_meteo.py b/tests/test_open_meteo.py index 5a4d86a9..e1b5ef9c 100644 --- a/tests/test_open_meteo.py +++ b/tests/test_open_meteo.py @@ -48,10 +48,10 @@ def test_single_om_to_zarr(): print("z.chunks", z.chunks) # Verify basic metadata matches original file - reader = omfiles.OmFilePyReader(test_file) - assert list(z.shape) == reader.shape, f"Shape mismatch: {z.shape} vs {reader.shape}" + reader = omfiles.OmFileReader(test_file) + assert z.shape == reader.shape, f"Shape mismatch: {z.shape} vs {reader.shape}" assert str(z.dtype) == str(reader.dtype), f"Dtype mismatch: {z.dtype} vs {reader.dtype}" - assert list(z.chunks) == reader.chunk_dimensions, f"Chunks mismatch: {z.chunks} vs {reader.chunk_dimensions}" + assert z.chunks == reader.chunks, f"Chunks mismatch: {z.chunks} vs {reader.chunks}" # TODO: Using the following chunk_index leads to a double free / corruption error! # Even with a concurrency of 1: `zarr.config.config["async"]["concurrency"] = 1` @@ -127,8 +127,8 @@ def test_multizarr_to_zarr(): z = group["data"] # Open both original files for comparison - reader1 = omfiles.OmFilePyReader(file1) - reader2 = omfiles.OmFilePyReader(file2) + reader1 = omfiles.OmFileReader(file1) + reader2 = omfiles.OmFileReader(file2) # Check that the combined shape is the sum along the time axis expected_shape = list(reader1.shape) @@ -165,7 +165,7 @@ def test_multizarr_to_zarr(): # ds = xr.open_zarr(store, consolidated=False) # # Basic validation -# reader = omfiles.OmFilePyReader(test_file) +# reader = omfiles.OmFileReader(test_file) # assert ds.dims == dict(zip(["time", "y", "x"], reader.shape)) # # Get some data to verify decompression pipeline From b71688aa731753f1bc5df4b83bdb16b5793f88ff Mon Sep 17 00:00:00 2001 From: terraputix Date: Wed, 20 Aug 2025 11:58:31 +0200 Subject: [PATCH 7/7] some cleanup --- kerchunk/open_meteo.py | 62 ++++++++++++++++++------------------------ 1 file changed, 27 insertions(+), 35 deletions(-) diff --git a/kerchunk/open_meteo.py b/kerchunk/open_meteo.py index 5cf59112..abf73bb8 100644 --- a/kerchunk/open_meteo.py +++ b/kerchunk/open_meteo.py @@ -23,7 +23,6 @@ try: import omfiles - import omfiles._numcodecs except ModuleNotFoundError: # pragma: no cover raise ImportError( "omfiles is required for kerchunking Open-Meteo files. 
Please install with " @@ -183,11 +182,9 @@ def __init__( inline_threshold=500, storage_options=None, chunk_no=None, - domain=None, - reference_time=None, - time_step=3600, + domain=None ): - # Initialize a reader for your om file + # Initialize a reader for om file if isinstance(om_file, (pathlib.Path, str)): fs, path = fsspec.core.url_to_fs(om_file, **(storage_options or {})) self.input_file = fs.open(path, "rb") @@ -204,7 +201,10 @@ def __init__( self.inline = inline_threshold self.store_dict = {} self.store = dict_to_store(self.store_dict) - self.name = "data" # FIXME: This should be the name from om-variable + # FIXME: This should be the name from om-variable, but currently variables don't need to be named in omfiles + # self.name = self.reader.name + # For now, hardcode the name to "data" + self.name = "data" if domain is not None and chunk_no is not None: start_step = chunk_number_to_start_time(domain=domain, chunk_no=chunk_no) @@ -225,15 +225,19 @@ def translate(self): add_offset = self.reader.add_offset lut = self.reader.get_complete_lut() - # Get dimension names if available, otherwise use defaults - # FIXME: Currently we don't have dimension names exposed by the reader (or even necessarily in the file) - dim_names = getattr(self.reader, "dimension_names", ["x", "y", "time"]) + assert len(shape) == 3, "Only 3D arrays are currently supported" + assert len(chunks) == 3, "Only 3D arrays are currently supported" + + # FIXME: Currently we don't have a real convention how to store dimension names in om files + # It can be easily achieved via the hierarchical structure, but just not finalized yet. + # For now, just hardcode dimension names + dim_names = ["x", "y", "time"] # Calculate number of chunks in each dimension chunks_per_dim = [math.ceil(s/c) for s, c in zip(shape, chunks)] # 2. Create Zarr array metadata (.zarray) - blocksize = chunks[0] * chunks[1] * chunks[2] if len(chunks) >= 3 else chunks[0] * chunks[1] + blocksize = chunks[0] * chunks[1] * chunks[2] zarray = { "zarr_format": 2, @@ -263,31 +267,27 @@ def translate(self): for chunk_idx in range(len(lut) - 1): # Calculate chunk coordinates (i,j,k) from linear index chunk_coords = self._get_chunk_coords(chunk_idx, chunks_per_dim) + chunk_key = self.name + "/" + ".".join(map(str, chunk_coords)) - # Calculate chunk size. - # Loop index is defined so this is safe! - chunk_size = lut[chunk_idx + 1] - lut[chunk_idx] - - # Add to references - key = self.name + "/" + ".".join(map(str, chunk_coords)) + # Calculate chunk offset and chunk size + chunk_offset = lut[chunk_idx] + chunk_size = lut[chunk_idx + 1] - chunk_offset # Check if chunk is small enough to inline if self.inline > 0 and chunk_size < self.inline: # Read the chunk data and inline it - self.input_file.seek(lut[chunk_idx]) + self.input_file.seek(chunk_offset) data = self.input_file.read(chunk_size) - try: - # Try to decode as ASCII - self.store_dict[key] = data.decode('ascii') - except UnicodeDecodeError: - # If not ASCII, encode as base64 - self.store_dict[key] = b"base64:" + base64.b64encode(data) + # Encode as base64, similar to what is done in hdf.py + self.store_dict[chunk_key] = b"base64:" + base64.b64encode(data) else: # Otherwise store as reference - self.store_dict[key] = [self.url, lut[chunk_idx], chunk_size] + self.store_dict[chunk_key] = [self.url, chunk_offset, chunk_size] - # 5. Create coordinate arrays. TODO: This needs to be improved - # Add coordinate arrays for ALL dimensions + # 5. Create coordinate arrays. 
+ # TODO: This needs to be improved, because we need coordinates for all dimensions + # Grid definitions / coordinate arrays might be calculated in the python-omfiles directly in the future: + # https://github.com/open-meteo/python-omfiles/pull/32/files for i, dim_name in enumerate(dim_names): dim_size = shape[i] if dim_name == "time": @@ -299,10 +299,8 @@ def translate(self): # Convert to proper format for return if self.spec < 1: - print("self.spec < 1") return self.store else: - print("translate_refs_serializable") translate_refs_serializable(self.store_dict) store = _encode_for_JSON(self.store_dict) return {"version": 1, "refs": store} @@ -319,7 +317,7 @@ def _add_time_coordinate(self, time_dim, time_axis=0): # Format the reference time as CF-compliant string if isinstance(ref_time, datetime.datetime): - # Calculate hours since epoch (1970-01-01) + # Calculate seconds since epoch (1970-01-01) epoch = datetime.datetime(1970, 1, 1, 0, 0, 0) seconds_since_epoch = int((ref_time - epoch).total_seconds()) @@ -367,12 +365,6 @@ def _add_time_coordinate(self, time_dim, time_axis=0): # Add time values inline (they're small) self.store_dict[f"{time_dim_name}/0"] = time_values.tobytes() - # Debug info - print(f"Created time coordinate '{time_dim_name}' with {time_dim} values") - print(f"Time units: {units}") - if time_dim > 0: - print(f"First timestamp: {time_values[0]} seconds since 1970-01-01, Last: {time_values[-1]}") - def _get_chunk_coords(self, idx, chunks_per_dim): """Convert linear chunk index to multidimensional coordinates