
Error streaming topic with cleanup policy: compact,delete

Open · haimhm opened this issue on Mar 10, 2021 · 0 comments


I'm trying to stream a topic whose cleanup policy is set to compact,delete, and I get the exception below.
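For reference, this is roughly how the topic's cleanup policy can be confirmed with the stock Kafka tooling (a sketch: the broker address is from my environment, and the topic name testcompact is the one that appears in the log below):

```shell
# Show the topic's configuration, including cleanup.policy=compact,delete
bin/kafka-topics.sh --bootstrap-server ita-kafka01:9092 \
  --describe --topic testcompact

# Or query topic-level configs directly
bin/kafka-configs.sh --bootstrap-server ita-kafka01:9092 \
  --entity-type topics --entity-name testcompact --describe
```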

Your environment

  • Operating System - Ubuntu 18
  • Brooklin version - 1.0.2
  • Kafka version - 2.7.0
  • ZooKeeper version - 3.5.8

Steps to reproduce

  1. Create the topic, with cleanup policy compact,delete, on both clusters
  2. Create a datastream mirroring the topic from one cluster to the other
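The steps above correspond roughly to the following commands (a sketch: partition/replication settings are illustrative, the Brooklin REST port and client flags follow the mirroring example in the Brooklin wiki, and the topic/datastream names are the ones visible in the log):

```shell
# 1. Create the topic with the compact,delete cleanup policy (run against each cluster)
bin/kafka-topics.sh --bootstrap-server ita-kafka01:9092 --create \
  --topic testcompact --partitions 1 --replication-factor 1 \
  --config cleanup.policy=compact,delete

# 2. Create a mirroring datastream between the clusters via the Brooklin REST client
bin/brooklin-rest-client.sh -o CREATE -u http://localhost:32311/ \
  -n second-mirroring-stream \
  -s "kafka://ita-kafka01:9092/testcompact" \
  -c kafkaMirroringConnector -t kafkaTransportProvider \
  -m '{"owner":"test-user"}'
```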

Actual behavior

Nothing is streamed, and I get these exceptions in the log (full log is attached):

[2021-03-09 14:02:36,965] INFO ProducerConfig values:
  acks = 1
  batch.size = 16384
  bootstrap.servers = [ita-kafka01:9092, ita-kafka02:9092, ita-kafka03:9092]
  buffer.memory = 33554432
  client.id = datastream-producer
  compression.type = none
  connections.max.idle.ms = 540000
  enable.idempotence = false
  interceptor.classes = []
  key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
  linger.ms = 0
  max.block.ms = 60000
  max.in.flight.requests.per.connection = 5
  max.request.size = 1048576
  metadata.max.age.ms = 300000
  metric.reporters = []
  metrics.num.samples = 2
  metrics.recording.level = INFO
  metrics.sample.window.ms = 30000
  partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
  receive.buffer.bytes = 32768
  reconnect.backoff.max.ms = 1000
  reconnect.backoff.ms = 50
  request.timeout.ms = 30000
  retries = 0
  retry.backoff.ms = 100
  sasl.client.callback.handler.class = null
  sasl.jaas.config = null
  sasl.kerberos.kinit.cmd = /usr/bin/kinit
  sasl.kerberos.min.time.before.relogin = 60000
  sasl.kerberos.service.name = null
  sasl.kerberos.ticket.renew.jitter = 0.05
  sasl.kerberos.ticket.renew.window.factor = 0.8
  sasl.login.callback.handler.class = null
  sasl.login.class = null
  sasl.login.refresh.buffer.seconds = 300
  sasl.login.refresh.min.period.seconds = 60
  sasl.login.refresh.window.factor = 0.8
  sasl.login.refresh.window.jitter = 0.05
  sasl.mechanism = GSSAPI
  security.protocol = PLAINTEXT
  send.buffer.bytes = 131072
  ssl.cipher.suites = null
  ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
  ssl.endpoint.identification.algorithm = https
  ssl.key.password = null
  ssl.keymanager.algorithm = SunX509
  ssl.keystore.location = null
  ssl.keystore.password = null
  ssl.keystore.type = JKS
  ssl.protocol = TLS
  ssl.provider = null
  ssl.secure.random.implementation = null
  ssl.trustmanager.algorithm = PKIX
  ssl.truststore.location = null
  ssl.truststore.password = null
  ssl.truststore.type = JKS
  transaction.timeout.ms = 60000
  transactional.id = null
  value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
 (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,362] WARN The configuration 'producer.acks' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,363] WARN The configuration 'zookeeper.connect' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,363] WARN The configuration 'producer.max.block.ms' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,363] WARN The configuration 'producer.max.in.flight.requests.per.connection' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,363] WARN The configuration 'factoryClassName' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,363] INFO Kafka version : 2.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-09 14:02:37,363] INFO Kafka commitId : fa14705e51bd2ce5 (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-09 14:02:37,366] INFO Cluster ID: YdnuOPmjQ4KavUKJgY4aZw (org.apache.kafka.clients.Metadata)
[2021-03-09 14:02:37,375] WARN Unexpected error code: 87. (org.apache.kafka.common.protocol.Errors)
[2021-03-09 14:02:37,375] WARN sent failure, restart producer, exception: (class com.linkedin.datastream.kafka.KafkaTransportProvider:kafkaMirroringConnector:0)
org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
[2021-03-09 14:02:37,376] INFO [Producer clientId=datastream-producer] Closing the Kafka producer with timeoutMillis = 2000 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-03-09 14:02:37,376] WARN [Producer clientId=datastream-producer] Overriding close timeout 2000 ms to 0 ms in order to prevent useless blocking due to self-join. This means you have incorrectly invoked close with a non-zero timeout from the producer call-back. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-03-09 14:02:37,376] INFO [Producer clientId=datastream-producer] Proceeding to force close the producer since pending requests could not be completed within timeout 2000 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-03-09 14:02:37,376] ERROR Sending a message with source checkpoint testcompact/0/1 to topic testcompact partition 0 for datastream task second-mirroring-stream_f06ad4ca-1620-4035-9186-006f3abd18d1(kafkaMirroringConnector), partitionsV2=, partitions=[0], dependencies=[] threw an exception. (KafkaTransportProvider)
com.linkedin.datastream.common.DatastreamRuntimeException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
    at com.linkedin.datastream.kafka.KafkaProducerWrapper.generateSendFailure(KafkaProducerWrapper.java:251)
    at com.linkedin.datastream.kafka.KafkaProducerWrapper.lambda$null$0(KafkaProducerWrapper.java:198)
    at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1235)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:635)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:604)
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:561)
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:485)
    at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
    at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:700)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
[2021-03-09 14:02:37,377] WARN Detect exception being thrown from callback for src partition: testcompact-0 while sending, metadata: Checkpoint: testcompact/0/1, Topic: testcompact, Partition: 0 , exception: (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask)
com.linkedin.datastream.server.api.transport.SendFailedException: com.linkedin.datastream.common.DatastreamRuntimeException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
    at com.linkedin.datastream.server.EventProducer.onSendCallback(EventProducer.java:293)
    at com.linkedin.datastream.server.EventProducer.lambda$send$0(EventProducer.java:194)
    at com.linkedin.datastream.kafka.KafkaTransportProvider.doOnSendCallback(KafkaTransportProvider.java:189)
    at com.linkedin.datastream.kafka.KafkaTransportProvider.lambda$send$0(KafkaTransportProvider.java:155)
    at com.linkedin.datastream.kafka.KafkaProducerWrapper.lambda$null$0(KafkaProducerWrapper.java:198)
    at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1235)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:635)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:604)
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:561)
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:485)
    at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
    at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:700)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.linkedin.datastream.common.DatastreamRuntimeException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
    at com.linkedin.datastream.kafka.KafkaProducerWrapper.generateSendFailure(KafkaProducerWrapper.java:251)
    ... 16 more
Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
[2021-03-09 14:02:37,378] ERROR updateErrorRate with 1. Look for error logs right before this message to see what happened (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask)
[2021-03-09 14:02:37,378] INFO Trying to seek to previous checkpoint for partitions: [testcompact-0] (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask)
[2021-03-09 14:02:37,378] ERROR Partition rewind failed due to (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2215)
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2199)
    at org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1701)
    at org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1676)
    at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.lambda$seekToLastCheckpoint$4(AbstractKafkaBasedConnectorTask.java:605)
    at java.util.Collections$SingletonSet.forEach(Collections.java:4769)
    at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.seekToLastCheckpoint(AbstractKafkaBasedConnectorTask.java:604)
    at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.rewindAndPausePartitionOnException(AbstractKafkaBasedConnectorTask.java:251)
    at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.lambda$sendDatastreamProducerRecord$1(AbstractKafkaBasedConnectorTask.java:283)
    at com.linkedin.datastream.server.EventProducer.onSendCallback(EventProducer.java:303)
    at com.linkedin.datastream.server.EventProducer.lambda$send$0(EventProducer.java:194)
    at com.linkedin.datastream.kafka.KafkaTransportProvider.doOnSendCallback(KafkaTransportProvider.java:189)
    at com.linkedin.datastream.kafka.KafkaTransportProvider.lambda$send$0(KafkaTransportProvider.java:155)
    at com.linkedin.datastream.kafka.KafkaProducerWrapper.lambda$null$0(KafkaProducerWrapper.java:198)
    at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1235)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:635)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:604)
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:561)
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:485)
    at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
    at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:700)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748)
[2021-03-09 14:02:41,953] INFO Trying to flush the producer and commit offsets. (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask)

brooklin.log

haimhm · Mar 10 '21 09:03