Concurrent table scans
This is a bit of an experiment to see how things could look if we tried to:
- process Manifest Lists and Manifest Files concurrently rather than sequentially
- process the stream of file scan tasks concurrently, streaming record batches from multiple files at the same time.
I'd like to add some unit tests to confirm that this behaves as expected beyond the existing tests that we have for TableScan, and add an integration / performance test that can quantify any performance improvements (or regressions 😅 ) that we get from these changes.
Let me know what you all think.
I've updated this to ditch the concurrency when processing ManifestEntry items within a single Manifest, producing them asynchronously but sequentially instead. I've kept the limited concurrency when processing ManifestFiles within the scan's snapshot's ManifestList.
I've kept the approach of using an mpsc channel with a spawned task, with that task using try_for_each_concurrent to achieve the concurrency. This is because without the channel and spawned task, we'd need to use an async closure, which is unstable in Rust. With the spawned task we only need an async block, which is stable.
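For readers who want the shape of that design without the async machinery, here is a rough std-only analogue. It is not the code in this PR: it swaps the async mpsc channel, spawned task and try_for_each_concurrent for OS threads and `std::sync::mpsc`, and the `ManifestFile` struct, the `plan_files_sketch` function and the channel capacity of 16 are all made up for illustration. What it does keep is the structure: a background producer, limited concurrency across manifests, and sequential production of entries within each manifest.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a manifest file: a name plus the data files it lists.
struct ManifestFile {
    name: String,
    entries: Vec<String>,
}

/// Starts planning in the background and returns the receiving end of a
/// bounded channel. At most `concurrency` manifests are processed at once;
/// the entries of a single manifest are sent sequentially, in order.
fn plan_files_sketch(manifests: Vec<ManifestFile>, concurrency: usize) -> mpsc::Receiver<String> {
    let (tx, rx) = mpsc::sync_channel::<String>(16);
    // Shared work queue: each worker pulls the next unprocessed manifest.
    let queue = Arc::new(Mutex::new(manifests.into_iter()));

    for _ in 0..concurrency.max(1) {
        let tx = tx.clone();
        let queue = Arc::clone(&queue);
        thread::spawn(move || loop {
            // Hold the lock only long enough to fetch the next manifest.
            let next = queue.lock().unwrap().next();
            let Some(manifest) = next else { break };
            for entry in manifest.entries {
                // A send error means the consumer hung up, so stop early.
                if tx.send(format!("{}/{}", manifest.name, entry)).is_err() {
                    return;
                }
            }
        });
    }
    // The original sender is dropped here, so the receiver's iterator ends
    // once every worker has finished and dropped its clone.
    rx
}

fn main() {
    let manifests = vec![
        ManifestFile { name: "m0".into(), entries: vec!["a.parquet".into(), "b.parquet".into()] },
        ManifestFile { name: "m1".into(), entries: vec!["c.parquet".into()] },
    ];
    // Order across manifests is not deterministic, so sort before printing.
    let mut tasks: Vec<String> = plan_files_sketch(manifests, 2).into_iter().collect();
    tasks.sort();
    println!("{tasks:?}");
}
```

The consumer only ever sees a receiver, which is the same property the channel gives the real scan: the caller gets a stream of tasks and does not need to know how much concurrency sits behind it.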
@sdd JoinHandle in Tokio and async-std have different return types.
In Tokio
impl<T> Future for JoinHandle<T> {
type Output = super::Result<T>;
In Async-std
impl<T> Future for JoinHandle<T> {
type Output = T;
crate::runtime::spawn is consistent with async-std, so you may remove the ? to resolve this issue.
However, we could still make it behave like the Tokio API. We may need to discuss this further. cc @liurenjie1024 @Xuanwo
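To make the difference concrete, here is a small sketch using std threads as a stand-in for the async runtimes. `std::thread::JoinHandle::join` returns a `Result`, much like Tokio's handle does when awaited, so the wrapper below shows one way to get the async-std convention on top of it: the handle yields `T` directly and a panic in the task is re-raised in the caller. The `JoinHandle` wrapper, `spawn` and `count_entries` are hypothetical names for illustration only and are not the contents of crate::runtime.

```rust
use std::thread;

/// Hypothetical runtime-agnostic handle: joining yields `T` directly,
/// following the async-std convention rather than Tokio's `Result<T, _>`.
struct JoinHandle<T>(thread::JoinHandle<T>);

impl<T> JoinHandle<T> {
    /// Waits for the task and returns its value. A panic inside the task
    /// is re-raised here instead of being surfaced as an `Err`.
    fn join(self) -> T {
        match self.0.join() {
            Ok(value) => value,
            Err(payload) => std::panic::resume_unwind(payload),
        }
    }
}

/// Hypothetical spawn wrapper returning the unified handle type.
fn spawn<T, F>(f: F) -> JoinHandle<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    JoinHandle(thread::spawn(f))
}

/// Toy fallible task: counts comma-separated entries in a manifest string.
fn count_entries(manifest: &str) -> Result<usize, String> {
    if manifest.is_empty() {
        return Err("empty manifest".to_string());
    }
    Ok(manifest.split(',').count())
}

fn main() -> Result<(), String> {
    let handle = spawn(|| count_entries("a.parquet,b.parquet"));
    // Exactly one `?`: it handles the task's own error. With a Tokio-style
    // handle a second `?` would be needed for the join error.
    let n = handle.join()?;
    println!("{n}");
    Ok(())
}
```

Whichever convention is chosen, the point is that call sites should see a single, consistent output type regardless of which runtime feature is enabled.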
Thanks @odysa - I must be going crazy, I thought I tried that already but you were right, that worked! 👍🏼
PTAL @liurenjie1024 and @Xuanwo: I've addressed your helpful feedback and I think this is now ready. Performance tests of this on my performance testing branch show clear gains when scanning snapshots whose manifest list contains multiple manifests. I'm really happy with this now!
I'm curious about the performance gain from this. Can you share the related data?
Hey @ZENOTME! Sure. If you check out the `perf-suite` branch from my other PR, which is branched off main, you can use it to get a baseline of `plan_files` performance without the changes in this PR. You'll need to run `cargo install just` if you don't already have `just` installed, and then run `just perf-run`. This should create the Docker setup for the performance testing environment, download some test data from the NYC Taxi dataset, create an Iceberg table inside the perf testing environment, insert the test data into the table, and then run the perf tests.
The perf tests themselves consist of four scenarios, each of which executes `plan_files` for a different kind of scan:
- a file plan for a scan that reads all rows from all data files in the current snapshot (i.e. no filter);
- a file plan for a scan that reads some rows from all data files in the current snapshot (filter: `passenger_count = 1`);
- a file plan for a scan that reads all rows from one data file in the current snapshot (filter: `tpep_pickup_datetime = '2024-02-01'`);
- a file plan for a scan that reads some rows from one data file in the current snapshot (filter: `tpep_pickup_datetime = '2024-02-01' AND passenger_count = 1`).
The benchmarks are run using criterion. Here's an example of the test output from a run just now on my M1 Pro MacBook:
Now, if you cherry-pick the commit from this concurrent table scan branch into the perf-suite branch:
git cherry-pick 8eef484094dbb7c55ac3181bbd552090308591c9
And re-run the performance tests, you'll see something similar to the following:
As you can see, the times come down from around 1.5s to around 0.5s, a big improvement!
In fact, if you then cherry-pick the commit from my follow-on PR with the object cache:
git cherry-pick c696a3f9a7dacf822e6d5bc76f13d24e0b50ee31
and re-run the performance tests again, you'll see something even more remarkable:
Now that the retrieval and parsing of the `Manifest`s and `ManifestList`s are cached, the first run in the performance test takes the same time as above, but all of the subsequent runs that are averaged to get the time for each test are much, much faster, reducing the average time from 500ms to about 1.5ms. 🤯
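As a rough illustration of why only the first run pays the full cost, here is a minimal std-only sketch of a load-through cache. It is not the object cache from the follow-on PR: `Manifest`, `ManifestCache`, `get_or_load` and the fake loading step are all hypothetical, and a real cache would also need eviction and would avoid holding the lock while fetching.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical parsed manifest; the real type carries far more.
struct Manifest {
    entries: Vec<String>,
}

/// Hypothetical cache keyed by manifest path.
struct ManifestCache {
    inner: Mutex<HashMap<String, Arc<Manifest>>>,
    loads: AtomicUsize, // how many times the slow path ran
}

impl ManifestCache {
    fn new() -> Self {
        Self { inner: Mutex::new(HashMap::new()), loads: AtomicUsize::new(0) }
    }

    /// Returns the cached manifest for `path`, loading it on a miss.
    fn get_or_load(&self, path: &str) -> Arc<Manifest> {
        // Simplification: the lock is held across the load.
        let mut map = self.inner.lock().unwrap();
        if let Some(hit) = map.get(path) {
            return Arc::clone(hit);
        }
        // Stand-in for fetching the file from storage and parsing it.
        self.loads.fetch_add(1, Ordering::SeqCst);
        let manifest = Arc::new(Manifest { entries: vec![format!("{path}#data-0.parquet")] });
        map.insert(path.to_string(), Arc::clone(&manifest));
        manifest
    }

    fn load_count(&self) -> usize {
        self.loads.load(Ordering::SeqCst)
    }
}

fn main() {
    let cache = ManifestCache::new();
    let first = cache.get_or_load("m0.avro");
    let second = cache.get_or_load("m0.avro");
    // Both calls share one parsed value; only the first ran the slow path.
    println!("same object: {}", Arc::ptr_eq(&first, &second));
    println!("entries: {}, loads: {}", first.entries.len(), cache.load_count());
}
```

In a benchmark that repeats the same scan many times, every iteration after the first hits the fast path, which is consistent with the averaged times dropping so sharply.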
Good work!
Great!
Thanks @sdd! That's awesome! BTW, can we integrate the performance test within the repository so that it will be helpful for performance improvement in the future? cc @liurenjie1024 @Xuanwo @sdd
I agree that a benchmark would be helpful for validating this, but I would prefer to do this after integrating with a SQL engine like DataFusion, which would make maintenance easier.
Sorry guys, been off-grid for a few days. I've addressed all of your suggestions @liurenjie1024, thanks.
@liurenjie1024 Why don't we just bring in the performance tests as they are / with whatever changes are needed from review, and we can always delete them and add different ones once we have the SQL engine in place? It's not much effort to delete them when they've been superseded but it is very useful to have them in place now whilst a lot of performance-related changes are being done. I'm happy to get the perf tests in my other PR into shape to merge as @ZENOTME suggests, even if they will be replaced at some point in the future.
@liurenjie1024 @Xuanwo are we able to merge this now?
Done, thanks @sdd for this PR!