concat_map implementation or equivalent? #581
Just as a reference for anyone needing the same, I implemented it myself as follows:

    # Standard Imports
    from typing import Callable

    # Third Party Imports
    from rx.core import Observable
    import rx.operators as ops
    from rx.core.typing import Mapper


    def concat_map(mapper: Mapper) -> Callable[[Observable], Observable]:
        def _concat_map(source: Observable) -> Observable:
            return source.pipe(
                ops.map(mapper),
                ops.merge(max_concurrent=1)
            )
        return _concat_map

Now you can use it easily, e.g.:

    source.pipe(
        concat_map(observable_factory),
        concat_map(another_factory),
        # ...
    )

If anybody sees any issues with the above, please kindly comment and let me know.
Merged
This shows as merged. Is there a reason this issue was left open?
Coming from RxJS, it was really convenient to have the concatMap operator for building a pipeline of dynamic observables in which each one waits for the previous one to complete. I noticed that RxPY does not have an obvious equivalent (the closest is flat_map, but I can't have interleaving).
From what relevant docs/issues I could find, it seems that the following would achieve the same functionality but feels dirty. The `of(...)`'s are a stand-in for some code that would dynamically generate a new observable depending on the result from the previous.
Is there a better way to achieve this currently (or in general a more "reactive" way)?
Is there any plan to implement a concat_map operator/alias for this functionality?
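To make the ordering difference concrete, here is a plain-Python model of it. This is not RxPY code and ignores time entirely. It only models emission order, using lists as stand-ins for inner observables, and the names `concat_map_model`, `interleaved_model`, and `steps` are hypothetical.

```python
from itertools import chain, zip_longest


def concat_map_model(mapper, items):
    # Drain each inner sequence completely before starting the next one.
    return list(chain.from_iterable(mapper(x) for x in items))


def interleaved_model(mapper, items):
    # Round-robin across the inner sequences: one possible interleaving
    # that a concurrent merge (flat_map-style) could produce.
    missing = object()
    inners = [mapper(x) for x in items]
    return [
        value
        for group in zip_longest(*inners, fillvalue=missing)
        for value in group
        if value is not missing
    ]


def steps(x):
    # Hypothetical inner sequence: two events per source item.
    return [f"{x}-start", f"{x}-end"]


concat_order = concat_map_model(steps, ["a", "b"])
interleaved_order = interleaved_model(steps, ["a", "b"])

print(concat_order)       # ['a-start', 'a-end', 'b-start', 'b-end']
print(interleaved_order)  # ['a-start', 'b-start', 'a-end', 'b-end']
```

With concat semantics, "b" does not start until "a" has ended, which is the behavior concatMap provides in RxJS.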