vectorized groupby binary ops #5804

Closed
dcherian opened this issue Sep 20, 2021 · 1 comment · Fixed by #6160

dcherian commented Sep 20, 2021

By switching to numpy_groupies we are vectorizing our groupby reductions. I think we can do the same for groupby's binary ops.
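
For context, here's a minimal sketch of the kind of vectorized reduction numpy_groupies provides (the values and integer group codes below are invented for illustration):

import numpy as np
import numpy_groupies as npg

values = np.arange(6.0)
group_idx = np.array([0, 0, 1, 1, 2, 2])  # integer code for each element's group
group_means = npg.aggregate(group_idx, values, func="mean")  # one pass, no Python loop over groups
# array([0.5, 2.5, 4.5])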

Here's an example array:

import numpy as np
import xarray as xr

%load_ext memory_profiler

N = 4 * 2000
da = xr.DataArray(
    np.random.random((N, N)),
    dims=("x", "y"),
    coords={"labels": ("x", np.repeat(["a", "b", "c", "d", "e", "f", "g", "h"], repeats=N//8))},
)

Consider this "anomaly" calculation, where the anomaly is defined relative to the group mean:

def anom_current(da):
    grouped = da.groupby("labels")
    mean = grouped.mean()
    anom = grouped - mean
    return anom

With this approach, we loop over each group and apply the binary operation:

iterators = []
for arg in args:
    if isinstance(arg, GroupBy):
        iterator = (value for _, value in arg)
    elif hasattr(arg, "dims") and grouped_dim in arg.dims:
        if isinstance(arg, Variable):
            raise ValueError(
                "groupby operations cannot be performed with "
                "xarray.Variable objects that share a dimension with "
                "the grouped dimension"
            )
        iterator = _iter_over_selections(arg, grouped_dim, unique_values)
    else:
        iterator = itertools.repeat(arg)
    iterators.append(iterator)
applied = (func(*zipped_args) for zipped_args in zip(*iterators))
applied_example, applied = peek_at(applied)
combine = first_groupby._combine
if isinstance(applied_example, tuple):
    combined = tuple(combine(output) for output in zip(*applied))
else:
    combined = combine(applied)
return combined

This saves some memory but becomes slow for a large number of groups.

We could instead do:

def anom_vectorized(da):
    mean = da.groupby("labels").mean()
    mean_expanded = mean.sel(labels=da.labels)
    anom = da - mean_expanded
    return anom

Now we are faster, but we construct an extra array as big as the original (I think this is an OK tradeoff).
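
As a quick sanity check, the two implementations should give the same result (a hypothetical verification, assuming the coordinate metadata lines up):

xr.testing.assert_allclose(anom_current(da), anom_vectorized(da))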

%timeit anom_current(da)
# 1.4 s ± 20.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit anom_vectorized(da)
# 937 ms ± 5.26 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

(I haven't experimented with dask yet, so the following is just a theory).

I think the real benefit comes with dask. With the current approach, depending on where the groups fall relative to chunk boundaries, we could end up creating a lot of tiny chunks by splitting up existing ones. With the vectorized approach we can do better.

Ideally we would reindex the "mean" dask array with a numpy-array-of-repeated-ints such that the chunking of mean_expanded exactly matches the chunking of da along the grouped dimension.

In practice, dask.array.take doesn't allow specifying "output chunks", so we'd end up chunking "mean_expanded" based on dask's automatic heuristics, and then rechunking again for the binary operation.
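
Here's a minimal sketch of that vectorized dask path (untested, per the caveat above; np.searchsorted stands in for a proper factorize step, and the chunk size is arbitrary):

import dask.array as dsa

dask_da = da.chunk({"x": 1000})
mean = dask_da.groupby("labels").mean()

# integer position of each element's label within mean's sorted "labels" coordinate
codes = np.searchsorted(mean.labels.values, dask_da.labels.values)

# take() picks output chunks using dask's automatic heuristics...
mean_expanded = dsa.take(mean.data, codes, axis=0)

# ...so we rechunk to line up with da before the binary op
anom = dask_da.data - mean_expanded.rechunk(dask_da.data.chunks)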

Thoughts?

cc @rabernat


shoyer commented Sep 21, 2021

I agree, I think this would be a much cleaner way to do these sorts of operations!

dcherian added a commit to dcherian/xarray that referenced this issue Jan 4, 2022