Return TableProviderFilterPushDown::Exact when Parquet Pushdown Enabled
Is your feature request related to a problem or challenge?
Currently, even when Parquet predicate pushdown is enabled and the predicate can be fully pushed down, the physical plan still contains a FilterExec when using ListingTable:
| physical_plan | ProjectionExec: expr=[service@0 as service, host@1 as host, pod@2 as pod, container@3 as container, image@4 as image, time@5 as time, client_addr@6 as client_addr, request_duration_ns@7 as request_duration_ns, request_user_agent@8 as request_user_agent, request_method@9 as request_method, request_host@10 as request_host, request_bytes@11 as request_bytes, response_bytes@12 as response_bytes, response_status@13 as response_status] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: container@3 = backend_container_0 OR pod@2 = aqcathnxqsphdhgjtgvxsfyiwbmhlmg |
| | RepartitionExec: partitioning=RoundRobinBatch(8) |
| | ParquetExec: limit=None, partitions=[home/raphael/Downloads/data.parquet], predicate=container_min@0 <= backend_container_0 AND backend_container_0 <= container_max@1 OR pod_min@2 <= aqcathnxqsphdhgjtgvxsfyiwbmhlmg AND aqcathnxqsphdhgjtgvxsfyiwbmhlmg <= pod_max@3, projection=[service, host, pod, container, image, time, client_addr, request_duration_ns, request_user_agent, request_method, request_host, request_bytes, response_bytes, response_status] |
| |
Describe the solution you'd like
ListingTable::supports_filter_pushdown should return TableProviderFilterPushDown::Exact when:
- Parquet predicate pushdown is enabled
- The FileFormat is parquet
- The predicate is fully pushed down by ParquetExec (not all predicates are supported)
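The three conditions above can be sketched as a small decision function. This is only an illustration: `FilterPushDown` is a local stand-in that reuses the variant names of `TableProviderFilterPushDown`, and `FilterContext` with its fields is hypothetical, not part of DataFusion. Falling back to `Inexact` is an assumption made here because that variant keeps the `FilterExec` in the plan.

```rust
/// Local stand-in for DataFusion's `TableProviderFilterPushDown`
/// (same variant names, defined here so the sketch compiles on its own).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushDown {
    Unsupported,
    Inexact,
    Exact,
}

/// Hypothetical summary of what the table knows about one filter.
#[derive(Debug, Clone, Copy)]
pub struct FilterContext {
    /// The table's FileFormat is Parquet.
    pub format_is_parquet: bool,
    /// Parquet predicate pushdown is enabled in the configuration.
    pub pushdown_filters_enabled: bool,
    /// The scan can evaluate the whole predicate by itself.
    pub predicate_fully_pushable: bool,
}

/// Report `Exact` only when all three conditions hold.
/// Otherwise report `Inexact` (an assumption of this sketch), so the
/// planner still re-applies the filter above the scan.
pub fn supports_filter_pushdown(ctx: FilterContext) -> FilterPushDown {
    if ctx.format_is_parquet && ctx.pushdown_filters_enabled && ctx.predicate_fully_pushable {
        FilterPushDown::Exact
    } else {
        FilterPushDown::Inexact
    }
}
```

With `Exact`, the planner can drop the `FilterExec` shown in the plan above. With `Inexact`, it stays.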
I think this is a good proposal: it would skip unnecessary filtering and likely make plans faster. It will become more useful (maybe even necessary) when predicate pushdown is enabled by default (#3463).
I think it is a good idea. I confirm it would help improve performance by 30x in https://github.com/apache/arrow-datafusion/issues/5404#issuecomment-1501221831.
I am thinking of picking this up, but the work seems less trivial than I thought. Here are my two cents.
The related code to improve is here: https://github.com/apache/arrow-datafusion/blob/7545177001b9dc04951f0c1c2008509f3895de8e/datafusion/core/src/datasource/listing/table.rs#L736-L756
- The FileFormat is parquet
This is trivial: we can match self.options.format against ParquetFormat.
- Parquet predicate pushdown is enabled
This setting could come from ParquetExec.pushdown_filters or ConfigOptions (more specifically, ConfigOptions.execution.parquet.pushdown_filters)
https://github.com/apache/arrow-datafusion/blob/7545177001b9dc04951f0c1c2008509f3895de8e/datafusion/core/src/physical_plan/file_format/parquet.rs#L393
However, when ListingTable.supports_filter_pushdown is called, ParquetExec has not been created yet, nor is SessionState passed as input.
Idea: shall we add state: &SessionState as input for ListingTable.supports_filter_pushdown, the same as ListingTable.scan ❓
https://github.com/apache/arrow-datafusion/blob/7545177001b9dc04951f0c1c2008509f3895de8e/datafusion/core/src/datasource/listing/table.rs#L668-L670
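To make the idea concrete, here is a minimal sketch of the first two checks once the session state is available. Everything in it is a simplified stand-in: `SessionState` is reduced to the one flag, `Format` replaces the real `FileFormat` trait object, and `parquet_pushdown_active` is a hypothetical helper that a `supports_filter_pushdown` taking `state` could call.

```rust
/// Hypothetical, pared-down stand-in for DataFusion's `SessionState`.
/// Only the one flag this sketch needs is modeled.
pub struct SessionState {
    /// Mirrors `ConfigOptions.execution.parquet.pushdown_filters`.
    pub parquet_pushdown_filters: bool,
}

/// Hypothetical stand-in for the table's configured file format.
pub enum Format {
    Parquet,
    Csv,
}

/// Hypothetical stand-in for `ListingTable`, keeping only the format.
pub struct ListingTable {
    pub format: Format,
}

impl ListingTable {
    /// True when the format is Parquet and the session enables filter pushdown.
    /// Needs `state`, which is why the signature change is proposed.
    pub fn parquet_pushdown_active(&self, state: &SessionState) -> bool {
        match self.format {
            Format::Parquet => state.parquet_pushdown_filters,
            Format::Csv => false,
        }
    }
}
```

The trade-off is that every caller of `supports_filter_pushdown` then has to have a session state at hand, which changes the `TableProvider` trait for all implementations.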
- The predicate is fully pushed down by ParquetExec (not all predicates are supported)
Does this mean making sure the row_filter is built from the predicate without error? If so, it seems to require converting a logical Expr to a PhysicalExpr first.
https://github.com/apache/arrow-datafusion/blob/7545177001b9dc04951f0c1c2008509f3895de8e/datafusion/core/src/physical_plan/file_format/parquet.rs#L527-L547
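As a rough sketch of what such a check could look like on the logical side, the function below walks a toy expression tree and decides whether it could be handed to the scan as-is. The `Expr` enum, `ColumnKind`, and the rule itself (every referenced column exists in the file schema and is primitive) are assumptions for illustration. The real rules live in the row filter code linked above and may differ.

```rust
use std::collections::HashMap;

/// Hypothetical classification of a column in the file schema.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnKind {
    Primitive,
    Nested,
}

/// Toy logical expression, far smaller than DataFusion's `Expr`.
#[derive(Debug, Clone)]
pub enum Expr {
    Column(String),
    Literal(String),
    Eq(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

/// Assumed rule: a predicate is pushable when every column it references
/// exists in the file schema and has a primitive type.
pub fn can_push_down(expr: &Expr, file_schema: &HashMap<String, ColumnKind>) -> bool {
    match expr {
        // A missing column (for example a partition column) or a nested one blocks pushdown.
        Expr::Column(name) => matches!(file_schema.get(name), Some(ColumnKind::Primitive)),
        Expr::Literal(_) => true,
        // Both sides of a binary expression must be pushable.
        Expr::Eq(left, right) | Expr::Or(left, right) => {
            can_push_down(left, file_schema) && can_push_down(right, file_schema)
        }
    }
}
```

Under this rule, the predicate from the plan above (`container = ... OR pod = ...`) would qualify, since both columns are plain string columns in the file.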
I wonder if it might be possible to always return exact for Parquet files, and to just manually insert a FilterExec for any predicates that can't be pushed down
Tagging @alamb and @crepererum who are more familiar with this part of the codebase
I'll try and look into this in more detail tomorrow
> I wonder if it might be possible to always return exact for Parquet files, and to just manually insert a FilterExec for any predicates that can't be pushed down
That's what we want to do for InfluxDB IOx as well: https://github.com/influxdata/influxdb_iox/issues/7408
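A minimal sketch of that alternative, assuming each predicate has already been classified as pushable or not: the scan takes the pushable predicates, and a filter node is added on top only when something is left over. `Predicate`, `Plan`, and `plan_scan` are hypothetical names for illustration, not DataFusion types.

```rust
/// Hypothetical predicate, already classified as pushable or not.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Predicate {
    pub sql: String,
    pub pushable: bool,
}

/// Toy plan tree with just the two nodes this sketch needs.
#[derive(Debug, PartialEq, Eq)]
pub enum Plan {
    ParquetScan { pushed: Vec<String> },
    Filter { predicates: Vec<String>, input: Box<Plan> },
}

/// Always accept every filter, push what the scan can evaluate,
/// and wrap the scan in a filter only for the leftovers.
pub fn plan_scan(filters: Vec<Predicate>) -> Plan {
    let (pushed, residual): (Vec<Predicate>, Vec<Predicate>) =
        filters.into_iter().partition(|p| p.pushable);
    let scan = Plan::ParquetScan {
        pushed: pushed.into_iter().map(|p| p.sql).collect(),
    };
    if residual.is_empty() {
        scan
    } else {
        Plan::Filter {
            predicates: residual.into_iter().map(|p| p.sql).collect(),
            input: Box::new(scan),
        }
    }
}
```

This keeps the decision inside `scan`, where the session state is already available, at the cost of the table provider building part of the plan itself.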
I am going to find time sometime this week
Background Reading
Here is a background article about parquet predicate pushdown: https://www.influxdata.com/blog/querying-parquet-millisecond-latency/#heading5
Specifically the section on Late materialization describes what is done in the RowFilter code
Code links for Listing Table
Supports filter pushdown: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html#impl-TableProvider-for-ListingTable
Conditions:
Here are the conditions @tustvold lists above, and some links about what they mean
- Parquet predicate pushdown is enabled
This refers to the datafusion.execution.parquet.pushdown_filters configuration setting (see the configuration settings docs)
- The FileFormat is parquet
This should be relatively straightforward to determine
- The predicate is fully pushed down by ParquetExec (not all predicates are supported)
"pushdown" refers to this code in ParquetExec (there is some background information in the parquet RowFilter API)
https://github.com/apache/datafusion/blob/16a3557325eb8f949f4a87ab90c0a0b174dc8d86/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs#L43-L70
I think there are likely two things needed for this PR:
- Some API that ListingTable can use to determine if the predicates can be pushed down
- Tests that show / confirm that using RowFilter really performs exact filtering
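One possible shape for such tests is a differential check: run the pushed-down path and a plain reference filter over the same data and require identical results. The toy model below is an assumption-laden illustration, not the parquet crate API. `RowGroup`, `scan_pruned`, and `scan_with_row_filter` are hypothetical, and the min/max check mimics the `container_min <= value AND value <= container_max` pruning predicate from the plan above.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Row {
    pub container: String,
    pub id: u32,
}

/// Toy row group; min/max statistics are derived from its rows.
pub struct RowGroup {
    pub rows: Vec<Row>,
}

impl RowGroup {
    /// Statistics check: can this group contain `value` at all?
    pub fn may_contain_container(&self, value: &str) -> bool {
        let min = self.rows.iter().map(|r| r.container.as_str()).min();
        let max = self.rows.iter().map(|r| r.container.as_str()).max();
        match (min, max) {
            (Some(lo), Some(hi)) => lo <= value && value <= hi,
            _ => false,
        }
    }
}

/// Pruning only: skips whole groups, so non-matching rows can survive (inexact).
pub fn scan_pruned(groups: &[RowGroup], container: &str) -> Vec<Row> {
    groups
        .iter()
        .filter(|g| g.may_contain_container(container))
        .flat_map(|g| g.rows.iter().cloned())
        .collect()
}

/// Pruning plus a per-row filter: should match the reference exactly.
pub fn scan_with_row_filter(groups: &[RowGroup], container: &str) -> Vec<Row> {
    scan_pruned(groups, container)
        .into_iter()
        .filter(|r| r.container == container)
        .collect()
}

/// Reference result: filter every row, with no pruning at all.
pub fn reference_filter(groups: &[RowGroup], container: &str) -> Vec<Row> {
    groups
        .iter()
        .flat_map(|g| g.rows.iter().cloned())
        .filter(|r| r.container == container)
        .collect()
}
```

A real test would replace the toy scan with a ParquetExec run with pushdown enabled and no FilterExec, and compare against the same query with pushdown disabled.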
FWIW I think @itsjunetime is looking into this ticket
Yes, sorry - I'm still somewhat new to this codebase, so I'm taking a good second to read through the associated docs and comments to figure it out first.
No need to rush. I think @alamb's comment was mostly meant as an assignment signal, so that nobody else starts to work on it and we end up wasting resources 🙂
> No need to rush. I think @alamb's comment was mostly meant as an assignment signal, so that nobody else starts to work on it and we end up wasting resources 🙂
Yes this is what I meant. Sorry about not being clear.
I filed https://github.com/apache/datafusion/issues/12115 to track some additional testing that I would like to do for this feature.
There is a nice PR here for this feature: https://github.com/apache/datafusion/pull/12135