
[FLINK-32562][connectors/filesystem] Handle concatenable compressed streams correctly in FileCompactor


What is the purpose of the change

Provides a way to handle output-stream-based compaction when the data stream is compressed. The problem is described in detail in the Jira ticket. This change modifies the behavior for data written via CompressWriterFactory, i.e. data that will be compressed. In that case, the stream that writes the compacted file must not apply compression itself, because the data being compacted has already been written out into compressed part files; we simply concatenate them.

Of course, this is only usable with Hadoop compression codecs whose outputs are concatenable, namely DEFLATE, GZIP, and BZIP2.
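The property that makes this approach work can be demonstrated with the plain JDK (this is a minimal, self-contained sketch, not the Flink API): two independently gzip-compressed byte blobs, concatenated at the byte level with no re-compression, still decode as one logical stream, because the gzip format allows multiple back-to-back members and `GZIPInputStream` reads across member boundaries.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class ConcatGzipDemo {

    // Compress a string into a standalone gzip member,
    // like one compressed part file.
    static byte[] gzip(String s) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(s.getBytes(StandardCharsets.UTF_8));
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Two independently compressed "part files".
        byte[] part1 = gzip("hello ");
        byte[] part2 = gzip("world");

        // Byte-level concatenation, no re-compression --
        // the essence of what the compactor does.
        ByteArrayOutputStream compacted = new ByteArrayOutputStream();
        compacted.write(part1);
        compacted.write(part2);

        // Decompressing the concatenated bytes yields the
        // original records in order.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(
                new ByteArrayInputStream(compacted.toByteArray()))) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        }
        System.out.println(out.toString("UTF-8"));
    }
}
```

Codecs without this multi-member property (Snappy, for instance, as raised in the review below) cannot be compacted this way, which is exactly why the change is scoped to DEFLATE, GZIP, and BZIP2.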

Brief change log

  • Added CompressWriterFactory#createWithNoCompression(...) to be able to enforce skipping compression for the compacting phase, even if the data itself is compressed.
  • Updated BulkBucketWriter#openNew(...) to use the newly introduced createWithNoCompression when it is called to open a new compacting file, which should not use any kind of compression, as the data is already compressed.
  • Adapted OutputStreamBasedBucketWriter to the above changes.
  • Changed the visibility of AbstractCompactTestBase to protected so it can be used by the new CompactServiceTest, which lives in a different package.
  • Moved some test helper methods to AbstractCompactTestBase to avoid/eliminate duplication.
  • Extracted TestBucketWriter to its own class, as it is now used in multiple test files.
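To illustrate the first two bullets, here is a hypothetical sketch of the idea behind createWithNoCompression (names and shapes are illustrative only, not the actual Flink API): the same factory that normally wraps the sink in a compressing stream can hand out a pass-through writer for the compaction phase, since the input there is already compressed.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

public class NoCompressionSketch {

    // Hypothetical writer abstraction.
    interface PartWriter {
        void write(byte[] bytes) throws IOException;
    }

    static class CompressingWriterFactory {
        // Normal write path: wrap the sink in a compressing stream.
        PartWriter create(OutputStream sink) throws IOException {
            GZIPOutputStream gz = new GZIPOutputStream(sink);
            return gz::write;
        }

        // Compaction path: pass bytes through unchanged, because the
        // input is a sequence of already-compressed part files that a
        // concatenable codec allows us to append back to back.
        PartWriter createWithNoCompression(OutputStream sink) {
            return sink::write;
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream compactedFile = new ByteArrayOutputStream();
        PartWriter writer =
                new CompressingWriterFactory().createWithNoCompression(compactedFile);

        // Already-compressed part file contents go through verbatim.
        writer.write("already-compressed-bytes".getBytes("UTF-8"));
        System.out.println(compactedFile.toString("UTF-8"));
    }
}
```

In the actual change, BulkBucketWriter#openNew picks the no-compression variant only when opening a compacting file; the regular write path is untouched.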

Verifying this change

  • Added new unit tests for the newly introduced logic in CompactServiceTest.
  • Added a new integration test, CompressedCompactionITCase, to validate compaction for all supported concatenable compressed outputs.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? debatable
  • If yes, how is the feature documented? JavaDocs

ferenc-csaky avatar Jun 07 '24 14:06 ferenc-csaky

CI report:

  • ff25e0d169ececa7f6c6eaadf84731fb9a350304 Azure: SUCCESS
Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure — re-run the last Azure build

flinkbot avatar Jun 07 '24 14:06 flinkbot

Seems like the actual change set works for particular compression types but not for others (Snappy, for example). Solving the problem only partially would create a bad UX for users, so I would suggest re-evaluating the general approach.

gaborgsomogyi avatar Jun 17 '24 09:06 gaborgsomogyi

This PR is being marked as stale since it has not had any activity in the last 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out to the community, contact details can be found here: https://flink.apache.org/what-is-flink/community/

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

github-actions[bot] avatar Apr 06 '25 06:04 github-actions[bot]

This PR has been closed since it has not had any activity in 120 days. If you feel like this was a mistake, or you would like to continue working on it, please feel free to re-open the PR and ask for a review.

github-actions[bot] avatar May 06 '25 06:05 github-actions[bot]