Keeping the queue in Node.js memory makes cqrs-saga unsuitable for clustering
The fact that the event queue is maintained in memory made me suspect that this is not going to work in a clustered environment. Ref.: https://github.com/adrai/node-cqrs-saga/blob/9770440df0e50b5a3897607a07e0252315b25edf/lib/orderQueue.js#L11
Some tests confirmed it. My test was simple:
I ran two instances of my "cqrs-saga + cqrs-domain" node. One subscribes to the Redis channel "SHARD_01", the other to the Redis channel "SHARD_02" (both for receiving events and for publishing commands).
I have two events, orderUpdated and orderCreated (this simulates an action coming from a REST interface and is supposed to move a saga forward). orderCreated must be processed before orderUpdated.
Thanks to revisionGuard, the processing order above is respected even if the events arrive at the process manager out of order: revisionGuard keeps orderUpdated in the queue if it arrives first. Once orderCreated arrives (later), it processes orderCreated, followed by orderUpdated (pulled out of the queue). For this mechanism to work, I set the revision number of orderCreated to 1 and that of orderUpdated to 2.
It works just fine, as long as the events arrive on the same node running cqrs-saga.
The problem is that the queue is maintained in memory. So when I run two instances of cqrs-saga (and, purposely for this test, the events are sent to different instances), things break.
Only orderCreated is processed (on node 01). Node 02 (where orderUpdated is queued) is not aware that orderCreated has arrived and has been processed, so it gets stuck there, and so does the saga instance.
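To make the failure mode concrete, here is a minimal sketch of a per-process revision guard. It is not cqrs-saga's revisionGuard; it only assumes, as in the test above, that revisions start at 1, and the event fields `aggregateId`, `revision` and `name` are hypothetical. On a single node the in-memory queue restores the order; with two nodes, each holding its own queue and its own idea of the expected revision, orderUpdated is never released.

```javascript
// Minimal per-process revision guard (a sketch, NOT the cqrs-saga implementation).
// Events are processed strictly in revision order per aggregate; early arrivals
// are parked in a queue that lives only in this process's memory.
class InMemoryRevisionGuard {
  constructor() {
    this.expected = new Map(); // aggregateId -> next revision to process
    this.queue = new Map();    // aggregateId -> Map(revision -> parked event)
    this.processed = [];       // names of handled events, in order
  }

  handle(evt) {
    const next = this.expected.get(evt.aggregateId) ?? 1; // assumes revisions start at 1
    if (evt.revision < next) return; // already handled
    if (evt.revision > next) {
      // Arrived too early: park it until the missing revisions show up.
      if (!this.queue.has(evt.aggregateId)) this.queue.set(evt.aggregateId, new Map());
      this.queue.get(evt.aggregateId).set(evt.revision, evt);
      return;
    }
    this.processEvent(evt);
    // Release any parked events that are now in order.
    const parked = this.queue.get(evt.aggregateId);
    let rev = this.expected.get(evt.aggregateId);
    while (parked && parked.has(rev)) {
      const queued = parked.get(rev);
      parked.delete(rev);
      this.processEvent(queued);
      rev = this.expected.get(evt.aggregateId);
    }
  }

  processEvent(evt) {
    this.processed.push(evt.name);
    this.expected.set(evt.aggregateId, evt.revision + 1);
  }
}

const orderCreated = { aggregateId: "order-1", revision: 1, name: "orderCreated" };
const orderUpdated = { aggregateId: "order-1", revision: 2, name: "orderUpdated" };

// Single node: the out-of-order arrival is fixed by the queue.
const single = new InMemoryRevisionGuard();
single.handle(orderUpdated);
single.handle(orderCreated);
console.log(single.processed); // [ 'orderCreated', 'orderUpdated' ]

// Two nodes, each with its own memory: orderUpdated stays parked on node 02.
const node01 = new InMemoryRevisionGuard();
const node02 = new InMemoryRevisionGuard();
node02.handle(orderUpdated);
node01.handle(orderCreated);
console.log(node01.processed, node02.processed); // [ 'orderCreated' ] []
```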
I think the quick fix would be to move the queue to Redis (keep it as a list).
But a more general (and more proper) fix would be to change the way revisionGuard keeps and checks information in the revisionGuard store, and to act accordingly. For example, once node 01 has finished processing orderCreated, it sets the revision number of orderCreated in the revisionGuard store; node 02, checking the revisionGuard store in a loop, eventually detects that, pulls orderUpdated out of its queue and starts handling it.
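A rough sketch of that quick fix, assuming both the expected revision and the parked events live in a store shared by all nodes. A plain object stands in for Redis here (hypothetical; a real implementation would use a Redis list per aggregate, asynchronous calls, and atomic operations to avoid races between nodes), so whichever node fills the gap can drain the queue.

```javascript
// Hypothetical stand-in for Redis, shared by every node. This single-threaded
// sketch ignores the races a real multi-process setup would have to handle.
const sharedStore = {
  revisions: new Map(), // aggregateId -> next expected revision
  queues: new Map(),    // aggregateId -> parked events (the "list")
};

function makeNode(name, store, log) {
  const processEvent = (evt) => {
    log.push(`${name}:${evt.name}`);
    store.revisions.set(evt.aggregateId, evt.revision + 1);
  };
  return {
    handle(evt) {
      const next = store.revisions.get(evt.aggregateId) ?? 1; // assumes revisions start at 1
      if (evt.revision < next) return; // already handled
      const list = store.queues.get(evt.aggregateId) ?? [];
      store.queues.set(evt.aggregateId, list);
      if (evt.revision > next) {
        list.push(evt); // park it in the shared list, not in local memory
        return;
      }
      processEvent(evt);
      // Whichever node fills the gap drains the shared list in revision order.
      for (;;) {
        const want = store.revisions.get(evt.aggregateId);
        const idx = list.findIndex((e) => e.revision === want);
        if (idx === -1) return;
        processEvent(list.splice(idx, 1)[0]);
      }
    },
  };
}

const log = [];
const node01 = makeNode("node01", sharedStore, log);
const node02 = makeNode("node02", sharedStore, log);
const orderCreated = { aggregateId: "order-1", revision: 1, name: "orderCreated" };
const orderUpdated = { aggregateId: "order-1", revision: 2, name: "orderUpdated" };

node02.handle(orderUpdated); // arrives first, on the "other" node
node01.handle(orderCreated); // fills the gap and drains the shared queue
console.log(log); // [ 'node01:orderCreated', 'node01:orderUpdated' ]
```

Note that node 01 ends up handling orderUpdated even though it was delivered to node 02.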
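The more general fix could look roughly like the sketch below: only the revision counter is shared (a `Map` standing in for the revisionGuard store), early events stay in the receiving process's memory, and a periodic `poll()` (a hypothetical method, meant to be driven by something like `setInterval`) re-reads the store and releases queued events once their turn comes. In a real deployment the shared revision would have to be advanced only after handling has finished, and the polling interval trades latency against load on the store.

```javascript
// Hypothetical shared revision store (stand-in for the revisionGuard store, e.g. Redis).
// Only the revision counter is shared; early events stay in each process's memory.
const revisionStore = new Map(); // aggregateId -> next expected revision

class PollingNode {
  constructor(name, store, log) {
    this.name = name;
    this.store = store;
    this.log = log;
    this.localQueue = [];
  }

  nextRevision(aggregateId) {
    return this.store.get(aggregateId) ?? 1; // assumes revisions start at 1
  }

  handle(evt) {
    const next = this.nextRevision(evt.aggregateId);
    if (evt.revision === next) this.processEvent(evt);
    else if (evt.revision > next) this.localQueue.push(evt); // wait for the gap to close
    // evt.revision < next: already handled elsewhere, drop it
  }

  // Re-reads the shared store and releases every queued event whose turn has come.
  poll() {
    for (;;) {
      const idx = this.localQueue.findIndex(
        (e) => e.revision === this.nextRevision(e.aggregateId)
      );
      if (idx === -1) return;
      this.processEvent(this.localQueue.splice(idx, 1)[0]);
    }
  }

  processEvent(evt) {
    this.log.push(`${this.name}:${evt.name}`);
    this.store.set(evt.aggregateId, evt.revision + 1);
  }
}

const log = [];
const node01 = new PollingNode("node01", revisionStore, log);
const node02 = new PollingNode("node02", revisionStore, log);
const orderCreated = { aggregateId: "order-1", revision: 1, name: "orderCreated" };
const orderUpdated = { aggregateId: "order-1", revision: 2, name: "orderUpdated" };

node02.handle(orderUpdated); // queued locally on node 02
node01.handle(orderCreated); // node 01 handles it and bumps the shared revision
node02.poll();               // node 02 sees the new revision and proceeds
console.log(log); // [ 'node01:orderCreated', 'node02:orderUpdated' ]
```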
Any progress here? We ran into the same issue and can't find a quick solution.
It seems currentHandlingRevisions must be shared across the different processes of the same saga.
Does this mean that revisionGuard cannot handle multiple instances of the saga at all? Or did we just misunderstand the purpose of revisionGuard? If so, could you briefly explain it?
I missed this issue completely... sorry.
cqrs-saga IS able to run as multiple instances... that should not be a problem... (that is exactly what the revisionGuard is for...)
About the memory leak... is this a huge issue? Normally, when working with microservices, the processes are not that long-lived anyway... but contributions are always welcome.
Hi @adrai,
> cqrs-saga IS able to run as multiple instances... that should not be a problem... (that is exactly what the revisionGuard is for...)

What if the same event (with the same revision) is received from the bus by several instances of the same saga at the same time? It seems it will be handled as two different events in parallel. In my opinion, an event should be handled only once, by the instance that "first managed to grab" it, but the current revisionGuard implementation doesn't help us achieve that.
The concatenatedId key with revInEvt + 1 is put in the store only once the first saga step is done (in the done callback, actually), so until then another saga instance can start processing the same event in parallel. I found a mechanism that should protect us from such cases, but it seems to work only within a single saga process. The checks that decide whether an event should be handled are listed below (revisionGuard.js):
```javascript
function proceed (revInStore) {
  // No stored revision, nothing in flight in THIS process, no startRevisionNumber: first event, handle it.
  if (!revInStore && !self.currentHandlingRevisions[concatenatedId] && (self.options.startRevisionNumber === undefined || self.options.startRevisionNumber === null)) {
    self.currentHandlingRevisions[concatenatedId] = revInEvt;
    debug('first revision to store [concatenatedId]=' + concatenatedId + ', [revInStore]=' + (revInStore || 'null') + ', [revInEvt]=' + revInEvt);
    callback(null, function (clb) {
      self.finishGuard(evt, revInStore, clb);
    });
    return;
  }
  // Event is older than the stored revision: already handled.
  if (revInStore && revInEvt < revInStore) {
    debug('event already handled [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
    callback(new AlreadyHandledError('Event: [id]=' + dotty.get(evt, self.definition.id) + ', [revision]=' + revInEvt + ', [concatenatedId]=' + concatenatedId + ' already handled!'), function (clb) {
      clb(null);
    });
    return;
  }
  // Event is ahead of the stored revision: park it in the (in-memory) queue.
  if (revInStore && revInEvt > revInStore) {
    debug('queue event [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
    self.queueEvent(evt, callback);
    return;
  }
  // No stored revision yet, but the event is ahead of startRevisionNumber: queue it.
  if (!revInStore && self.options.startRevisionNumber >= 0 && revInEvt > self.options.startRevisionNumber) {
    debug('queue event (startRevisionNumber is set) [concatenatedId]=' + concatenatedId + ', [startRevisionNumber]=' + self.options.startRevisionNumber + ', [revInEvt]=' + revInEvt);
    self.queueEvent(evt, callback);
    return;
  }
  // No stored revision yet, but an earlier revision is being handled in THIS process: queue it.
  if (!revInStore && self.currentHandlingRevisions[concatenatedId] >= 0 && revInEvt > self.currentHandlingRevisions[concatenatedId]) {
    debug('queue event [concatenatedId]=' + concatenatedId + ', [currentlyHandling]=' + self.currentHandlingRevisions[concatenatedId] + ', [revInEvt]=' + revInEvt);
    self.queueEvent(evt, callback);
    return;
  }
  // THIS process is already handling this (or a later) revision: reject the duplicate.
  if (self.currentHandlingRevisions[concatenatedId] >= revInEvt) {
    debug('event already handling [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
    callback(new AlreadyHandlingError('Event: [id]=' + dotty.get(evt, self.definition.id) + ', [revision]=' + revInEvt + ', [concatenatedId]=' + concatenatedId + ' already handling!'), function (clb) {
      clb(null);
    });
    return;
  }
  // Otherwise the event is in order: mark it as in flight (locally only) and handle it.
  self.currentHandlingRevisions[concatenatedId] = revInEvt;
  debug('event is in correct order => go for it! [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
  callback(null, function (clb) {
    self.finishGuard(evt, revInStore, clb);
  });
}
```
Here we check currentHandlingRevisions, and if it already contains the event's concatenatedId (with the same or a higher revision), we drop the event. But this object is stored in memory and is visible only inside the current saga instance. Therefore, if currentHandlingRevisions were shared across all saga instances (for example, by storing it in Redis), it would solve the issue. What do you think?
In all my CQRS systems so far, messages were “normally” sent just once. Additionally, message distribution was always sharded by context+aggregate+aggregateId, so an event stream was always sent to the same process...
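A sketch of that proposal under stated assumptions: the in-flight marker moves into a shared store and is acquired with an atomic set-if-absent, the same semantics as Redis's `SET key value NX`. `SharedClaims`, `makeSagaInstance` and the key format `concatenatedId:revision` are hypothetical, and the `Map` only simulates the store inside one process; what makes the real version safe is that the store performs the check-and-set atomically. With a real store, the claim should probably also carry an expiry (e.g. `PX`) so that a crashed instance cannot block the stream forever.

```javascript
// Hypothetical stand-in for a shared store such as Redis. claim() mimics
// `SET key value NX`: it succeeds only if the key does not exist yet.
// This Map lives in one process; a real store would be remote and asynchronous.
class SharedClaims {
  constructor() {
    this.keys = new Map();
  }

  claim(key, owner) {
    if (this.keys.has(key)) return false;
    this.keys.set(key, owner);
    return true;
  }
}

// Each saga instance consults the shared store instead of its own
// in-memory currentHandlingRevisions object.
function makeSagaInstance(name, claims, handled) {
  return {
    onEvent(concatenatedId, revInEvt) {
      const key = `${concatenatedId}:${revInEvt}`;
      if (!claims.claim(key, name)) return "already handling";
      handled.push(`${name}:${key}`);
      return "handled";
    },
  };
}

const claims = new SharedClaims();
const handled = [];
const sagaA = makeSagaInstance("saga-A", claims, handled);
const sagaB = makeSagaInstance("saga-B", claims, handled);

// The bus delivers the same event (same revision) to both instances.
const first = sagaA.onEvent("order-1", 1);
const second = sagaB.onEvent("order-1", 1);
console.log(first, second); // handled already handling
```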
Is your “double handling” case something that happens often in your system?
But in general yes, feel free to contribute.
Btw: in theory, https://github.com/adrai/node-cqrs-eventdenormalizer has the same problem.
Yes, now I see that we just need to solve this at the distribution level (the bus): as you said, channels should be sharded by context+aggregate+aggregateId.

> But in general yes, feel free to contribute.

Always ready to do that, especially since we decided to use your libraries in our new big project...
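A minimal sketch of sharding at the distribution level, assuming events expose `context`, `aggregate` and `aggregateId` fields (hypothetical names) and that channels are named like the `SHARD_01`/`SHARD_02` channels from the original test. A stable hash of the stream key picks the channel, so every event of one stream reaches the same process and the per-process queue is sufficient again.

```javascript
const crypto = require("crypto");

// Hypothetical helper: returns a channel name like "SHARD_01" so that all events
// of one stream (context + aggregate + aggregateId) land on the same process.
function channelFor(evt, shardCount) {
  const streamKey = `${evt.context}:${evt.aggregate}:${evt.aggregateId}`;
  // A stable hash, so every publisher computes the same shard for the same stream.
  const digest = crypto.createHash("sha1").update(streamKey).digest();
  const shard = digest.readUInt32BE(0) % shardCount;
  return `SHARD_${String(shard + 1).padStart(2, "0")}`;
}

const orderCreated = { context: "sales", aggregate: "order", aggregateId: "order-1", name: "orderCreated" };
const orderUpdated = { context: "sales", aggregate: "order", aggregateId: "order-1", name: "orderUpdated" };

console.log(channelFor(orderCreated, 2) === channelFor(orderUpdated, 2)); // true
```

Changing `shardCount` would remap streams to different channels, so resharding needs care while streams are in flight.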
Btw, how did you create such a complex and "academic" event sourcing/CQRS system? I reviewed a lot of similar systems, and only yours looks like a full implementation of the theory from Chris Richardson's book (https://www.amazon.com/Microservices-Patterns-examples-Chris-Richardson/dp/1617294543). Who inspired you? Your results are impressive.
Thank you very much for these nice words... the awesome guy who pushed me into this world was @jamuhl 😊 and then I read the classics... https://github.com/adrai/cqrs-sample#why-should-i-care