assignment() returns empty list
Description
After upgrading from 0.11.4 to 1.0.0 I noticed that assignment() has started returning an empty list. Am I missing something?
How to reproduce
Here is the simple script to reproduce:
from confluent_kafka import Consumer, version, libversion
conf = {'bootstrap.servers': 'XXX:9092',
        'group.id': 'test',
        'default.topic.config': {'auto.offset.reset': 'smallest'},
        }
print('confluent-kafka-python: %s, librdkafka: %s' % version(), libversion())
topic = 'test'
c = Consumer(**conf)
c.subscribe([topic])
m = c.poll(1)
print(c.assignment())
c.close()
Output:
confluent-kafka-python: 0.11.4, librdkafka: 721920 ('0.11.4', 722175)
[TopicPartition{topic=test,partition=0,offset=-1001,error=None}]
and
confluent-kafka-python: 1.0.0, librdkafka: 1048576 ('1.0.1', 16777727)
[]
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 0.11.4/0.11.4 and 1.0.0/1.0.1
- [x] Apache Kafka broker version: 2.2.0
- [x] Client configuration: conf = {'bootstrap.servers': 'XXX:9092', 'group.id': 'test', 'default.topic.config': {'auto.offset.reset': 'smallest'},}
- [x] Operating system: Debian GNU/Linux 9
- [ ] Provide client logs (with 'debug': '..' as necessary)
- [ ] Provide broker log excerpts
- [ ] Critical issue
In my opinion, if you want to use the assignment() API, you should use assign() to give the consumer its topic partitions. The reason is that assign() takes a list of TopicPartition objects, whereas subscribe() only takes topic names as strings.
My guess is that the rebalance, and thus partition assignment, takes longer than the 1s poll you have.
You can set 'debug': 'consumer' (or 'cgrp' for more detail) in the config to see what is going on.
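A minimal sketch of that idea, assuming the only change needed is to keep polling until the group join completes instead of relying on a single 1 s poll. `wait_for_assignment` is a hypothetical helper, not part of confluent-kafka-python; it uses only `poll()` and `assignment()`, which the script above already calls. Messages that arrive while waiting are kept rather than dropped.

```python
import time


def wait_for_assignment(consumer, timeout_s=30.0, poll_s=1.0):
    """Poll until assignment() is non-empty or timeout_s has elapsed.

    Hypothetical helper for illustration. Returns (assignment, messages),
    where messages are whatever poll() returned while waiting.
    """
    deadline = time.monotonic() + timeout_s
    messages = []
    while True:
        # The rebalance (and thus the assignment) is driven from poll().
        msg = consumer.poll(poll_s)
        if msg is not None:
            messages.append(msg)
        assignment = consumer.assignment()
        if assignment or time.monotonic() >= deadline:
            return assignment, messages
```

In the reproduction script, `print(wait_for_assignment(c)[0])` could stand in for the `c.poll(1)` / `print(c.assignment())` pair. An empty list would then mean the join did not finish within the timeout.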
@edenhill you're right, partition assignment takes about 3s on the latest release:
%7|1561091912.128|MEMBERID|rdkafka#consumer-1| [thrd:app]: Group "test": updating member id "(not-set)" -> ""
...
%7|1561091915.135|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "test" changed join state wait-assign-rebalance_cb -> assigned (v2, state up)
vs 16ms on 0.11.4:
%7|1561092726.141|MEMBERID|rdkafka#consumer-1| [thrd:app]: Group "test": updating member id "(not-set)" -> ""
...
%7|1561092726.157|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "test" changed join state wait-assign-rebalance_cb -> assigned (v2, state up)
The environment is exactly the same. The difference is only in confluent-kafka-python version.
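For reference, the two durations can be read straight off the log timestamps. A small sketch, assuming the second `|`-separated field of each debug line is the epoch time in seconds (which is how the excerpts above look); `log_timestamp` and `elapsed` are hypothetical helper names:

```python
from decimal import Decimal


def log_timestamp(line):
    """Return the second '|'-separated field of a debug line as a Decimal."""
    return Decimal(line.split("|")[1])


def elapsed(first_line, last_line):
    """Seconds between two debug lines, exact to the logged millisecond."""
    return log_timestamp(last_line) - log_timestamp(first_line)


# First and last lines of each excerpt quoted above.
v1_first = '%7|1561091912.128|MEMBERID|rdkafka#consumer-1| [thrd:app]: Group "test": updating member id "(not-set)" -> ""'
v1_last = '%7|1561091915.135|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "test" changed join state wait-assign-rebalance_cb -> assigned (v2, state up)'
v0_first = '%7|1561092726.141|MEMBERID|rdkafka#consumer-1| [thrd:app]: Group "test": updating member id "(not-set)" -> ""'
v0_last = '%7|1561092726.157|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "test" changed join state wait-assign-rebalance_cb -> assigned (v2, state up)'

print("1.0.0: ", elapsed(v1_first, v1_last), "s")   # 3.007
print("0.11.4:", elapsed(v0_first, v0_last), "s")   # 0.016
```

So the join takes roughly 3 s on 1.0.0, which is longer than the single `poll(1)` in the script, against 16 ms on 0.11.4.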
Can you reproduce this with debug=cgrp,protocol and provide the logs so we can figure out what is going on?
Could I ask a question about subscribe() and assign()?
When using confluent_kafka to consume messages from Kafka, is there any difference between subscribe() and assign()?
If I give each consumer process specific partitions with assign(), will Kafka rebalancing still work?
subscribe is what enables the rebalance capability. when you subscribe to one or more topics, the client will automatically call assign for you as the set of partitions allocated to it changes. use assign explicitly (without subscribe) if you want to read from a static set of partitions.
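To make the difference concrete, here is a hedged sketch of both styles. `AssignmentTracker`, `subscribe_with_tracking` and `assign_static` are hypothetical names for illustration; the library calls they rely on are `Consumer.subscribe(topics, on_assign=..., on_revoke=...)`, `Consumer.assign(...)` and `TopicPartition`. The callbacks here only record what the group hands out; they do not call `assign()` themselves, on the assumption that the client then applies the assignment on its own.

```python
class AssignmentTracker:
    """Records the partitions reported by the rebalance callbacks (hypothetical helper)."""

    def __init__(self):
        self.partitions = []

    def on_assign(self, consumer, partitions):
        # Invoked from within poll() once the group has assigned partitions.
        self.partitions = list(partitions)

    def on_revoke(self, consumer, partitions):
        # Invoked from within poll() when a rebalance takes partitions away.
        revoked = {(p.topic, p.partition) for p in partitions}
        self.partitions = [p for p in self.partitions
                           if (p.topic, p.partition) not in revoked]


def subscribe_with_tracking(consumer, topics):
    """Dynamic membership: the group decides the partitions and may rebalance them."""
    tracker = AssignmentTracker()
    consumer.subscribe(topics, on_assign=tracker.on_assign, on_revoke=tracker.on_revoke)
    return tracker


def assign_static(consumer, topic, partition_ids):
    """Static assignment: fixed partitions, no subscribe(), so no rebalancing."""
    # Imported lazily so the sketch can be loaded without the client installed.
    from confluent_kafka import TopicPartition
    consumer.assign([TopicPartition(topic, p) for p in partition_ids])
```

With `subscribe_with_tracking(c, ['test'])`, `tracker.partitions` stays empty until `poll()` has been called often enough for the group join to finish. With `assign_static(c, 'test', [0])`, `assignment()` reflects the chosen partitions without waiting for a rebalance.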
So in fact I can use subscribe and assign at the same time, and in that case rebalancing will still be enabled.
And if I don't use subscribe and use assign only, rebalancing will not be enabled, right?
I am facing a similar issue using Confluent Kafka in .NET Core. I need to check that the assignment count is greater than zero while consuming, but I end up in a continuous loop without any assignments.
OS: Windows 64
Kafka: Kafka 2.8.1
dotnet: 6.0
library: ConfluentKafka
I have an event-log-master topic created with 5 partitions on my Kafka broker.
Here's the sample .NET Core code that I am using to reproduce the issue.
ConsumerConfig conf = new ConsumerConfig();
conf.BootstrapServers = "localhost:9092";
conf.EnableAutoCommit = false;
conf.SessionTimeoutMs = 30000;
conf.TopicMetadataRefreshIntervalMs = 10000;
conf.AutoOffsetReset = AutoOffsetReset.Latest;
conf.MaxPollIntervalMs = 300 * 60 * 1000;
conf.MaxPartitionFetchBytes = 10000;
conf.GroupId = "test-group-poc-1";
conf.AllowAutoCreateTopics = true;
conf.PartitionAssignmentStrategy = PartitionAssignmentStrategy.RoundRobin;
var pattern = "event-log-master";
long count = 0;
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
    e.Cancel = true; // prevent the process from terminating.
    cts.Cancel();
};
using (var consumer = new ConsumerBuilder<string, string>(conf).Build())
{
    consumer.Subscribe(pattern);
    try
    {
        while (!cts.Token.IsCancellationRequested)
        {
            if (consumer.Assignment.Count > 0)
            {
                var record = consumer.Consume(cts.Token);
                Console.WriteLine($"Consumed {record?.Message}");
                if (record != null)
                {
                    consumer.Commit(record);
                }
            }
            else
            {
                ConsumeResult<string, string> msg = null;
                // var msg = consumer.Consume(0);
                consumer.Subscribe(pattern);
                Thread.Sleep(1000);
                Console.WriteLine($"ZeroAssignments: {consumer.Name} | msg: {msg?.Message}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Ctrl-C was pressed.
    }
    finally
    {
        consumer.Close();
    }
}
I found a workaround that does eventually produce the assignments.
If I call consumer.Consume(0) with a zero timeout whenever there are no assignments, the assignments arrive after the loop has run two or three times. This behavior seems weird.
What is the right fix? I was trying to re-subscribe when I don't find any assignments. I know it doesn't make sense, but I was just trying.
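One possible reading of the workaround, offered as an assumption rather than a confirmed diagnosis: the assignment is only delivered while the consume/poll call is running, so a loop that refuses to consume until `Assignment.Count > 0` never gives the client a chance to receive it. Under that assumption, the loop should subscribe once and consume unconditionally. The sketch below shows the shape of such a loop using the Python client, to stay with the language of the original report; `consume_loop`, `handle` and `should_stop` are hypothetical names, and the same structure would need to be carried over to the .NET `Consume` call by hand.

```python
def consume_loop(consumer, topics, handle, should_stop, poll_s=1.0):
    """Subscribe once, then poll unconditionally (hypothetical sketch)."""
    consumer.subscribe(topics)  # once; no re-subscribe inside the loop
    try:
        while not should_stop():
            # Joining the group and receiving the assignment happen inside
            # poll(), so it is called even while assignment() is still empty.
            msg = consumer.poll(poll_s)
            if msg is None:
                continue  # no message yet, or the assignment is still pending
            if msg.error():
                print("consumer error: %s" % msg.error())
                continue
            handle(msg)
            # Manual commit, mirroring EnableAutoCommit = false in the report.
            consumer.commit(message=msg)
    finally:
        consumer.close()
```

The design choice is that an empty assignment is treated as a normal transient state rather than a condition to branch on: the loop keeps polling and simply sees no messages until the rebalance has completed.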