
Support reading multiple parquet files via `datafusion-cli`

Open alamb opened this issue 8 months ago • 13 comments

Is your feature request related to a problem or challenge?

This is an idea that @robtandy brought up on the DataFusion sync call the other day and I think it would be pretty useful.

The use case is "I want to read more than one, but not an entire directory of, parquet files from a remote object store" -- I think in this case it's to look at some particular files.

For example, let's say you want to read just these two files:

  • s3://clickhouse-public-datasets/hits_compatible/athena_partitioned/hits_1.parquet
  • s3://clickhouse-public-datasets/hits_compatible/athena_partitioned/hits_2.parquet

There is currently no way to do so via SQL. You can either read the entire directory:

> CREATE EXTERNAL TABLE hits
STORED AS PARQUET
LOCATION 's3://clickhouse-public-datasets/hits_compatible/athena_partitioned/' options (aws.region 'eu-central-1');
0 row(s) fetched.
Elapsed 2.928 seconds.

Or you can read each file separately:

> CREATE EXTERNAL TABLE hits
STORED AS PARQUET
LOCATION 's3://clickhouse-public-datasets/hits_compatible/athena_partitioned/hits_1.parquet' options (aws.region 'eu-central-1');
0 row(s) fetched.
Elapsed 1.017 seconds.

Describe the solution you'd like

I would like to be able to read an arbitrary set of remote parquet files.

It would also be awesome to support glob patterns (e.g. `*`), which have been requested before:

  • https://github.com/apache/datafusion/issues/7393

Describe alternatives you've considered

I suggest we implement a TableFunction similar to DuckDB's `read_parquet`, ONLY in the datafusion-cli source.

So to query the files listed above, it would look like:

SELECT * FROM read_parquet([
  'https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet', 
  'https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_2.parquet'
]);
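datafusion-cli registers such functions through DataFusion's `TableFunctionImpl` trait (the same mechanism behind `parquet_metadata`). A minimal sketch of the shape this could take -- `extract_string_literals` and `build_listing_table` are hypothetical helpers, not existing APIs:

```rust
use std::sync::Arc;

use datafusion::catalog::{TableFunctionImpl, TableProvider};
use datafusion::common::Result;
use datafusion::logical_expr::Expr;

#[derive(Debug)]
struct ReadParquetFunc;

impl TableFunctionImpl for ReadParquetFunc {
    // `call` receives the SQL arguments; for
    // read_parquet(['a.parquet', 'b.parquet']) that is an array
    // literal of strings.
    fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        // Unpack the string paths from the literal expressions;
        // datafusion-cli's existing parquet_metadata function shows
        // how to match on Expr::Literal values.
        let paths: Vec<String> = extract_string_literals(args)?; // hypothetical helper
        // Build a TableProvider over `paths` -- e.g. a ListingTable, as
        // discussed later in this thread. The tricky part is that schema
        // inference for remote files is async while `call` is synchronous.
        build_listing_table(paths) // hypothetical helper
    }
}

// Registration, mirroring how parquet_metadata is hooked up:
// ctx.register_udtf("read_parquet", Arc::new(ReadParquetFunc));
```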

From the duckdb docs: https://duckdb.org/docs/stable/data/parquet/overview.html

-- read file1, file2, file3
SELECT *
FROM read_parquet(['file1.parquet', 'file2.parquet', 'file3.parquet']);
-- Support GLOB access
SELECT *
FROM read_parquet(['folder1/*.parquet', 'folder2/*.parquet']);

We already support the parquet_metadata function in datafusion-cli (docs)

SELECT path_in_schema, row_group_id, row_group_num_rows, stats_min, stats_max, total_compressed_size
FROM parquet_metadata('hits.parquet')
WHERE path_in_schema = '"WatchID"'
LIMIT 3;

+----------------+--------------+--------------------+---------------------+---------------------+-----------------------+
| path_in_schema | row_group_id | row_group_num_rows | stats_min           | stats_max           | total_compressed_size |
+----------------+--------------+--------------------+---------------------+---------------------+-----------------------+
| "WatchID"      | 0            | 450560             | 4611687214012840539 | 9223369186199968220 | 3883759               |
| "WatchID"      | 1            | 612174             | 4611689135232456464 | 9223371478009085789 | 5176803               |
| "WatchID"      | 2            | 344064             | 4611692774829951781 | 9223363791697310021 | 3031680               |
+----------------+--------------+--------------------+---------------------+---------------------+-----------------------+
3 rows in set. Query took 0.053 seconds.

Here is the code implementation: https://github.com/apache/datafusion/blob/85f6621a6b1680b40d483a56b10ff3495861ece3/datafusion-cli/src/functions.rs#L322

We can also look at the ClickHouse s3 table function, which is similar: https://clickhouse.com/docs/integrations/s3

DESCRIBE TABLE s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_*.gz', 'TabSeparatedWithNames');

Open questions

What to do if the files are on different object stores (e.g. S3 and HTTP):

SELECT * FROM read_parquet([
  'https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet', 
  -- note a different object store 
  's3://public-datasets/hits_compatible/athena_partitioned/hits_2.parquet'
]);

At first, I suggest we don't try to support this.

Additional context

No response

alamb avatar Jun 06 '25 13:06 alamb

That has actually been on my backlog for a couple of months. It would be nice to support an array of files or globs.

comphead avatar Jun 06 '25 15:06 comphead

I'll try to take it on in 2 weeks if no one else beats me to it.

comphead avatar Jun 06 '25 16:06 comphead

Maybe @robtandy could help

alamb avatar Jun 06 '25 21:06 alamb

@alamb - I'm less familiar with this area of datafusion but might be able to give it a shot. The idea is to add this as a table function, right? I can see that ListingTableUrl::parse supports glob strings, so does it make sense to simply implement this as a listing table?

a-agmon avatar Jun 07 '25 18:06 a-agmon

I gave it a shot, but it ended up being somewhat messy. That's mostly due to the fact that, on the one hand, TableFunctionImpl::call() is synchronous, yet, on the other hand, it also has to get hold of the schema of the data, which in the case of remote blobs (like S3) requires IO and async to be done right. I tried to work around this by using the call() method to create a TableProvider that initially reports an empty schema. This satisfies the planner's synchronous API; the actual schema discovery is deferred until the scan() method is called during the asynchronous execution phase. But this creates an issue with projections that need to validate against the schema: select X from read_csv(some-glob-pattern) fails, though select * from read_csv(some-glob-pattern) will work.
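For concreteness, the usual (if inelegant) alternative to deferring the schema is to block on the async inference from inside call(). A hedged sketch, assuming a Tokio multi-threaded runtime is already running; `infer_remote_schema` is a hypothetical helper:

```rust
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::error::Result;

// Hypothetical async helper that fetches the parquet footer(s) from
// object storage and merges them into a single schema.
async fn infer_remote_schema(paths: &[String]) -> Result<SchemaRef> {
    todo!()
}

// Bridging to async from inside the synchronous TableFunctionImpl::call():
// block_in_place tells Tokio this worker thread is about to block, and
// Handle::current().block_on drives the future to completion. Note this
// panics on a current-thread runtime, which is one reason the
// deferred-schema approach described above is tempting instead.
fn infer_schema_blocking(paths: &[String]) -> Result<SchemaRef> {
    tokio::task::block_in_place(|| {
        tokio::runtime::Handle::current().block_on(infer_remote_schema(paths))
    })
}
```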

a-agmon avatar Jun 08 '25 10:06 a-agmon

Thanks @a-agmon -- maybe this example would help: https://docs.rs/datafusion/latest/datafusion/catalog/trait.AsyncSchemaProvider.html

I agree the trick will be figuring out how to make the async calls.

alamb avatar Jun 08 '25 10:06 alamb

> I can see that ListingTableUrl::parse supports glob strings, so does it make sense to simply implement this as a listing table?

Yes, this is what I would expect -- that the result of calling read_parquet is / uses the ListingTable implementation.
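For reference, a rough sketch of what that construction could look like, built against the ListingTable APIs (method names reflect my reading of recent DataFusion versions and may need adjusting):

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn read_parquet_table(
    ctx: &SessionContext,
    paths: &[&str],
) -> Result<Arc<ListingTable>> {
    // ListingTableUrl::parse accepts local paths, remote URLs, and globs
    let urls = paths
        .iter()
        .map(|p| ListingTableUrl::parse(p))
        .collect::<Result<Vec<_>>>()?;

    let options = ListingOptions::new(Arc::new(ParquetFormat::default()));

    // Schema inference is async: it reads the parquet footer(s), which
    // for s3:// URLs means network IO. Assumes at least one path and
    // that all files share a schema.
    let schema = options.infer_schema(&ctx.state(), &urls[0]).await?;

    let config = ListingTableConfig::new_with_multi_paths(urls)
        .with_listing_options(options)
        .with_schema(schema);

    Ok(Arc::new(ListingTable::try_new(config)?))
}
```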

alamb avatar Jun 08 '25 10:06 alamb

I have added a draft for this PR. Would be happy for your comments.

a-agmon avatar Jun 08 '25 17:06 a-agmon

Hi @alamb, @comphead raised a couple of good questions about the PR, so I'm linking it here to hear your thoughts: https://github.com/apache/datafusion/pull/16332#discussion_r2134795185

a-agmon avatar Jun 09 '25 06:06 a-agmon

Hi @comphead and @alamb, I thought it might be a good idea to split this issue into several PRs:

1. Add support for the CREATE TABLE syntax with glob patterns and remote URL schemes, just as with local ones (the new PR above tried to handle this).
2. Add table functions (read_parquet, read_csv, etc.) to support glob reading (I'm working on your comments regarding this one).

Hope this makes sense; feel free to comment if not...

a-agmon avatar Jun 12 '25 13:06 a-agmon

> Hi @comphead and @alamb, I thought it might be a good idea to split this issue into several PRs: 1. Add support for the CREATE TABLE syntax with glob patterns and remote URL schemes, just as with local ones (the new PR above tried to handle this). 2. Add table functions (read_parquet, read_csv, etc.) to support glob reading (I'm working on your comments regarding this one).
>
> Hope this makes sense; feel free to comment if not...

I agree a few smaller focused PRs will make sense

Thank you for working on this

alamb avatar Jun 13 '25 10:06 alamb

Thanks for creating this issue @alamb !!

Regarding the location of the code: if it is in datafusion proper rather than the CLI, it would be available in datafusion-python and any other projects that want to offer functionality backed by datafusion. I think it increases the utility of datafusion as a library and will get used.

Could this be a configuration option that controls whether it is enabled, like how datafusion.catalog.information_schema enables the information schema in the SessionState? I do understand that it will be more code to maintain, but my intuition is that this is generally useful enough to offer within the core, as I think it will provide value. It's possible, though, that I don't fully appreciate the ramifications of this choice.

Curious what people think about this.

robtandy avatar Jun 13 '25 14:06 robtandy

> Regarding the location of the code: if it is in datafusion proper rather than the CLI, it would be available in datafusion-python and any other projects that want to offer functionality backed by datafusion. I think it increases the utility of datafusion as a library and will get used.

That is an interesting idea -- I agree that having CREATE EXTERNAL TABLE support this kind of URL / multiple files would be useful

I am not sure I fully understand the ramifications either -- if we simply update the SQL planner (SqlToRel) to split the URL list on ' ' or ', ', that certainly seems straightforward to me (and would be backwards compatible...)
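A hedged illustration of that splitting idea (where exactly this would hook into SqlToRel is hand-waved here; the function name is illustrative):

```rust
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::Result;

// Split a CREATE EXTERNAL TABLE LOCATION value such as
//   's3://bucket/a.parquet, s3://bucket/b.parquet'
// into individual ListingTableUrls. A single-path LOCATION with no
// commas falls through unchanged, which keeps this backwards compatible.
fn split_location(location: &str) -> Result<Vec<ListingTableUrl>> {
    location
        .split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(ListingTableUrl::parse)
        .collect()
}
```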

alamb avatar Jun 16 '25 11:06 alamb