Add runtime module to enable concurrent load of manifest files.
Currently we load manifest files sequentially, i.e. one by one. We should load them concurrently. This requires submitting tasks to a Rust async runtime, and we should be careful to stay runtime agnostic.
Hi, is this what you refer to? Can you plz explain more about "careful to runtime agnostic"? Is there anything we need to be careful when implementing concurrent scanning? https://github.com/apache/iceberg-rust/blob/9768b0ef1c7ab3631f69ee52d18ea928e4a4250f/crates/iceberg/src/scan.rs#L140-L145
Hi, is this what you refer to?
Yes, exactly.
Can you plz explain more about "careful to runtime agnostic"? Is there anything we need to be careful when implementing concurrent scanning?
I mean we may need an extra layer for task scheduling, so that we can adapt to any async runtime, such as tokio or async-std.
I mean we may need an extra layer for task scheduling, so that we can adapt to any async runtime, such as tokio or async-std.
Do you want users to choose their own runtime like sqlx?
They are building an abstraction layer (Runtime) so sqlx can run on many blocking/non-blocking runtimes.
# tokio (no TLS)
sqlx = { version = "0.7", features = [ "runtime-tokio" ] }
# async-std (no TLS)
sqlx = { version = "0.7", features = [ "runtime-async-std" ] }
I am interested in this feature, but it will take some time for me to draft a design.
Do you want users to choose their own runtime like sqlx? They are building an abstraction layer (Runtime) so sqlx can run on many blocking/non-blocking runtimes.
Follow up on this. The sqlx uses a relatively simple solution. For example, the spawn function.
https://github.com/launchbadge/sqlx/blob/84d576004c93a32133688426eacb50434bb5c5f0/sqlx-core/src/rt/mod.rs#L66-L74
Do you want users to choose their own runtime like sqlx?
Yes, exactly. I don't think we should bind to some specific runtime.
Follow up on this. The sqlx uses a relatively simple solution.
I agree that we may need to think about it carefully: sqlx's solution can only use the current runtime.
I am interested in this feature, but it will take some time for me to draft a design.
Yeah, welcome to contribute, we can work together on this.
@odysa Just to follow up on this, any progress regarding some design ideas?
@liurenjie1024 Do we have any reference implementation where we can get "inspired" on how things could be done, or do you have a particular design idea already in mind?
Is this something we should track in #348 perhaps for the v.0.4.0 release?
It's already tracked here: https://github.com/apache/iceberg-rust/issues/123
In order to verify my understanding and possibly kick off a design discussion, we could follow the approach of sqlx:
- have a runtime.rs
  - to define a Runtime trait
  - based on a feature-flag import / re-export the implementors of that trait
- have a /runtime module with specific runtime implementations
  - that implement the Runtime trait
For our particular use-case (loading manifest / or DataFiles on multiple threads) we could e.g. wrap tokio::spawn with our runtime and we're good to go? Then the Runtime trait would evolve to support more "use-cases"?
Maybe currently we don't need a Runtime trait? From what we have learned, we currently need two methods:
- spawn
- block_on
I think the method here already provided a good example of what we need. Allowing user to specify Runtime for task scheduling could be treated as an advanced feature.
... so as a first step - simply wrap tokio::spawn (for example) like here - and not even use a feature-flag for now; just the simplest layer of abstraction?
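To make the "two methods, no trait" idea concrete, here is a minimal sketch of such a layer. It is not the iceberg-rust implementation: to stay dependency-free it backs spawn with one OS thread per task instead of wrapping tokio::spawn, and JoinHandle, ThreadWaker and the blocking join method are hypothetical names chosen for illustration. The point is only that callers depend on spawn and block_on rather than on a concrete runtime.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the thread parked inside `block_on` (hypothetical helper type).
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drives a future to completion on the current thread.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until the waker unparks this thread, then poll again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Handle to a spawned task (hypothetical; a real layer would wrap the
/// runtime's own join handle and most likely be awaitable).
pub struct JoinHandle<T>(thread::JoinHandle<T>);

impl<T> JoinHandle<T> {
    /// Blocks until the task has finished and returns its output.
    pub fn join(self) -> T {
        self.0.join().expect("spawned task panicked")
    }
}

/// Runs the future in the background. Assumption: one OS thread per task
/// stands in for handing the future to an async runtime.
pub fn spawn<F>(fut: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    JoinHandle(thread::spawn(move || block_on(fut)))
}
```

Swapping the body of spawn for a call into tokio or async-std, possibly behind a feature flag, would not change any call site, which is the property the thread is after.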
@liurenjie1024 Have you already made up your mind; about how we want to read & filter the manifest files concurrently? An idea would be to:
- async load ManifestList
- for each entry spawn a new task
  - apply ManifestEvaluator
  - async load Manifest
  - for each ManifestEntry/DataFile spawn a new task
    - apply ExpressionEvaluator
    - apply InclusiveMetricsEvaluator
    - if data file has not been pruned yet
      - send FileScanTask as a result via channel back to main stream
- yield FileScanTask
I think this way we can achieve maximum parallelism, throughput and performance; however, the tradeoffs are the complexity and the spike in resource consumption, due to spawning multiple tasks and concurrently loading files in memory.
Here is a toy-example to illustrate the idea for further discussion:
// Toy example: `Result`, `FileScanTask`, `DataFile` and `process_data_file`
// are placeholders that are not defined here.
use std::time::Duration;

use async_stream::try_stream;
use futures::stream::{BoxStream, StreamExt};
use tokio::sync::mpsc;
use tokio::time;

async fn create_stream() -> Result<BoxStream<'static, Result<FileScanTask>>> {
    let (tx, mut rx) = mpsc::channel::<FileScanTask>(32);

    // manifest list with entries
    let manifests = Vec::from_iter(0..12);

    for entry in manifests {
        let sender = tx.clone();
        // for each entry spawn a new task
        tokio::spawn(async move {
            // apply `ManifestEvaluator`
            // if not pruned; load manifest
            println!("loading manifest {}", entry);
            time::sleep(Duration::from_millis(1000)).await;
            let data_files: Vec<_> = (0..48).map(|_| DataFile {}).collect();

            // for each DataFile spawn a new task
            for _ in data_files {
                let sender = sender.clone();
                tokio::spawn(async move {
                    // apply ExpressionEvaluator
                    // apply InclusiveMetricsEvaluator
                    process_data_file(sender).await;
                });
            }
        });
    }

    // drop the original sender so the stream ends once all tasks are done
    drop(tx);

    let stream = try_stream! {
        while let Some(file_scan_task) = rx.recv().await {
            yield file_scan_task;
        }
    };

    Ok(stream.boxed())
}
Hi, @marvinlanhenke After #233 got merged, we will have a basic runtime framework.
Have you already made up your mind;
Not yet.
I think your solution generally LGTM. Creating one task for each manifest entry looks like too much to me. Though spawning a task is lightweight in Rust, it still consumes some memory. How do you feel about starting with one task per manifest file?
With Iceberg, the manifests are written to a target size (8 megabytes by default). Each manifest is bound to the same schema and partition, so you can re-use the evaluators here. I would not go overboard with the parallelism, and just create one task per manifest, and not spawn a task per manifest-entry.
How do you feel starting with one task for one manifest file
you mean:
- spawn a new task for each manifest, load the manifest (entry.load_manifest(...).await?)
- and handle DataFiles synchronously on the same task
so if we have a manifest_list with e.g. 5 entries, 1 is pruned (ManifestEvaluator) we'd effectively spawn 4 tasks, to load the manifest and handle all the data files; is this correct?
so if we have a manifest_list with e.g. 5 entries, 1 is pruned (ManifestEvaluator) we'd effectively spawn 4 tasks, to load the manifest and handle all the data files; is this correct?
That is correct 👍 I think there might be some confusion around the naming. In the spec we have the Manifest List that contains Manifests. Within the Manifest there are manifest-entries that each point to one Datafile.
How do you feel starting with one task for one manifest file
you mean:
- spawn a new task for each manifest, load the manifest (entry.load_manifest(...).await?)
- and handle DataFiles synchronously on the same task
so if we have a manifest_list with e.g. 5 entries, 1 is pruned (ManifestEvaluator) we'd effectively spawn 4 tasks, to load the manifest and handle all the data files; is this correct?
Yeah, that's exactly what I mean.
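For illustration, the agreed "one task per manifest" shape could look roughly like the sketch below. It is an assumption-laden toy, not the iceberg-rust code: ManifestFile, DataFile, FileScanTask and plan_files are simplified hypothetical stand-ins, the may_match flags replace the real ManifestEvaluator / ExpressionEvaluator / InclusiveMetricsEvaluator, and std threads with a std channel stand in for async tasks and an async channel.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a manifest list entry.
#[derive(Debug, Clone)]
pub struct ManifestFile {
    /// Stand-in for the result of the ManifestEvaluator.
    pub may_match: bool,
    pub entries: Vec<DataFile>,
}

/// Hypothetical stand-in for a manifest entry pointing at one data file.
#[derive(Debug, Clone)]
pub struct DataFile {
    pub path: String,
    /// Stand-in for the expression / inclusive metrics evaluators.
    pub may_match: bool,
}

#[derive(Debug, PartialEq, Eq)]
pub struct FileScanTask {
    pub data_file_path: String,
}

/// Returns the number of spawned tasks and the resulting scan tasks.
pub fn plan_files(manifest_list: Vec<ManifestFile>) -> (usize, Vec<FileScanTask>) {
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::new();

    for manifest in manifest_list {
        // Prune whole manifests before spawning anything for them.
        if !manifest.may_match {
            continue;
        }
        let tx = tx.clone();
        // One task per surviving manifest.
        handles.push(thread::spawn(move || {
            // The entries are handled sequentially on the same task.
            for data_file in manifest.entries {
                if data_file.may_match {
                    tx.send(FileScanTask { data_file_path: data_file.path })
                        .expect("receiver is still alive");
                }
            }
        }));
    }
    // Drop the original sender so the receiver ends once all tasks finish.
    drop(tx);

    let spawned = handles.len();
    let mut tasks: Vec<FileScanTask> = rx.iter().collect();
    for handle in handles {
        handle.join().expect("manifest task panicked");
    }
    // Arrival order depends on scheduling; sort for a deterministic result.
    tasks.sort_by(|a, b| a.data_file_path.cmp(&b.data_file_path));
    (spawned, tasks)
}
```

With a manifest list of 5 entries of which 1 is pruned, this spawns 4 tasks, matching the example discussed above.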
Using try_for_each_concurrent here rather than just spawning in a for loop will allow us to tune the concurrency, as it accepts a limit on the number of concurrent tasks as an argument. I'd advocate for a data-driven approach to determine a sensible default value for this, alongside the capability to override the default with optional config, probably a with_max_concurrency() method on the builder and some guidance in the docs.
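A rough sketch of what a configurable limit could look like. It does not use try_for_each_concurrent; to stay std-only it caps concurrency with a fixed number of worker threads pulling from a shared queue, which is only an analogy for the stream-based approach. ScanBuilder, run and the default of 4 are hypothetical; only the with_max_concurrency name comes from the suggestion above, and the default here is a placeholder rather than a measured value.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical builder carrying the concurrency limit.
pub struct ScanBuilder {
    max_concurrency: usize,
}

impl ScanBuilder {
    pub fn new() -> Self {
        // Placeholder default; a real default should be data-driven.
        Self { max_concurrency: 4 }
    }

    /// Overrides the default limit (values below 1 are clamped to 1).
    pub fn with_max_concurrency(mut self, n: usize) -> Self {
        self.max_concurrency = n.max(1);
        self
    }

    /// Applies `work` to every item with at most `max_concurrency`
    /// items in flight, returning results in input order.
    pub fn run<T, R, F>(&self, items: Vec<T>, work: F) -> Vec<R>
    where
        T: Send + 'static,
        R: Send + 'static,
        F: Fn(T) -> R + Send + Sync + 'static,
    {
        let queue: VecDeque<(usize, T)> = items.into_iter().enumerate().collect();
        let queue = Arc::new(Mutex::new(queue));
        let work = Arc::new(work);

        let workers: Vec<_> = (0..self.max_concurrency)
            .map(|_| {
                let queue = Arc::clone(&queue);
                let work = Arc::clone(&work);
                thread::spawn(move || {
                    let mut done = Vec::new();
                    loop {
                        // The lock is held only while popping, not while working.
                        let next = queue.lock().expect("queue lock poisoned").pop_front();
                        match next {
                            Some((idx, item)) => done.push((idx, work(item))),
                            None => break,
                        }
                    }
                    done
                })
            })
            .collect();

        let mut results: Vec<(usize, R)> = workers
            .into_iter()
            .flat_map(|w| w.join().expect("worker panicked"))
            .collect();
        results.sort_by_key(|(idx, _)| *idx);
        results.into_iter().map(|(_, r)| r).collect()
    }
}
```

Here the items would be the manifests that survived pruning and work would load one manifest and filter its entries; the limit bounds how many manifests are held in memory at once.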
Close by #233
Closed by #373