[Epic] storage: sharded reads from persist
Theme and Initiative
Materialize is Scalable; Materialize can increase performance if necessary
Problem
Currently, the persist_source that COMPUTE uses to read from STORAGE pins all work to a single worker (a timely worker, part of the timely cluster that a compute replica runs). This means that read throughput does not improve when adding more resources to the compute instance/replica, that is, (roughly) when adding more workers to a cluster.
Instead, adding more workers should scale up the read performance of persist_source; that is, we should provide parallelization potential that compute can use when necessary.
A related issue is scaling up writing to a persist shard. In order to keep a compute instance that's listening for updates in parallel properly saturated, we also have to make writing to a single persist shard parallelizable. We can either view this as part of this Epic or create a new one. Just parallelizing the initial snapshot reading should already help, though, even without scaling up writers.
Implementation Sketch
The persist_source does two things:
- get a snapshot from persist and emit all updates from that
- switch over to listen for new updates coming in from persist
The first of these can already be solved by using snapshot splits and distributing the read work across multiple workers; the current implementation simply doesn't do this.
Listening cannot yet be parallelized this way, but it should become possible with changes in persist that seem feasible. A sketch of the distribution pattern for the snapshot phase follows.
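To make the snapshot phase concrete, here is a minimal, self-contained timely dataflow sketch of the pattern: one worker enumerates the snapshot splits, and an exchange routes each split to a worker that fetches it. This is an illustration under stated assumptions, not the real implementation; `fetch_part` and the use of plain part ids are hypothetical stand-ins for persist's actual snapshot-split API.

```rust
use timely::dataflow::operators::{Exchange, Input, Inspect, Map, Probe};
use timely::dataflow::{InputHandle, ProbeHandle};

/// Hypothetical stand-in for the persist call that reads the contents of
/// one snapshot split. The real code would fetch the batch from blob storage.
fn fetch_part(part_id: u64) -> Vec<(String, u64, i64)> {
    vec![(format!("data-for-part-{part_id}"), 0, 1)]
}

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        let index = worker.index();
        let peers = worker.peers() as u64;

        let mut input: InputHandle<u64, u64> = InputHandle::new();
        let mut probe = ProbeHandle::new();

        worker.dataflow(|scope| {
            scope
                .input_from(&mut input)
                // Route each split to a worker by part id, so the actual
                // fetching happens in parallel instead of on one worker.
                .exchange(move |part_id| *part_id % peers)
                .map(fetch_part)
                .inspect(move |updates| {
                    println!("worker {index} fetched {} updates", updates.len());
                })
                .probe_with(&mut probe);
        });

        // Only worker 0 enumerates the snapshot splits; the exchange above
        // spreads the read work across all workers.
        if index == 0 {
            for part_id in 0..16 {
                input.send(part_id);
            }
        }
        input.advance_to(1);
        while probe.less_than(input.time()) {
            worker.step();
        }
    })
    .unwrap();
}
```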
Success Criteria
Compute can make reading from storage faster by adding more resources.
Time Horizon
Medium
Blockers
None
There is a very rough prototype of this feature in #13849. There is a design doc for the proposed approach in Notion under the name "Scalable reads from persist".
We ended up using #13849 to merge the code for sharded reads from persist. However, the code is currently unused.
Dan + Aljoscha realized there is an issue with how we handle distributing SeqNos alongside batches; namely, after distributing the SeqNo via ReaderEnrichedHollowBatch, the SeqNo could get garbage collected even though its data is still needed.
The high-level approach we'll take to resolve this issue is to have the original worker that subscribes to the source updates track all SeqNos it has "outstanding" with other workers, and prevent those from getting GC'ed. Each worker will then have a final exchange back to the subscribing worker with its SeqNos, letting it know that it can let the SeqNo get GC'ed.
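Below is a minimal bookkeeping sketch of that idea, assuming the tracking lives on the subscribing worker. `SeqNo` and `ReaderEnrichedHollowBatch` are the persist concepts discussed above; the `LeaseTracker` type and its method names are hypothetical, for illustration only.

```rust
use std::collections::BTreeMap;

/// Persist's sequence numbers, simplified for this sketch.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct SeqNo(u64);

/// Lives on the subscribing worker. Tracks SeqNos that were handed out to
/// other workers alongside batches and have not yet been returned.
/// (`LeaseTracker` is a hypothetical name; the real types may differ.)
#[derive(Default)]
struct LeaseTracker {
    /// SeqNo -> number of batches still outstanding at that SeqNo.
    outstanding: BTreeMap<SeqNo, usize>,
}

impl LeaseTracker {
    /// Record that a batch enriched with `seqno` was exchanged to a worker.
    fn lease(&mut self, seqno: SeqNo) {
        *self.outstanding.entry(seqno).or_insert(0) += 1;
    }

    /// Record that the final exchange returned `seqno` from a worker that
    /// finished fetching the corresponding batch.
    fn release(&mut self, seqno: SeqNo) {
        let count = self
            .outstanding
            .get_mut(&seqno)
            .expect("released a SeqNo that was never leased");
        *count -= 1;
        if *count == 0 {
            self.outstanding.remove(&seqno);
        }
    }

    /// The earliest SeqNo still in use by some worker. Garbage collection
    /// must not advance past this point; `None` means no constraint.
    fn gc_frontier(&self) -> Option<SeqNo> {
        self.outstanding.keys().next().copied()
    }
}

fn main() {
    let mut tracker = LeaseTracker::default();
    tracker.lease(SeqNo(4));
    tracker.lease(SeqNo(4));
    tracker.lease(SeqNo(7));
    assert_eq!(tracker.gc_frontier(), Some(SeqNo(4)));

    tracker.release(SeqNo(4));
    tracker.release(SeqNo(4));
    // Only SeqNo(7) remains outstanding, so GC may now advance past 4.
    assert_eq!(tracker.gc_frontier(), Some(SeqNo(7)));
}
```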