
Fix Kafka with Redistribute

Open Naireen opened this issue 1 year ago • 3 comments

The checkArgument call that determines whether redistribute can be enabled was incorrect.

There are two ways to enable offset commits:

  1. Explicitly, via commitOffsetsInFinalize()
  2. Via the consumer config ENABLE_AUTO_COMMIT_CONFIG=true

If the first is set, redistribute cannot be enabled. If the second is set, the pipeline is still accepted, although it is not optimal. If the runner is allowed to emit duplicates, there is no point in adding the overhead of checkpointing messages within Kafka itself. It is not semantically incorrect, though. The first option uses internal Beam state to track which messages have been processed in order to prevent duplicates, and that doesn't make sense if there is also a runner hint that duplicates are allowed.
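The validation rule described above can be sketched as a standalone, dependency-free check. This is a hypothetical model, not the KafkaIO source. The class and method names (RedistributeCheck, validate, autoCommitEnabled) are illustrative assumptions. The string "enable.auto.commit" is the value of Kafka's ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

/**
 * Hypothetical model of the redistribute validation described above.
 * Illustrative only; this is not the actual KafkaIO implementation.
 */
final class RedistributeCheck {
  private static final Logger LOG = Logger.getLogger(RedistributeCheck.class.getName());

  // Same string as Kafka's ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG.
  static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

  /** Returns true if the consumer config turns on Kafka auto-commit. */
  static boolean autoCommitEnabled(Map<String, Object> consumerConfig) {
    Object value = consumerConfig.get(ENABLE_AUTO_COMMIT);
    // Accept either a Boolean or a String value for the setting.
    return value != null && Boolean.parseBoolean(value.toString());
  }

  /**
   * Rejects redistribute combined with commitOffsetsInFinalize();
   * only warns when redistribute is combined with auto-commit.
   */
  static void validate(boolean redistribute, boolean commitOffsetsInFinalize,
                       Map<String, Object> consumerConfig) {
    if (!redistribute) {
      return; // nothing to check when redistribute is off
    }
    if (commitOffsetsInFinalize) {
      throw new IllegalArgumentException(
          "commitOffsetsInFinalize() cannot be combined with withRedistribute()");
    }
    if (autoCommitEnabled(consumerConfig)) {
      LOG.warning("withRedistribute() with enable.auto.commit=true is allowed"
          + " but adds overhead that is unnecessary when duplicates are permitted");
    }
  }

  public static void main(String[] args) {
    Map<String, Object> config = new HashMap<>();
    validate(true, false, config); // accepted: no commits configured
    config.put(ENABLE_AUTO_COMMIT, "true");
    validate(true, false, config); // accepted, logs a warning
    try {
      validate(true, true, config); // rejected
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

The sketch mirrors the fix: the absence of commit configuration no longer blocks redistribute. Only the explicit commitOffsetsInFinalize() combination is rejected.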

Before this fix, users could not set withRedistribute() on the transform, even when commits were not configured.

Updated tests to catch this.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • [ ] Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • [ ] Update CHANGES.md with noteworthy changes.
  • [ ] If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md


See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Naireen, Aug 28 '24 00:08

Reviewers: @scwhittle

Naireen, Aug 28 '24 01:08

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles for label java. R: @ahmedabu98 for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

github-actions[bot], Aug 28 '24 01:08

One other benefit of committed offsets is that they give customers visibility into partition progress, because the offsets can be queried and displayed outside of Dataflow.

Do we need to disable committing offsets? I can see the argument that it might not make sense from an exactly-once perspective. However, there are other reasons to commit, and the customer configures it explicitly. Could we instead log a warning that the offsets may not reflect all processed data, and still perform the commits?

scwhittle, Aug 28 '24 07:08

One other benefit of the committed offsets is if it aids customer visibility into progress of partitions as that can be queried/displayed external to dataflow.

That is a good point, and an argument for still allowing it. What I don't understand, though, is how reshuffling/redistributing works with committing offsets. Here is the current graph with offset commits enabled (taken from KafkaIO.java, in case the formatting of the graph below isn't clear):

PCollection<KafkaSourceDescriptor> --> ParDo(ReadFromKafkaDoFn<KafkaSourceDescriptor, KV<KafkaSourceDescriptor, KafkaRecord>>) --> Reshuffle() --> Map(output KafkaRecord) | --> KafkaCommitOffset

If redistribute is enabled, does it make more sense to replace the Reshuffle here with the Redistribute transform, instead of inserting a reshuffle after the Map? Depending on the runner implementation, the latter would introduce an additional shuffle.

If we go with the former approach, commits will still occur, but the commits themselves can be duplicated. I need to investigate what that could cause: is committing the same offset twice internally in Beam simply a no-op?
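One way to reason about the duplicate-commit question is a small model of a commit step that may see the same "processed up to offset N" notification more than once. This is a hypothetical sketch, not Beam's KafkaCommitOffset. It assumes the commit step keeps the highest offset seen per partition, and OffsetCommitModel is an illustrative name. It follows Kafka's convention that the committed offset is the next offset to read. Re-committing an identical offset leaves the committed position unchanged. A late, older commit could move it backwards unless the step guards against that.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of a commit step that tolerates duplicated
 * commit notifications by keeping the maximum offset per partition.
 * Illustrative only; not Beam's KafkaCommitOffset implementation.
 */
final class OffsetCommitModel {
  // partition -> next offset to read (Kafka's committed-offset convention)
  private final Map<Integer, Long> committed = new HashMap<>();

  /**
   * Records that the record at lastProcessedOffset was processed.
   * A duplicate or stale notification never moves the position backwards.
   * Returns true only if the committed position changed.
   */
  boolean commit(int partition, long lastProcessedOffset) {
    long next = lastProcessedOffset + 1;
    Long current = committed.get(partition);
    if (current != null && current >= next) {
      return false; // duplicate or stale notification: no-op
    }
    committed.put(partition, next);
    return true;
  }

  /** Returns the committed offset for a partition, or null if none. */
  Long committedOffset(int partition) {
    return committed.get(partition);
  }

  public static void main(String[] args) {
    OffsetCommitModel model = new OffsetCommitModel();
    System.out.println(model.commit(0, 41)); // true: position becomes 42
    System.out.println(model.commit(0, 41)); // false: exact duplicate
    System.out.println(model.commit(0, 10)); // false: stale, older offset
    System.out.println(model.committedOffset(0)); // 42
  }
}
```

Under this assumption, duplicated commits are harmless. Whether the real commit path behaves this way is the open question raised above.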

Naireen, Aug 28 '24 18:08

My PR to remove the Reshuffle for the commit offsets was just merged, so the question of whether it should become a Redistribute when configured may now be moot. We could still disallow combining commit offsets with redistribute in the expand259 path, since the issue remains there.

scwhittle, Aug 29 '24 14:08

Perfect, that's great!

Naireen, Aug 31 '24 05:08

Reminder, please take a look at this pr: @kennknowles @ahmedabu98

github-actions[bot], Sep 07 '24 12:09

R: @scwhittle

ahmedabu98, Sep 07 '24 12:09

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

github-actions[bot], Sep 07 '24 12:09

Run Java_Kafka_IO_Direct PreCommit

Naireen, Sep 11 '24 18:09