[SUPPORT] Inconsistent Checkpoint Size in Flink Applications with MoR

Open FranMorilloAWS opened this issue 2 years ago • 14 comments

I have an Apache Flink application that uses RocksDB and incremental checkpointing, but every time a compaction task runs, the checkpoint size for that interval increases dramatically. Is this because the streaming job has to load the entire table when compacting? If that's the case, is there any way to compact and load only the current partition?

FranMorilloAWS avatar Dec 14 '23 11:12 FranMorilloAWS

This is expected: even with incremental checkpointing enabled, Flink triggers a full checkpoint every N delta checkpoints. It is actually not related to compaction.

danny0405 avatar Dec 14 '23 11:12 danny0405

Can you explain more about that process? So, based on the interval of my delta commits, Flink will run a full checkpoint?

FranMorilloAWS avatar Dec 14 '23 11:12 FranMorilloAWS

I'm also wondering: if my application is meant to be stateless, why does the checkpoint size increase tenfold? Is that the index size? How can I manage this?

FranMorilloAWS avatar Dec 14 '23 12:12 FranMorilloAWS

The incremental/full checkpointing is managed by Flink. In Hudi, we do have an option, 'index.ttl', to control the liveness of the index entries, but it is not recommended because it can introduce duplicates.
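
To illustrate why expiring index entries can introduce duplicates, here is a minimal sketch. It is not Hudi's code: `TtlIndex`, its `tag` method, the abstract time units, and the file group names are all hypothetical, and the choice to refresh an entry's timestamp on every write is an assumption made for the example.

```python
class TtlIndex:
    """Toy key -> file group index whose entries expire after `ttl` time units."""

    def __init__(self, ttl):
        self.ttl = ttl
        self._entries = {}  # record key -> (file group id, last write time)

    def tag(self, key, now, new_file_group):
        entry = self._entries.get(key)
        if entry is not None and now - entry[1] > self.ttl:
            entry = None  # the entry expired, so the key looks brand new
        file_group = entry[0] if entry is not None else new_file_group
        self._entries[key] = (file_group, now)
        return file_group


index = TtlIndex(ttl=10)
first = index.tag("k", now=0, new_file_group="fg-1")    # insert into fg-1
second = index.tag("k", now=5, new_file_group="fg-2")   # update, still routed to fg-1
third = index.tag("k", now=100, new_file_group="fg-3")  # entry expired: routed to fg-3
print(first, second, third)
```

In the last call the old copy of the key still sits in `fg-1`, but the write is treated as an insert into `fg-3`, so the same key now exists in two file groups.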

danny0405 avatar Dec 14 '23 13:12 danny0405

But could you share what is kept in state in a Flink Hudi application? I just have a source generator in Flink writing into Hudi using MOR, and the state size keeps growing.

FranMorilloAWS avatar Dec 14 '23 13:12 FranMorilloAWS

It's the mapping from Hoodie record key to location; a location consists of a partition path and a file group ID.
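
As a rough sketch of that mapping (an illustration only, not Hudi's actual classes), the state can be pictured as a dictionary with one entry per distinct record key. `RecordLocation` and `KeyedIndexState` are hypothetical names, and the keys, partition path, and file group ids are made up.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RecordLocation:
    """Where a record lives: a partition path plus a file group id."""
    partition_path: str
    file_group_id: str


class KeyedIndexState:
    """Toy stand-in for the per-key index held in Flink state."""

    def __init__(self):
        self._locations = {}

    def tag(self, record_key, partition_path, new_file_group_id):
        # An update reuses the stored location; an insert registers a new one.
        location = self._locations.get(record_key)
        if location is None:
            location = RecordLocation(partition_path, new_file_group_id)
            self._locations[record_key] = location
        return location

    def size(self):
        return len(self._locations)


state = KeyedIndexState()
for i in range(1000):
    state.tag(f"key-{i}", "2023-12-14", f"fg-{i % 4}")
state.tag("key-0", "2023-12-14", "fg-9")  # update of a known key: no new entry
print(state.size())
```

The number of entries tracks the number of distinct keys ever written, which is why the state keeps growing for a source that continually generates new keys.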

danny0405 avatar Dec 18 '23 03:12 danny0405

Is there any state cleanup that can be done, so it doesn't grow indefinitely?

FranMorilloAWS avatar Dec 18 '23 08:12 FranMorilloAWS

You can choose the bucket index. Note that the bucket index does not support updates across multiple partitions, and the bucket number cannot scale well unless consistent hashing is used.
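
A minimal sketch of the idea behind a hash-bucket index, under stated assumptions: `bucket_id`, `locate`, and the use of CRC32 are illustrative choices, not Hudi's actual hash function or API. The location is computed from the key, so no per-key state is needed, but the same key in another partition maps to a different location, and changing the bucket count remaps existing keys.

```python
import zlib

NUM_BUCKETS = 8  # fixed up front in this sketch


def bucket_id(record_key, num_buckets=NUM_BUCKETS):
    # Deterministic hash of the key; CRC32 is used only for illustration.
    return zlib.crc32(record_key.encode("utf-8")) % num_buckets


def locate(record_key, partition_path, num_buckets=NUM_BUCKETS):
    # The location is derived from the key, so nothing has to be stored per key.
    return (partition_path, bucket_id(record_key, num_buckets))


# The same key in the same partition always lands in the same place.
same = locate("order-42", "2023-12-14") == locate("order-42", "2023-12-14")

# The same key arriving under a different partition is a different location,
# so the earlier copy is not found and updated.
cross_partition = locate("order-42", "2023-12-14") != locate("order-42", "2023-12-15")

# Changing the bucket count moves existing keys to other buckets.
keys = [f"order-{i}" for i in range(1000)]
moved = sum(1 for k in keys if bucket_id(k, 8) != bucket_id(k, 16))
print(same, cross_partition, moved)
```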

danny0405 avatar Dec 19 '23 03:12 danny0405

Then how would you recommend working with Hudi tables that can grow to an unbounded number of rows?

FranMorilloAWS avatar Jan 11 '24 12:01 FranMorilloAWS

Are there any improvements or features coming for the bucket index, to allow updating multiple partitions or changing the number of buckets?

FranMorilloAWS avatar Jan 11 '24 12:01 FranMorilloAWS

We have support for a consistent hashing index, which can scale the bucket number automatically.
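
For readers unfamiliar with the technique, here is a generic consistent-hashing sketch, not Hudi's implementation. `HashRing`, its methods, and the manual `split` call are hypothetical, and how and when Hudi triggers resizing is not shown. Each bucket owns a contiguous hash range, so splitting one bucket only moves keys from that bucket.

```python
import bisect
import zlib

HASH_SPACE = 2 ** 32  # CRC32 produces values in [0, 2**32)


class HashRing:
    """Each bucket owns all hashes up to and including its upper bound."""

    def __init__(self, num_buckets):
        step = HASH_SPACE // num_buckets
        self.bounds = [step * (i + 1) - 1 for i in range(num_buckets)]
        self.bounds[-1] = HASH_SPACE - 1
        self.ids = [f"b{i}" for i in range(num_buckets)]

    def bucket_for(self, record_key):
        h = zlib.crc32(record_key.encode("utf-8"))
        # First bucket whose upper bound is >= the key's hash.
        return self.ids[bisect.bisect_left(self.bounds, h)]

    def split(self, bucket):
        # Cut the bucket's range in half; the lower half becomes a new bucket.
        i = self.ids.index(bucket)
        lower = self.bounds[i - 1] + 1 if i > 0 else 0
        mid = (lower + self.bounds[i]) // 2
        self.bounds.insert(i, mid)
        self.ids.insert(i, f"{bucket}-lo")


ring = HashRing(4)
keys = [f"key-{i}" for i in range(1000)]
before = {k: ring.bucket_for(k) for k in keys}
ring.split("b1")
after = {k: ring.bucket_for(k) for k in keys}
moved = [k for k in keys if before[k] != after[k]]
print(len(ring.ids), len(moved))
```

Every key in `moved` was previously in `b1` and is now in `b1-lo`; keys in the other buckets stay where they were, in contrast to a plain modulo scheme where changing the bucket count reshuffles keys across all buckets.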

danny0405 avatar Jan 12 '24 09:01 danny0405

Does this allow updates across multiple partitions, or when a record's partition changes? Is it a global index?

FranMorilloAWS avatar Jan 12 '24 10:01 FranMorilloAWS

cc @danny0405

ad1happy2go avatar Jan 31 '24 12:01 ad1happy2go

How is the bucket number automatically expanded? I saw in the RFC that it is meant to be a subtask in the clustering service, but clustering only works with COW and insert.

Will Hudi + Flink have any rewrite API in Flink to remove/compact small files?

FranMorilloAWS avatar Feb 02 '24 18:02 FranMorilloAWS