paho.mqtt.java icon indicating copy to clipboard operation
paho.mqtt.java copied to clipboard

Connection lost (32109) - java.io.EOFException

Open llakhch opened this issue 6 years ago • 13 comments

library: <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> 1.2.1

issue: never reconnected in spite of AutomaticReconnect true.

2019-05-16 14:13:41,748 [MQTT Rec: paho3777994479346839] ERROR com.publish.util.ExceptionAssistService.exceptionStrategy(ExceptionAssistService.java:19) - main cause :: Connection lost (32109) - java.io.EOFException at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:189) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at java.io.DataInputStream.readByte(DataInputStream.java:267) at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92) at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:136) ... 1 more 2019-05-16 14:13:41,769 [MQTT Rec: paho3777994479346839] ERROR com.publish.util.ExceptionAssistService.exceptionStrategy(ExceptionAssistService.java:35) - error code mqttexception :: 32109 ,whoHasCalledThis :: connectionLost

properties :

mqtt.connectiontimeout=60 mqtt.keepaliveinterval=30 mqtt.qos=0 mqtt.cleanSession=true mqtt.msg.retained=false mqtt.maxInflight=10000000 mqtt.ssl.enabled=true mqtt.ssl.url=ssl://xxxxx.clearblade.com mqtt.ssl.port=1884 mqtt.ssl.protocol=TLSv1.2 re.auth.code.list=4 conOpt.setAutomaticReconnect(true);

source code attached

`package com.publish.client;

import java.util.Arrays; import java.util.Date; import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory;

import org.apache.log4j.Logger; import org.eclipse.paho.client.mqttv3.IMqttActionListener; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.MqttAsyncClient; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;

import com.publish.dto.ResponseDto; import com.publish.main.PublisherApp; import com.publish.service.ExternalApiService; import com.publish.util.ExceptionAssistService;

@Service public class MqttAsyncPublisher implements MqttCallback, IMqttActionListener, MqttCallbackExtended {

private static Logger logger = Logger.getLogger(MqttAsyncPublisher.class);

@Autowired
private ExternalApiService externalApiService;

@Autowired
private ExceptionAssistService exceptionAssistService;

private static final String EMPTY = "empty";
private static final String MQTT_IS_COMM_DEVICE_FORMAT_ENABLED = "mqtt.isCommDeviceFormatEnabled";
private static final String MQTT_IS_COMM_DEVICE_ERROR_ENABLED = "mqtt.isCommDeviceErrorEnabled";

private static final String MQTT_IS_MQTT_OPERATING = "mqtt.isMqttOperating";
private static final String MQTT_QOS = "mqtt.qos";
private static final String ENCODING = "UTF-8";
private static final String ERROR_CODE_LIST = "re.auth.code.list";
private static final String MQTT_PASSWORD = "mqtt.password";
private static final String MQTT_CONNECTIONTIMEOUT = "mqtt.connectiontimeout";
private static final String MQTT_KEEPALIVEINTERVAL = "mqtt.keepaliveinterval";
private static final String MQTT_CLEAN_SESSION = "mqtt.cleanSession";
private static final String MQTT_MAX_INFLIGHT = "mqtt.maxInflight";
private static final String MQTT_MSG_RETAINED = "mqtt.msg.retained";
private static final String IS_TLS_ENABLED = "mqtt.ssl.enabled";
private static final String TLS_PROTOCOL = "mqtt.ssl.protocol";
private static final String MQTT_TLS_PORT = "mqtt.ssl.port";
private static final String MQTT_TLS_URL = "mqtt.ssl.url";
private static final String PUBLISHER_NAME = "publisher.name";

private static final String MQTT_PORT = "mqtt.port";
private static final String MQTT_URL = "mqtt.url";

private ResponseDto responseDto = null;
private MqttConnectOptions conOpt = null;
private MqttAsyncClient client = null;
private IMqttToken connectToken;
private String[] errorCodeList = {};
private int mqttMaxInflight = 0;
private boolean isMqttOperating = false;
private boolean isCommDeviceFormatEnabled = false;
private boolean isCommDeviceErrorEnabled = false;
private int qos = 0;
private boolean msgRetained = false;
private static AtomicInteger atomicMessageCount = new AtomicInteger(0);

@PostConstruct
private void init() {
	logger.info("Initializing mqtt connection for broker");
	try {
		responseDto = externalApiService.getAuthFromClearBlade();
		if (responseDto == null) {
			logger.error("Not a valid auth.");
			return;
		}
		qos = Integer.parseInt(PublisherApp.prop.getProperty(MQTT_QOS));
		msgRetained = Boolean.parseBoolean(PublisherApp.prop.getProperty(MQTT_MSG_RETAINED));
		isMqttOperating = Boolean.parseBoolean(PublisherApp.prop.getProperty(MQTT_IS_MQTT_OPERATING));
		isCommDeviceFormatEnabled = Boolean
				.parseBoolean(PublisherApp.prop.getProperty(MQTT_IS_COMM_DEVICE_FORMAT_ENABLED));
		isCommDeviceErrorEnabled = Boolean
				.parseBoolean(PublisherApp.prop.getProperty(MQTT_IS_COMM_DEVICE_ERROR_ENABLED));
		errorCodeList = PublisherApp.prop.getProperty(ERROR_CODE_LIST).split("##");
		mqttMaxInflight = Integer.parseInt(PublisherApp.prop.getProperty(MQTT_MAX_INFLIGHT));
		conOpt = new MqttConnectOptions();
		conOpt.setUserName(responseDto.getUserToken());
		conOpt.setPassword(PublisherApp.prop.getProperty(MQTT_PASSWORD).toCharArray());
		conOpt.setConnectionTimeout(Integer.parseInt(PublisherApp.prop.getProperty(MQTT_CONNECTIONTIMEOUT)));
		conOpt.setKeepAliveInterval(Integer.parseInt(PublisherApp.prop.getProperty(MQTT_KEEPALIVEINTERVAL)));
		conOpt.setCleanSession(Boolean.parseBoolean(PublisherApp.prop.getProperty(MQTT_CLEAN_SESSION)));
		conOpt.setMaxInflight(mqttMaxInflight);
		conOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
		conOpt.setAutomaticReconnect(true);
		if (Boolean.parseBoolean(PublisherApp.prop.getProperty(IS_TLS_ENABLED))) {
			logger.info("isTLSEnabled :: true");
			SSLContext sslContext = SSLContext.getInstance(PublisherApp.prop.getProperty(TLS_PROTOCOL));
			sslContext.init(null, null, new java.security.SecureRandom());
			SSLSocketFactory socketFactory = sslContext.getSocketFactory();
			conOpt.setSocketFactory(socketFactory);
			client = new MqttAsyncClient(
					PublisherApp.prop.getProperty(MQTT_TLS_URL) + ":"
							+ PublisherApp.prop.getProperty(MQTT_TLS_PORT),
					PublisherApp.prop.getProperty(PUBLISHER_NAME) + System.nanoTime(), new MemoryPersistence());
		} else {
			logger.info("isTLSEnabled :: false");
			client = new MqttAsyncClient(
					PublisherApp.prop.getProperty(MQTT_URL) + ":" + PublisherApp.prop.getProperty(MQTT_PORT),
					PublisherApp.prop.getProperty(PUBLISHER_NAME) + System.nanoTime(), new MemoryPersistence());
		}
		logger.info("Connection opt ::" + conOpt.toString());
		client.setCallback(this);
		connectToken = client.connect(conOpt, null, this);
	} catch (Exception ex) {
		logger.error("init :: ", ex);
	}
}

@PreDestroy
private void destory() {
	try {
		if (isConnected()) {
			client.disconnect().waitForCompletion();
		}
	} catch (Exception ex) {
		logger.error("destory exception :: ", ex);
	} finally {
		try {
			client.close();
		} catch (MqttException e) {
			logger.error("destory final block exception :: ", e);
		}
	}
}

@Override
public void connectionLost(Throwable cause) {
	try {
		if (exceptionAssistService.exceptionStrategy("connectionLost", cause, errorCodeList))
			responseDto = externalApiService.getAuthFromClearBlade();
		if (responseDto == null) {
			logger.error("Not a valid auth.");
			return;
		}
		conOpt.setUserName(responseDto.getUserToken());
		logger.error("Connection opt at connectionLost ::" + conOpt.toString());
	} catch (Exception ex) {
		logger.error("connectionLost at connectionLost :: ", ex);
	}
}

@Override
public void onSuccess(IMqttToken asyncActionToken) {
	try {
		if (asyncActionToken != null && asyncActionToken.equals(connectToken))
			logger.info("client connected successfully");
	} catch (Exception ex) {
		logger.error("onSuccess :: ", ex);
	}
}

@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
	if (exceptionAssistService.exceptionStrategy("onFailure", exception, errorCodeList))
		responseDto = externalApiService.getAuthFromClearBlade();
}

public void publishMessage(String topic, String message) {
	try {
		if (isConnected()) {
			client.getInFlightMessageCount();
			atomicMessageCount.getAndIncrement();
			if (mqttMaxInflight * 90 / 100 < client.getInFlightMessageCount())
				logger.error("message_count::" + atomicMessageCount.get() + " inflight_count :: "
						+ client.getInFlightMessageCount());
			client.publish(topic, message.getBytes(ENCODING), qos, msgRetained);
			return;
		} else {
			logger.error("client is not connected");
		}
	} catch (Exception ex) {
		logger.error("publishMessage :: ", ex);
	}
	return;
}

private boolean isConnected() {
	return (client != null) && (client.isConnected());
}	

public String getAccountId() {
	if (responseDto != null)
		return responseDto.getAccountId();
	return EMPTY;
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
	logger.info("not needed for this client");
	return;
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
	// dont do anything as of now not needed.
}

@Override
public void connectComplete(boolean reconnect, String serverURI) {
	logger.error("reconnect called :: " + reconnect + " ,is connected :: " + client.isConnected());
}

@Override
public String toString() {
	return responseDto.toString() + ", clientConnected ? " + client.isConnected() + ", errorCodeList="
			+ Arrays.toString(errorCodeList) + ", mqttMaxInflight=" + mqttMaxInflight + ", msgInflightCount="
			+ client.getInFlightMessageCount() + ", publishedMessageCount=" + atomicMessageCount.get()
			+ ", current time=" + new Date() + ", isMqttOperating=" + isMqttOperating + ", qos=" + qos
			+ ", isCommDeviceFormatEnabled=" + isCommDeviceFormatEnabled + ", isCommDeviceErrorEnabled="
			+ isCommDeviceErrorEnabled + conOpt.toString();
}

} `

llakhch avatar May 19 '19 04:05 llakhch

team this is production issue, any update would be appreciated.

llakhch avatar May 20 '19 16:05 llakhch

hi team , i am also facing the same issue..

Sujitrai88 avatar May 21 '19 06:05 Sujitrai88

I get the same error.The callback gets called but client does not reconnect and paho just seem to die.

ArunSandhu avatar May 21 '19 07:05 ArunSandhu

Hi same error here! paho does not reconnect and just stop consuming!

siavashsoleymani avatar Jun 27 '19 17:06 siavashsoleymani

facing the same issue we decided to reconnect manually in the connectionLost callback.

damaddin avatar Jul 10 '19 19:07 damaddin

Screen Shot 2019-08-16 at 12 14 35 AM

vanrin avatar Aug 15 '19 17:08 vanrin

I get the same error. Now I must call connect again in the connection fail.

nguyenvanquan7826 avatar Oct 05 '19 02:10 nguyenvanquan7826

I am got this problem in a high-concurrency consumer program with paho.mqtt.java version 1.2.2

org.eclipse.paho.client.mqttv3.MqttException: Connection lost at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:190) [org.eclipse.paho.client.mqttv3-1.2.2.jar!/:?] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202] Caused by: java.io.EOFException at java.io.DataInputStream.readByte(DataInputStream.java:267) ~[?:1.8.0_202] at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92) ~[org.eclipse.paho.client.mqttv3-1.2.2.jar!/:?] at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:137) ~[org.eclipse.paho.client.mqttv3-1.2.2.jar!/:?]

I think it's a bug! I I modified the source code org.eclipse.paho.client.mqttv3.internal.CommsReceiver.java and resolved this problem as follows:

original source code in v1.2.2: image

modified source code base on v1.2.2: image

penggle avatar Dec 06 '19 06:12 penggle

Same issue here. Solution provided by @penggle works just fine. Thanks !!

dsambugaro avatar Feb 04 '20 12:02 dsambugaro

any update on this issue ?

vivek11111994 avatar Mar 18 '20 14:03 vivek11111994

I have the same issue. I cannot subscribe to a topic after connection, and the application fails with:

2020-04-03 14:24:22.078 21100-21100/appname W/System.err: Connection lost (32109) - java.io.EOFException
2020-04-03 14:24:22.079 21100-21100/appname  W/System.err:     at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:146)
2020-04-03 14:24:22.079 21100-21100/appname  W/System.err:     at java.lang.Thread.run(Thread.java:764)
2020-04-03 14:24:22.079 21100-21100/appname  W/System.err: Caused by: java.io.EOFException
2020-04-03 14:24:22.079 21100-21100/appname  W/System.err:     at java.io.DataInputStream.readByte(DataInputStream.java:270)
2020-04-03 14:24:22.079 21100-21100/appname  W/System.err:     at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:65)
2020-04-03 14:24:22.079 21100-21100/appname  W/System.err:     at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:107)
2020-04-03 14:24:22.079 21100-21100/appname  W/System.err: 	... 1 more

lcarnevale avatar Apr 03 '20 12:04 lcarnevale

@penggle Thanks for your suggested code change. Synchronizing in.readMqttWireMessage() may not be a good idea because in.readMqttWireMessage() can take significant amount of time. You are making the process blocking.

I think we need to revisit readMqttWireMessage().

@lcarnevale @penggle Do you have a simple test case to reproduce this issue?

rdasgupt avatar Apr 03 '20 15:04 rdasgupt

Hi there ( @rdasgupt , @lcarnevale , @penggle , @icraggs ),

as I am facing the same issue in a Testcontainers-setup, I published code for testing in the following REPO:

  • https://github.com/dbraun1991/mqtt-test

Please follow the readme.

Best wishes ;-)

dbraun1991 avatar Jan 26 '24 14:01 dbraun1991