websocket automatically closed
I just using the demo to connect our websocket url with the authority passcode and login . and I connect successfully,but after few seconds the websocket will automatically closed . the loog are here:
09-04 09:49:40.681 31681-31889/ua.naiksoftware.stompclientexample D/WebSocketsConnectionProvider: onWebsocketHandshakeReceivedAsClient with response: 101 Switching Protocols 09-04 09:49:40.681 31681-31889/ua.naiksoftware.stompclientexample D/WebSocketsConnectionProvider: onOpen with handshakeData: 101 Switching Protocols 09-04 09:49:40.682 31681-31889/ua.naiksoftware.stompclientexample D/WebSocketsConnectionProvider: Emit lifecycle event: OPENED 09-04 09:49:40.683 31681-31889/ua.naiksoftware.stompclientexample D/WebSocketsConnectionProvider: Send STOMP message: CONNECT version:1.1,1.0 passcode:wxa30d083338bc417a_oCrgEuE0GKW7yVm5CSCDjS2O5LAs___wechat login:wxa30d083338bc417a_oCrgEuE0GKW7yVm5CSCDjS2O5LAs
��
09-04 09:49:40.685 31681-31681/ua.naiksoftware.stompclientexample I/websocket: Stomp connection opened 09-04 09:49:40.835 31681-31889/ua.naiksoftware.stompclientexample D/WebSocketsConnectionProvider: onMessage: 09-04 09:49:40.835 31681-31889/ua.naiksoftware.stompclientexample D/WebSocketsConnectionProvider: Emit STOMP message: 09-04 09:49:40.836 31681-31889/ua.naiksoftware.stompclientexample D/WebSocketsConnectionProvider: onMessage: CONNECTED session:session-ax9Hzqn4VS_K_yejTmR7-g heart-beat:0,0 server:RabbitMQ/3.5.4 version:1.0
��
09-04 09:49:40.837 31681-31889/ua.naiksoftware.stompclientexample D/WebSocketsConnectionProvider: Emit STOMP message: CONNECTED session:session-ax9Hzqn4VS_K_yejTmR7-g heart-beat:0,0 server:RabbitMQ/3.5.4 version:1.0
��
09-04 09:49:40.842 31681-31889/ua.naiksoftware.stompclientexample D/WebSocketsConnectionProvider: Send STOMP message: SUBSCRIBE id:611d1293-17f8-4f67-ae8f-11775ff841c1 destination:/topic/app.1 ack:auto
��
09-04 09:50:40.733 31681-31889/ua.naiksoftware.stompclientexample D/WebSocketsConnectionProvider: onClose: code=1006 reason= remote=true 09-04 09:50:40.733 31681-31889/ua.naiksoftware.stompclientexample D/WebSocketsConnectionProvider: Emit lifecycle event: CLOSED 09-04 09:50:40.736 31681-31681/ua.naiksoftware.stompclientexample I/websocket: Stomp connection closed 09-04 10:00:19.379 31681-31681/ua.naiksoftware.stompclientexample E/websocket: Error send REST echo java.net.SocketTimeoutException: failed to connect to /10.0.2.2 (port 8080) after 10000ms at libcore.io.IoBridge.connectErrno(IoBridge.java:174) at libcore.io.IoBridge.connect(IoBridge.java:127) at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:188) at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:457) at java.net.Socket.connect(Socket.java:959) at okhttp3.internal.platform.AndroidPlatform.connectSocket(AndroidPlatform.java:63) at okhttp3.internal.connection.RealConnection.connectSocket(RealConnection.java:223) at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:149) at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:192) at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121) at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100) at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67) at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67) at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67) at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185) at okhttp3.RealCall.execute(RealCall.java:69) at retrofit2.OkHttpCall.execute(OkHttpCall.java:180) at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:41) at io.reactivex.Observable.subscribe(Observable.java:10901) at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34) at io.reactivex.Observable.subscribe(Observable.java:10901) at io.reactivex.internal.operators.flowable.FlowableFromObservable.subscribeActual(FlowableFromObservable.java:29) at io.reactivex.Flowable.subscribe(Flowable.java:12995) at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest.subscribeActual(FlowableOnBackpressureLatest.java:32) at io.reactivex.Flowable.subscribe(Flowable.java:12995) at io.reactivex.internal.operators.flowable.FlowableUnsubscribeOn.subscribeActual(FlowableUnsubscribeOn.java:33) at io.reactivex.Flowable.subscribe(Flowable.java:12995) at io.reactivex.Flowable.subscribe(Flowable.java:12941) at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82) at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61) at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52) at java.util.concurrent.FutureTask.run(FutureTask.java:237) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588) at java.lang.Thread.run(Thread.java:833)
Are you certain that you really aren't facing a broken socket for some other reason?
thanks for your reply,I fix the problem at last ,some problem occured in the server, and I suggest the author that add the heart-beat on the project, after I take a look in the stomp protocal document about the format of the heart-beat header, I just know the format is ("heart-beat","1000,1000"),the former 1000 represent the heart-beat which your client sent,and the later 1000 represents the heart-beat which your server sent, I suugest the heart-beat set(0,10000)
Oh, so it was a heartbeat mismatch, and the fault was in the server software. Got it. 👍
hello my friend.
I have some questions about the source code, it really confuses me a lot .
About the Flowable I write a demo to show how it work.
`private void start() { Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e)
throws Exception
{
Log.d("rxman",e.requested()+"");
e.onNext(0);
boolean flag;
for(int i=0;;i++){
flag=false;
while (e.requested()==0){
if(!flag){
flag=true;
Log.d("rxman","ends");
}
}
e.onNext(i);
}
}
}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io()).
observeOn(AndroidSchedulers.mainThread()).
subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d("rxman", "onSubscribe");
// s.request(200);
}
@Override
public void onNext(Integer integer) {
Log.d("rxman", "onnext"+integer);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
Log.d("rxman", "oncomplete");
}
});
}`
The demos show that only invoke method s.request(200) in the method of onSubscribe ,then the onnext will be invoked in the method of the subscribe. if delete the request,the onnext will not be invoked.
but when i read the source code,I find that there is no request.but still could invoke the onnext. could you tell me the way you invoke it?
mStompClient.lifecycle() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(lifecycleEvent -> { switch (lifecycleEvent.getType()) { case OPENED: showToast("Stomp connection opened"); break; case ERROR: Log.e(TAG, "Stomp connection error", lifecycleEvent.getException()); showToast("Stomp connection error"); break; case CLOSED: Log.e(TAG, "Stomp connection closed"); getQueueInfo(); } });
public Flowable<LifecycleEvent> getLifecycleReceiver() { return Flowable.<LifecycleEvent>create(mLifecycleEmitters::add, BackpressureStrategy.BUFFER) .doOnCancel(() -> { synchronized (mLifecycleLock) { Iterator<FlowableEmitter<? super LifecycleEvent>> iterator = mLifecycleEmitters.iterator(); while (iterator.hasNext()) { if (iterator.next().isCancelled()) iterator.remove(); } } }); }
private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) { synchronized (mLifecycleLock) { Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name()); for (FlowableEmitter<? super LifecycleEvent> emitter : mLifecycleEmitters) { emitter.onNext(lifecycleEvent); } } }
Thanks very much in advance.
Sorry, I need to make this readable for myself and anyone else first.
Your demo:
private void start() {
Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
Log.d("rxman", e.requested() + "");
e.onNext(0);
boolean flag;
for (int i = 0; /* No end condition? */ ; i++) {
flag = false;
while (e.requested() == 0) {
if (!flag) {
flag = true;
Log.d("rxman", "ends");
}
}
e.onNext(i);
}
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer> () {
@Override
public void onSubscribe(Subscription s) {
Log.d("rxman", "onSubscribe");
// s.request(200);
}
@Override
public void onNext(Integer integer) {
Log.d("rxman", "onnext" + integer);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
Log.d("rxman", "oncomplete");
}
});
}
Your reference to the example app:
mStompClient.lifecycle()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(lifecycleEvent - > {
switch (lifecycleEvent.getType()) {
case OPENED:
toast("Stomp connection opened");
break;
case ERROR:
Log.e(TAG, "Stomp connection error", lifecycleEvent.getException());
toast("Stomp connection error");
break;
case CLOSED:
toast("Stomp connection closed");
}
});
Your reference to OkHttpConnectionProvider.getLifecycleReceiver:
@Override
public Flowable<LifecycleEvent> getLifecycleReceiver() {
return Flowable.<LifecycleEvent>create(mLifecycleEmitters::add, BackpressureStrategy.BUFFER)
.doOnCancel(() - > {
synchronized(mLifecycleLock) {
Iterator<FlowableEmitter<? super LifecycleEvent>> iterator = mLifecycleEmitters.iterator();
while (iterator.hasNext()) {
if (iterator.next().isCancelled()) iterator.remove();
}
}
});
}
Your reference to OkHttpConnectionProvider.emitLifecycleEvent:
private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) {
synchronized(mLifecycleLock) {
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
for (FlowableEmitter<? super LifecycleEvent> subscriber : mLifecycleEmitters) {
subscriber.onNext(lifecycleEvent);
}
}
}
There. Now we can begin to approach your question.
It seems like you're trying to learn RxJava. Instead of trying to learn it from this library, I'd recommend to look at a couple articles or tutorials on the subject. Reactive Programming isn't something you can just pick up by reading example code; it's a serious paradigm shift.
You seem like you already get the general idea, so I think you should move forward by reading this and then this.
If you start having trouble wrapping your head around the Rx concept, read through the Intro to Rx. It's widely known to describe the idea incredibly well, but keep in mind that it was written for RxJava 1.
Thanks very much for your advise about learning the rxjava,I will read it later. it will much help for me. for the question which I mentioned yesterday. I get the answer now. the codes:
my question is : where is the request? ` Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e)
throws Exception
{
Log.d("rxman",e.requested()+"");
e.onNext(0);
*//* boolean flag;
for(int i=0;;i++){
flag=false;
while (e.requested()==0){
if(!flag){
flag=true;
Log.d("rxman","end");
}
}
e.onNext(i);
}*//*
}
}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io()).
observeOn(AndroidSchedulers.mainThread()).
subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d("rxman", "onSubscribe");
s.request(200);
}
@Override
public void onNext(Integer integer) {
Log.d("rxman", "onnext"+integer);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
Log.d("rxman", "oncomplete");
}
});`
the answer is : `Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e)
throws Exception
{
Log.d("rxman",e.requested()+"");
e.onNext(0);
boolean flag;
for(int i=0;;i++){
flag=false;
while (e.requested()==0){
if(!flag){
flag=true;
Log.d("rxman","end");
}
}
e.onNext(i);
}
}
}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io()).
observeOn(AndroidSchedulers.mainThread()).
subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer)
throws Exception
{
Log.d("rxman",integer+"");
}
});`
and if you are using the lambda, the code will be like that: ` Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e)
throws Exception
{
Log.d("rxman",e.requested()+"");
e.onNext(0);
boolean flag;
for(int i=0;;i++){
flag=false;
while (e.requested()==0){
if(!flag){
flag=true;
Log.d("rxman","end");
}
}
e.onNext(i);
}
}
}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io()).
observeOn(AndroidSchedulers.mainThread()).
subscribe(e ->Log.d("rxman",e+""));`
it will be automatically considered as request(Interger.MAX_VALUE) if using the Consumer instead of the Subscriber.
and I will read the paper which you offered to me , thanks again my friend.
No problem. If that's all you need, feel free to close the issue. :)
@NaikSoftware Hey, can you give me permissions to close issues? It would make things a little easier. 😄