celery.node icon indicating copy to clipboard operation
celery.node copied to clipboard

How to set workers message treatment concurrency ?

Open hashbulla opened this issue 3 years ago • 5 comments

Description

Hey guys, thanks for the good work on this project, it rocks.

Although, I have not been able to play with my workers message treatment concurrency. In python you can pass the --concurrency flag to the celery worker command. How can I achieve that with celery-node ?

The best scenario for me would be to set my workers message treatment concurrency to 1. I am using k8s and want to have full controle on scaling. Not being able to set concurrency to an arbitrary value goes against it.

Thanks !

hashbulla avatar Feb 16 '22 11:02 hashbulla

Hi, PoiraudVictor! Thank you for your attention to celery.node project.

I'm not sure that I fully understand, but there's no control worker_threads or promise in celery.node. The only part of controlling promise is below and it's hard-coded as 1. https://github.com/actumn/celery.node/blob/efc4334d555abea78e5dfeb45cfca93742d67699/src/kombu/brokers/redis.ts#L138-L150

Sorry for not being of much help. It would be nice if you can describe your environment and codes more.

actumn avatar Feb 16 '22 13:02 actumn

Thanks a lot actumn for your quick answer.

I have probably been unclear in my first message.

Actually I have 2 services, the first one (service A) sends messages to the second one (service B). When service B has finished processing a message, it sends it to a new queue (q1). Service B is listening to a queue (q0) for messages from service A. Although, if A sends a message m1 to q0 while B is still processing a previous message m0, B will process m1 and m0 in parallel. I want to process them in series not in parallel, so B should wait for finishing the processing of m0 before processing m1.

How can I achieve this with celery-node ? In classic python celery, I would pass the --concurrency=1 to my celery worker <...> command.

Hereafter, service B code :

import { createClient as createPublisher } from "celery-node";
import { createWorker as createConsumer } from "celery-node";

const { CELERY_BROKER, CELERY_BACKEND, Q0,  Q1 } = process.env;

const celeryConsumer: CeleryConsumer = createConsumer(
    CELERY_BROKER,
    CELERY_BACKEND,
    Q0
  );
  const celeryPublisher: CeleryPublisher = createPublisher(
    CELERY_BROKER,
    CELERY_BACKEND,
    Q1
  );
celeryConsumer.register("some.task", async (message: CustomMessageType) => {
    <...message-processing-code...>;
    const task: Task = celeryPublisher.createTask("some.other.task");
    task.applyAsync([someArgs]);
  });
  await celeryConsumer.start();

I run my code locally on both macos Monterey v12.1 and Ubuntu container. Node : v17.4.0 Typescript : v4.5.5

hashbulla avatar Feb 16 '22 13:02 hashbulla

were you able to solve this?

akhilacubrio avatar Nov 23 '22 23:11 akhilacubrio

I implemented solo execution in my fork but only for redis right now, I will try to add more options next week, https://github.com/imdark/celery.node

imdark avatar Dec 19 '22 20:12 imdark

came over this issue and I have a cool solution that may be very helpful for rabbitMQ. you

const message = require('celery-node/dist/kombu/message');


class AMQPMessage extends message.Message {
    constructor(payload: any) {
        super(payload.content, payload.properties.contentType, payload.properties.contentEncoding, payload.properties, payload.properties.headers);
    }
}

worker.broker.subscribe = (queue: string, callback: any) => {
    worker.broker.channel
        .then((ch: amqplib.Channel) => {
            ch.assertQueue(queue, {
                durable: true,
                autoDelete: false,
                exclusive: false,
                // nowait: false,
                arguments: null
            })
                .then(() => Promise.resolve(ch))
        })
    return worker.broker.channel.then(
        (ch: amqplib.Channel) => {
            ch.consume(queue, (rawMsg: any) => {
                if (runningTasks === 0) {
                    // When runningTasks === 0 process the task
                    ch.ack(rawMsg);
                    // now supports only application/json of content-type
                    if (rawMsg.properties.contentType !== "application/json") {
                        throw new Error(`unsupported content type ${rawMsg.properties.contentType}`);
                    }
                    // now supports only utf-8 of content-encoding
                    if (rawMsg.properties.contentEncoding !== "utf-8") {
                        throw new Error(`unsupported content encoding ${rawMsg.properties.contentEncoding}`);
                    }
                    callback(new AMQPMessage(rawMsg));
                } else {
                    // When runningTasks != 0 reject the task and requeue it.
                    ch.reject(rawMsg, true);
                }
            });
        }
    );
}

I don't think its the best solution but after using it after your const worker = celery.createWorker it should do the trick

shoham112233 avatar Jun 11 '23 21:06 shoham112233