Intro-To-RxJava icon indicating copy to clipboard operation
Intro-To-RxJava copied to clipboard

Hot and Cold observables - Disconnecting: Example output is not correct.

Open huylv opened this issue 10 years ago • 3 comments

Hi, Please check the code example here:

ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Subscription s = connectable.connect();

connectable.subscribe(i -> System.out.println(i));

Thread.sleep(1000);
System.out.println("Closing connection");
s.unsubscribe();

Thread.sleep(1000);
System.out.println("Reconnecting");
s = connectable.connect();

The actual output is:

0
1
2
3
4
Closing connection
Reconnecting

Notice no more values appear after Reconnecting. Looks like calling unsubscribe on the ConnectableObservable's subscription also terminates subsequent subscriptions, too. I need to call connectable.subscribe(i -> System.out.println(i)) again in order to produce output while the text says that 'old observers will begin receiving values again' after calling connect() on the ConnectableObservable object. I'm using RxJava 1.0.14.

huylv avatar Oct 29 '15 19:10 huylv

Thanks for discovering this. It appears that the behaviour has changed since the example was written.

I think this may be a new bug in RxJava. Apart from the fact that I don't understand the logic behind such a change, the old subscriber isn't actually unsubscribed.

This

ConnectableObservable<Long> connectable =
    Observable.interval(200, TimeUnit.MILLISECONDS)
        .doOnNext(x -> System.out.println("BEFORE " + x))
        .publish();
Subscription s = connectable.connect();

Subscription consumerSubscription = connectable
    .doOnUnsubscribe(() -> System.out.println("AFTER unsubscribed"))
    .subscribe(i -> System.out.println(i));

Thread.sleep(1000);
System.out.println("Closing connection");
s.unsubscribe();

Thread.sleep(1000);
System.out.println("Reconnecting");
s = connectable.connect();

System.out.println("Unsubscribed: " + consumerSubscription.isUnsubscribed());
System.in.read();

produces

BEFORE 0
0
BEFORE 1
1
BEFORE 2
2
BEFORE 3
3
BEFORE 4
4
Closing connection
Reconnecting
Unsubscribed: false
BEFORE 0
BEFORE 1
BEFORE 2
BEFORE 3
...

The old subscriber isn't actually unsubscribed. It's just lost in the process.

Froussios avatar Oct 30 '15 17:10 Froussios

Thanks for your explanation. There're several other errors as well but I don't recall for now. Will report when I see them again. Cheers.

huylv avatar Oct 30 '15 17:10 huylv

I ran the code against older versions of RxJava. It appears that the new behaviour is for 1.0.10 and newer.

Froussios avatar Oct 30 '15 23:10 Froussios