paho.mqtt.python icon indicating copy to clipboard operation
paho.mqtt.python copied to clipboard

there are some matter work with celery

Open fuuhoo opened this issue 3 years ago • 1 comments

myMQtt.py

class myMqtt:
    def __init__(self):
        self.client=mqtt.Client()
        self.MQTT_TOPIC=settings.MQTT_TOPIC
        self.MQTT_USER=settings.MQTT_USER
        self.MQTT_PWD=settings.MQTT_PWD
        self.MQTT_SERVER=settings.MQTT_SERVER
        self.MQTT_PORT=int(settings.MQTT_PORT)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_publish = self.on_publish
        self.client._transport="websockets"
        self.client.username_pw_set(self.MQTT_USER, self.MQTT_PWD)
        print("MQTT:",self.MQTT_SERVER,self.MQTT_PORT,self.MQTT_USER,self.MQTT_PWD,self.MQTT_TOPIC)
        cr=self.client.connect(self.MQTT_SERVER, self.MQTT_PORT, 6000) # 600为keepalive的时间间隔

    def on_connect(self,client, userdata, flags, rc):
        # 0: Connection successful
        # 1: Connection refused - incorrect protocol version
        # 2: Connection refused - invalid client identifier
        # 3: Connection refused - server unavailable
        # 4: Connection refused - bad username or password
        # 5: Connection refused - not authorised
        # 6-255: Currently unused.
        print("MQtt onnected with result code: " + str(rc))
    def on_message(self,client, userdata, msg):
        print(msg.topic + " " + str(msg.payload))
    def on_publish(self,client, userdata, mid):
        print("on_publish:::",client, userdata, mid)
    def publish(self,payload):
        try:
            r=self.client.publish(self.MQTT_TOPIC, payload=payload, qos=0)
            print("mqtt publish结果")
        except Exception as e :
            print("MQTT publish ERROR",str(e))

app = Celery()
@app.task
def onlinePersonBroadcast():
    retrunList=[]
    keys=redisHandle.scan_iter("idcard:*")
    for key in keys:
        print(key)
        value=redisHandle.get(key)
        jsonValue=json.loads(value)
        retrunList.append(jsonValue)
    mymqtt.publish(json.dumps(retrunList,cls=DateEncoder))

When I call the client in celery's tasks file and publish the message, there will be no error or wraining output, but I can't actually get data. Onlineperson broadcasts is a scheduled task in django.anyone help?

当我在celery的tasks文件中调用客户端并且发布消息的时候,会没有报错,但是实际上收不到数据。onlinePersonBroadcas是一个定时任务

fuuhoo avatar Mar 30 '22 08:03 fuuhoo

Could you please attempt this with logging enabled? It's possible the task is being shut down before the message is actually sent (calling publish adds the message to a queue; consider using wait_for_publish). Sorry - I'm not familiar with celery so cannot make any further suggestions (if you can provide more info someone may be able to help further - assuming this is still an issue).

MattBrittan avatar Jan 08 '24 11:01 MattBrittan