Hot and Cold observables - Disconnecting: Example output is not correct.
Hi, please check the code example here:
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Subscription s = connectable.connect();
connectable.subscribe(i -> System.out.println(i));
Thread.sleep(1000);
System.out.println("Closing connection");
s.unsubscribe();
Thread.sleep(1000);
System.out.println("Reconnecting");
s = connectable.connect();
The actual output is:
0
1
2
3
4
Closing connection
Reconnecting
Notice that no more values appear after "Reconnecting". It looks like calling unsubscribe on the ConnectableObservable's connection subscription also stops the existing subscriptions from receiving values after a reconnect. I have to call connectable.subscribe(i -> System.out.println(i)) again to get any output, whereas the text says that 'old observers will begin receiving values again' after calling connect() on the ConnectableObservable object. I'm using RxJava 1.0.14.
Thanks for discovering this. It appears that the behaviour has changed since the example was written.
I think this may be a new bug in RxJava. Apart from the fact that I don't understand the logic behind such a change, the old subscriber isn't actually unsubscribed.
This
ConnectableObservable<Long> connectable =
    Observable.interval(200, TimeUnit.MILLISECONDS)
        .doOnNext(x -> System.out.println("BEFORE " + x))
        .publish();
Subscription s = connectable.connect();
Subscription consumerSubscription = connectable
    .doOnUnsubscribe(() -> System.out.println("AFTER unsubscribed"))
    .subscribe(i -> System.out.println(i));
Thread.sleep(1000);
System.out.println("Closing connection");
s.unsubscribe();
Thread.sleep(1000);
System.out.println("Reconnecting");
s = connectable.connect();
System.out.println("Unsubscribed: " + consumerSubscription.isUnsubscribed());
System.in.read();
produces
BEFORE 0
0
BEFORE 1
1
BEFORE 2
2
BEFORE 3
3
BEFORE 4
4
Closing connection
Reconnecting
Unsubscribed: false
BEFORE 0
BEFORE 1
BEFORE 2
BEFORE 3
...
The old subscriber isn't actually unsubscribed. It's just lost in the process.
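To illustrate what "lost in the process" could mean, here is a minimal plain-Java sketch. It is a toy model and not RxJava's implementation: ToyConnectable, Connection, emit and disconnect are hypothetical names made up for this example. It assumes, based only on the output above, that a reconnect starts from fresh state with an empty observer list, while the old observer stays attached to the closed state without ever being notified.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model only; the class and method names are hypothetical, not RxJava API. */
public class LostSubscriberModel {

    /** One connection's state: the observers attached while it is current. */
    static final class Connection {
        final List<Consumer<Long>> observers = new ArrayList<>();
        boolean closed = false;
    }

    /** Stands in for a published observable whose state is replaced on reconnect. */
    static final class ToyConnectable {
        private Connection current = new Connection();

        /** Attach an observer to whichever connection state is current. */
        void subscribe(Consumer<Long> observer) {
            current.observers.add(observer);
        }

        /** Start a connection; a closed state is replaced by a fresh, empty one. */
        Connection connect() {
            if (current.closed) {
                current = new Connection(); // observers of the old state are not copied
            }
            return current;
        }

        /** Close the connection without notifying its observers. */
        void disconnect(Connection c) {
            c.closed = true;
        }

        /** Deliver a value to the observers of the current connection state. */
        void emit(long value) {
            if (!current.closed) {
                for (Consumer<Long> o : current.observers) {
                    o.accept(value);
                }
            }
        }
    }

    /** Replays the scenario from this thread and returns what the observers saw. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ToyConnectable connectable = new ToyConnectable();
        Connection c = connectable.connect();
        connectable.subscribe(v -> log.add("old " + v));
        connectable.emit(0);                             // reaches the old observer
        connectable.disconnect(c);                       // "Closing connection"
        connectable.connect();                           // "Reconnecting"
        connectable.emit(1);                             // nobody is attached to the new state
        connectable.subscribe(v -> log.add("new " + v)); // subscribing again attaches to it
        connectable.emit(2);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [old 0, new 2]
    }
}
```

In this model the old observer is never told that anything ended, which matches "Unsubscribed: false" above, and only an observer that subscribes after the reconnect sees values again.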
Thanks for your explanation. There are several other errors as well, but I can't recall them right now. I will report them when I see them again. Cheers.
I ran the code against older versions of RxJava. It appears that the new behaviour applies to 1.0.10 and newer.