Hello, I am trying to count the events produced by an RxGo Connectable Observable in two ways:
- The total number of events and
- The number of events that pass a filter.
Here is some example code:
package main

import (
	"context"
	"fmt"

	"github.com/reactivex/rxgo/v2"
)

func main() {
	events := rxgo.Create([]rxgo.Producer{func(_ context.Context, next chan<- rxgo.Item) {
		next <- rxgo.Of(expensiveReadFromDisk(0))
		next <- rxgo.Of(expensiveReadFromDisk(1))
		next <- rxgo.Of(expensiveReadFromDisk(2))
	}}, rxgo.WithPublishStrategy())

	total := events.Count()
	filtered := events.Filter(func(i interface{}) bool {
		return i.(int) > 0
	}).Count()

	events.Connect(context.Background())

	t, _ := total.Get()
	fmt.Printf(" Total: %d\n", t.V)
	f, _ := filtered.Get()
	fmt.Printf("Filtered: %d\n", f.V)
}

func expensiveReadFromDisk(e int) int {
	fmt.Printf("Reading event: %d\n", e)
	return e
}
I expected the code to output
Reading event: 0
Reading event: 1
Reading event: 2
Total: 3
Filtered: 2
Instead, the code outputs only this:
Reading event: 0
Reading event: 1
Reading event: 2
Total: 3
Then it blocks on the next call, filtered.Get(), and hangs forever.
Is this the intended behavior? If yes, what would be the correct way of achieving the intended result?
Thank you very much!