-
Notifications
You must be signed in to change notification settings - Fork 338
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Connecting Observable #318
Comments
Using Here is the test: func TestRxGoWithForEach(t *testing.T) {
// Reproduction scenario: one published producer observable is forwarded to two
// child observables through ForEach; the children are cancelled one after the
// other and the producer is cancelled last.
producerCtx, cancelProducer := context.WithCancel(context.Background())
// create an infinite producer that stops when its context is cancelled
producer := rxgo.Create(
[]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
log.Print("create producer")
defer func() {
log.Print("producer done")
}()
i := 0
for {
// a new one-second timer is created on every iteration
timeout := time.After(time.Second * 1)
select {
case <-ctx.Done():
return
case <-timeout:
// NOTE(review): this send is not guarded by ctx.Done(); it blocks for as
// long as next cannot accept the item.
next <- rxgo.Of(i)
log.Printf("producer fire %d", i)
}
i = i + 1
}
}},
rxgo.WithPublishStrategy(),
rxgo.WithContext(producerCtx),
rxgo.WithBackPressureStrategy(rxgo.Drop),
)
// connect to the producer
producer.Connect(producerCtx)
// create a child observable to receive items from producer
// NOTE(review): the local name append shadows the Go builtin append for the
// rest of this function.
append := func(index int, parent context.Context) rxgo.Observable {
// observableCtx is derived from context.Background(), not from parent; parent
// is only wired in by the goroutine started further down.
observableCtx, cancel := context.WithCancel(context.Background())
observable := rxgo.Create(
[]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
log.Printf("create observable %d", index)
defer func() {
log.Printf("observable done %d", index)
}()
// NOTE(review): done is read inside the ForEach callbacks and written after
// the select below with no synchronization visible here; if the callbacks
// run on another goroutine this is a data race - confirm.
done := false
wait := producer.ForEach(
// forward every producer item to this child until done is set
func(i interface{}) {
if !done {
next <- rxgo.Of(i)
} else {
log.Printf("ForEach observable %d", index)
}
},
// forward producer errors to this child until done is set
func(err error) {
if !done {
next <- rxgo.Error(err)
}
},
// third ForEach callback (presumably the completion handler): cancels
// observableCtx
func() {
cancel()
},
rxgo.WithContext(ctx),
)
// block until the child context is cancelled or the ForEach wait channel
// delivers or is closed
select {
case <-observableCtx.Done():
case <-wait:
}
done = true
}},
rxgo.WithPublishStrategy(),
rxgo.WithContext(observableCtx),
rxgo.WithBackPressureStrategy(rxgo.Drop),
)
// the first value returned by Connect is discarded; abort is the second
_, abort := observable.Connect(context.Background())
// once parent is done, call abort and cancel the child context
go func() {
<-parent.Done()
abort()
cancel()
}()
// log every item; the value returned by DoOnNext is discarded
observable.DoOnNext(func(i interface{}) {
log.Printf("Observable %d: %v", index, i)
})
return observable
}
// let the producer run for five seconds before any child is attached
log.Printf("sleep 5")
time.Sleep(time.Second * 5)
// create the first observable with own context for cancel
observable1Ctx, cancelObservable1 := context.WithCancel(context.Background())
append(1, observable1Ctx)
// create the second observable with own context for cancel
observable2Ctx, cancelObservable2 := context.WithCancel(context.Background())
append(2, observable2Ctx)
// cancel child 1 after five seconds and child 2 five seconds after that
go func() {
log.Printf("sleep 5")
time.Sleep(time.Second * 5)
log.Print("cancel 1 observable")
cancelObservable1()
log.Printf("sleep 5")
time.Sleep(time.Second * 5)
log.Print("cancel 2 observable")
cancelObservable2()
}()
log.Print("wait for done")
// blocks until the goroutine above has cancelled the second child context
<-observable2Ctx.Done()
log.Print("all observable canceled")
log.Printf("sleep 5")
time.Sleep(time.Second * 5)
log.Printf("cancel the producer")
cancelProducer()
// final pause after the producer context has been cancelled
log.Printf("sleep 5")
time.Sleep(time.Second * 5)
} and the console output:
|
I guess I know why that blocks func (i *channelIterable) produce(ctx context.Context) {
// produce reads items from i.next and sends each one to every channel in
// i.subscribers until ctx is done or i.next is closed.
defer func() {
// on exit, close every subscriber channel while holding the read lock
i.mutex.RLock()
for _, subscriber := range i.subscribers {
close(subscriber)
}
i.mutex.RUnlock()
}()
for {
select {
case <-ctx.Done():
return
case item, ok := <-i.next:
if !ok {
// i.next was closed: stop producing
return
}
i.mutex.RLock()
for _, subscriber := range i.subscribers {
// NOTE(review): plain blocking send that is not guarded by ctx.Done(); if a
// subscriber channel cannot accept the item, the loop stalls here with the
// read lock held and no later subscriber receives anything.
subscriber <- item
}
i.mutex.RUnlock()
}
}
} The |
@dayaftereh - I made some changes in the PR linked to allow unsubscribe. For testing purposes, I changed the producer part in your example to a channel based producer. Here is the modified test func TestRxGo(t *testing.T) {
// Scenario: a goroutine feeds itemCh about once per second, the channel is
// wrapped in a published observable, and two DoOnNext subscribers with their
// own contexts are attached and then cancelled one after the other.
// NOTE(review): Item, Of, FromChannel and the With... options are used without
// a package qualifier, so this presumably lives inside the rxgo package itself.
producerCtx, cancelProducer := context.WithCancel(context.Background())
// create an infinite producer that stops when its context is cancelled
// itemCh is unbuffered
itemCh := make(chan Item)
go func() {
defer func() {
log.Print("producer done")
// the sending goroutine closes itemCh when it exits
close(itemCh)
}()
i := 0
for {
select {
case <-producerCtx.Done():
return
case <-time.After(1 * time.Second):
// the send is guarded by producerCtx, so cancellation also unblocks a
// pending send
select {
case <-producerCtx.Done():
return
case itemCh <- Of(i):
log.Printf("producer fire %d", i)
}
i++
}
}
}()
producer := FromChannel(itemCh,
WithPublishStrategy(),
WithContext(producerCtx),
WithBackPressureStrategy(Drop))
// connect to the producer
producer.Connect(producerCtx)
// create a child observable to receive items from producer
// NOTE(review): the local name append shadows the Go builtin append for the
// rest of this function.
append := func(index int, parent context.Context) {
// subscribe directly on the producer with parent as the subscriber context;
// the value returned by DoOnNext is discarded
producer.DoOnNext(func(i interface{}) {
log.Printf("Observable %d: %v", index, i)
}, WithContext(parent))
}
// let the producer run for five seconds before any subscriber is attached
log.Printf("sleep 5")
time.Sleep(time.Second * 5)
// create the first observable with own context for cancel
observable1Ctx, cancelObservable1 := context.WithCancel(context.Background())
append(1, observable1Ctx)
// create the second observable with own context for cancel
observable2Ctx, cancelObservable2 := context.WithCancel(context.Background())
append(2, observable2Ctx)
// cancel subscriber 1 after five seconds and subscriber 2 five seconds later
go func() {
log.Printf("sleep 5")
time.Sleep(time.Second * 5)
log.Print("cancel 1 observable")
cancelObservable1()
log.Printf("sleep 5")
time.Sleep(time.Second * 5)
log.Print("cancel 2 observable")
cancelObservable2()
}()
log.Print("wait for done")
// blocks until the goroutine above has cancelled the second subscriber context
<-observable2Ctx.Done()
log.Print("all observable canceled")
log.Printf("sleep 5")
time.Sleep(time.Second * 5)
log.Printf("cancel the producer")
cancelProducer()
// final pause after the producer context has been cancelled
log.Printf("sleep 5")
time.Sleep(time.Second * 5)
} And..........Here is the output
|
Perfect. |
I'm not sure whether the following behavior is an issue or whether I'm on the wrong track. I want to have a producer observable from a single channel with the ability of
rxgo.WithPublishStrategy()
to broadcast the items to child observables. I'm creating the child observables by using the Observable#Observe
channel from the producer observable with rxgo.FromChannel
, but when the child Observable is canceled via the context from context.WithCancel()
the parent observable stops emitting items. Using:
github.com/reactivex/rxgo/v2 v2.5.0
To Reproduce
The following test allows to reproduce the behavior:
The console output is:
Expected behavior
I want the producer observable to keep emitting items without blocking, and to be able to create child observables dynamically.
The text was updated successfully, but these errors were encountered: