Implement the ReactiveX Sample operator #98

Open
schmich opened this issue Jan 18, 2017 · 8 comments

@schmich

schmich commented Jan 18, 2017

RxRuby should implement the ReactiveX Sample operator.

I've found Sample in the Rx.NET library useful in my .NET projects and I find myself needing it in my current Ruby project. You can see the Rx.NET implementation here.

I've hacked up something that seems to have the behavior I'm looking for, but it is very untested and probably has some leaks and incorrect corner cases:

module Rx::Observable
  def sample(sampler)
    sample = sampler.map { |e| [e, true] }
    merged = Rx::Observable.merge(self.map { |e| [e, false] }, sample)
    merged.zip(merged.skip(1)).select { |l, r| r[1] }.map { |l, r| l[0] }
  end
end

I looked into implementing Sample myself natively in the library and contributing a pull request, but after looking at the source for some other operators, I think it would take far too much time for me to contribute a correct implementation.

@bittrance
Contributor

Here is a start for you: bittrance@b6db908. Needs better error handling, proper synchronization and a force-push with a test to make it look like I am TDD. 😈 I'll try to continue tomorrow; right now I need to go to bed.

@schmich
Author

schmich commented Jan 19, 2017

Hi @bittrance, thanks for your help here.

I tried using your WIP implementation, but it doesn't work as I would expect on the test below. I could be mistaken in what the behavior should be. Rx can be difficult to reason about sometimes.

require 'rx'

e = Rx::Subject.new
d = Rx::Subject.new
s = Rx::Subject.new

enable = e.as_observable
disable = d.as_observable
source = s.as_observable

stream = enable.map { true }
        .merge(disable.map { false })
        .combine_latest(source) { |*pair| pair }
        .sample(source)
        .select(&:first)
        .map(&:last)

stream.subscribe do |x|
  puts "Source: #{x}"
end

e.on_next(1)
d.on_next(1)
e.on_next(1)
s.on_next(100)
d.on_next(1)
s.on_next(200)
e.on_next(1)
s.on_next(300)
s.on_next(400)
e.on_next(1)
d.on_next(1)
s.on_next(500)
s.on_next(600)

I would expect the output to be:

Source: 100
Source: 300
Source: 400

In the test above and in my use case, I have 3 streams of events: an "enable" stream, a "disable" stream, and a "source" stream. I want to use the enable/disable streams as a switch/valve/gate to enable and disable the source stream.

In other words, when I receive data on the source stream, if I last saw a disable event, the data should be ignored and dropped. If I last saw an enable event, the data should be emitted. In this way, the enable/disable merged stream acts as a gate for the source stream.

I realize this could be done with some local variables and mutable state, but I'd like to keep it as functional and Rx-only as possible. The Sample operator seemed like an easy way to do it. I'm open to other implementations using other operators, too (maybe Switch or FlatMap).
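For reference, the expected gating behaviour can be written down as a plain-Ruby replay of the event sequence from the test above. This is only a model of the desired output, not an Rx implementation: `EVENTS`, `gate_replay`, and the `initial` parameter are hypothetical names introduced for illustration.

```ruby
# Each entry is [stream, value], in the same order as the on_next calls above.
EVENTS = [
  [:enable, 1], [:disable, 1], [:enable, 1], [:source, 100],
  [:disable, 1], [:source, 200],
  [:enable, 1], [:source, 300], [:source, 400],
  [:enable, 1], [:disable, 1], [:source, 500], [:source, 600]
].freeze

# Replays the log and keeps a source value only if the most recent
# enable/disable event was an enable.
# `initial` (an assumption) is the gate state before any switch event.
def gate_replay(events, initial: false)
  enabled = initial
  events.each_with_object([]) do |(stream, value), emitted|
    case stream
    when :enable  then enabled = true
    when :disable then enabled = false
    when :source  then emitted << value if enabled
    end
  end
end

gate_replay(EVENTS).each { |x| puts "Source: #{x}" }
```

Any Rx-based solution for this use case should reproduce the same three values for this sequence.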

Update: After thinking about this more, ultimately, I really just want a gate operator:

# source is the Observable emitting the data I care about
# switch is an Observable emitting true/false that controls source emission
source.gate(switch).subscribe do |x|
  puts "Source: #{x}"
end

I've come up with the following, but since AnonymousObservable and CompositeSubscription aren't well documented, it's tough to know if it's correct:

module Rx::Observable
  def gate(gate, initial: false)
    Rx::AnonymousObservable.new do |obs|
      enabled = initial
      gate_sub = gate.subscribe(
        -> enable { enabled = enable },
        obs.method(:on_error),
        -> { }
      )
      self_sub = self.select { enabled }.subscribe(
        obs.method(:on_next),
        obs.method(:on_error),
        obs.method(:on_completed)
      )
      Rx::CompositeSubscription.new [gate_sub, self_sub]
    end
  end
end

@bittrance
Contributor

bittrance commented Jan 19, 2017

On take 1, I am not sure how you see that. We are mostly trying to follow RxJS, so I copied https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/sample.md to ./examples/sample.rb. In this implementation, .sample() is called on the source, and takes a "strobe" observable whose emissions "reset" the sampling. Note also that in your first take, there is no guarantee that .combine_latest() and .sample() will get events in the desired order.
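As a rough illustration of the strobe semantics described above, the sketch below models sample in plain Ruby. It is neither the implementation in the linked commit nor the rx gem's API: `SampleModel` and its method names are hypothetical. It assumes, following the RxJS documentation, that a strobe emission passes on the latest source value only if a new one has arrived since the previous strobe emission.

```ruby
# Plain-Ruby model of sample: the source only updates the stored value,
# and the strobe decides when that value is passed downstream.
class SampleModel
  attr_reader :emitted

  def initialize
    @emitted = []       # values passed downstream, in order
    @has_value = false  # true while an unsampled source value is waiting
    @latest = nil
  end

  # The source emitted: remember the value, but emit nothing yet.
  def source(value)
    @latest = value
    @has_value = true
  end

  # The strobe emitted: pass on the pending value, then reset the flag
  # so the same value is not sampled twice.
  def strobe
    return unless @has_value

    @has_value = false
    @emitted << @latest
  end
end

model = SampleModel.new
model.source(1)
model.source(2)
model.strobe      # passes on 2; 1 was overwritten before it was sampled
model.strobe      # nothing new since the last strobe, so nothing is emitted
model.source(3)
model.strobe      # passes on 3
p model.emitted
```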

On your update, the .gate() operator would introduce a concept of emission "truthiness" that, to my knowledge, no ReactiveX implementation has. I would be more comfortable with select taking a stream of predicates such that you could do:

p = Rx::Subject.new
s = Rx::Subject.new
source = s.as_observable
source
  .filter(p, lambda {|_| false}) # lambda is initial predicate
  .subscribe { |e| puts e }
s.on_next('foo') # not emitted
p.on_next(lambda { |_| true })
s.on_next('bar') # => 'bar'
s.on_next('baz') # => 'baz'
p.on_next(lambda {|_| false })
s.on_next('bar') # not emitted

Or perhaps this should in fact be called .gate() rather than .filter()? I cannot find any other implementation that has something like this, but nor can I express it reasonably using other operators.

@schmich
Author

schmich commented Jan 19, 2017

On take 1, I am not sure how you see that.

I ported my test case above to RxJS and I do see the behavior I expected above:

RxJS test case
let e = new Rx.Subject();
let d = new Rx.Subject();
let s = new Rx.Subject();

let enable = e.asObservable();
let disable = d.asObservable();
let source = s.asObservable();

let stream = enable.map(_ => true)
  .merge(disable.map(_ => false))
  .combineLatest(source)
  .sample(source)
  .where(pair => pair[0])
  .map(pair => pair[1]);

stream.subscribe(x => console.log(`Source: ${x}`));

e.onNext(1)
d.onNext(1)
e.onNext(1)
s.onNext(100)
d.onNext(1)
s.onNext(200)
e.onNext(1)
s.onNext(300)
s.onNext(400)
e.onNext(1)
d.onNext(1)
s.onNext(500)
s.onNext(600)

// Output:
// Source: 100
// Source: 300
// Source: 400

Note also that in your first take, there is no guarantee that .combine_latest() and .sample() will get events in the desired order.

You're right, thanks! I'm dealing with slower, discrete, ordered events as opposed to frequent, async events, so I completely missed that.

I would be more comfortable with select taking a stream of predicates such that you could do:

I'm interested in why you'd prefer predicates over just true/false values. Would the gate/filter observable then be a stream of predicates, with the latest one evaluated lazily each time the source emits a value?

I didn't intend for this issue to become general discussion about my problem. With your help and after thinking a bit deeper, I think I have a workable solution in the form of gate/filter. If you'd like to keep this issue open and implement sample, I believe it's unique and would benefit the RxRuby library, but at this point, I don't currently need it.

@bittrance
Contributor

bittrance commented Jan 20, 2017

I meditated and changed my mind. I no longer think a stream of predicates is a good idea: there is no way to "inspect" or transform a Ruby lambda without executing it, possibly incurring unknown side effects, so predicates don't really adhere to the Rx spirit.
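A small plain-Ruby sketch of that opacity, using hypothetical variable names: two lambdas with identical bodies are still unrelated objects, so a stream of them cannot be compared or de-duplicated the way a stream of booleans can.

```ruby
always_a = ->(_) { true }
always_b = ->(_) { true }

# Identical bodies, but the lambdas do not compare equal.
same_lambda = (always_a == always_b)

# De-duplication therefore cannot collapse them...
distinct_lambdas = [always_a, always_b].uniq.size

# ...whereas plain boolean values collapse as expected.
distinct_booleans = [true, true].uniq.size

# The only way to learn what a predicate decides is to call it.
decision = always_a.call(:anything)

puts same_lambda        # false
puts distinct_lambdas   # 2
puts distinct_booleans  # 1
puts decision           # true
```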

Instead, I think your use case and many others could be handled elegantly if this were possible:

strobe = Rx::Observable.interval(1)
events
  .sample(strobe) { |latest, pulse| latest if pulse }
  .subscribe { |event| ... }

One use could be to have strobe emit predicates, but you could equally do booleans like this example. It would become sort of a "combine_latest_left" which I have actually been missing. What say you?

@bittrance
Contributor

PR up for basic functionality: #102.

@schmich
Author

schmich commented Jan 23, 2017

It would become sort of a "combine_latest_left" which I have actually been missing. What say you?

I looked at CombineLatestLeft in Rx.NET and its analog withLatestFrom in RxJS, and I think it's exactly what I've been missing as well!

Porting the Rx.NET implementation to Ruby, I end up with something (admittedly ugly) like:

module Rx::Observable
  def with_latest_from(other, &combine)
    combine ||= ->(*pair) { pair }

    lefts = self.map { |e| { is_left: true, left: e, right: nil } }
    rights = other.map { |e| { is_left: false, left: nil, right: e } }

    lefts.merge(rights)
      .scan({ is_left: false, left: nil, right: nil }) { |o, n|
        n[:is_left] ?
        { is_left: true, left: n[:left], right: o[:right] } :
        { is_left: false, left: nil, right: n[:right] }
      }.select { |e|
        e[:is_left]
      }.map { |e|
        combine.call(e[:left], e[:right])
      }
  end
end

With this, implementing gate above is as simple as:

module Rx::Observable
  def gate(gate)
    self.with_latest_from(gate).select(&:last).map(&:first)
  end
end

And I've already run into another scenario where I was able to apply it (outside of the gate pattern).

Calling it with_latest_from makes more intuitive sense to me than combine_latest_left, but this is definitely a basic operator that I've been missing. In my opinion, it deserves a place in RxRuby, much like withLatestFrom in RxJS.

@bittrance
Contributor

We appear to be in agreement. In fact, I think .with_latest_from() is more common/useful than .sample(). The typical use case for .with_latest_from() would be where you have a dynamic rule-set or configuration: you want to process each item in a stream according to the latest available rules/configs.
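The dynamic-configuration use case might look like the following plain-Ruby model. It does not use the rx gem: `WithLatestFromModel`, its method names, and the `:factor` config key are hypothetical. It also assumes that items arriving before the first config are dropped, as RxJS withLatestFrom does; the port earlier in this thread would instead pair such items with nil.

```ruby
# Plain-Ruby model of with_latest_from: only the left (item) stream
# triggers output; the right (config) stream just updates the latest value.
class WithLatestFromModel
  attr_reader :emitted

  def initialize(&combine)
    @combine = combine || ->(left, right) { [left, right] }
    @has_right = false
    @right = nil
    @emitted = []
  end

  # A new config arrived: store it, emit nothing.
  def right(value)
    @right = value
    @has_right = true
  end

  # A new item arrived: combine it with the latest config, if there is one.
  def left(value)
    return unless @has_right

    @emitted << @combine.call(value, @right)
  end
end

model = WithLatestFromModel.new { |item, config| item * config[:factor] }
model.left(1)                   # dropped: no config seen yet
model.right({ factor: 10 })
model.left(2)                   # processed with factor 10
model.left(3)                   # processed with factor 10
model.right({ factor: 100 })    # config change alone emits nothing
model.left(4)                   # processed with factor 100
p model.emitted
```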

I would like some feedback from the core developers on my .sample() implementation before I go further, but if it is appreciated, I will certainly implement .with_latest_from() in a similar fashion.

Thank you for your input.
