KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
Depends on https://github.com/apache/kafka/pull/11983
The primary goal of this PR is to address several outstanding issues with incremental rebalancing that lead to stable-but-unbalanced clusters. However, other small bug fixes are also applied, and some liberty is taken with refactoring to improve readability and flexibility in the code base.
~This should also address KAFKA-12495, and includes an adapted test case from https://github.com/apache/kafka/pull/10367, which addresses that issue but with a different approach.~
High-level changes:
- Refine the logic for load-balancing revocations:
-
- Perform load-balancing revocations any time the cluster appears imbalanced and there are still connectors and tasks that can be revoked from workers, instead of only when the number of workers in the cluster changes
-
- Remove the "rough estimation" logic and replace it with a precise calculation of exactly how to allocate all currently-configured connectors and tasks as evenly as possible across a cluster
-
- Account for load-balancing revocations when assigning new and lost connectors and tasks across the cluster
- Improve code quality:
-
- Extract the
ConnectorsAndTasksclass into its own file, enrich it and its builder class with developer-friendly methods, make its contents completely immutable, and useSetinstead of genericCollectioninstances to store connectors and tasks
- Extract the
-
- Where possible, identify logic that is shared for connectors and tasks (
IncrementalCooperativeAssignor::assignConnectorsandIncrementalCooperativeAssignor::assignTasks, for example) and abstract it into a single reusable method
- Where possible, identify logic that is shared for connectors and tasks (
-
- Use the
finalkeyword for base and derived sets inIncrementalCooperativeAssignor::performTaskAssignment(tracking mutations across a 100+ line method is difficult)
- Use the
-
- Reword unnecessary and confusing comments ("... is a derived set from the set difference of ..." is not very informative)
-
- Reorganize the grouping of methods in
IncrementalCooperativeAssignorto place static utility methods together at the bottom of the class
- Reorganize the grouping of methods in
-
- Demote visibility of testing-only methods and fields from
protectedto package-private (protectedimplies that the field/method is intended for use by subclasses, which is not the case for any of these)
- Demote visibility of testing-only methods and fields from
- Testing:
-
- Add several new tests to cover a variety of new cases, many of which result in imbalanced allocation with the current rebalancing logic, but which are all correctly handled with the improvements in this PR
-
- Add a few testing utility methods to help "hand wave" test cases without having to specify fine-grained expectations like how many rounds of rebalance are required to reach stability after some changes have been applied to the cluster
-
- Add coverage to all tests that ensures that no connectors or tasks are both revoked and assigned from the sam worker, and that the leader's view of the complete assignment of connectors and tasks across the cluster appears to be correct after each rebalance
- Miscellaneous:
-
- Demote a ton of noisy
DEBUG-level log messages toTRACE
- Demote a ton of noisy
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Apologies @showuon, this does not actually (fully) address KAFKA-12495. Several of the test cases here relied on rebalances being triggered in circumstances that they would not normally be triggered under, which caused the issue with not performing consecutive revocations to be masked.
I've added a new failing test case that's very similar to the one in #10367 but which fails with all functional and testing framework changes I've made so far in this pull request.
At this point I don't see too many alternatives to permitting consecutive revocations, but here are a few that come to mind when the cluster is imbalanced but a revocation took place during the last round:
- Have the leader send out an assignment with no revocations but also a delay of 1ms to trigger an immediate follow-up rebalance for revocations
- Have every worker automatically rejoin the cluster whenever they receive an assignment that contains revoked connectors/tasks (like now) or newly-assigned connectors/tasks, until a rebalance takes place that doesn't change any worker's assigned connectors/tasks
- Have the leader immediately trigger a new rebalance after completing this one by rejoining the group
These (and the strategy of permitting consecutive revocations) all fall under the umbrella of https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect#KIP415:IncrementalCooperativeRebalancinginKafkaConnect-ChangestoConnect'sRebalancingProcess, which outlines this strategy:
- When a Worker is elected as Leader, it computes a new assignment, describing both assigned and revoked connectors and tasks (previously the Leader computed an assignment from scratch without defining revoked resources).
- When a Worker receives its assignment, if this assignment includes any revoked connectors or tasks, it stops (releases) these resources and then immediately rejoins the group with an assignment that excludes revoked resources (previously, upon receipt of assignment, the Worker started the connectors and tasks and operated in the new generation of the group protocol until the next rebalance some time in the future).
- Normally in the next assignment round, the Leader will assign resources according to its policy and there will be no revoked resources in any of the Workers. If that's not the case, the previous steps will be repeated until the group converges into an assignment without revocations.
(Emphasis mine)
Overall I think permitting consecutive revocations is the safest and most intuitive option here, and with the new testing logic in this PR (especially the new assertNoFurtherAssignments method), we get pretty extensive coverage to help ensure we don't get trapped in an infinite revocation loop.
@C0urante Hello! I am just wondering that how this PR is going? Is threre anything that i can help with?
Hi @YeonCheolGit! There's always more PRs to review than there are reviewers (especially with Connect), so feel free to give this (or probably https://github.com/apache/kafka/pull/11983, which this PR depends on) a review if you'd like to help.
Converting this to a draft since I haven't had time to prioritize it (sorry @YeonCheolGit!) and the changes here are not safe to merge as-are.
Converting this to a draft since I haven't had time to prioritize it (sorry @YeonCheolGit!) and the changes here are not safe to merge as-are.
No worries @C0urante! All good and thanks for letting me know this:)