[FLINK-34063][runtime] Always flush compression buffers when retrieving the stream position during OperatorState snapshot.
https://issues.apache.org/jira/browse/FLINK-34063
CI report:
- acad43fa1458d4544840a7b28e2e752056e5e504 Azure: SUCCESS
Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run azure`: re-run the last Azure build
fyi @echauchot @dawidwys
CI seems broken by https://github.com/apache/flink/pull/24022#issuecomment-1888811191
testOperatorEventLostWithReaderFailure ... reliably fails when trying to restore from compressed state, will investigate further :/
I've been able to pin down the issue to https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java#L190
the problem is that the sorting breaks on empty state ... we should always restore in the order state was written
fix in: https://github.com/apache/flink/pull/24079/commits/8d02807e3b8b12c010cde64e2518fe49044eda14
cc @fredia @ruibinx since you've worked on https://github.com/apache/flink/pull/23938
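To make the ordering pitfall concrete, here's a hypothetical, self-contained illustration (plain Java collections, not Flink's actual `OperatorStateRestoreOperation` types): sorting metadata entries by their first offset has no meaningful key for an empty state, so any fallback key can reorder entries away from the order in which they were written.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class RestoreOrderSketch {
    // Hypothetical stand-in for per-state metadata: a name plus element offsets.
    record StateMeta(String name, long[] offsets) {}

    public static void main(String[] args) {
        // Write order: "first" (offset 10), "empty" (no data), "second" (offset 90).
        List<StateMeta> writeOrder = List.of(
                new StateMeta("first", new long[] {10}),
                new StateMeta("empty", new long[] {}),
                new StateMeta("second", new long[] {90}));

        List<StateMeta> sorted = new ArrayList<>(writeOrder);
        // An empty state has no first offset; the fallback key (-1 here)
        // pushes it to the front, diverging from the write order.
        sorted.sort(Comparator.comparingLong(
                (StateMeta m) -> m.offsets().length == 0 ? -1L : m.offsets()[0]));

        // Prints: empty, first, second -- not the order the state was written in.
        sorted.forEach(m -> System.out.println(m.name()));
    }
}
```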
And another one ... this just hurts :/
DataSetSavepointReaderITCase.testOperatorStateInputFormat
hmm, so the issue is related to repartitioning in case there are multiple states;
imagine we have two states with 6 elements and we rescale from 1 to 3; this means that every split gets 2 elements;
- we read 2 elements from the first state
- now we want to read 2 elements from the 2nd state, but we're resuming the read from where we left off (without first seeking to the offset) and surprise, the headers are not there 🙈

It seems that we should not really have snappy headers per-state, but we should write them just once at the beginning ...
Fixed in https://github.com/apache/flink/pull/24079/commits/6bb32b114b76515757e602612dcf33668d031fef
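For illustration, a minimal sketch of the header problem, using `java.util.zip` in place of snappy (the state contents and offsets here are made up): each per-state stream begins with its own header, so decompression only works if the read starts exactly at one.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

class MidStreamReadSketch {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Each state is written as its own compressed stream with its own header.
        try (DeflaterOutputStream out = new DeflaterOutputStream(bytes)) {
            out.write("state-1 elements".getBytes());
        }
        int secondStateOffset = bytes.size();
        try (DeflaterOutputStream out = new DeflaterOutputStream(bytes)) {
            out.write("state-2 elements".getBytes());
        }
        byte[] snapshot = bytes.toByteArray();

        // Decompression works only when we start exactly at a header:
        InputStream in = new ByteArrayInputStream(
                snapshot, secondStateOffset, snapshot.length - secondStateOffset);
        try (InflaterInputStream inflate = new InflaterInputStream(in)) {
            System.out.println(new String(inflate.readAllBytes()));
        }
        // Resuming from any other position (e.g. secondStateOffset + 1)
        // fails, because the expected header is not there.
    }
}
```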
I think this needs another review pass now. Can you please take another look @isburmistrov @dawidwys ?
Note that the latest fix (https://github.com/apache/flink/pull/24079#issuecomment-1892858467) is not backward compatible, but I don't think there is any harm in that, since the snapshots were not restorable anyway.
I'm actually starting to doubt whether the feature is even worth it for partitioned state. The compressed representation might actually end up being bigger than the uncompressed one. To make it worth something, we should probably evolve partitioned state to use a sparse index (still, it's a separate effort).
In terms of broadcast state, this makes perfect sense, because there we just treat everything as a single block for compression.
> I'm actually starting to doubt whether the feature is even worth it for partitioned state. The compressed representation might actually end up being bigger than the uncompressed one. To make it worth something, we should probably evolve partitioned state to use a sparse index (still, it's a separate effort).
Yeah exactly. I wonder - is it hard to simply not do compression for partitioned state (but keep doing it for broadcast state)?
@flinkbot run azure
> Yeah exactly. I wonder - is it hard to simply not do compression for partitioned state (but keep doing it for broadcast state)?
It requires changing the wire format.
> I've been able to pin down the issue to https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java#L190
> the problem is that the sorting breaks on empty state ... we should always restore in the order state was written
> fix in: 8d02807
> cc @fredia @ruibinx since you've worked on #23938
@dmvk Thanks for the investigation and fix; sorry for overlooking the length=0 case (https://github.com/apache/flink/pull/23938#discussion_r1432321260) at the time.
> It seems that we should not really have snappy headers per-state, but we should write them just once at the beginning ...
I have an inelegant solution:
- Before deserializing each state, record the `startPos` first.
- Then, when constructing the `CompressibleFSDataInputStream` each time, seek to `startPos` first.

That way, the order of our offsets is no longer important, and compatibility does not need to be specially considered.
```java
void restore() {
    for (OperatorStateHandle stateHandle : stateHandles) {
        // ...
        // Record where this handle's compressed data starts.
        long startPos = in.getPos();
        for (String stateName : toRestore) {
            // Seek back before every state, so each freshly constructed
            // CompressibleFSDataInputStream starts at a valid header.
            in.seek(startPos);
            try (final CompressibleFSDataInputStream compressedIn =
                    new CompressibleFSDataInputStream(in, compressionDecorator)) {
                // deserialize
            }
        }
        // ...
    }
}
```
> fyi @echauchot @dawidwys
Sorry for introducing this bug and thanks for fixing @dmvk. I'll look at it soon.
> I have an inelegant solution:
@fredia After https://github.com/apache/flink/pull/24079/commits/6bb32b114b76515757e602612dcf33668d031fef the sorting is no longer important, because that allows us to seek wherever we want. It's still a nice optimization to have though, since it will allow us to read things sequentially, but ultimately we could remove it now.
Any thoughts?
> That way, the order of our offsets is no longer important, and compatibility does not need to be specially considered.
I don't think that needs to be considered at all, since the states are corrupted anyway because of the lack of flushes.
> but ultimately we could remove it now.
Yes, I think acad43f is a more thorough solution, and as you mentioned, it also solves the "the compressed representation might actually end up being bigger than the uncompressed one" problem.
> I don't think that needs to be considered at all, since the states are corrupted anyway because of the lack of flushes.
Agree 👍, all previously generated compressed operator states are unrecoverable. Thanks to decaf21 for exposing the problems related to compressed operator state.
> it also solves the "the compressed representation might actually end up being bigger than the uncompressed one" problem.
Not completely. Since we're doing per-value compression for list states, you still need to write per-frame metadata (a "dictionary"; I'm not 100% sure how snappy works, but I assume something along the lines of Huffman coding), so IMO it could still be bigger for really small values. It certainly makes things better though.
There might actually be something more fancy going on, need to read up on it at some point :D https://en.wikipedia.org/wiki/Snappy_(compression)
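As a quick sanity check of the small-value overhead (using `java.util.zip`'s `Deflater` instead of snappy, so the exact numbers differ), compressing a tiny value produces more bytes than the value itself:

```java
import java.util.zip.Deflater;

class TinyValueOverhead {
    public static void main(String[] args) {
        byte[] value = {1, 2, 3, 4}; // a really small list-state element
        Deflater deflater = new Deflater();
        deflater.setInput(value);
        deflater.finish();
        byte[] out = new byte[64];
        int compressedLen = deflater.deflate(out);
        deflater.end();
        // Prints something like "4 -> 12": the per-value framing and
        // checksum outweigh any savings on tiny inputs.
        System.out.println(value.length + " -> " + compressedLen);
    }
}
```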
I finally had time to take a look at this PR (reduced availability nowadays). Thanks for all the diagnostics and chained fixes @dmvk, and sorry for missing the fact that the pos was not updated unless the underlying snappy buffer was flushed. I should have added more test coverage for sure! Thanks to the reviewers as well!
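For reference, a minimal sketch of that underlying position bug, with a plain `BufferedOutputStream` standing in for the snappy-framed stream: until the buffer is flushed, the underlying stream's position lags behind what has been written, so any offset recorded at that moment points at the wrong place.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

class StalePositionSketch {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        BufferedOutputStream buffered = new BufferedOutputStream(file);

        buffered.write(new byte[10]);
        // The underlying "file" has not seen the bytes yet, so an offset
        // recorded for the next state here would be stale.
        System.out.println("pos before flush: " + file.size()); // 0
        buffered.flush();
        System.out.println("pos after flush:  " + file.size()); // 10
    }
}
```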