
Save Dataset as Sharded Parquet

Open tom-p-reichel opened this issue 1 year ago • 6 comments

Feature request

to_parquet currently saves the dataset as one massive, monolithic parquet file rather than as several smaller parquet files. It should shard large datasets automatically.

Motivation

This default behavior makes me very sad because a program I ran for 6 hours saved its results with to_parquet, putting the entire billion+ row dataset into a single 171 GB parquet file that pyarrow, Apache Spark, etc. all fail to read without completely exhausting my system's memory. I was previously able to work with larger-than-memory parquet files, but not this one, and I assume that's because it is a single shard. Making sharding the default would put datasets in parity with other frameworks, such as Spark, which automatically shard when a large dataset is saved as parquet.

Your contribution

I could change the logic here https://github.com/huggingface/datasets/blob/bf6f41e94d9b2f1c620cf937a2e85e5754a8b960/src/datasets/io/parquet.py#L109-L158 to use pyarrow.dataset.write_dataset, which seems to support sharding, or to periodically open new files. We would only shard if the user passed in a path rather than a file handle.
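
For illustration, here is a minimal sketch (not the datasets implementation) of how pyarrow.dataset.write_dataset can shard output into multiple Parquet files; the output directory, basename template, and 100,000-row limits below are placeholder choices.

import pyarrow as pa
import pyarrow.dataset as pads

table = pa.table({"id": list(range(1_000_000))})  # stand-in for the dataset's Arrow table

pads.write_dataset(
    table,
    "output_dir",                          # writes a directory of shards instead of one file
    format="parquet",
    basename_template="part-{i}.parquet",  # {i} is filled in per output file
    max_rows_per_file=100_000,             # start a new file after this many rows
    max_rows_per_group=100_000,            # keep row groups no larger than a file
)

Writing a directory of part files is also what other frameworks such as Spark produce by default.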

tom-p-reichel avatar Jul 12 '24 23:07 tom-p-reichel

To anyone else who finds themselves in this predicament: it's possible to read the parquet file in the same way that datasets writes it and then manually break it into pieces. However, you need a couple of magic options (the thrift_* limits) to deal with the huge metadata, otherwise pyarrow immediately crashes.

import os

import pyarrow.parquet as pq
import tqdm
from more_itertools import chunked

# Open the oversized file the same way, but raise the thrift limits so pyarrow
# can parse the enormous metadata without crashing.
r = pq.ParquetReader()
r.open(
    "./outrageous-file.parquet",
    thrift_string_size_limit=2**31 - 1,
    thrift_container_size_limit=2**31 - 1,
)

os.makedirs("./chunks.parquet", exist_ok=True)  # output directory for the smaller shards

# Re-write every batch of 10,000 row groups as its own parquet file.
for i, chunk in tqdm.tqdm(enumerate(chunked(range(r.num_row_groups), 10000))):
    w = pq.ParquetWriter(f"./chunks.parquet/chunk{i}.parquet", schema=r.schema_arrow)
    for idx in chunk:
        w.write_table(r.read_row_group(idx))
    w.close()

tom-p-reichel avatar Jul 13 '24 00:07 tom-p-reichel

You can also use .shard() and call to_parquet() on each shard in the meantime:

num_shards = 128
output_path_template = "output_dir/{index:05d}.parquet"
for index in range(num_shards):
    shard = ds.shard(index=index, num_shards=num_shards, contiguous=True)
    shard.to_parquet(output_path_template.format(index=index))

lhoestq avatar Jul 17 '24 12:07 lhoestq

You can also use .shard() and call to_parquet() on each shard in the meantime:

num_shards = 128
output_path_template = "output_dir/{index:05d}.parquet"
for index in range(num_shards):
    shard = ds.shard(index=index, num_shards=num_shards, contiguous=True)
    shard.to_parquet(output_path_template.format(index=index))

I understand that this approach requires loading the entire dataset into memory before sharding. What is the recommended best practice for handling extremely large datasets (for example, in the context of DCLM) that are too large to fit in memory?

huanranchen avatar Sep 29 '25 12:09 huanranchen

load_dataset() doesn't load the dataset into memory, and neither does sharding. Datasets in the datasets library are memory-mapped from disk, which makes it possible to work with datasets bigger than RAM.

And if the dataset is even bigger than your disk, you can use streaming=True, which gives an IterableDataset.

Check out the documentation if you want to learn more about Dataset and IterableDataset, as well as .push_to_hub(), which pushes datasets to Hugging Face as sharded Parquet automatically and supports parallelism with multiprocessing for large datasets.
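
For reference, a minimal sketch assuming the sharded files from the earlier workaround live in output_dir and that username/my-dataset is a placeholder Hub repo you can write to:

from datasets import load_dataset

# Memory-mapped load: the Arrow data stays on disk rather than filling RAM.
ds = load_dataset("parquet", data_files="output_dir/*.parquet", split="train")
ds.push_to_hub("username/my-dataset", num_proc=4)  # num_proc: the multiprocessing mentioned above

# If the data doesn't even fit on disk, stream it instead of materializing it:
ids = load_dataset("parquet", data_files="output_dir/*.parquet", split="train", streaming=True)
for example in ids.take(5):  # IterableDataset: examples are produced lazily
    print(example)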

lhoestq avatar Oct 01 '25 15:10 lhoestq