Produce request failed on 0.8.1.1 cluster
I have a 3-node Kafka cluster running 0.8.1.1, recently upgraded from 0.8.1, and I'm now seeing trouble producing from Ruby with Poseidon. If I'm reading the logs correctly, Poseidon is attempting to produce to partition 1 on kafka3, but partition 1 is on kafka1.
Does this look like a client problem (Poseidon)?
client code
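require 'poseidon'

# Bootstrap from a single seed broker, identifying as "my_test_producer".
producer = Poseidon::Producer.new(["kafka1.internal:9092"], "my_test_producer")
messages = []
topic = "topic1"
messages << Poseidon::MessageToSend.new(topic, "Hello from Ruby producer")
# Reports success even though kafka3 rejects the request (see the log below).
producer.send_messages(messages)
# => true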
Error message on kafka3
[2014-09-03 06:10:04,553] WARN [KafkaApi-3] Produce request with correlation id 1 from client my_test_producer on partition [topic1,1] failed due to Partition [topic1,1] doesn't exist on 3 (kafka.server.KafkaApis)
Describe
Topic:topic1 PartitionCount:2 ReplicationFactor:1 Configs:
    Topic: topic1 Partition: 0 Leader: 3 Replicas: 3 Isr: 3
    Topic: topic1 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
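To make the mismatch concrete, here is a minimal sketch in plain Ruby (no Poseidon or Kafka calls) that parses the describe output and the warning from kafka3 and compares the broker that received the request with the partition's leader. The helper names `leaders_from_describe` and `parse_produce_warning` are hypothetical, and the regular expressions assume the exact text formats pasted in this issue.

```ruby
# Partition lines copied from the describe output in this issue.
DESCRIBE_OUTPUT = <<~OUT
  Topic: topic1 Partition: 0 Leader: 3 Replicas: 3 Isr: 3
  Topic: topic1 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
OUT

# Warning copied from the kafka3 log in this issue.
WARN_LINE = "[KafkaApi-3] Produce request with correlation id 1 from client " \
            "my_test_producer on partition [topic1,1] failed due to " \
            "Partition [topic1,1] doesn't exist on 3 (kafka.server.KafkaApis)"

# Hypothetical helper: build a { [topic, partition] => leader broker id } map.
def leaders_from_describe(text)
  text.scan(/Topic: (\S+)\s+Partition: (\d+)\s+Leader: (\d+)/)
      .map { |topic, partition, leader| [[topic, partition.to_i], leader.to_i] }
      .to_h
end

# Hypothetical helper: extract the broker that handled the request and the
# topic/partition the request was addressed to.
def parse_produce_warning(line)
  m = line.match(/\[KafkaApi-(\d+)\].*on partition \[([^,\]]+),(\d+)\]/)
  m && { broker: m[1].to_i, topic: m[2], partition: m[3].to_i }
end

leaders  = leaders_from_describe(DESCRIBE_OUTPUT)
warning  = parse_produce_warning(WARN_LINE)
expected = leaders[[warning[:topic], warning[:partition]]]

puts "request for #{warning[:topic]}/#{warning[:partition]} hit broker " \
     "#{warning[:broker]}, leader is broker #{expected}"
# prints: request for topic1/1 hit broker 3, leader is broker 1
```

This only restates what the logs already show: the request for partition 1 reached broker 3, while the cluster metadata names broker 1 as leader.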
kafka1 log
[2014-09-03 06:05:01,359] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,404] INFO Property broker.id is overridden to 1 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,405] INFO Property host.name is overridden to stage-kafka1.internal (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,405] WARN Property log.cleanup.interval.mins is not valid (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,405] INFO Property log.dirs is overridden to /var/log/kafka-logs (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,406] INFO Property log.flush.interval.messages is overridden to 10000 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,406] INFO Property log.flush.interval.ms is overridden to 1000 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,406] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,406] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,407] INFO Property num.io.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,407] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,407] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,407] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,407] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,408] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,408] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,408] INFO Property zookeeper.connect is overridden to 10.208.106.230:2181,10.208.9.144:2181,10.208.0.161:2181 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,408] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,424] INFO [Kafka Server 1], starting (kafka.server.KafkaServer)
[2014-09-03 06:05:01,426] INFO [Kafka Server 1], Connecting to zookeeper on 10.208.106.230:2181,10.208.9.144:2181,10.208.0.161:2181 (kafka.server.KafkaServer)
[2014-09-03 06:05:01,642] INFO Log directory '/var/log/kafka-logs' not found, creating it. (kafka.log.LogManager)
[2014-09-03 06:05:01,659] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2014-09-03 06:05:01,664] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2014-09-03 06:05:01,708] INFO Awaiting socket connections on stage-kafka1.internal:9092. (kafka.network.Acceptor)
[2014-09-03 06:05:01,709] INFO [Socket Server on Broker 1], Started (kafka.network.SocketServer)
[2014-09-03 06:05:01,793] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-09-03 06:05:01,835] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2014-09-03 06:05:01,928] INFO Registered broker 1 at path /brokers/ids/1 with address stage-kafka1.internal:9092. (kafka.utils.ZkUtils$)
[2014-09-03 06:05:01,947] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
[2014-09-03 06:05:02,014] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2014-09-03 06:10:04,287] INFO Topic creation {"version":1,"partitions":{"1":[1],"0":[3]}} (kafka.admin.AdminUtils$)
[2014-09-03 06:10:04,291] INFO [KafkaApi-1] Auto creation of topic topic1 with 2 partitions and replication factor 1 is successful! (kafka.server.KafkaApis)
[2014-09-03 06:10:04,402] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions topic1,1
[2014-09-03 06:10:04,431] INFO Completed load of log topic1-1 with log end offset 0 (kafka.log.Log)
[2014-09-03 06:10:04,433] INFO Created log for partition [topic1,1] in /var/log/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 1000, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 10000, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-09-03 06:10:04,434] WARN Partition [topic1,1] on broker 1: No checkpointed highwatermark is found for partition topic1,1
kafka2 log
[2014-09-03 06:05:23,451] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,496] INFO Property broker.id is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,496] INFO Property host.name is overridden to stage-kafka2.internal (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,497] WARN Property log.cleanup.interval.mins is not valid (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,497] INFO Property log.dirs is overridden to /var/log/kafka-logs (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,497] INFO Property log.flush.interval.messages is overridden to 10000 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,497] INFO Property log.flush.interval.ms is overridden to 1000 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,498] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,498] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,498] INFO Property num.io.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,498] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,498] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,499] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,499] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,499] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,499] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,500] INFO Property zookeeper.connect is overridden to 10.208.106.230:2181,10.208.9.144:2181,10.208.0.161:2181 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,500] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,515] INFO [Kafka Server 2], starting (kafka.server.KafkaServer)
[2014-09-03 06:05:23,517] INFO [Kafka Server 2], Connecting to zookeeper on 10.208.106.230:2181,10.208.9.144:2181,10.208.0.161:2181 (kafka.server.KafkaServer)
[2014-09-03 06:05:23,656] INFO Log directory '/var/log/kafka-logs' not found, creating it. (kafka.log.LogManager)
[2014-09-03 06:05:23,673] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2014-09-03 06:05:23,678] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2014-09-03 06:05:23,722] INFO Awaiting socket connections on stage-kafka2.internal:9092. (kafka.network.Acceptor)
[2014-09-03 06:05:23,723] INFO [Socket Server on Broker 2], Started (kafka.network.SocketServer)
[2014-09-03 06:05:23,813] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-09-03 06:05:23,869] INFO conflict in /controller data: {"version":1,"brokerid":2,"timestamp":"1409724323819"} stored data: {"version":1,"brokerid":1,"timestamp":"1409724301799"} (kafka.utils.ZkUtils$)
[2014-09-03 06:05:24,004] INFO Registered broker 2 at path /brokers/ids/2 with address stage-kafka2.internal:9092. (kafka.utils.ZkUtils$)
[2014-09-03 06:05:24,019] INFO [Kafka Server 2], started (kafka.server.KafkaServer)
kafka3 log
[2014-09-03 06:05:44,300] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,342] INFO Property broker.id is overridden to 3 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,342] INFO Property host.name is overridden to stage-kafka3.internal (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,343] WARN Property log.cleanup.interval.mins is not valid (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,343] INFO Property log.dirs is overridden to /var/log/kafka-logs (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,343] INFO Property log.flush.interval.messages is overridden to 10000 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,344] INFO Property log.flush.interval.ms is overridden to 1000 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,344] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,344] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,344] INFO Property num.io.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,344] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,345] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,345] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,345] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,345] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,346] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,346] INFO Property zookeeper.connect is overridden to 10.208.106.230:2181,10.208.9.144:2181,10.208.0.161:2181 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,346] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,361] INFO [Kafka Server 3], starting (kafka.server.KafkaServer)
[2014-09-03 06:05:44,362] INFO [Kafka Server 3], Connecting to zookeeper on 10.208.106.230:2181,10.208.9.144:2181,10.208.0.161:2181 (kafka.server.KafkaServer)
[2014-09-03 06:05:44,476] INFO Log directory '/var/log/kafka-logs' not found, creating it. (kafka.log.LogManager)
[2014-09-03 06:05:44,491] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2014-09-03 06:05:44,496] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2014-09-03 06:05:44,537] INFO Awaiting socket connections on stage-kafka3.internal:9092. (kafka.network.Acceptor)
[2014-09-03 06:05:44,538] INFO [Socket Server on Broker 3], Started (kafka.network.SocketServer)
[2014-09-03 06:05:44,618] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-09-03 06:05:44,662] INFO conflict in /controller data: {"version":1,"brokerid":3,"timestamp":"1409724344623"} stored data: {"version":1,"brokerid":1,"timestamp":"1409724301799"} (kafka.utils.ZkUtils$)
[2014-09-03 06:05:44,786] INFO Registered broker 3 at path /brokers/ids/3 with address stage-kafka3.internal:9092. (kafka.utils.ZkUtils$)
[2014-09-03 06:05:44,799] INFO [Kafka Server 3], started (kafka.server.KafkaServer)
[2014-09-03 06:10:04,437] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions topic1,0
[2014-09-03 06:10:04,471] INFO Completed load of log topic1-0 with log end offset 0 (kafka.log.Log)
[2014-09-03 06:10:04,476] INFO Created log for partition [topic1,0] in /var/log/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 1000, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 10000, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-09-03 06:10:04,477] WARN Partition [topic1,0] on broker 3: No checkpointed highwatermark is found for partition topic1,0
[2014-09-03 06:10:04,553] WARN [KafkaApi-3] Produce request with correlation id 1 from client my_test_producer on partition [topic1,1] failed due to Partition [topic1,1] doesn't exist on 3 (kafka.server.KafkaApis)
[2014-09-03 06:10:04,558] INFO [KafkaApi-3] Send the close connection response due to error handling produce request [clientId = my_test_producer, correlationId = 1, topicAndPartition = [topic1,1]] with Ack=0 (kafka.server.KafkaApis)
This is a much more detailed dump of the issues I ran into with my own cluster, reported here
https://github.com/bpot/poseidon/issues/63
If you look at the diff for the SyncProducer and how it determines partitions, there has been a change between 0.0.4 and master. Other changes are involved as well, however, since just copying that and its dependent code into the 0.0.4 gem does not fix the problem.
You need to be using master, but you also need to fix the bug in master reported here: https://github.com/bpot/poseidon/pull/61
Unfortunately I had missed #61, but I found the same thing you did. I ended up forking and adding my own, #65.
Just got this same error on a 0.8.1 cluster after a rebalance. I see #25, so this is likely a duplicate of that, but I want to try the required_acks option to see if we can make producing work more reliably with poseidon 0.0.4, since building the gem from GitHub breaks other things (like poseidon_cluster). @bpot, any plans for a new gem release and/or merging the PRs from @dim and @tamird?
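For anyone else wanting to try the same thing, a minimal sketch of what passing required_acks might look like. It assumes `Poseidon::Producer.new` accepts an options hash as its third argument and honours `:required_acks` there; the helper `build_acked_producer` and the constant `ACKED_OPTIONS` are hypothetical names. The kafka3 log above shows the failed request was handled "with Ack=0", which would be consistent with `send_messages` returning true even though the write was rejected. Asking for an acknowledgement may surface the failure to the client, but it is not expected to fix the partition routing itself.

```ruby
# Assumption: :required_acks is read from the options hash passed as the
# third argument to Poseidon::Producer.new. 1 asks the partition leader to
# acknowledge each write instead of fire-and-forget (0).
ACKED_OPTIONS = { :required_acks => 1 }.freeze

# Hypothetical helper: loads Poseidon only when actually called, so this
# file can be read and loaded without the gem installed.
def build_acked_producer(brokers, client_id, options = ACKED_OPTIONS)
  require 'poseidon'
  Poseidon::Producer.new(brokers, client_id, options.dup)
end

# Usage (needs the poseidon gem and a reachable broker):
#   producer = build_acked_producer(["kafka1.internal:9092"], "my_test_producer")
#   producer.send_messages([Poseidon::MessageToSend.new("topic1", "Hello from Ruby producer")])
```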
I've pushed an 0.0.5 release. Can you check if it fixes your issues?
Probably not anytime soon. We switched to rabbitmq/bunny due to the producing problems with poseidon (this and then #25 were major setbacks). I'm still excited about kafka/poseidon, but will have to come back to it at a later time and see if it's more stable.
@bpot We're really interested in this, and should be able to take a look.
I tested locally with version 0.0.5 and it's working for me. Thanks @bpot