rocketmq-client-python icon indicating copy to clipboard operation
rocketmq-client-python copied to clipboard

同时订阅多个server_address的topic,产生的问题

Open kennard520 opened this issue 1 year ago • 0 comments

` class RocketMQListener: def init(self, server_address, topic, group_id, mq_tag, callback): self.server_address = server_address self.topic = topic self.group_id = group_id self.mq_tag = mq_tag self.callback = callback self.consumer = None self.stop_event = threading.Event()

def start(self):
    self.consumer = PushConsumer(self.group_id)
    self.consumer.set_group(self.group_id)
    self.consumer.set_thread_count(1)
    self.consumer.set_name_server_address(self.server_address)
    self.consumer.subscribe(self.topic, self.process_message, self.mq_tag)
    self.consumer.start()
    logging.info(f"Started RocketMQ listener for {self.server_address} topic {self.topic} and group {self.group_id} and mq_tag {self.mq_tag}.")

    while not self.stop_event.is_set():
        time.sleep(5)
        pass

    self.consumer.shutdown()

def process_message(self, message):
    return self.callback(message)

def stop(self):
    self.stop_event.set()`

`import threading from contextlib import asynccontextmanager

import uvicorn from fastapi import FastAPI from starlette.responses import JSONResponse

from rocketmq_listener import RocketMQListener from rocketmq.client import PushConsumer, ConsumeStatus

app = FastAPI() brokers_and_topics = [ ("192.168.0.137:9876", "kl-video-editing-l", "kl-group-video-process-15", "video_process_create_tag"), ("192.168.0.138:9876", "kl-video-editing", "kl-group-video-process-16", "video_process_create_tag") ]

listeners=[]

def test(msg): print(msg) return ConsumeStatus.RECONSUME_LATER

def start_listener(server_address, topic, group_id, tag): listener = RocketMQListener(server_address, topic, group_id, tag, test) listener.start() listeners.append(listener) @asynccontextmanager async def lifespan(app: FastAPI): for topic in brokers_and_topics: threading.Thread(target=start_listener, args=topic).start() yield # Clean up the ML models and release the resources app = FastAPI(lifespan=lifespan)

if name == 'main': try: uvicorn.run("main:app", host="0.0.0.0", port=5007, log_level="warning", workers=1) except Exception as e: print(e) `如上所示,我用两个线程分别订阅192.168.0.137和192.168.0.138的topic,而且groupid不一样,最后却在192.168.0.137上创建了两个消费者 image

kennard520 avatar Sep 04 '24 01:09 kennard520