Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 68 additions & 22 deletions src/blosc2/lazyexpr.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import re
import sys
import threading
from abc import ABC, abstractmethod
from abc import ABC, abstractmethod, abstractproperty
from dataclasses import asdict
from enum import Enum
from pathlib import Path
Expand Down Expand Up @@ -437,6 +437,41 @@ def to_cframe(self) -> bytes:
"""
return self.compute().to_cframe()

@property
@abstractmethod
def chunks(self) -> tuple[int]:
    """
    Return :ref:`LazyArray` chunks.

    Abstract: concrete subclasses must provide the chunk shape of the
    (virtual) output array.
    """
    # NOTE: ``abstractproperty`` is deprecated since Python 3.3; stacking
    # ``@property`` over ``@abstractmethod`` is the modern equivalent and
    # matches the ``abstractmethod`` import this module already uses.

@property
@abstractmethod
def blocks(self) -> tuple[int]:
    """
    Return :ref:`LazyArray` blocks.

    Abstract: concrete subclasses must provide the block shape of the
    (virtual) output array.
    """

def get_chunk(self, nchunk):
"""Get the `nchunk` of the expression, evaluating only that one."""
# Create an empty array with the chunkshape and dtype; this is fast
shape = self.shape
chunks = self.chunks
# Calculate the shape of the (chunk) slice_ (especially at the end of the array)
chunks_idx, _ = get_chunks_idx(shape, chunks)
coords = tuple(np.unravel_index(nchunk, chunks_idx))
slice_ = tuple(
slice(c * s, min((c + 1) * s, shape[i]))
for i, (c, s) in enumerate(zip(coords, chunks, strict=True))
)
loc_chunks = tuple(s.stop - s.start for s in slice_)
out = blosc2.empty(shape=self.chunks, dtype=self.dtype, chunks=self.chunks, blocks=self.blocks)
if loc_chunks == self.chunks:
self.compute(item=slice_, out=out)
else:
_slice_ = tuple(slice(0, s) for s in loc_chunks)
out[_slice_] = self.compute(item=slice_)
return out.schunk.get_chunk(0)


def convert_inputs(inputs):
if not inputs or len(inputs) == 0:
Expand Down Expand Up @@ -2421,27 +2456,6 @@ def __init__(self, new_op): # noqa: C901
self.operands = {"o0": value1, "o1": value2}
self.expression = f"(o0 {op} o1)"

def get_chunk(self, nchunk):
"""Get the `nchunk` of the expression, evaluating only that one."""
# Create an empty array with the chunkshape and dtype; this is fast
shape = self.shape
chunks = self.chunks
# Calculate the shape of the (chunk) slice_ (especially at the end of the array)
chunks_idx, _ = get_chunks_idx(shape, chunks)
coords = tuple(np.unravel_index(nchunk, chunks_idx))
slice_ = tuple(
slice(c * s, min((c + 1) * s, shape[i]))
for i, (c, s) in enumerate(zip(coords, chunks, strict=True))
)
loc_chunks = tuple(s.stop - s.start for s in slice_)
out = blosc2.empty(shape=self.chunks, dtype=self.dtype, chunks=self.chunks, blocks=self.blocks)
if loc_chunks == self.chunks:
self.compute(item=slice_, out=out)
else:
_slice_ = tuple(slice(0, s) for s in loc_chunks)
out[_slice_] = self.compute(item=slice_)
return out.schunk.get_chunk(0)

def update_expr(self, new_op): # noqa: C901
prev_flag = blosc2._disable_overloaded_equal
# We use a lot of the original NDArray.__eq__ as 'is', so deactivate the overloaded one
Expand Down Expand Up @@ -3218,6 +3232,38 @@ def info_items(self):
("dtype", self.dtype),
]

def _resolve_chunks_blocks(self):
    """Compute and cache ``_chunks``/``_blocks`` (and ``_shape`` if unset).

    Shared by the ``chunks`` and ``blocks`` properties so the cache-fill
    logic is written once instead of being duplicated in both.
    """
    shape, self._chunks, self._blocks, fast_path = validate_inputs(
        self.inputs_dict, getattr(self, "_out", None)
    )
    if not hasattr(self, "_shape"):
        self._shape = shape
    if self._shape != shape:
        # validate_inputs only works for elementwise funcs, so the returned
        # shape might be incompatible with the true output shape
        fast_path = False
    if not fast_path:
        # Not using the fast path, so we need to compute the chunks/blocks automatically
        self._chunks, self._blocks = compute_chunks_blocks(self.shape, None, None, dtype=self.dtype)

@property
def chunks(self):
    """Chunk shape of the (virtual) output array; computed lazily and cached."""
    if not hasattr(self, "_chunks"):
        self._resolve_chunks_blocks()
    return self._chunks

@property
def blocks(self):
    """Block shape of the (virtual) output array; computed lazily and cached."""
    if not hasattr(self, "_blocks"):
        self._resolve_chunks_blocks()
    return self._blocks

# TODO: indices and sort are repeated in LazyExpr; refactor
def indices(self, order: str | list[str] | None = None) -> blosc2.LazyArray:
if self.dtype.fields is None:
Expand Down
21 changes: 21 additions & 0 deletions tests/ndarray/test_lazyudf.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pytest

import blosc2
from blosc2.ndarray import get_chunks_idx


def udf1p(inputs_tuple, output, offset):
Expand Down Expand Up @@ -471,3 +472,23 @@ def test_save_ludf():
assert isinstance(expr, blosc2.LazyUDF)
res_lazyexpr = expr.compute()
np.testing.assert_array_equal(res_lazyexpr[:], npc)


# Test get_chunk method
def test_get_chunk():
    """Rebuild a lazy UDF result chunk-by-chunk via get_chunk and compare."""
    a = blosc2.linspace(0, 100, 100, shape=(10, 10), chunks=(3, 4), blocks=(2, 3))
    expr = blosc2.lazyudf(udf1p, (a,), dtype=a.dtype, shape=a.shape)
    nres = a[:] + 1
    expected_chunk_nbytes = np.prod(expr.chunks) * expr.dtype.itemsize
    expected_block_nbytes = np.prod(expr.blocks) * expr.dtype.itemsize
    _, nchunks = get_chunks_idx(expr.shape, expr.chunks)
    out = blosc2.empty(expr.shape, dtype=expr.dtype, chunks=expr.chunks, blocks=expr.blocks)
    for i in range(nchunks):
        raw = expr.get_chunk(i)
        out.schunk.update_chunk(i, raw)
        # Bytes 4:8 / 8:12 of the chunk header hold nbytes / blocksize.
        stored_nbytes = int.from_bytes(raw[4:8], byteorder="little")
        stored_blocksize = int.from_bytes(raw[8:12], byteorder="little")
        # Edge chunks are padded up to the full chunk shape, so the stored
        # nbytes can exceed the nominal (unpadded) chunk size.
        assert expected_chunk_nbytes <= stored_nbytes
        assert expected_block_nbytes == stored_blocksize
    np.testing.assert_allclose(out[:], nres)
Loading