19 changes: 10 additions & 9 deletions flox/core.py
@@ -1465,7 +1465,7 @@ def _reduce_blockwise(
return result


def _normalize_indexes(array: DaskArray, flatblocks, blkshape) -> tuple:
def _normalize_indexes(ndim: int, flatblocks: Sequence[int], blkshape: tuple[int, ...]) -> tuple:
"""
.blocks accessor can only accept one iterable at a time,
but can handle multiple slices.
@@ -1483,20 +1483,23 @@ def _normalize_indexes(array: DaskArray, flatblocks, blkshape) -> tuple:
if i.ndim == 0:
normalized.append(i.item())
else:
if np.array_equal(i, np.arange(blkshape[ax])):
if len(i) == blkshape[ax] and np.array_equal(i, np.arange(blkshape[ax])):
normalized.append(slice(None))
elif np.array_equal(i, np.arange(i[0], i[-1] + 1)):
normalized.append(slice(i[0], i[-1] + 1))
elif _issorted(i) and np.array_equal(i, np.arange(i[0], i[-1] + 1)):
start = None if i[0] == 0 else i[0]
stop = i[-1] + 1
stop = None if stop == blkshape[ax] else stop
normalized.append(slice(start, stop))
else:
normalized.append(list(i))
full_normalized = (slice(None),) * (array.ndim - len(normalized)) + tuple(normalized)
full_normalized = (slice(None),) * (ndim - len(normalized)) + tuple(normalized)

# has no iterables
noiter = list(i if not hasattr(i, "__len__") else slice(None) for i in full_normalized)
# has all iterables
alliter = {ax: i for ax, i in enumerate(full_normalized) if hasattr(i, "__len__")}

mesh = dict(zip(alliter.keys(), np.ix_(*alliter.values())))
mesh = dict(zip(alliter.keys(), np.ix_(*alliter.values()))) # type: ignore[arg-type, var-annotated]

full_tuple = tuple(i if ax not in mesh else mesh[ax] for ax, i in enumerate(noiter))

@@ -1523,7 +1526,6 @@ subset_to_blocks(
-------
dask.array
"""
from dask.array.slicing import normalize_index
from dask.base import tokenize

if blkshape is None:
@@ -1532,10 +1534,9 @@
if chunks_as_array is None:
chunks_as_array = tuple(np.array(c) for c in array.chunks)

index = _normalize_indexes(array, flatblocks, blkshape)
index = _normalize_indexes(array.ndim, flatblocks, blkshape)

# The rest is copied from dask.array.core.py with slight modifications
index = normalize_index(index, array.numblocks)
index = tuple(slice(k, k + 1) if isinstance(k, Integral) else k for k in index)

name = "groupby-cohort-" + tokenize(array, index)
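The mesh built above with `np.ix_` is an open (outer-product) index over the axes whose block numbers could not be collapsed into slices. A minimal sketch of that NumPy behaviour, reusing the block lists from the last 2-D test case further down in this diff:

```python
import numpy as np

# np.ix_ turns flat block lists into broadcastable index arrays: shapes
# (3, 1) and (1, 4) here, which together select the 3 x 4 cross product
# of row and column blocks.
rows, cols = np.ix_([0, 1, 4], [0, 1, 3, 4])
print(rows.shape, cols.shape)  # (3, 1) (1, 4)
```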
26 changes: 17 additions & 9 deletions flox/dask_array_ops.py
@@ -1,6 +1,6 @@
import builtins
import math
from functools import partial
from functools import lru_cache, partial
from itertools import product
from numbers import Integral

@@ -84,14 +84,8 @@ def partial_reduce(
axis: tuple[int, ...],
block_index: int | None = None,
):
numblocks = tuple(len(c) for c in chunks)
ndim = len(numblocks)
parts = [list(partition_all(split_every.get(i, 1), range(n))) for (i, n) in enumerate(numblocks)]
keys = product(*map(range, map(len, parts)))
out_chunks = [
tuple(1 for p in partition_all(split_every[i], c)) if i in split_every else c
for (i, c) in enumerate(chunks)
]
ndim = len(chunks)
keys, parts, out_chunks = get_parts(tuple(split_every.items()), chunks)
for k, p in zip(keys, product(*parts)):
free = {i: j[0] for (i, j) in enumerate(p) if len(j) == 1 and i not in split_every}
dummy = dict(i for i in enumerate(p) if i[0] in split_every)
@@ -101,3 +95,17 @@
k = (*k[:-1], block_index)
dsk[(name,) + k] = (func, g)
return dsk, out_chunks


@lru_cache
def get_parts(split_every_items, chunks):
numblocks = tuple(len(c) for c in chunks)
split_every = dict(split_every_items)

parts = [list(partition_all(split_every.get(i, 1), range(n))) for (i, n) in enumerate(numblocks)]
keys = tuple(product(*map(range, map(len, parts))))
out_chunks = tuple(
tuple(1 for p in partition_all(split_every[i], c)) if i in split_every else c
for (i, c) in enumerate(chunks)
)
return keys, parts, out_chunks
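Factoring this computation into `get_parts` and memoizing it with `lru_cache` works only because every argument is hashable, which is why `partial_reduce` passes `tuple(split_every.items())` rather than the `split_every` dict itself. A minimal sketch of that pattern, with hypothetical names:

```python
from functools import lru_cache

@lru_cache
def cached_parts(split_every_items: tuple[tuple[int, int], ...]) -> dict[int, int]:
    # Rebuild the dict inside the cached function; the tuple-of-items form
    # is what makes the argument hashable and therefore cacheable.
    return dict(split_every_items)

split_every = {0: 4, 2: 2}
cached_parts(tuple(split_every.items()))  # computed on the first call
cached_parts(tuple(split_every.items()))  # same key, served from the cache
```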
12 changes: 6 additions & 6 deletions tests/test_core.py
@@ -5,7 +5,7 @@
import warnings
from collections.abc import Callable
from functools import partial, reduce
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
from unittest.mock import MagicMock, patch

import numpy as np
@@ -1538,7 +1538,7 @@ def test_normalize_block_indexing_1d(flatblocks, expected):
nblocks = 5
array = dask.array.ones((nblocks,), chunks=(1,))
expected = tuple(np.array(i) if isinstance(i, list) else i for i in expected)
actual = _normalize_indexes(array, flatblocks, array.blocks.shape)
actual = _normalize_indexes(array.ndim, flatblocks, array.blocks.shape)
assert_equal_tuple(expected, actual)


@@ -1550,17 +1550,17 @@ def test_normalize_block_indexing_1d(flatblocks, expected):
((1, 2, 3), (0, slice(1, 4))),
((1, 3), (0, [1, 3])),
((0, 1, 3), (0, [0, 1, 3])),
(tuple(range(10)), (slice(0, 2), slice(None))),
((0, 1, 3, 5, 6, 8), (slice(0, 2), [0, 1, 3])),
(tuple(range(10)), (slice(None, 2), slice(None))),
((0, 1, 3, 5, 6, 8), (slice(None, 2), [0, 1, 3])),
((0, 3, 4, 5, 6, 8, 24), np.ix_([0, 1, 4], [0, 1, 3, 4])),
),
)
def test_normalize_block_indexing_2d(flatblocks, expected):
def test_normalize_block_indexing_2d(flatblocks: tuple[int, ...], expected: tuple[Any, ...]) -> None:
nblocks = 5
ndim = 2
array = dask.array.ones((nblocks,) * ndim, chunks=(1,) * ndim)
expected = tuple(np.array(i) if isinstance(i, list) else i for i in expected)
actual = _normalize_indexes(array, flatblocks, array.blocks.shape)
actual = _normalize_indexes(array.ndim, flatblocks, array.blocks.shape)
assert_equal_tuple(expected, actual)
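To make the expectations above concrete, a small usage sketch of the helper on the same 5×5 block grid the test constructs (expected results copied from the parametrization; `_normalize_indexes` is internal API, shown here only for illustration):

```python
import dask.array
from flox.core import _normalize_indexes

array = dask.array.ones((5, 5), chunks=(1, 1))  # a 5 x 5 grid of single-element blocks

# Flat blocks 1, 2, 3 all sit in row 0 and form a contiguous run of columns.
_normalize_indexes(array.ndim, (1, 2, 3), array.blocks.shape)         # (0, slice(1, 4))

# Non-contiguous columns stay as an explicit array of block indices.
_normalize_indexes(array.ndim, (1, 3), array.blocks.shape)            # (0, array([1, 3]))

# The first ten flat blocks cover rows 0 and 1 completely.
_normalize_indexes(array.ndim, tuple(range(10)), array.blocks.shape)  # (slice(None, 2), slice(None))
```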

