[FLINK-34063][runtime] Always flush compression buffers, when retrieving stream position during OperatorState snapshot.

Open dmvk opened this issue 2 years ago • 3 comments

https://issues.apache.org/jira/browse/FLINK-34063

dmvk avatar Jan 12 '24 09:01 dmvk

CI report:

  • acad43fa1458d4544840a7b28e2e752056e5e504 Azure: SUCCESS

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

flinkbot avatar Jan 12 '24 09:01 flinkbot

fyi @echauchot @dawidwys

dmvk avatar Jan 12 '24 10:01 dmvk

CI seems broken by https://github.com/apache/flink/pull/24022#issuecomment-1888811191

dmvk avatar Jan 12 '24 10:01 dmvk

testOperatorEventLostWithReaderFailure ... reliably fails when trying to restore from compressed state, will investigate further :/

dmvk avatar Jan 13 '24 06:01 dmvk

I've been able to pin down the issue to https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java#L190

the problem is that the sorting breaks on empty state ... we should always restore in the order state was written

fix in: https://github.com/apache/flink/pull/24079/commits/8d02807e3b8b12c010cde64e2518fe49044eda14

cc @fredia @ruibinx since you've worked on https://github.com/apache/flink/pull/23938

dmvk avatar Jan 15 '24 17:01 dmvk

And another one ... this just hurts :/

DataSetSavepointReaderITCase.testOperatorStateInputFormat

dmvk avatar Jan 15 '24 21:01 dmvk

hmm, so the issue is related to repartitioning in case there are multiple states;

imagine we have two states with 6 elements and we rescale from 1 to 3; this means that every split gets 2 elements;

  1. we read 2 elements from first state
  2. now we want to read 2 elements from 2nd state, but we're resuming read from where we left off (before seeking to offset) and surprise, headers are not there 🙈

It seems that we should not really have snappy headers per-state, but we should write them just once at the beginning ...
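
A minimal sketch of the failure mode described above, as a toy model rather than Flink's actual on-disk format. Everything in it is hypothetical: the `MAGIC` marker stands in for a snappy header, each element is a single int, and the class and method names are made up for illustration. It writes two states of 6 elements, each preceded by its own header, then plays the first split of a 1-to-3 rescale: read 2 elements of the first state, then try to open a reader for the second state from wherever the stream currently is.

```java
import java.nio.ByteBuffer;

/** Toy model of "one header per state" and a reader that resumes where it left off. */
final class PerStateHeaderSketch {
    static final int MAGIC = 0xCAFED00D; // hypothetical stand-in for a compression header
    static final int STATES = 2;
    static final int ELEMENTS = 6;

    /** Layout: [MAGIC e0..e5][MAGIC e0..e5], every field is one int. */
    static ByteBuffer writeSnapshot() {
        ByteBuffer buf = ByteBuffer.allocate(STATES * (1 + ELEMENTS) * Integer.BYTES);
        for (int s = 0; s < STATES; s++) {
            buf.putInt(MAGIC);
            for (int e = 0; e < ELEMENTS; e++) {
                buf.putInt(s * 100 + e);
            }
        }
        buf.flip();
        return buf;
    }

    /** A reader that expects a header at the current position when it is opened. */
    static boolean headerAtCurrentPosition(ByteBuffer in) {
        return in.getInt(in.position()) == MAGIC;
    }

    /** Reads this split's 2 elements of state 0, leaving the position inside state 0. */
    static ByteBuffer readFirstStateShare() {
        ByteBuffer in = writeSnapshot();
        in.getInt(); // header of state 0
        in.getInt(); // element 0
        in.getInt(); // element 1
        return in;
    }

    /** Opens the reader for state 1 without seeking: the position is still inside state 0. */
    static boolean headerFoundWhenResuming() {
        return headerAtCurrentPosition(readFirstStateShare());
    }

    /** Opens the reader for state 1 after seeking to the start of its block. */
    static boolean headerFoundAfterSeek() {
        ByteBuffer in = readFirstStateShare();
        in.position((1 + ELEMENTS) * Integer.BYTES);
        return headerAtCurrentPosition(in);
    }

    public static void main(String[] args) {
        System.out.println("resuming: " + headerFoundWhenResuming());
        System.out.println("after seek: " + headerFoundAfterSeek());
    }
}
```

The sketch only illustrates why a header is missing at the resume position; it does not model the fix proposed here, which is to write the headers once at the beginning.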

dmvk avatar Jan 15 '24 22:01 dmvk

Fixed in https://github.com/apache/flink/pull/24079/commits/6bb32b114b76515757e602612dcf33668d031fef

I think this needs another review pass now. Can you please take another look @isburmistrov @dawidwys ?

dmvk avatar Jan 15 '24 23:01 dmvk

Note that the latest fix (https://github.com/apache/flink/pull/24079#issuecomment-1892858467) is not backward compatible, but I don't think there is any harm in that since the snapshots were not restorable anyway.

dmvk avatar Jan 15 '24 23:01 dmvk

I'm actually starting to doubt whether the feature is even worth it for partitioned state. The compressed representation might actually end up being bigger than the uncompressed one. To make it worth something, we should probably evolve partitioned state to use a sparse index (still, it's a separate effort).

In terms of broadcast state, this makes perfect sense, because there we just treat everything as a single block for compression.

dmvk avatar Jan 15 '24 23:01 dmvk

I'm actually starting to doubt whether the feature is even worth it for partitioned state. The compressed representation might actually end up being bigger than the uncompressed one. To make it worth something, we should probably evolve partitioned state to use a sparse index (still, it's a separate effort).

Yeah exactly. I wonder - is it hard to simply not do compression for partitioned state (but keep doing it for broadcast state)?

isburmistrov avatar Jan 15 '24 23:01 isburmistrov

@flinkbot run azure

dmvk avatar Jan 16 '24 08:01 dmvk

Yeah exactly. I wonder - is it hard to simply not do compression for partitioned state (but keep doing it for broadcast state)?

It requires changing the wire format.

dmvk avatar Jan 16 '24 08:01 dmvk

I've been able to pin down the issue to https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java#L190

the problem is that the sorting breaks on empty state ... we should always restore in the order state was written

fix in: 8d02807

cc @fredia @ruibinx since you've worked on #23938

@dmvk Thanks for the investigation and fix, and sorry for overlooking the length=0 case at that time (https://github.com/apache/flink/pull/23938#discussion_r1432321260).

It seems that we should not really have snappy headers per-state, but we should write them just once at the beginning ...

I have an inelegant solution:

  1. Before deserializing each state, record the startPos first.
  2. And then when constructing CompressibleFSDataInputStream each time, seek to startPos first.

That way, the order of the offsets no longer matters, and compatibility needs no special handling.

    void restore() {
        for (OperatorStateHandle stateHandle : stateHandles) {
            ...
            // Remember where this handle's data begins.
            long startPos = in.getPos();
            for (String stateName : toRestore) {
                // Seek back to startPos before constructing each decompressing stream.
                in.seek(startPos);
                try (final CompressibleFSDataInputStream compressedIn =
                        new CompressibleFSDataInputStream(in, compressionDecorator)) {
                    // deserialize
                }
            }
            ...
        }
    }

fredia avatar Jan 16 '24 11:01 fredia

fyi @echauchot @dawidwys

Sorry for introducing this bug and thanks for fixing @dmvk. I'll look at it soon.

echauchot avatar Jan 16 '24 13:01 echauchot

I have an inelegant solution:

@fredia After https://github.com/apache/flink/pull/24079/commits/6bb32b114b76515757e602612dcf33668d031fef the sorting is no longer important, because that allows us to seek wherever we want. It's still a nice optimization to have though, since it will allow us to read things sequentially, but ultimately we could remove it now.

Any thoughts?

dmvk avatar Jan 16 '24 16:01 dmvk

That way, the order of the offsets no longer matters, and compatibility needs no special handling.

I don't think that needs to be considered at all, since the states are corrupted anyway, because of the lack of flushes.

dmvk avatar Jan 16 '24 16:01 dmvk

but ultimately we could remove it now.

Yes, I think acad43f is a more thorough solution, and as you mentioned, it also solves the "compressed representation might actually end up being bigger than the uncompressed one" problem.

I don't think that needs to be considered at all, since the states are corrupted anyway, because of the lack of flushes.

Agree👍, all previously generated compressed operator states are unrecoverable. Thanks to decaf21, the problems with compressed operator state were exposed.

fredia avatar Jan 17 '24 02:01 fredia

it also solves the "compressed representation might actually end up being bigger than the uncompressed one" problem.

Not completely. Since we're doing per-value compression for list states, you still need to write per-frame metadata (a "dictionary"), so it could imo still be bigger for really small values. (I'm not 100% sure how Snappy works; I'm assuming something along the lines of Huffman coding.) It certainly makes things better though.
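
A rough way to see the small-value overhead, sketched under a loud assumption: Snappy is not in the JDK, so this uses `java.util.zip.Deflater` as a stand-in. Its framing and algorithm differ from Snappy's, so the exact byte counts say nothing about Snappy itself; only the general effect (fixed per-stream overhead dominating tiny inputs) carries over. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

/** Compares compressing many tiny values one by one against compressing them as one block. */
final class SmallValueOverheadSketch {

    /** Compresses the input as one independent zlib stream and returns the output size in bytes. */
    static int compressedSize(byte[] input) {
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(input);
            deflater.finish();
            byte[] chunk = new byte[256];
            int total = 0;
            while (!deflater.finished()) {
                total += deflater.deflate(chunk);
            }
            return total;
        } finally {
            deflater.end();
        }
    }

    /** Total size when each of the count copies of value is compressed on its own. */
    static int perValueTotal(byte[] value, int count) {
        return count * compressedSize(value);
    }

    /** Size when the same count copies are concatenated and compressed as a single block. */
    static int singleBlockTotal(byte[] value, int count) {
        byte[] all = new byte[value.length * count];
        for (int i = 0; i < count; i++) {
            System.arraycopy(value, 0, all, i * value.length, value.length);
        }
        return compressedSize(all);
    }

    public static void main(String[] args) {
        byte[] value = "ab".getBytes(StandardCharsets.UTF_8);
        int count = 1000;
        System.out.println("raw bytes:          " + value.length * count);
        System.out.println("per-value total:    " + perValueTotal(value, count));
        System.out.println("single-block total: " + singleBlockTotal(value, count));
    }
}
```

With 2-byte values the per-value total exceeds the raw size, while the single block shrinks well below it, which is the same shape as the partitioned versus broadcast state trade-off discussed in this thread.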

dmvk avatar Jan 17 '24 09:01 dmvk

There might actually be something more fancy going on, need to read up on it at some point :D https://en.wikipedia.org/wiki/Snappy_(compression)

dmvk avatar Jan 17 '24 09:01 dmvk

I finally had time to take a look at this PR (reduced availability nowadays). Thanks for all the diagnostics and chained fixes @dmvk, and sorry for missing the fact that pos was not updated unless the underlying Snappy buffer is flushed. I should have added more test coverage for sure! Thanks to the reviewers as well!
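
A minimal sketch of that stale-position effect, assuming a plain `BufferedOutputStream` as a stand-in for the buffering Snappy stream and a `ByteArrayOutputStream` whose size plays the role of the stream position. The class and method names are hypothetical, and this is not Flink's code.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Shows that the underlying position lags behind until the buffering layer is flushed. */
final class StalePositionSketch {

    /** Returns {position before flush, position after flush} of the underlying sink. */
    static long[] positionsAroundFlush(byte[] payload) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        BufferedOutputStream buffered = new BufferedOutputStream(sink, 8192);
        try {
            buffered.write(payload);    // stays in the buffer while payload is smaller than it
            long before = sink.size();  // what a position lookup would see without a flush
            buffered.flush();
            long after = sink.size();   // the position that should have been recorded
            return new long[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] p = positionsAroundFlush(new byte[100]);
        System.out.println("before flush: " + p[0] + ", after flush: " + p[1]);
    }
}
```

Recording an offset from the "before" value would point at the wrong place on restore, which is why the PR title asks for a flush whenever the stream position is retrieved.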

echauchot avatar Jan 26 '24 10:01 echauchot