
Incremental async reduction, e.g. value(fs, reduce = ...) #681

Open
bwlewis opened this issue May 6, 2023 · 5 comments

Comments

@bwlewis

bwlewis commented May 6, 2023

Asynchronous incremental reduction

Asynchronous incremental reduction applies a reduction function to incremental
results as they are delivered (that is, asynchronously). It's implemented in
foreach via the .combine function argument. That argument is simply a
standard reduction function, similar to the function argument of base R's Reduce.
Because no order is guaranteed, the reduction function must be commutative and
associative (however, foreach also provides an .inorder argument that caches results
locally until order can be guaranteed).
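
For illustration, a minimal sketch of the .combine interface in foreach (whether the reduction is actually applied incrementally as results arrive depends on the backend; as noted below, doRedis is the backend on CRAN that does it asynchronously); doParallel is used here only to make the sketch runnable:

library(foreach)
library(doParallel)
registerDoParallel(2)

# `+` is commutative and associative, so arrival order does not matter
total <- foreach(x = 1:10, .combine = `+`, .inorder = FALSE) %dopar% sqrt(x)
total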

This can be (very) useful in cases where the full Map result is too large to fit
in memory but the reduction function's output is small. Using incremental async
reduction in those cases lowers memory pressure on the coordinating R process.
Note that, as far as I know, async reduce is only implemented in doRedis (edit: on CRAN anyway; I know that there were a number of forks of doMPI that had something similar).

In principle, this might be achieved in future by adding a reduction function
argument to the resolve function? That could be an elegant solution.

Alternatively, the resolved function plus looping and polling might be able
to achieve something similar, as sketched in the doc for resolved:

It should also be possible to use the method for
polling the future until it is resolved (without having to wait
infinitely long)

But there are no examples, and this also seems cumbersome.
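
For reference, a rough sketch of that polling approach, assuming a commutative and associative reducer (here +) so results can be folded in whatever order they arrive:

library(future)
plan(multisession)

fs <- lapply(1:10, function(x) future(sqrt(x)))
acc <- NULL
while (length(fs) > 0) {
  done <- vapply(fs, resolved, logical(1))
  for (f in fs[done]) {
    v <- value(f)
    acc <- if (is.null(acc)) v else acc + v
  }
  fs <- fs[!done]                     # keep only the unresolved futures
  if (length(fs) > 0) Sys.sleep(0.1)  # back off before polling again
}
acc

This works, but as noted it is cumbersome: it busy-polls rather than blocks, and every user has to hand-roll the bookkeeping.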

@HenrikBengtsson
Collaborator

In principle, this might be achieved in future using the resolve function by adding a reduction function argument? That could be an elegant solution.

Segue: value() is a generic function. The S3 methods for lists, environments, and list environments (listenv package) gather the results for the input set of futures as they become ready. The results are returned in the same order as the input. They're also implemented to preserve the relaying order of stdout and conditions, re-outputting/re-signaling them as soon as they can. (There's also a mechanism to drop them from memory when no longer needed.)

I can imagine that the value() function could support an optional reduction argument for doing reduction "on the fly".

The only way I can see a reduction function working properly is if it reduces the elements in the same order as Reduce() would on the final result. That is, we need to reduce the "head" and the next element in order. (This is the same constraint under which stdout and conditions are relayed.)

Q. In the extreme, if the result of the first element comes in last, then you cannot do any reduction (same for relaying of output and conditions). Is this how you also anticipate this working? Or do you think there should be support for relaxing this constraint, e.g. reduce as soon as possible?

@HenrikBengtsson
Collaborator

Q. In the extreme, if the result of the first element comes in last, then you cannot do any reduction (same for relaying of output and conditions). Is this how you also anticipate this working? Or do you think there should be support for relaxing this constraint, e.g. reduce as soon as possible?

Oh, I forgot that you already wrote "Because no order is guaranteed, the reduction function must be commutative and associative". So, yes, that would make it possible to reduce ASAP. And then you wrote "(however, foreach also provides an .inorder argument that caches results locally until order can be guaranteed)", which I think should also be supported.

So, maybe something like:

fs <- lapply(1:3, function(x) future(sqrt(x)))
y <- value(fs, reduce = `+`)

and

fs <- lapply(1:3, function(x) future(sqrt(x)))
y <- value(fs, reduce = structure(`-`, inorder = TRUE))

Maybe inorder = TRUE should be the default, to minimize the risk of silent bugs.
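
For comparison, a hypothetical user-level sketch of what inorder = TRUE semantics would amount to, using only today's resolved()/value() API (value(fs, reduce = ...) itself does not exist yet): results that arrive out of order are cached and folded with a non-commutative reducer (here -) only once all earlier elements have resolved.

library(future)
plan(multisession)

fs <- lapply(1:3, function(x) future(sqrt(x)))
cache <- vector("list", length(fs))
acc <- NULL
nxt <- 1L  # index of the next element to fold into the accumulator
while (nxt <= length(fs)) {
  # collect whatever has finished so far (each value is taken only once)
  for (i in which(vapply(fs, resolved, logical(1)))) {
    if (is.null(cache[[i]])) cache[[i]] <- list(value(fs[[i]]))
  }
  # fold the contiguous resolved prefix into the accumulator, in order
  while (nxt <= length(fs) && !is.null(cache[[nxt]])) {
    acc <- if (is.null(acc)) cache[[nxt]][[1]] else acc - cache[[nxt]][[1]]
    nxt <- nxt + 1L
  }
  if (nxt <= length(fs)) Sys.sleep(0.1)
}
acc  # same result as Reduce(`-`, sqrt(1:3))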

@HenrikBengtsson HenrikBengtsson added this to the Next release milestone May 7, 2023
@bwlewis
Author

bwlewis commented May 7, 2023 via email

@HenrikBengtsson
Collaborator

What's an example where multicombine/maxcombine is needed? Why wouldn't pairwise reduce be sufficient?

@bwlewis
Author

bwlewis commented May 8, 2023 via email

@HenrikBengtsson HenrikBengtsson modified the milestones: 1.33.0, Next release Jul 1, 2023
@HenrikBengtsson HenrikBengtsson modified the milestones: 1.33.1, Next release Dec 21, 2023
@HenrikBengtsson HenrikBengtsson modified the milestones: 1.33.2, Next release Mar 26, 2024
@HenrikBengtsson HenrikBengtsson changed the title Incremental async reduction Incremental async reduction, e.g. value(fs, reduce = ...) Jul 29, 2024