How to set the workers' message-processing concurrency?
Description
Hey guys, thanks for the good work on this project, it rocks.
However, I have not been able to configure the message-processing concurrency of my workers.
In Python you can pass the --concurrency flag to the celery worker command. How can I achieve that with celery-node?
Ideally I would set my workers' message-processing concurrency to 1. I am using k8s and want full control over scaling; not being able to set concurrency to an arbitrary value works against that.
Thanks!
Hi, PoiraudVictor!
Thank you for your interest in the celery.node project.
I'm not sure I fully understand, but celery.node has no option for controlling worker_threads or promise concurrency.
The only part that controls promises is the code below, and it is hard-coded to 1.
https://github.com/actumn/celery.node/blob/efc4334d555abea78e5dfeb45cfca93742d67699/src/kombu/brokers/redis.ts#L138-L150
Sorry for not being of much help. It would be nice if you could describe your environment and code in more detail.
Thanks a lot, actumn, for your quick answer.
I was probably unclear in my first message.
I have two services: the first one (service A) sends messages to the second one (service B). Service B listens on a queue (q0) for messages from service A, and when it has finished processing a message, it sends it to a new queue (q1). However, if A sends a message m1 to q0 while B is still processing a previous message m0, B processes m1 and m0 in parallel. I want them processed in series, not in parallel: B should finish processing m0 before it starts on m1.
How can I achieve this with celery-node? In classic Python Celery, I would pass --concurrency=1 to my celery worker <...> command.
Here is the service B code:
import {
  createClient as createPublisher,
  createWorker as createConsumer
} from "celery-node";

const { CELERY_BROKER, CELERY_BACKEND, Q0, Q1 } = process.env;

const celeryConsumer: CeleryConsumer = createConsumer(
  CELERY_BROKER,
  CELERY_BACKEND,
  Q0
);
const celeryPublisher: CeleryPublisher = createPublisher(
  CELERY_BROKER,
  CELERY_BACKEND,
  Q1
);

celeryConsumer.register("some.task", async (message: CustomMessageType) => {
  <...message-processing-code...>;
  const task: Task = celeryPublisher.createTask("some.other.task");
  task.applyAsync([someArgs]);
});

await celeryConsumer.start();
I run my code locally on both macOS Monterey v12.1 and an Ubuntu container. Node: v17.4.0, TypeScript: v4.5.5
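One way to get the serial behaviour described above without a concurrency option in the library is to serialise inside the handler itself. The sketch below is a workaround under stated assumptions, not a celery-node feature: `serialize` is a hypothetical helper that chains each call onto the previous one, so a second message waits until the first handler has settled even if the worker dispatches both at once.

```typescript
// Hypothetical helper (not part of celery-node): wraps an async handler so
// that calls run strictly one after another, in arrival order.
function serialize<A extends unknown[], R>(
  handler: (...args: A) => Promise<R>
): (...args: A) => Promise<R> {
  // Tail of the chain; each new call waits for it to settle.
  let tail: Promise<unknown> = Promise.resolve();

  return (...args: A): Promise<R> => {
    // Start this call only after the previous one has settled.
    const run = tail.then(() => handler(...args));
    // Keep the chain usable even if this call rejects.
    tail = run.catch(() => undefined);
    return run;
  };
}

// Demo: two "messages" arrive back to back; the log records the ordering.
const log: string[] = [];
const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

const handle = serialize(async (name: string): Promise<string> => {
  log.push(`start ${name}`);
  await sleep(20);
  log.push(`end ${name}`);
  return name;
});

const done = Promise.all([handle("m0"), handle("m1")]);
```

In service B this would mean passing `serialize(async (message) => { ... })` to `celeryConsumer.register`. The worker may still have taken later messages from the broker while an earlier one is running; they simply wait in memory, which matters if the pod is killed mid-task.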
Were you able to solve this?
I implemented solo execution in my fork, but only for Redis right now. I will try to add more options next week: https://github.com/imdark/celery.node
I came across this issue and have a solution that may be helpful for RabbitMQ: override the worker's broker.subscribe as follows.
import * as amqplib from "amqplib";
const message = require("celery-node/dist/kombu/message");

// Wraps a raw amqplib message in celery.node's Message type.
class AMQPMessage extends message.Message {
  constructor(payload: any) {
    super(
      payload.content,
      payload.properties.contentType,
      payload.properties.contentEncoding,
      payload.properties,
      payload.properties.headers
    );
  }
}

// Assumption: this counter is not shown in the original snippet. It is
// declared and updated here so that the check below has something to test.
let runningTasks = 0;

worker.broker.subscribe = (queue: string, callback: any) => {
  return worker.broker.channel
    .then((ch: amqplib.Channel) =>
      // Make sure the queue exists before consuming from it.
      ch
        .assertQueue(queue, {
          durable: true,
          autoDelete: false,
          exclusive: false,
          // nowait: false,
          arguments: null
        })
        .then(() => ch)
    )
    .then((ch: amqplib.Channel) =>
      ch.consume(queue, (rawMsg: any) => {
        if (runningTasks === 0) {
          // Idle: acknowledge the message and process the task.
          ch.ack(rawMsg);
          // only application/json content-type is supported for now
          if (rawMsg.properties.contentType !== "application/json") {
            throw new Error(`unsupported content type ${rawMsg.properties.contentType}`);
          }
          // only utf-8 content-encoding is supported for now
          if (rawMsg.properties.contentEncoding !== "utf-8") {
            throw new Error(`unsupported content encoding ${rawMsg.properties.contentEncoding}`);
          }
          runningTasks += 1;
          // Assumption: callback may return a promise; release the slot once it settles.
          Promise.resolve()
            .then(() => callback(new AMQPMessage(rawMsg)))
            .finally(() => {
              runningTasks -= 1;
            });
        } else {
          // Busy: reject the task and requeue it.
          ch.reject(rawMsg, true);
        }
      })
    );
};
I don't think it's the best solution, but placing it right after your const worker = celery.createWorker line should do the trick.
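The snippet above depends on a runningTasks counter being kept up to date around each task. A minimal, broker-free sketch of that bookkeeping follows; every name in it (`createGate`, `onMessage`, `running`) is hypothetical and nothing here is celery-node or amqplib API. It only illustrates the "accept when idle, otherwise requeue" decision.

```typescript
// Broker-free sketch of the "accept only when idle, otherwise requeue" gate.
type Decision = "accepted" | "requeued";

function createGate<M>(task: (msg: M) => Promise<void>) {
  let runningTasks = 0;

  return {
    // Called for every delivered message; mirrors the if/else in the
    // consume callback.
    onMessage(msg: M): Decision {
      if (runningTasks > 0) {
        return "requeued"; // the real code would call ch.reject(rawMsg, true)
      }
      runningTasks += 1;
      // Release the slot once the task settles, whether it succeeded or failed.
      task(msg)
        .catch(() => undefined)
        .then(() => {
          runningTasks -= 1;
        });
      return "accepted"; // the real code would call ch.ack(rawMsg)
    },
    running: (): number => runningTasks,
  };
}

// Demo with a task that finishes only when finish() is called.
let finish: () => void = () => undefined;
const gate = createGate<string>(
  () =>
    new Promise<void>((resolve) => {
      finish = () => resolve();
    })
);

const first = gate.onMessage("m0"); // idle, so the message is accepted
const second = gate.onMessage("m1"); // m0 is still running, so it is requeued
```

Rejecting and requeueing while busy can cause the broker to redeliver the same message repeatedly. amqplib's `channel.prefetch(1)`, combined with acknowledging only after the task completes, is a commonly used alternative, though wiring that into celery.node's broker is not covered here.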