Connection lost (32109) - java.io.EOFException
library:
issue: never reconnected in spite of AutomaticReconnect true.
2019-05-16 14:13:41,748 [MQTT Rec: paho3777994479346839] ERROR com.publish.util.ExceptionAssistService.exceptionStrategy(ExceptionAssistService.java:19) - main cause :: Connection lost (32109) - java.io.EOFException at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:189) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at java.io.DataInputStream.readByte(DataInputStream.java:267) at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92) at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:136) ... 1 more 2019-05-16 14:13:41,769 [MQTT Rec: paho3777994479346839] ERROR com.publish.util.ExceptionAssistService.exceptionStrategy(ExceptionAssistService.java:35) - error code mqttexception :: 32109 ,whoHasCalledThis :: connectionLost
properties :
mqtt.connectiontimeout=60 mqtt.keepaliveinterval=30 mqtt.qos=0 mqtt.cleanSession=true mqtt.msg.retained=false mqtt.maxInflight=10000000 mqtt.ssl.enabled=true mqtt.ssl.url=ssl://xxxxx.clearblade.com mqtt.ssl.port=1884 mqtt.ssl.protocol=TLSv1.2 re.auth.code.list=4 conOpt.setAutomaticReconnect(true);
source code attached
`package com.publish.client;
import java.util.Arrays; import java.util.Date; import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory;
import org.apache.log4j.Logger; import org.eclipse.paho.client.mqttv3.IMqttActionListener; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.MqttAsyncClient; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;
import com.publish.dto.ResponseDto; import com.publish.main.PublisherApp; import com.publish.service.ExternalApiService; import com.publish.util.ExceptionAssistService;
@Service public class MqttAsyncPublisher implements MqttCallback, IMqttActionListener, MqttCallbackExtended {
private static Logger logger = Logger.getLogger(MqttAsyncPublisher.class);
@Autowired
private ExternalApiService externalApiService;
@Autowired
private ExceptionAssistService exceptionAssistService;
private static final String EMPTY = "empty";
private static final String MQTT_IS_COMM_DEVICE_FORMAT_ENABLED = "mqtt.isCommDeviceFormatEnabled";
private static final String MQTT_IS_COMM_DEVICE_ERROR_ENABLED = "mqtt.isCommDeviceErrorEnabled";
private static final String MQTT_IS_MQTT_OPERATING = "mqtt.isMqttOperating";
private static final String MQTT_QOS = "mqtt.qos";
private static final String ENCODING = "UTF-8";
private static final String ERROR_CODE_LIST = "re.auth.code.list";
private static final String MQTT_PASSWORD = "mqtt.password";
private static final String MQTT_CONNECTIONTIMEOUT = "mqtt.connectiontimeout";
private static final String MQTT_KEEPALIVEINTERVAL = "mqtt.keepaliveinterval";
private static final String MQTT_CLEAN_SESSION = "mqtt.cleanSession";
private static final String MQTT_MAX_INFLIGHT = "mqtt.maxInflight";
private static final String MQTT_MSG_RETAINED = "mqtt.msg.retained";
private static final String IS_TLS_ENABLED = "mqtt.ssl.enabled";
private static final String TLS_PROTOCOL = "mqtt.ssl.protocol";
private static final String MQTT_TLS_PORT = "mqtt.ssl.port";
private static final String MQTT_TLS_URL = "mqtt.ssl.url";
private static final String PUBLISHER_NAME = "publisher.name";
private static final String MQTT_PORT = "mqtt.port";
private static final String MQTT_URL = "mqtt.url";
private ResponseDto responseDto = null;
private MqttConnectOptions conOpt = null;
private MqttAsyncClient client = null;
private IMqttToken connectToken;
private String[] errorCodeList = {};
private int mqttMaxInflight = 0;
private boolean isMqttOperating = false;
private boolean isCommDeviceFormatEnabled = false;
private boolean isCommDeviceErrorEnabled = false;
private int qos = 0;
private boolean msgRetained = false;
private static AtomicInteger atomicMessageCount = new AtomicInteger(0);
@PostConstruct
private void init() {
logger.info("Initializing mqtt connection for broker");
try {
responseDto = externalApiService.getAuthFromClearBlade();
if (responseDto == null) {
logger.error("Not a valid auth.");
return;
}
qos = Integer.parseInt(PublisherApp.prop.getProperty(MQTT_QOS));
msgRetained = Boolean.parseBoolean(PublisherApp.prop.getProperty(MQTT_MSG_RETAINED));
isMqttOperating = Boolean.parseBoolean(PublisherApp.prop.getProperty(MQTT_IS_MQTT_OPERATING));
isCommDeviceFormatEnabled = Boolean
.parseBoolean(PublisherApp.prop.getProperty(MQTT_IS_COMM_DEVICE_FORMAT_ENABLED));
isCommDeviceErrorEnabled = Boolean
.parseBoolean(PublisherApp.prop.getProperty(MQTT_IS_COMM_DEVICE_ERROR_ENABLED));
errorCodeList = PublisherApp.prop.getProperty(ERROR_CODE_LIST).split("##");
mqttMaxInflight = Integer.parseInt(PublisherApp.prop.getProperty(MQTT_MAX_INFLIGHT));
conOpt = new MqttConnectOptions();
conOpt.setUserName(responseDto.getUserToken());
conOpt.setPassword(PublisherApp.prop.getProperty(MQTT_PASSWORD).toCharArray());
conOpt.setConnectionTimeout(Integer.parseInt(PublisherApp.prop.getProperty(MQTT_CONNECTIONTIMEOUT)));
conOpt.setKeepAliveInterval(Integer.parseInt(PublisherApp.prop.getProperty(MQTT_KEEPALIVEINTERVAL)));
conOpt.setCleanSession(Boolean.parseBoolean(PublisherApp.prop.getProperty(MQTT_CLEAN_SESSION)));
conOpt.setMaxInflight(mqttMaxInflight);
conOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
conOpt.setAutomaticReconnect(true);
if (Boolean.parseBoolean(PublisherApp.prop.getProperty(IS_TLS_ENABLED))) {
logger.info("isTLSEnabled :: true");
SSLContext sslContext = SSLContext.getInstance(PublisherApp.prop.getProperty(TLS_PROTOCOL));
sslContext.init(null, null, new java.security.SecureRandom());
SSLSocketFactory socketFactory = sslContext.getSocketFactory();
conOpt.setSocketFactory(socketFactory);
client = new MqttAsyncClient(
PublisherApp.prop.getProperty(MQTT_TLS_URL) + ":"
+ PublisherApp.prop.getProperty(MQTT_TLS_PORT),
PublisherApp.prop.getProperty(PUBLISHER_NAME) + System.nanoTime(), new MemoryPersistence());
} else {
logger.info("isTLSEnabled :: false");
client = new MqttAsyncClient(
PublisherApp.prop.getProperty(MQTT_URL) + ":" + PublisherApp.prop.getProperty(MQTT_PORT),
PublisherApp.prop.getProperty(PUBLISHER_NAME) + System.nanoTime(), new MemoryPersistence());
}
logger.info("Connection opt ::" + conOpt.toString());
client.setCallback(this);
connectToken = client.connect(conOpt, null, this);
} catch (Exception ex) {
logger.error("init :: ", ex);
}
}
@PreDestroy
private void destory() {
try {
if (isConnected()) {
client.disconnect().waitForCompletion();
}
} catch (Exception ex) {
logger.error("destory exception :: ", ex);
} finally {
try {
client.close();
} catch (MqttException e) {
logger.error("destory final block exception :: ", e);
}
}
}
@Override
public void connectionLost(Throwable cause) {
try {
if (exceptionAssistService.exceptionStrategy("connectionLost", cause, errorCodeList))
responseDto = externalApiService.getAuthFromClearBlade();
if (responseDto == null) {
logger.error("Not a valid auth.");
return;
}
conOpt.setUserName(responseDto.getUserToken());
logger.error("Connection opt at connectionLost ::" + conOpt.toString());
} catch (Exception ex) {
logger.error("connectionLost at connectionLost :: ", ex);
}
}
@Override
public void onSuccess(IMqttToken asyncActionToken) {
try {
if (asyncActionToken != null && asyncActionToken.equals(connectToken))
logger.info("client connected successfully");
} catch (Exception ex) {
logger.error("onSuccess :: ", ex);
}
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
if (exceptionAssistService.exceptionStrategy("onFailure", exception, errorCodeList))
responseDto = externalApiService.getAuthFromClearBlade();
}
public void publishMessage(String topic, String message) {
try {
if (isConnected()) {
client.getInFlightMessageCount();
atomicMessageCount.getAndIncrement();
if (mqttMaxInflight * 90 / 100 < client.getInFlightMessageCount())
logger.error("message_count::" + atomicMessageCount.get() + " inflight_count :: "
+ client.getInFlightMessageCount());
client.publish(topic, message.getBytes(ENCODING), qos, msgRetained);
return;
} else {
logger.error("client is not connected");
}
} catch (Exception ex) {
logger.error("publishMessage :: ", ex);
}
return;
}
private boolean isConnected() {
return (client != null) && (client.isConnected());
}
public String getAccountId() {
if (responseDto != null)
return responseDto.getAccountId();
return EMPTY;
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
logger.info("not needed for this client");
return;
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// dont do anything as of now not needed.
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
logger.error("reconnect called :: " + reconnect + " ,is connected :: " + client.isConnected());
}
@Override
public String toString() {
return responseDto.toString() + ", clientConnected ? " + client.isConnected() + ", errorCodeList="
+ Arrays.toString(errorCodeList) + ", mqttMaxInflight=" + mqttMaxInflight + ", msgInflightCount="
+ client.getInFlightMessageCount() + ", publishedMessageCount=" + atomicMessageCount.get()
+ ", current time=" + new Date() + ", isMqttOperating=" + isMqttOperating + ", qos=" + qos
+ ", isCommDeviceFormatEnabled=" + isCommDeviceFormatEnabled + ", isCommDeviceErrorEnabled="
+ isCommDeviceErrorEnabled + conOpt.toString();
}
} `
team this is production issue, any update would be appreciated.
hi team , i am also facing the same issue..
I get the same error.The callback gets called but client does not reconnect and paho just seem to die.
Hi same error here! paho does not reconnect and just stop consuming!
facing the same issue we decided to reconnect manually in the connectionLost callback.
I get the same error. Now I must call connect again in the connection fail.
I am got this problem in a high-concurrency consumer program with paho.mqtt.java version 1.2.2
org.eclipse.paho.client.mqttv3.MqttException: Connection lost at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:190) [org.eclipse.paho.client.mqttv3-1.2.2.jar!/:?] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202] Caused by: java.io.EOFException at java.io.DataInputStream.readByte(DataInputStream.java:267) ~[?:1.8.0_202] at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92) ~[org.eclipse.paho.client.mqttv3-1.2.2.jar!/:?] at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:137) ~[org.eclipse.paho.client.mqttv3-1.2.2.jar!/:?]
I think it's a bug! I I modified the source code org.eclipse.paho.client.mqttv3.internal.CommsReceiver.java and resolved this problem as follows:
original source code in v1.2.2:

modified source code base on v1.2.2:

Same issue here. Solution provided by @penggle works just fine. Thanks !!
any update on this issue ?
I have the same issue. I cannot subscribe to a topic after connection, and the application fails with:
2020-04-03 14:24:22.078 21100-21100/appname W/System.err: Connection lost (32109) - java.io.EOFException
2020-04-03 14:24:22.079 21100-21100/appname W/System.err: at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:146)
2020-04-03 14:24:22.079 21100-21100/appname W/System.err: at java.lang.Thread.run(Thread.java:764)
2020-04-03 14:24:22.079 21100-21100/appname W/System.err: Caused by: java.io.EOFException
2020-04-03 14:24:22.079 21100-21100/appname W/System.err: at java.io.DataInputStream.readByte(DataInputStream.java:270)
2020-04-03 14:24:22.079 21100-21100/appname W/System.err: at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:65)
2020-04-03 14:24:22.079 21100-21100/appname W/System.err: at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:107)
2020-04-03 14:24:22.079 21100-21100/appname W/System.err: ... 1 more
@penggle Thanks for your suggested code change. Synchronizing in.readMqttWireMessage() may not be a good idea because in.readMqttWireMessage() can take significant amount of time. You are making the process blocking.
I think we need to revisit readMqttWireMessage().
@lcarnevale @penggle Do you have a simple test case to reproduce this issue?
Hi there ( @rdasgupt , @lcarnevale , @penggle , @icraggs ),
as I am facing the same issue in a Testcontainers-setup, I published code for testing in the following REPO:
- https://github.com/dbraun1991/mqtt-test
Please follow the readme.
Best wishes ;-)