datafusion icon indicating copy to clipboard operation
datafusion copied to clipboard

Return TableProviderFilterPushDown::Exact when Parquet Pushdown Enabled

Open tustvold opened this issue 3 years ago • 6 comments

Is your feature request related to a problem or challenge? Please describe what you are trying to do. A clear and concise description of what the problem is. Ex. I'm always frustrated when [...] (This section helps Arrow developers understand the context and why for this feature, in addition to the what)

Currently even when parquet predicate pushdown is enabled, and the predicate can be fully pushed down, the physical plan still contains a FilterExec when using ListingTable

| physical_plan | ProjectionExec: expr=[service@0 as service, host@1 as host, pod@2 as pod, container@3 as container, image@4 as image, time@5 as time, client_addr@6 as client_addr, request_duration_ns@7 as request_duration_ns, request_user_agent@8 as request_user_agent, request_method@9 as request_method, request_host@10 as request_host, request_bytes@11 as request_bytes, response_bytes@12 as response_bytes, response_status@13 as response_status]                            |
|               |   CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                                                                                                                                                                                                                                                                                |
|               |     FilterExec: container@3 = backend_container_0 OR pod@2 = aqcathnxqsphdhgjtgvxsfyiwbmhlmg                                                                                                                                                                                                                                                                                                                                                                                 |
|               |       RepartitionExec: partitioning=RoundRobinBatch(8)                                                                                                                                                                                                                                                                                                                                                                                                                       |
|               |         ParquetExec: limit=None, partitions=[home/raphael/Downloads/data.parquet], predicate=container_min@0 <= backend_container_0 AND backend_container_0 <= container_max@1 OR pod_min@2 <= aqcathnxqsphdhgjtgvxsfyiwbmhlmg AND aqcathnxqsphdhgjtgvxsfyiwbmhlmg <= pod_max@3, projection=[service, host, pod, container, image, time, client_addr, request_duration_ns, request_user_agent, request_method, request_host, request_bytes, response_bytes, response_status] |
|               |        

Describe the solution you'd like

ListingTable::supports_filter_pushdown should return TableProviderFilterPushDown::Exact when

  • Parquet predicate pushdown is enabled
  • The FileFormat is parquet
  • The predicate is fully pushed down by ParquetExec (not all predicates are supported)

Describe alternatives you've considered A clear and concise description of any alternative solutions or features you've considered.

Additional context Add any other context or screenshots about the feature request here.

tustvold avatar Oct 30 '22 02:10 tustvold

This is a good proposal I think -- it would skip unecessary filtering and likely make plans faster. It will become more useful (maybe even necessary) when predicate pushdown is enabled by default -- #3463

alamb avatar Nov 28 '22 19:11 alamb

I think it is a good idea. I confirm it would help improve performance by 30x in https://github.com/apache/arrow-datafusion/issues/5404#issuecomment-1501221831.


I am thinking to pick this up but seems the work is not as trivial as I thought. Following are my two cents' thoughts.

Related code to improve is following https://github.com/apache/arrow-datafusion/blob/7545177001b9dc04951f0c1c2008509f3895de8e/datafusion/core/src/datasource/listing/table.rs#L736-L756

  1. The FileFormat is parquet

This is trivial, we can match self.options.format with ParquetFormat

  1. Parquet predicate pushdown is enabled

This setting could come from ParquetExec.pushdown_filters or ConfigOptions (more specifically, ConfigOptions.execution.parquet.pushdown_filters)

https://github.com/apache/arrow-datafusion/blob/7545177001b9dc04951f0c1c2008509f3895de8e/datafusion/core/src/physical_plan/file_format/parquet.rs#L393

However, when ListingTable.supports_filter_pushdown is called, ParquetExec is not created yet, nor does SessionState being passed as input.

Idea: shall we add state: &SessionState as input for ListingTable.supports_filter_pushdown, the same as ListingTable.scan

https://github.com/apache/arrow-datafusion/blob/7545177001b9dc04951f0c1c2008509f3895de8e/datafusion/core/src/datasource/listing/table.rs#L668-L670

  1. The predicate is fully pushed down by ParquetExec (not all predicates are supported)

is it making sure the row_filter is built from the predicate without error? If so, seems it requires converting a logical Expr to PhysicalExpr first.

https://github.com/apache/arrow-datafusion/blob/7545177001b9dc04951f0c1c2008509f3895de8e/datafusion/core/src/physical_plan/file_format/parquet.rs#L527-L547

jychen7 avatar Apr 10 '23 02:04 jychen7

I wonder if it might be possible to always return exact for Parquet files, and to just manually insert a FilterExec for any predicates that can't be pushed down

Tagging @alamb and @crepererum who are more familiar with this part of the codebase

tustvold avatar Apr 10 '23 10:04 tustvold

I'll try and look into this in more detail tomorrow

alamb avatar Apr 10 '23 21:04 alamb

I wonder if it might be possible to always return exact for Parquet files, and to just manually insert a FilterExec for any predicates that can't be pushed down

That's what we want to do for InfluxDB IOx as well: https://github.com/influxdata/influxdb_iox/issues/7408

crepererum avatar Apr 11 '23 08:04 crepererum

I am going to find time sometime this week

alamb avatar Jun 24 '24 16:06 alamb

Background Reading

Here is a background article about parquet predicate pushdown: https://www.influxdata.com/blog/querying-parquet-millisecond-latency/#heading5

Specifically the section on Late materialization describes what is done in the RowFilter code

Code links for Listing Table

Supports filter pushdown: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html#impl-TableProvider-for-ListingTable

Conditions:

Here are the conditions @tustvold lists above, and some links about what they mean

Parquet predicate pushdown is enabled

This refers to the datafusion.execution.parquet.pushdown_filters configuration setting (docs here)

The FileFormat is parquet

This should be relatively straightforward to determine

The predicate is fully pushed down by ParquetExec (not all predicates are supported)

"pushdown" refers to this code in ParquetExec (there is some background information in the parquet RowFilter API)

https://github.com/apache/datafusion/blob/16a3557325eb8f949f4a87ab90c0a0b174dc8d86/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs#L43-L70

alamb avatar Aug 06 '24 18:08 alamb

I think there are likely two things needed for this PR:

  1. Some API that ListingTable can use to determine if the predicates can be pushed down
  2. Tests that show / confirm that using RowFilter really is an exact filtering

alamb avatar Aug 06 '24 18:08 alamb

FWIW I think @itsjunetime is looking into this ticket

alamb avatar Aug 08 '24 19:08 alamb

Yes, sorry - I'm still somewhat new to this codebase, so I'm taking a good second to read through the associated docs and comments to figure it out first.

itsjunetime avatar Aug 08 '24 21:08 itsjunetime

No need to rush. I think @alamb's comment was mostly meant as an assignment signal, so that nobody else starts to work on it and we end up wasting resources 🙂

crepererum avatar Aug 12 '24 08:08 crepererum

No need to rush. I think @alamb's comment was mostly meant as an assignment signal, so that nobody else starts to work on it and we end up wasting resources 🙂

Yes this is what I meant. Sorry about not being clear.

alamb avatar Aug 12 '24 10:08 alamb

I filed https://github.com/apache/datafusion/issues/12115 to track some additional testing that I would like to do for this feature.

alamb avatar Aug 22 '24 16:08 alamb

There is a nice PR here for this feature: https://github.com/apache/datafusion/pull/12135

alamb avatar Aug 24 '24 12:08 alamb