Implement the ReactiveX Sample operator #98
Comments
Here is a start for you: bittrance@b6db908. Needs better error handling, proper synchronization and a force-push with a test to make it look like I am TDD. 😈 I'll try to continue tomorrow; right now I need to go to bed.
Hi @bittrance, thanks for your help here. I tried using your WIP implementation, but it doesn't work as I would expect on the test below. I could be mistaken in what the behavior should be. Rx can be difficult to reason about sometimes.
require 'rx'
e = Rx::Subject.new
d = Rx::Subject.new
s = Rx::Subject.new
enable = e.as_observable
disable = d.as_observable
source = s.as_observable
stream = enable.map { true }
  .merge(disable.map { false })
  .combine_latest(source) { |*pair| pair }
  .sample(source)
  .select(&:first)
  .map(&:last)
stream.subscribe do |x|
  puts "Source: #{x}"
end
e.on_next(1)
d.on_next(1)
e.on_next(1)
s.on_next(100)
d.on_next(1)
s.on_next(200)
e.on_next(1)
s.on_next(300)
s.on_next(400)
e.on_next(1)
d.on_next(1)
s.on_next(500)
s.on_next(600)
I would expect the output to be:
In the test above and in my use case, I have 3 streams of events: an "enable" stream, a "disable" stream, and a "source" stream. I want to use the enable/disable streams as a switch/valve/gate to enable and disable the source stream. In other words, when I receive data on the source stream, if I last saw a disable event, the data should be ignored and dropped. If I last saw an enable event, the data should be emitted. In this way, the enable/disable merged stream acts as a gate for the source stream. I realize this could be done with some local variables and mutable state, but I'd like to keep it as functional and Rx-only as possible. The Sample operator seemed like an easy way to do it. I'm open to other implementations using other operators, too (maybe Switch or FlatMap).
Update: After thinking about this more, ultimately, I really just want a gate operator:
# source is the Observable emitting the data I care about
# switch is an Observable emitting true/false that controls source emission
source.gate(switch).subscribe do |x|
  puts "Source: #{x}"
end
I've come up with the following, but since
module Rx::Observable
  def gate(gate, initial: false)
    Rx::AnonymousObservable.new do |obs|
      enabled = initial
      gate_sub = gate.subscribe(
        -> enable { enabled = enable },
        obs.method(:on_error),
        -> { }
      )
      self_sub = self.select { enabled }.subscribe(
        obs.method(:on_next),
        obs.method(:on_error),
        obs.method(:on_completed)
      )
      Rx::CompositeSubscription.new [gate_sub, self_sub]
    end
  end
end
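For readers who want to check the expected gating behavior without RxRuby installed, here is a minimal plain-Ruby sketch. It is not the library's implementation: `simulate_gate` and `GATE_EVENTS` are hypothetical names, and the model assumes events are delivered synchronously, in the same order as the `on_next` calls in the test case earlier in this thread.

```ruby
# Plain-Ruby model of the gate semantics; no RxRuby required.
# `simulate_gate` is a hypothetical helper, not part of any library.
def simulate_gate(events, initial: false)
  enabled = initial
  emitted = []
  events.each do |kind, value|
    case kind
    when :enable  then enabled = true
    when :disable then enabled = false
    when :source  then emitted << value if enabled
    end
  end
  emitted
end

# Same ordering as the e/d/s on_next calls in the test case.
GATE_EVENTS = [
  [:enable], [:disable], [:enable], [:source, 100],
  [:disable], [:source, 200],
  [:enable], [:source, 300], [:source, 400],
  [:enable], [:disable], [:source, 500], [:source, 600]
].freeze

p simulate_gate(GATE_EVENTS) # prints [100, 300, 400]
```

Only values that arrive while the last switch event was an enable get through, which is the behavior the `gate` operator is meant to provide.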
On take 1, I am not sure how you see that. We are mostly trying to follow RxJS, so I copied https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/sample.md to ./examples/sample.rb. In this implementation, On your update, the
p = Rx::Subject.new
s = Rx::Subject.new
source = s.as_observable
source
  .filter(p, lambda { |_| false }) # lambda is initial predicate
  .subscribe { |e| puts e }
s.on_next('foo') # not emitted
p.on_next(lambda { |_| true })
s.on_next('bar') # => 'bar'
s.on_next('baz') # => 'baz'
p.on_next(lambda { |_| false })
s.on_next('bar') # not emitted
Or perhaps this should in fact be called
I ported my test case above to RxJS and I do see the behavior I expected above:
RxJS test case
let e = new Rx.Subject();
let d = new Rx.Subject();
let s = new Rx.Subject();
let enable = e.asObservable();
let disable = d.asObservable();
let source = s.asObservable();
let stream = enable.map(_ => true)
  .merge(disable.map(_ => false))
  .combineLatest(source)
  .sample(source)
  .where(pair => pair[0])
  .map(pair => pair[1]);
stream.subscribe(x => console.log(`Source: ${x}`));
e.onNext(1)
d.onNext(1)
e.onNext(1)
s.onNext(100)
d.onNext(1)
s.onNext(200)
e.onNext(1)
s.onNext(300)
s.onNext(400)
e.onNext(1)
d.onNext(1)
s.onNext(500)
s.onNext(600)
// Output:
// Source: 100
// Source: 300
// Source: 400
You're right, thanks! I'm dealing with slower, discrete, ordered events as opposed to frequent, async events, so I completely missed that.
I'm interested in why you'd prefer predicates over just
I didn't intend for this issue to become a general discussion about my problem. With your help and after thinking a bit deeper, I think I have a workable solution in the form of gate/filter. If you'd like to keep this issue open and implement
I meditated and changed my mind. I no longer think a stream of predicates is a good idea: there is no way to "inspect" or transform a Ruby lambda without executing it, possibly incurring unknown side effects, so predicates don't really adhere to the Rx spirit. Instead, I think your use case and many others could be handled elegantly if this was possible:
One use could be to have strobe emit predicates, but you could equally do booleans like this example. It would become sort of a "combine_latest_left", which I have actually been missing. What say you?
PR up for basic function: #102.
I looked at Porting the Rx.NET implementation to Ruby, I end up with something (admittedly ugly) like:
module Rx::Observable
  def with_latest_from(other, &combine)
    combine ||= ->(*pair) { pair }
    lefts = self.map { |e| { is_left: true, left: e, right: nil } }
    rights = other.map { |e| { is_left: false, left: nil, right: e } }
    lefts.merge(rights)
      .scan({ is_left: false, left: nil, right: nil }) { |o, n|
        n[:is_left] ?
          { is_left: true, left: n[:left], right: o[:right] } :
          { is_left: false, left: nil, right: n[:right] }
      }.select { |e|
        e[:is_left]
      }.map { |e|
        combine.call(e[:left], e[:right])
      }
  end
end
With this, implementing
module Rx::Observable
  def gate(gate)
    self.with_latest_from(gate).select(&:last).map(&:first)
  end
end
And I've already run into another scenario where I was able to apply it (outside of the Calling it
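The merge/scan/select/map pipeline in the `with_latest_from` comment above can be traced by hand. The following is a rough plain-Ruby sketch of that trace, not the RxRuby or Rx.NET code: `trace_with_latest_from` and `TIMELINE` are hypothetical names, and events are assumed to arrive synchronously in list order. As in the scan-based version, a left value that arrives before any right value is paired with `nil`.

```ruby
# Plain-Ruby trace of the with_latest_from idea; no RxRuby required.
# `trace_with_latest_from` is a hypothetical helper for illustration only.
def trace_with_latest_from(timeline, &combine)
  combine ||= ->(left, right) { [left, right] }
  latest_right = nil
  timeline.each_with_object([]) do |(side, value), out|
    if side == :left
      # Only left events produce output, paired with the latest right value.
      out << combine.call(value, latest_right)
    else
      # Right events only update the remembered value.
      latest_right = value
    end
  end
end

TIMELINE = [
  [:left, 'foo'],                 # no right value yet, so paired with nil
  [:right, true],
  [:left, 'bar'], [:left, 'baz'],
  [:right, false],
  [:left, 'qux']
].freeze

pairs = trace_with_latest_from(TIMELINE)
p pairs
# prints [["foo", nil], ["bar", true], ["baz", true], ["qux", false]]

# The gate from the comment above is then select(&:last).map(&:first).
p pairs.select(&:last).map(&:first) # prints ["bar", "baz"]
```

Because `nil` is falsy, the pairing with `nil` means values arriving before the first gate event are dropped by the `select(&:last)` step.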
We appear to be in agreement. In fact, I think I would like some feedback from the core developers on my
Thank you for your input.
RxRuby should implement the ReactiveX Sample operator.
I've found Sample in the Rx.NET library useful in my .NET projects and I find myself needing it in my current Ruby project. You can see the Rx.NET implementation here.
I've hacked up something that seems to have the behavior I'm looking for, but it is very untested and probably has some leaks and incorrect corner cases:
I looked into implementing Sample myself natively in the library and contributing a pull request, but after looking at the source for some other operators, I think it would take far too much time for me to contribute a correct implementation.
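As a rough illustration of the behavior being requested, here is a plain-Ruby model of Sample driven by a sampler stream. It is not the hacked-up version mentioned above, nor any library's implementation: `simulate_sample` and `SAMPLE_TIMELINE` are hypothetical names, and the sketch assumes that a sampler tick emits the most recent source value only if a new one has arrived since the previous tick (completion handling is left out).

```ruby
# Plain-Ruby model of Sample with a sampler; no RxRuby required.
# `simulate_sample` is a hypothetical helper for illustration only.
def simulate_sample(timeline)
  latest = nil
  fresh = false # true when a source value arrived since the last tick
  timeline.each_with_object([]) do |(kind, value), out|
    case kind
    when :source
      latest = value
      fresh = true
    when :tick
      if fresh
        out << latest
        fresh = false
      end
    end
  end
end

SAMPLE_TIMELINE = [
  [:source, 1], [:source, 2], [:tick], # tick emits 2, the latest value
  [:tick],                             # nothing new, so nothing is emitted
  [:source, 3], [:tick]                # tick emits 3
].freeze

p simulate_sample(SAMPLE_TIMELINE) # prints [2, 3]
```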