confluent-kafka-python

assignment() returns empty list

Open dbus-prsm opened this issue 6 years ago • 8 comments

Description

After upgrading from 0.11.4 to 1.0.0 I noticed that assignment() has begun returning an empty list. Am I missing something?

How to reproduce

Here is a simple script to reproduce:

from confluent_kafka import Consumer, version, libversion

conf = {'bootstrap.servers': 'XXX:9092',
        'group.id': 'test',
        'default.topic.config': {'auto.offset.reset': 'smallest'},
        }

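# Note: '%' binds only to version(), whose (str, int) tuple fills both %s slots;
# libversion() is passed to print() as a separate argument.
print('confluent-kafka-python: %s, librdkafka: %s' % version(), libversion())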
topic = 'test'

c = Consumer(**conf)
c.subscribe([topic])
m = c.poll(1)
print(c.assignment())

c.close()

Output:

confluent-kafka-python: 0.11.4, librdkafka: 721920 ('0.11.4', 722175)
[TopicPartition{topic=test,partition=0,offset=-1001,error=None}]

and

confluent-kafka-python: 1.0.0, librdkafka: 1048576 ('1.0.1', 16777727)
[]

Checklist

Please provide the following information:

  • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 0.11.4/0.11.4 and 1.0.0/1.0.1
  • [x] Apache Kafka broker version: 2.2.0
  • [x] Client configuration: conf = {'bootstrap.servers': 'XXX:9092', 'group.id': 'test', 'default.topic.config': {'auto.offset.reset': 'smallest'},}
  • [x] Operating system: Debian GNU/Linux 9
  • [ ] Provide client logs (with 'debug': '..' as necessary)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

dbus-prsm • Jun 07 '19 02:06

In my opinion, if you want to use assignment(), you should use assign() to assign topic partitions to the consumer. The reason is that assign() takes a list of TopicPartition objects, whereas subscribe() only takes topic names as strings.

ybbiubiubiu • Jun 20 '19 03:06

My guess is that the rebalance, and thus the partition assignment, takes longer than the 1s poll you have. You can set the 'debug': 'consumer' (or 'cgrp' for more detail) config to see what is going on.
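
A minimal sketch of that point, assuming a consumer object that exposes poll() and assignment() the way confluent_kafka.Consumer does: keep polling until assignment() is non-empty or a deadline passes, instead of relying on a single 1-second poll. The helper name wait_for_assignment and its default timeouts are illustrative and not part of the library.

```python
import time


def wait_for_assignment(consumer, timeout=30.0, poll_interval=1.0):
    """Poll until the consumer has partitions assigned or `timeout` elapses.

    Hypothetical helper, not part of confluent_kafka. `consumer` is any object
    with poll(timeout) and assignment() methods, e.g. confluent_kafka.Consumer.
    Returns (assignment, messages); `messages` holds anything poll() returned
    while waiting, so nothing is silently dropped.
    """
    deadline = time.monotonic() + timeout
    messages = []
    while True:
        assignment = consumer.assignment()
        if assignment:
            return assignment, messages
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return [], messages  # gave up: still no partitions
        # Calling poll() gives the client the chance to finish joining the
        # group and apply the assignment, so keep calling it while waiting.
        msg = consumer.poll(min(poll_interval, remaining))
        if msg is not None:
            messages.append(msg)
```

In the reproduction script, this would stand in for the single m = c.poll(1) call, for example: assignment, pending = wait_for_assignment(c).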

edenhill • Jun 20 '19 08:06

@edenhill you're right, partition assignment takes about 3s on the latest release

%7|1561091912.128|MEMBERID|rdkafka#consumer-1| [thrd:app]: Group "test": updating member id "(not-set)" -> ""
...
%7|1561091915.135|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "test" changed join state wait-assign-rebalance_cb -> assigned (v2, state up)

vs 16ms on 0.11.4:

%7|1561092726.141|MEMBERID|rdkafka#consumer-1| [thrd:app]: Group "test": updating member id "(not-set)" -> ""
...
%7|1561092726.157|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "test" changed join state wait-assign-rebalance_cb -> assigned (v2, state up)

The environment is exactly the same. The difference is only in confluent-kafka-python version.

dbus-prsm • Jun 21 '19 04:06

Can you reproduce this with debug=cgrp,protocol and provide the logs so we can figure out what is going on?

edenhill • Jul 30 '19 19:07

Could I ask a question about subscribe() and assign()?

When using confluent_kafka to consume messages from Kafka, is there any difference between subscribe() and assign()?

If I assign specific partitions to each consumer process with assign(), will Kafka rebalancing still work?

zealot-shin • Aug 16 '19 03:08

subscribe is what enables the rebalance capability. when you subscribe to one or more topics, the client will automatically call assign for you as the set of partitions allocated to it changes. use assign explicitly (without subscribe) if you want to read from a static set of partitions.
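
The two modes can be sketched side by side. This is an illustration under assumptions, not a definitive recipe: the function names are hypothetical, and 'localhost:9092', the topic 'test' and partition 0 in example() are placeholders. The subscribe(), assign() and callback signatures are the ones confluent_kafka.Consumer exposes.

```python
def log_assignment(consumer, partitions):
    """on_assign callback: called when the group gives this consumer partitions."""
    print("assigned:", partitions)


def log_revocation(consumer, partitions):
    """on_revoke callback: called when partitions are taken away in a rebalance."""
    print("revoked:", partitions)


def consume_with_group(consumer, topics):
    """Dynamic mode: subscribe, and let the group rebalance decide the partitions."""
    consumer.subscribe(topics, on_assign=log_assignment, on_revoke=log_revocation)


def consume_fixed(consumer, partitions):
    """Static mode: read exactly these TopicPartition objects, with no rebalancing."""
    consumer.assign(partitions)


def example():
    # Not called automatically: needs confluent_kafka and a reachable broker.
    from confluent_kafka import Consumer, TopicPartition

    conf = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test'}

    dynamic = Consumer(conf)
    consume_with_group(dynamic, ['test'])
    dynamic.poll(1.0)  # callbacks fire from inside poll()
    dynamic.close()

    static = Consumer(conf)
    consume_fixed(static, [TopicPartition('test', 0)])
    static.poll(1.0)
    static.close()
```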

mhowlett • Aug 16 '19 15:08

> subscribe is what enables the rebalance capability. when you subscribe to one or more topics, the client will automatically call assign for you as the set of partitions allocated to it changes. use assign explicitly (without subscribe) if you want to read from a static set of partitions.

So in fact I can use subscribe and assign at the same time, and in that case rebalancing will still be enabled. And if I use assign only, without subscribe, rebalancing will not be enabled, right?

zealot-shin • Aug 22 '19 09:08

I am facing a similar issue using Confluent Kafka in .NET Core. I need to check that the assignment count is greater than zero before consuming, but I end up in a continuous loop without ever getting any assignments.

  • OS: Windows 64-bit
  • Kafka: 2.8.1
  • .NET: 6.0
  • Library: Confluent.Kafka

I have an event-log-master topic created with 5 partitions on my Kafka broker.

Here's the sample .NET Core code I am using to reproduce it.

using System;
using System.Threading;
using Confluent.Kafka;

ConsumerConfig conf = new ConsumerConfig();
conf.BootstrapServers = "localhost:9092";
conf.EnableAutoCommit = false;
conf.SessionTimeoutMs = 30000;
conf.TopicMetadataRefreshIntervalMs = 10000;
conf.AutoOffsetReset = AutoOffsetReset.Latest;
conf.MaxPollIntervalMs = 300 * 60 * 1000;
conf.MaxPartitionFetchBytes = 10000;
conf.GroupId = "test-group-poc-1";
conf.AllowAutoCreateTopics = true;
conf.PartitionAssignmentStrategy = PartitionAssignmentStrategy.RoundRobin;

var pattern = "event-log-master";
long count = 0;

CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
	e.Cancel = true; // prevent the process from terminating.
	cts.Cancel();
};
using (var consumer = new ConsumerBuilder<string, string>(conf).Build())
{
	consumer.Subscribe(pattern);
	try
	{
		while (!cts.Token.IsCancellationRequested)
		{
			if (consumer.Assignment.Count > 0)
			{
				var record = consumer.Consume(cts.Token);
				Console.WriteLine($"Consumed {record?.Message}");
				if (record != null)
				{
					consumer.Commit(record);
				}
			}
			else
			{
				ConsumeResult<string, string> msg = null;
				// var msg = consumer.Consume(0);
				consumer.Subscribe(pattern);
				Thread.Sleep(1000);
				Console.WriteLine($"ZeroAssignments: {consumer.Name} | msg: {msg?.Message}");
			}
		}
	}
	catch (OperationCanceledException)
	{
		// Ctrl-C was pressed.
	}
	finally
	{
		consumer.Close();
	}
}

I found a workaround that somehow produces the assignments: when there are no assignments, I call consumer.Consume(0) with a zero timeout, and after the loop has run 2 to 3 times the assignments arrive. This behavior seems weird.

What's the right fix? I was trying to re-subscribe if I don't find any assignments. I know it doesn't make sense, but I was just trying.
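
One possible reading of the workaround, offered as a sketch and not a confirmed diagnosis: the client seems to complete the group join and apply the assignment while Consume() (poll() in the Python client) is running, so a loop that only consumes once the assignment is non-empty never gives it that chance. The Python sketch below subscribes once and polls unconditionally. The function name consume_loop and the handle and should_stop parameters are hypothetical, and manual commits assume enable.auto.commit is false, as in the configuration above.

```python
def consume_loop(consumer, topics, handle, should_stop, poll_timeout=1.0):
    """Subscribe once, then poll on every iteration regardless of assignment.

    `consumer` is expected to behave like confluent_kafka.Consumer;
    `handle(msg)` processes one message; `should_stop()` returns True to exit.
    """
    consumer.subscribe(topics)  # once, not on every pass through the loop
    try:
        while not should_stop():
            msg = consumer.poll(poll_timeout)
            if msg is None:
                continue  # nothing yet; the consumer may still be joining the group
            if msg.error():
                print("consumer error:", msg.error())
                continue
            handle(msg)
            # Synchronous commit of this message's offset.
            consumer.commit(message=msg, asynchronous=False)
    finally:
        consumer.close()  # leave the group cleanly
```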

yogendramaarisetty • Oct 24 '24 06:10