KAFKA-13668: Retry upon missing initProducerId due to authorization error

Open philipnee opened this issue 3 years ago • 2 comments

More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.

Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • [ ] Verify design and implementation
  • [ ] Verify test coverage and CI build status
  • [ ] Verify documentation (including upgrade notes)

philipnee avatar May 11 '22 20:05 philipnee

@philipnee where are we with this?

ijuma avatar Jun 15 '22 17:06 ijuma

@ijuma - I think @hachikuji is reviewing it.

philipnee avatar Jun 15 '22 17:06 philipnee

Following this PR because of https://github.com/confluentinc/kafka-rest/issues/416 and https://issues.apache.org/jira/browse/KAFKA-13668. Once this is fixed, does that mean the idempotent producer will be supported on Kafka-rest? One follow-up question: does the idempotent producer on Kafka-rest handle a connection failure between the user and Kafka-rest well? I guess it doesn't?

laosiaudi avatar Oct 03 '22 22:10 laosiaudi

Hey @hachikuji - Thanks for the previous review, but I rewrote the PR because some of the integration tests were timing out. In the previous version, simply setting the transactionManager state to UNINITIALIZED didn't propagate the authorization exceptions to the sender, so the producer kept re-requesting the producer ID, which eventually caused some tests to time out. In this PR, I transition to the "ABORTABLE_ERROR" state, abort the pending requests, and then set the state to UNINITIALIZED in the sender.

Do you agree with this approach?

Also, do you think we need to make this behavior consistent with the produce request? For example, the tests in SenderTests.java:

  • testClusterAuthorizationExceptionInProduceRequest
  • testClusterAuthorizationExceptionInInitProducerIdRequest
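
To make the state handling described above easier to follow, here is a minimal sketch of the idea. It is a toy model, not the code in this PR: the class `ProducerIdRetrySketch`, its fields, and its method names are all hypothetical, and only the state names UNINITIALIZED and ABORTABLE_ERROR come from the description above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the approach described in this comment.
// It does NOT mirror the real TransactionManager or Sender classes.
class ProducerIdRetrySketch {
    // UNINITIALIZED and ABORTABLE_ERROR are named in the comment above;
    // INITIALIZING is an assumed intermediate state for illustration.
    enum State { UNINITIALIZED, INITIALIZING, ABORTABLE_ERROR }

    State state = State.UNINITIALIZED;
    RuntimeException lastError;                              // error surfaced to the sender
    final List<String> pendingRequests = new ArrayList<>();  // queued work (hypothetical)
    final List<String> abortedRequests = new ArrayList<>();  // work failed with lastError

    // The producer asks for a producer ID.
    void requestProducerId() {
        state = State.INITIALIZING;
    }

    // Response handler: on an authorization failure, record the error and move to
    // ABORTABLE_ERROR instead of resetting straight to UNINITIALIZED, so the
    // sender gets a chance to see the failure.
    void onAuthorizationFailure(RuntimeException error) {
        lastError = error;
        state = State.ABORTABLE_ERROR;
    }

    // One iteration of the sender loop: abort whatever is pending, then reset the
    // state so that a later call can request the producer ID again.
    void senderRunOnce() {
        if (state == State.ABORTABLE_ERROR) {
            abortedRequests.addAll(pendingRequests);
            pendingRequests.clear();
            state = State.UNINITIALIZED;
        }
    }

    public static void main(String[] args) {
        ProducerIdRetrySketch sketch = new ProducerIdRetrySketch();
        sketch.requestProducerId();
        sketch.pendingRequests.add("batch-0");
        sketch.onAuthorizationFailure(new RuntimeException("cluster authorization failed"));
        sketch.senderRunOnce();
        System.out.println(sketch.state + " aborted=" + sketch.abortedRequests);
    }
}
```

The point of the intermediate state in this sketch is that the failure is recorded and pending work is aborted before the reset, rather than the state silently going back to UNINITIALIZED and the request being retried in a loop.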

philipnee avatar Oct 05 '22 18:10 philipnee

Hey @jolshan - Thanks for the review. I reverted those documentation/comment changes in senderTest.java (for the produceResponse authorization error).

philipnee avatar Apr 20 '23 20:04 philipnee

Failing tests don't seem to be related.

Build / JDK 8 and Scala 2.12 / testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
1m 46s
Build / JDK 8 and Scala 2.12 / testSyncTopicConfigUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
2m 26s
Build / JDK 8 and Scala 2.12 / testSyncTopicConfigUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
1m 46s
Build / JDK 11 and Scala 2.13 / testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
3m 15s
Build / JDK 11 and Scala 2.13 / testCreatePartitionsUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
2m 3s
Build / JDK 11 and Scala 2.13 / testSyncTopicConfigUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
2m 41s
Build / JDK 11 and Scala 2.13 / testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
2m 23s
Build / JDK 11 and Scala 2.13 / testCreatePartitionsUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
1m 55s
Build / JDK 11 and Scala 2.13 / testSyncTopicConfigUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
2m 7s
Build / JDK 11 and Scala 2.13 / testWithGroupMetadata() – kafka.api.TransactionsBounceTest
1m 22s
Build / JDK 11 and Scala 2.13 / testCreateClusterAndCreateAndManyTopics() – kafka.server.KRaftClusterTest
18s
Build / JDK 17 and Scala 2.13 / testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
2m 14s
Build / JDK 17 and Scala 2.13 / testCreatePartitionsUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
2m 22s
Build / JDK 17 and Scala 2.13 / testSyncTopicConfigUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
2m 16s
Build / JDK 17 and Scala 2.13 / testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
2m 16s
Build / JDK 17 and Scala 2.13 / testCreatePartitionsUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
2m 53s
Build / JDK 17 and Scala 2.13 / testSyncTopicConfigUseProvidedForwardingAdmin() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
2m 13s
Build / JDK 17 and Scala 2.13 / testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
1m 14s
Existing failures - 3
Build / JDK 8 and Scala 2.12 / putTopicStateRetriableFailure – org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest
<1s
Build / JDK 11 and Scala 2.13 / putTopicStateRetriableFailure – org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest
<1s
Build / JDK 17 and Scala 2.13 / putTopicStateRetriableFailure – org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest

philipnee avatar Apr 20 '23 20:04 philipnee

Going to rerun the build one more time.

jolshan avatar Apr 26 '23 17:04 jolshan

Left a few more questions -- I think we are in the final stretch here.

jolshan avatar Apr 26 '23 17:04 jolshan

Still an issue -- rebuilding again 😅

jolshan avatar May 02 '23 23:05 jolshan

thank you~

philipnee avatar May 03 '23 00:05 philipnee

There seem to be a few server-related failures, but I think they are also unrelated to this change:

I believe JDK17 tests and most of the JDK11 tests passed.

Build / JDK 11 and Scala 2.13 / testOffsetSyncsTopicsOnTarget() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
3m 24s
Build / JDK 8 and Scala 2.12 / testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest
1m 51s
Build / JDK 8 and Scala 2.12 / testLogCleanerConfig(String).quorum=kraft – kafka.server.DynamicBrokerReconfigurationTest
28s
Build / JDK 8 and Scala 2.12 / [1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest
2m 7s
Existing failures - 23
Build / JDK 8 and Scala 2.12 / executionError – kafka.server.DynamicBrokerReconfigurationTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.EdgeCaseRequestTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.FetchRequestDownConversionConfigTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.FetchRequestTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.FetchRequestWithLegacyMessageFormatTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.FinalizedFeatureChangeListenerTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.GssapiAuthenticationTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.KafkaMetricReporterClusterIdTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.KafkaMetricReporterExceptionHandlingTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.KafkaMetricsReporterTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.KafkaServerTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.ListOffsetsRequestTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.LogDirFailureTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.LogRecoveryTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.MetadataRequestTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.MultipleListenersWithAdditionalJaasContextTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.OffsetFetchRequestTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.ServerGenerateClusterIdTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.ServerShutdownTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.UpdateFeaturesTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceWithIbp26Test
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.zk.KafkaZkClientTest
<1s
Build / JDK 8 and Scala 2.12 / initializationError – kafka.zk.ZkMigrationClientTest
<1s

philipnee avatar May 03 '23 16:05 philipnee

Thanks Philip. The initializationError failures show up when there is a thread leak somewhere. I'm pretty convinced it wasn't your change, but I just wanted to be extra safe. I will check the next build. Thanks for your patience.

jolshan avatar May 03 '23 21:05 jolshan