Unexpected close of worker after all the I/O-intensive tasks have finished
Description
- What is the current behavior? I submitted five 40-second-long tasks; they all finished successfully, but then the worker exited unexpectedly with the following exception logs:
node:events:498
throw er; // Unhandled 'error' event
^
Error: Unexpected close
at succeed (/Users/... .../node_modules/amqplib/lib/connection.js:272:13)
at onOpenOk (/Users/... .../node_modules/amqplib/lib/connection.js:254:5)
at /Users/... .../node_modules/amqplib/lib/connection.js:166:32
at /Users/... .../node_modules/amqplib/lib/connection.js:160:12
at Socket.recv (/Users/... .../node_modules/amqplib/lib/connection.js:499:12)
at Object.onceWrapper (node:events:639:28)
at Socket.emit (node:events:520:28)
at emitReadable_ (node:internal/streams/readable:578:12)
at processTicksAndRejections (node:internal/process/task_queues:82:21)
Emitted 'error' event on ChannelModel instance at:
at Connection.emit (node:events:520:28)
at Connection.C.onSocketError (/Users/... .../node_modules/amqplib/lib/connection.js:353:10)
at Socket.emit (node:events:532:35)
at endReadableNT (node:internal/streams/readable:1346:12)
at processTicksAndRejections (node:internal/process/task_queues:83:21)
- What is the expected behavior? I expect the worker to keep running and to be able to take more tasks, without crashing with the above exception.
- Please tell us about your environment:
  - Version: Node.js 16.14.0, celery-node 0.5.8
  - OS: macOS Monterey 12.2.1, Intel(R) Core(TM) i7-8559U CPU @ 2.70GHz
  - Language: ES (not sure which version, but I use `"use strict"` and `require()`)
- Other information (e.g. detailed explanation, stack traces, related issues, suggestions on how to fix, links for us to have context, e.g. Stack Overflow, Gitter, etc.)
- I looked into the problem a little bit more by searching the internet and found out that it's the classic `amqplib` problem: `amqplib` has a default heartbeat that is sent on a 60-second interval. My tasks did I/O-intensive work that blocked the heartbeat intervals. What confuses me is that each of my tasks takes less than 60 seconds, so `amqplib` should have had a chance to send a heartbeat between consecutive tasks. In fact, however, the heartbeat intervals were postponed until all tasks had been processed, so RabbitMQ detected the missed heartbeats and closed the connection. The sketch below shows what I mean.
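To illustrate the starvation I am describing, here is a plain Node sketch (hypothetical code, not celery-node or amqplib internals): a `setInterval` "heartbeat" cannot fire while back-to-back blocking work keeps the call stack busy.

```js
// Plain Node illustration (hypothetical, not celery-node code): a timer that
// should tick every second is starved while blocking work occupies the loop.
setInterval(() => console.log("heartbeat at", new Date().toISOString()), 1000);

// Stand-in for one ~5-second task that never yields back to the event loop.
function blockingTask(i) {
  const end = Date.now() + 5000;
  while (Date.now() < end) { /* busy work standing in for the real task */ }
  console.log("task", i, "done");
}

// Queue several tasks back to back; the interval callback cannot run while a
// task is on the call stack, so the heartbeats fire far later than scheduled.
for (let i = 0; i < 5; i++) {
  setImmediate(() => blockingTask(i));
}
```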
I found a workaround today: call `channel.prefetch(1)` on the broker's channel:
const celery = require("celery-node");
// createWorker takes positional arguments: broker, backend, queue
const worker = celery.createWorker("xxx", "xxx", "xxx");
worker.broker.channel.then((ch) => ch.prefetch(1));
This way, when you have only one worker, a large backlog of time-consuming tasks in the message queue won't all be dispatched into Node's internal event loop at once, and the heartbeat `setInterval` callback gets a chance to run instead of being postponed indefinitely. A fuller sketch of how I wired this up is below.
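For context, here is roughly the whole setup (a sketch only: the `"xxx"` connection strings are the same placeholders as above, and the task name `tasks.longTask` is just an example):

```js
const celery = require("celery-node");

// broker, backend, queue (placeholders)
const worker = celery.createWorker("xxx", "xxx", "xxx");

// Workaround: deliver only one unacknowledged message at a time, so the rest
// of the backlog stays in RabbitMQ instead of piling up in the event loop.
worker.broker.channel.then((ch) => ch.prefetch(1));

// Example handler standing in for my ~40-second I/O-heavy task.
worker.register("tasks.longTask", async (payload) => {
  // ... long-running work goes here ...
  return payload;
});

worker.start();
```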
And by the way, when the channel consumes the queue in the broker code, it acknowledges the message before the worker-registered handler runs. Does the acknowledgement happen too soon?
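For comparison, the generic `amqplib` pattern I had in mind is to acknowledge only after the handler finishes (just a sketch of the alternative, not a claim about how celery-node is implemented; the function name is mine):

```js
const amqp = require("amqplib");

// Consume a queue and ack each message only after its handler has completed.
async function consumeWithLateAck(url, queue, handler) {
  const conn = await amqp.connect(url);
  const ch = await conn.createChannel();
  await ch.assertQueue(queue);
  await ch.prefetch(1);

  // With the default { noAck: false }, messages must be acked explicitly.
  await ch.consume(queue, async (msg) => {
    if (msg === null) return; // consumer was cancelled by the server
    try {
      await handler(JSON.parse(msg.content.toString()));
      ch.ack(msg); // acknowledge only once the handler has succeeded
    } catch (err) {
      ch.nack(msg, false, true); // requeue on failure
    }
  });
}
```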