Fix Kafka with Redistribute
The argument check that determines whether redistribute can be enabled was incorrect.
There are two ways to enable commits:
- explicitly via commitOffsetsInFinalize()
- via the consumer config ENABLE_AUTO_COMMIT_CONFIG=true
If the first is true, redistribute can't be enabled. If the second is true, the check still passes. This isn't optimal, because if the runner is allowed to emit duplicates there is no point in adding the overhead of checkpointing messages within Kafka itself, but it is not semantically incorrect. The first option uses internal Beam state to track which messages have been processed in order to prevent duplicates, which doesn't make sense if there is also a runner hint that duplicates are allowed.
Before this fix, customers weren't able to set withRedistribute on the transform even when commits weren't configured.
Updated tests to catch this.
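A minimal plain-Java sketch of the corrected check described above, under stated assumptions: `RedistributeCheck`, `validate` and `Result` are hypothetical names (this is not the KafkaIO code), the only real identifier is the `enable.auto.commit` key (the value of Kafka's `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG`), and a missing key is treated as auto-commit being off.

```java
import java.util.Map;

/**
 * Hypothetical sketch of the corrected redistribute check; not the actual KafkaIO code.
 * Only commitOffsetsInFinalize() is rejected; Kafka auto-commit is allowed but flagged.
 */
final class RedistributeCheck {
  // Same string as org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG.
  static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

  private RedistributeCheck() {}

  /** Outcome of validating a read configuration. */
  enum Result { OK, OK_BUT_SUBOPTIMAL }

  static Result validate(
      boolean redistribute, boolean commitOffsetsInFinalize, Map<String, Object> consumerConfig) {
    if (!redistribute) {
      return Result.OK; // Nothing to check when redistribute is off.
    }
    if (commitOffsetsInFinalize) {
      // Finalize-based commits contradict a hint that duplicates are allowed.
      throw new IllegalArgumentException(
          "commitOffsetsInFinalize() cannot be combined with withRedistribute()");
    }
    // Auto-commit is not semantically wrong, only extra overhead when duplicates are allowed.
    return autoCommitEnabled(consumerConfig) ? Result.OK_BUT_SUBOPTIMAL : Result.OK;
  }

  /** Reads enable.auto.commit as a Boolean or String; a missing key counts as disabled here. */
  static boolean autoCommitEnabled(Map<String, Object> consumerConfig) {
    Object value = consumerConfig.get(ENABLE_AUTO_COMMIT);
    if (value instanceof Boolean) {
      return (Boolean) value;
    }
    return value != null && Boolean.parseBoolean(value.toString());
  }
}
```

Returning a flag for the auto-commit case, rather than throwing, matches the description: that combination is allowed, just not optimal. The previously broken behavior corresponds to rejecting `validate(true, false, emptyConfig)`, which this sketch accepts.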
Reviewers: @scwhittle
Assigning reviewers. If you would like to opt out of this review, comment "assign to next reviewer":
R: @kennknowles for label java.
R: @ahmedabu98 for label io.
Available commands:
- stop reviewer notifications: opt out of the automated review tooling
- remind me after tests pass: tag the comment author after tests pass
- waiting on author: shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
The PR bot will only process comments in the main thread (not review comments).
One other benefit of committed offsets is that they give customers visibility into the progress of partitions, since they can be queried and displayed outside of Dataflow.
Do we need to disable committing offsets? I can see the argument that it might not make sense from an exactly-once perspective, but given that there are other reasons and that the customer is configuring it explicitly, could we just log a warning that the offsets may not reflect all processed data and still perform the commits?
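For comparison, the alternative suggested here (keep committing offsets but warn) could look like the sketch below. `WarnOnlyRedistributeCheck` is a hypothetical name and this is not existing KafkaIO behavior; it uses `java.util.logging` only to stay self-contained, whereas Beam code normally logs through SLF4J.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

/** Hypothetical variant: allow finalize-based commits with redistribute, but warn. */
final class WarnOnlyRedistributeCheck {
  private static final Logger LOG =
      Logger.getLogger(WarnOnlyRedistributeCheck.class.getName());

  private WarnOnlyRedistributeCheck() {}

  /** Logs and returns any warnings instead of rejecting the configuration. */
  static List<String> check(boolean redistribute, boolean commitOffsetsInFinalize) {
    List<String> warnings = new ArrayList<>();
    if (redistribute && commitOffsetsInFinalize) {
      String msg =
          "Offsets are committed while redistribute is enabled; committed offsets may not"
              + " reflect all processed data.";
      LOG.warning(msg);
      warnings.add(msg);
    }
    return warnings;
  }
}
```

The trade-off is that the pipeline keeps the external visibility that committed offsets give, at the cost of offsets that are only approximate when duplicates are allowed.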
That is a good point, and an argument for still allowing it. What I don't understand, though, is how reshuffling/redistributing works with committing offsets. Here is the current graph with offsets enabled (pulled from KafkaIO.java in case the formatting of the graph below isn't clear):
PCollection<KafkaSourceDescriptor>
  --> ParDo(ReadFromKafkaDoFn<KafkaSourceDescriptor, KV<KafkaSourceDescriptor, KafkaRecord>>)
  --> Reshuffle()
  --> Map(output KafkaRecord)
  | --> KafkaCommitOffset
At that point, if redistribute is enabled, does it make more sense to replace the Reshuffle here with the Redistribute transform, instead of inserting a reshuffle after the Map? (Inserting one after the Map would introduce another shuffle, depending on the runner implementation.)
If we go with the former approach, commits will still occur, but the commits themselves can be duplicated (we need to investigate what that can cause, or whether it is just a no-op if we attempt to commit the same offset twice internally in Beam).
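On the duplicate-commit question: a Kafka offset commit writes the consumer group's position for a partition, so committing the same offset twice should leave the same committed value, but if a duplicated older commit is applied after a newer one it could move the position backwards. A minimal sketch of one way to make duplicates harmless, assuming commit requests can be grouped per partition before they reach Kafka, is to keep only the highest offset per partition. `CommitCollapser` and `CommitRequest` are hypothetical names, and this is not a claim about how KafkaCommitOffset is implemented.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: collapse possibly duplicated commit requests to one offset per partition. */
final class CommitCollapser {
  private CommitCollapser() {}

  /** A commit request for one topic partition; names are illustrative only. */
  static final class CommitRequest {
    final String topicPartition;
    final long nextOffset;

    CommitRequest(String topicPartition, long nextOffset) {
      this.topicPartition = topicPartition;
      this.nextOffset = nextOffset;
    }
  }

  /** Keeps the highest requested offset per partition. */
  static Map<String, Long> collapse(List<CommitRequest> requests) {
    Map<String, Long> maxPerPartition = new HashMap<>();
    for (CommitRequest r : requests) {
      // Long::max makes exact duplicates no-ops and ignores older offsets that arrive late.
      maxPerPartition.merge(r.topicPartition, r.nextOffset, Long::max);
    }
    return maxPerPartition;
  }
}
```

Because taking the maximum is idempotent and order-independent, duplicated or reordered requests produce the same result as the deduplicated stream.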
My PR to remove the Reshuffle for the commit offsets was just merged, so I think the question of whether it should be a Redistribute when configured might be moot now. But we could disallow combining commit offsets and redistribute in the expand259 path, since it is still an issue there.
Perfect, that's great!
Reminder: please take a look at this PR, @kennknowles @ahmedabu98
R: @scwhittle
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers
Run Java_Kafka_IO_Direct PreCommit