Flink: new sink base on the unified sink API
Co-authored-by: Kyle Bendickson [email protected]
What is the purpose of the change
There is a preliminary proposal: https://docs.google.com/document/d/1G4O6JidAoKgbIdy8Ts73OfG_KBEMpsW-LkXIb89I5k8/edit?usp=sharing
Motivation & Structure
In Flink 1.12 the community focused on implementing a unified Data Sink API (FLIP-143). The new abstraction introduces a write/commit protocol and a more modular interface where the individual components are transparently exposed to the framework. A Sink implementor will have to provide the what and how: a SinkWriter that writes data and outputs what needs to be committed (i.e. committables); and a Committer and GlobalCommitter that encapsulate how to handle the committables. The framework is responsible for the when and where: at what time and on which machine or process to commit.

GlobalCommitter can complete the action of submitting snapshots in Iceberg. There is a QA related to Iceberg in its design documentation.
Is the sink an operator or a topology? From the discussion in the long run we should give the sink developer the ability of building “arbitrary” topologies. But for Flink-1.12 we should be more focused on only satisfying the S3/HDFS/Iceberg sink.
The current Writing process of Iceberg Flink Sink is as follows:

In the figure above, data is written by multiple write operators, and then snapshot is submitted by an operator with 1 parallelism. Finally, an empty Sink node is added to end the whole process.
Based on the FLIP 143, the above flow can be modified to:

The Sink node writes data, and the GlobalCommiter operator commits the snapshot.
There is an unsolved problem in the above structure. When streaming data is written, Sink will generate a large number of small files, which accumulate over a long period of time and eventually lead to problems such as excessive system pressure and decreased reading performance. This problem not only exists in Iceberg, but also in file storage.
Flink tried to solve this problem with FLIP-191.
Motivation With FLIP-143 we introduced the unified Sink API to make it easier for connector developers to support batch and streaming scenarios. While developing the unified Sink API it was already noted that the unified Sink API might not be flexible enough to support all scenarios from the beginning. With this document, we mainly focus on the small-file-compaction problem which is one of the not yet covered scenarios. The problem manifests when using the FileSink and you want to guarantee a certain file size and not end up with a lot of small files (i.e. one file per parallel subtask). It becomes very important for modern data lake use cases where usually the data is written in some kind of columnar format (Parquet or ORC) and larger files are recommended to amplify the read performance. A few examples are the Iceberg sink [1], Delta lake [2], or Hive. For Hive in particular we already have a sink that supports compaction but it is not generally applicable and only available in Flink’s Table API [3].
The goal of this document is to extend the unified Sink API to broaden the spectrum of supported scenarios and fix the small-file-compaction problem. To overcome the limitations of Sink V1 Flink offer different hooks to insert custom topologies into the sink.

On the basis of FLIP-191, the structure of Iceberg Flink Sink will be similar to the above, except that it only uses one committer operator, that is, all the output of Sink should be written to one committer, and all the submission actions should be completed by it.

PostCommit gives us a portal to do things like asynchronous small file merges and index generation, where we can do some optimization without affecting stream data writing.
The Advantage
Provides interfaces for small file merging and index The structure is clearer, and fault tolerance is guaranteed by Flink itself
Proposed Changes
The data writing action is converted from the transform operator to Sink Establish a separate Commiter operator to commit the snapshot
Iceberg currently maintains three versions of Flink 1.13, 1.14 and 1.15, while Flip-191 was introduced in Flink 1.15, so we plan to complete the versions containing these two flips in Flink 1.15 first. The one containing only FLIP-141 will be migrated to 1.13 1.14
immovable
The logic for writing data to a file This proposal only modifies the timing&operator of data writing, it does not change the logic of writing
committer logic Same with above, we don't change the action of commit, just put it into another operator.
Compatibility, Deprecation, and Migration Plan
Java API Deprecate old sink, and keep them at least until the next major version of Iceberg. SQL API Use new sink instead of the old one.
To do
Small file merge Index file generation
close #5119
cc @openinx @chenjunjiedada @stevenzwu @kbendick , PTAL , thx.
Thanks for ping me, I will take a look this week.
@hililiwei: New to the Flink part, but read through the changes and left some comments. Feel free to correct me anywhere if I do not yet understand some of the Flink stuff. Thanks, Peter
@hililiwei: I am getting second thoughts about this newline stuff. We usually try to add a newline after every block. I see that in several places we do not really adhere to it. Still might worth to check if you think the code would look better.
@hililiwei: I am getting second thoughts about this newline stuff. We usually try to add a newline after every block. I see that in several places we do not really adhere to it. Still might worth to check if you think the code would look better.
Thank you for your review. I'll check it as soon as possible.
@hililiwei: I think commit failure handling is a key here, and we should create several test cases to cover partially failing and retried commits so we can later check that the code written here is able to handle the failures correctly.
Thanks, Peter
@hililiwei: I think commit failure handling is a key here, and we should create several test cases to cover partially failing and retried commits so we can later check that the code written here is able to handle the failures correctly.
Thanks, Peter
Indeed, let me try to add some unit tests for these case. thank you.