CombineLatest on observables created with Defer
Describe the bug When creating an Observable with Defer, every call to Observe restarts the producer and every observer gets the same values. But this is not the case when the Observable is transformed with operators that itself call Observe on that Observable, like CombineLatest. In contrast to that, the Map operator is respecting the Defer origin of the Observable.
To Reproduce Steps to reproduce the behavior:
func TestCombineLatest(t *testing.T) {
o := rxgo.Defer([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
next <- rxgo.Of(1)
next <- rxgo.Of(2)
next <- rxgo.Of(3)
}})
co := rxgo.CombineLatest(func(i ...interface{}) interface{} {
return i[0]
}, []rxgo.Observable{o})
ch1 := co.Observe()
ch2 := co.Observe()
rr1 := make([]interface{}, 0)
rr2 := make([]interface{}, 0)
done1 := make(chan struct{}, 1)
go func() {
for i := 0; i < 3; i++ {
r1 := <-ch1
rr1 = append(rr1, r1.V)
}
close(done1)
}()
done2 := make(chan struct{}, 1)
go func() {
for i := 0; i < 3; i++ {
r2 := <-ch2
rr2 = append(rr2, r2.V)
}
close(done2)
}()
<-done1
<-done2
Expect(rr1).To(Equal(rr2))
}
The Producer is called only once. Depending on the scheduling 1, 2, 3 is distributed on rr1 and rr2, when no more values are available, rr1, rr2 get nils from the closed channels.
Expected behavior Both observers should get the 1, 2, 3.
Additional context The same pattern is working with System.Reactive.Linq in dotnet.
Some additional thoughts:
- I fear CombineLatest is not the only operator with this behavior.
- As a workaround, one can wrap calls to CombineLatest (and perhaps other operators or operator chains) with
func Defer(produceObservable func() rxgo.Observable, opts ...rxgo.Option) rxgo.Observable {
return rxgo.Defer([]rxgo.Producer{
func(ctx context.Context, next chan<- rxgo.Item) {
inCh := produceObservable().Observe(opts...)
for {
select {
case item, ok := <-inCh:
if !ok {
return
}
select {
case next <- item:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
},
},
)
}
but you need to know if it is necessary.
- When the workaround is used, the whole operator chain called in the producer, with all its goroutines, is duplicated. It would be better if the multiplexing of the transformed value takes place after the operator chain.