More Flow Operators #1389

Open
9 of 19 tasks
Neverlord opened this issue Mar 12, 2023 · 5 comments

Comments

@Neverlord
Member

Neverlord commented Mar 12, 2023

CAF 0.19 introduced ReactiveX-style flow processing with a base set of the most useful operators. ReactiveX defines more operators, and eventually we should implement all of them:

Creating Observables

Transforming Observables

Filtering Observables

Combining Observables

Error Handling Operators

@Neverlord
Member Author

Neverlord commented Jul 12, 2023

@shariarriday these should be straightforward to implement as a step (like take):

  • First (actually, this is just take(1))
  • TakeLast
  • Last (after implementing TakeLast, this is a freebie: take_last(1))
  • IgnoreElements
  • ElementAt
  • SkipLast
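
To illustrate why TakeLast and SkipLast fit the step model, here is a minimal stand-alone sketch that uses only the standard library. It is not the CAF step interface: `collector`, `take_last_step`, `skip_last_step` and `run_step` are hypothetical names, and the `on_next`/`on_complete` signatures are simplified assumptions. Both steps need nothing more than a bounded buffer.

```cpp
#include <cstddef>
#include <deque>
#include <vector>

// Simplified stand-in for a downstream observer (not the CAF interface).
template <class T>
struct collector {
  std::vector<T> items;
  bool completed = false;
  bool on_next(const T& item) {
    items.push_back(item);
    return true; // true means "keep sending"
  }
  void on_complete() { completed = true; }
};

// Keeps only the most recent `n` items and emits them on completion.
template <class T>
class take_last_step {
public:
  explicit take_last_step(std::size_t n) : n_(n) {}

  template <class Next>
  bool on_next(const T& item, Next&) {
    if (n_ == 0)
      return true; // nothing to keep, but still wait for completion
    if (buf_.size() == n_)
      buf_.pop_front(); // drop the oldest buffered item
    buf_.push_back(item);
    return true;
  }

  template <class Next>
  void on_complete(Next& next) {
    for (const auto& item : buf_)
      if (!next.on_next(item))
        return;
    next.on_complete();
  }

private:
  std::size_t n_;
  std::deque<T> buf_;
};

// Delays every item by `n` positions, so the final `n` items never leave.
template <class T>
class skip_last_step {
public:
  explicit skip_last_step(std::size_t n) : n_(n) {}

  template <class Next>
  bool on_next(const T& item, Next& next) {
    buf_.push_back(item);
    if (buf_.size() <= n_)
      return true; // still filling the delay buffer
    T oldest = buf_.front();
    buf_.pop_front();
    return next.on_next(oldest);
  }

  template <class Next>
  void on_complete(Next& next) {
    next.on_complete(); // the buffered tail is discarded
  }

private:
  std::size_t n_;
  std::deque<T> buf_;
};

// Feeds `input` through `step` and returns what reached the collector.
template <class T, class Step>
collector<T> run_step(const std::vector<T>& input, Step step) {
  collector<T> out;
  for (const auto& item : input)
    if (!step.on_next(item, out))
      return out;
  step.on_complete(out);
  return out;
}
```

With this sketch, `run_step<int>({1, 2, 3, 4, 5}, take_last_step<int>(2))` collects 4 and 5, and the same input through `skip_last_step<int>(2)` collects 1, 2 and 3. Last then falls out as `take_last_step<int>(1)`.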

@Neverlord
Member Author

Neverlord commented Sep 19, 2023

The catch operator is actually more like a family of operators. For example, RxJava splits this into onErrorComplete, onErrorResumeNext, onErrorReturn, onErrorReturnItem and onExceptionResumeNext.

We already have on_error_complete. We will skip the exception version, as we build our API generally around expected. This leaves:

  • on_error_resume_next
  • on_error_return
  • on_error_return_item

@shariarriday: I think on_error_return and on_error_return_item can be implemented as steps again. Mind giving it a shot? The function object passed to on_error_return should return either a T or an expected<T>. If the function object returns an expected with an error, the new error simply replaces the "input" error and we still call on_error on next.
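
The dispatch described above (handler returns a plain T or an expected<T>) can be sketched without CAF as follows. Everything here is a hypothetical stand-in: `my_error` and `my_expected` replace CAF's `error` and `expected<T>`, and the simplified `on_error(err, next)` signature is an assumption. Emitting the replacement value and then completing follows the RxJava onErrorReturn behavior and is also an assumption about the CAF version.

```cpp
#include <string>
#include <type_traits>
#include <utility>
#include <variant>
#include <vector>

// Hypothetical stand-ins; CAF's own `error` and `expected<T>` differ.
struct my_error {
  std::string what;
};

template <class T>
using my_expected = std::variant<T, my_error>;

// Records what the step forwards downstream.
template <class T>
struct recording_observer {
  std::vector<T> items;
  bool completed = false;
  std::string failure; // stays empty unless on_error was called
  void on_next(const T& item) { items.push_back(item); }
  void on_complete() { completed = true; }
  void on_error(const my_error& err) { failure = err.what; }
};

template <class T, class Handler>
class on_error_return_step {
public:
  explicit on_error_return_step(Handler fn) : fn_(std::move(fn)) {}

  template <class Next>
  void on_error(const my_error& err, Next& next) {
    using result_type = std::invoke_result_t<Handler&, const my_error&>;
    if constexpr (std::is_same_v<result_type, T>) {
      // Handler returns a plain T: always recover with that value.
      next.on_next(fn_(err));
      next.on_complete();
    } else {
      // Handler returns an expected-like type: recover or replace the error.
      my_expected<T> res = fn_(err);
      if (auto* val = std::get_if<T>(&res)) {
        next.on_next(*val);
        next.on_complete();
      } else {
        // The new error replaces the input error.
        next.on_error(std::get<my_error>(res));
      }
    }
  }

private:
  Handler fn_;
};

// Convenience factory so that only T needs to be spelled out.
template <class T, class Handler>
on_error_return_step<T, Handler> make_on_error_return(Handler fn) {
  return on_error_return_step<T, Handler>{std::move(fn)};
}
```

In this sketch, on_error_return_item(x) would simply be the special case where the handler ignores the error and returns x.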

@Neverlord
Member Author

Neverlord commented Oct 20, 2023

Scan is just another name for Reduce, which is already implemented. (edit: mixed something up)

I think we can implement start_with(x) as just(x).concat(*this). To implement sample, we can use the implementation of op/buffer.hpp as a template. Instead of having a std::vector<input_type> buf_ member, we can simply have a std::optional<input_type> last_ member and adjust the rest accordingly.

@shariarriday wanna give these two a shot?
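
As a rough illustration of both ideas, the following stand-alone sketch models observables as plain vectors and sample as a small state object. None of it is CAF code: `just`, `concat` and `start_with` are hypothetical free functions, and `sample_state` with its `on_tick` member only mimics the idea of swapping the vector buffer for a single std::optional slot.

```cpp
#include <optional>
#include <utility>
#include <vector>

// Vector-based stand-ins for observables (hypothetical, not CAF types).
template <class T>
std::vector<T> just(T x) {
  std::vector<T> out;
  out.push_back(std::move(x));
  return out;
}

template <class T>
std::vector<T> concat(std::vector<T> first, const std::vector<T>& second) {
  first.insert(first.end(), second.begin(), second.end());
  return first;
}

// start_with(x) expressed as just(x).concat(input).
template <class T>
std::vector<T> start_with(T x, const std::vector<T>& input) {
  return concat(just(std::move(x)), input);
}

// Simple sink that records emitted items.
template <class T>
struct vec_sink {
  std::vector<T> items;
  void on_next(const T& item) { items.push_back(item); }
};

// Holds at most one pending item instead of a whole buffer.
template <class T>
class sample_state {
public:
  // Called for each item from the data input; overwrites any unsent item.
  void on_next(T item) { last_ = std::move(item); }

  // Called for each tick of the control input. Emits the latest item only
  // if one arrived since the previous tick.
  template <class Next>
  void on_tick(Next& next) {
    if (last_) {
      next.on_next(*last_);
      last_.reset();
    }
  }

private:
  std::optional<T> last_;
};
```

Feeding 1 and 2 before the first tick and 3 before a later tick yields 2 and 3 downstream; ticks without a fresh item emit nothing.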

@Neverlord
Member Author

@shariarriday I think retry is a good candidate to tackle next. Specifically, retry_when to retry subscribing to an input observable whenever the "control" observable emits a value. Here's a usage example:

self->make_observable()
  .fail<int>(sec::runtime_error) // subscribed to three times
  .retry_when([self](auto attempts) { // attempts: observable<error>
      return self->make_observable().zip_with(
               [](const error&, int num) { return num; },
               attempts, self->make_observable().range(1, 3))
             .flat_map([self](int num) {
               printf("delay next retry by %d second(s)\n", num);
               return self->make_observable().timer(std::chrono::seconds{num});
             });
  })
  .for_each([](int) { });

Should print:

delay next retry by 1 second(s)
delay next retry by 2 second(s)
delay next retry by 3 second(s)

It should then stop after three retries. With retry_when, we can probably build the other retry versions very easily later on as a follow-up.
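
The control flow behind retry_when can be modeled synchronously, without actors or timers. The sketch below is only a model of the idea, not a proposed implementation: `retry_delays` is a hypothetical helper, `subscribe` stands for one subscription attempt to the input, and `control` stands for the control observable by mapping the n-th failure to a delay in seconds, or to std::nullopt once the control side has completed.

```cpp
#include <functional>
#include <optional>
#include <vector>

// Returns the delays (in seconds) chosen before each retry.
// `subscribe` returns true if the attempt succeeded and false if it failed.
// `control` receives the 1-based failure count and returns a delay to retry
// after, or std::nullopt to stop retrying.
inline std::vector<int>
retry_delays(const std::function<bool()>& subscribe,
             const std::function<std::optional<int>(int)>& control) {
  std::vector<int> delays;
  for (int failures = 1;; ++failures) {
    if (subscribe())
      return delays; // input finished without an error
    auto delay = control(failures);
    if (!delay)
      return delays; // control side is done: give up
    // A real implementation would wait here before resubscribing.
    delays.push_back(*delay);
  }
}
```

With an input that always fails and a control function that returns n for the first three failures and std::nullopt afterwards, the result is the delays 1, 2 and 3, which corresponds to the three lines printed in the usage example.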

@shariarriday
Member

Thanks, I am planning to implement the retry_when operator after completing the implementation of the combine_latest operator.
