Persistent out messages
It looks like messages published while there is no connection to the broker are stored only in memory, i.e. they are not persisted to disk. If the paho client restarts (or the computer it runs on does), these messages are lost.
I feel like if this were a real issue, it would already be implemented. Am I the only one with a use case for persistent storage, or am I missing something?
Thanks!
Well, you are not the only one with this use case. We had a similar use case and developed a MongoDB-based backend that stores the messages and deletes them from MongoDB only when we receive an ack from the broker (using QoS 1).
I do wonder whether we should add persistent storage at the library level itself, or create a new library that uses the MQTT library as a dependency to do this.
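The delete-on-ack idea does not depend on MongoDB. Below is a minimal sketch that uses Python's built-in sqlite3 module in place of MongoDB (a substitution, not the poster's implementation): each message gets a row before it is sent, the row is deleted only once the broker acknowledges it, and pending() lists whatever still needs sending after a restart. The class SqliteOutbox and its method names are hypothetical, and the sketch leaves out how a row id is matched to the mid seen in on_publish, which is the tricky part discussed later in the thread.

```python
import sqlite3
import threading


class SqliteOutbox:
    """Durable outbox: a row lives until the broker acknowledges the message (QoS 1)."""

    def __init__(self, path=":memory:"):
        # check_same_thread=False because acks may arrive on the network thread;
        # the lock serializes access, as the sqlite3 docs advise in that case.
        self._db = sqlite3.connect(path, check_same_thread=False)
        self._lock = threading.Lock()
        with self._lock:
            self._db.execute(
                "CREATE TABLE IF NOT EXISTS outbox ("
                " id INTEGER PRIMARY KEY AUTOINCREMENT,"
                " topic TEXT NOT NULL,"
                " payload BLOB NOT NULL)"
            )
            self._db.commit()

    def add(self, topic, payload):
        """Persist a message before handing it to the MQTT client; returns its row id."""
        with self._lock:
            cur = self._db.execute(
                "INSERT INTO outbox (topic, payload) VALUES (?, ?)", (topic, payload)
            )
            self._db.commit()
            return cur.lastrowid

    def ack(self, row_id):
        """Delete a message once the broker has acknowledged it."""
        with self._lock:
            self._db.execute("DELETE FROM outbox WHERE id = ?", (row_id,))
            self._db.commit()

    def pending(self):
        """Messages still awaiting an ack, oldest first (e.g. to resend after a restart)."""
        with self._lock:
            return self._db.execute(
                "SELECT id, topic, payload FROM outbox ORDER BY id"
            ).fetchall()
```

The default ":memory:" path is only for trying it out; a real deployment would pass a file path so the table survives a restart.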
It makes sense to me to add persistent storage at the library level (I believe the Java client already does this). Would there be interest in a PR for this project that does that?
@sfphh4 - when do you delete the message from your MongoDB backend? on_publish() seems like a good candidate, but the mid it receives is not a great identifier (since it starts over on every connection).
Here is the algorithm we have used:
- the wrapper library's publish function simply inserts the MQTTMessage object into a Python Queue object, say Queue1
- another thread (blocking on Queue1) takes that object, inserts it into MongoDB, and writes the MQTTMessage object, together with its MongoDB ObjectID, into another Queue, say Queue2
- another thread, blocking on Queue2, upon getting the message checks whether the MQTT client is in a connected state (see https://github.com/eclipse/paho.mqtt.python/blob/9782ab81fe7ee3a05e74c7f3e1d03d5611ea4be4/src/paho/mqtt/client.py#L1337), and then checks the number of messages in flight (see https://github.com/eclipse/paho.mqtt.python/blob/9782ab81fe7ee3a05e74c7f3e1d03d5611ea4be4/src/paho/mqtt/client.py#L1590). If the client is connected and the number of messages in flight is below your acceptable threshold, it transmits the message and passes the MQTTMessage, ObjectID and MQTTMessageInfo to another queue, say Queue3
- another thread, blocking on Queue3, waits until the message is delivered and, upon delivery, deletes the message from MongoDB
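The four steps above can be sketched with Python's queue and threading modules. This is a minimal sketch under stated assumptions so that it runs without a broker: FakeClient and FakeInfo are hypothetical stand-ins for paho's Client and MQTTMessageInfo, is_connected() mirrors the connected-state check linked in the third step, inflight_count() is a hypothetical helper for the in-flight check, a plain dict stands in for the MongoDB collection, an integer counter stands in for the ObjectID, and messages are (topic, payload) tuples rather than MQTTMessage objects.

```python
import itertools
import queue
import threading
import time


class FakeInfo:
    """Hypothetical stand-in for MQTTMessageInfo; only wait_for_publish() is modelled."""
    def __init__(self):
        self._done = threading.Event()

    def mark_published(self):
        self._done.set()

    def wait_for_publish(self):
        self._done.wait()


class FakeClient:
    """Hypothetical stand-in for a connected paho Client (no network)."""
    def __init__(self):
        self.sent = []

    def is_connected(self):
        return True

    def inflight_count(self):  # hypothetical helper for the in-flight check
        return 0

    def publish(self, topic, payload, qos=1):
        info = FakeInfo()
        self.sent.append((topic, payload))
        info.mark_published()  # pretend the broker acked immediately
        return info


def run_pipeline(client, messages, store, max_inflight=10):
    q1, q2, q3 = queue.Queue(), queue.Queue(), queue.Queue()
    ids = itertools.count()  # stands in for MongoDB ObjectIDs

    def persist():  # step 2: durable write before anything is sent
        while (msg := q1.get()) is not None:
            key = next(ids)
            store[key] = msg
            q2.put((key, msg))
        q2.put(None)  # propagate shutdown

    def send():  # step 3: transmit only when connected and below the in-flight cap
        while (item := q2.get()) is not None:
            key, (topic, payload) = item
            while not client.is_connected() or client.inflight_count() >= max_inflight:
                time.sleep(0.05)
            q3.put((key, client.publish(topic, payload, qos=1)))
        q3.put(None)

    def confirm():  # step 4: delete from the store only after delivery
        while (item := q3.get()) is not None:
            key, info = item
            info.wait_for_publish()
            del store[key]

    threads = [threading.Thread(target=f) for f in (persist, send, confirm)]
    for t in threads:
        t.start()
    for m in messages:  # step 1: the wrapper's publish just enqueues
        q1.put(m)
    q1.put(None)
    for t in threads:
        t.join()
```

Splitting persistence and transmission into separate stages means the caller's publish never blocks on the database or the network; the cost is three extra threads and a store that must tolerate access from more than one of them.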
Which QoS are you looking to work with, 1 or 2? If your application requires QoS 2, you will need to write a lot of code to cover the many edge cases that may arise, to avoid duplicate message transmission.
I only have to worry about QoS 1. I ended up with this scenario:
- prior to publishing, store the message with key=mid
- in on_publish, remove the message using the mid argument in the callback
- in on_connect, check whether this is the first connect or a reconnect. If it's the first connect, then paho has been restarted, so send any messages in the persistent storage (and remove them right after)
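A minimal sketch of the scenario above, using Python's shelve module as the persistent storage (an assumption; the original does not say which store was used). ReplayOnStart, publish_fn and the method names are hypothetical; in a real client, on_publish(mid) and on_connect() would be called from paho's on_publish and on_connect callbacks, whose exact signatures depend on the paho version. Because the mid is only returned by publish(), the sketch stores the message right after the call rather than before it, which leaves exactly the window questioned in the next comment.

```python
import shelve


class ReplayOnStart:
    """Persist outgoing messages by mid, drop them on ack, replay them after a restart."""

    def __init__(self, path, publish_fn):
        self._db = shelve.open(path)   # shelve keys must be strings
        self._publish_fn = publish_fn  # hypothetical: callable(topic, payload) -> mid
        self._first_connect = True

    def publish(self, topic, payload):
        mid = self._publish_fn(topic, payload)
        self._db[str(mid)] = (topic, payload)  # stored after publish(): mid is only known now
        self._db.sync()
        return mid

    def on_publish(self, mid):
        # QoS 1: called once the broker has acknowledged this mid
        self._db.pop(str(mid), None)
        self._db.sync()

    def on_connect(self):
        if not self._first_connect:
            return  # reconnect within the same process: nothing to replay
        self._first_connect = False
        # First connect: anything still stored is left over from before the restart.
        # Read everything before republishing, because new mids restart from 1 and
        # could overwrite old keys that have not been replayed yet.
        leftovers = list(self._db.values())
        self._db.clear()
        for topic, payload in leftovers:
            self.publish(topic, payload)  # re-stored under its new mid

    def close(self):
        self._db.close()
```

Rather than dropping replayed messages for good, this sketch re-stores each one under the mid of its new publish, so it is still deleted only on acknowledgement; a crash between clear() and the re-publish would still lose those messages.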
@ttichy I'm curious about your solution. You say that you store the message with key=mid prior to publishing. But isn't mid available only after you call publish()?
With QoS=0, publish() may actually call on_publish before returning, making it difficult to store the message with its mid and then remove it in on_publish. I have some ideas for a workaround, but I'm curious how other people solved this.
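One possible workaround, sketched here under assumptions and not something this thread settled on: persist each message under a locally generated key before calling publish(), then bind the returned mid to that key afterwards. If on_publish fires before publish() returns, as it can with QoS 0, the mid is parked in an "early ack" set and the entry is deleted as soon as publish() returns. Outbox, client_publish and the in-memory dict standing in for durable storage are hypothetical. The lock is deliberately not held around client_publish, so a callback that runs inside publish() on the same thread cannot deadlock.

```python
import threading
import uuid


class Outbox:
    """Persist under our own key first; bind the mid only after publish() returns."""

    def __init__(self):
        self._lock = threading.Lock()
        self.store = {}       # key -> (topic, payload); stands in for a durable store
        self._by_mid = {}     # mid -> key, kept in memory only
        self._early = set()   # mids acked before publish() returned

    def publish(self, client_publish, topic, payload):
        key = uuid.uuid4().hex
        with self._lock:
            self.store[key] = (topic, payload)  # durable before the network sees it
        mid = client_publish(topic, payload)    # lock NOT held: on_publish may run inside
        with self._lock:
            if mid in self._early:
                self._early.discard(mid)        # already acked: drop it right away
                del self.store[key]
            else:
                self._by_mid[mid] = key
        return key

    def on_publish(self, mid):
        with self._lock:
            key = self._by_mid.pop(mid, None)
            if key is None:
                self._early.add(mid)            # publish() has not returned yet
            else:
                del self.store[key]
```

After a restart, whatever is left in the store is what still needs sending; the mid mapping is kept in memory only because mids are not meaningful across restarts.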
Why not publish the queue-based MongoDB solution to the community?
Hey @tibboh - sorry, I just came across your question. Alas, I don't work on that project anymore and can't really remember what we ended up doing.