Support for REPLACE TABLE operation
Closes #281

API proposal (from PR feedback):
table = catalog.create_or_replace_table(identifier, schema, location, partition_spec, sort_order, properties)
TODO:
- [ ] Update schema
- [ ] Update partition spec
- [ ] Update sort order
- [ ] Update location
- [ ] Update table properties
@anupam-saini Thanks for working on this. I'm not sure if the following API is where people would expect it:
with table.transaction() as transaction:
    transaction.replace_table_with(new_table)
Especially because this is an unsafe operation that can break downstream consumers.
I would expect this operation on the catalog itself:
catalog = load_catalog('default')
catalog.create_table('schema.table', schema=...)
catalog.create_or_replace_table('schema.table', schema=...)
We want to generalize this operation so we don't have to implement it for each of the catalogs. Therefore I would expect this on the Catalog(ABC) itself.
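To illustrate the idea, a minimal sketch of what a generalized method on the ABC could look like. Note that `table_exists` and `_replace_table` are hypothetical names used purely for illustration, not PyIceberg API; only the dispatch logic matters here:

```python
from abc import ABC, abstractmethod
from typing import Any


class Catalog(ABC):
    """Sketch: create_or_replace_table lives on the Catalog ABC.

    Subclasses only provide the catalog-specific primitives; the
    create-or-replace dispatch is implemented once, generically.
    """

    @abstractmethod
    def create_table(self, identifier: str, schema: Any, **kwargs: Any) -> Any: ...

    @abstractmethod
    def table_exists(self, identifier: str) -> bool: ...  # hypothetical helper

    @abstractmethod
    def _replace_table(self, identifier: str, schema: Any, **kwargs: Any) -> Any: ...  # hypothetical helper

    def create_or_replace_table(self, identifier: str, schema: Any, **kwargs: Any) -> Any:
        # Replace when the table exists (keeping its history, as Spark
        # does for REPLACE TABLE); otherwise fall back to a plain create.
        if self.table_exists(identifier):
            return self._replace_table(identifier, schema, **kwargs)
        return self.create_table(identifier, schema, **kwargs)
```

The point of putting the method on the base class is that each concrete catalog (Hive, Glue, REST, SQL, ...) gets the same semantics without re-implementing the branch.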
Just a heads up: in Spark, REPLACE TABLE keeps the table history.
And when we look at the metadata, we can see the previous schema/snapshot as well:
{
"format-version" : 2,
"table-uuid" : "9b8b02af-2097-453f-86e2-5b2715e9d37a",
"location" : "s3://warehouse/default/fokko",
"last-sequence-number" : 2,
"last-updated-ms" : 1708081058809,
"last-column-id" : 2,
"current-schema-id" : 1,
"schemas" : [ {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "name",
"required" : false,
"type" : "string"
} ]
}, {
"type" : "struct",
"schema-id" : 1,
"fields" : [ {
"id" : 1,
"name" : "name",
"required" : false,
"type" : "string"
}, {
"id" : 2,
"name" : "age"
"required" : false,
"type" : "int"
} ]
} ],
"default-spec-id" : 0,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ ]
} ],
"last-partition-id" : 999,
"default-sort-order-id" : 0,
"sort-orders" : [ {
"order-id" : 0,
"fields" : [ ]
} ],
"properties" : {
"owner" : "root",
"created-at" : "2024-02-16T10:57:38.541088095Z",
"write.parquet.compression-codec" : "zstd"
},
"current-snapshot-id" : 398515508184271470,
"refs" : {
"main" : {
"snapshot-id" : 398515508184271470,
"type" : "branch"
}
},
"snapshots" : [ {
"sequence-number" : 1,
"snapshot-id" : 4615041670163082108,
"timestamp-ms" : 1708081058629,
"summary" : {
"operation" : "append",
"spark.app.id" : "local-1708080918556",
"added-data-files" : "1",
"added-records" : "1",
"added-files-size" : "416",
"changed-partition-count" : "1",
"total-records" : "1",
"total-files-size" : "416",
"total-data-files" : "1",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "s3://warehouse/default/fokko/metadata/snap-4615041670163082108-1-d3852ba7-ff54-4abd-99a2-0265206cfbfa.avro",
"schema-id" : 0
}, {
"sequence-number" : 2,
"snapshot-id" : 398515508184271470,
"timestamp-ms" : 1708081058809,
"summary" : {
"operation" : "append",
"spark.app.id" : "local-1708080918556",
"added-data-files" : "1",
"added-records" : "1",
"added-files-size" : "628",
"changed-partition-count" : "1",
"total-records" : "1",
"total-files-size" : "628",
"total-data-files" : "1",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "s3://warehouse/default/fokko/metadata/snap-398515508184271470-1-4d03a8b5-8912-4235-8c18-75400fef9874.avro",
"schema-id" : 1
} ],
"statistics" : [ ],
"snapshot-log" : [ {
"timestamp-ms" : 1708081058809,
"snapshot-id" : 398515508184271470
} ],
"metadata-log" : [ {
"timestamp-ms" : 1708081058629,
"metadata-file" : "s3://warehouse/default/fokko/metadata/00000-10d2c8d5-f6a2-4dc4-90cb-c545d8ffd497.metadata.json"
} ]
}
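To make the history-keeping behavior concrete, a small check against a trimmed-down copy of the metadata above: after the replace, the previous schema (id 0) is still listed in `schemas`, and the pre-replace snapshot is still present in `snapshots`:

```python
import json

# Trimmed copy of the relevant fields from the metadata dump above.
metadata = json.loads("""
{
  "format-version": 2,
  "current-schema-id": 1,
  "schemas": [ {"schema-id": 0}, {"schema-id": 1} ],
  "current-snapshot-id": 398515508184271470,
  "snapshots": [
    {"sequence-number": 1, "snapshot-id": 4615041670163082108, "schema-id": 0},
    {"sequence-number": 2, "snapshot-id": 398515508184271470, "schema-id": 1}
  ]
}
""")

schema_ids = [s["schema-id"] for s in metadata["schemas"]]
assert schema_ids == [0, 1]            # the pre-replace schema is retained
assert len(metadata["snapshots"]) == 2 # the pre-replace snapshot is retained
assert metadata["current-schema-id"] == 1
```

So a replace is not a drop-and-create: the new metadata file points at the new schema and snapshot while still referencing the old ones, which is why implementing it as drop + create on the catalog would lose information.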
Thank you @Fokko for taking the time to explain in such great detail. It now makes much more sense to have this as part of the Catalog API. I made the changes as suggested.
Now, with the Sort Order and Partition Spec updates, this PR has all the necessary pieces for the create-or-replace table operation and is ready for review.
@Fokko @syun64