iceberg icon indicating copy to clipboard operation
iceberg copied to clipboard

Flink: new sink base on the unified sink API

Open hililiwei opened this issue 3 years ago • 7 comments

Co-authored-by: Kyle Bendickson [email protected]

What is the purpose of the change

There is a preliminary proposal: https://docs.google.com/document/d/1G4O6JidAoKgbIdy8Ts73OfG_KBEMpsW-LkXIb89I5k8/edit?usp=sharing

Motivation & Structure

In Flink 1.12 the community focused on implementing a unified Data Sink API (FLIP-143). The new abstraction introduces a write/commit protocol and a more modular interface where the individual components are transparently exposed to the framework. A Sink implementor will have to provide the what and how: a SinkWriter that writes data and outputs what needs to be committed (i.e. committables); and a Committer and GlobalCommitter that encapsulate how to handle the committables. The framework is responsible for the when and where: at what time and on which machine or process to commit.

image

GlobalCommitter can complete the action of submitting snapshots in Iceberg. There is a QA related to Iceberg in its design documentation.

Is the sink an operator or a topology? From the discussion in the long run we should give the sink developer the ability of building “arbitrary” topologies. But for Flink-1.12 we should be more focused on only satisfying the S3/HDFS/Iceberg sink.

The current Writing process of Iceberg Flink Sink is as follows:

image

In the figure above, data is written by multiple write operators, and then snapshot is submitted by an operator with 1 parallelism. Finally, an empty Sink node is added to end the whole process. Based on the FLIP 143, the above flow can be modified to: image

The Sink node writes data, and the GlobalCommiter operator commits the snapshot.

There is an unsolved problem in the above structure. When streaming data is written, Sink will generate a large number of small files, which accumulate over a long period of time and eventually lead to problems such as excessive system pressure and decreased reading performance. This problem not only exists in Iceberg, but also in file storage.

Flink tried to solve this problem with FLIP-191.

Motivation With FLIP-143 we introduced the unified Sink API to make it easier for connector developers to support batch and streaming scenarios. While developing the unified Sink API it was already noted that the unified Sink API might not be flexible enough to support all scenarios from the beginning. With this document, we mainly focus on the small-file-compaction problem which is one of the not yet covered scenarios. The problem manifests when using the FileSink and you want to guarantee a certain file size and not end up with a lot of small files (i.e. one file per parallel subtask). It becomes very important for modern data lake use cases where usually the data is written in some kind of columnar format (Parquet or ORC) and larger files are recommended to amplify the read performance. A few examples are the Iceberg sink [1], Delta lake [2], or Hive. For Hive in particular we already have a sink that supports compaction but it is not generally applicable and only available in Flink’s Table API [3].

The goal of this document is to extend the unified Sink API to broaden the spectrum of supported scenarios and fix the small-file-compaction problem. To overcome the limitations of Sink V1 Flink offer different hooks to insert custom topologies into the sink.

image

On the basis of FLIP-191, the structure of Iceberg Flink Sink will be similar to the above, except that it only uses one committer operator, that is, all the output of Sink should be written to one committer, and all the submission actions should be completed by it.

image

PostCommit gives us a portal to do things like asynchronous small file merges and index generation, where we can do some optimization without affecting stream data writing.

The Advantage

Provides interfaces for small file merging and index The structure is clearer, and fault tolerance is guaranteed by Flink itself

Proposed Changes

The data writing action is converted from the transform operator to Sink Establish a separate Commiter operator to commit the snapshot

Iceberg currently maintains three versions of Flink 1.13, 1.14 and 1.15, while Flip-191 was introduced in Flink 1.15, so we plan to complete the versions containing these two flips in Flink 1.15 first. The one containing only FLIP-141 will be migrated to 1.13 1.14

immovable

The logic for writing data to a file This proposal only modifies the timing&operator of data writing, it does not change the logic of writing

committer logic Same with above, we don't change the action of commit, just put it into another operator.

Compatibility, Deprecation, and Migration Plan

Java API Deprecate old sink, and keep them at least until the next major version of Iceberg. SQL API Use new sink instead of the old one.

To do

Small file merge Index file generation

close #5119

hililiwei avatar May 30 '22 16:05 hililiwei

cc @openinx @chenjunjiedada @stevenzwu @kbendick , PTAL , thx.

hililiwei avatar May 31 '22 14:05 hililiwei

Thanks for ping me, I will take a look this week.

chenjunjiedada avatar Jun 22 '22 12:06 chenjunjiedada

@hililiwei: New to the Flink part, but read through the changes and left some comments. Feel free to correct me anywhere if I do not yet understand some of the Flink stuff. Thanks, Peter

pvary avatar Jul 19 '22 12:07 pvary

@hililiwei: I am getting second thoughts about this newline stuff. We usually try to add a newline after every block. I see that in several places we do not really adhere to it. Still might worth to check if you think the code would look better.

pvary avatar Jul 20 '22 17:07 pvary

@hililiwei: I am getting second thoughts about this newline stuff. We usually try to add a newline after every block. I see that in several places we do not really adhere to it. Still might worth to check if you think the code would look better.

Thank you for your review. I'll check it as soon as possible.

hililiwei avatar Jul 20 '22 17:07 hililiwei

@hililiwei: I think commit failure handling is a key here, and we should create several test cases to cover partially failing and retried commits so we can later check that the code written here is able to handle the failures correctly.

Thanks, Peter

pvary avatar Jul 29 '22 10:07 pvary

@hililiwei: I think commit failure handling is a key here, and we should create several test cases to cover partially failing and retried commits so we can later check that the code written here is able to handle the failures correctly.

Thanks, Peter

Indeed, let me try to add some unit tests for these case. thank you.

hililiwei avatar Jul 29 '22 10:07 hililiwei