KAFKA-12497: Skip periodic offset commits for failed source tasks
This change serves two purposes:
- Eliminate unnecessary log messages about offset commits for tasks that don't need to perform them (e.g., a task that has failed and whose data has all been flushed and committed)
- Stop blocking the offset commit thread unnecessarily for flushes that will never succeed because the task's producer has failed to send a record in the current batch with a non-retriable error
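Both goals come down to a check the periodic committer can run before attempting a commit. The following is a minimal, hypothetical sketch of that check, not the actual WorkerSourceTask, SourceTaskOffsetCommitter, or OffsetStorageWriter code. `TaskSnapshot`, its fields, and `shouldAttemptCommit` are invented names. The sketch assumes the committer can see whether the task has failed, how many records are still unacknowledged, whether any acknowledged offsets are still uncommitted, and whether the producer hit a non-retriable send error.

```java
/**
 * Hypothetical sketch: none of these names exist in Kafka Connect.
 * Models the pre-commit decision a periodic offset committer could make.
 */
final class OffsetCommitSkipSketch {

    /** Assumed snapshot of the task state the committer would consult. */
    static final class TaskSnapshot {
        final boolean failed;                // the task has failed
        final int unackedRecords;            // sent to the producer, not yet acknowledged
        final int uncommittedOffsets;        // acknowledged, but offsets not yet committed
        final boolean nonRetriableSendError; // a record in the current batch failed permanently

        TaskSnapshot(boolean failed, int unackedRecords, int uncommittedOffsets,
                     boolean nonRetriableSendError) {
            this.failed = failed;
            this.unackedRecords = unackedRecords;
            this.uncommittedOffsets = uncommittedOffsets;
            this.nonRetriableSendError = nonRetriableSendError;
        }
    }

    /** Returns true if a periodic commit attempt is worth making for this task. */
    static boolean shouldAttemptCommit(TaskSnapshot task) {
        if (task.nonRetriableSendError) {
            // The current batch can never be fully acknowledged, so waiting on a
            // flush would only tie up the shared offset commit thread.
            return false;
        }
        if (task.failed && task.unackedRecords == 0 && task.uncommittedOffsets == 0) {
            // Everything has already been flushed and committed; another attempt
            // would only emit a misleading log message.
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(shouldAttemptCommit(new TaskSnapshot(false, 3, 5, false))); // true
        System.out.println(shouldAttemptCommit(new TaskSnapshot(true, 0, 0, false)));  // false
        System.out.println(shouldAttemptCommit(new TaskSnapshot(true, 0, 2, false)));  // true
        System.out.println(shouldAttemptCommit(new TaskSnapshot(true, 4, 1, true)));   // false
    }
}
```

The send-error check runs first, so a task whose producer has already failed never reaches a flush that cannot complete. A failed task that still has acknowledged but uncommitted offsets is still committed, so those offsets are not lost.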
Existing unit tests for the OffsetStorageWriter are tweaked to verify the small change made to it. Several new unit tests are added for the WorkerSourceTask that cover various cases where offset commits should not be attempted, and some existing tests are modified to cover cases where offset commits either should or should not be attempted.
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
@ncliang @gharris1727 @kpatelatwork @ddasarathan could one or two of you take a look at this when you have time?
@C0urante Overall this looks good, and very good job on the tests. Since I'm new to this code, I reviewed it as best I could, but I recommend getting an LGTM from one more reviewer who knows it better than I do, to make sure we don't miss anything obvious.
@rhauch @tombentley could either of you take a look? It'd be nice to get this merged in time for the upcoming 3.1 release; I know I've seen plenty of people led astray by continued offset commit messages for failed tasks and it'd be great if we could improve their experience.
Thanks @tombentley. Interesting point about the log capture appender, but it's a little tricky with the current setup. I've hacked together a prototype that tries to provide realistic coverage by simulating genuine offset commits with calls to SourceTaskOffsetCommitter::commit without constructing a full-on integration test; LMK what you think.
If the test cases themselves look acceptable, I suspect we may want to either develop a Connect-specific log capture appender (similar to how Connect has its own embedded testing framework that was largely copied from Streams) or move the Streams-specific LogCaptureAppender into a more general-use location, possibly in the clients module. The current prototype's dependency on Streams' testing artifacts isn't desirable as it makes partial builds (which I personally do quite frequently while testing Connect-specific changes) less effective.
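As a rough illustration of the log-capture technique under discussion, here is a minimal sketch that attaches a capturing handler to a logger, runs a hypothetical commit routine, and asserts on what was logged. It deliberately uses `java.util.logging` instead of log4j so it has no dependencies; it is not the LogCaptureAppender class discussed in this thread. `CapturingHandler`, `LogCaptureDemo`, `maybeCommit`, and the logger name `sketch.committer` are all invented for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

/** Hypothetical capture handler; attach it in a test and detach it with close(). */
final class CapturingHandler extends Handler implements AutoCloseable {
    private final Logger logger;
    private final List<LogRecord> records = Collections.synchronizedList(new ArrayList<>());

    private CapturingHandler(Logger logger) {
        this.logger = logger;
    }

    /** Attaches a new capturing handler to the named logger. */
    static CapturingHandler attach(String loggerName) {
        Logger logger = Logger.getLogger(loggerName);
        CapturingHandler handler = new CapturingHandler(logger);
        handler.setLevel(Level.ALL);
        logger.addHandler(handler);
        return handler;
    }

    @Override
    public void publish(LogRecord record) {
        if (isLoggable(record)) {
            records.add(record);
        }
    }

    @Override
    public void flush() {
        // Nothing is buffered outside the in-memory list.
    }

    @Override
    public void close() {
        // Detach so later tests don't see this handler.
        logger.removeHandler(this);
    }

    /** Raw messages captured so far, in logging order. */
    List<String> messages() {
        synchronized (records) {
            List<String> out = new ArrayList<>();
            for (LogRecord r : records) {
                out.add(r.getMessage());
            }
            return out;
        }
    }
}

final class LogCaptureDemo {
    private static final Logger LOG = Logger.getLogger("sketch.committer");

    // Hypothetical stand-in for a periodic commit: only logs when a commit is attempted.
    static void maybeCommit(String taskId, boolean taskFailed) {
        if (taskFailed) {
            return; // skip failed tasks silently
        }
        LOG.info("Committing offsets for " + taskId);
    }

    public static void main(String[] args) {
        try (CapturingHandler capture = CapturingHandler.attach("sketch.committer")) {
            maybeCommit("task-0", false);
            maybeCommit("task-1", true);
            System.out.println(capture.messages()); // [Committing offsets for task-0]
        }
    }
}
```

Making the handler `AutoCloseable` lets each test scope the capture with try-with-resources, so a handler left attached by one test cannot leak messages into the next.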
Agreed about the undesirability of the dependency, and if possible, a common LogCaptureAppender would be better than having several copies floating around.
I've rebased onto the latest trunk. I think the latest changes (especially these logging improvements) have made most of this PR redundant. The only remaining room for improvement IMO is skipping log messages for failed tasks; the other issues (squatting on the source task offset commit thread for too long while waiting for failed records to be acknowledged, and misleading users with messages about flushing 0 records) have already been addressed.
I've force-pushed a single commit that brings this PR up to date with the latest trunk; going to push an additional commit later this week that addresses the review comments that have been left on it.
Thanks for the review @tombentley, and apologies for the delay. I've addressed your comments and moved the LogCaptureAppender class to the clients module where it can be used by both Connect and Streams.
I hope this looks alright given the changes in https://github.com/apache/kafka/pull/11323 that drastically alter the logic touched on here.
We're probably past the cutoff for 3.1, so this isn't particularly urgent anymore, but I believe it'd still be useful to have at some point if you can spare the time. No rush, though.
CC @vvcephei -- this touches on the LogCaptureAppender currently used by Streams; you may want to take a look if you have the time.
I see two committers approved the changes, but this was not merged. @C0urante, can you rebase to fix the conflicts? Thanks
This will create other conflicts with https://github.com/apache/kafka/pull/11780. Would it be possible to prioritize that PR and then, once it's merged, fix the conflicts on this one and merge it?
Thanks, that makes sense. I've started reviewing #11780
@mimaison I've finally gotten around to rebasing this one, mind taking another look?
@tombentley @vvcephei Can you take another look?
@mimaison @vvcephei @tombentley I'd like to merge this in order to unblock https://github.com/apache/kafka/pull/12434, which adds a second use case for the LogCaptureAppender in the Connect unit tests. Would it be possible to give this another pass sometime next week? Thanks!
It's also been proposed in https://github.com/apache/kafka/pull/12434 that we eliminate the Java LogCaptureAppender class entirely and replace it with the Scala-based version that's used in core.
Any thoughts?
Test failures are unrelated; going to merge this as-is. @vvcephei @tombentley Please let me know if you'd like to follow up on this at any point.