using async handler for message callback
I need to use async/await in the paho on_message callback. My idea is to schedule a coroutine from the synchronous handler, but it doesn't work as I expect: the coroutine is never executed.
import time
from asyncio import ensure_future, get_event_loop, sleep

from paho.mqtt import client as mqtt_client

_loop = get_event_loop()


def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to MQTT Broker!")
        client.subscribe("test")
        client.subscribe("foo")
    else:
        print("Failed to connect, return code %d\n" % rc)


# this code never runs for some reason
async def coro_handler(client, payload):
    print(payload)
    await sleep(3)
    client.publish("foo", "async")


def test_handler(client, userdata, msg):
    print(msg.payload)
    client.publish("foo", "bar")
    # I need to be able to run async code in the handler
    # so I am trying something like this
    ensure_future(coro_handler(client, msg.payload), loop=_loop)


def foo_handler(client, userdata, msg):
    print(msg.payload)


client = mqtt_client.Client("paho_async_handler_test")
client.on_connect = on_connect
client.message_callback_add("test", test_handler)
client.message_callback_add("foo", foo_handler)
client.connect_async("localhost", 1883)

# start the client on another thread
client.loop_start()

# simulate a long running main program
while True:
    print("running the main program")
    time.sleep(10)
When I start the program and send the payload [3] to the topic test from mqtt-explorer, I get the following output:
running the main program
Connected to MQTT Broker!
b'[3]'
b'bar'
running the main program
...
It would be great if this was supported first class. I.e.
client.message_async_callback_add("test", my_coroutine)
Could you please try this out using the 1.6.x branch?
Hi, thanks for getting back, I have tried the same code with 1.6.x but the result is exactly the same.
I don't believe that your asyncio event loop (_loop) is ever started. You would want to run that on its own thread, or run the entire application asynchronously with asyncio.run().
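To illustrate the first option (running the loop on its own thread), here is a minimal sketch using only the standard library. The paho client is left out so the snippet runs without a broker; `start_background_loop`, `coro_handler` and `on_message` are illustrative names, and `on_message` stands in for a synchronous paho callback. It assumes the hand-off is done with `asyncio.run_coroutine_threadsafe`, which is the documented way to submit a coroutine to a loop running in another thread.

```python
import asyncio
import threading


def start_background_loop() -> asyncio.AbstractEventLoop:
    """Create an event loop and run it forever on a daemon thread."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop


async def coro_handler(payload: bytes) -> str:
    # Stand-in for real async work (e.g. awaiting I/O before publishing).
    await asyncio.sleep(0.1)
    return payload.decode().upper()


def on_message(loop: asyncio.AbstractEventLoop, payload: bytes):
    """Synchronous callback, as a client library might invoke it from its own thread."""
    # Thread-safe hand-off to the loop; returns a concurrent.futures.Future.
    return asyncio.run_coroutine_threadsafe(coro_handler(payload), loop)


if __name__ == "__main__":
    loop = start_background_loop()
    future = on_message(loop, b"hello")
    print(future.result(timeout=2))  # block only to show the result in this demo
    loop.call_soon_threadsafe(loop.stop)
```

In the original program, `ensure_future(..., loop=_loop)` only schedules the coroutine on `_loop`; since nothing ever runs that loop, the coroutine never gets a chance to execute.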
Hi, thank you. I tried running the main coroutine with asyncio.run(), but that didn't work properly either. I think the problem is that loop_start() runs the client on another thread, which doesn't have access to the event loop I start in the main thread. I guess I also have to await the tasks somewhere.
Originally, I came here because I want to integrate this with fastapi, which runs its own event loop.
Do you plan to expose some kind of async interface in the future? Given the I/O-bound nature of an MQTT client, this would be a suitable architecture, IMO.
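On the two concerns raised in this comment (the callback thread not seeing the main loop, and tasks needing to be awaited), the following is a rough sketch rather than a paho-specific recipe. It assumes the running loop is captured with `asyncio.get_running_loop()` and passed to the other thread explicitly. `worker` is a hypothetical stand-in for the thread that `loop_start()` creates, and `report` and `errors` are illustrative names. The future returned by `run_coroutine_threadsafe` does not have to be awaited, but an exception raised in the coroutine stays inside that future unless something inspects it, so a done-callback is attached here.

```python
import asyncio
import threading

errors = []  # collected here only so the sketch can be inspected


def report(future):
    """Done-callback: surface exceptions raised inside the coroutine."""
    exc = future.exception()
    if exc is not None:
        errors.append(exc)


async def handler(payload: bytes) -> None:
    await asyncio.sleep(0.05)
    if payload == b"bad":
        raise ValueError("cannot handle payload")


def worker(loop, payloads):
    """Stand-in for a network thread that calls a synchronous callback."""
    for payload in payloads:
        fut = asyncio.run_coroutine_threadsafe(handler(payload), loop)
        fut.add_done_callback(report)


async def main(payloads):
    loop = asyncio.get_running_loop()  # the loop that asyncio.run() started
    thread = threading.Thread(target=worker, args=(loop, payloads))
    thread.start()
    await asyncio.sleep(0.3)  # the main program keeps the loop alive
    thread.join()  # the worker has long finished, so this returns at once


if __name__ == "__main__":
    asyncio.run(main([b"ok", b"bad"]))
    print(errors)
```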
I think I managed to get something workable. I am not sure if there might be issues with this code performance wise, but from some simple tests, spamming messages, it seems to behave.
import asyncio
import json

from paho.mqtt.client import Client


def init_client(topics):
    client = Client("test")
    client.connect_async("localhost")

    @client.connect_callback()
    def on_connect(client, userdata, flags, rc):
        print("Connection returned " + str(rc))
        client.subscribe(topics)

    # get the main event loop (from fastapi)
    loop = asyncio.get_event_loop()

    def topic(sub):
        def decorator(coro):
            @client.topic_callback(sub)
            def handle(client, userdata, msg):
                # run the async callbacks on the main event loop
                asyncio.run_coroutine_threadsafe(coro(client, userdata, msg), loop)

            # hand the coroutine function back so the decorated name is not None
            return coro

        return decorator

    return client, topic


# async main program (potentially fastapi)
async def main():
    client, topic = init_client("my/topic")

    @topic("my/topic")
    async def print_payload(client, userdata, msg):
        await asyncio.sleep(2)
        print(msg.payload)

    client.loop_start()
    while True:
        print("running...")
        await asyncio.sleep(5)
        client.publish("my/topic", json.dumps({"mqtt": "msg"}))


asyncio.run(main())
Does https://github.com/eclipse/paho.mqtt.python/blob/master/examples/loop_asyncio.py help at all?
Hi, I have already looked into this, but I am not entirely sure how to integrate it with a program that runs its own event loop. I want to be able to declare async HTTP handlers from the web framework as well as async message callbacks from the MQTT client.
As far as I understand, the gist of this piece of code is to create an empty future and then set its result in the message callback.
def on_message(self, client, userdata, msg):
    if not self.got_message:
        print("Got unexpected message: {}".format(msg.payload.decode()))
    else:
        self.got_message.set_result(msg.payload)

# elsewhere, in the coroutine that waits for the message:
self.got_message = self.loop.create_future()
msg = await self.got_message
I may try to experiment with that some more but it seems difficult at the moment.
Do you think the way I have implemented it above is problematic?
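For comparison, the future-based pattern described above can be sketched without a broker. This is a hedged illustration, not the code from the linked example: `wait_for_message`, `register` and `fake_register` are hypothetical names, and the message is simulated with a `SimpleNamespace` carrying a `payload` attribute. It assumes the callback may fire on a different thread (as it would with `loop_start()`), so the future is resolved through `loop.call_soon_threadsafe` instead of calling `set_result` directly.

```python
import asyncio
import threading
from types import SimpleNamespace


async def wait_for_message(register):
    """Await the payload of the next message delivered to a sync callback.

    `register` is a hypothetical hook that installs the callback, for
    example a function that assigns it to a client's on_message attribute.
    """
    loop = asyncio.get_running_loop()
    got_message = loop.create_future()  # empty future, no result yet

    def resolve(payload):
        # Runs on the loop's thread; ignore messages after the first one.
        if not got_message.done():
            got_message.set_result(payload)

    def on_message(client, userdata, msg):
        # May run on a foreign thread, so go through the loop.
        loop.call_soon_threadsafe(resolve, msg.payload)

    register(on_message)
    return await got_message


def fake_register(callback):
    """Stand-in for a client: deliver one message from another thread."""
    msg = SimpleNamespace(payload=b"hello")
    threading.Timer(0.05, callback, args=(None, None, msg)).start()


if __name__ == "__main__":
    print(asyncio.run(wait_for_message(fake_register)))
```

A future suits "wait for one specific message"; for a continuous stream of messages, scheduling a coroutine per message (as in the decorator approach above) or feeding an `asyncio.Queue` is usually a better fit.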
I haven't really played with async in python, so I can't comment too much. I just wanted to make sure you'd seen that other example.
I'm going to close this issue off as it looks like a request for help (and Roger gave what help he was able). There is a separate request (#445) to improve asyncio support (and I believe addressing that will answer this question).
If you are still working on this and do require further help then please feel free to reopen the issue (but as it's been idle for a couple of years closing it seemed the appropriate action).
Note: This is part of an exercise to clean up old issues so that the project can move forwards. Due to the number of issues being worked through, mistakes will be made; please feel free to reopen this issue (or comment) if you believe it's been closed in error.