
Add runtime module to enable concurrent load of manifest files.

Open liurenjie1024 opened this issue 2 years ago • 17 comments

Currently we implement manifest loading sequentially, i.e. we load the manifest files one by one. We should load them concurrently instead. This requires submitting tasks to a Rust async runtime, and we should be careful to stay runtime agnostic.
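
As a rough illustration (a sketch only; load_manifest, Manifest, and Error below are placeholders rather than the crate's actual types), the difference between the sequential approach and a concurrent one:

use futures::future::try_join_all;

// Placeholder types; the real crate has its own manifest and error types.
struct Manifest;
#[derive(Debug)]
struct Error;

// Hypothetical loader standing in for the real manifest-loading code.
async fn load_manifest(_path: &str) -> Result<Manifest, Error> {
    Ok(Manifest)
}

// Sequential: one manifest at a time.
async fn load_all_sequential(paths: &[String]) -> Result<Vec<Manifest>, Error> {
    let mut manifests = Vec::with_capacity(paths.len());
    for path in paths {
        manifests.push(load_manifest(path).await?);
    }
    Ok(manifests)
}

// Concurrent: all loads make progress at once on the current task; submitting
// each load to a runtime via spawn would additionally spread work across threads.
async fn load_all_concurrent(paths: &[String]) -> Result<Vec<Manifest>, Error> {
    try_join_all(paths.iter().map(|p| load_manifest(p))).await
}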

liurenjie1024 avatar Dec 19 '23 02:12 liurenjie1024

Hi, is this what you are referring to? Can you please explain more about "careful to stay runtime agnostic"? Is there anything we need to be careful about when implementing concurrent scanning? https://github.com/apache/iceberg-rust/blob/9768b0ef1c7ab3631f69ee52d18ea928e4a4250f/crates/iceberg/src/scan.rs#L140-L145

odysa avatar Feb 02 '24 03:02 odysa

Hi, is this what you are referring to?

Yes, exactly.

Can you please explain more about "careful to stay runtime agnostic"? Is there anything we need to be careful about when implementing concurrent scanning?

I mean we may need an extra layer for task scheduling, so that we can adapt to any async runtime such as tokio or async-std.

liurenjie1024 avatar Feb 02 '24 05:02 liurenjie1024

I mean we may need an extra layer for task scheduling, so that we can adapt to any async runtime such as tokio or async-std.

Do you want users to choose their own runtime like sqlx? They built an abstraction layer (Runtime) so sqlx can run on many blocking/non-blocking runtimes.

# tokio (no TLS)
sqlx = { version = "0.7", features = [ "runtime-tokio" ] }
# async-std (no TLS)
sqlx = { version = "0.7", features = [ "runtime-async-std" ] }

I am interested in this feature, but it will take some time for me to draft a design.

odysa avatar Feb 02 '24 17:02 odysa

Do you want users to choose their own runtime like sqlx? They built an abstraction layer (Runtime) so sqlx can run on many blocking/non-blocking runtimes.

Following up on this: sqlx uses a relatively simple solution, for example the spawn function.

https://github.com/launchbadge/sqlx/blob/84d576004c93a32133688426eacb50434bb5c5f0/sqlx-core/src/rt/mod.rs#L66-L74
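
Roughly, the idea there is a free spawn function that dispatches to whichever runtime feature is enabled. A simplified sketch (not sqlx's actual code; the feature names mirror sqlx's and are assumptions for illustration):

use std::future::Future;

// Dispatch to the enabled runtime; illustrative only, not sqlx's real implementation.
pub fn spawn<F>(future: F)
where
    F: Future<Output = ()> + Send + 'static,
{
    #[cfg(feature = "runtime-tokio")]
    let _ = tokio::spawn(future);

    #[cfg(all(feature = "runtime-async-std", not(feature = "runtime-tokio")))]
    let _ = async_std::task::spawn(future);

    #[cfg(not(any(feature = "runtime-tokio", feature = "runtime-async-std")))]
    {
        let _ = future;
        panic!("no async runtime feature enabled");
    }
}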

odysa avatar Feb 02 '24 20:02 odysa

Do you want users to choose their own runtime like sqlx?

Yes, exactly. I don't think we should bind to some specific runtime.

Following up on this: sqlx uses a relatively simple solution.

I agree that we may need to think about it carefully; sqlx's solution can only use the current runtime.

I am interested in this feature, but it will take some time for me to draft a design.

Yeah, welcome to contribute, we can work together on this.

liurenjie1024 avatar Feb 03 '24 02:02 liurenjie1024

@odysa Just to follow up on this, any progress regarding some design ideas?

@liurenjie1024 Do we have any reference implementation we can get "inspired" by on how things could be done, or do you have a particular design idea in mind already?

Is this something we should track in #348, perhaps for the v0.4.0 release?

marvinlanhenke avatar Apr 30 '24 13:04 marvinlanhenke

It's already tracked here: https://github.com/apache/iceberg-rust/issues/123

liurenjie1024 avatar May 01 '24 07:05 liurenjie1024

In order to verify my understanding and possibly kick off a design discussion, we could follow the approach of sqlx:

  • have a runtime.rs
    • to define a Runtime trait
    • based on a feature-flag import / re-export the implementors of that trait
  • have a /runtime module with specific runtime implementations
    • that implement the Runtime trait

For our particular use case (loading manifests and/or DataFiles on multiple threads) we could e.g. wrap tokio::spawn with our runtime and we'd be good to go? Then the Runtime trait would evolve to support more "use cases"?
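
Something like this, as a rough sketch (module layout indicated in comments; TokioRuntime, DefaultRuntime, and the runtime-tokio feature name are illustrative assumptions, not a final API):

// runtime.rs -- define the trait.
use std::future::Future;

pub trait Runtime: Send + Sync + 'static {
    // Fire-and-forget task submission. A real design would likely return a
    // runtime-agnostic join handle instead of discarding the task's output.
    fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static;
}

// runtime/tokio.rs -- one implementor, compiled in behind a feature flag.
#[cfg(feature = "runtime-tokio")]
pub struct TokioRuntime;

#[cfg(feature = "runtime-tokio")]
impl Runtime for TokioRuntime {
    fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        tokio::spawn(future);
    }
}

// runtime.rs -- re-export the selected implementor as the crate-wide default.
#[cfg(feature = "runtime-tokio")]
pub type DefaultRuntime = TokioRuntime;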

marvinlanhenke avatar May 02 '24 04:05 marvinlanhenke

Maybe currently we don't need a Runtime trait? From what we have learned, we currently need two methods:

  1. spawn
  2. block_on

I think the method here already provides a good example of what we need. Allowing users to specify a Runtime for task scheduling could be treated as an advanced feature.
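
For example, a first cut could be just two free functions, tokio-only for now (a sketch with illustrative names; the feature-flag dispatch shown earlier could be layered on later):

use std::future::Future;

// Must be called from within a tokio runtime context, otherwise it panics.
pub fn spawn<F>(future: F) -> tokio::task::JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    tokio::spawn(future)
}

// Intended for synchronous entry points: builds a small current-thread runtime
// to drive the future to completion. Panics if called from inside async code.
pub fn block_on<F: Future>(future: F) -> F::Output {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("failed to build tokio runtime")
        .block_on(future)
}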

liurenjie1024 avatar May 02 '24 13:05 liurenjie1024

... so as a first step, simply wrap tokio::spawn (for example) like here, and not even use a feature flag for now; just the simplest layer of abstraction?

@liurenjie1024 Have you already made up your mind about how we want to read and filter the manifest files concurrently? An idea would be to:

  • async load ManifestList
  • for each entry spawn a new task
    • apply ManifestEvaluator
    • async load Manifest
    • for each ManifestEntry / DataFile spawn a new task
      • apply ExpressionEvaluator
      • apply InclusiveMetricsEvaluator
      • if the DataFile has not been pruned yet, send a FileScanTask as a result via a channel back to the main stream
      • yield the FileScanTask

I think this way we can achieve maximum parallelism, throughput, and performance; however, the tradeoffs are the added complexity and the spike in resource consumption due to spawning multiple tasks and concurrently loading files into memory.

Here is a toy example to illustrate the idea for further discussion:

// Toy example only. Assumes the tokio, futures, and async-stream crates, plus
// the placeholder types and helper below standing in for the real iceberg types.
use std::time::Duration;

use async_stream::try_stream;
use futures::stream::{BoxStream, StreamExt};
use tokio::sync::mpsc;
use tokio::time;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

struct DataFile {}
struct FileScanTask {}

// placeholder for the per-DataFile work (evaluators + sending the result)
async fn process_data_file(sender: mpsc::Sender<FileScanTask>) {
    let _ = sender.send(FileScanTask {}).await;
}

async fn create_stream() -> Result<BoxStream<'static, Result<FileScanTask>>> {
    let (tx, mut rx) = mpsc::channel::<FileScanTask>(32);

    // manifest list with entries
    let manifests = Vec::from_iter(0..12);

    for entry in manifests {
        let sender = tx.clone();

        // for each entry spawn a new task
        tokio::spawn(async move {
            // apply `ManifestEvaluator`
            // if not pruned; load manifest
            println!("loading manifest {}", entry);
            time::sleep(Duration::from_millis(1000)).await;

            let data_files: Vec<_> = (0..48).map(|_| DataFile {}).collect();
            // for each DataFile spawn a new task
            for _ in data_files {
                let sender = sender.clone();
                tokio::spawn(async move {
                    // apply ExpressionEvaluator
                    // apply InclusiveMetricsEvaluator
                    process_data_file(sender).await;
                });
            }
        });
    }
    // drop the original sender so the channel closes once all spawned tasks finish
    drop(tx);

    let stream = try_stream! {
        while let Some(file_scan_task) = rx.recv().await {
            yield file_scan_task;
        }
    };

    Ok(stream.boxed())
}

marvinlanhenke avatar May 03 '24 04:05 marvinlanhenke

Hi @marvinlanhenke, once #233 is merged, we will have a basic runtime framework.

Have you already made up your mind about how we want to read and filter the manifest files concurrently?

Not yet.

I think your solution generally LGTM. Creating one task for each manifest entry looks like too much to me, though: while spawning a task is lightweight in Rust, it still consumes some memory. How do you feel about starting with one task per manifest file?

liurenjie1024 avatar May 06 '24 06:05 liurenjie1024

With Iceberg, the manifests are written to a target size (8 megabytes by default). Each manifest is bound to a single schema and partition spec, so you can re-use the evaluators here. I would not go overboard with the parallelism: just create one task per manifest, and do not spawn a task per manifest entry.

Fokko avatar May 06 '24 07:05 Fokko

How do you feel about starting with one task per manifest file

You mean:

  • spawn a new task for each manifest and load the manifest (entry.load_manifest(...).await?)
  • handle the DataFiles synchronously on the same task

So if we have a manifest_list with e.g. 5 entries and 1 is pruned (ManifestEvaluator), we'd effectively spawn 4 tasks to load the manifests and handle all the data files; is this correct?

marvinlanhenke avatar May 06 '24 08:05 marvinlanhenke

So if we have a manifest_list with e.g. 5 entries and 1 is pruned (ManifestEvaluator), we'd effectively spawn 4 tasks to load the manifests and handle all the data files; is this correct?

That is correct 👍 I think there might be some confusion around the naming. In the spec we have the Manifest List that contains Manifests. Within the Manifest there are manifest-entries that each point to one Datafile.

Fokko avatar May 06 '24 08:05 Fokko

How do you feel about starting with one task per manifest file

You mean:

  • spawn a new task for each manifest and load the manifest (entry.load_manifest(...).await?)
  • handle the DataFiles synchronously on the same task

So if we have a manifest_list with e.g. 5 entries and 1 is pruned (ManifestEvaluator), we'd effectively spawn 4 tasks to load the manifests and handle all the data files; is this correct?

Yeah, that's exactly what I mean.
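
For reference, a sketch of that shape (reusing the placeholder FileScanTask from the toy example above; the evaluator steps and the manifest loading are left as comments, and entry.load_manifest(...) is only hinted at):

// One task per (non-pruned) manifest; the manifest entries / DataFiles are then
// handled synchronously on that same task. Must run inside a tokio runtime.
fn spawn_manifest_tasks(manifests: Vec<u32>, tx: tokio::sync::mpsc::Sender<FileScanTask>) {
    for entry in manifests {
        // apply ManifestEvaluator here and skip pruned manifests

        let sender = tx.clone();
        tokio::spawn(async move {
            // hypothetical: let manifest = entry.load_manifest(...).await?;
            println!("loading manifest {}", entry);

            // iterate the ManifestEntries / DataFiles on this same task
            for _data_file in 0..48 {
                // apply ExpressionEvaluator and InclusiveMetricsEvaluator;
                // if the DataFile survives pruning, emit a FileScanTask
                let _ = sender.send(FileScanTask {}).await;
            }
        });
    }
    // dropping tx (and all its clones) later closes the channel and ends the stream
}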

liurenjie1024 avatar May 06 '24 08:05 liurenjie1024

Using try_for_each_concurrent here rather than just spawning in a for loop would allow us to tune the concurrency, as it accepts a max-concurrent-tasks argument. I'd advocate a data-driven approach to determine a sensible default value for this, alongside the capability to override the default with optional config, probably a with_max_concurrency() method on the builder and some guidance in the docs.
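
A sketch of what that could look like (reusing the placeholder load_manifest and Error from the earlier sketch; max_concurrency would come from the proposed with_max_concurrency() option):

use futures::{stream, TryStreamExt};

// Bounded concurrency over the manifests: at most max_concurrency loads run at once.
async fn load_manifests_bounded(
    manifest_paths: Vec<String>,
    max_concurrency: usize,
) -> Result<(), Error> {
    stream::iter(manifest_paths.into_iter().map(Ok::<_, Error>))
        .try_for_each_concurrent(max_concurrency, |path| async move {
            // per-manifest work: load the manifest, evaluate, emit FileScanTasks
            load_manifest(&path).await.map(|_manifest| ())
        })
        .await
}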

sdd avatar May 13 '24 18:05 sdd

try_for_each_concurrent

Do you mean this method? It looks OK to me.

liurenjie1024 avatar May 14 '24 14:05 liurenjie1024

Closed by #233

liurenjie1024 avatar Jul 06 '24 08:07 liurenjie1024

Closed by #373

liurenjie1024 avatar Aug 07 '24 06:08 liurenjie1024