Implement Iceberg Kafka Connect with Delta Writer Support in DV Mode
Introduce Delta Writer functionality for both unpartitioned and partitioned tables, enabling CDC and upsert modes. Enhance configuration options for CDC fields, upsert mode, and DV usage.
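The following is a minimal in-memory sketch of what one delta-writer batch does in CDC and upsert modes. It assumes the PR keeps the approach of Flink's equality delta writer, which the author says they ported. Under that approach, a key already written in the current batch is removed by position, and any other key gets an equality delete. The class name, the op codes "I"/"U"/"D", and the upsertMode flag are hypothetical stand-ins, not the PR's actual types or config names. Real writers emit data files, position-delete files (deletion vectors in DV mode), and equality-delete files rather than Java lists.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one commit interval of a delta writer (not the PR's code).
class DeltaBatchSketch {
    // Rows appended to a single imaginary data file, in write order.
    final List<String> dataRows = new ArrayList<>();
    // Positions in that data file superseded within this batch (position deletes / DV entries).
    final List<Integer> positionDeletes = new ArrayList<>();
    // Keys to delete from rows committed in earlier batches (equality deletes).
    final List<String> equalityDeletes = new ArrayList<>();
    // Key -> position of the latest live row written for that key in this batch.
    private final Map<String, Integer> insertedRows = new HashMap<>();

    // Appends a row; if the key already has a live row in this batch, delete that row by position.
    void write(String key, String row) {
        int position = dataRows.size();
        dataRows.add(row);
        Integer previous = insertedRows.put(key, position);
        if (previous != null) {
            positionDeletes.add(previous);
        }
    }

    // Deletes a key: by position if written in this batch, otherwise with an equality delete.
    void deleteKey(String key) {
        Integer previous = insertedRows.remove(key);
        if (previous != null) {
            positionDeletes.add(previous);
        } else {
            equalityDeletes.add(key);
        }
    }

    // Routes one CDC record. "I" = insert, "U" = update, "D" = delete (hypothetical codes).
    void apply(String op, String key, String row, boolean upsertMode) {
        switch (op) {
            case "I":
                // In upsert mode an insert may replace an existing row, so delete the key first.
                if (upsertMode) {
                    deleteKey(key);
                }
                write(key, row);
                break;
            case "U":
                // An update removes the old version of the row and writes the new one.
                deleteKey(key);
                write(key, row);
                break;
            case "D":
                deleteKey(key);
                break;
            default:
                throw new IllegalArgumentException("Unknown CDC operation: " + op);
        }
    }
}
```

In this model, upserting k1 and then updating it in the same batch produces one position delete for the first row. It also produces an equality delete for k1 from the initial upsert, because the writer cannot know whether k1 already exists in an earlier commit. Those per-key equality deletes are the kind of read-side cost the maintainer raises.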
Inspired by #12070. Resolves #10842.
@bryanck
Thanks for the PR.
As you may know, the original (non-Apache) sink had delta writer support that relied on equality deletes. When the sink was contributed to this project, the community decided it was best to remove that functionality, as using it can result in severely degraded performance. This can lead to poor user experience, reflect badly on the project, and increase support-related questions. Also, there are alternative solutions that don't have the same issues.
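The performance concern comes from how the two kinds of deletes are applied at read time. This deliberately simplified, hypothetical model shows the difference. With equality deletes, a reader must load the deleted keys from every applicable equality-delete file. In Iceberg, those are roughly the files with a higher data sequence number in a matching partition, or in every partition when the delete file is unpartitioned. The reader then probes each scanned row against those keys. A deletion vector, by contrast, is bound to a single data file and is checked by row position. Real Iceberg stores DVs as Roaring bitmaps in Puffin files; java.util.BitSet stands in for that here.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of applying deletes while scanning one data file.
class DeleteApplySketch {
    // Equality deletes: every row's key is probed against the union of keys loaded from all
    // applicable equality-delete files, a set that keeps growing with upserts until compaction.
    static List<String> applyEqualityDeletes(List<String> rowKeys, Set<String> deletedKeys) {
        List<String> live = new ArrayList<>();
        for (String key : rowKeys) {
            if (!deletedKeys.contains(key)) {
                live.add(key);
            }
        }
        return live;
    }

    // Deletion vector: one bitmap per data file, indexed by row position; no key comparison.
    static List<String> applyDeletionVector(List<String> rowKeys, BitSet deletedPositions) {
        List<String> live = new ArrayList<>();
        for (int pos = 0; pos < rowKeys.size(); pos++) {
            if (!deletedPositions.get(pos)) {
                live.add(rowKeys.get(pos));
            }
        }
        return live;
    }
}
```

Both methods return the same live rows for the same logical delete. The difference lies in what the reader must load first: a DV for just this file, versus every equality-delete file that might match it.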
We should revisit those discussions and resolve those concerns before we proceed with this.
Thanks for the reply! I followed the Flink sink implementation and attempted to port it to Kafka Connect. Would it be viable to use the connector plugin for CDC with DV mode enabled? Also, should the Iceberg table properties be set to merge-on-read for this use case?
I believe this feature is absolutely essential.
Thanks for contributing to the Iceberg ecosystem. In my view, the Kafka Connect sink could readily make it easier to implement SCD Type 1. Whilst this functionality may prove difficult to merge because of various community concerns, such as potential degradation in write and read performance, I ultimately hope it will be integrated so that more users can benefit from Iceberg's advantages.
For what it's worth, I tested it under a modest CDC load, and it seems to be working fine. I can make another commit that locks the CDC mode to use DV only, but that would make the connector incompatible with v2 tables. Regardless, the exception thrown when the table is a v2 table makes it clear that using DV mode is what causes the failure. At this point, users can decide for themselves how they want to proceed.
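A fail-fast check along the lines the author describes could look like this sketch. It assumes the connector knows both the target table's format version (read from table metadata) and the DV setting. The class name, method name, and message text are hypothetical. The only fact it relies on is that deletion vectors require Iceberg format version 3. The optional requireDvForCdc flag shows how the proposed follow-up commit could lock CDC mode to DV only.

```java
// Hypothetical validation helper; names and messages are illustrative, not from the PR.
class DvModeCheck {
    // Deletion vectors were introduced in Iceberg table format version 3.
    static final int MIN_DV_FORMAT_VERSION = 3;

    // Rejects DV mode on tables older than v3 and, when requested, CDC mode without DVs.
    static void validate(boolean cdcEnabled, boolean useDv, int formatVersion, boolean requireDvForCdc) {
        if (cdcEnabled && requireDvForCdc && !useDv) {
            throw new IllegalArgumentException("CDC mode requires iceberg.tables.use-dv=true");
        }
        if (useDv && formatVersion < MIN_DV_FORMAT_VERSION) {
            throw new IllegalArgumentException(
                "Deletion vectors require format version " + MIN_DV_FORMAT_VERSION
                    + ", but the table is version " + formatVersion);
        }
    }
}
```

Running the check when the writer is initialized, as sketched, surfaces the misconfiguration before any data is written.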
We tested this PR branch on AWS with the following properties set on the Kafka Connect sink connector: iceberg.tables.use-dv: "true" and iceberg.tables.write-props.format-version: "3".
We found that the table was created with format version 2 in the metadata file. So is this upsert and DV support for version 2 tables only? Upsert mode worked as expected for inserts, updates, and deletes on all tables with PK=id.
Yeah, it's a limitation. I probably should have noted it above, but it's somewhat out of scope for this PR.
What I ended up doing was writing some code that interfaces with the Iceberg catalog outside of Kafka Connect and initializes the table ahead of time with the correct format version.
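The workaround of pre-creating the table can be sketched as building the creation properties that the external code passes to the catalog. This assumes the table is created through the Iceberg Java overload Catalog.createTable(TableIdentifier, Schema, PartitionSpec, Map<String, String>), in which the reserved format-version property selects the table format at creation time. The helper class is hypothetical. The catalog call stays in a comment so the snippet runs with only the JDK.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for pre-creating a table before the sink starts writing to it.
class PreCreateTableProps {
    // Reserved Iceberg table property that selects the format version at creation time.
    static final String FORMAT_VERSION = "format-version";

    // Copies the caller's properties and pins the format version (3 is needed for DVs).
    static Map<String, String> creationProperties(Map<String, String> base, int formatVersion) {
        Map<String, String> props = new HashMap<>(base);
        props.put(FORMAT_VERSION, Integer.toString(formatVersion));
        return props;
    }

    // With Iceberg on the classpath, the external code would then do something like:
    //   catalog.createTable(TableIdentifier.of("db", "events"), schema,
    //       PartitionSpec.unpartitioned(), creationProperties(Map.of(), 3));
    // "db", "events", and schema are placeholders.
}
```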