Description
I'm not sure whether the following behavior is a bug or whether I'm on the wrong track. I want a producer observable that is fed from a single channel and uses rxgo.WithPublishStrategy() to broadcast its items to child observables. I create each child observable by passing the producer's Observable#Observe channel to rxgo.FromChannel. However, when a child observable is canceled via its context (created with context.WithCancel()), the parent observable stops emitting items.
Version used: github.com/reactivex/rxgo/v2 v2.5.0
To Reproduce
The following test reproduces the behavior:
package rxgo_test

import (
	"context"
	"log"
	"testing"
	"time"

	"github.com/reactivex/rxgo/v2"
)

func TestRxGo(t *testing.T) {
	producerCtx, cancelProducer := context.WithCancel(context.Background())
	// create an infinite producer that stops when its context is canceled
	producer := rxgo.Create(
		[]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
			log.Print("create producer")
			defer func() {
				log.Print("producer done")
			}()
			i := 0
			for {
				timeout := time.After(time.Second * 1)
				select {
				case <-ctx.Done():
					return
				case <-timeout:
					next <- rxgo.Of(i)
					log.Printf("producer fire %d", i)
				}
				i++
			}
		}},
		rxgo.WithPublishStrategy(),
		rxgo.WithContext(producerCtx),
		rxgo.WithBackPressureStrategy(rxgo.Drop),
	)
	// connect to the producer
	producer.Connect(producerCtx)
	// create a child observable that receives items from the producer
	// (note: this local name shadows the builtin append)
	append := func(index int, parent context.Context) rxgo.Observable {
		observable := rxgo.FromChannel(
			producer.Observe(),
			rxgo.WithBackPressureStrategy(rxgo.Drop),
			rxgo.WithContext(parent),
		)
		observable.DoOnNext(func(i interface{}) {
			log.Printf("Observable %d: %v", index, i)
		})
		return observable
	}
	log.Printf("sleep 5")
	time.Sleep(time.Second * 5)
	// create the first observable with its own cancelable context
	observable1Ctx, cancelObservable1 := context.WithCancel(context.Background())
	append(1, observable1Ctx)
	// create the second observable with its own cancelable context
	observable2Ctx, cancelObservable2 := context.WithCancel(context.Background())
	append(2, observable2Ctx)
	// cancel the child observables one after the other
	go func() {
		log.Printf("sleep 5")
		time.Sleep(time.Second * 5)
		log.Print("cancel 1 observable")
		cancelObservable1()
		log.Printf("sleep 5")
		time.Sleep(time.Second * 5)
		log.Print("cancel 2 observable")
		cancelObservable2()
	}()
	log.Print("wait for done")
	<-observable2Ctx.Done()
	log.Print("all observable canceled")
	log.Printf("sleep 5")
	time.Sleep(time.Second * 5)
	log.Printf("cancel the producer")
	cancelProducer()
	log.Printf("sleep 5")
	time.Sleep(time.Second * 5)
}
The console output is:
=== RUN TestRxGo
2021/05/05 14:57:32 sleep 5
2021/05/05 14:57:32 create producer
2021/05/05 14:57:33 producer fire 0
2021/05/05 14:57:34 producer fire 1
2021/05/05 14:57:35 producer fire 2
2021/05/05 14:57:36 producer fire 3
2021/05/05 14:57:37 wait for done
2021/05/05 14:57:37 sleep 5
2021/05/05 14:57:37 producer fire 4
2021/05/05 14:57:37 Observable 1: 4
2021/05/05 14:57:37 Observable 2: 4
2021/05/05 14:57:38 producer fire 5
2021/05/05 14:57:38 Observable 2: 5
2021/05/05 14:57:38 Observable 1: 5
2021/05/05 14:57:39 producer fire 6
2021/05/05 14:57:39 Observable 2: 6
2021/05/05 14:57:39 Observable 1: 6
2021/05/05 14:57:40 producer fire 7
2021/05/05 14:57:40 Observable 2: 7
2021/05/05 14:57:40 Observable 1: 7
2021/05/05 14:57:41 producer fire 8
2021/05/05 14:57:41 Observable 2: 8
2021/05/05 14:57:41 Observable 1: 8
2021/05/05 14:57:42 cancel 1 observable
2021/05/05 14:57:42 sleep 5
2021/05/05 14:57:42 producer fire 9
##--> Missing Observable 2
##--> Missing producer fire
2021/05/05 14:57:47 cancel 2 observable
2021/05/05 14:57:47 all observable canceled
2021/05/05 14:57:47 sleep 5
##--> Missing producer fire
2021/05/05 14:57:52 cancel the producer
##--> Missing producer done
2021/05/05 14:57:52 sleep 5
--- PASS: TestRxGo (25.04s)
PASS
Expected behavior
I want the producer observable to keep emitting items without blocking, while child observables are created and canceled dynamically.