同时订阅多个 server_address 的 topic 时产生的问题
class RocketMQListener:
    """Blocking wrapper around one RocketMQ ``PushConsumer``.

    Each listener subscribes to a single topic/tag on a single name server
    and forwards every received message to ``callback``.

    Args:
        server_address: Name server address passed to
            ``set_name_server_address``.
        topic: Topic to subscribe to.
        group_id: Consumer group id.
        mq_tag: Tag expression passed to ``subscribe``.
        callback: Callable invoked with each message; its return value is
            handed back to the consumer as the consume result.
    """

    def __init__(self, server_address, topic, group_id, mq_tag, callback):
        self.server_address = server_address
        self.topic = topic
        self.group_id = group_id
        self.mq_tag = mq_tag
        self.callback = callback
        # Created lazily in start(); None until then.
        self.consumer = None
        # Set by stop() to release the thread blocked in start().
        self.stop_event = threading.Event()

    def _instance_name(self):
        """Return an identifier unique per (group id, name server) pair."""
        raw = f"{self.group_id}@{self.server_address}"
        # Keep only alphanumerics so the name is safe wherever the client
        # may embed it.
        return "".join(ch if ch.isalnum() else "_" for ch in raw)

    def start(self):
        """Start consuming and block until ``stop()`` is called.

        The consumer is always shut down when this method returns, including
        when the wait is interrupted by an exception.
        """
        consumer = PushConsumer(self.group_id)
        self.consumer = consumer
        consumer.set_group(self.group_id)
        # NOTE(review): consumers created in one process without distinct
        # instance names appear to share one underlying client, so they all
        # end up on the first name server address. Confirm against the
        # rocketmq-client-python documentation.
        consumer.set_instance_name(self._instance_name())
        consumer.set_thread_count(1)
        consumer.set_name_server_address(self.server_address)
        consumer.subscribe(self.topic, self.process_message, self.mq_tag)
        consumer.start()
        try:
            logging.info(f"Started RocketMQ listener for {self.server_address} topic {self.topic} and group {self.group_id} and mq_tag {self.mq_tag}.")
            # Event.wait() returns as soon as stop() sets the event, instead
            # of polling with a fixed sleep.
            self.stop_event.wait()
        finally:
            consumer.shutdown()

    def process_message(self, message):
        """Forward ``message`` to the callback and return its result."""
        return self.callback(message)

    def stop(self):
        """Signal the thread blocked in ``start()`` to shut down."""
        self.stop_event.set()
`import threading from contextlib import asynccontextmanager
import uvicorn from fastapi import FastAPI from starlette.responses import JSONResponse
from rocketmq_listener import RocketMQListener from rocketmq.client import PushConsumer, ConsumeStatus
# Initial application object; the name is rebound further down once the
# lifespan handler has been defined.
app = FastAPI()

# One entry per subscription:
# (name server address, topic, consumer group id, tag).
brokers_and_topics = [
    (
        "192.168.0.137:9876",
        "kl-video-editing-l",
        "kl-group-video-process-15",
        "video_process_create_tag",
    ),
    (
        "192.168.0.138:9876",
        "kl-video-editing",
        "kl-group-video-process-16",
        "video_process_create_tag",
    ),
]

# Registry of RocketMQListener objects created by start_listener().
listeners = []
def test(msg):
    """Print the received message and return ``ConsumeStatus.RECONSUME_LATER``."""
    print(msg)
    status = ConsumeStatus.RECONSUME_LATER
    return status
def start_listener(server_address, topic, group_id, tag):
    """Create a listener for one subscription and run it until stopped.

    Intended as a thread target: ``listener.start()`` is called last and the
    function only returns once that call returns.
    """
    listener = RocketMQListener(server_address, topic, group_id, tag, test)
    # Register before starting so the shutdown path can find and stop the
    # listener while start() is still running.
    listeners.append(listener)
    listener.start()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run one listener thread per subscription for the app's lifetime.

    On startup a thread is launched for every entry in
    ``brokers_and_topics``; on shutdown every registered listener is asked
    to stop and the threads are joined.
    """
    workers = [
        # Daemon threads so a stuck consumer cannot keep the process alive.
        threading.Thread(target=start_listener, args=entry, daemon=True)
        for entry in brokers_and_topics
    ]
    for worker in workers:
        worker.start()
    try:
        yield
    finally:
        # Iterate over a snapshot; worker threads may still be appending.
        for listener in list(listeners):
            listener.stop()
        for worker in workers:
            # Bounded wait so a slow consumer shutdown cannot hang the app.
            worker.join(timeout=10)


app = FastAPI(lifespan=lifespan)
# Script entry point: serve the app with uvicorn on port 5007.
if __name__ == '__main__':
    import traceback

    try:
        uvicorn.run("main:app", host="0.0.0.0", port=5007, log_level="warning", workers=1)
    except Exception:
        # Print the full traceback rather than only the message so startup
        # failures can be diagnosed.
        traceback.print_exc()
`如上所示,我用两个线程分别订阅 192.168.0.137 和 192.168.0.138 上的 topic,并且两者的 group_id 不同,但最终两个消费者都创建在了 192.168.0.137 上。