
Can we enable adaptive clustering?

Open myz540 opened this issue 10 months ago • 9 comments

Question

Does pyiceberg allow us to enable adaptive clustering when creating a table or enable it on an existing table?

The relevant SQL would be something like:

ALTER TABLE <table_identifier> CLUSTER BY (<column_name> [, ...]);

myz540 avatar Mar 12 '25 19:03 myz540

@myz540 That's not in there today. However, if you pre-cluster the table before writing, it should maintain order.
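
For illustration, a minimal sketch of that pre-clustering idea, assuming a pyarrow table and made-up column names (the data and columns here are not from this thread):

import pyarrow as pa

# Hypothetical data; substitute your own table and clustering columns.
arrow_table = pa.table({"sid": ["b", "a", "c"], "gene": ["y", "x", "z"]})

# Sort ("pre-cluster") by the clustering columns before appending, so rows
# with the same key end up next to each other in the written data files.
clustered = arrow_table.sort_by([("sid", "ascending"), ("gene", "ascending")])

# tbl.append(clustered)  # tbl being a pyiceberg Table loaded from a catalog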

Fokko avatar Mar 12 '25 19:03 Fokko

Thanks for your reply. On another matter, I am trying to write to a table that has TruncateTransform on two columns as part of the partition spec. I mostly followed the documentation.

from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.transforms import TruncateTransform


def create_partitions_truncate():
    return PartitionSpec(
        PartitionField(
            source_id=1, field_id=1000, transform=TruncateTransform(width=7), name="sid_truncate"
        ),
        PartitionField(
            source_id=2, field_id=2000, transform=TruncateTransform(width=1), name="gene_truncate"
        ),
    )
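
For context, a sketch of how such a spec could be passed at table creation; the catalog name, identifier, and schema below are assumptions, not taken from this thread:

from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType

# Hypothetical schema whose field IDs match source_id 1 and 2 in the spec above.
schema = Schema(
    NestedField(field_id=1, name="sid", field_type=StringType(), required=True),
    NestedField(field_id=2, name="gene", field_type=StringType(), required=True),
)

catalog = load_catalog("rest")  # hypothetical catalog name/configuration
table = catalog.create_table(
    identifier=("my_database", "my_table"),  # hypothetical identifier
    schema=schema,
    partition_spec=create_partitions_truncate(),
)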

However, when I try writing to it:

table = catalog.load_table((DATABASE, table_name))
smol_table = pa.Table.from_pandas(_df, schema=create_pa_schema())
with table.transaction() as transaction:
    transaction.append(smol_table)

I am hit with the following error:

Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: [PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=7), name='sid_truncate'), PartitionField(source_id=2, field_id=1001, transform=TruncateTransform(width=1), name='gene_truncate')].

Is this simply not supported by pyarrow at the moment or am I doing something wrong?

myz540 avatar Mar 12 '25 20:03 myz540

@myz540 can you double check if you're using the latest version?

Fokko avatar Mar 12 '25 21:03 Fokko

My versions are pyarrow==16.0.0 and pyiceberg==0.8.1.

I upgraded them to their latest versions and now hit a new error:

Traceback (most recent call last):
  File "/Users/mikezhong/dev/pbmc/scripts/upload_lems.py", line 137, in <module>
    transaction.append(smol_table)
  File "/Users/mikezhong/.venv/main/lib/python3.11/site-packages/pyiceberg/table/__init__.py", line 473, in append
    data_files = list(
                 ^^^^^
  File "/Users/mikezhong/.venv/main/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 2583, in _dataframe_to_data_files
    partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mikezhong/.venv/main/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 2630, in _determine_partitions
    name, partition.transform.pyarrow_transform(source_field.field_type)(arrow_table[source_field.name])
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mikezhong/.venv/main/lib/python3.11/site-packages/pyiceberg/transforms.py", line 905, in pyarrow_transform
    from pyiceberg_core import transform as pyiceberg_core_transform
ModuleNotFoundError: No module named 'pyiceberg_core'

Rolling back to 0.8.1 resolves that error, but I am still left with the original write error. I don't think pyarrow implements the partitioned write for the truncate transform. I can look at using AWS Glue to run Spark scripts to do the work, but I would have loved to use pyiceberg for all of our AWS S3 Tables interactions.

myz540 avatar Mar 13 '25 00:03 myz540

Truncate transform with pyarrow was added in 0.9.0 https://github.com/apache/iceberg-python/commit/50c33aa0119d9e2478b3865d864ec23a7c45b1d7#diff-59b88b08481bfa59240342994e8dc16b34f4e9b28fb05540beb7dd22af8c036fR864-R865

ModuleNotFoundError: No module named 'pyiceberg_core'

You'd need to install the extra pyiceberg-core: https://github.com/apache/iceberg-python/blob/1c0e2b04c383e5d90118ffce2066dcdb95193c4a/pyproject.toml#L307
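
In other words, something like pip install "pyiceberg[pyiceberg-core]" (or installing the pyiceberg-core package directly), going by the extra name shown in that pyproject.toml line.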

kevinjqliu avatar Mar 17 '25 18:03 kevinjqliu

Thanks, Kevin. I wasn't sure whether the error message I was getting was because pyarrow doesn't implement it or because pyiceberg doesn't implement the wrapper. I'll give it a go and let you know.

myz540 avatar Mar 18 '25 13:03 myz540

@kevinjqliu Would you be able to provide an example for the BucketTransform?

Here is the schema

from pyiceberg.schema import Schema
from pyiceberg.types import FloatType, NestedField, StringType


def create_lems_schema() -> Schema:
    """
    Create and return the Iceberg schema for the lems table.
    """
    return Schema(
        NestedField(field_id=1, name="sid", field_type=StringType(), required=True),
        NestedField(field_id=2, name="gene", field_type=StringType(), required=True),
        NestedField(
            field_id=3, name="prediction", field_type=FloatType(), required=True
        ),
    )

I have tried two things and gotten errors for both:

def create_lems_partitions_bucket():
    return PartitionSpec(
        PartitionField(
            source_id=1,
            field_id=1,
            transform=BucketTransform(num_buckets=1000),
            name="sid",
        ),
        PartitionField(
            source_id=2,
            field_id=2,
            transform=BucketTransform(num_buckets=100),
            name="gene",
        ),
    )

yields error: ValueError: Could not find in old schema: 1: sid: bucket[1000](1)

def create_lems_partitions_bucket():
    return PartitionSpec(
        PartitionField(
            source_id=1,
            field_id=1000,
            transform=BucketTransform(num_buckets=1000),
            name="sid_bucket",
        ),
        PartitionField(
            source_id=2,
            field_id=2000,
            transform=BucketTransform(num_buckets=100),
            name="gene_bucket",
        ),
    )

yields error: ValueError: Could not find in old schema: 1000: sid_bucket: bucket[1000](1)
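
A hedged sketch of an alternative that avoids assigning partition field IDs by hand (not something confirmed in this thread): create the table with only the schema, then evolve the partition spec and let pyiceberg pick the field IDs. The catalog, DATABASE, and table_name names are reused from the earlier snippets; the bucket counts follow the spec above.

from pyiceberg.transforms import BucketTransform

# Assumes the table already exists with create_lems_schema() and no partition spec,
# and that `catalog`, DATABASE and table_name are defined as in the earlier snippets.
table = catalog.load_table((DATABASE, table_name))

# Partition evolution: source columns are resolved by name, and the
# partition field IDs are assigned by the library.
with table.update_spec() as update:
    update.add_field("sid", BucketTransform(num_buckets=1000), "sid_bucket")
    update.add_field("gene", BucketTransform(num_buckets=100), "gene_bucket")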

myz540 avatar Mar 18 '25 15:03 myz540

I am able to create the partition spec with TruncateTransform and am able to write. However, when looping over chunks of 10k records and writing, like so:

for i, _df in tqdm(enumerate(chunk_dataframe(df)), desc="Processing chunk"):
    catalog = get_rest_catalog()
    table = catalog.load_table((DATABASE, table_name))
    smol_table = pa.Table.from_pandas(_df, schema=create_lems_pa_schema())
    with table.transaction() as transaction:
        transaction.append(smol_table)
        print(f"✅ Successfully appended data for {i}")
    print(f"✅ Successfully committed data for {i}")

print("✅ Successfully committed all data")

I eventually encounter the error below. I've hit this error any time I need to write many chunks, and it usually happens about an hour and a half in. I am refreshing my catalog connection on each iteration, so I'm not sure what the problem is. Any help would be appreciated.

OSError: When initiating multiple part upload for key 'data/sid=HCC1008/gene=R/00000-0-b4aef6a1-d6c0-4c09-9c08-8cd2e91957e8.parquet' in bucket 'a5fc81c2-ccf3-4f36-wbi7imrt75cabpxguuo6i8f1a7n9quse1b--table-s3': AWS Error NETWORK_CONNECTION during CreateMultipartUpload operation: curlCode: 28, Timeout was reached
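
One knob that may be worth checking (a guess, not something confirmed in this thread) is the S3 FileIO timeout configuration; pyiceberg accepts an s3.connect-timeout catalog property, for example:

from pyiceberg.catalog import load_catalog

# Hypothetical catalog settings; the s3.connect-timeout key is the only point here.
catalog = load_catalog(
    "rest",
    **{
        "uri": "https://example.com/catalog",  # hypothetical REST endpoint
        "s3.connect-timeout": "60.0",          # socket connection timeout, in seconds
    },
)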

myz540 avatar Mar 18 '25 16:03 myz540

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] avatar Nov 14 '25 00:11 github-actions[bot]