Manually assign partitions in the Kafka consumer
Is there a way to make SmallRye call assign() with a custom list of partitions instead of subscribe()? I tried doing it in a rebalance listener, but Kafka complains. I suppose that by the time this method is called, the subscription has already been made:
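@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
    // Keep only a subset of the partitions handed out by the rebalance
    List<TopicPartition> newPartitions = pickSomePartitions(partitions);
    logPartitions(newPartitions);
    // Manual assignment on an already-subscribed consumer: this is the call Kafka rejects
    consumer.assign(newPartitions);
}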
I get the error:
Caused by: java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive
Is there any way to do this? It would be very useful for debugging production topics when our development machine has read access to the topic but can't handle the full load.
This is currently not possible, AFAICS. If it were enough for you to specify the partitions in configuration, that should be relatively simple to implement. For example, instead of
mp.messaging.incoming.my-channel.topic=xxx
mp.messaging.incoming.my-channel.pattern=true|false
you'd write something like
mp.messaging.incoming.my-channel.partitions=xxx:1,yyy:2
Kafka topic names match [a-zA-Z0-9._-]+, so ':' should be safe to use as the separator between topic and partition.
WDYT?
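For illustration, here is a minimal sketch of how a value in the proposed topic:partition format could be parsed. It is not existing SmallRye code. TopicAndPartition and PartitionSpecParser are hypothetical names, and the record is only a stand-in for Kafka's TopicPartition so that the snippet compiles without kafka-clients.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record TopicAndPartition(String topic, int partition) {}

// Hypothetical helper; the "partitions" attribute is only a proposal in this thread.
final class PartitionSpecParser {
    private PartitionSpecParser() {}

    // Parses a comma-separated list such as "xxx:1,yyy:2".
    static List<TopicAndPartition> parse(String spec) {
        List<TopicAndPartition> result = new ArrayList<>();
        for (String entry : spec.split(",")) {
            String trimmed = entry.trim();
            // Topic names cannot contain ':', so the last ':' separates topic from partition.
            int sep = trimmed.lastIndexOf(':');
            if (sep <= 0 || sep == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected topic:partition, got '" + trimmed + "'");
            }
            String topic = trimmed.substring(0, sep);
            if (!topic.matches("[a-zA-Z0-9._-]+")) {
                throw new IllegalArgumentException("Invalid topic name '" + topic + "'");
            }
            // NumberFormatException is a subclass of IllegalArgumentException.
            int partition = Integer.parseInt(trimmed.substring(sep + 1));
            if (partition < 0) {
                throw new IllegalArgumentException("Negative partition in '" + trimmed + "'");
            }
            result.add(new TopicAndPartition(topic, partition));
        }
        return result;
    }
}
```

With this sketch, PartitionSpecParser.parse("xxx:1,yyy:2") returns two entries, (xxx, 1) and (yyy, 2), and a malformed entry such as "xxx" is rejected with an IllegalArgumentException.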
Indeed it would be good enough. The subscription seems to be implemented here: https://github.com/smallrye/smallrye-reactive-messaging/blob/a0554ca40c1677d6a8ec524724172c697a168b3a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java#L115-L119
I think we'd just have to add another branch to the if, is that correct? And the new config, of course.
That is correct, yes.
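A rough sketch of what the extra branch could look like, under stated assumptions: ConsumerLike and SubscriptionSetup are hypothetical, and ConsumerLike only mirrors the names of subscribe(Collection), subscribe(Pattern) and assign(Collection) on Kafka's Consumer so the snippet compiles on its own. The actual code in KafkaSource may be structured differently.

```java
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record TopicAndPartition(String topic, int partition) {}

// Hypothetical stand-in mirroring three methods of Kafka's Consumer interface.
interface ConsumerLike {
    void subscribe(Collection<String> topics);
    void subscribe(Pattern pattern);
    void assign(Collection<TopicAndPartition> partitions);
}

final class SubscriptionSetup {
    private SubscriptionSetup() {}

    // Chooses exactly one mode, since Kafka treats topic subscription,
    // pattern subscription and manual assignment as mutually exclusive.
    static void configure(ConsumerLike consumer, String topic, boolean pattern,
                          List<TopicAndPartition> partitions) {
        if (!partitions.isEmpty()) {
            // Proposed new branch: explicit partitions take precedence over subscribing.
            consumer.assign(partitions);
        } else if (pattern) {
            // Existing behaviour: treat the topic value as a regular expression.
            consumer.subscribe(Pattern.compile(topic));
        } else {
            // Existing behaviour: subscribe to the single named topic.
            consumer.subscribe(List.of(topic));
        }
    }
}
```

Giving the explicit partition list precedence is a design choice of this sketch. Rejecting a configuration that sets both topic and partitions would be an equally reasonable alternative.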
Cool! Perhaps I'll open a PR for it. By the way, where is KafkaConnectorIncomingConfiguration defined? I grepped the source code but couldn't find the definition.
@diogenes1oliveira it's generated from the @ConnectorAttribute annotations on the KafkaConnector class.
We have an annotation processor that reads those attributes and generates the config class (as well as the AsciiDoc tables for the documentation).