KAFKA-14131: Adding InterruptException when reading to end of Offseto…

Open vamossagar12 opened this issue 3 years ago • 7 comments

When a herder starts, its KafkaOffsetBackingStore calls readToLogEnd() on the DistributedHerder.herderExecutor thread (named "Distrubuted-connect-", e.g. Distrubuted-connect-28-1), which may take a few minutes.

If another thread tries to shut down this herder, it blocks for "task.shutdown.graceful.timeout.ms" before the DistributedHerder.herderExecutor thread is interrupted.

If the thread in DistributedHerder.herderExecutor is interrupted, KafkaOffsetBackingStore.readToLogEnd() calls poll(Integer.MAX_VALUE) and logs "Error polling" because the read was interrupted. "consumer.position" then does not advance, and readToLogEnd() falls into an infinite loop.

This PR looks to handle the errors in poll gracefully.
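
To make the failure mode concrete, here is a minimal, self-contained sketch of a read-to-end loop that swallows poll errors versus one that lets them propagate. It does not use the Kafka client: `Poller`, `InterruptedPollException`, `alwaysInterrupted`, and the `maxPolls` cap are all hypothetical names introduced only for this illustration, and the cap exists solely so the demo terminates.

```java
final class ReadToEndSketch {
    /** Hypothetical stand-in for the unchecked exception raised when a poll is interrupted. */
    static final class InterruptedPollException extends RuntimeException {
        InterruptedPollException() {
            super("poll interrupted");
        }
    }

    /** Hypothetical, minimal view of a consumer: poll once and report the current position. */
    interface Poller {
        void poll();
        long position();
    }

    /** A poller whose thread stays interrupted: every poll fails and the position never moves. */
    static Poller alwaysInterrupted() {
        return new Poller() {
            @Override public void poll() { throw new InterruptedPollException(); }
            @Override public long position() { return 0L; }
        };
    }

    /** Swallows poll errors and retries. Returns the number of polls attempted. */
    static int readToEndSwallowing(Poller poller, long endOffset, int maxPolls) {
        int polls = 0;
        while (poller.position() < endOffset && polls < maxPolls) {
            polls++;
            try {
                poller.poll();
            } catch (RuntimeException e) {
                // Equivalent of logging "Error polling" and carrying on:
                // the position has not advanced, so the loop condition never changes.
            }
        }
        return polls;
    }

    /** Lets poll errors escape, so the caller can abort the read. */
    static int readToEndPropagating(Poller poller, long endOffset, int maxPolls) {
        int polls = 0;
        while (poller.position() < endOffset && polls < maxPolls) {
            polls++;
            poller.poll(); // an interruption escapes the loop here
        }
        return polls;
    }

    public static void main(String[] args) {
        System.out.println("swallowing loop polls: "
                + readToEndSwallowing(alwaysInterrupted(), 10L, 1_000));
        try {
            readToEndPropagating(alwaysInterrupted(), 10L, 1_000);
        } catch (InterruptedPollException e) {
            System.out.println("propagating loop aborted: " + e.getMessage());
        }
    }
}
```

Without the artificial cap, the swallowing variant would spin forever, which is the behavior described above.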

vamossagar12 avatar Aug 04 '22 18:08 vamossagar12

@C0urante can you also take a look. From the scope of the ticket, this catch/throw exception seemed ok to me since we are anyways looking to stop the herder. Do I need any tests for this? Thanks!

vamossagar12 avatar Aug 05 '22 08:08 vamossagar12

This should definitely come with a test :)

I'm also not sure this is the best approach, since the ExecutorService::shutdownNow Javadocs don't give us any guarantees about threads being interrupted:

There are no guarantees beyond best-effort attempts to stop processing actively executing tasks. For example, typical implementations will cancel via Thread.interrupt(), so any task that fails to respond to interrupts may never terminate.

And on top of that, by the time the thread is interrupted--if it's interrupted at all--we've already exhausted our graceful shutdown timeout.

Can we leverage the existing shutdown logic in KafkaBasedLog::stop somehow, possibly by accounting for a WakeupException being thrown while reading to the end of the log during KafkaBasedLog::start? I'm not certain that it's safe to stop a log while another thread is in the middle of starting the log; we may have to tweak some of the logic there. We may also have to wake up/interrupt/shut down the admin client (if we're using one to read offsets) since that too could potentially block (but perhaps not indefinitely).
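
As a rough illustration of the "best-effort" caveat in the Javadoc quoted above, the sketch below uses only `java.util.concurrent`. The class name, method name, and timeout are made up for this example. It runs a busy task and calls `shutdownNow()`; the executor only terminates promptly when the task itself checks its interrupt status.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class ShutdownNowSketch {
    /** Returns true if the executor terminated within the wait after shutdownNow(). */
    static boolean terminatesAfterShutdownNow(boolean honorInterrupt) {
        // Daemon thread so a stuck task cannot keep the JVM alive.
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "sketch-worker");
            t.setDaemon(true);
            return t;
        });
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean release = new AtomicBoolean(false);
        executor.execute(() -> {
            started.countDown();
            while (!release.get()) {
                if (honorInterrupt && Thread.currentThread().isInterrupted()) {
                    return; // cooperative task: stops when interrupted
                }
                Thread.onSpinWait(); // uncooperative task: keeps spinning
            }
        });
        try {
            started.await();
            executor.shutdownNow();
            return executor.awaitTermination(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.set(true); // let the uncooperative task finish so nothing leaks
        }
    }

    public static void main(String[] args) {
        System.out.println("honors interrupt: " + terminatesAfterShutdownNow(true));
        System.out.println("ignores interrupt: " + terminatesAfterShutdownNow(false));
    }
}
```

This only demonstrates the general executor behavior; it says nothing about how KafkaBasedLog itself reacts to an interrupt.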

C0urante avatar Aug 08 '22 19:08 C0urante

Thanks Chris. I agree this needs a test; I'll figure out how to add one.

I think you brought up a great point. Here's what I understood from the issue. If a DistributedHerder gets a signal to shut down in its stop method, it invokes shutdown and waits for the graceful shutdown timeout. awaitTermination can throw an InterruptedException and once that's thrown, the KafkaBasedLog remains in an infinite loop since I believe the initial readToEnd is off the same thread (switched to a WorkerThread later on). That's the interrupted exception that I was looking to handle in this PR. Do you think that makes sense?

IIUC, the scenario you have described is that even after the graceful shutdown, if the log isn't stopped and the herder never gets interrupted, the potential infinite loop still remains. Is that correct?

I think using KafkaBasedLog::stop might be a good idea in this case. BTW, I see there's something called stopServices which effectively stops the backing consumers. That in turn calls the KafkaBasedLog::stop method, and it's all wired through the halt method, which checks a stopping flag that is set in the stop method (BTW, I know you know all this, just writing it down for my own confirmation :D). Do you think the situation you described would still arise, or was the only unhandled case the interruption of the herder executor? WDYT?

vamossagar12 avatar Aug 09 '22 12:08 vamossagar12

awaitTermination can throw an InterruptedException and once that's thrown, the KafkaBasedLog remains in an infinite loop since I believe the initial readToEnd is off the same thread (switched to a WorkerThread later on). That's the interrupted exception that I was looking to handle in this PR. Do you think that makes sense?

Yes, but my point is that we don't actually have any guarantees that awaitTermination or shutdownNow will throw an InterruptedException, or cause one to be thrown on the threads for any currently-running tasks for the executor. So, even though a JVM that's running Connect may cause an InterruptedException to be encountered in KafkaBasedLog::poll, we should not rely on that behavior since a different fully-compliant JVM might not.

It's also worth pointing out that awaitTermination will almost certainly not cause an InterruptedException to be triggered on the executor's threads, since the Javadocs for that method state that it will block "until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.", and the Javadocs for shutdown state that it "Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.". And this is why I was discussing the behavior during shutdown after the graceful timeout had already expired; the changes here are unlikely to have any effect until that condition is reached.
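
The difference between `shutdown()` plus `awaitTermination()` and an actual interrupt can be seen in a small standalone sketch. Everything here (class name, latch names, timeouts) is illustrative and not taken from Connect. The worker blocks on a latch, the caller performs an orderly shutdown and waits briefly, and the worker is then checked for any interrupt.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class GracefulShutdownSketch {
    /**
     * result[0]: whether awaitTermination reported termination before its timeout.
     * result[1]: whether the running task was ever interrupted.
     */
    static boolean[] shutdownThenAwait() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        executor.execute(() -> {
            started.countDown();
            try {
                release.await(); // stands in for a long blocking read
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        try {
            started.await();
            executor.shutdown(); // orderly: running tasks are left alone
            boolean terminated = executor.awaitTermination(100, TimeUnit.MILLISECONDS);
            release.countDown(); // unblock the task ourselves
            executor.awaitTermination(5, TimeUnit.SECONDS);
            return new boolean[] {terminated, interrupted.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
        }
    }

    public static void main(String[] args) {
        boolean[] result = shutdownThenAwait();
        System.out.println("terminated within timeout: " + result[0]);
        System.out.println("task was interrupted: " + result[1]);
    }
}
```

In this sketch the wait simply times out while the task keeps running untouched, which matches the point that nothing changes until the graceful timeout has already been spent.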

IIUC, the scenario you have described is that even after the graceful shutdown, if the log isn't stopped and the herder never gets interrupted, the potential infinite loop still remains. Is that correct?

Sort of? I think there's two points worth focusing on here. First, we cannot and should not rely on thread interruption taking place as a result of invoking shutdown, awaitTermination, or shutdownNow on an executor. Second, with an ideal approach, we wouldn't have to wait for the graceful shutdown timeout to elapse before causing any active reads-to-end of the worker's internal topics to halt and return early (either normally or by throwing an exception).

I think using KafkaBasedLog::stop might be a good idea in this case. BTW, I see there's something called stopServices which effectively stops the backing consumers. That in turn calls the KafkaBasedLog::stop method, and it's all wired through the halt method, which checks a stopping flag that is set in the stop method (BTW, I know you know all this, just writing it down for my own confirmation :D). Do you think the situation you described would still arise, or was the only unhandled case the interruption of the herder executor? WDYT?

I think this approach has some promise (possibly but not necessarily leveraging stopServices to halt an in-progress startup), but haven't been able to think through all the details yet. Some things to consider:

  • Does standalone mode need to be taken into account as well?
  • Will any logic that we add in DistributedHerder::stop conflict with the logic contained in halt, which will presumably be called once the herder finishes/aborts startup?
  • Will anything we add to DistributedHerder::stop run the risk of blocking indefinitely? If so, what are the potential ramifications?
  • None of this should result in any error messages being logged unless an operation has actually failed (instead of just being terminated prematurely)
  • Can we ensure that the worker is able to gracefully leave the cluster if it's been able to join?
  • Can we ensure that the worker doesn't try to leave the cluster (or at the very least, cause any issues by trying to leave the cluster) if it hasn't been able to join?
  • If we're able to successfully abort one startup operation (such as reading to the end of the offsets topic), can we make sure that we don't even attempt any following startup operations (such as reading to the end of the status topic, which currently takes place after reading to the end of the offsets topic)?
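
One possible shape for the last consideration, sketched with plain Java rather than the real herder classes: a shared stop flag is checked before each startup step, so a stop request that arrives during one step prevents the following steps from being attempted. `AbortableStartupSketch`, `requestStop`, and the step names are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

final class AbortableStartupSketch {
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);

    /** May be called from any thread to ask startup to stop early. */
    void requestStop() {
        stopRequested.set(true);
    }

    /** Runs the steps in insertion order and returns the names of the steps that were attempted. */
    List<String> start(Map<String, Runnable> steps) {
        List<String> attempted = new ArrayList<>();
        for (Map.Entry<String, Runnable> step : steps.entrySet()) {
            if (stopRequested.get()) {
                break; // a stop arrived earlier: skip every remaining step
            }
            attempted.add(step.getKey());
            step.getValue().run();
        }
        return attempted;
    }

    /** Simulates a stop request arriving while the first step is still running. */
    static List<String> demo() {
        AbortableStartupSketch startup = new AbortableStartupSketch();
        Map<String, Runnable> steps = new LinkedHashMap<>();
        steps.put("read offsets topic to end", startup::requestStop);
        steps.put("read status topic to end", () -> { });
        return startup.start(steps);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

This sketch only covers skipping later steps. Interrupting a step that is already blocked would still need something like the wakeup-based approach discussed earlier in the thread.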

C0urante avatar Aug 17 '22 16:08 C0urante

Thanks @C0urante . I think those are great points and I now realise the mistake I made with the PR. Here's what I am thinking and the answers to the rest of your points =>

  1. I agree for the most part that an InterruptedException may or may not be thrown, so it's pretty much unreliable. Having said that, if it does occur, i.e. the thread in herderExecutor does get interrupted, the backing offset Kafka consumers would definitely get into an infinite loop. IMO, that's something that needs to be handled. That's where the original approach in the PR is wrong, as it catches the InterruptedException in consumer.poll, which may never even see it. Instead, I am thinking of handling it in the DistributedHerder.stop method. I see it catches InterruptedException but doesn't do anything about it. What I think can be done is that once the herderExecutor thread is interrupted, we force close the Kafka consumers. This can be done by invoking stopServices. At this point, some of the consumers might be closed and some might not be, and here we can leverage the stopRequested flag, i.e. if this flag is set, then for all practical purposes the log should get closed. So, we can add that check in KafkaConsumer.stop. Let me know if this makes sense.
  • Does standalone mode need to be taken into account as well?

Yeah I think it can have a similar issue so probably that can also be accounted for.

  • Will any logic that we add in DistributedHerder::stop conflict with the logic contained in halt, which will presumably be called once the herder finishes/aborts startup?

That's a great point. It could happen that stopServices is invoked both from halt and from the InterruptedException handler. So what we would need to ensure is that calling the stop method multiple times is safe. The check that I described using stopRequested might be helpful. WDYT?
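
A common way to make a stop method safe to call more than once is a compare-and-set guard. The sketch below is a generic illustration of that idea, not the actual KafkaBasedLog or DistributedHerder code; `IdempotentStopSketch` and `closeCount` are hypothetical names, and the counter stands in for closing real resources.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class IdempotentStopSketch {
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final AtomicInteger closeCount = new AtomicInteger();

    /** Returns true only for the call that actually performed the shutdown work. */
    boolean stop() {
        if (!stopped.compareAndSet(false, true)) {
            return false; // another caller already stopped (or is stopping) this instance
        }
        closeCount.incrementAndGet(); // stand-in for closing consumers, producers, etc.
        return true;
    }

    /** Number of times the underlying resources were closed. */
    int closeCount() {
        return closeCount.get();
    }

    public static void main(String[] args) {
        IdempotentStopSketch log = new IdempotentStopSketch();
        System.out.println("first stop did work: " + log.stop());
        System.out.println("second stop did work: " + log.stop());
        System.out.println("close count: " + log.closeCount());
    }
}
```

With a guard like this, it would not matter whether the stop path is reached from halt, from an interrupt handler, or from both.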

  • Will anything we add to DistributedHerder::stop run the risk of blocking indefinitely? If so, what are the potential ramifications?

The approach I described, maybe not. Would love to hear your thoughts on this though.

  • None of this should result in any error messages being logged unless an operation has actually failed (instead of just being terminated prematurely)

+1

Regarding the last 3 points, I am not sure at this moment. Maybe if this approach sounds ok, we can get these answered.

  • Can we ensure that the worker is able to gracefully leave the cluster if it's been able to join?
  • Can we ensure that the worker doesn't try to leave the cluster (or at the very least, cause any issues by trying to leave the cluster) if it hasn't been able to join?
  • If we're able to successfully abort one startup operation (such as reading to the end of the offsets topic), can we make sure that we don't even attempt any following startup operations (such as reading to the end of the status topic, which currently takes place after reading to the end of the offsets topic)?

Also, I am not trying to suggest that we shouldn't do it in the ideal way as you described i.e waiting for graceful shutdown to get over. I think if there's an improvement that needs to be done, we should definitely try that.

vamossagar12 avatar Aug 20 '22 09:08 vamossagar12

@vamossagar12 It's easier to comment on proposed changes when they're implemented as code; if you can put together a draft of your proposal (feeling free to note any open questions or parts that are left out for the sake of a rough draft) I can review that.

C0urante avatar Aug 26 '22 14:08 C0urante

Sure. Let me do that. Thanks for your suggestion!

vamossagar12 avatar Aug 26 '22 15:08 vamossagar12

Closing this and assigning to Sambhav. I won't have time to work on this ATM.

vamossagar12 avatar Jun 07 '23 14:06 vamossagar12