
How to load a table with bucket partition

Open frankliee opened this issue 1 year ago • 5 comments

Question

The table DDL is

create table test1 (
id int,
data int
) 
using iceberg
partitioned by (bucket(4, id));

How do I write a row_filter that loads just one of the buckets?

table.scan(row_filter=xxx)

frankliee avatar Mar 25 '24 13:03 frankliee

@frankliee PyIceberg will do the filtering automatically, so when you filter on the id column, it will automatically use the bucketing to filter down to the correct bucket:

table.scan(row_filter='id = 123')

Fokko avatar Mar 25 '24 13:03 Fokko
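As background for why this works: Iceberg records each data file's partition tuple (here, the bucket number) in table metadata, so for an equality predicate on a bucketed column the planner can hash the literal and keep only files in the matching bucket. Here is a toy sketch of that pruning idea; the modulo "hash" and the file dicts are stand-ins so the example runs standalone, not PyIceberg internals (real Iceberg buckets with a Murmur3 hash):

```python
# Toy sketch of bucket-based file pruning; NOT PyIceberg internals.
# Real Iceberg computes (murmur3_x86_32(value) & MAX_INT) % n; a plain
# modulo is used here only to keep the example self-contained.

def toy_bucket(n: int, value: int) -> int:
    return value % n  # stand-in for Iceberg's bucket transform

# Stand-in for file metadata: each data file records its bucket number.
files = [{"path": f"data/id_bucket={b}/part-0.parquet", "bucket": b} for b in range(4)]

def prune(files, n, value):
    """Keep only the files that can contain rows with the given key."""
    target = toy_bucket(n, value)
    return [f for f in files if f["bucket"] == target]

matching = prune(files, 4, 123)
print(matching)  # only the one bucket that 123 maps to survives
```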

> @frankliee PyIceberg will do the filtering automatically, so when you filter on the id column, it will automatically use the bucketing to filter down to the correct bucket:
>
> table.scan(row_filter='id = 123')

However, 'id = 123' does not give fine-grained control over which bucket is loaded.

For example, if we have four readers and each reader should load one bucket independently, it is difficult to write such a row_filter expression.

frankliee avatar Mar 25 '24 14:03 frankliee

Ah, I misread what you're looking for. Have you considered the .plan_files() API where you just get a list of tasks to read?

Fokko avatar Mar 25 '24 16:03 Fokko

I have considered this, but .plan_files() returns all files and cannot distinguish which files belong to the same bucket.

For Iceberg-Spark, there is a helpful system function that can be used in expressions, like this:

SELECT * FROM test1 WHERE system.bucket(4, id) == 2

Iceberg-Python seems to be missing something like bucket(n, ref).

@Fokko

frankliee avatar Mar 26 '24 01:03 frankliee
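For readers who want to compute bucket numbers without going through PyIceberg at all: the Iceberg table spec defines bucket(N, v) as (murmur3_x86_32(v) & Integer.MAX_VALUE) % N, where int and long values are hashed as their 8-byte little-endian representation. The sketch below is pure Python written against the spec (it is not taken from PyIceberg's code); the spec's reference vector is that an int/long value 34 hashes to 2017239379:

```python
def murmur3_x86_32(data: bytes, seed: int = 0) -> int:
    """32-bit Murmur3 hash, as required by Iceberg's bucket transform."""
    c1, c2 = 0xCC9E2D51, 0x1B873593
    h = seed
    n = len(data)
    # Body: process 4-byte little-endian blocks.
    for i in range(0, n - n % 4, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * c1) & 0xFFFFFFFF
        k = ((k << 15) | (k >> 17)) & 0xFFFFFFFF
        k = (k * c2) & 0xFFFFFFFF
        h ^= k
        h = ((h << 13) | (h >> 19)) & 0xFFFFFFFF
        h = (h * 5 + 0xE6546B64) & 0xFFFFFFFF
    # Tail: up to 3 trailing bytes.
    k = 0
    tail = data[n - n % 4:]
    if len(tail) == 3:
        k ^= tail[2] << 16
    if len(tail) >= 2:
        k ^= tail[1] << 8
    if len(tail) >= 1:
        k ^= tail[0]
        k = (k * c1) & 0xFFFFFFFF
        k = ((k << 15) | (k >> 17)) & 0xFFFFFFFF
        k = (k * c2) & 0xFFFFFFFF
        h ^= k
    # Finalization mix.
    h ^= n
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & 0xFFFFFFFF
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & 0xFFFFFFFF
    h ^= h >> 16
    return h

def iceberg_bucket_int(num_buckets: int, value: int) -> int:
    """bucket(N, v) for int/long columns: hash the 8-byte little-endian value."""
    h = murmur3_x86_32(value.to_bytes(8, "little", signed=True))
    return (h & 0x7FFFFFFF) % num_buckets

print(murmur3_x86_32((34).to_bytes(8, "little", signed=True)))  # 2017239379
print(iceberg_bucket_int(4, 34))  # 3
```

This gives the same bucket numbers Spark's system.bucket(N, col) would produce, so it can be used to decide which bucket a given key falls into before planning a scan.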

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] avatar Sep 23 '24 00:09 github-actions[bot]

This issue has been closed because it has not received any activity in the 14 days since being marked as 'stale'.

github-actions[bot] avatar Oct 07 '24 00:10 github-actions[bot]

table = catalog.load_table(("default", "mq_log10"))

scan = table.scan()
file_scan_tasks = scan.plan_files()

# Keep only the tasks whose first partition field (the bucket number)
# matches the bucket we want to read.
bucket_num = 20
new_file_scan_tasks = [
    task for task in file_scan_tasks
    if task.file.partition[0] == bucket_num
]

# project_table is deprecated in 0.8.0 and removed in 0.9.0
# ("project_table is deprecated. Use ArrowScan.to_table instead."); see
# https://github.com/apache/iceberg-python/blob/pyiceberg-0.7.1/pyiceberg/table/__init__.py#L2014
from pyiceberg.io.pyarrow import project_table

pa_data_table = project_table(
    new_file_scan_tasks,
    scan.table_metadata,
    scan.io,
    scan.row_filter,
    scan.projection(),
    case_sensitive=scan.case_sensitive,
    limit=scan.limit,
)

I use this code to load a single bucket of a table with a bucketed partition, which may be useful for you. Note that the project_table function will be removed in version 0.9.0, but you can refer to the new implementation (ArrowScan.to_table) and rewrite this code accordingly.

goalzz85 avatar Oct 21 '24 12:10 goalzz85
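The filtering step above extends naturally to the original four-reader scenario: each reader keeps only the tasks whose partition value equals its own bucket number. A standalone sketch of that assignment logic follows; the SimpleNamespace objects are stand-ins for PyIceberg's FileScanTask, used only so the example runs without a catalog:

```python
from types import SimpleNamespace

# Stand-ins for FileScanTask objects: task.file.partition[0] holds the
# bucket number, mirroring the snippet above (8 files across 4 buckets).
tasks = [
    SimpleNamespace(file=SimpleNamespace(partition=(b % 4,), path=f"f{b}.parquet"))
    for b in range(8)
]

def tasks_for_reader(tasks, reader_bucket):
    """Select the scan tasks belonging to one bucket/reader."""
    return [t for t in tasks if t.file.partition[0] == reader_bucket]

# Four readers, one bucket each: together they cover every task exactly once.
per_reader = {b: tasks_for_reader(tasks, b) for b in range(4)}
print({b: len(ts) for b, ts in per_reader.items()})  # {0: 2, 1: 2, 2: 2, 3: 2}
```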

from pyiceberg.transforms import BucketTransform
from pyiceberg.types import IntegerType

id = 50

# BucketTransform takes the number of buckets; transform() returns a
# function that maps a value of the given source type to its bucket number.
t = BucketTransform(50)
bucket_int_func = t.transform(IntegerType())
bucket_num = bucket_int_func(id)

print(bucket_num)

I looked into the source code again and found a way to calculate the bucket number from the ID.

@frankliee

goalzz85 avatar Oct 22 '24 08:10 goalzz85