FLIP [Draft]: FifoQueue for engines
We decided to introduce non-blocking message queue(s) for the engines, so that they don't block the networking layer's queue and thereby each other. Yurii has already made great progress implementing such queues on the consensus nodes. I recently found a blog post where a similar implementation is discussed. Comparing the blog post's proposal to our implementation (e.g. here), I notice two noteworthy differences:
- I like our FifoQueue implementation (essentially a wrapper around deque.Deque) much better than using slices
- In our implementation, we also have an inbound and an outbound channel, with some internal unbounded storage in the middle. However, the goroutines which shovel the data from the inbound channel -> internal storage -> outbound channel live in the engine (e.g. here). As a result, we replicate this code in every engine.
I was wondering what the thoughts were on moving this goroutine for shovelling the queue elements into the FifoQueue. From my perspective, this would create a very clean separation of concerns. I think we could take the shovelling logic from Jon Bodner's blog post with only minimal changes; a rough sketch is below.
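For concreteness, here is a minimal sketch of what such a self-contained queue could look like, loosely following the blog post's shovelling pattern. All names are illustrative, and `container/list` stands in for `deque.Deque` so the sketch has no external dependencies:

```go
// Sketch only: an unbounded FIFO queue that owns its own shovelling
// goroutine, moving elements inbound channel -> internal storage -> outbound
// channel. Names are illustrative; container/list stands in for deque.Deque.
package fifoqueue

import "container/list"

type FifoQueue struct {
	in  chan interface{}
	out chan interface{}
}

func NewFifoQueue() *FifoQueue {
	q := &FifoQueue{
		in:  make(chan interface{}),
		out: make(chan interface{}),
	}
	go q.shovel() // the shovelling routine now lives inside the queue
	return q
}

// In returns the channel for submitting elements; Out for consuming them.
func (q *FifoQueue) In() chan<- interface{}  { return q.in }
func (q *FifoQueue) Out() <-chan interface{} { return q.out }

func (q *FifoQueue) shovel() {
	pending := list.New() // internal unbounded storage
	for {
		if pending.Len() == 0 {
			// nothing buffered: we can only wait for new input
			elem, ok := <-q.in
			if !ok { // inbound channel closed: shut down
				close(q.out)
				return
			}
			pending.PushBack(elem)
			continue
		}
		// buffered elements exist: accept new input or emit the head
		select {
		case elem, ok := <-q.in:
			if !ok {
				for e := pending.Front(); e != nil; e = e.Next() {
					q.out <- e.Value // drain remaining elements
				}
				close(q.out)
				return
			}
			pending.PushBack(elem)
		case q.out <- pending.Front().Value:
			pending.Remove(pending.Front())
		}
	}
}
```

An engine would then just write to `q.In()` from its network-facing `Process` method and read from `q.Out()` in its consumer routine, without owning any shovelling logic itself.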
This is mainly a question for @durkmurder, @zhangchiqing, and @arrivets, but I thought I'd open it up for a broader discussion here.
Hi Alex, thanks for the research. My personal opinion on this one:
- I am sure that the deque is more performant compared to a slice, so I would leave it as it is.
- Regarding the internal goroutine: while I like the idea of embedding a goroutine into the queue, I don't see a clean way to do it. In the blog post you sent, it's basically an unbounded channel, which means we would have a set of channels for every queue, plus every queue would run its own consumer/producer goroutine. I think this would introduce performance penalties compared to our current approach, where we run multiple queues using one channel and one goroutine. I agree that we are somewhat duplicating this code in the compliance engine, but generally speaking, the current framework can easily be tuned and adjusted for more specific situations.
- I am also not sure how we would fetch from those queues: using a select statement over internal channels, or back to polling? (The polling alternative would look roughly like the sketch below.)
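To make the two options concrete, here is a minimal illustration of the polling fallback; `pop` and `process` are placeholder hooks, and the interval is arbitrary:

```go
package sketch

import "time"

// pollLoop illustrates the polling alternative: without a channel to block
// on, the consumer wakes up on a timer and drains the queue. pop and process
// are placeholders for whatever the engine provides; the 10ms interval is
// arbitrary.
func pollLoop(pop func() (interface{}, bool), process func(interface{}), done <-chan struct{}) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			// drain everything that accumulated since the last tick
			for {
				msg, ok := pop()
				if !ok {
					break
				}
				process(msg)
			}
		}
	}
}
```

A select over per-queue channels avoids both the wake-up latency and the wasted checks of this approach.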
> I was wondering what the thoughts were on moving this goroutine for shovelling the queue elements into the FifoQueue
Can we just get rid of this goroutine and the channel it reads from? Using the compliance engine as an example, these exist because the `Process` method that is called by the network layer immediately puts the message onto the `pendingEventSink` channel, so that we don't block the network layer's worker goroutine. But why don't we just put these messages directly into the appropriate message queue, rather than having a channel and goroutine in between?
> > I was wondering what the thoughts were on moving this goroutine for shovelling the queue elements into the FifoQueue
>
> Can we just get rid of this goroutine and the channel it reads from? Using the compliance engine as an example, these exist because the `Process` method that is called by the network layer immediately puts the message onto the `pendingEventSink` channel, so that we don't block the network layer's worker goroutine. But why don't we just put these messages directly into the appropriate message queue, rather than having a channel and goroutine in between?
Because if you put it directly into the queue, you will need some mechanism to poll it from the queue later; the separate channel and goroutine provide a responsive way to consume events from the network layer.
thanks @durkmurder for the comments.
> we would have a set of channels for every queue, plus every queue would run its own consumer/producer goroutine. I think this would introduce performance penalties
Let's take a look at the current `matching.Engine`:
- we have one shared channel through which all inbound messages pass before the engine's "shovelling routine" distributes them into the respective queues: https://github.com/onflow/flow-go/blob/0b9d0e4c2c552bb7506bbe01d73dfa8226798494/engine/consensus/matching/engine.go#L56
- In addition, for each queue, we have a dedicated channel, which holds the queue's head: https://github.com/onflow/flow-go/blob/0b9d0e4c2c552bb7506bbe01d73dfa8226798494/engine/consensus/matching/engine.go#L50-L52
- as Yurii explained, this is important so that the engine's "consumer routine" can block on these head channels (via a `select` statement) until new work is available; see the sketch below
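For illustration, a stripped-down version of that consumer side could look as follows (assumed names, not the actual `matching.Engine` code):

```go
package sketch

// Assumed names, not the actual matching.Engine code: the consumer routine
// blocks on one "head" channel per internal queue until any of them
// delivers work.

type receipt struct{}  // placeholder for flow.ExecutionReceipt
type approval struct{} // placeholder for flow.ResultApproval

func consumerRoutine(
	receiptsHead <-chan *receipt, // refilled from the receipts FIFO queue
	approvalsHead <-chan *approval, // refilled from the approvals FIFO queue
	done <-chan struct{},
) {
	for {
		select {
		case r := <-receiptsHead:
			processReceipt(r)
		case a := <-approvalsHead:
			processApproval(a)
		case <-done:
			return
		}
	}
}

func processReceipt(*receipt)   {} // engine-specific logic
func processApproval(*approval) {} // engine-specific logic
```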
Regarding the number of channels
So, on a high level:
- if an engine has `k` internal queues, it will roughly have `k+1` channels
- for my suggestion, we would have a dedicated pair of channels for each queue, i.e. `2k` in total
  - note that `k` is generally quite small (1, 2, or 3 ... rarely ever higher than 3)
  - for `k=3`, we are talking about a "cost" of increasing the number of channels from `4` to `6`, which seems negligible
While I agree that channels have some performance cost, I don't think the cost is high enough to warrant a significant reduction in code readability or modularization (which I feel is currently the case). If we are really worried about channel cost, then this implementation (based on a single `sync.Cond`) provides significantly better performance than `k` channels; a rough sketch of the idea is below.
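To sketch the single-`sync.Cond` alternative (illustrative names, not the linked implementation):

```go
// Rough sketch of the single-sync.Cond idea (illustrative names, not the
// linked implementation): one condition variable signals "some queue has
// work", so the consumer needs neither k channels nor polling.
package sketch

import "sync"

type multiQueue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	queues [][]interface{} // k internal FIFO queues (slices for brevity)
}

func newMultiQueue(k int) *multiQueue {
	m := &multiQueue{queues: make([][]interface{}, k)}
	m.cond = sync.NewCond(&m.mu)
	return m
}

// push appends an element to queue i and wakes the consumer.
func (m *multiQueue) push(i int, v interface{}) {
	m.mu.Lock()
	m.queues[i] = append(m.queues[i], v)
	m.mu.Unlock()
	m.cond.Signal()
}

// pop blocks until some queue has an element, then returns the head of the
// first non-empty queue (lower index = higher priority in this sketch).
func (m *multiQueue) pop() (int, interface{}) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for {
		for i, q := range m.queues {
			if len(q) > 0 {
				m.queues[i] = q[1:]
				return i, q[0]
			}
		}
		m.cond.Wait() // releases the mutex while waiting
	}
}
```

Here a single `Signal`/`Wait` pair replaces the `k` head channels; the consumer scans the queues under the lock, so no channel operations are needed at all.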
Regarding the number of goroutines
- With the existing implementation, an engine needs `1` "shovelling routine". In comparison, my suggestion requires `k` routines. Again, `k` is typically `1`, `2`, or `3`.
- Goroutines are green threads. The language was specifically engineered to support heavy usage of goroutines with low performance overhead.
Just FYI (sorry, forgot to add this earlier to the discussion):
The Trapdoor was part of a proposal for the Engine queues (highly optimized to avoid the performance costs of channels and goroutines): https://github.com/onflow/flow-go/pull/389
> > > I was wondering what the thoughts were on moving this goroutine for shovelling the queue elements into the FifoQueue
> >
> > Can we just get rid of this goroutine and the channel it reads from? Using the compliance engine as an example, these exist because the `Process` method that is called by the network layer immediately puts the message onto the `pendingEventSink` channel, so that we don't block the network layer's worker goroutine. But why don't we just put these messages directly into the appropriate message queue, rather than having a channel and goroutine in between?
>
> Because if you put it directly into the queue, you will need some mechanism to poll it from the queue later; the separate channel and goroutine provide a responsive way to consume events from the network layer.
I agree with @jordanschalm. Regarding the compliance engine, I don't understand why we need the FIFO queues between the `pendingEventSink` and the `voteSink`/`blockSink` at all.
The image that comes to mind is that the pipeline has a loop in it.
If I understand correctly, what @jordanschalm is suggesting is to make `voteSink` and `blockSink` buffered channels, so that the `processEvents` routine could forward events to the appropriate sink directly without blocking. This has the advantage of eliminating the FIFO queues and the external dependency. In my opinion it also makes the code simpler and easier to reason about (see the sketch below).
The only motive I can see for introducing the FIFO queues is that you want unbounded channels. @AlexHentschel, why is it necessary to have unbounded channels?
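For reference, the buffered-channel variant would look roughly like this (names and buffer sizes are illustrative):

```go
// Sketch of the buffered-channel variant (names and buffer sizes are
// illustrative): processEvents forwards each event straight to the matching
// sink. The send only blocks once the buffer is full, at which point
// back-pressure propagates to the producer.
package main

type vote struct{}  // placeholder message types
type block struct{}

func processEvents(events <-chan interface{}, voteSink chan<- *vote, blockSink chan<- *block) {
	for ev := range events {
		switch e := ev.(type) {
		case *vote:
			voteSink <- e
		case *block:
			blockSink <- e
		}
	}
}

func main() {
	events := make(chan interface{})
	voteSink := make(chan *vote, 1000) // bounded buffers replace the FIFO queues
	blockSink := make(chan *block, 1000)
	go processEvents(events, voteSink, blockSink)
	// ... consumers read from voteSink / blockSink ...
}
```

The trade-off is that the sinks become bounded: once a buffer fills up, the send blocks and back-pressure propagates toward the network layer, which is exactly what unbounded FIFO queues avoid.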
Here's a diagram to illustrate the suggestion above:
[diagram omitted]
As discussed, I made a simple example of a generic unbounded queue based on @durkmurder's implementation.
It is able to take incoming events, pipe them out to different queues by event type, and allow the engine to consume them.
With the generic queue, the engine's logic is simplified: the engine now only needs to specify event handlers by event type, much like pattern matching.
It also supports internal events like check sealing.
https://github.com/onflow/flow-go/commit/eb216ec8e49284b6c5616dadbe1127b4e394c0f2
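To illustrate the shape of that idea, here is a loose sketch of handler registration by event type (assumed names and a reflect-based lookup; not necessarily how the commit implements it):

```go
// Loose sketch of the "handlers by event type" idea (assumed names, not the
// actual API from the linked commit): the engine registers one handler per
// message type, and the dispatcher routes dequeued events accordingly.
package sketch

import "reflect"

type dispatcher struct {
	handlers map[reflect.Type]func(interface{})
}

func newDispatcher() *dispatcher {
	return &dispatcher{handlers: make(map[reflect.Type]func(interface{}))}
}

// on registers a handler for the concrete type of the prototype value.
func (d *dispatcher) on(prototype interface{}, h func(interface{})) {
	d.handlers[reflect.TypeOf(prototype)] = h
}

// dispatch invokes the handler registered for the event's concrete type.
func (d *dispatcher) dispatch(ev interface{}) {
	if h, ok := d.handlers[reflect.TypeOf(ev)]; ok {
		h(ev)
	}
}
```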
Draft for unbounded queue with dedicated head and tail channels (inspired by Jon Bodner's blog post): https://github.com/onflow/flow-go/pull/460
Recap from the meeting on March 2nd: we decided to implement the Trapdoor/Poker approach with synchronized FIFO queues; a rough sketch of the pattern is below.
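For the record, a minimal sketch of that pattern as I understand it (the linked PR and implementation issue contain the real design): a synchronized FIFO queue paired with a capacity-1 notification channel, where pushing "pokes" the consumer and redundant pokes are coalesced.

```go
// Minimal sketch of the Trapdoor/Poker pattern (simplified; see the PR and
// implementation issue for the real design): a synchronized FIFO queue
// paired with a capacity-1 notification channel. Pushing "pokes" the
// consumer; redundant pokes are coalesced, so no shovelling goroutine or
// unbounded channel is needed.
package sketch

import "sync"

type pokedQueue struct {
	mu    sync.Mutex
	items []interface{} // synchronized FIFO storage
	poke  chan struct{} // capacity 1: at most one pending wake-up
}

func newPokedQueue() *pokedQueue {
	return &pokedQueue{poke: make(chan struct{}, 1)}
}

func (q *pokedQueue) push(v interface{}) {
	q.mu.Lock()
	q.items = append(q.items, v)
	q.mu.Unlock()
	select {
	case q.poke <- struct{}{}: // wake the consumer
	default: // a wake-up is already pending; nothing to do
	}
}

// The consumer blocks on receiving from q.poke, then repeatedly calls pop
// until the queue is drained.
func (q *pokedQueue) pop() (interface{}, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return nil, false
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v, true
}
```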
Thanks Jordan for creating the corresponding implementation issue: https://github.com/dapperlabs/flow-go/issues/5355