[SUPPORT] Inconsistent Checkpoint Size in Flink Applications with MoR
I have an Apache Flink application that leverages RocksDB and incremental checkpointing, but it seems that every time a compaction task occurs, the checkpoint size of the application during that interval increases dramatically. Is this because the streaming job has to load the entire table when doing compaction? If that's the case, is there any way it can compact and load only the current partition?
It is expected: even when incremental checkpointing is enabled, Flink triggers a full checkpoint every N delta checkpoints. It is not actually related to compaction.
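For reference, a minimal sketch of the setup being discussed: RocksDB as the state backend with incremental checkpoints enabled (the checkpoint interval below is illustrative):

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointSetup {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // RocksDB state backend; the `true` flag enables incremental checkpoints.
    // As described above, periodic full checkpoints still occur, which shows
    // up as the size spikes in the checkpoint metrics.
    env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

    // Checkpoint every 60 seconds (illustrative value).
    env.enableCheckpointing(60_000);

    // ... build the Hudi pipeline and call env.execute() ...
  }
}
```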
Can you explain more about that process? So, based on the interval of my delta commits, Flink will run a full checkpoint?
But I am also wondering: if my application is meant to be stateless, why does the checkpoint size increase tenfold? Is that the index size? How can I manage this?
That incremental/full checkpointing is managed by Flink. In Hudi, we do have an option 'index.ttl' to control the liveness of the index entries, but it is not suggested because expiring entries would induce duplicates.
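For illustration, a sketch of where that option would be set on a Flink SQL Hudi sink. The table name, schema, and path are made up, and the option key follows this thread; some Hudi releases spell it 'index.state.ttl' (in days), so verify against your version:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HudiSinkWithIndexTtl {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // Hypothetical MOR sink; expiring index entries after ~1.5 days bounds the
    // index state, at the risk of duplicates for records updated after expiry.
    tEnv.executeSql(
        "CREATE TABLE hudi_sink (\n"
            + "  id STRING PRIMARY KEY NOT ENFORCED,\n"
            + "  ts TIMESTAMP(3),\n"
            + "  part STRING\n"
            + ") PARTITIONED BY (part) WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'file:///tmp/hudi_sink',\n"
            + "  'table.type' = 'MERGE_ON_READ',\n"
            + "  'index.ttl' = '1.5'\n"
            + ")");
  }
}
```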
But could you share what is kept in state in a Flink Hudi application? I have just a source generator within Flink writing into Hudi using MOR, and the state size is growing consistently.
It's the mapping from hoodie record key to location; a location is composed of a partition path and a file group id.
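To make that concrete, a conceptual sketch (illustrative classes, not Hudi's own) of why that mapping grows with the number of distinct record keys:

```java
import java.util.HashMap;
import java.util.Map;

public class IndexStateSketch {
  // A location is a partition path plus a file group id, as described above.
  record RecordLocation(String partitionPath, String fileGroupId) {}

  public static void main(String[] args) {
    // One entry per unique hoodie record key.
    Map<String, RecordLocation> indexState = new HashMap<>();
    indexState.put("key-1", new RecordLocation("dt=2024-01-01", "fg-0001"));
    indexState.put("key-2", new RecordLocation("dt=2024-01-01", "fg-0001"));

    // Updates to an existing key reuse its entry, but a source generator that
    // keeps emitting fresh keys grows this map (and hence the checkpointed
    // state) linearly with the number of distinct keys.
    System.out.println("tracked keys: " + indexState.size());
  }
}
```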
Is there any state cleanup that can be done, so the state doesn't grow indefinitely?
You can choose the bucket index, which derives record locations by hashing the key instead of keeping per-key state. However, the bucket index does not support updates across multiple partitions, and the bucket number cannot scale well unless it uses consistent hashing.
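A sketch of switching the sink to the bucket index (hypothetical table and path; verify the option keys for your Hudi version):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HudiSinkWithBucketIndex {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // With the bucket index, a record's location is derived by hashing its key,
    // so no key-to-location mapping is checkpointed. The trade-offs are the
    // fixed bucket number and no cross-partition updates.
    tEnv.executeSql(
        "CREATE TABLE hudi_bucket_sink (\n"
            + "  id STRING PRIMARY KEY NOT ENFORCED,\n"
            + "  ts TIMESTAMP(3),\n"
            + "  part STRING\n"
            + ") PARTITIONED BY (part) WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'file:///tmp/hudi_bucket_sink',\n"
            + "  'table.type' = 'MERGE_ON_READ',\n"
            + "  'index.type' = 'BUCKET',\n"
            + "  'hoodie.bucket.index.num.buckets' = '64'\n"
            + ")");
  }
}
```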
Then how would you recommend working with Hudi tables that can grow to an unbounded number of rows?
Are there any improvements or features coming for the bucket index to allow updating multiple partitions or changing the number of buckets?
We have support for a consistent hashing index, which can scale the bucket number automatically.
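If it helps, the option pair below is my understanding of how the engine is selected; 'hoodie.index.bucket.engine' is the Hudi config for choosing the bucket engine, but verify that your release supports CONSISTENT_HASHING from the Flink writer:

```java
import java.util.Map;

public class ConsistentHashingOptions {
  public static void main(String[] args) {
    // Illustrative option set for a consistent-hashing bucket index.
    Map<String, String> hudiOptions = Map.of(
        "index.type", "BUCKET",
        "hoodie.index.bucket.engine", "CONSISTENT_HASHING");
    hudiOptions.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
```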
Does this allow updating multiple partitions, or handle the case where a record's partition changes? Is it a global index?
cc @danny0405
How is the bucket number automatically expanded? I saw in the PR that it is meant to be a subtask of the clustering service, but clustering only works with COW and insert.
Will Hudi + Flink have any rewrite API in Flink to remove/compact small files?