[wip] Core: Add minimum sequence number for delete file
This adds a new field, min-sequence-number, to the manifest entry to denote the minimum sequence number of the data files referenced by the delete file. The min-sequence-number can be used to filter out deletes during planning, where we apply the min/max filter. Furthermore, it can help reduce commit conflicts when a rewrite action runs against a streaming upsert job.
For simplicity, this PR doesn't include the writer-side changes to produce the min-sequence-number. The Flink upsert writer should write a lazy value, since it doesn't know the sequence number until committing. The Spark writer could compute the value from the referenced data files.
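To make the intended use concrete, here is a minimal sketch (plain Java with made-up names, not the actual Iceberg API) of the extra pruning step the new field would enable during planning; UNASSIGNED_MIN_SEQ (-1 here) stands for a value the writer could not compute before commit, e.g. the Flink upsert case:

public class MinSeqPruningSketch {
  static final long UNASSIGNED_MIN_SEQ = -1L;

  // Returns true if the delete file may still apply to the data file and must be kept
  // for that file's scan task.
  static boolean mayApply(long dataFileSeq, long deleteFileSeq, long deleteMinReferencedSeq) {
    // Existing rule: a position delete applies only to data files with an equal or
    // older data sequence number.
    if (dataFileSeq > deleteFileSeq) {
      return false;
    }
    // Proposed rule: every data file referenced by the delete has a sequence number of at
    // least deleteMinReferencedSeq, so anything older than that cannot be affected.
    return deleteMinReferencedSeq == UNASSIGNED_MIN_SEQ || dataFileSeq >= deleteMinReferencedSeq;
  }

  public static void main(String[] args) {
    // A delete committed at seq 4 that only references data files written at seq 4:
    System.out.println(mayApply(1, 4, 4)); // old data file at seq 1 -> false, delete is pruned
    System.out.println(mayApply(4, 4, 4)); // data file at seq 4 -> true, delete is applied
  }
}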
@rdblue @aokolnychyi @flyrain Could you please help take a look? This includes only the first part, the metadata change; I will continue with the writer changes if this is the right direction to go. It also doesn't include changes for multiple engine versions.
Thanks @chenjunjiedada for bringing it up. It is good to have this statistic (min sequence number) for delete files, which will make rewrites a bit easier. I don't have much context, though. One use case in my mind: we cannot easily remove delete files during compaction since we don't know whether they still refer to any data files within a partition; the min sequence number will make that a bit easier.
Thinking aloud, here is another way to get the statistics. We can persist the seq# in the delete files themselves, so the new schema for position deletes would be (file_path, pos, seq), and we can get the min/max of the seq column. It requires more storage for each delete file but fewer table spec changes. One nice thing is that we also get the max seq, which helps when deleting delete files: for example, we can delete the delete files whose max seq is less than the seq# of every remaining data file.
Here is an example. Assume this sequence happens across multiple snapshots:
dataFile1 -> deleteFile1 -> dataFile2 -> deleteFile2 -> dataFile3
With the max seq#, we can remove deleteFile1 once dataFile1 is gone, and remove deleteFile2 once dataFile1 and dataFile2 are gone.
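Here is a minimal sketch (plain Java, hypothetical names, not the Iceberg API) of the pruning rule this would enable: with a per-row seq column, the max(seq) statistic of a position delete file lets us drop the whole file once every data file it could reference is gone.

public class MaxSeqPruningSketch {
  // deleteFileMaxSeq: the max(seq) column statistic of one position delete file.
  // minLiveDataSeq: the smallest sequence number among the data files still alive.
  static boolean canDrop(long deleteFileMaxSeq, long minLiveDataSeq) {
    // Every row in the delete file points at a data file with seq <= deleteFileMaxSeq,
    // so once all such data files are removed, the delete file is dead.
    return deleteFileMaxSeq < minLiveDataSeq;
  }

  public static void main(String[] args) {
    // Using the example above with seqs 1..5: deleteFile1 has max(seq) = 1 and
    // deleteFile2 has max(seq) = 3 (it may reference dataFile1 and dataFile2).
    System.out.println(canDrop(1, 3)); // dataFile1 gone, min live seq is 3 -> deleteFile1 can go
    System.out.println(canDrop(3, 3)); // dataFile2 (seq 3) still alive -> deleteFile2 must stay
    System.out.println(canDrop(3, 5)); // dataFile1 and dataFile2 gone -> deleteFile2 can go
  }
}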
I don't favor either solution yet, though. Let me know more details of your use case if possible. cc @szehon-ho
@flyrain Some delete files can already be removed when committing: the related logic in MergingSnapshotProducer#apply computes the minimum sequence number across the data manifests and uses it to filter out delete files that have smaller sequence numbers. This part of the logic should cover the case you mentioned:
long minDataSequenceNumber =
    filtered.stream()
        .map(ManifestFile::minSequenceNumber)
        // filter out unassigned sequence numbers in rewritten manifests
        .filter(seq -> seq != ManifestWriter.UNASSIGNED_SEQ)
        .reduce(base.lastSequenceNumber(), Math::min);
deleteFilterManager.dropDeleteFilesOlderThan(minDataSequenceNumber);
List<ManifestFile> filteredDeletes =
    deleteFilterManager.filterManifests(
        base.schema(), snapshot != null ? snapshot.deleteManifests(ops.io()) : null);
But some data files may not match the deletes; those data files will not be rewritten and will keep their sequence numbers unchanged. In that case, the delete files cannot be removed.
Adding min-seq-num is mainly a performance consideration. When planning tasks, the min-seq-num can help filter out more unrelated deletes on top of the min/max filter. Consider a Flink upsert case with four snapshots:
(f1) -> (f2) -> (f3) -> (f4, d1)
The delete file d1 is intended to apply only to data file f4, but it may get applied to all files, since the min/max filter on the file name pattern "$partitionId-$taskId-$UUID-$fileCount" cannot always filter out unrelated data files. Though we could optimize the pattern by moving $UUID to the end or adding a $timestamp after $partitionId, that only helps position deletes.
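One way the random $UUID in the middle of the name can defeat the min/max filter (made-up file names, not Iceberg code): if a delete file's file_path bounds span two different UUIDs, an old, unrelated data file whose UUID sorts in between cannot be excluded.

public class PathBoundsSketch {
  public static void main(String[] args) {
    // Suppose a delete file d1 references two data files from the latest checkpoint:
    String lower = "00000-1-aaaa1111-00007.parquet"; // min(file_path) recorded in d1
    String upper = "00000-1-ffff2222-00008.parquet"; // max(file_path) recorded in d1

    // An old data file (f1) whose random UUID happens to sort between the two bounds:
    String oldDataFile = "00000-1-cccc9999-00001.parquet";

    // Min/max pruning can only skip d1 for a data file that sorts outside [lower, upper].
    boolean canSkip = oldDataFile.compareTo(lower) < 0 || oldDataFile.compareTo(upper) > 0;
    System.out.println("Can planning skip d1 for f1? " + canSkip); // prints false
  }
}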
Another benefit is reducing conflicts when compacting: the min-seq-num could help narrow down the conflict scope.
Also cc @jackye1995 @stevenzwu. Just found an open issue as well: #3789
@flyrain I just reread your idea. One question: how do you handle the seq# if the writer is an upsert writer? The seq# is unknown until committing.
@aokolnychyi Could you help to take a look?
Let me go through an example to make sure I understand the purpose of this PR.
seq 0: Add DataFile1
seq 1: Add DataFile2
seq 2: Add DataFile3
seq 3: Add DataFile4
seq 4: Add PositionDeletes3 (references only DataFile3, so its min referenced sequence number is seq 2).
Without this PR, DataFile1, DataFile2, DataFile3, and DataFile4 will most likely all get assigned PositionDeletes3, even though the position deletes apply only to DataFile3. After this PR, only DataFile3 and DataFile4 will be assigned PositionDeletes3.
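For concreteness, a tiny sketch (hypothetical helper, same rule as the pruning sketch in the description) tracing these numbers; note that DataFile4 still matches, so the filter stays conservative.

public class ExampleTrace {
  // Keep the delete for a data file only when minReferencedSeq <= dataSeq <= deleteSeq.
  static boolean mayApply(long dataSeq, long deleteSeq, long minReferencedSeq) {
    return dataSeq <= deleteSeq && dataSeq >= minReferencedSeq;
  }

  public static void main(String[] args) {
    long deleteSeq = 4;        // PositionDeletes3
    long minReferencedSeq = 2; // it references only DataFile3, written at seq 2
    System.out.println(mayApply(0, deleteSeq, minReferencedSeq)); // DataFile1 -> false, skipped
    System.out.println(mayApply(1, deleteSeq, minReferencedSeq)); // DataFile2 -> false, skipped
    System.out.println(mayApply(2, deleteSeq, minReferencedSeq)); // DataFile3 -> true
    System.out.println(mayApply(3, deleteSeq, minReferencedSeq)); // DataFile4 -> true (still a false positive)
  }
}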
Did I get it correctly? It only applies to position deletes?
My primary worry is that this would require a spec change and quite a bit of code to populate the new value. For instance, we currently only track file names when writing position deletes. After this, we would have to project and keep track of the sequence number per each referenced data file. Even after all of that, we can still get false positives.
I am currently working on an alternative planning for position deletes in Spark, where I want to open files in a distributed manner and squash them into a bitmap per data file. This would give us a reliable way to check if delete files apply and would also avoid the need to open the same delete file multiple times for different data files.
Did I get it correctly? It only applies to position deletes?
Correct.
My primary worry is that this would require a spec change and quite a bit of code to populate the new value. For instance, we currently only track file names when writing position deletes. After this, we would have to project and keep track of the sequence number per each referenced data file. Even after all of that, we can still get false positives.
Yes, it does need a field, as added in this PR. In Spark MoR mode it may need to track the sequence numbers of the referenced data files, though we could also populate a null value at first and fill in the correct value in a later rewrite action. But anyway, it can still produce false positives, as you mentioned. In the Flink upsert case, though, the writer always writes a lazy value, so there is no false-positive problem.
I am currently working on an alternative planning for position deletes in Spark, where I want to open files in a distributed manner and squash them into a bitmap per data file. This would give us a reliable way to check if delete files apply and would also avoid the need to open the same delete file multiple times for different data files.
Sounds cool and promising; looking forward to it.
In the Flink upsert case, though, the writer always writes a lazy value, so there is no false-positive problem.
Are you referring to position deletes written to dedup changes within one batch? Btw, can you elaborate a bit on the issues you see w.r.t. conflicting compaction?
Are you referring to position deletes written to dedup changes within one batch?
Right.
Btw, can you elaborate a bit on the issues you see w.r.t. conflicting compaction?
The issues are like these: https://github.com/apache/iceberg/issues/4996, https://github.com/apache/iceberg/issues/5397, https://github.com/apache/iceberg/issues/6330. The conflict happens when the upsert task commits position deletes that reference data files being compacted at the same time.
Let me think. The issue is unfortunate.
@chenjunjiedada, one more clarification to make sure I understand. Flink upsert produces position delete files to dedup records within the same batch, and those position deletes prevent compaction, right? The issue is not related to concurrency; every rewrite of data files would fail afterwards?
Can we solve this by adapting validateNoNewDeletesForDataFiles in RewriteFiles to not conflict when we detect that a conflicting position delete's file sequence number (which is different from its data sequence number) is the same as the rewritten data file's sequence number? This would leverage the recently added file sequence numbers, not data sequence numbers.
Flink upsert produces position delete files to dedup records within the same batch, and those position deletes prevent compaction, right?
Right.
The issue is not related to concurrency; every rewrite of data files would fail afterwards?
Correct, not related to concurrency.
Can we solve this by adapting validateNoNewDeletesForDataFiles in RewriteFiles to not conflict when we detect that a conflicting position delete's file sequence number (which is different from its data sequence number) is the same as the rewritten data file's sequence number?
It should work. What about position deletes that have been rewritten?
What about position deletes that have been rewritten?
Rewrite operations configure a starting snapshot and check for updates only after that. If we solve the case of position deletes added in one snapshot, will it only fail if we rewrite position deletes and data files concurrently?
I've added this topic to the list of discussion points for the community sync tomorrow. I know it may be late for you to join. I'll share a summary here.
@stevenzwu, I wonder whether you have hit this problem in your Flink jobs. Any relevant input regarding compaction?