KAFKA-17066: new consumer updateFetchPositions all in background thread
Fixes the known issue where the logic for updating fetch positions in the new consumer was performed partly in the app thread and partly in the background thread, which could lead to race conditions on the subscription state.
This PR moves the logic for updateFetchPositions to the background thread as a single event (instead of triggering separate events for validate, fetchOffsets and listOffsets). A new UpdateFetchPositionsEvent is triggered from the app thread and processed in the background thread, where it performs those same operations and updates the subscription state accordingly, without blocking the background thread.
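A minimal sketch of the single-event flow described above, written as a simplified toy model rather than the consumer's real code. `BackgroundLoop`, the `positions` map standing in for the subscription state, and the "reset to offset 0" fallback are all hypothetical; the `UpdateFetchPositionsEvent` here only borrows the PR's event name, not its actual fields or API. What it shows is that the app thread only enqueues one event and waits on its future, while every read and write of the positions happens on the background thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical event, loosely modelled on the PR's UpdateFetchPositionsEvent.
// Its future is completed by the background thread only.
final class UpdateFetchPositionsEvent {
    final CompletableFuture<Map<String, Long>> future = new CompletableFuture<>();
}

// Hypothetical stand-in for the consumer's background thread. The positions map
// plays the role of the subscription state and is touched only by this thread.
final class BackgroundLoop implements Runnable {
    private static final Object WAKE_UP = new Object();
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final Map<String, Long> positions = new HashMap<>(); // null = no position yet
    private final Map<String, Long> committed;                   // assumed committed offsets
    private volatile boolean running = true;

    BackgroundLoop(Map<String, Long> committed, Iterable<String> assigned) {
        this.committed = new HashMap<>(committed);
        for (String tp : assigned) positions.put(tp, null);
    }

    void start() {
        Thread t = new Thread(this, "background");
        t.setDaemon(true);
        t.start();
    }

    void shutdown() {
        running = false;
        queue.add(WAKE_UP); // unblock take() so the loop can observe running == false
    }

    // App thread: enqueue a single event and wait for it, bounded by timeoutMs.
    Map<String, Long> updateFetchPositions(long timeoutMs) {
        UpdateFetchPositionsEvent event = new UpdateFetchPositionsEvent();
        queue.add(event);
        return event.future.orTimeout(timeoutMs, TimeUnit.MILLISECONDS).join();
    }

    @Override
    public void run() {
        while (running) {
            Object next;
            try {
                next = queue.take();
            } catch (InterruptedException e) {
                return;
            }
            if (next instanceof UpdateFetchPositionsEvent) {
                process((UpdateFetchPositionsEvent) next);
            }
        }
    }

    // In the real flow validation, committed-offset fetch and list-offsets would be
    // driven from here; this sketch only fills in missing positions.
    private void process(UpdateFetchPositionsEvent event) {
        for (Map.Entry<String, Long> e : positions.entrySet()) {
            if (e.getValue() == null) {
                // Committed offset if known, else an assumed "earliest" reset to 0.
                e.setValue(committed.getOrDefault(e.getKey(), 0L));
            }
        }
        event.future.complete(new HashMap<>(positions)); // hand back a snapshot copy
    }
}
```

Because the app thread only receives a snapshot copy through the future and never touches the map itself, there is no shared mutable state left for the two threads to race on.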
This PR keeps the existing logic that retains a pendingOffsetFetchRequest if it does not complete within the lifetime of an updateFetchPositions attempt, so that it can be reused on the next call to updateFetchPositions.
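A hedged sketch of the pending-request reuse, assuming for illustration that a kept request is reused only when the next attempt asks for exactly the same set of partitions. `PendingOffsetFetch`, `markConsumed` and the injected `sender` function are hypothetical names, not the consumer's internals.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch (not Kafka's actual code): an OffsetFetch request that did not
// complete within one updateFetchPositions attempt is kept and reused by the next
// attempt, as long as it targets the same set of partitions.
final class PendingOffsetFetch {
    // Sends a committed-offsets request for the given partitions (injected for testing).
    private final Function<Set<String>, CompletableFuture<Map<String, Long>>> sender;
    private Set<String> pendingPartitions;                       // partitions of the kept request
    private CompletableFuture<Map<String, Long>> pendingRequest; // null when nothing is kept
    private int requestsSent = 0;

    PendingOffsetFetch(Function<Set<String>, CompletableFuture<Map<String, Long>>> sender) {
        this.sender = sender;
    }

    CompletableFuture<Map<String, Long>> fetch(Set<String> partitions) {
        if (pendingRequest != null && pendingPartitions.equals(partitions)) {
            return pendingRequest; // reuse the request left over from an earlier attempt
        }
        pendingPartitions = new HashSet<>(partitions);
        pendingRequest = sender.apply(pendingPartitions);
        requestsSent++;
        return pendingRequest;
    }

    // Called once a completed result has been applied to the subscription state,
    // so the next attempt sends a fresh request.
    void markConsumed() {
        pendingPartitions = null;
        pendingRequest = null;
    }

    int requestsSent() {
        return requestsSent;
    }
}
```

Keying reuse on the exact partition set is one simple way to avoid applying a response that was requested for a different assignment.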
Hey! I just pushed all the requested changes. The most important one addresses @AndrewJSchofield's comment regarding timeout handling, which I totally agree with.
Couldn't the OffsetsRequestManager simply make the sequence of RPCs to complete the little dance, regardless of a timeout?
Yes, I think we could, and I went ahead and did it. Basically, updatePositions now runs its sequence of async operations (without blocking the background thread) and only updates the positions once it has all the info it needs, and only if the set of initializing partitions (partitions requiring positions) is still the same. The only consideration is that errors can also occur, so we need to keep them and throw them on the next call to update positions if the triggering updatePositionsEvent has already expired. Note that this is exactly the same logic already in place for reset/validate positions in both consumers. Now we just need it at the updatePositions level too, given that it keeps going and attempts to do what it needs in the background.
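The two rules described above (apply results only if the initializing partitions are unchanged, and keep errors nobody was waiting for so the next call can throw them) could be sketched roughly like this. `PositionsUpdater`, `onLookupsDone` and `maybeThrowCachedError` are hypothetical names; the sketch also assumes an event counts as expired once its future is already completed, for example by a timeout on the app thread.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the two rules above (names are illustrative, not Kafka's).
final class PositionsUpdater {
    private final Map<String, Long> positions; // partition -> position, null = initializing
    private RuntimeException cachedError;      // error that nobody was waiting for

    PositionsUpdater(Map<String, Long> positions) {
        this.positions = positions;
    }

    Set<String> initializingPartitions() {
        Set<String> result = new HashSet<>();
        for (Map.Entry<String, Long> e : positions.entrySet()) {
            if (e.getValue() == null) result.add(e.getKey());
        }
        return result;
    }

    // First thing on each new updatePositions call: surface an error left behind
    // by a previous attempt whose event had already expired.
    void maybeThrowCachedError() {
        RuntimeException error = cachedError;
        cachedError = null;
        if (error != null) throw error;
    }

    // Invoked when the async lookups started for `requested` finish. `event` is the
    // future the app thread waits on; if it is already done, the app thread timed out.
    void onLookupsDone(Set<String> requested, Map<String, Long> offsets,
                       RuntimeException error, CompletableFuture<Void> event) {
        if (error != null) {
            if (event.isDone()) {
                cachedError = error; // keep it so the next call can throw it
            } else {
                event.completeExceptionally(error);
            }
            return;
        }
        // Apply results only if the partitions needing positions did not change
        // while the requests were in flight; otherwise the result is stale.
        if (requested.equals(initializingPartitions())) {
            for (String tp : requested) {
                positions.put(tp, offsets.get(tp));
            }
        }
        event.complete(null); // no-op if the event already expired
    }
}
```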
I'm currently adding more tests, but all existing tests (unit and integration) pass, so please take a look and let me know. Thank you both!
Thanks @AndrewJSchofield! Very helpful review.
@chia7712 any chance you would have time for this one? Thanks!
Hey @chia7712, thanks for the review! All comments addressed.
Hello @chia7712, thanks for the comments! All addressed.
Hey @chia7712, I just added a small fix https://github.com/apache/kafka/pull/16885/commits/2957cc49f1b7fdec96f278325066d04c1152815f after noticing some suspicious test failures following the previous changes. I was indeed missing the fact that we need to allow resetting positions after refreshing committed offsets (even if no offset-fetch request is generated). That was affecting several tests; probably the best one to see the flow is testOffsetOfPausedPartitions, which now passes consistently locally. I'll keep an eye on the build to check the test results again. Thanks!
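A small sketch of the flow this fix restores, under a simplified model where a partition either needs a committed-offset lookup (`initializing`) or a reset (`awaitingReset`), and where a reset goes to offset 0. All names and the reset policy are hypothetical. The point is that the reset step is chained after the refresh unconditionally, so a refresh that generates no OffsetFetch request still lets the reset happen.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: resetting positions is chained after the committed-offsets
// refresh unconditionally, so it still runs when the refresh had nothing to fetch.
final class UpdatePositionsFlow {
    final Map<String, Long> positions = new HashMap<>();         // fetch positions once known
    final Set<String> initializing = new HashSet<>();            // need a committed-offset lookup
    final Set<String> awaitingReset = new HashSet<>();           // need a reset (assumed: to 0)
    final Map<String, Long> committedOnBroker = new HashMap<>(); // simulated committed offsets
    int offsetFetchRequests = 0;

    CompletableFuture<Void> refreshCommittedOffsets() {
        if (initializing.isEmpty()) {
            // No OffsetFetch request is generated; complete immediately so the
            // reset step still runs.
            return CompletableFuture.completedFuture(null);
        }
        offsetFetchRequests++;
        // Simulated response; a real client would complete this future later.
        for (String tp : initializing) {
            Long committed = committedOnBroker.get(tp);
            if (committed != null) {
                positions.put(tp, committed);
            } else {
                awaitingReset.add(tp); // no committed offset: fall back to a reset
            }
        }
        initializing.clear();
        return CompletableFuture.completedFuture(null);
    }

    void resetPositions() {
        for (String tp : awaitingReset) {
            positions.put(tp, 0L); // assumed "earliest" reset policy
        }
        awaitingReset.clear();
    }

    // The reset is chained after the refresh whether or not a request was sent.
    CompletableFuture<Void> updatePositions() {
        return refreshCommittedOffsets().thenRun(this::resetPositions);
    }
}
```

With only `awaitingReset` partitions, `updatePositions()` sends no request but still assigns positions; returning early from the refresh step instead would leave those partitions without a position.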
Last build completed with 3 unrelated failures. I just rebased and will check the next one too, but I would say this is ready for another look when you have a chance, @chia7712. Thanks a lot!
Hello @chia7712, thanks for the review! All comments addressed.
Thanks @chia7712! Build completed with 3 unrelated failures.
(Filed https://issues.apache.org/jira/browse/KAFKA-17554 for the failure in ConsumerNetworkClientTest, classic consumer, which I see has been flaky for a while but hadn't been reported yet.)