paho.mqtt.python

on_publish() called before publish() returns

Open teslafields opened this issue 5 years ago • 2 comments

I noticed an odd scenario. I have an MQTT client instance that periodically publishes messages to a mosquitto broker on localhost, and occasionally I receive the publish confirmation before the publish() method returns. Here is the log:

[PRINT] 1586267171460 -> 6940
[LOG]   2020-04-07 10:46:11,461 INF [mqtt_clien:174] [bridge] --> mid=6940 topic/5e851e3895be4d00079b86f3/state [1665] {"time":{"$date":"2020-04-07T13:46:11.458Z"},"data":{"net_eth0_mac":"00:01:c0:26:13:df","net_dns2":"8.8.4.4","sys_uptime":263738,"sys_cpu_freq_avg":1200.0,"sys_mem_avail":522,"dckr_cntr0_cpu_avg":96.0,"net_eth1_gw":"0.0.0.0","dckr_img0_created":"2020-01-10T16:49:04","net_eth0_dhcp":false,"net_eth1_mac":"00:01:c0:26:13:e7","sys_cpu_usage_min":19.4,"dckr_cntr0_image":"topic/edge-agent:latest","sys_board_model":"IOTG-CIMX7-D1G-N16-E2-WAB-POE-PS-XL-V130-191016","net_eth1_ip":"","net_ping_rtt_min":-1,"dckr_cntr0_mem_min":333,"dckr_cntr0_name":"edge-agent","sys_mem_used_min":348,"net_eth1_nmask":"","dckr_img0_id":"0b8fe711a7","dckr_cntr0_mem_avg":369.0,"sys_disk_total":14136,"sys_mem_total":999,"net_eth0_ip":"192.168.0.10","net_ping_loss_max":0,"sys_cpu_freq_min":1200,"dckr_img0_name":"topic/edge-agent:latest","net_eth1_active":false,"dckr_img0_size":1224,"dckr_cntr0_cpu_min":0.0,"net_wlan0_rssi":74,"net_wlan0_ssid":"VIVO-1711","net_eth1_dhcp":false,"net_eth1_rx":0,"net_eth1_tx":0,"dckr_cntr0_mem_max":369,"sys_cpu_freq_max":1200,"net_wlan0_tx":61116,"net_eth0_tx":0,"sys_app_version":"0.2.2","net_ping_rtt_avg":45.06,"net_ping_loss_min":0,"sys_disk_used_max":5321,"dckr_cntr0_id":"345d3d8402","sys_cpu_usage_max":77.0,"net_eth0_gw":"0.0.0.0","net_dns1":"8.8.8.8","sys_cpu_usage_avg":34.75,"sys_mem_used_avg":413.0,"net_wlan0_mac":"40:06:a0:86:40:e1","net_ping_loss_avg":0.0,"sys_disk_used_min":5275,"dckr_cntr0_status":"running","net_wlan0_active":true,"sys_disk_used_avg":5321.0,"net_ping_rtt_max":1106.883,"net_wlan0_rx":61116,"net_eth0_active":true,"net_eth0_nmask":"255.255.255.0","sys_mem_used_max":413,"dckr_cntr0_cpu_max":189.47,"net_eth0_rx":0}}
[PRINT] 1586267171468 <- 6940
2020-04-07 10:46:11,473 INF [mqtt_clien:217] [bridge] mid=6940 published!
[PRINT] 1586267189673 -> 6941
[LOG]   2020-04-07 10:46:29,674 INF [mqtt_clien:174] [bridge] --> mid=6941 topic/5e851e3895be4d00079b86f3/state [1665] {"time":{"$date":"2020-04-07T13:46:29.672Z"},"data":{"net_eth0_mac":"00:01:c0:26:13:df","net_dns2":"8.8.4.4","sys_uptime":263752,"sys_cpu_freq_avg":1200.0,"sys_mem_avail":522,"dckr_cntr0_cpu_avg":70.27,"net_eth1_gw":"0.0.0.0","dckr_img0_created":"2020-01-10T16:49:04","net_eth0_dhcp":false,"net_eth1_mac":"00:01:c0:26:13:e7","sys_cpu_usage_min":19.4,"dckr_cntr0_image":"topic/edge-agent:latest","sys_board_model":"IOTG-CIMX7-D1G-N16-E2-WAB-POE-PS-XL-V130-191016","net_eth1_ip":"","net_ping_rtt_min":-1,"dckr_cntr0_mem_min":333,"dckr_cntr0_name":"edge-agent","sys_mem_used_min":348,"net_eth1_nmask":"","dckr_img0_id":"0b8fe711a7","dckr_cntr0_mem_avg":369.0,"sys_disk_total":14136,"sys_mem_total":999,"net_eth0_ip":"192.168.0.10","net_ping_loss_max":0,"sys_cpu_freq_min":1200,"dckr_img0_name":"topic/edge-agent:latest","net_eth1_active":false,"dckr_img0_size":1224,"dckr_cntr0_cpu_min":0.0,"net_wlan0_rssi":77,"net_wlan0_ssid":"VIVO-1711","net_eth1_dhcp":false,"net_eth1_rx":0,"net_eth1_tx":0,"dckr_cntr0_mem_max":369,"sys_cpu_freq_max":1200,"net_wlan0_tx":61119,"net_eth0_tx":0,"sys_app_version":"0.2.2","net_ping_rtt_avg":38.51,"net_ping_loss_min":0,"sys_disk_used_max":5321,"dckr_cntr0_id":"345d3d8402","sys_cpu_usage_max":77.0,"net_eth0_gw":"0.0.0.0","net_dns1":"8.8.8.8","sys_cpu_usage_avg":34.5,"sys_mem_used_avg":413.0,"net_wlan0_mac":"40:06:a0:86:40:e1","net_ping_loss_avg":0.0,"sys_disk_used_min":5275,"dckr_cntr0_status":"running","net_wlan0_active":true,"sys_disk_used_avg":5321.0,"net_ping_rtt_max":1106.883,"net_wlan0_rx":61119,"net_eth0_active":true,"net_eth0_nmask":"255.255.255.0","sys_mem_used_max":413,"dckr_cntr0_cpu_max":189.47,"net_eth0_rx":0}}
[PRINT] 1586267189676 <- 6941
[LOG]   2020-04-07 10:46:29,677 INF [mqtt_clien:217] [bridge] mid=6941 published!
[PRINT] 1586267213887 <- 6942
[LOG]   2020-04-07 10:46:53,888 INF [mqtt_clien:217] [bridge] mid=6942 published!
[PRINT] 1586267213889 -> 6942
[LOG]   2020-04-07 10:46:53,889 INF [mqtt_clien:174] [bridge] --> mid=6942 topic/5e851e3895be4d00079b86f3/state [1665] {"time":{"$date":"2020-04-07T13:46:53.880Z"},"data":{"net_eth0_mac":"00:01:c0:26:13:df","net_dns2":"8.8.4.4","sys_uptime":263780,"sys_cpu_freq_avg":1200.0,"sys_mem_avail":522,"dckr_cntr0_cpu_avg":72.84,"net_eth1_gw":"0.0.0.0","dckr_img0_created":"2020-01-10T16:49:04","net_eth0_dhcp":false,"net_eth1_mac":"00:01:c0:26:13:e7","sys_cpu_usage_min":19.4,"dckr_cntr0_image":"topic/edge-agent:latest","sys_board_model":"IOTG-CIMX7-D1G-N16-E2-WAB-POE-PS-XL-V130-191016","net_eth1_ip":"","net_ping_rtt_min":-1,"dckr_cntr0_mem_min":333,"dckr_cntr0_name":"edge-agent","sys_mem_used_min":348,"net_eth1_nmask":"","dckr_img0_id":"0b8fe711a7","dckr_cntr0_mem_avg":369.0,"sys_disk_total":14136,"sys_mem_total":999,"net_eth0_ip":"192.168.0.10","net_ping_loss_max":0,"sys_cpu_freq_min":1200,"dckr_img0_name":"topic/edge-agent:latest","net_eth1_active":false,"dckr_img0_size":1224,"dckr_cntr0_cpu_min":0.0,"net_wlan0_rssi":67,"net_wlan0_ssid":"VIVO-1711","net_eth1_dhcp":false,"net_eth1_rx":0,"net_eth1_tx":0,"dckr_cntr0_mem_max":369,"sys_cpu_freq_max":1200,"net_wlan0_tx":61124,"net_eth0_tx":0,"sys_app_version":"0.2.2","net_ping_rtt_avg":45.31,"net_ping_loss_min":0,"sys_disk_used_max":5321,"dckr_cntr0_id":"345d3d8402","sys_cpu_usage_max":77.0,"net_eth0_gw":"0.0.0.0","net_dns1":"8.8.8.8","sys_cpu_usage_avg":30.4,"sys_mem_used_avg":413.0,"net_wlan0_mac":"40:06:a0:86:40:e1","net_ping_loss_avg":0.0,"sys_disk_used_min":5275,"dckr_cntr0_status":"running","net_wlan0_active":true,"sys_disk_used_avg":5321.0,"net_ping_rtt_max":1106.883,"net_wlan0_rx":61124,"net_eth0_active":true,"net_eth0_nmask":"255.255.255.0","sys_mem_used_max":413,"dckr_cntr0_cpu_max":189.47,"net_eth0_rx":0}}

In this log, the [PRINT] prefix marks a print() call from line 172 or 214 (see the code below), and the [LOG] prefix marks a logging.info() call from line 173 or 217.

I expected to always see the print() and the logging.info() from lines 172 and 173 first, followed by the print() and the logging.info() from lines 214 and 217, as happens for mids 6940 and 6941. For mid 6942, however, I received the publish confirmation before my publish() call returned. It looks like cb_publish() was called before publish() returned.
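
A possible explanation, assuming the network loop runs in a background thread (for example via loop_start(); the post does not say how the loop is driven): that thread can process the PUBACK and invoke the callback while the calling thread is still inside publish(). The stdlib-only sketch below forces that interleaving with an event. `fake_publish`, `fake_network_loop` and this `on_publish` are hypothetical stand-ins, not paho internals.

```python
import queue
import threading

events = []                 # order in which things happen
outbox = queue.Queue()      # packets handed to the stand-in "network loop"
acked = threading.Event()   # set once the callback has run


def on_publish(mid):
    events.append(f"callback mid={mid}")
    acked.set()


def fake_network_loop():
    # Stand-in for the loop thread: take the packet, pretend the broker
    # acknowledged it, and fire the callback right away.
    mid = outbox.get()
    on_publish(mid)


def fake_publish(mid):
    outbox.put(mid)         # hand the packet to the loop thread
    # Force the unlucky scheduling: the caller is paused here until the
    # callback has already run on the other thread.
    acked.wait(timeout=5)
    events.append(f"publish returned mid={mid}")
    return mid


loop = threading.Thread(target=fake_network_loop)
loop.start()
fake_publish(6942)
loop.join()
print(events)  # ['callback mid=6942', 'publish returned mid=6942']
```

In a real client nothing forces this order, but with a broker on localhost the round trip is short enough that it can happen now and then.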

Here's my code (the class that contains these methods inherits from paho.mqtt.client.Client):

165     def do_publish(self, message):
166         if not isinstance(message, Message):
167             return None
168         topic = message.topic
169         payload = json.dumps(message.payload, separators=(',', ':'))
170         p = self.publish(topic, payload, qos=message.qos, retain=self.retain)
171         self.__msg_pending[p.mid] = message
172         print(timemnow(), '->', p.mid, file=sys.stderr)
173         self.logger.info('--> mid=%d %s [%3d] %s' % (p.mid, topic, len(payload),
174                                                      payload))

213     def cb_publish(self, client, userdata, mid):
214         print(timemnow(), '<-', mid, file=sys.stderr)
215         if mid in self.__msg_pending:
216             del self.__msg_pending[mid]
217         self.logger.info('mid=%s published!' % mid)

version: paho-mqtt==1.5.0

In the mosquitto log, I can see that the message was correctly received and ack-ed.

Apr  7 10:46:11 cl-debian mosquitto[16045]: Received PUBLISH from core-app (d0, q1, r0, m6940, 'topic/5e851e3895be4d00079b86f3/state', ... (1665 bytes))
Apr  7 10:46:11 cl-debian mosquitto[16045]: Sending PUBACK to core-app (m6940, rc0)
Apr  7 10:46:29 cl-debian mosquitto[16045]: Received PUBLISH from core-app (d0, q1, r0, m6941, 'topic/5e851e3895be4d00079b86f3/state', ... (1665 bytes))
Apr  7 10:46:29 cl-debian mosquitto[16045]: Sending PUBACK to core-app (m6941, rc0)
Apr  7 10:46:53 cl-debian mosquitto[16045]: Received PUBLISH from core-app (d0, q1, r0, m6942, 'topic/5e851e3895be4d00079b86f3/state', ... (1665 bytes))
Apr  7 10:46:53 cl-debian mosquitto[16045]: Sending PUBACK to core-app (m6942, rc0)

I am running this on armhf and using PyInstaller to freeze my Python code.

Any thoughts about it?

teslafields • Apr 07 '20 19:04

I'm seeing the same behavior. It makes it quite difficult to build a client-side persistence layer: you don't know the mid until publish() returns, but in on_publish() you want to remove the message from the persistent store by mid.

Ideally, one should be able to:

  • call publish()
  • write the message in the persistent store, keyed by mid
  • remove it by mid in on_publish
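
Since the callback can win the race, the steps above only work if the bookkeeping tolerates either order. A minimal sketch, with an in-memory dict standing in for the persistent store; `PendingAcks`, `register` and `ack` are hypothetical names, not part of paho-mqtt. The lock guards only the two containers and is never held while publish() itself runs, to stay clear of lock-ordering problems with whatever locks the library holds.

```python
import threading


class PendingAcks:
    """Track unacknowledged mids, tolerating an ack that arrives first."""

    def __init__(self):
        self._lock = threading.Lock()  # guards the two containers only
        self._pending = {}             # mid -> message awaiting on_publish
        self._early = set()            # mids acked before being registered

    def register(self, mid, message):
        """Call right after publish() returns. False means already acked."""
        with self._lock:
            if mid in self._early:
                self._early.discard(mid)
                return False
            self._pending[mid] = message
            return True

    def ack(self, mid):
        """Call from on_publish. Returns the message, or None if early."""
        with self._lock:
            if mid in self._pending:
                return self._pending.pop(mid)
            self._early.add(mid)
            return None

    def pending_count(self):
        with self._lock:
            return len(self._pending)


acks = PendingAcks()
acks.register(6941, "state-1")  # usual order: register, then ack
acks.ack(6941)
acks.ack(6942)                  # reversed order, as seen for mid 6942
acks.register(6942, "state-2")
print(acks.pending_count())     # 0
```

Because mids are 16-bit and get reused, this relies on every ack being matched by a register() call; an ack for a mid that is never registered would linger in the early set.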

tibboh • Sep 08 '23 15:09

This could be solved by allowing a per-message callback argument in publish() (which I've seen planned somewhere in the docs). Then we could:

  • generate a unique ID
  • store the message in the persistent store keyed by ID
  • pass the ID as a callback argument and delete from persistent store in the callback
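
Until publish() accepts such an argument, something close can be sketched with a closure over the ID and the info object that publish() returns, assuming that object has a blocking wait_for_publish() method (as paho's MQTTMessageInfo does). This swaps the on_publish callback for a waiter thread, so the mid is never needed. `publish_tracked` and `store` are hypothetical; a plain dict stands in for the persistent store.

```python
import threading
import uuid


def publish_tracked(publish, store, topic, payload, **kwargs):
    """Persist first, publish, then delete once delivery is confirmed.

    `publish` is assumed to behave like paho's Client.publish(): it returns
    an object with a blocking wait_for_publish() method.
    """
    message_id = uuid.uuid4().hex
    store[message_id] = (topic, payload)   # keyed by our own ID, not by mid
    info = publish(topic, payload, **kwargs)

    def _wait_and_delete():
        info.wait_for_publish()            # returns once the publish completed
        store.pop(message_id, None)        # the closure carries the ID

    waiter = threading.Thread(target=_wait_and_delete, daemon=True)
    waiter.start()
    return message_id, waiter
```

With a real client this would be called as publish_tracked(client.publish, store, topic, payload, qos=1). One thread per in-flight message is acceptable for periodic publishing but would not scale to high message rates, and a real store would need its own locking.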

tibboh • Sep 08 '23 15:09