paho.mqtt.python icon indicating copy to clipboard operation
paho.mqtt.python copied to clipboard

using async handler for message callback

Open nbraun-wolf opened this issue 4 years ago • 8 comments

I need to be able to use async/await in the paho on_message callback. My idea is to schedule a coroutine from the synchronous handler, but it doesn't work as I expect. The code is not executed.

import time
from asyncio import ensure_future, get_event_loop, sleep

from paho.mqtt import client as mqtt_client

_loop = get_event_loop()


def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to MQTT Broker!")
        client.subscribe("test")
        client.subscribe("foo")
    else:
        print("Failed to connect, return code %d\n", rc)


# this code nevers runs for some reason
async def coro_handler(client, payload):
    print(payload)
    await sleep(3)
    client.publish("foo", "async")


def test_handler(client, userdata, msg):
    print(msg.payload)
    client.publish("foo", "bar")
    # I need to be able to run async code in the handler
    # so I am trying something like this
    ensure_future(coro_handler(client, msg.payload), loop=_loop)


def foo_handler(client, userdata, msg):
    print(msg.payload)


client = mqtt_client.Client("paho_async_handler_test")
client.on_connect = on_connect

client.message_callback_add("test", test_handler)
client.message_callback_add("foo", foo_handler)

client.connect_async("localhost", 1883)
# start the client on another thread
client.loop_start()

# simulate a long running main program
while True:
    print("running the main program")
    time.sleep(10)

When I start the program and send the payload [3] to the topic test from mqtt-explorer, I get the below output.

running the main program
Connected to MQTT Broker!
b'[3]'
b'bar'
running the main program
...

It would be great if this was supported first class. I.e.

client.message_async_callback_add("test", my_coroutine)

nbraun-wolf avatar Oct 03 '21 11:10 nbraun-wolf

Could you please try this out using the 1.6.x branch?

ralight avatar Oct 03 '21 12:10 ralight

Hi, thanks for getting back, I have tried the same code with 1.6.x but the result is exactly the same.

nbraun-wolf avatar Oct 03 '21 15:10 nbraun-wolf

I don't believe that your asyncio event loop (_loop) is ever started. You would want to run that on its own thread, or run the entire application asynchronously with asyncio.run().

martindurant avatar Oct 08 '21 15:10 martindurant

Hi, thank you, I have tried to run the main function coroutine with asyncio.run, but it didn't work properly either. I think the problem is that the start_loop function runs on another thread, which doesn't have access to the event loop I start in the main thread. I also have to await the tasks somewhere, I guess.

Original I am coming here because I am looking to integrate this with fastapi, which runs its own event loop.

Do you plan to expose some kind of asnyc interface in the future? Given the I/O bound nature of a mqtt client, this would be suitable architecture wise IMO.

nbraun-wolf avatar Oct 30 '21 08:10 nbraun-wolf

I think I managed to get something workable. I am not sure if there might be issues with this code performance wise, but from some simple tests, spamming messages, it seems to behave.

import asyncio
import json

from paho.mqtt.client import Client


def init_client(topics):
    client = Client("test")
    client.connect_async("localhost")

    @client.connect_callback()
    def on_connect(client, userdata, flags, rc):
        print("Connection returned " + str(rc))
        client.subscribe(topics)
    
    # get the main event loop (from fastapi)
    loop = asyncio.get_event_loop()

    def topic(sub):
        def decorator(coro):
            @client.topic_callback(sub)
            def handle(client, userdata, msg):
                # run the async callbacks on the main event loop
                asyncio.run_coroutine_threadsafe(coro(client, userdata, msg), loop)

        return decorator

    return client, topic

# async main program (potentially fastapi)
async def main():
    client, topic = init_client("my/topic")

    @topic("my/topic")
    async def print_payload(client, userdata, msg):
        await asyncio.sleep(2)
        print(msg.payload)

    client.loop_start()

    while True:
        print("running...")
        await asyncio.sleep(5)
        client.publish("my/topic", json.dumps({"mqtt": "msg"}))


asyncio.run(main())

nbraun-wolf avatar Oct 30 '21 10:10 nbraun-wolf

Does https://github.com/eclipse/paho.mqtt.python/blob/master/examples/loop_asyncio.py help at all?

ralight avatar Oct 30 '21 10:10 ralight

Hi, I have already looked into this, but I am not entirly sure how to integrate this with a program this runs its own event loop. I want to be able to delare async http handler from the web framework as well as async message callbacks from the mqtt client.

As far as I understand the gist of this pieace of code is to create an empty future, and then set its result in the message callback.

def on_message(self, client, userdata, msg):
    if not self.got_message:
        print("Got unexpected message: {}".format(msg.decode()))
    else:
        self.got_message.set_result(msg.payload)

self.got_message = self.loop.create_future()
msg = await self.got_message

I may try to experiment with that some more but it seems difficult at the moment.

Do you think the way I have implemented it above is problematic?

nbraun-wolf avatar Oct 30 '21 11:10 nbraun-wolf

I haven't really played with async in python, so I can't comment too much. I just wanted to make sure you'd seen that other example.

ralight avatar Oct 30 '21 12:10 ralight

I'm going to close this issue off as it looks like a request for help (and Roger gave what help he was able). There is a separate request (#445) to improve asyncio support (and I believe addressing that will answer this question).

If you are still working on this and do require further help then please feel free to reopen the issue (but as it's been idle for a couple of years closing it seemed the appropriate action).

Note: This is part of an exercise to clean up old issues so that the project can move forwards. Due to the number of issues being worked through mistakes will be made; please feel free to reopen this issue (or comment) if you believe it's been closed in error.

MattBrittan avatar Jan 08 '24 03:01 MattBrittan