nestjs icon indicating copy to clipboard operation
nestjs copied to clipboard

Error when use @RabbitRPC in one controller and listten exchange and queue that same for all actions with diffirent routing keys

Open sur-ser opened this issue 1 year ago • 23 comments

when i use RabbitRPC in one controller with multiple actions like this

@RabbitRPC({
    exchange: AuthRegister.exchange,
    routingKey: AuthRegister.topic,
    queue: AuthRegister.queue,
    errorBehavior: MessageHandlerErrorBehavior.NACK,
    errorHandler: rpcErrorHandler,
  })
  async register(@Body() dto: AuthRegister.Request) : Promise<AuthRegister.Response> {
    const newUser = await this.authService.register(dto);
    return { email: newUser.email };
  }


  @RabbitRPC({
    exchange: AuthJWTLogin.exchange,
    routingKey: AuthJWTLogin.topic,
    queue: AuthJWTLogin.queue,
    errorBehavior: MessageHandlerErrorBehavior.NACK,
    errorHandler: rpcErrorHandler,
  })
  async login(@Body() dto: AuthJWTLogin.Request): Promise<AuthJWTLogin.Response> {
    const { id } = await this.authService.validateUser(dto.email, dto.password);
    return this.authService.login(id);
  }

and create request like this

return await this.amqpConnection.request<AuthRegister.Response>({
       exchange: AuthRegister.exchange,
       routingKey: AuthRegister.topic,
       payload: dto,
       timeout: 10000
     })
  one time it works fine and second time i have got error like this
  

[Nest] 119672 - 06/27/2024, 7:57:17 PM ERROR [AmqpConnection] Received message with invalid routing key: sso.auth.register.command

but if i keep one action it works fine or when i change que name and keep one routing key in queue it works fine

my exchange is topic and in controller exchange and queue is same for all actions only routing key is diffirent

sur-ser avatar Jun 28 '24 03:06 sur-ser

Same for me. v 4.1 works fine. v 5 gives errors. I found that the matchesRoutingKey() function is called ant tries to match the message routing key always with different handlers. So it is not taking messages from queue that match given routing keys, it just takes all messages from queue and tries to match them with a random routingKey from a file.

In my project I have three @RabbitRPC() listeners in one file and I sometimes get listener working ok and some times errors with "invalid routing key".

Here is my code and errors as example:

@RabbitRPC({
    exchange: 'amq.topic',      //using default exchange for topics
    routingKey: 'user.upsert',  //when message has this routing key (topic)
    queue: 'user',           //message will be sent to this queue and proceed by this service
  })
@RabbitRPC({
    exchange: 'amq.topic',          //using default exchange for topics
    routingKey: 'user.find.byphone',//when client wants to find user by phone
    queue: 'user',                  //message will be sent to this queue and proceed by this service
    allowNonJsonMessages: true,     //we allow non-json messages (like strings, numbers, etc)
  })
@RabbitRPC({
    exchange: 'amq.topic',          //using default exchange for topics
    routingKey: 'user.get.byuuid',  //when client wants to get user by uuid
    queue: 'user',                  //message will be sent to this queue and proceed by this service
  })

and here is log when I send the same message to queue: image

"ret: ...uuid... " - is the right processing of the listener. As you can see (I've modified matcher a little to see what its trying to match) there always different matcher for same routingKey. Is it by design? :)

P.S. Also while I was looking for problem I saw project with same problem (https://github.com/pavlokobyliatskyi/demo-chat/), he just tell people to use old version :) but maybe some fix can be introduced to new versions? Interesting that not many people have this.

slyk avatar Jul 20 '24 23:07 slyk

The error message is in "connections.ts", looking at the v4 code in that place (line 535) instead of error there were just nack() so it was just grabbing message, check for routing key and putting it back for each of @RabbitRPC() call while it looking for perfect match?

if (rpcOptions.routingKey !== msg.fields.routingKey) {
            channel.nack(msg, false, true);
            return;
          }

and in v5 there are more complex check because now routingKey could be an array, but nack() function called with requeue option "false" so we get error and lost message.

if (
            !matchesRoutingKey(msg.fields.routingKey, rpcOptions.routingKey)
          ) {
            channel.nack(msg, false, false);
            this.logger.error(
              'Received message with invalid routing key: ' +
                msg.fields.routingKey
            );
            return;
          }

https://github.com/golevelup/nestjs/blob/master/packages/rabbitmq/src/amqp/connection.ts

I'm new to rabbit. What is the right way to handle messages from queue that has specific routingKey?

Quick fix would be set requeue back to 'true' in nack(), but then we would have many errors in log... and maybe there should be some other logic?

slyk avatar Jul 20 '24 23:07 slyk

Seems like good practice is to make different queues for this messages, so I changed my code and got more queues, it works and for my small first app just to became familiar with rabbitMQ its ok :)

@RabbitRPC({
    exchange: 'amq.direct',      // DIRECT here instead of topic
    routingKey: 'user.upsert',  
    queue: 'user.upsert',         // name of the qeue that is same as reoutingKey
  })

slyk avatar Jul 21 '24 11:07 slyk

While creating separate queues for every aspect of the project might seem like a quick fix, it defeats the purpose of having routing keys in the first place. The whole idea behind using routing keys is to prevent the chaos that would arise from having too many queues, especially in large projects with 50-70 microservices. If every interaction requires a separate queue, the system becomes overly complex and hard to manage. In many cases, it’s much more efficient and organized to use different routing keys to read from the same queue, which allows for better scalability and maintainability.

sur-ser avatar Aug 14 '24 03:08 sur-ser

While creating separate queues for every aspect of the project might seem like a quick fix, it defeats the purpose of having routing keys in the first place. The whole idea behind using routing keys is to prevent the chaos that would arise from having too many queues, especially in large projects with 50-70 microservices. If every interaction requires a separate queue, the system becomes overly complex and hard to manage. In many cases, it’s much more efficient and organized to use different routing keys to read from the same queue, which allows for better scalability and maintainability.

You're right, this is definitely sensible behavior. We're definitely open to contributions to improve the handling of routing keys

WonderPanda avatar Aug 14 '24 18:08 WonderPanda

While creating separate queues for every aspect of the project might seem like a quick fix, it defeats the purpose of having routing keys in the first place. The whole idea behind using routing keys is to prevent the chaos that would arise from having too many queues, especially in large projects with 50-70 microservices. If every interaction requires a separate queue, the system becomes overly complex and hard to manage. In many cases, it’s much more efficient and organized to use different routing keys to read from the same queue, which allows for better scalability and maintainability.

Don't see API in rabbit MQ that could allow us to get/subscribe only to some routing keys in queue.

So first consumer will receive the first message, if its not match needed routing key we should nack it with requeue.

Each message could go to each consumer while searching for match. So for many routing keys and consumers we will create many useless data flow...

Also here is quote from docs: When there is only one consumer on a queue, make sure you do not create infinite message delivery loops by rejecting and requeueing a message from the same consumer over and over again.

So if needed consumer service is down now other consumers will infinitely get the message, nack it back and so on.

So one queue for one consumer seems like bt design feature of rabbitMQ (or amqp).

The one idea that comes to me to handle our request is to handle routing of the messge on nestjs side according to routing key. Some wrapper function that knows all functions in current app that are waiting for the message with specified routing key and will send message to that function only.

slyk avatar Aug 14 '24 23:08 slyk

thank you for sharing your thoughts, but unfortunately, you're mistaken in your understanding of how RabbitMQ works. The main idea behind using routing keys in RabbitMQ is precisely that different messages going into the same queue can be processed by different consumers depending on the matching routing keys.

In RabbitMQ, consumers can listen to the same queue but only receive messages that match their specific routing key. This allows one queue to be efficiently used by multiple consumers, each processing only the messages relevant to them, which is the core concept of routing keys in the AMQP protocol.

I opened this issue not because I want new functionality, but because there’s a bug in the current version of the nestjs-rabbitmq package that prevents routing keys from working correctly. In version 5, changes were made to the message handling logic, causing messages to be incorrectly processed and leading to errors. In version 4, everything worked as expected.

What you’re suggesting—creating separate queues for each routing key—might be a workaround, but it goes against the principle of using routing keys and leads to unnecessary system complexity. My goal is to address the issue in the current implementation, not to introduce new functionality.

sur-ser avatar Aug 27 '24 08:08 sur-ser

@sur-ser If you have the time/willing to help us by contributing with a proper fix for this matter we would appreciate :pray:

underfisk avatar Sep 26 '24 13:09 underfisk

Any news?

After this commit it stopped working https://github.com/golevelup/nestjs/issues/712

upundefined avatar Oct 02 '24 14:10 upundefined

Any news?

After this commit it stopped working #712

Seems like not everybody need it. Maybe another issue with feature request tag will help to understand how many people really need this and someone could decide to implement it to public (for bounty?)

For my small project I've decided to split to bigger number of queues.

As far as I read amqp 0-9-1 specs there is only one mention of "message selectors" that could help us to not create endless loop of nack(), but no info how this should be implemented and seems like rabbitMQ does not have this in API. And we should use its API in current lib...

So if this will be the real problem in my project, I guess we should implement logic on nestjs side but rabbitMQ still wil give us ALL messages from queue in FIFO order, we just search for functions by routing_key and if nothing found we should nack()

In RabbitMQ, consumers can listen to the same queue but only receive messages that match their specific routing key.

This would be good, but I can't find that type of functionality described nor in amqp protocol specs nor in rabbitMQ. All filtering with rouiting_key end up in exchange when we bind it to queue(s). The exchange is smart part that understant logic... the queue can only FIFO to consumer.

slyk avatar Oct 02 '24 15:10 slyk

I found a library that uses a single queue, it is specified when initializing the module with the queueName parameter https://github.com/AlariCode/nestjs-rmq https://github.com/AlariCode/nestjs-rmq/blob/3ef5ecc62b1e6e19254a4ae76288a35da96c463c/lib/rmq.service.ts#L227

upundefined avatar Oct 02 '24 15:10 upundefined

Hello! I have started to use this library today and I am facing this issue too.

I have manually modified to the previous behavior (pre #713), and now it does work as expected.

PierreKiwi avatar Nov 06 '24 07:11 PierreKiwi

Having the same issues as mentioned above

SinPP avatar Jan 02 '25 13:01 SinPP

This issue is stale because it has been open for 30 days with no activity.

github-actions[bot] avatar Feb 11 '25 02:02 github-actions[bot]

The core issue appears to be in how RabbitMQ message routing is handled in version 5+ of the nestjs-rabbitmq package, specifically when multiple RPC handlers are configured to consume from the same queue but with different routing keys.

Here's what's happening:

In v4, messages that didn't match a handler's routing key were nacked with requeue=true, allowing other handlers to attempt processing In v5, messages are nacked with requeue=false and an error is logged when routing keys don't match, effectively dropping messages Here's a proposed solution that maintains the intended behavior while fixing the issue:


public async setupRpcChannel<T, U>(
  handler: RpcSubscriberHandler<T, U>,
  rpcOptions: MessageHandlerOptions,
  channel: ConfirmChannel,
): Promise<ConsumerTag> {
  const queue = await this.setupQueue(rpcOptions, channel);

  const { consumerTag }: { consumerTag: ConsumerTag } = await channel.consume(
    queue,
    this.wrapConsumer(async (msg) => {
      try {
        if (msg == null) {
          throw new Error('Received null message');
        }

        // Check if routing key matches
        if (!matchesRoutingKey(msg.fields.routingKey, rpcOptions.routingKey)) {
          // Instead of nacking with requeue=false, requeue the message
          // This allows other handlers to process it
          channel.nack(msg, false, true);
          return;
        }

        const result = this.deserializeMessage<T>(msg, rpcOptions);
        const response = await handler(result.message, msg, result.headers);

        if (response instanceof Nack) {
          channel.nack(msg, false, response.requeue);
          return;
        }

        const { replyTo, correlationId, expiration, headers } = msg.properties;
        if (replyTo) {
          await this.publish('', replyTo, response, {
            correlationId,
            expiration,
            headers,
            persistent: rpcOptions.usePersistentReplyTo ?? false,
          });
        }
        channel.ack(msg);
      } catch (e) {
        if (msg == null) {
          return;
        } else {
          const errorHandler =
            rpcOptions.errorHandler ||
            this.config.defaultRpcErrorHandler ||
            getHandlerForLegacyBehavior(
              rpcOptions.errorBehavior ||
                this.config.defaultSubscribeErrorBehavior,
            );

          await errorHandler(channel, msg, e);
        }
      }
    }),
    rpcOptions?.queueOptions?.consumerOptions,
  );

  this.registerConsumerForQueue({
    type: 'rpc',
    consumerTag,
    handler,
    msgOptions: rpcOptions,
    channel,
  });

  return consumerTag;
}
  1. Changed channel.nack(msg, false, false) to channel.nack(msg, false, true) when routing keys don't match
  2. Removed the error logging for non-matching routing keys since this is expected behavior
  3. Maintained the ability for multiple handlers to consume from the same queue with different routing keys

The fix must restore the functionality that worked in v4 while maintaining the improvements made in v5 regarding routing key pattern matching.

I think this will fix it

sur-ser avatar Feb 20 '25 11:02 sur-ser

@sur-ser Are you down to contribute with a PR? I think you're proposal does make sense, if we can have some test coverage along side that would be awesome

underfisk avatar Feb 20 '25 11:02 underfisk

Any updates on thi ? This issue still persists.

echonabin avatar Apr 11 '25 15:04 echonabin

@sur-ser Didi you get work-around for this ?

echonabin avatar Apr 11 '25 16:04 echonabin

@sur-ser Nice one on this, for anyone having this issue, you can edit node_modules\@golevelup\nestjs-rabbitmq\lib\amqp\connection.js and replace condition at 387 with

if (!(0, utils_1.matchesRoutingKey)(msg.fields.routingKey, rpcOptions.routingKey)) {
    channel.nack(msg, false, true);
    return;
}

then run npx patch-package @golevelup/nestjs-rabbitmq temporary fix, not amazing by any shot but it works

lntel avatar Jun 12 '25 22:06 lntel