[Epic] A collection of dynamic filtering related items
Is your feature request related to a problem or challenge?
This is a collection of various items related to "dynamic filtering"
Roughly speaking, dynamic filters are filters whose values are known at runtime but not at planning time (for example, derived from the current values of a TopK).
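To make the idea concrete, here is a minimal, self-contained sketch (plain Rust, no DataFusion APIs; all names are hypothetical) of a dynamic filter: a cutoff that does not exist at planning time, is tightened by a TopK operator at runtime, and is consulted by the scan to skip rows:

```rust
// Hypothetical sketch of a dynamic filter shared between a TopK operator and
// a scan: the cutoff is unknown at planning time and is tightened at runtime.
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

/// Current cutoff for `ORDER BY ts LIMIT k` (ascending): rows with
/// `ts > cutoff` can never make it into the result once the heap is full.
struct DynamicFilter {
    cutoff: AtomicI64,
}

impl DynamicFilter {
    fn new() -> Self {
        // Starts fully open: nothing is known at planning time.
        Self { cutoff: AtomicI64::new(i64::MAX) }
    }
    fn passes(&self, ts: i64) -> bool {
        ts <= self.cutoff.load(Ordering::Relaxed)
    }
    fn tighten(&self, new_cutoff: i64) {
        self.cutoff.fetch_min(new_cutoff, Ordering::Relaxed);
    }
}

fn main() {
    let filter = Arc::new(DynamicFilter::new());

    // Scan side: initially everything passes.
    assert!(filter.passes(1_000));

    // TopK side: once the heap holds k rows, its largest value becomes the cutoff.
    filter.tighten(500);

    // Scan side: later batches can now be skipped without reading other columns.
    assert!(!filter.passes(1_000));
    assert!(filter.passes(400));
    println!("dynamic filter cutoff tightened at runtime");
}
```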
Here is some more background:
- Snowflake blog: Reimagining Top-K Aggregation Optimization at Snowflake
- Dynamic Filters in Starburst
Related items
- [x] https://github.com/apache/datafusion/issues/15037
- [ ] https://github.com/apache/datafusion/issues/7955
- [ ] https://github.com/apache/datafusion/issues/15513
- [x] https://github.com/apache/datafusion/issues/15534
Adding https://github.com/apache/datafusion/issues/15534
I've started breaking out the work:
- https://github.com/apache/datafusion/pull/15561
- https://github.com/apache/datafusion/pull/15566
- https://github.com/apache/datafusion/pull/15568 (may want to wait for https://github.com/apache/datafusion/pull/15566)
- PR to replace the current filter pushdown via ListingTable w/ the new filter pushdown APIs by implementing them for ParquetSource
- PR to implement dynamic filter pushdown for TopK
I briefly looked at the descriptions of these optimizations. For example, the method of dynamically handling the "order by limit" process using statistics is really cool! @alamb
Idea
But I have some new ideas that seem to be more universally applicable to "order by limit" (q23):
┌──────────────────────────────────────┐
│ [Step 1] Filter RowGroups │
│ - Use Parquet metadata to skip RGs │
│ WHERE RG.min(EventTime) > cutoff │
└───────────────────┬──────────────────┘
↓
┌──────────────────────────────────────┐
│ [Step 2] Read EventTime + Filter │
│ - Scan EventTime column in valid RGs │
│ - Sort values → Track top 10 rows │
└───────────────────┬──────────────────┘
↓
┌──────────────────────────────────────┐
│ [Step 3] Record Row Locations │
│ - Map top 10 EventTime to physical │
│ positions (RG_ID + Row_Offset) │
└───────────────────┬──────────────────┘
↓
┌──────────────────────────────────────┐
│ [Step 4] Fetch 10 Rows │
│ - Directly read rows from Parquet │
│ via recorded positions (non-seq) │
└───────────────────┬──────────────────┘
↓
Final Result
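A rough sketch of steps 1 and 2 in plain Rust (the row-group statistics, values, and helper below are made up rather than real Parquet or DataFusion APIs), showing how per-row-group min statistics plus a running top-k cutoff let whole row groups be skipped before any other columns are touched:

```rust
// Hypothetical sketch of steps 1-2: prune row groups using min(EventTime)
// statistics against the current top-k cutoff, reading only the EventTime
// column for the groups that survive. Stats and values are made up; real
// code would pull them from the Parquet footer.
use std::collections::BinaryHeap;

struct RowGroupStats {
    id: usize,
    min_event_time: i64,
}

// Stand-in for "scan only the EventTime column of one row group".
fn read_event_time_column(row_group_id: usize) -> Vec<i64> {
    match row_group_id {
        0 => vec![10, 40, 25],
        1 => vec![500, 900, 650],
        _ => vec![5, 120, 60],
    }
}

fn main() {
    let row_groups = vec![
        RowGroupStats { id: 0, min_event_time: 10 },
        RowGroupStats { id: 1, min_event_time: 500 },
        RowGroupStats { id: 2, min_event_time: 5 },
    ];

    // `ORDER BY EventTime LIMIT k` (ascending): keep the k smallest values in a
    // max-heap, so the heap's root is the current cutoff once it is full.
    let k = 3; // 10 in the q23 example; small here so the skip actually fires
    let mut heap: BinaryHeap<i64> = BinaryHeap::new();

    for rg in &row_groups {
        // Step 1: skip the whole row group if even its smallest EventTime
        // cannot beat the current cutoff.
        if heap.len() == k && rg.min_event_time > *heap.peek().unwrap() {
            println!("skipping row group {}", rg.id);
            continue;
        }
        // Step 2: scan only EventTime for this row group and track the top k.
        // Steps 3-4 would additionally record (row group id, row offset) for
        // each surviving value and fetch the remaining columns at the end.
        for value in read_event_time_column(rg.id) {
            heap.push(value);
            if heap.len() > k {
                heap.pop();
            }
        }
    }
    println!("cutoff after scan: {:?}", heap.peek());
}
```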
Explore
Currently, q23 takes approximately 6 seconds to execute. I have confirmed that DataFusion does not have the aforementioned optimizations and still scans a very large number of rows and columns. By the way, is there a convenient way in datafusion-cli to view statistics on the number of rows and columns scanned? Currently, I directly print the batch information in the Like expression, which gives the following output (it seems endless, and the amount of data being scanned appears to be very large, all with exactly 105 columns):
Some concerns
Parquet is composed of RowGroups. Is it difficult to read an individual page? In my previous work, I’ve seen optimizations for this scenario (based on DataFusion), but it used a custom columnar storage format, which was easier to implement. At that time, when working with very large datasets (similar to the hits dataset), the query time for "order by limit" was reduced to around 2 seconds.
Summary
Reading the full set of columns can be deferred by first sorting on only the "order by" columns, which is very effective for the "order by limit" scenario. I'm not sure if DataFusion is currently doing this.
Thank you @acking-you for the idea. Is it similar to parquet filter pushdown? We are already trying to make it the default: https://github.com/apache/datafusion/issues/3463
With parquet filter pushdown, we will only scan the filtered pages, but the topk filter pushdown is still in progress.
This optimization targets the `select * ... order by ... limit` scenario, where it can bring significant improvements. If we only push down the filter, we don't get the effect of materializing the full set of columns for only the `limit` rows. Therefore, for the "order by limit" scenario, we need to implement lazy materialization of columns.
The approach is quite easy to understand: complete the sort by reading only the "order by" columns, then read the full rows (all columns) for just the required `limit` rows (a sketch using Arrow kernels follows the second diagram below).
Take `select * from data order by timestamp desc limit 10` as an example. The most straightforward execution process for "order by limit" might look like the following:
/// Initial RecordBatch:
/// +----------------+---------------------+
/// | Columns | Rows |
/// +----------------+---------------------+
/// | *All columns* | row1, row2, ..., rowN |
/// +----------------+---------------------+
/// ↓
/// Full Table Scan:
/// (Read all columns and rows into memory)
/// ↓
/// Scanned RecordBatch:
/// +----------------+---------------------+
/// | timestamp | ... (other columns) |
/// +----------------+---------------------+
/// | 2023-09-01 | ... |
/// | 2023-09-02 | ... |
/// | ... | ... |
/// +----------------+---------------------+
/// ↓
/// Sort by `timestamp DESC`:
/// (Re-order rows via stable sort on timestamp)
/// ↓
/// Sorted RecordBatch:
/// +----------------+---------------------+
/// | timestamp | ... (other columns) |
/// +----------------+---------------------+
/// | 2023-09-15 | ... |
/// | 2023-09-14 | ... |
/// | ... | ... |
/// +----------------+---------------------+
/// ↓
/// Apply `LIMIT 10`:
/// (Select top 10 rows from sorted batch)
/// ↓
/// Final RecordBatch:
/// +----------------+---------------------+
/// | timestamp | ... (other columns) |
/// +----------------+---------------------+
/// | 2023-09-15 | ... |
/// | 2023-09-14 | ... |
/// | ... (8 more) | ... |
/// +----------------+---------------------+
The effect after delaying the materialization of the non-ordered columns is as follows:
/// Initial Data Source:
/// +----------------+---------------------+
/// | Columns | Rows |
/// +----------------+---------------------+
/// | timestamp | 2023-09-01, ... |
/// | other_col1 | data1, ... |
/// | other_col2 | data2, ... |
/// | ... | ... |
/// +----------------+---------------------+
/// ↓
/// Projection Scan:
/// (Only read `timestamp` + generate row IDs)
/// ↓
/// Scanned RecordBatch:
/// +----------------+---------------------+
/// | row_id | timestamp |
/// +----------------+---------------------+
/// | 0 | 2023-09-01 |
/// | 1 | 2023-09-02 |
/// | ... | ... |
/// | N-1 | 2023-09-15 |
/// +----------------+---------------------+
/// ↓
/// Sort by `timestamp DESC`:
/// (Sort only row_id and timestamp)
/// ↓
/// Sorted Indexes: [15, 14, 8, ...] -- List of original row numbers sorted by timestamp
/// Sorted Timestamps:
/// +----------------+
/// | 2023-09-15 |
/// | 2023-09-14 |
/// | ... |
/// +----------------+
/// ↓
/// Apply `LIMIT 10`:
/// (Select top 10 row_ids)
/// ↓
/// Final Indexes: [15, 14, 8, 3, 7, 2, 5, 11, 9, 6]
/// ↓
/// Fetch Other Columns by row_id:
/// (Random access to original data via indexes)
/// ↓
/// Final RecordBatch:
/// +----------------+---------------------+
/// | timestamp | other_col1 | ... |
/// +----------------+---------------------+
/// | 2023-09-15 | data15 | ... | -- The original row corresponding to row_id=15
/// | 2023-09-14 | data14 | ... | -- The original row corresponding to row_id=14
/// | ... (8 more) | ... | ... |
/// +----------------+---------------------+
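A minimal sketch of this late-materialization pattern using the Arrow compute kernels `sort_to_indices` (with a limit) and `take`; the column names and data below are made up, and in a real plan the first phase would be a projection-only scan of `timestamp` while the second phase fetches the remaining columns by row id:

```rust
// Sketch of late materialization with Arrow kernels: sort only the ORDER BY
// column (with a limit), then fetch the other columns for the winning rows.
use arrow::array::{Int64Array, StringArray};
use arrow::compute::{sort_to_indices, take, SortOptions};
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // Phase 1: read only the ORDER BY column (row ids are implicit positions).
    let timestamp = Int64Array::from(vec![20230901, 20230915, 20230902, 20230914]);

    // Sort just that column, descending, keeping only the top `LIMIT` indices.
    let opts = SortOptions { descending: true, nulls_first: false };
    let indices = sort_to_indices(&timestamp, Some(opts), Some(2))?; // LIMIT 10 in the example; 2 here because the toy data is tiny

    // Phase 2: fetch the remaining columns only for the winning row ids.
    let other_col = StringArray::from(vec!["data1", "data15", "data2", "data14"]);
    let top_timestamps = take(&timestamp, &indices, None)?;
    let top_other = take(&other_col, &indices, None)?;

    println!("row ids: {indices:?}");
    println!("timestamps: {top_timestamps:?}");
    println!("other_col: {top_other:?}");
    Ok(())
}
```

The key point is that `other_col` is only touched for the winning row ids, which is exactly what makes `select *` with a small limit cheap.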
My opinion
In my previous attempt link, I found that in order to read 10 rows of data, DataFusion would end up scanning an additional 104 columns, which is a significant overhead. I believe the approach would be very helpful for this scenario.
I also noticed that recently, a PR was merged in ClickHouse that does exactly what was described above: https://github.com/ClickHouse/ClickHouse/pull/55518, with the corresponding issue: https://github.com/ClickHouse/ClickHouse/issues/45868. @zhuqi-lucas
@acking-you have you seen https://github.com/apache/datafusion/pull/15301?
Thank you for your hint. I haven't looked into this part of the code yet. I only reviewed the optimization method you described in https://github.com/apache/datafusion/issues/15037. It seems that there is no mention of deferred materialization for "order by limit" (perhaps I missed it since the content of the issue is quite long). So, have we considered optimizing "order by limit" in two phases? I'm planning to review and study the PR you mentioned over the weekend. Thanks again for your reply.
DataFusion already does late materialization: it orders the filters from least to most expensive, then scans only the columns that the filters need. Once it has applied all the filters it materializes the projection. It's not the default yet but it will be soon. Please see https://github.com/apache/datafusion/issues/3463 which @zhuqi-lucas linked to above.
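For reference, a minimal sketch of opting in to this behavior today, assuming a recent DataFusion where `SessionConfig::set_bool`, `SessionContext::new_with_config`, and the `datafusion.execution.parquet.pushdown_filters` configuration key are available (the file name and query are placeholders):

```rust
// Sketch: enable Parquet filter pushdown ahead of it becoming the default,
// then run an ORDER BY ... LIMIT query. File name and query are placeholders.
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.pushdown_filters", true);
    let ctx = SessionContext::new_with_config(config);

    ctx.register_parquet("hits", "hits.parquet", ParquetReadOptions::default())
        .await?;
    let df = ctx
        .sql("SELECT * FROM hits ORDER BY \"EventTime\" LIMIT 10")
        .await?;
    df.show().await?;
    Ok(())
}
```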
I'm sorry. I thought the late materialization only applied to the filter columns and did not consider the order by columns. This is my fault. I should have looked deeper.
Well the point of this recent work is to create filters from the order by clause :)
Currently, q23 takes approximately 6 seconds to execute. I have confirmed that DataFusion does not have the aforementioned optimizations and still scans a very large number of rows and columns. By the way, is there a convenient way in datafusion-cli to view statistics on the number of rows and columns scanned? Currently, I directly print the batch information in the Like expression, which gives the following output (it seems endless, and the amount of data being scanned appears to be very large, all with exactly 105 columns):
@acking-you I agree with your analysis and arrived at a similar conclusion in this ticket:
- https://github.com/apache/datafusion/issues/15177
It seems that there is no mention of deferred materialization for "order by limit" (perhaps I missed it since the content of the issue is quite long). So, have we considered optimizing "order by limit" in two phases? I'm planning to review and study the PR you mentioned over the weekend. Thanks again for your reply.
I also agree: the deferred materialization is key to improving performance massively. I believe this is the effect of https://github.com/apache/datafusion/issues/3463, though it does not use that term.
Please see https://github.com/apache/datafusion/issues/3463 which @zhuqi-lucas linked to above.
So the TLDR is that by combining the following two items
- https://github.com/apache/datafusion/issues/3463
- https://github.com/apache/datafusion/pull/15301
I think DataFusion will have the equivalent of the deferred materialization described above
Exciting news! We've merged TopK filter pushdown. As we've moved along we've found a lot of neat little optimizations - I think a blog post is in order.
I added #16435