
Computational performance of iris.cube.Cube.aggregated_by with lazy data #5455

Open
bouweandela opened this issue Sep 1, 2023 · 9 comments



bouweandela commented Sep 1, 2023

Current behaviour

The method iris.cube.Cube.aggregated_by produces many tiny chunks and a very large graph when used with lazy data. When using this method as part of a larger computation, my dask graphs become so large that the computation fails to run. Even if the computation did run, it would be needlessly slow because of the many tiny chunks.

Expected behaviour

The method iris.cube.Cube.aggregated_by should respect the chunks of the input data as much as possible and produce a modestly sized dask graph.

Example code showing the current behaviour

Here is an example script that demonstrates the issue. The cube in the example represents 250 years of monthly data on a 300 x 300 spatial grid, and the script computes the annual mean.

import dask.array as da
import iris.analysis
import iris.coords
import iris.cube
import numpy as np

cube = iris.cube.Cube(
    da.empty((250 * 12, 300, 300), chunks=('auto', None, None)),
    aux_coords_and_dims=[
        (iris.coords.AuxCoord(np.repeat(np.arange(250), 12), var_name="year"), 0)
    ],
)
print("Input data:")
print(f"{cube.lazy_data().chunks=}")
print(f"{cube.lazy_data().dask=}")

result = cube.aggregated_by("year", iris.analysis.MEAN)

print("Result:")
print(f"{result.lazy_data().chunks=}")
print(f"{result.lazy_data().dask=}")

This prints the following:

cube.lazy_data().chunks=((186, 186, 186, 186, 186, 186, 186, 186, 186, 186, 186, 186, 186, 186, 186, 186, 24), (300,), (300,))
cube.lazy_data().dask=HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7fc42cb9de10>
 0. empty_like-9ffb0624c777d40491118ccfa8357d00

Result:
result.lazy_data().chunks=((1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), (300,), (300,))
result.lazy_data().dask=HighLevelGraph with 752 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7fc42c874990>
 0. empty_like-9ffb0624c777d40491118ccfa8357d00
 1. getitem-875ae1b7554c57fa4b243547465267f6
 2. mean_chunk-ee11e7299515ada537da00382a6512db
 3. mean_agg-aggregate-0e111cc07c3b62b465b43f12f0cd78e3
 4. getitem-50ebcc520f0974e2d456be1155a58224
 5. mean_chunk-87cd31da7a615d73c687b49bc2bd21e5
 6. mean_agg-aggregate-fe86efb73fd4e3936f2c495bcfeb0851

 ... etc
bouweandela commented

I found this project which seems relevant: https://github.com/xarray-contrib/flox


pp-mo commented Sep 6, 2023

@scitools/peloton our standard answer to these problems is that getting Dask to work well is hard, and you should take the data out and use Dask directly (!)
But also, we think the problem here is that aggregation can't easily respect chunks anyway. So if there is a repeating structure in the data, you really want to see if you can reshape the input with a year dimension. And then you wouldn't need aggregated_by at all.

So, that approach probably waits on #5398. But we'd be keen to investigate if that can solve this case!


bouweandela commented Oct 26, 2023

> But also, we think the problem here is that aggregation can't easily respect chunks anyway.

Not generally, but I had a look at the use cases in ESMValCore and they are all time-related, and there is always an (almost) repeating structure in the coordinate. In some cases, e.g. day of the month, the structure can easily be made repeating by adding some extra days and then masking out the extra data.

> So if there is a repeating structure in the data, you really want to see if you can reshape the input with a year dimension. And then you wouldn't need aggregated_by at all.

Indeed, slicing so all repeats have the same size, masking the extra data points introduced by slicing, and then reshaping and collapsing along the required dimension seems the best solution here. However, I struggled to reshape a cube; is there some feature available to do this that I overlooked? I'm also concerned that getting the right values for the coordinates is tricky when using Cube.collapsed instead of Cube.aggregated_by.

Maybe the best solution would be to use the algorithm described above internally in Cube.aggregated_by when the input data is a Dask array, because the current implementation (iterating over an array index in Python and then using dask.array.stack) leads to task graphs so large that the scheduler hangs; this is not a usable approach for Dask users.
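For reference, here is a minimal sketch of that idea on the bare dask array from the example above (an illustration only, assuming exactly 250 complete years so that no padding or masking is needed, and a time chunk size that divides into whole years):

import dask.array as da

# 250 years of monthly data; chunk the time axis in multiples of 12
# so that each chunk contains whole years.
data = da.empty((250 * 12, 300, 300), chunks=(120, None, None))

# Reshape the time axis into (year, month). For calendars where the
# groups differ in length, the data would first be padded to
# equal-sized groups and the padding masked.
reshaped = data.reshape((250, 12, 300, 300))

# Collapse over the month axis to get the annual mean.
annual_mean = reshaped.mean(axis=1)

print(f"{annual_mean.chunks=}")
print(f"{annual_mean.dask=}")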

> So, that approach probably waits on #5398.

My feeling is that whether or not that will be beneficial depends on how numpy actually iterates over the input array when it computes the statistics along a dimension.


dcherian commented Oct 26, 2023

> The method iris.cube.Cube.aggregated_by produces many tiny chunks and a very large graph when used with lazy data

Sounds familiar to me as an Xarray user :)

> they are all time-related, and there is always an (almost) repeating structure in the coordinate.

Absolutely. See https://flox.readthedocs.io/en/latest/implementation.html#method-cohorts

I strongly recommend you use flox. I wrote it a year ago to solve exactly these problems with Xarray.

  1. It vectorizes the core pure-numpy groupby using a number of options (numbagg, numpy-groupies, or its own internal ufunc.reduceat strategy).
  2. It implements the groupby-reduction as a tree reduction which works great with dask.
  3. It lets you opt-in to smarter graph construction that exploits patterns in the variable you're grouping by.

See https://xarray.dev/blog/flox

github-actions bot commented

In order to maintain a backlog of relevant issues, we automatically label them as stale after 500 days of inactivity.

If this issue is still important to you, then please comment on this issue and the stale label will be removed.

Otherwise this issue will be automatically closed in 28 days time.

github-actions bot added the Stale label Mar 10, 2025
bouweandela commented

This is still important.

github-actions bot removed the Stale label Mar 11, 2025
trexfeathers self-assigned this Mar 12, 2025
trexfeathers commented

For what it's worth, the current behaviour is because the result is explicitly created as a series of arrays, which are then stacked. So it looks like alternative behaviour would require a core rewrite of our aggregation system; that's not impossible, but not something we have planned at the moment.

iris/lib/iris/cube.py

Lines 4664 to 4697 in bf9e9ba

groupby_subarrs = (
    iris.util._slice_data_with_keys(
        input_data, front_slice + (groupby_slice,) + back_slice
    )[1]
    for groupby_slice in groupby.group()
)

if weights is not None:
    groupby_subweights = (
        weights[front_slice + (groupby_slice,) + back_slice]
        for groupby_slice in groupby.group()
    )
else:
    groupby_subweights = (None for _ in range(len(groupby)))

# Aggregate data slices.
agg = iris.analysis.create_weighted_aggregator_fn(
    agg_method, axis=dimension_to_groupby, **kwargs
)
result = tuple(map(agg, groupby_subarrs, groupby_subweights))

# If weights are returned, "result" is a list of tuples (each tuple
# contains two elements; the first is the aggregated data, the
# second is the aggregated weights). Convert these to two lists
# (one for the aggregated data and one for the aggregated weights)
# before combining the different slices.
if return_weights:
    data_result, weights_result = list(zip(*result))
    aggregateby_weights = _lazy.stack(weights_result, axis=dimension_to_groupby)
else:
    data_result = result
    aggregateby_weights = None

aggregateby_data = _lazy.stack(data_result, axis=dimension_to_groupby)

trexfeathers commented

Other approaches

Flox

I would ordinarily push for using Xarray for super-user stuff like this. However, I'm aware that one of Iris' USPs is how careful it is with CF metadata, and aggregation is one of the places where this is important.

Flox looks seriously impressive and sophisticated; I don't think there is a prospect of maintaining something with equivalent power within Iris.

Reshaping

This isn't the first time that reshaping a Cube would have helped with a big data problem, and I have similar frustrations to @bouweandela when this comes up.

Perhaps the quickest, most flexible solution would be to provide an API for reshaping a Cube, together with some documentation examples of how doing so can solve this problem and others?
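As a rough, hand-rolled illustration of what such an example could cover today (reshape_to_years is a hypothetical helper built only from existing public API; it assumes complete years of monthly data and ignores the bookkeeping needed to carry the original coordinates and metadata across):

import dask.array as da
import numpy as np

import iris.analysis
import iris.coords
import iris.cube


def reshape_to_years(cube):
    # Hypothetical helper: reshape a (time, y, x) cube holding complete
    # years of monthly data into a (year, month, y, x) cube.
    n_years = cube.shape[0] // 12
    data = cube.lazy_data().reshape((n_years, 12) + cube.shape[1:])
    return iris.cube.Cube(
        data,
        dim_coords_and_dims=[
            (iris.coords.DimCoord(np.arange(n_years), var_name="year"), 0),
            (iris.coords.DimCoord(np.arange(12), var_name="month"), 1),
        ],
    )


cube = iris.cube.Cube(da.empty((250 * 12, 300, 300), chunks=(120, None, None)))
# Collapse over the month dimension to get the annual mean.
annual_mean = reshape_to_years(cube).collapsed("month", iris.analysis.MEAN)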


dcherian commented Apr 3, 2025

> Flox looks seriously impressive and sophisticated,

Thanks!

> I don't think there is a prospect of maintaining something with equivalent power within Iris.

But you shouldn't need to. There is a bare array API that does all the work: https://flox.readthedocs.io/en/latest/generated/flox.groupby_reduce.html#flox.groupby_reduce
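For illustration, a rough sketch of calling that entry point on the bare dask array from the example at the top of this issue (untested here; the parameter choices are illustrative and taken from the flox documentation):

import dask.array as da
import numpy as np
from flox import groupby_reduce

data = da.empty((250 * 12, 300, 300), chunks=(120, None, None))
years = np.repeat(np.arange(250), 12)

# flox expects the grouped dimension(s) to be the trailing axes of the
# array, so move the time axis to the end first.
moved = da.moveaxis(data, 0, -1)

# Tree reduction over the "year" labels; method="cohorts" exploits the
# repeating structure of the labels to keep the graph small.
result, groups = groupby_reduce(moved, years, func="mean", method="cohorts")

# result has shape (300, 300, 250); move the year axis back to the front.
annual_mean = da.moveaxis(result, -1, 0)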
