
Race between queue.RequestJobForProcessing and queue.AddJob

Open emcodem opened this issue 1 year ago • 2 comments

Hi @hajipy, thanks for this repository!

As I spent a lot of time debugging this, I would like to share my experience so it can either be fixed or others can benefit from it: what I experienced was that a job was executed twice without any obvious reason.

In a nutshell, my code does roughly this:

const EmbeddedQueue = require("embedded-queue");

const ingest_queue = await EmbeddedQueue.Queue.createQueue({ inMemoryOnly: true });
ingest_queue.process(
    "1234_randomqueuname",
    async (job) => ingestWorker(job),
    5 // concurrency: up to 5 workers for this job type
);
await ingest_queue.createJob({
    type: "1234_randomqueuname",
    data: someJobData
});
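
(ingestWorker and someJobData come from my application; to make the snippet above self-contained, stand-ins like these, defined before the snippet, will do:)

// hypothetical stand-ins, not part of embedded-queue
const someJobData = { file: "example.mxf" };
async function ingestWorker(job) {
    console.log("processing job", job.id, job.data);
}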

I did some debugging, and it turns out that the reason the job is executed twice is that the setup of the workers (queue.RequestJobForProcessing) is still ongoing while queue.AddJob is called. AddJob then notices that a worker is already waiting (in this.waitingRequests), so it marks the job as active in the database and resolves the promise of the first available worker.

So far so good, but meanwhile another worker is still being prepared (we specified 5 workers at queue setup). It searches the database for inactive jobs and finds one: the very job that AddJob is in the middle of marking as active, because that update has not yet finished. So in the end, RequestJobForProcessing does not put this worker into waitingRequests; instead it also resolves its promise with the job it got from the database, and the job runs twice.
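
Roughly, the interleaving looks like this (a simplified sketch of my understanding, not the actual library source):

// Worker A: requestJobForProcessing          addJob(job)
// 1. finds no inactive job, puts its
//    request into this.waitingRequests
//                                             2. sees a waiting request
//                                             3. starts marking the job active
//                                                (async, not finished yet)
// Worker B: requestJobForProcessing
// 4. findInactiveJobByType(type) still
//    returns the job as inactive
//                                             5. resolves Worker A's promise with the job
// 6. resolves its own promise with the
//    same job -> the job is processed twice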

My solution is to double-check in RequestJobForProcessing whether the job is really still inactive:

// query once, give a concurrent AddJob a moment to finish marking the job
// as active, then query again so an already-claimed job is no longer returned
let neDbJob = await this.repository.findInactiveJobByType(type);
await sleep(2);
neDbJob = await this.repository.findInactiveJobByType(type);
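
(sleep is just a small promise-based delay helper, something like this:)

// resolve after ms milliseconds
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));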

It seems to solve my issue, but of course it is only a workaround, not a real fix. The proper solution would probably be a lock around findInactiveJobByType and setStateActive, but I am not sure about it.
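
A rough sketch of what I mean, using the async-mutex package (the mutex name and the exact repository calls inside the critical section are only illustrative, not the real library code):

const { Mutex } = require("async-mutex");

// in the Queue constructor: one mutex shared by addJob and requestJobForProcessing
this.claimMutex = new Mutex();

// inside requestJobForProcessing: find and claim the job in one critical section,
// so a concurrent addJob cannot hand the same job to another worker in between
const neDbJob = await this.claimMutex.runExclusive(async () => {
    const candidate = await this.repository.findInactiveJobByType(type);
    if (candidate !== null) {
        await this.repository.setStateActive(candidate); // illustrative call shape
    }
    return candidate;
});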

emcodem · Nov 29 '24 16:11

I can add that this problem was 100% reproducible for me; otherwise I would not have been able to find the cause. After understanding how the mutex stuff works, I changed the above solution to the following:

Change the AddJob function: add this as the first line, before the try:

const releaseMutex = await this.requestJobForProcessingMutex.acquire();

and at the end of the function, release the mutex in a finally:

        catch (error) {
            this.emit(event_1.Event.Error, error, job);
            throw error;
        } finally {
            releaseMutex();
        }
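
Put together (and assuming the same mutex also guards the job lookup in requestJobForProcessing), the patched addJob looks roughly like this, with the original body unchanged inside the try:

// this.requestJobForProcessingMutex is an async-mutex Mutex created in the Queue constructor
async addJob(job) {
    const releaseMutex = await this.requestJobForProcessingMutex.acquire();
    try {
        // ... original addJob body: persist the job, then either hand it to a
        // waiting worker from this.waitingRequests or leave it inactive ...
    }
    catch (error) {
        this.emit(event_1.Event.Error, error, job);
        throw error;
    } finally {
        // always release, otherwise requestJobForProcessing would block forever
        releaseMutex();
    }
}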

emcodem · Nov 29 '24 16:11

Hello, @emcodem

Thank you for your detailed report.

I am also very glad to know that this package, which hasn't been maintained for three years, is still being used.

When I originally created this package, my use case only involved running a single worker, so I apologize for the insufficient testing regarding the operation with multiple workers.

As you suggested, I think the solution to this issue is to use a mutex for mutual exclusion wherever a race condition can occur.

Ideally, I would address this issue immediately, but my main job keeps me busy at the moment, making it difficult to respond promptly. However, I hope to work on improvements as much as possible in the future.

hajipy · Nov 29 '24 16:11