there are some matter work with celery
myMQtt.py
class myMqtt:
def __init__(self):
self.client=mqtt.Client()
self.MQTT_TOPIC=settings.MQTT_TOPIC
self.MQTT_USER=settings.MQTT_USER
self.MQTT_PWD=settings.MQTT_PWD
self.MQTT_SERVER=settings.MQTT_SERVER
self.MQTT_PORT=int(settings.MQTT_PORT)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_publish = self.on_publish
self.client._transport="websockets"
self.client.username_pw_set(self.MQTT_USER, self.MQTT_PWD)
print("MQTT:",self.MQTT_SERVER,self.MQTT_PORT,self.MQTT_USER,self.MQTT_PWD,self.MQTT_TOPIC)
cr=self.client.connect(self.MQTT_SERVER, self.MQTT_PORT, 6000) # 600为keepalive的时间间隔
def on_connect(self,client, userdata, flags, rc):
# 0: Connection successful
# 1: Connection refused - incorrect protocol version
# 2: Connection refused - invalid client identifier
# 3: Connection refused - server unavailable
# 4: Connection refused - bad username or password
# 5: Connection refused - not authorised
# 6-255: Currently unused.
print("MQtt onnected with result code: " + str(rc))
def on_message(self,client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
def on_publish(self,client, userdata, mid):
print("on_publish:::",client, userdata, mid)
def publish(self,payload):
try:
r=self.client.publish(self.MQTT_TOPIC, payload=payload, qos=0)
print("mqtt publish结果")
except Exception as e :
print("MQTT publish ERROR",str(e))
app = Celery()
@app.task
def onlinePersonBroadcast():
retrunList=[]
keys=redisHandle.scan_iter("idcard:*")
for key in keys:
print(key)
value=redisHandle.get(key)
jsonValue=json.loads(value)
retrunList.append(jsonValue)
mymqtt.publish(json.dumps(retrunList,cls=DateEncoder))
When I call the client in celery's tasks file and publish the message, there will be no error or wraining output, but I can't actually get data. Onlineperson broadcasts is a scheduled task in django.anyone help?
当我在celery的tasks文件中调用客户端并且发布消息的时候,会没有报错,但是实际上收不到数据。onlinePersonBroadcas是一个定时任务
Could you please attempt this with logging enabled? It's possible the task is being shut down before the message is actually sent (calling publish adds the message to a queue; consider using wait_for_publish). Sorry - I'm not familiar with celery so cannot make any further suggestions (if you can provide more info someone may be able to help further - assuming this is still an issue).