celery.node

Unexpected close of worker after all the I/O-intensive tasks have finished

Open · can-007 opened this issue 3 years ago · 1 comment

Description

  • What is the current behavior? I submitted five 40-second-long tasks. All of them finished successfully, but the worker then exited unexpectedly with the following exception log:
node:events:498
      throw er; // Unhandled 'error' event
      ^

Error: Unexpected close
    at succeed (/Users/... .../node_modules/amqplib/lib/connection.js:272:13)
    at onOpenOk (/Users/... .../node_modules/amqplib/lib/connection.js:254:5)
    at /Users/... .../node_modules/amqplib/lib/connection.js:166:32
    at /Users/... .../node_modules/amqplib/lib/connection.js:160:12
    at Socket.recv (/Users/... .../node_modules/amqplib/lib/connection.js:499:12)
    at Object.onceWrapper (node:events:639:28)
    at Socket.emit (node:events:520:28)
    at emitReadable_ (node:internal/streams/readable:578:12)
    at processTicksAndRejections (node:internal/process/task_queues:82:21)
Emitted 'error' event on ChannelModel instance at:
    at Connection.emit (node:events:520:28)
    at Connection.C.onSocketError (/Users/... .../node_modules/amqplib/lib/connection.js:353:10)
    at Socket.emit (node:events:532:35)
    at endReadableNT (node:internal/streams/readable:1346:12)
    at processTicksAndRejections (node:internal/process/task_queues:83:21)
  • What is the expected behavior? I expect the worker to keep running and remain able to take more tasks, instead of crashing with the above exception.

  • Please tell us about your environment:

    • Version: Nodejs: 16.14.0, celery-node: 0.5.8
    • OS: [macOS Monterey 12.2.1, Intel(R) Core(TM) i7-8559U CPU @ 2.70GHz]
    • Language: [JavaScript; not sure which ES version, but I use "use strict" and require()]
  • Other information (e.g. detailed explanation, stacktraces, related issues, suggestions how to fix, links for us to have context, eg. stackoverflow, gitter, etc)

  • I looked into the problem a bit more by searching the internet, and found out that it's the classic amqplib problem: amqplib sends a heartbeat on a 60-second interval by default, and my tasks do I/O-intensive work that blocks those heartbeat intervals. What confuses me is that each of my tasks takes less than 60 seconds, so amqplib should get a chance to send a heartbeat between consecutive tasks. In fact, the heartbeat timer was postponed until all tasks had been processed. RabbitMQ therefore detected the missed heartbeats and closed the connection.
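The starvation described above can be reproduced without celery-node or a broker at all. In this minimal sketch (my own illustration, not celery-node internals), a synchronous busy-wait stands in for the blocking task and a `setInterval` stands in for amqplib's heartbeat timer; the `blockingTask` name and the 100 ms period are made up for the example:

```javascript
// Stand-in for amqplib's heartbeat: a periodic timer.
let heartbeatFired = false;
const heartbeat = setInterval(() => { heartbeatFired = true; }, 100);

// Stand-in for a task that never yields to the event loop.
function blockingTask(ms) {
  const end = Date.now() + ms;
  while (Date.now() < end) {} // busy-wait; timers cannot run meanwhile
}

const t0 = Date.now();
blockingTask(300); // "task" 1
blockingTask(300); // "task" 2, back to back: no gap for the timer
clearInterval(heartbeat);
const elapsed = Date.now() - t0;

// Over ~600 ms the 100 ms "heartbeat" never fired once: timer callbacks
// only run when control returns to the event loop, and we never yielded.
console.log({ heartbeatFired, elapsed });
```

Scale 600 ms up to 5 × 40 s of tasks against a 60-second heartbeat and you get exactly the missed-heartbeat disconnect above.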

can-007 · Mar 13 '22 07:03

I found a workaround today: using channel.prefetch(1).

const worker = celery.createWorker("xxx" /* broker */, "xxx" /* backend */, "xxx" /* queue */);
worker.broker.channel.then(ch => ch.prefetch(1));

This way, when you have only one worker, a large number of time-consuming tasks in the message queue is not dumped onto the node process all at once, so the heartbeat setInterval gets a chance to run instead of being postponed indefinitely.
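The effect of prefetch(1) can be sketched in the same broker-free style: when each message is handled one per event-loop turn (which is what prefetch(1) forces for a single consumer) instead of in one long synchronous run, the timer gets a slot between tasks. Again, the names here (`processNext`, `blockingTask`) and the durations are mine, purely for illustration:

```javascript
// Stand-in for the heartbeat timer.
let ticks = 0;
const heartbeat = setInterval(() => { ticks++; }, 50);

function blockingTask(ms) {
  const end = Date.now() + ms;
  while (Date.now() < end) {} // synchronous work, as before
}

let remaining = 4;
function processNext() {
  blockingTask(120);           // one "message" per event-loop turn
  if (--remaining > 0) {
    setImmediate(processNext); // yield before taking the next message
  } else {
    clearInterval(heartbeat);
    console.log(ticks);        // > 0: the "heartbeat" got through
  }
}
processNext();
```

Because the timers phase runs before the setImmediate (check) phase on each loop iteration, the expired "heartbeat" fires in the gap between tasks, which is exactly the breathing room prefetch(1) restores.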

And BTW, when the channel consumes the queue in the broker code, it acknowledges the message before the worker-registered handler runs. Does the acknowledgement happen too soon?

can-007 · Mar 14 '22 17:03