KAFKA-16985: Ensure consumer sends leave request on close even if interrupted
The consumer should attempt to leave the group cleanly upon close(), regardless of a) the timeout, b) interrupts.
If the user interrupted the current thread, upon close(), note the interruption, clear the flag, proceed with the close logic, and then throw an InterruptException at the end of close().
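A minimal sketch of that interrupt-deferral pattern, in plain Java with no Kafka dependency. `SketchInterruptException` and `DeferredInterruptClose` are hypothetical names used only for illustration; the real change lives in AsyncKafkaConsumer and may be structured differently. Re-setting the flag before throwing is an assumption here, chosen so that callers further up the stack can still observe the interruption.

```java
/** Hypothetical stand-in for Kafka's unchecked InterruptException; not the real class. */
final class SketchInterruptException extends RuntimeException {
    SketchInterruptException(String message) {
        super(message);
    }
}

final class DeferredInterruptClose {
    private DeferredInterruptClose() { }

    /** Runs the close logic even if the caller is interrupted, then reports the interrupt. */
    static void close(Runnable closeLogic) {
        // Thread.interrupted() returns the current flag and clears it in one call.
        boolean wasInterrupted = Thread.interrupted();
        RuntimeException closeFailure = null;
        try {
            closeLogic.run();
        } catch (RuntimeException e) {
            closeFailure = e;
        }
        if (wasInterrupted) {
            // Assumption: restore the flag so the interruption stays visible to callers.
            Thread.currentThread().interrupt();
            SketchInterruptException interrupt =
                new SketchInterruptException("thread was interrupted before close() was called");
            if (closeFailure != null) {
                interrupt.addSuppressed(closeFailure);
            }
            throw interrupt;
        }
        if (closeFailure != null) {
            throw closeFailure;
        }
    }
}
```

The close logic runs with the flag cleared, so blocking calls inside it are not cut short, and the interruption is only surfaced once the close steps have had their chance to run.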
After the UnsubscribeEvent completes, there are still more steps to be performed to leave the group:
- The application thread processes the ConsumerRebalanceListenerCallbackNeededEvent that was enqueued by the background thread shortly before completing the UnsubscribeEvent
- The ConsumerRebalanceListener.onPartitionsRevoked() callback is invoked
- The application thread enqueues a ConsumerRebalanceListenerCallbackCompletedEvent for the background thread
- The background thread will be notified about the callback's completion, so that it can send out the "leave group" heartbeat
If the user invokes close() with a low timeout, we need to ensure the above steps are performed even if the UnsubscribeEvent itself timed out.
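The handshake above can be sketched with two queues and a background thread. This is a toy model in plain Java, not the consumer's actual event classes: the `Event` enum values and `LeaveGroupHandshakeSketch` are hypothetical names. The point it tries to show is that the application thread keeps draining background events after the unsubscribe completes, so the callback still runs and the background thread still gets its completion signal.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class LeaveGroupHandshakeSketch {
    // Hypothetical event names, loosely mirroring the events listed above.
    enum Event { UNSUBSCRIBE, CALLBACK_NEEDED, UNSUBSCRIBE_COMPLETED, CALLBACK_COMPLETED }

    private LeaveGroupHandshakeSketch() { }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        BlockingQueue<Event> toBackground = new LinkedBlockingQueue<>();
        BlockingQueue<Event> toApplication = new LinkedBlockingQueue<>();

        Thread background = new Thread(() -> {
            try {
                if (toBackground.take() == Event.UNSUBSCRIBE) {
                    // The callback request is enqueued shortly before the unsubscribe completes.
                    toApplication.put(Event.CALLBACK_NEEDED);
                    toApplication.put(Event.UNSUBSCRIBE_COMPLETED);
                }
                // Only after the callback has completed is the leave heartbeat sent.
                if (toBackground.take() == Event.CALLBACK_COMPLETED) {
                    log.add("leave group heartbeat sent");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "background-sketch");
        background.setDaemon(true);
        background.start();

        try {
            toBackground.put(Event.UNSUBSCRIBE);
            // Drain both events rather than stopping as soon as the unsubscribe is done.
            for (int i = 0; i < 2; i++) {
                Event event = toApplication.poll(5, TimeUnit.SECONDS);
                if (event == Event.CALLBACK_NEEDED) {
                    log.add("onPartitionsRevoked invoked");
                    toBackground.put(Event.CALLBACK_COMPLETED);
                }
            }
            background.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }
}
```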
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
@lianetm @philipnee—can I get a review, please 😄
Latest test failures look unrelated.
One high-level thought. With this PR we're temporarily ignoring the interrupted status to allow close to do what it needs (and ensure it cleanly leaves the group), which makes sense. But in that case, we're allowing close to run with whatever timeout was provided in the API call (or the default API timeout). Wouldn't it make more sense for interrupt+close(timeout) to behave exactly like close(0), instead of behaving like close(timeout) as it does now? This would be the only change:
final Timer closeTimer = wasInterrupted ? time.timer(Duration.ZERO) : time.timer(timeout);
Seems like a good compromise between respecting the interrupted status, while also attempting to close cleanly (assuming that close(0) does send the leave, this is why I was suggesting we also add an IT for it). Thoughts?
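To make the intent of that one-liner concrete, here is a small sketch in plain Java. `InterruptAwareTimeout`, `effectiveTimeout`, and `sendLeaveAndAwait` are hypothetical names, and the sketch assumes, as the comment above does, that a zero timeout still lets the leave request go out; only the wait for the acknowledgement is shortened.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class InterruptAwareTimeout {
    private InterruptAwareTimeout() { }

    /** An interrupted caller gets close(0) semantics; everyone else keeps their timeout. */
    static Duration effectiveTimeout(boolean wasInterrupted, Duration requested) {
        return wasInterrupted ? Duration.ZERO : requested;
    }

    /** Always sends; waits for the acknowledgement only as long as the timeout allows. */
    static boolean sendLeaveAndAwait(Runnable sendLeave, CompletableFuture<Void> ack, Duration wait) {
        sendLeave.run();
        try {
            ack.get(wait.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With a zero wait and an acknowledgement that has not arrived yet, the send still happens and the method returns `false` right away instead of blocking.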
Hey @kirktrue, thanks for filing the follow-up Jira. Took another pass and left some comments, thanks!
Hi @lianetm 👋
One high-level thought. With this PR we're temporarily ignoring the interrupted status to allow close to do what it needs (and ensure it cleanly leaves the group), which makes sense. But in that case, we're allowing close to run with whatever timeout was provided in the API call (or the default API timeout). Wouldn't it make more sense for interrupt+close(timeout) to behave exactly like close(0), instead of behaving like close(timeout) as it does now? This would be the only change:
final Timer closeTimer = wasInterrupted ? time.timer(Duration.ZERO) : time.timer(timeout);
Seems like a good compromise between respecting the interrupted status, while also attempting to close cleanly (assuming that close(0) does send the leave, this is why I was suggesting we also add an IT for it). Thoughts?
That's a really interesting idea. I'm scratching my head on this one, though, because I'm really not sure what the right thing to do is.
While the documentation for Consumer.close() mentions situations in which an InterruptException is thrown, it doesn't mention if the thread's interrupt status affects the timeout or not. When calling Consumer.close(Duration), in my opinion, users would likely believe that the timeout they provided would be honored, regardless of the thread's interrupt status.
I looked briefly at the ClassicKafkaConsumer’s close() implementation, and I didn't see anything about altering the timeout. But there's a lot of code behind that API call, so I could have easily missed it.
I would say we're in a bit of uncharted territory with this decision of skipping the interrupt (with timeout or 0), just because the classic consumer does things differently and simply doesn't have this same situation.
Looking at the current behaviour with the goal of getting close to it, the classic simply makes progress on interrupt+close until it polls for a first time (to send the leave request). It's at that point that it short-circuits and propagates the interrupted (ConsumerNetworkClient.maybeThrowInterruptException). That means that interrupt+close is just given a chance to send the request, but not to sit and wait for responses. To get to that behaviour with the new consumer, close(0) seems more accurate, and seems to tackle my main concern: how do we avoid a situation where an interrupted thread will block on the consumer.close just because we internally ignore the interruption? (note that if we give time to consumer.close, it will wait for responses for previous requests also, not only the leave request).
That being said, I'm still also debating on this one, but just raising the concern.
I agree with @lianetm here. If my application is closing with some timeout and the user tries to interrupt the program, what do we want it to do? We want an orderly close with as little hanging around as possible so the application can end promptly. Leave the group politely, but nothing more. So, I agree that the behaviour of close(0) is roughly what is needed when close is interrupted.
@lianetm @AndrewJSchofield: as I understand it, the proposed behavior would be divergent from the existing consumer’s behavior. My preference is to avoid blocking this change from getting in ASAP so that we can exercise the behavior in stress testing.
Would you both be OK if a new Jira is filed to implement the requested change? We can slot it in for 4.0.0 so it doesn't just go to /dev/null.
PLMK.
Thanks!
Note: the test failures in the latest CI build appear unrelated.
the proposed behavior would be divergent from the existing consumer’s behavior
uhm why? I could be missing something, but as I see it, the proposed behaviour makes both consumers behave similarly on close+interrupt: send the leave request and do not wait for broker responses. That's what the classic consumer achieves by throwing the interrupt right after polling the network client. With the current shape of the PR, we don't achieve the same with the new consumer, and we actually introduce an unwanted behaviour imo: interrupted thread + close sits and waits for broker responses, whether the response for the leave request or for any other in-flight request there may be.
Is there an issue/concern with the alternative of calling close(0) if interrupted? How does the IT behave if we try the change?
Hi @lianetm!
the proposed behavior would be divergent from the existing consumer’s behavior
uhm why? I could be missing something, but as I see it, the proposed behaviour makes both consumers behave similarly on close+interrupt: send the leave request and do not wait for broker responses.
Agreed 😄
That's what the classic consumer achieves by throwing the interrupt right after polling the network client.
I apologize if I've overlooked it, but I haven't seen where the classic consumer checks the current thread’s interrupted state (or catches InterruptException/InterruptedException) and then explicitly (or implicitly) ignores the provided timeout 🤔
For what it's worth, this is the code path I've followed to understand the behavior of the classic consumer leaving the group on close:
ClassicKafkaConsumer.close() →
ConsumerCoordinator.close() →
AbstractCoordinator.close() →
AbstractCoordinator.maybeLeaveGroup()
Please correct me if I'm looking in the wrong place.
With the current shape of the PR, we don't achieve the same with the new consumer, and we actually introduce an unwanted behaviour imo: interrupted thread + close sits and waits for broker responses, whether the response for the leave request or for any other in-flight request there may be.
I'm not aware of anywhere in AsyncKafkaConsumer.releaseAssignmentAndLeaveGroup() where we wait for the response from the server 🤔
Are you referring to the fact that we now process the background events in releaseAssignmentAndLeaveGroup() after the UnsubscribeEvent? At present there's no direct way for the application thread to tell the background thread to leave the group. Leaving the consumer group is part of the overall process driven by UnsubscribeEvent, isn't it? Even still, the unsubscribe process needs to invoke the user's ConsumerRebalanceListener callbacks, right?
Is there an issue/concern with the alternative of calling close(0) if interrupted?
I hate to be stubborn here, but my issue is that I don't see that the classic consumer behaves this way and/or it's documented as the intention. I'm happy to be proven otherwise, of course.
How does the IT behave if we try the change?
I have not yet tested the scenario of ignoring the timeout on interrupt. If we decide that's the direction we want to go, the tests will be updated in due course.
Thanks!
@lianetm, I looked back at one of your previous comments and noticed your observation about the call to Thread.interrupted() in ConsumerNetworkClient.poll(). So yes, the classic consumer does check (and clear) the flag, and that error is thrown all the way up to ClassicKafkaConsumer.close(). However, ClassicKafkaConsumer suppresses the error (via its swallow() method). But I'm not seeing that the Timer object is updated, cleared, or reset, though.
In addition, right after the coordinator is closed, the classic consumer attempts to close the fetcher. The fetcher closing code specifically deals with the timeout, but neither the comments nor the code itself refers to changing the timeout due to a previous interrupt.
The classic consumer also appears to invoke the appropriate ConsumerRebalanceListener callback in ConsumerCoordinator.onLeavePrepare() before attempting to leave the group, specifically when the coordinator is closing. In an attempt to emulate that behavior, I added the additional call to processBackgroundEvents() in AsyncKafkaConsumer.releaseAssignmentAndLeaveGroup() to try to invoke the ConsumerRebalanceListener callback after the UnsubscribeEvent completes.
Hey @kirktrue, sorry if I didn't communicate my point clearly. I see your concern:
But I'm not seeing that the Timer object is updated, cleared, or reset, though.
Agree, but the thing is I never meant that the Classic plays with the timer. I've been only suggesting that playing with the timer is the best way I see to achieve with the new consumer what the classic one achieves in a different way.
I'm not aware of anywhere in AsyncKafkaConsumer.releaseAssignmentAndLeaveGroup() where we wait for the response from the server
- I expect we would be sitting and waiting for responses to the leave group here, because the UnsubscribeEvent does not complete until it receives the response to the HB to leave.
- We would also sit and wait for responses to any previous in-flight request here, because the network thread won't complete closing until it gets responses for all unsent/in-flight requests (or the timer expires).
Also, regarding:
the classic consumer does check (and clear) the flag
It clears the flag indeed, but then throws interrupted (so the flag is flipped back on), meaning that even if it swallows the exception temporarily to attempt the next close actions, the thread is still interrupted (so next close action that attempts poll won't wait for responses either). Makes sense?
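A toy model of that clear-then-rethrow behaviour, in plain Java. All names here (`ReInterruptingException`, `ClassicStyleCloseSketch`, the "coordinator" and "fetcher" step labels) are hypothetical and only follow the description in this thread; this is not the classic consumer's code. The exception re-sets the interrupt flag in its constructor, so every later close step that polls sends its request but skips the wait.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical exception that flips the interrupt flag back on when constructed. */
final class ReInterruptingException extends RuntimeException {
    ReInterruptingException() {
        super("interrupted");
        Thread.currentThread().interrupt();
    }
}

final class ClassicStyleCloseSketch {
    private ClassicStyleCloseSketch() { }

    /** Sends first, then checks (and clears) the flag before it would wait for a response. */
    static void poll(List<String> log, String step) {
        log.add(step + ": request sent");
        if (Thread.interrupted()) {
            throw new ReInterruptingException();
        }
        log.add(step + ": waited for response");
    }

    /** Swallows the failure so the next close action still gets its turn. */
    static void swallow(Runnable action) {
        try {
            action.run();
        } catch (RuntimeException e) {
            // Intentionally ignored: closing continues with the next step.
        }
    }

    static List<String> close() {
        List<String> log = new ArrayList<>();
        swallow(() -> poll(log, "coordinator"));
        swallow(() -> poll(log, "fetcher"));
        return log;
    }
}
```

When the calling thread is interrupted, both steps record that their request was sent and neither records a wait, and the thread is still interrupted once `close()` returns.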
In order to make progress here, I have 2 questions:
1. Do we agree that what we want to achieve on interrupt + close is to take all close actions but without waiting for broker responses?
2. If we agree on 1 about what we want, then the question is how to do it. I suggested the option of ignoring the interrupted status + close(0) because it seems to me that it gets us to the result we want, but if we have an alternative I'm open to it.
Thanks! This is definitely a tricky one :)
Hey @kirktrue, thanks for the updates! Only one comment left above. Also, should we add some unit tests for this change? I guess something similar to the existing testPollThrowsInterruptExceptionIfInterrupted. The classic consumer does have this covered at the unit test level in testCloseInterrupt, but it seems better to add a new test instead of trying to tweak that one for the new consumer, since it involves much more than the interrupt logic we're interested in with this PR. Up to you.
Thanks!
Added the parameterized test testCloseWithInterrupt(). It validates two cases for when close() is called after an interrupt:
- close(1000) does invoke the ConsumerRebalanceListener during unsubscribe
- close(0) does not invoke the ConsumerRebalanceListener during unsubscribe
Hey @kirktrue , thanks for the updates. Just one comment left for consideration.
Also, should we update the PR description? It currently includes: "If the user invokes close() with a low timeout, we need to ensure the above steps are performed even if the UnsubscribeEvent itself timed out", but we're not really ensuring that happens with this PR right?
Thanks!
Apologies, but I have another question. If the interrupt is swallowed, how can users forcefully close the consumer? The wakeup method doesn't work during close, and the timeout is ignored. What should users do if they truly need to interrupt the close process?
Hi @chia7712!
Apologies, but I have another question. If the interrupt is swallowed, how can users forcefully close the consumer? The wakeup method doesn't work during close, and the timeout is ignored. What should users do if they truly need to interrupt the close process?
When KAFKA-16985 was originally filed, we'd convinced ourselves that it was vital that a Consumer leave its group cleanly even if it was interrupted prior to—or during—close().
Fundamental in the design of the new AsyncKafkaConsumer is the use of the CompletableApplicationEvent mechanism in which the application thread creates a Future which is completed by the background thread when the process is complete.
When the user interrupts the thread, any subsequent call by the application thread to get the result of the Future will immediately fail. Given the way the code is currently structured, I don't believe we'll be able to ensure that we leave the group cleanly in that case.
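The immediate failure is standard `java.util.concurrent` behaviour and can be reproduced without Kafka. The sketch below uses a plain `CompletableFuture` as a stand-in for the event's future; `InterruptedFutureGet` and its return strings are hypothetical names for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class InterruptedFutureGet {
    private InterruptedFutureGet() { }

    /** Waits for the future, reporting how the wait ended. */
    static String await(CompletableFuture<String> future) {
        try {
            return future.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Thrown without waiting if the calling thread is already interrupted
            // and the future has not completed yet.
            return "interrupted";
        } catch (ExecutionException | TimeoutException e) {
            return "failed";
        }
    }
}
```

Calling `await` on an incomplete future from a thread whose interrupt flag is set yields `"interrupted"` without using any of the five-second budget, which is why the flag has to be cleared (or the wait avoided) if the close steps are to run.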
I don't see how it's possible to satisfy both of these conditions:
- Allow the user to "forcefully close the consumer"
- Guarantee the consumer leaves the group
Thanks!
cc @frankvicky
cc @FrankYang0529 since you're working on KAFKA-17519 👋
I don't see how it's possible to satisfy both of these conditions:
- Allow the user to "forcefully close the consumer"
- Guarantee the consumer leaves the group
Yes, you're right that it's a tough issue. However, do we really need to "wait" for either the request or the background thread if users want to interrupt the close? Even though the application thread can't wait for the result of Future due to an interrupt, it still succeeds in sending a request to the background thread. Additionally, we can set closeTimeout to zero if it's in an interrupted state to let the background thread finish quickly. This essentially acts as an interrupt if the background thread respects the timeout.
In short, we never give up on either closing resources or sending necessary requests, even if the thread is interrupted or the timeout is too short. Instead, we stop "waiting" if there’s an interrupt or if the timeout expires.
Regarding @chia7712 's comment:
we can set closeTimeout to zero if it's in an interrupted state
I agree, that's exactly the approach I proposed initially in this PR, but pushing for it we discovered that the close with low timeouts suffers the same situation it seems (may not send the leave), and @kirktrue filed KAFKA-17518 for it.
So one option would be to fix that one first, and then come back here to attempt to simply do close(0) when closing interrupted. Thoughts?
So one option would be to fix that one first, and then come back here to attempt to simply do close(0) when closing interrupted. Thoughts?
It's great that we've reached a consensus on the "timeout" 😊
Another concern is whether we should respect the interrupt and avoid waiting for the result of Future. For example, in the cases of releaseAssignmentAndLeaveGroup and awaitPendingAsyncCommitsAndExecuteCommitCallbacks.
After re-reading all the discussions, I’ll try to summarize the fixes for "close". Please correct me if I've misunderstood anything.
ConsumerRebalanceListener should execute during closing, regardless of timeout or interrupt (this PR KAFKA-16985)
- The application thread must process the ConsumerRebalanceListenerCallbackNeededEvent before waiting for the UnsubscribeEvent.
- ConsumerRebalanceListenerCallbackNeededEvent should be interruptible.
Therefore, we may need a specialized processBackgroundEvents method that keeps processing until the ConsumerRebalanceListenerCallbackNeededEvent is handled. Additionally, we should avoid swallowing the interrupt before this specialized processBackgroundEvents, so the interrupt can be properly propagated to the ConsumerRebalanceListener. After processing ConsumerRebalanceListenerCallbackNeededEvent, we should temporarily suppress the interrupt signal while waiting for the UnsubscribeEvent, and then restore the signal once the wait is complete.
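A rough sketch of that ordering in plain Java: run the listener callback with the interrupt flag untouched, then suppress the flag only for the wait, then restore it. `CallbackThenSuppressedWait` is a hypothetical name, and a `CompletableFuture<Void>` stands in for the UnsubscribeEvent's future; the specialized processBackgroundEvents method itself is not modelled.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CallbackThenSuppressedWait {
    private CallbackThenSuppressedWait() { }

    /** Returns true if the unsubscribe future completed within the timeout. */
    static boolean close(Runnable rebalanceCallback, CompletableFuture<Void> unsubscribe, Duration timeout) {
        // 1) The callback runs first and can still see the interrupt flag.
        rebalanceCallback.run();
        // 2) Clear the flag so the wait below is not cut short immediately.
        boolean wasInterrupted = Thread.interrupted();
        try {
            unsubscribe.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            // A fresh interrupt arrived during the wait; remember it as well.
            wasInterrupted = true;
            return false;
        } finally {
            // 3) Restore the signal once the wait is over.
            if (wasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```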
The leave request should be attempted on a best-effort basis while honoring the interrupt and timeout (KAFKA-17518)
- ConsumerNetworkThread#cleanup must be interruptible
- application thread should propagate the interrupt signal to background thread
Therefore, application thread should call ConsumerNetworkThread#wakeup to interrupt background thread if the join is interrupted.
@kirktrue @lianetm WDYT?
Is the premise of this PR accurate? I think all bets are off if close() is interrupted. We should just clean up resources and throw. What I would expect is close() sends a message to the background thread to initiate shutdown (which should trigger an immediate LeaveGroup if we're in the right state). Then it should block for the timeout and await completion. At the end of the timeout (or if interrupted), it should tell the background thread to shutdown now.
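One way to picture that flow is the sketch below, in plain Java. It is not the linked commit: `TwoPhaseShutdown` is a hypothetical class, the sleep stands in for whatever cleanup the background thread does (for example sending LeaveGroup), and interrupting the background thread stands in for "shut down now".

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class TwoPhaseShutdown {
    private final CountDownLatch gracefulRequested = new CountDownLatch(1);
    private final CountDownLatch finished = new CountDownLatch(1);
    private final Thread background;

    TwoPhaseShutdown(long cleanupMillis) {
        background = new Thread(() -> {
            try {
                gracefulRequested.await();
                Thread.sleep(cleanupMillis); // stand-in for the graceful cleanup work
                finished.countDown();
            } catch (InterruptedException e) {
                // "Shut down now": abandon whatever cleanup is still pending.
            }
        }, "background-sketch");
        background.setDaemon(true);
        background.start();
    }

    /** Returns true if the graceful cleanup finished before the timeout or an interrupt. */
    boolean close(Duration timeout) {
        gracefulRequested.countDown(); // 1) ask the background thread to start shutting down
        boolean graceful = false;
        boolean interrupted = false;
        try {
            // 2) block for at most the timeout while the cleanup runs
            graceful = finished.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            interrupted = true;
        }
        if (!graceful) {
            background.interrupt(); // 3) timed out or interrupted: force the shutdown
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return graceful;
    }
}
```

For example, `new TwoPhaseShutdown(10).close(Duration.ofSeconds(5))` lets the cleanup finish, while a five-second cleanup closed with a 50 ms timeout is cut off.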
I put a sketch of an alternative for how we could structure shutdown here: https://github.com/hachikuji/kafka/commit/5e2eb7bd0e4c795d17dc2fe1d43b7579064cb2dd#diff-2085860eef5afdb5f100ea55bfa10536c1a61db6bdd79911b447cdabb7472200L252. I'm a bit out of the loop, so probably missing a lot of details, but the idea is try to get some of the low-level details out of AsyncKafkaConsumer and into the background thread.
hi @hachikuji long time no see 😄
Is the premise of this PR accurate? I think all bets are off if close() is interrupted.
The issue isn't just with interrupts, but also with ConsumerRebalanceListener. In the classic consumer, ConsumerRebalanceListener has the following behaviors:
- The methods onPartitionsRevoked and onPartitionsLost are always executed during shutdown, regardless of timeout or interruption.
- onPartitionsRevoked and onPartitionsLost are executed by the consumer thread.
- The interrupt signal should propagate to onPartitionsRevoked and onPartitionsLost.
- onPartitionsRevoked and onPartitionsLost are executed before sending the LEAVE_GROUP request.
What I would expect is close() sends a message to the background thread to initiate shutdown (which should trigger an immediate LeaveGroup if we're in the right state).
This is a good start to simplifying the close method. However, to ensure the guarantees mentioned above, the consumer thread and the background thread need to communicate multiple times. The following example demonstrates how we execute the listener.
1. consumer thread -> shutdown event
2. background thread -> check state and then send accurate listener event (`onPartitionsRevoked` or `onPartitionsLost`)
3. consumer thread -> process callback and then send listener complete event
...
I really like the idea of ShutdownEvent, but I'd like to ask a similar question to #17353: Do we need to align all behaviors between the classic consumer and the async consumer? In this case, it would be simpler if ConsumerRebalanceListener could be executed by a non-user thread, similar to how the producer's callback works.
@chia7712 I haven't followed all the conversation in this PR but I have an opinion about your last statement.
Do we need to align all behaviors between the classic consumer and the async consumer? In this case, it would be simpler if ConsumerRebalanceListener could be executed by a non-user thread, similar to how the producer's callback works.
I think that we should keep the new consumer backward compatible with the old one. It has been the goal since the beginning in order to facilitate the upgrade from the old to the new one. If we start to change the behaviour of some APIs, it will be a mess for our users because they will have to reason about whether they use the new or the old all the time.
Another point that I would like to mention about ConsumerRebalanceListener is that it is very common to actually call the consumer in the listener to manually commit offset for instance. As you know, the Consumer is not supposed to be called from different threads at the moment.
I like Jason's suggestion. I think that we could take this idea further and also have the background thread send a CloseCompleted event to the foreground thread when it is done with its cleanup tasks.
With this, the foreground thread could send the close event to the background thread. The background thread handles it and triggers all the necessary steps, listener events, leave group, etc. In the meantime, the foreground thread could wait while processing events until it receives the CloseCompleted event.
Another point that I would like to mention about ConsumerRebalanceListener is that it is very common to actually call the consumer in the listener to manually commit offset for instance. As you know, the Consumer is not supposed to be called from different threads at the moment.
Okay, that's a solid reason to maintain the current behavior. My main point was to simplify the coordination between the foreground and background threads. The strict happen-before events don't seem well-suited to the architecture of the async consumer.
@kirktrue Could you please consider adding tests or reviewing the existing tests to ensure they cover the listener guarantees? for example:
@ClusterTest(brokers = 3)
@Timeout(60)
public void testConsumerListener(ClusterInstance clusterInstance) throws InterruptedException {
    var threadName = Thread.currentThread().getName() + "-testConsumerListener";
    var s = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
    var onPartitionsAssignedLatch = new CountDownLatch(1);
    var onPartitionsRevokedThread = new AtomicReference<String>();
    var onPartitionsAssignedThread = new AtomicReference<String>();
    var onPartitionsRevokedInterrupted = new AtomicBoolean(false);
    CompletableFuture.runAsync(() -> {
        var consumer = new KafkaConsumer<>(Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers(),
            ConsumerConfig.GROUP_ID_CONFIG, "ikea",
            ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"), new ByteArrayDeserializer(), new ByteArrayDeserializer());
        try {
            consumer.subscribe(List.of("chia"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    onPartitionsRevokedThread.set(Thread.currentThread().getName());
                    onPartitionsRevokedInterrupted.set(Thread.currentThread().isInterrupted());
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    onPartitionsAssignedThread.set(Thread.currentThread().getName());
                    onPartitionsAssignedLatch.countDown();
                }
            });
            IntStream.range(0, 10).forEach(__ -> consumer.poll(Duration.ofSeconds(3)));
        } finally {
            consumer.close(Duration.ofSeconds(0));
        }
    }, s);
    Assertions.assertTrue(onPartitionsAssignedLatch.await(10, TimeUnit.SECONDS));
    s.shutdownNow();
    Assertions.assertTrue(s.awaitTermination(10, TimeUnit.SECONDS));
    // start to check all guarantees
    // 1) onPartitionsRevoked should be executed regardless of timeout or interruption
    Assertions.assertEquals(threadName, onPartitionsRevokedThread.get());
    // 2) listener should be executed by foreground thread
    Assertions.assertEquals(threadName, onPartitionsAssignedThread.get());
    // 3) listener should be able to see interrupted signal
    Assertions.assertTrue(onPartitionsRevokedInterrupted.get());
}