KAFKA-17339: LocalLogManager should support RaftClient#schedulePreparedAppend
Refer to KAFKA-17339 for more information.
Enable LocalLogManager to fully support prepared batches, so that this test implementation more closely mimics the behavior of a real log manager (KafkaRaftClient).
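To illustrate what "prepared batches" means here, the following is a minimal sketch of a two-step append, where a batch is first prepared and only later scheduled for writing. It is a toy model and not the LocalLogManager or KafkaRaftClient code: the class ToyPreparedLog, its fields, and the simplified signatures (no epoch argument, String records) are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model of a two-step append; not the real Kafka classes.
final class ToyPreparedLog {
    // Batches that were prepared but not yet handed over for appending.
    private final Deque<List<String>> prepared = new ArrayDeque<>();
    // Records that have been scheduled for appending, in order.
    private final List<String> scheduled = new ArrayList<>();
    // Offset that the next prepared record will receive.
    private long nextOffset = 0;

    // Step 1: reserve offsets for the batch and return the offset of its last record.
    long prepareAppend(List<String> records) {
        if (records.isEmpty()) {
            throw new IllegalArgumentException("Cannot prepare an empty batch");
        }
        prepared.addLast(List.copyOf(records));
        nextOffset += records.size();
        return nextOffset - 1;
    }

    // Step 2: hand every prepared batch over for appending, preserving order.
    void schedulePreparedAppend() {
        while (!prepared.isEmpty()) {
            scheduled.addAll(prepared.removeFirst());
        }
    }

    // Read-only view of what has been scheduled so far.
    List<String> scheduledRecords() {
        return List.copyOf(scheduled);
    }
}
```

In this sketch the caller learns the batch's last offset at prepare time, before anything is scheduled, which is the property a test double would need to reproduce.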
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Hi @jsancio,
I hope you're doing well. Could you take a moment to review this pull request whenever you have time? Your feedback would be greatly appreciated. Thank you so much for your help.
After merging trunk, the test QuorumControllerMetricsIntegrationTest#testFailingOverIncrementsNewActiveControllerCount (forceFailoverUsingLogLayer=true) failed with the following log. The first exception is expected, because logEnv.activeLogManager().get().throwOnNextAppend() has been called. However, the subsequent SnapshotRegistry exception seems to always occur 4 times, which causes NEW_ACTIVE_CONTROLLERS_COUNT to be 6 instead of 2.
org.apache.kafka.raft.errors.BufferAllocationException: Test asked to fail the next prepareAppend
at org.apache.kafka.metalog.LocalLogManager.prepareAppend(LocalLogManager.java:743) ~[test/:?]
at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.lambda$run$0(QuorumController.java:827) ~[main/:?]
at org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:912) ~[main/:?]
at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:821) ~[main/:?]
at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:132) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:215) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:186) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at java.base/java.lang.Thread.run(Thread.java:1575) [?:?]
16:27:39.012 [quorum-controller-1-event-handler] ERROR org.apache.kafka.server.fault.MockFaultHandler - Encountered nonFatalFaultHandler fault: createTopics: event failed with RuntimeException (treated as UnknownServerException) at epoch 3 in 47070 microseconds. Renouncing leadership and reverting to the last committed offset 5.
java.lang.RuntimeException: Can't create a new in-memory snapshot at epoch 3 because there is already a snapshot with epoch 5. Snapshot epochs are 5
at org.apache.kafka.timeline.SnapshotRegistry.getOrCreateSnapshot(SnapshotRegistry.java:225) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at org.apache.kafka.timeline.SnapshotRegistry.idempotentCreateSnapshot(SnapshotRegistry.java:245) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at org.apache.kafka.controller.OffsetControlManager.handleScheduleAppend(OffsetControlManager.java:305) ~[main/:?]
at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.lambda$run$0(QuorumController.java:844) ~[main/:?]
at org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:912) ~[main/:?]
at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:821) ~[main/:?]
at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:132) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:215) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:186) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at java.base/java.lang.Thread.run(Thread.java:1575) [?:?]
16:27:48.077 [quorum-controller-2-event-handler] ERROR org.apache.kafka.server.fault.MockFaultHandler - Encountered nonFatalFaultHandler fault: maybeFenceStaleBroker: event failed with RuntimeException (treated as UnknownServerException) at epoch 5 in 26900 microseconds. Renouncing leadership and reverting to the last committed offset 9.
java.lang.RuntimeException: Can't create a new in-memory snapshot at epoch 3 because there is already a snapshot with epoch 9. Snapshot epochs are 9
at org.apache.kafka.timeline.SnapshotRegistry.getOrCreateSnapshot(SnapshotRegistry.java:225) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at org.apache.kafka.timeline.SnapshotRegistry.idempotentCreateSnapshot(SnapshotRegistry.java:245) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at org.apache.kafka.controller.OffsetControlManager.handleScheduleAppend(OffsetControlManager.java:305) ~[main/:?]
at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.lambda$run$0(QuorumController.java:844) ~[main/:?]
at org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:928) ~[main/:?]
at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:821) ~[main/:?]
at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:132) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:215) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:186) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at java.base/java.lang.Thread.run(Thread.java:1575) [?:?]
16:27:57.133 [quorum-controller-0-event-handler] ERROR org.apache.kafka.server.fault.MockFaultHandler - Encountered nonFatalFaultHandler fault: maybeFenceStaleBroker: event failed with RuntimeException (treated as UnknownServerException) at epoch 7 in 29393 microseconds. Renouncing leadership and reverting to the last committed offset 13.
java.lang.RuntimeException: Can't create a new in-memory snapshot at epoch 6 because there is already a snapshot with epoch 13. Snapshot epochs are 13
at org.apache.kafka.timeline.SnapshotRegistry.getOrCreateSnapshot(SnapshotRegistry.java:225) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at org.apache.kafka.timeline.SnapshotRegistry.idempotentCreateSnapshot(SnapshotRegistry.java:245) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at org.apache.kafka.controller.OffsetControlManager.handleScheduleAppend(OffsetControlManager.java:305) ~[main/:?]
at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.lambda$run$0(QuorumController.java:844) ~[main/:?]
at org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:928) ~[main/:?]
at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:821) ~[main/:?]
at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:132) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:215) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:186) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at java.base/java.lang.Thread.run(Thread.java:1575) [?:?]
16:28:06.204 [quorum-controller-2-event-handler] ERROR org.apache.kafka.server.fault.MockFaultHandler - Encountered nonFatalFaultHandler fault: maybeFenceStaleBroker: event failed with RuntimeException (treated as UnknownServerException) at epoch 9 in 32702 microseconds. Renouncing leadership and reverting to the last committed offset 16.
java.lang.RuntimeException: Can't create a new in-memory snapshot at epoch 4 because there is already a snapshot with epoch 16. Snapshot epochs are 16
at org.apache.kafka.timeline.SnapshotRegistry.getOrCreateSnapshot(SnapshotRegistry.java:225) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at org.apache.kafka.timeline.SnapshotRegistry.idempotentCreateSnapshot(SnapshotRegistry.java:245) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at org.apache.kafka.controller.OffsetControlManager.handleScheduleAppend(OffsetControlManager.java:305) ~[main/:?]
at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.lambda$run$0(QuorumController.java:844) ~[main/:?]
at org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:928) ~[main/:?]
at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:821) ~[main/:?]
at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:132) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:215) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:186) ~[kafka-server-common-4.1.0-SNAPSHOT.jar:?]
at java.base/java.lang.Thread.run(Thread.java:1575) [?:?]
Expected :2
Actual :6
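For context, the check behind the repeated RuntimeException can be modeled roughly as below. This is a toy sketch based only on the error message in the log, not the actual SnapshotRegistry code: the class ToySnapshotRegistry and its field are hypothetical. Note that in each fault above, the existing snapshot "epoch" (5, 9, 13, 16) equals the last committed offset reported on the preceding line, while the requested value (3, 3, 6, 4) is lower.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the monotonic-snapshot check; not the real class.
final class ToySnapshotRegistry {
    // Snapshot epochs in ascending order.
    private final List<Long> epochs = new ArrayList<>();

    // Returns the epoch of the snapshot that was found or created.
    long getOrCreateSnapshot(long epoch) {
        if (!epochs.isEmpty()) {
            long last = epochs.get(epochs.size() - 1);
            if (last > epoch) {
                // A request older than the newest snapshot is rejected.
                throw new RuntimeException("Can't create a new in-memory snapshot at epoch "
                    + epoch + " because there is already a snapshot with epoch " + last + ".");
            }
            if (last == epoch) {
                // Idempotent: the snapshot already exists, so nothing is added.
                return last;
            }
        }
        epochs.add(epoch);
        return epoch;
    }
}
```

Under this model, any append whose reported offset is lower than the newest snapshot fails, so a log manager that reports a stale offset would trigger the fault on every attempt until it catches up. Whether that is what happens here is an open question.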
@ahuang98, do you have any thoughts on this issue? Any feedback would be greatly appreciated. Thank you!
This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch.
If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact).
If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
@peterxcli Could you help resolve the merge conflict and re-trigger CI?
This PR has been closed since it has not had any activity in 120 days. If you feel like this was a mistake, or you would like to continue working on it, please feel free to re-open the PR and ask for a review.