Could not stream and plot live sensor data in reflex using MQTT
Describe the issue
I'm trying to implement live streaming of sensor data using MQTT in a Reflex app. My code is pasted below:
import reflex as rx
import paho.mqtt.client as mqtt
import asyncio
from typing import Callable, Any
# Public MQTT broker (replace with your own if needed).
# BROKER is the host passed to client.connect(); TOPIC is the topic passed to
# client.subscribe().
BROKER = "mqtt.eclipseprojects.io"
TOPIC = "sensor/data"
# State for the reported reproduction: `data` collects the decoded MQTT payloads.
# NOTE(review): this is the snippet as pasted in the report; its indentation was
# lost in the paste, so the nesting below has to be read from context.
class SensorState(rx.State):
data: list = []
# MQTT message callback: decodes the payload and appends it to `data`,
# dropping the oldest entry once more than 50 are stored.
def on_message(self, client, userdata, msg):
try:
# new_value = float(msg.payload.decode()) # Convert received data to float
new_value = msg.payload.decode()
# self.data.append({"time": len(self.data), "value": new_value})
self.data.append(new_value)
if len(self.data) > 50:
self.data.pop(0) # Keep the latest 50 points
# NOTE(review): `refresh` is not defined in this class; presumably it is
# expected to come from rx.State. Confirm it exists.
self.refresh()
except ValueError:
pass # Ignore invalid data
# Handler bound to the "Start MQTT Streaming" button: creates the client,
# registers on_message, connects, subscribes and starts the client's loop.
async def start_mqtt(self):
client = mqtt.Client()
client.on_message = self.on_message
client.connect(BROKER, 1883, 60)
client.subscribe(TOPIC)
# NOTE(review): asyncio.to_thread forwards the trailing `()` to loop_start as a
# positional argument. Confirm loop_start accepts one.
await asyncio.to_thread(client.loop_start, ())
# Page layout: a heading, a line chart bound to SensorState.data, and a button
# that triggers SensorState.start_mqtt.
def index() -> rx.Component:
return rx.vstack(
rx.heading("Real-Time Sensor Data (MQTT)"),
rx.recharts.line_chart(
rx.recharts.line(
# NOTE(review): SensorState.on_message appends plain decoded strings, so the
# rows carry no "Temperature" key. Confirm the intended row shape.
data_key="Temperature",
),
# rx.line(data=SensorState.data, x="time", y="value", stroke="blue"),
# NOTE(review): likewise, no "name" key is ever written to the rows.
rx.recharts.x_axis(data_key="name"),
rx.recharts.y_axis(),
data=SensorState.data,
width="100%", height=400,
),
rx.button("Start MQTT Streaming", on_click=SensorState.start_mqtt),
)
# Create the Reflex app and hand index() to it as a page.
app = rx.App()
app.add_page(index)
But I got the error:
Cannot connect to server: websocket error. Check if server is reachable at ws://localhost:8000/_event
Expected behavior
What should have happened?
I expected a line chart that updates in real-time to be displayed.
...
Steps to reproduce (if applicable)
Environment
- Reflex Version: 0.7.3
- Python Version: 3.11
- OS: Windows 11
- Browser: Google Chrome
You need to run a background event and handle the event loop yourself (it would be easier if mqtt had an async client here).
import asyncio
import paho.mqtt.client as mqtt
import reflex as rx
# Public MQTT broker (replace with your own if needed).
# BROKER is the host passed to client.connect(); TOPIC is the topic passed to
# client.subscribe().
BROKER = "mqtt.eclipseprojects.io"
TOPIC = "sensor/data"
class SensorState(rx.State):
    """State that streams MQTT temperature readings into the chart data.

    ``data`` holds one ``{"time": index, "value": reading}`` row per accepted
    message; ``is_running`` prevents the stream from being started twice.
    """

    data: list = []
    is_running: bool = False

    @rx.event(background=True)
    async def start_mqtt(self):
        """Poll the MQTT broker and append incoming readings to ``data``.

        Declared as a background event, so state is only modified inside
        ``async with self`` blocks. Polling stops once this client's token is
        no longer listed in ``app.event_namespace.token_to_sid``. The MQTT
        client is disconnected and ``is_running`` is reset on the way out,
        including when an error interrupts the loop.
        """
        async with self:
            if self.is_running:
                return
            self.is_running = True

        try:
            client = mqtt.Client()
            client_id = self.router.session.client_token
            pending = []

            def on_message(client, userdata, msg):
                # Called by paho while client.loop() runs; only buffer here,
                # state is updated later from the async side.
                pending.append(msg)

            async def handle_messages():
                """Move buffered "Temperature: <value>" messages into ``data``."""
                readings = []
                for message in pending:
                    try:
                        fields = message.payload.decode().split(" ")
                        if fields[0] != "Temperature:":
                            continue
                        readings.append(float(fields[1]))
                    except (ValueError, IndexError):
                        # Undecodable or malformed payload: skip it instead of
                        # letting one bad message end the whole stream.
                        continue
                pending.clear()
                if not readings:
                    # Nothing new: avoid taking the state lock on every tick.
                    return
                async with self:
                    # Read the length once. Evaluating len(self.data) lazily
                    # while extend() consumes a generator sees the list grow
                    # item by item and produces skipping "time" indices.
                    first_index = len(self.data)
                    rows = [
                        {"time": first_index + offset, "value": reading}
                        for offset, reading in enumerate(readings)
                    ]
                    self.data.extend(rows)

            client.on_message = on_message
            # connect() and loop() block on the network, so run them in a
            # worker thread to keep the event loop responsive.
            await asyncio.to_thread(client.connect, BROKER, 1883, 60)
            try:
                client.subscribe(TOPIC)
                while True:
                    await asyncio.to_thread(client.loop)
                    await handle_messages()
                    await asyncio.sleep(0.1)
                    # Stop streaming once the browser session is gone.
                    if client_id not in app.event_namespace.token_to_sid:
                        break
            finally:
                client.disconnect()
        finally:
            async with self:
                self.is_running = False
def index() -> rx.Component:
    """Build the page: a heading, the live line chart and the start button."""
    title = rx.heading("Real-Time Sensor Data (MQTT)")
    # Plot each row's "value" against its "time" index.
    chart = rx.recharts.line_chart(
        rx.recharts.line(data_key="value"),
        rx.recharts.x_axis(data_key="time"),
        rx.recharts.y_axis(),
        data=SensorState.data,
        width="100%",
        height=400,
    )
    start_button = rx.button(
        "Start MQTT Streaming",
        on_click=SensorState.start_mqtt,
    )
    return rx.vstack(title, chart, start_button)
# Create the Reflex app and hand index() to it as a page. SensorState.start_mqtt
# also reads this module-level `app` (app.event_namespace.token_to_sid).
app = rx.App()
app.add_page(index)
Dear Khaleel,
Thanks once again for your help. MQTT does have async clients: gmqtt, as well as others built on top of paho-mqtt. In your last response (see the trail below) you mentioned that the solution would be easier if MQTT had an async client. How should I modify the solution to use an async client such as gmqtt?
I would also like to use chart.js with its streaming plugin in Reflex. I have checked your documentation, but it is still not clear to me how to wrap the library and the plugin in Reflex so that they work together.
Thanks in advance for your help.
Kind regards, Musa
From: Khaleel Al-Adhami @.> Sent: Monday, 28 April 2025 21:38 To: reflex-dev/reflex @.> Cc: Musa Mohamma @.>; Author @.> Subject: Re: [reflex-dev/reflex] Could not stream and plot live sensor data in reflex using MQTT (Issue #5061)
[https://avatars.githubusercontent.com/u/27952765?s=20&v=4] adhami3310 left a comment (reflex-dev/reflex#5061): https://github.com/reflex-dev/reflex/issues/5061#issuecomment-2836512931
You need to run a background event and handle the event loop yourself (it would be easier if mqtt had an async client here).
import asyncio
import paho.mqtt.client as mqtt
import reflex as rx
# Public MQTT broker (replace with your own if needed).
# BROKER is the host passed to client.connect(); TOPIC is the topic passed to
# client.subscribe().
BROKER = "mqtt.eclipseprojects.io"
TOPIC = "sensor/data"
class SensorState(rx.State):
    """State that streams MQTT temperature readings into the chart data.

    ``data`` holds one ``{"time": index, "value": reading}`` row per accepted
    message; ``is_running`` prevents the stream from being started twice.
    """

    data: list = []
    is_running: bool = False

    @rx.event(background=True)
    async def start_mqtt(self):
        """Poll the MQTT broker and append incoming readings to ``data``.

        Declared as a background event, so state is only modified inside
        ``async with self`` blocks. Polling stops once this client's token is
        no longer listed in ``app.event_namespace.token_to_sid``. The MQTT
        client is disconnected and ``is_running`` is reset on the way out,
        including when an error interrupts the loop.
        """
        async with self:
            if self.is_running:
                return
            self.is_running = True

        try:
            client = mqtt.Client()
            client_id = self.router.session.client_token
            pending = []

            def on_message(client, userdata, msg):
                # Called by paho while client.loop() runs; only buffer here,
                # state is updated later from the async side.
                pending.append(msg)

            async def handle_messages():
                """Move buffered "Temperature: <value>" messages into ``data``."""
                readings = []
                for message in pending:
                    try:
                        fields = message.payload.decode().split(" ")
                        if fields[0] != "Temperature:":
                            continue
                        readings.append(float(fields[1]))
                    except (ValueError, IndexError):
                        # Undecodable or malformed payload: skip it instead of
                        # letting one bad message end the whole stream.
                        continue
                pending.clear()
                if not readings:
                    # Nothing new: avoid taking the state lock on every tick.
                    return
                async with self:
                    # Read the length once. Evaluating len(self.data) lazily
                    # while extend() consumes a generator sees the list grow
                    # item by item and produces skipping "time" indices.
                    first_index = len(self.data)
                    rows = [
                        {"time": first_index + offset, "value": reading}
                        for offset, reading in enumerate(readings)
                    ]
                    self.data.extend(rows)

            client.on_message = on_message
            # connect() and loop() block on the network, so run them in a
            # worker thread to keep the event loop responsive.
            await asyncio.to_thread(client.connect, BROKER, 1883, 60)
            try:
                client.subscribe(TOPIC)
                while True:
                    await asyncio.to_thread(client.loop)
                    await handle_messages()
                    await asyncio.sleep(0.1)
                    # Stop streaming once the browser session is gone.
                    if client_id not in app.event_namespace.token_to_sid:
                        break
            finally:
                client.disconnect()
        finally:
            async with self:
                self.is_running = False
def index() -> rx.Component:
    """Build the page: a heading, the live line chart and the start button."""
    title = rx.heading("Real-Time Sensor Data (MQTT)")
    # Plot each row's "value" against its "time" index.
    chart = rx.recharts.line_chart(
        rx.recharts.line(data_key="value"),
        rx.recharts.x_axis(data_key="time"),
        rx.recharts.y_axis(),
        data=SensorState.data,
        width="100%",
        height=400,
    )
    start_button = rx.button(
        "Start MQTT Streaming",
        on_click=SensorState.start_mqtt,
    )
    return rx.vstack(title, chart, start_button)
# Create the Reflex app and hand index() to it as a page. SensorState.start_mqtt
# also reads this module-level `app` (app.event_namespace.token_to_sid).
app = rx.App()
app.add_page(index)
— Reply to this email directly, view it on GitHub (https://github.com/reflex-dev/reflex/issues/5061#issuecomment-2836512931), or unsubscribe (https://github.com/notifications/unsubscribe-auth/APNQCSF6R4ZRYGQ5ZA6VVTL232GSDAVCNFSM6AAAAAB2CNNNPSVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDQMZWGUYTEOJTGE). You are receiving this because you authored the thread. Message ID: @.@.>>
> would be easier if mqtt has an async client, how should I modify the solution to use an async client such as gmqtt?
Honestly, these async clients aren't particularly helpful here. They often add some form of events, but those events aren't async, and that is the issue: we want to update the UI on these events, which requires an async context.
> I would also like to use chart.js with its streaming plugin in reflex. I have checked your documentation but still not clear how to wrap the library and the plugin in reflex so that they work together.
I haven't looked into chart.js myself. Reflex has many other charting libraries wrapped, though; maybe some of those wrappers could give some insight if the documentation isn't enough?
Thanks
> would be easier if mqtt has an async client, how should I modify the solution to use an async client such as gmqtt?
> Honestly, these async clients aren't particularly helpful here. They often add some form of events, but those events aren't async, and that is the issue: we want to update the UI on these events, which requires an async context.
> I would also like to use chart.js with its streaming plugin in reflex. I have checked your documentation but still not clear how to wrap the library and the plugin in reflex so that they work together.
> I haven't looked into chart.js myself. Reflex has many other charting libraries wrapped, though; maybe some of those wrappers could give some insight if the documentation isn't enough?
Thanks very much, I will check those out.