
Reconnection and shared connections after exception

Open bobthemighty opened this issue 9 months ago • 1 comments

I'm looking for some input on how to handle reconnection in the presence of errors with a shared Client.

I recently updated our code to share a single client connection across two coroutines, and to connect in an outer loop. Since then I have had highly unstable connections. After a disconnect, the client is locked into a failed state until I restart the process. I have deployed a hotfix to create a new client on every loop, but I'm interested to understand if this behaviour is known and intentional.

The following code reproduces the issue on AWS IoT. When I send a "FAIL" message to the topic, the code sets an MqttError on the client's internal _disconnected future, reproducing what I see in production. After the fault, the loop does not recover. Each time the client tries to subscribe, I get a "status code 128" error and a reconnection.

This code does not reproduce the issue on a local mosquitto instance.

import asyncio
import aiomqtt
import random
import ssl

CA_CERT="..."
CLIENT_CERT="..."
CLIENT_KEY="..."
TOPIC = "..."

MQTT_HOST = "....iot.eu-west-2.amazonaws.com"

def get_tls_context(
    ca_cert: str, client_cert: str, private_key: str, port: int
) -> ssl.SSLContext:
    tls_context = ssl.create_default_context()
    tls_context.load_verify_locations(cafile=ca_cert)
    tls_context.load_cert_chain(client_cert, private_key)
    if port == 443:
        tls_context.set_alpn_protocols(["x-amzn-mqtt-ca"])
    return tls_context


async def main():
    client = aiomqtt.Client(MQTT_HOST, port=8883, tls_context=get_tls_context(CA_CERT, CLIENT_CERT, CLIENT_KEY, 8883))
    interval = 5

    while True:
        try:
            async with client:
                await client._connected
                await client.subscribe(TOPIC)
                print("Subscribed")
                async for message in client.messages:
                    payload = message.payload.decode()
                    print("Received:", payload)
                    if payload == "FAIL":
                        client._disconnected.set_exception(aiomqtt.MqttError())

        except aiomqtt.MqttError as e:
            print(f"Connection lost: {e}. Reconnecting in {interval}s")
            await asyncio.sleep(interval)


asyncio.run(main())

When I send a message to the topic, it's received, and I then trigger a connection failure, but the Client does not recover.

Subscribed
Received: hello bob
Received: hello bob
Received: FAIL
Connection lost: . Reconnecting in 5s
Connection lost: [code:128] Unspecified error. Reconnecting in 5s
Connection lost: [code:128] Unspecified error. Reconnecting in 5s
Connection lost: [code:128] Unspecified error. Reconnecting in 5s
Connection lost: [code:128] Unspecified error. Reconnecting in 5s

Interestingly, if I look at the logs on the IoT Core side, I see that the resubscription is consistently logged before the connect event.

{
    "timestamp": "2025-07-19 09:04:32.574",
    "logLevel": "INFO",
    "traceId": "8d89560b-c950-4f2c-375b-2b4f60478f62",
    "status": "Success",
    "eventType": "Disconnect",
    "protocol": "MQTT",
    "clientId": "$GEN/10ccfa7b-1660-483f-b51a-a4bb1c9f029d",
    "principalId": "3c07f9c98baf745b9c2c831790197f1ddf8e97f3f9b7ec5b6a71d95ddfd2a052",
    "disconnectReason": "CONNECTION_LOST"
}

{
    "timestamp": "2025-07-19 09:04:32.700",
    "logLevel": "ERROR",
    "traceId": "b2b91aa9-7b36-3614-e7bb-c2a47d76221d",
    "status": "Failure",
    "eventType": "Subscribe",
    "protocol": "MQTT",
    "clientId": "$GEN/eb0afc61-ba8a-4616-a5c4-b121be1d9c8f",
    "principalId": "3c07f9c98baf745b9c2c831790197f1ddf8e97f3f9b7ec5b6a71d95ddfd2a052"
}

{
    "timestamp": "2025-07-19 09:04:32.753",
    "logLevel": "INFO",
    "traceId": "c3db5b50-2a0c-fccb-01e4-2a6f0d9ddf56",
    "status": "Success",
    "eventType": "Connect",
    "protocol": "MQTT",
    "clientId": "$GEN/eb0afc61-ba8a-4616-a5c4-b121be1d9c8f",
    "principalId": "3c07f9c98baf745b9c2c831790197f1ddf8e97f3f9b7ec5b6a71d95ddfd2a052"
}
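As a quick check of the ordering described above, the three log entries can be sorted by timestamp. This is only a sketch over the values quoted in this post. It assumes the timestamps reflect when IoT Core logged each event, which may or may not match the order in which the packets arrived.

```python
from datetime import datetime, timedelta

# Fields copied from the three IoT Core log entries above.
events = [
    {"timestamp": "2025-07-19 09:04:32.574", "eventType": "Disconnect",
     "clientId": "$GEN/10ccfa7b-1660-483f-b51a-a4bb1c9f029d"},
    {"timestamp": "2025-07-19 09:04:32.700", "eventType": "Subscribe",
     "clientId": "$GEN/eb0afc61-ba8a-4616-a5c4-b121be1d9c8f"},
    {"timestamp": "2025-07-19 09:04:32.753", "eventType": "Connect",
     "clientId": "$GEN/eb0afc61-ba8a-4616-a5c4-b121be1d9c8f"},
]


def parse(ts: str) -> datetime:
    """Parse the 'YYYY-MM-DD HH:MM:SS.mmm' timestamps used in the logs."""
    return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f")


# Event types in the order they were logged.
ordered = sorted(events, key=lambda e: parse(e["timestamp"]))
order = [e["eventType"] for e in ordered]

# Gap between the failed Subscribe and the successful Connect, and whether
# both entries belong to the same client ID.
by_type = {e["eventType"]: e for e in events}
gap = parse(by_type["Connect"]["timestamp"]) - parse(by_type["Subscribe"]["timestamp"])
same_client = by_type["Subscribe"]["clientId"] == by_type["Connect"]["clientId"]

print(order)
print(gap // timedelta(milliseconds=1), "ms, same client:", same_client)
```

The failed Subscribe is logged 53 ms before the Connect, and both entries carry the same client ID, which differs from the one on the Disconnect entry.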

bobthemighty Jul 19 '25 09:07

Interesting, thanks for the detailed report!

Given that the problem doesn't reproduce on mosquitto and that your example is simple enough, I assume the problem is on the AWS IoT side. For reference, we have another issue about problems with AWS IoT, which is also about session management.

On the mismatch in the order of the connect and subscribe events in the logs: inside aiomqtt's __aenter__, we wait for the ConnAck packet before returning, so it shouldn't be possible for the Subscribe packet to arrive at the broker before the Connect packet.

What's interesting is that we seem to get a successful ConnAck back; otherwise aiomqtt would fail directly on the async with client: line. So it's the "duplicate" Subscribe packet that causes the problems.

Getting a code 128 from the broker doesn't really help either: the spec actually defines this as an "Unspecified error" 😄

I do wonder, though, why creating a new client works. Could you try passing a fixed string to the client's identifier argument in the working case to see if that fails? That would point to some kind of session management issue inside AWS IoT.

empicano Aug 04 '25 16:08
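The hotfix from the original report (a new client on every loop iteration) and the fixed-identifier experiment suggested in the reply could both be sketched with one loop that builds its client from a factory. This is a minimal sketch, not aiomqtt's recommended pattern: run_with_fresh_client, make_client, handle, retry_on and max_attempts are hypothetical names introduced here, and the loop only assumes that the client is an async context manager.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional, Tuple, Type


async def run_with_fresh_client(
    make_client: Callable[[], Any],
    handle: Callable[[Any], Awaitable[None]],
    retry_on: Tuple[Type[BaseException], ...],
    interval: float = 5.0,
    max_attempts: Optional[int] = None,
) -> int:
    """Run handle(client) with a newly built client on every attempt.

    Returns the number of attempts made. Hypothetical helper, not part of
    aiomqtt.
    """
    attempts = 0
    while max_attempts is None or attempts < max_attempts:
        attempts += 1
        # A new object per attempt, so no client state survives a failure.
        client = make_client()
        try:
            async with client:
                await handle(client)
            return attempts  # the handler finished without an error
        except retry_on as exc:
            print(f"Connection lost: {exc}. Reconnecting in {interval}s")
            await asyncio.sleep(interval)
    return attempts
```

With the reproduction above, make_client might be a lambda that returns aiomqtt.Client(MQTT_HOST, port=8883, tls_context=...), handle a coroutine that subscribes and iterates over client.messages, and retry_on the tuple (aiomqtt.MqttError,). Adding identifier="some-fixed-id" (a placeholder value) to that factory would give the fixed-identifier variant of the working case. Whether that variant fails on AWS IoT is exactly what the experiment is meant to find out.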