KAFKA-17439: Make polling for new records an explicit action/event in the new consumer
Updated the FetchRequestManager to only create and enqueue fetch requests when signaled to do so by a FetchEvent.
The application thread and the background thread each contain logic that runs only if there is buffered data from a previous fetch. This creates a race condition, because the presence of buffered data can change between the two threads' respective checks. Right now the window for the race condition is wide open; this change aims to narrow it, not to close it.
In the ClassicKafkaConsumer, the application thread explicitly issues fetch requests (via the Fetcher class) at specific points in the Consumer.poll() cycle. Prior to this change, the AsyncKafkaConsumer issued fetch requests independently of the user calling Consumer.poll(); the fetches happened nearly continuously, as soon as any assigned partition was fetchable. With this change, the AsyncKafkaConsumer introduces a FetchEvent that signals to the background thread that a fetch request should be issued. The points in the Consumer.poll() cycle of the AsyncKafkaConsumer where this is done now match those of the ClassicKafkaConsumer. In short: this makes the AsyncKafkaConsumer behave nearly identically to the ClassicKafkaConsumer in this regard.
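To illustrate the idea of signal-driven fetching, here is a minimal sketch. It is not the code in this PR: `SignaledFetchSketch`, `BackgroundFetcher`, `signalFetch()`, and `runOnce()` are hypothetical names, and the nested `FetchEvent` class is only a stand-in for the real event. It shows a background loop that creates a fetch request only when the application thread has enqueued a signal.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class SignaledFetchSketch {
    /** Hypothetical stand-in for the FetchEvent described above. */
    static final class FetchEvent {}

    /** Hypothetical stand-in for the background thread's fetch logic. */
    static final class BackgroundFetcher {
        private final BlockingQueue<FetchEvent> events = new LinkedBlockingQueue<>();
        private final AtomicInteger requestsCreated = new AtomicInteger();

        /** Called by the application thread at a specific point in poll(). */
        void signalFetch() {
            events.add(new FetchEvent());
        }

        /** One pass of the background loop: a request is created only if signaled. */
        void runOnce() {
            FetchEvent event = events.poll(); // non-blocking; null when no signal is pending
            if (event != null) {
                requestsCreated.incrementAndGet(); // placeholder for building a fetch request
            }
        }

        int requestsCreated() {
            return requestsCreated.get();
        }
    }

    public static void main(String[] args) {
        BackgroundFetcher fetcher = new BackgroundFetcher();
        fetcher.runOnce();     // no signal yet, so nothing is created
        fetcher.runOnce();     // still nothing
        fetcher.signalFetch(); // application thread asks for a fetch
        fetcher.runOnce();     // exactly one request is created
        System.out.println("requests created: " + fetcher.requestsCreated());
    }
}
```

In this simplified model the background loop can run as often as it likes; the number of fetch requests is bounded by the number of signals from the application thread.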
As mentioned above, this change does not completely solve the problem related to fetch session eviction. Exactly how to close the race condition window completely is outside the scope of this change.
See KAFKA-17182.
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Hi @AndrewJSchofield!
Thanks for the review 👍
> First, I think it better not to overload the PollEvent because that's already used in the share consumer.
Agreed. I've introduced a FetchEvent so that the two separate mechanisms won't step on each other's toes.
> Second, it seems to me that there is still the potential for over-fetching, and this will still cause churn of the fetch session cache.
Agreed. This change only aims to lessen the churn. Preventing the churn completely is a future task.
> In the case where the consumer is only fetching a single partition, I think it works pretty well. The set of fetchable partitions will be empty if there's buffered data, and contain the only partition in the fetch session if there is not. So, you'll only send a Fetch request when there's a need for more data and the fetch session will not churn.
Correct.
> In the case where the consumer is fetching more than one partition on a particular node, if a subset of the partitions is fetchable, then the fetch session will be modified by sending a Fetch request and that seems to have the potential for a lot of churn.
Correct again!
Any partition with buffered data at the point where the fetch request is being generated will be marked as "removed" from the broker's fetch session cache. That's the crux of the problem 😞
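To make the two cases above concrete, here is a minimal sketch in plain Java. It is a simplified model, not the consumer's actual fetch session handling: `FetchSessionChurnSketch`, `fetchable()`, and `removedFromSession()` are hypothetical names, and partitions are represented as plain strings.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class FetchSessionChurnSketch {
    /** Partitions eligible for the next fetch: assigned partitions without buffered data. */
    static Set<String> fetchable(Set<String> assigned, Set<String> buffered) {
        Set<String> result = new LinkedHashSet<>(assigned);
        result.removeAll(buffered);
        return result;
    }

    /** Partitions in the session that the next request omits, i.e. marked as removed. */
    static Set<String> removedFromSession(Set<String> session, Set<String> nextRequest) {
        Set<String> removed = new LinkedHashSet<>(session);
        removed.removeAll(nextRequest);
        return removed;
    }

    public static void main(String[] args) {
        // Single-partition case: buffered data leaves nothing fetchable,
        // so no request is sent and the session is left alone.
        Set<String> single = new LinkedHashSet<>(List.of("t-0"));
        System.out.println("single, fetchable: " + fetchable(single, single));

        // Multi-partition case: t-1 has buffered data when the request is built,
        // so the request covers only t-0 and t-2 and t-1 is dropped from the session.
        Set<String> session = new LinkedHashSet<>(List.of("t-0", "t-1", "t-2"));
        Set<String> buffered = new LinkedHashSet<>(List.of("t-1"));
        Set<String> next = fetchable(session, buffered);
        System.out.println("multi, fetchable: " + next);
        System.out.println("multi, removed:   " + removedFromSession(session, next));
    }
}
```

The fewer requests that are built while some partitions still hold buffered data, the fewer of these removals reach the broker.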
Something that I tend to lose sight of is the fact that it's not a foregone conclusion that a fetch session will be evicted when it has partitions removed. Of course, it will increase its eligibility for eviction if the broker hosting the fetch session is resource-constrained and invokes the eviction process.
> Of course, all of this code is in common between the legacy consumer and the async consumer.
I'm not sure I follow. This code is all specific to the AsyncKafkaConsumer. While the ClassicKafkaConsumer has a similar race condition, it is 2-4 orders of magnitude less likely to happen.
> The async consumer is still very keen on fetching so I don't properly grasp why this PR would make the fetch session behaviour better.
Yep. Because the AsyncKafkaConsumer was designed to fetch continuously in the background, it is especially prone to this problem. With this change, the application thread now signals when to fetch, so the background thread creates and issues fetch requests much less often.
Thanks!
@AndrewJSchofield, et al.: it can be helpful to compare the flow of ClassicKafkaConsumer.poll() and AsyncKafkaConsumer.poll(), specifically how each invokes fetch. Note that the sendFetches() method name, as well as the point at which it is invoked, comes from ClassicKafkaConsumer.poll(). So this is really making the new consumer act much more like the old one.
@lianetm—tests are passing and all comments have been addressed. Can you make another review pass? Thanks!