How to load a table with a bucket partition
Question
The table DDL is
create table test1 (
id int,
data int
)
using iceberg
partitioned by (bucket(4, id));
How do I write the row_filter so that it loads a single bucket?
table.scan(row_filter=xxx)
@frankliee PyIceberg will do the filtering automatically, so when you filter on the id column, it will automatically use the bucketing to filter down to the correct bucket:
table.scan(row_filter='id = 123')
However, 'id = 123' cannot control which bucket is loaded in a fine-grained way.
For example, if we have four readers and each reader independently loads one bucket, it is difficult to write the row_filter expression.
Ah, I misread what you're looking for. Have you considered the .plan_files() API, where you just get a list of tasks to read?
I have considered this: .plan_files() returns all the files, but it does not directly distinguish which files belong to the same bucket.
For Iceberg-Spark, there is a helpful system function for this:
SELECT * FROM test1 WHERE system.bucket(4, id) == 2
Iceberg-Python seems to be missing an equivalent of bucket(n, ref).
@Fokko
table = catalog.load_table(("default", "mq_log10"))
scan = table.scan()
file_scan_tasks = scan.plan_files()

# Keep only the tasks whose partition value matches the desired bucket.
partition_num = 20
new_file_scan_tasks = []
for task in file_scan_tasks:
    # task.file.partition is a Record; field 0 holds the bucket number here.
    if task.file.partition[0] == partition_num:
        new_file_scan_tasks.append(task)
# reference:
# https://github.com/apache/iceberg-python/blob/pyiceberg-0.7.1/pyiceberg/table/__init__.py#L2014
# @deprecated(
# deprecated_in="0.8.0",
# removed_in="0.9.0",
# help_message="project_table is deprecated. Use ArrowScan.to_table instead.",
# )
from pyiceberg.io.pyarrow import project_table

pa_data_table = project_table(
    new_file_scan_tasks,
    scan.table_metadata,
    scan.io,
    scan.row_filter,
    scan.projection(),
    case_sensitive=scan.case_sensitive,
    limit=scan.limit,
)
I use this code to load a bucketed partition of a table; it may be useful for you. Note that the project_table function will be removed in version 0.9.0, but you can refer to the new implementation (ArrowScan.to_table) and rewrite this code accordingly.
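The filtering loop above can also be generalized to the four-reader scenario: group every task by its bucket value so that reader i consumes only the tasks for bucket i. A minimal sketch with stand-in task objects (the dataclass names here are hypothetical; in real code the tasks would be the FileScanTasks returned by plan_files()):

```python
from collections import defaultdict
from dataclasses import dataclass

# Hypothetical stand-ins for pyiceberg's FileScanTask / DataFile objects.
@dataclass
class FakeFile:
    partition: tuple  # field 0 holds the bucket number, as above

@dataclass
class FakeTask:
    file: FakeFile

tasks = [FakeTask(FakeFile((b,))) for b in [0, 1, 2, 3, 1, 2]]

# Group tasks by bucket so that reader i reads tasks_by_bucket[i].
tasks_by_bucket = defaultdict(list)
for task in tasks:
    tasks_by_bucket[task.file.partition[0]].append(task)

for bucket, bucket_tasks in sorted(tasks_by_bucket.items()):
    print(bucket, len(bucket_tasks))  # 0 1 / 1 2 / 2 2 / 3 1
```

Each reader can then call project_table (or, in newer versions, ArrowScan.to_table) on its own task list only.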
from pyiceberg.transforms import BucketTransform
from pyiceberg.types import IntegerType

record_id = 50  # renamed to avoid shadowing the built-in id()
t = BucketTransform(50)  # 50 must match the bucket count in the table's partition spec
bucket_int_func = t.transform(IntegerType())
bucket_num = bucket_int_func(record_id)
print(bucket_num)
I looked into the source code again and found a way to calculate the bucket number from the ID.
@frankliee
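For reference, BucketTransform implements the bucket function defined in the Iceberg table spec: the value is serialized (int and long values are hashed as 8-byte little-endian longs), hashed with the standard 32-bit Murmur3 hash, and the result is reduced modulo the number of buckets. A self-contained sketch of that computation, written here from the spec for illustration rather than taken from the PyIceberg code:

```python
import struct

def murmur3_x86_32(data: bytes, seed: int = 0) -> int:
    """Standard 32-bit Murmur3 hash, the hash function required by the Iceberg spec."""
    c1, c2 = 0xCC9E2D51, 0x1B873593
    h = seed
    n = len(data) & ~3
    # Process full 4-byte blocks, little-endian.
    for i in range(0, n, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * c1) & 0xFFFFFFFF
        k = ((k << 15) | (k >> 17)) & 0xFFFFFFFF  # rotl32(k, 15)
        k = (k * c2) & 0xFFFFFFFF
        h ^= k
        h = ((h << 13) | (h >> 19)) & 0xFFFFFFFF  # rotl32(h, 13)
        h = (h * 5 + 0xE6546B64) & 0xFFFFFFFF
    # Process the remaining 0-3 tail bytes.
    tail = data[n:]
    k = 0
    if len(tail) >= 3:
        k ^= tail[2] << 16
    if len(tail) >= 2:
        k ^= tail[1] << 8
    if len(tail) >= 1:
        k ^= tail[0]
        k = (k * c1) & 0xFFFFFFFF
        k = ((k << 15) | (k >> 17)) & 0xFFFFFFFF
        k = (k * c2) & 0xFFFFFFFF
        h ^= k
    # Finalization mix.
    h ^= len(data)
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & 0xFFFFFFFF
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & 0xFFFFFFFF
    h ^= h >> 16
    return h

def bucket_int(num_buckets: int, value: int) -> int:
    # Iceberg hashes int/long values as 8-byte little-endian longs,
    # then takes the non-negative hash modulo the bucket count.
    h = murmur3_x86_32(struct.pack("<q", value))
    return (h & 0x7FFFFFFF) % num_buckets

# Spec test vector: the Iceberg hash of integer 34 is 2017239379.
print(murmur3_x86_32(struct.pack("<q", 34)))  # 2017239379
print(bucket_int(4, 34))  # 3
```

This mirrors what SELECT ... WHERE system.bucket(4, id) = 2 computes in Iceberg-Spark, so the same bucket numbers come out on both sides.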