More Flow Operators #1389
Comments
@shariarriday these should be straightforward to implement as a
The We already have
@shariarriday: I think
I think we can implement @shariarriday wanna give these two a shot?
@shariarriday I think
    self->make_observable()
      .fail<int>(sec::runtime_error) // subscribed to three times
      .retry_when([self](auto attempts) { // attempts: observable<error>
        return self->make_observable()
          .zip_with([](const error&, int num) { return num; },
                    attempts, self->make_observable().range(1, 3))
          .flat_map([self](int num) {
            printf("delay next retry by %d second(s)", num);
            return self->make_observable().timer(std::chrono::seconds{num});
          });
      })
      .for_each([](int) {});
Should print:
And then stop after three times. With
Thanks, I am planning to implement
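The reason the snippet above stops after three delays is the zip step: each failure is paired with one number from `range(1, 3)`, and the pairing ends once the range is exhausted. Below is a minimal sketch of that pairing in plain C++, without CAF. The helper `zip_delays` is hypothetical and only illustrates the schedule; it does not model subscriptions, timers, or the proposed `retry_when` operator itself.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <vector>

// Hypothetical helper (not part of CAF): pairs the i-th failure with the i-th
// delay and stops as soon as either side runs out, as a zip operator would.
// Returns the delays (in seconds) that end up being scheduled.
std::vector<int> zip_delays(std::size_t failure_count,
                            const std::vector<int>& delays) {
  std::vector<int> scheduled;
  const std::size_t n = std::min(failure_count, delays.size());
  for (std::size_t i = 0; i < n; ++i) {
    // Mirrors the message printed in the flat_map step of the snippet.
    std::printf("delay next retry by %d second(s)\n", delays[i]);
    scheduled.push_back(delays[i]);
  }
  return scheduled;
}
```

Calling `zip_delays(5, {1, 2, 3})` schedules the delays 1, 2, and 3 and then stops, even though more failures are available, because the list of delays has only three entries.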
CAF 0.19 introduced ReactiveX-style flow processing with a base set of the most useful operators. There are more operators defined by ReactiveX and eventually we should implement all of them:
Creating Observables
Transforming Observables
Filtering Observables
Combining Observables
Error Handling Operators