
Concurrent table scans

sdd opened this issue 1 year ago

This is a bit of an experiment to see how things could look if we tried to:

  • process Manifest Lists and Manifest Files concurrently rather than sequentially
  • process the stream of file scan tasks concurrently, streaming record batches from multiple files at the same time (roughly sketched below).
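
A rough sketch of what that second point could look like using `futures` combinators; the `FileScanTask` and `read_batches` names here are simplified stand-ins for illustration, not the crate's real API:

```rust
use futures::{stream, StreamExt, TryStreamExt};

// Stand-in types: a planned file scan task and a "record batch" placeholder
// (the real code deals in Arrow record batches).
struct FileScanTask {
    data_file_path: String,
}
type RecordBatch = Vec<u8>;

// Hypothetical reader: opens one data file and returns its record batches.
async fn read_batches(task: FileScanTask) -> Result<Vec<RecordBatch>, std::io::Error> {
    let _ = task.data_file_path;
    Ok(vec![])
}

// Read up to 4 data files at the same time instead of one after another. A real
// implementation would keep this as a stream of batches rather than collecting,
// so consumers see batches as soon as any file produces them.
async fn read_all(tasks: Vec<FileScanTask>) -> Result<Vec<RecordBatch>, std::io::Error> {
    stream::iter(tasks)
        .map(read_batches)   // one future per data file
        .buffer_unordered(4) // poll up to 4 of those futures concurrently
        .try_concat()        // flatten the per-file Vecs, stopping on the first error
        .await
}
```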

I'd like to add some unit tests to confirm that this behaves as expected beyond the existing tests that we have for TableScan, and add an integration / performance test that can quantify any performance improvements (or regressions 😅 ) that we get from these changes.

Let me know what you all think.

sdd avatar May 13 '24 06:05 sdd

I've updated this to ditch the concurrency when processing ManifestEntry items within a single Manifest, producing them asynchronously but sequentially instead. I've kept the limited concurrency when processing ManifestFiles within the scan's snapshot's ManifestList.

I've kept the approach of using an mpsc channel with a spawned task, with that task using `try_for_each_concurrent` to achieve the concurrency. This is because without the channel and spawned task, we'd need to use an async closure, which is unstable Rust. With the spawned task we only need an async block, which is stable Rust.
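
A minimal sketch of that shape, assuming the `futures` mpsc channel and a Tokio-style spawn standing in for the crate's runtime abstraction (the types and the `load_and_filter` helper are illustrative, not the real implementation):

```rust
use futures::{channel::mpsc, stream, SinkExt, TryStreamExt};

// Illustrative stand-ins for the real types.
struct ManifestFile {
    path: String,
}
struct FileScanTask {
    data_file_path: String,
}

// Hypothetical: fetches one manifest, filters its ManifestEntry items sequentially,
// and returns a scan task per surviving data file.
async fn load_and_filter(manifest: ManifestFile) -> Result<Vec<FileScanTask>, String> {
    Ok(vec![FileScanTask { data_file_path: manifest.path }])
}

fn plan_files(manifests: Vec<ManifestFile>) -> impl futures::Stream<Item = Result<FileScanTask, String>> {
    let (sender, receiver) = mpsc::channel(64);

    // The spawned task only needs an async *block*, which is stable Rust; doing the
    // same thing inline without the channel would call for an async closure.
    tokio::spawn(async move {
        let _ = stream::iter(manifests.into_iter().map(Ok::<_, String>))
            .try_for_each_concurrent(4, |manifest| {
                let mut tx = sender.clone();
                async move {
                    // Entries within one manifest are produced sequentially here;
                    // the concurrency is across manifests, bounded by the limit above.
                    for task in load_and_filter(manifest).await? {
                        tx.send(Ok(task)).await.map_err(|e| e.to_string())?;
                    }
                    Ok(())
                }
            })
            .await;
    });

    // The receiver side of the channel is itself a Stream of file scan tasks.
    receiver
}
```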

sdd avatar May 14 '24 19:05 sdd

@sdd The `JoinHandle` types in Tokio and async-std have different `Output` types. In Tokio:

impl<T> Future for JoinHandle<T> {
    type Output = super::Result<T>;
    // ...
}

In async-std:

impl<T> Future for JoinHandle<T> {
    type Output = T;
    // ...
}

crate::runtime::spawn is consistent with async-std, so you may remove the ? to resolve this issue.
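
To make the difference concrete (a contrived snippet, with the two variants shown as comments so it doesn't need both runtimes to compile):

```rust
async fn compute() -> u64 {
    42
}

// Tokio: JoinHandle's Output is Result<T, JoinError>, so awaiting needs `?` or unwrap:
//     let n: u64 = tokio::spawn(compute()).await?;
//
// async-std (which crate::runtime::spawn follows here): Output is T directly,
// so the `?` has to go:
//     let n: u64 = async_std::task::spawn(compute()).await;
```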

However, we could still make it behave like the Tokio APIs. We may need to discuss this further. cc @liurenjie1024 @Xuanwo

odysa avatar Jul 18 '24 14:07 odysa

Thanks @odysa - I must be going crazy, I thought I tried that already but you were right, that worked! 👍🏼

sdd avatar Jul 18 '24 18:07 sdd

PTAL @liurenjie1024 and @Xuanwo: I've addressed your helpful feedback and I think this is now ready. Performance tests of this on my performance testing branch show clear gains when scanning snapshots whose manifest list contains multiple manifests. I'm really happy with this now!

sdd avatar Jul 31 '24 01:07 sdd

PTAL @liurenjie1024 and @Xuanwo: I've addressed your helpful feedback and I think this is now ready. Performance tests of this on my performance testing branch show clear gains when scanning snapshots whose manifest list contains multiple manifests. I'm really happy with this now!

I'm curious about the performance gain from this. Can you share the related data?

ZENOTME avatar Aug 01 '24 10:08 ZENOTME

Hey @ZENOTME! Sure. My `perf-suite` branch from my other PR is branched off main and can be used to get a baseline of `plan_files` performance without the changes in this PR. You'll need to run `cargo install just` if you don't already have `just` installed, and then run `just perf-run`. This should create the Docker setup for the performance testing environment, download some test data from the NYC Taxi dataset, create an Iceberg table inside the perf testing env, insert the test data into the table, and then run the perf tests.

The perf tests themselves consist of four scenarios, each of which executes `plan_files` with a different filter (sketched roughly after this list):

  • a file plan for a scan that reads all rows from all data files in the current snapshot (i.e. no filter);
  • a file plan for a scan that reads some rows from all data files in the current snapshot (filter: `passenger_count = 1`);
  • a file plan for a scan that reads all rows from one data file in the current snapshot (filter: `tpep_pickup_datetime = '2024-02-01'`);
  • a file plan for a scan that reads some rows from one data file in the current snapshot (filter: `tpep_pickup_datetime = '2024-02-01' AND passenger_count = 1`).
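
Roughly, each of those scenarios is wired up as a Criterion benchmark around `plan_files`. A simplified sketch (the `plan_files_with_filter` helper is hypothetical and just stands in for building the scan, calling `plan_files`, and draining the resulting task stream; Criterion's `async_tokio` feature is assumed):

```rust
use criterion::{criterion_group, criterion_main, Criterion};

// Hypothetical helper: builds a TableScan with the given row filter (None = no filter),
// calls plan_files(), drains the stream, and returns the number of tasks planned.
async fn plan_files_with_filter(filter: Option<&str>) -> usize {
    let _ = filter;
    0
}

fn bench_plan_files(c: &mut Criterion) {
    let runtime = tokio::runtime::Runtime::new().unwrap();

    // One benchmark per scenario in the list above.
    let scenarios = [
        ("all files, all rows", None),
        ("all files, some rows", Some("passenger_count = 1")),
        ("one file, all rows", Some("tpep_pickup_datetime = '2024-02-01'")),
        ("one file, some rows", Some("tpep_pickup_datetime = '2024-02-01' AND passenger_count = 1")),
    ];

    for (name, filter) in scenarios {
        c.bench_function(name, |b| {
            // to_async requires criterion's `async_tokio` feature.
            b.to_async(&runtime).iter(|| plan_files_with_filter(filter));
        });
    }
}

criterion_group!(benches, bench_plan_files);
criterion_main!(benches);
```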

The benchmarks are run using Criterion. Here's an example of the test output when I ran it just now on my M1 Pro MacBook:

[screenshot: baseline Criterion results for the four plan_files benchmarks]

Now, if you cherry-pick the commit from this concurrent table scan branch into the perf-suite branch:

git cherry-pick 8eef484094dbb7c55ac3181bbd552090308591c9

And re-run the performance tests, you'll see something similar to the following:

[screenshot: Criterion results with the concurrent table scan changes]

As you can see, the times come down from around 1.5s to around 0.5s, a big improvement!

In fact, if you then cherry-pick the commit from my follow-on PR with the object cache:

git cherry-pick c696a3f9a7dacf822e6d5bc76f13d24e0b50ee31

and re-run the performance tests again, you'll see something even more remarkable:

[screenshot: Criterion results with the object cache changes]

Now that the retrieval and parsing of the `Manifest`s and `ManifestList`s are cached, the first run in the performance test takes the same time as above, but all of the subsequent runs that get averaged for each test are much, much faster, reducing the average time from 500ms to about 1.5ms. 🤯
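
Conceptually, the object cache just memoises the fetched-and-parsed manifest objects by their file path, so only the first plan pays the fetch-and-parse cost. A tiny sketch of the idea, not the actual implementation (types and locking are simplified, and the path is made up):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Stand-in for a parsed manifest; the real cache would hold parsed Manifest /
// ManifestList values keyed by their object store path.
struct Manifest {
    entries: usize,
}

#[derive(Default)]
struct ObjectCache {
    manifests: Mutex<HashMap<String, Arc<Manifest>>>,
}

impl ObjectCache {
    // Returns the cached object for `path`, fetching and parsing only on a miss.
    fn get_or_load(&self, path: &str, load: impl FnOnce() -> Manifest) -> Arc<Manifest> {
        let mut map = self.manifests.lock().unwrap();
        map.entry(path.to_string())
            .or_insert_with(|| Arc::new(load()))
            .clone()
    }
}

fn main() {
    let cache = ObjectCache::default();
    // The first plan does the expensive fetch + parse...
    let first = cache.get_or_load("s3://bucket/metadata/m1.avro", || Manifest { entries: 100 });
    // ...subsequent plans reuse the cached value; this loader never runs.
    let second = cache.get_or_load("s3://bucket/metadata/m1.avro", || Manifest { entries: 999 });
    assert_eq!(first.entries, 100);
    assert_eq!(second.entries, 100);
}
```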

sdd avatar Aug 02 '24 07:08 sdd

Good work!

liurenjie1024 avatar Aug 02 '24 09:08 liurenjie1024

Now that the retrieval and parsing of the `Manifest`s and `ManifestList`s are cached, the first run in the performance test takes the same time as above, but all of the subsequent runs that get averaged for each test are much, much faster, reducing the average time from 500ms to about 1.5ms. 🤯

Great!

Xuanwo avatar Aug 02 '24 13:08 Xuanwo

Thanks @sdd! That's awesome! BTW, can we integrate the performance tests into the repository so that they will be helpful for future performance work? cc @liurenjie1024 @Xuanwo @sdd

ZENOTME avatar Aug 04 '24 06:08 ZENOTME

Thanks @sdd! That's awesome! BTW, can we integrate the performance tests into the repository so that they will be helpful for future performance work? cc @liurenjie1024 @Xuanwo @sdd

I agree that a benchmark would be helpful for validating this, but I would prefer to do this after integrating with a SQL engine like DataFusion, which makes maintenance easier.

liurenjie1024 avatar Aug 04 '24 08:08 liurenjie1024

Sorry guys, been off-grid for a few days. I've addressed all of your suggestions @liurenjie1024, thanks.

Thanks @sdd! That's awesome! BTW, can we integrate the performance tests into the repository so that they will be helpful for future performance work? cc @liurenjie1024 @Xuanwo @sdd

I agree that a benchmark would be helpful for validating this, but I would prefer to do this after integrating with a SQL engine like DataFusion, which makes maintenance easier.

@liurenjie1024 Why don't we just bring in the performance tests as they are (with whatever changes are needed from review)? We can always delete them and add different ones once we have the SQL engine in place. It's not much effort to delete them once they've been superseded, but it is very useful to have them in place now, whilst a lot of performance-related changes are being made. I'm happy to get the perf tests in my other PR into shape to merge, as @ZENOTME suggests, even if they will be replaced at some point in the future.

sdd avatar Aug 06 '24 11:08 sdd

@liurenjie1024 @Xuanwo are we able to merge this now?

sdd avatar Aug 06 '24 18:08 sdd

@liurenjie1024 @Xuanwo are we able to merge this now?

Done, thanks @sdd for this PR!

liurenjie1024 avatar Aug 07 '24 00:08 liurenjie1024