Connecting Observable #318

Closed
@dayaftereh

Description

I'm not sure whether the following behavior is an issue or whether I'm on the wrong track. I want a producer observable, fed from a single channel, that uses rxgo.WithPublishStrategy() to broadcast its items to child observables. I create each child observable by passing the producer's Observable#Observe channel to rxgo.FromChannel. However, when a child observable is canceled via its context (created with context.WithCancel()), the parent observable stops emitting items.
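As a sanity check on the setup described above: every context in the reproduction is created directly from context.Background(), so at the level of the standard context package, canceling a child's context cannot cancel the producer's context. The minimal sketch below checks only that; the helper name siblingsIndependent is hypothetical, and the sketch says nothing about what RxGo does internally.

```go
package main

import (
	"context"
	"fmt"
)

// siblingsIndependent reports whether canceling one context leaves a
// sibling context, created from the same parent, untouched.
// Hypothetical helper for illustration; it does not use RxGo.
func siblingsIndependent() bool {
	parent := context.Background()

	// Stands in for the producer's context in the reproduction.
	producerCtx, cancelProducer := context.WithCancel(parent)
	defer cancelProducer()

	// Stands in for a child observable's context.
	childCtx, cancelChild := context.WithCancel(parent)
	cancelChild()

	// The child must be canceled, the producer must still be live.
	return childCtx.Err() != nil && producerCtx.Err() == nil
}

func main() {
	fmt.Println(siblingsIndependent())
}
```

If this holds, the stall is not explained by context propagation alone, which points at how the items are handed from the producer to the children.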

using: github.com/reactivex/rxgo/v2 v2.5.0

To Reproduce
The following test reproduces the behavior:

import (
	"context"
	"log"
	"testing"
	"time"

	"github.com/reactivex/rxgo/v2"
)

func TestRxGo(t *testing.T) {
	producerCtx, cancelProducer := context.WithCancel(context.Background())
	// create an infinite producer that can be canceled via its context
	producer := rxgo.Create(
		[]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
			log.Print("create producer")
			defer func() {
				log.Print("producer done")
			}()

			i := 0
			for {
				timeout := time.After(time.Second * 1)
				select {
				case <-ctx.Done():
					return
				case <-timeout:
					next <- rxgo.Of(i)
					log.Printf("producer fire %d", i)
				}
				i = i + 1
			}
		}},
		rxgo.WithPublishStrategy(),
		rxgo.WithContext(producerCtx),
		rxgo.WithBackPressureStrategy(rxgo.Drop),
	)

	// connect to the producer
	producer.Connect(producerCtx)

	// create a child observable to receive items from producer
	append := func(index int, parent context.Context) rxgo.Observable {
		observable := rxgo.FromChannel(
			producer.Observe(),
			rxgo.WithBackPressureStrategy(rxgo.Drop),
			rxgo.WithContext(parent),
		)
		observable.DoOnNext(func(i interface{}) {
			log.Printf("Observable %d: %v", index, i)
		})

		return observable
	}

	log.Printf("sleep 5")
	time.Sleep(time.Second * 5)

	// create the first observable with own context for cancel
	observable1Ctx, cancelObservable1 := context.WithCancel(context.Background())
	append(1, observable1Ctx)

	// create the second observable with own context for cancel
	observable2Ctx, cancelObservable2 := context.WithCancel(context.Background())
	append(2, observable2Ctx)

	go func() {
		log.Printf("sleep 5")
		time.Sleep(time.Second * 5)

		log.Print("cancel 1 observable")
		cancelObservable1()

		log.Printf("sleep 5")
		time.Sleep(time.Second * 5)

		log.Print("cancel 2 observable")
		cancelObservable2()
	}()

	log.Print("wait for done")
	<-observable2Ctx.Done()
	log.Print("all observable canceled")

	log.Printf("sleep 5")
	time.Sleep(time.Second * 5)

	log.Printf("cancel the producer")
	cancelProducer()

	log.Printf("sleep 5")
	time.Sleep(time.Second * 5)
}

The console output is:

=== RUN   TestRxGo
2021/05/05 14:57:32 sleep 5
2021/05/05 14:57:32 create producer
2021/05/05 14:57:33 producer fire 0
2021/05/05 14:57:34 producer fire 1
2021/05/05 14:57:35 producer fire 2
2021/05/05 14:57:36 producer fire 3
2021/05/05 14:57:37 wait for done
2021/05/05 14:57:37 sleep 5
2021/05/05 14:57:37 producer fire 4
2021/05/05 14:57:37 Observable 1: 4
2021/05/05 14:57:37 Observable 2: 4
2021/05/05 14:57:38 producer fire 5
2021/05/05 14:57:38 Observable 2: 5
2021/05/05 14:57:38 Observable 1: 5
2021/05/05 14:57:39 producer fire 6
2021/05/05 14:57:39 Observable 2: 6
2021/05/05 14:57:39 Observable 1: 6
2021/05/05 14:57:40 producer fire 7
2021/05/05 14:57:40 Observable 2: 7
2021/05/05 14:57:40 Observable 1: 7
2021/05/05 14:57:41 producer fire 8
2021/05/05 14:57:41 Observable 2: 8
2021/05/05 14:57:41 Observable 1: 8
2021/05/05 14:57:42 cancel 1 observable
2021/05/05 14:57:42 sleep 5
2021/05/05 14:57:42 producer fire 9

##--> Missing Observable 2
##--> Missing producer fire

2021/05/05 14:57:47 cancel 2 observable
2021/05/05 14:57:47 all observable canceled
2021/05/05 14:57:47 sleep 5

##--> Missing producer fire

2021/05/05 14:57:52 cancel the producer

##--> Missing producer done

2021/05/05 14:57:52 sleep 5
--- PASS: TestRxGo (25.04s)
PASS

Expected behavior

What I actually want is for the producer observable to keep emitting items without blocking, while child observables are created (and canceled) dynamically.

Labels

question: Question regarding how RxGo is working etc.
