smallrye-reactive-messaging icon indicating copy to clipboard operation
smallrye-reactive-messaging copied to clipboard

Manually assign partitions in the Kafka consumer

Open diogenes1oliveira opened this issue 4 years ago • 5 comments

Is there a way to make Smallrye call assign() with a custom list of partitions instead of subscribe()? I tried doing it in a rebalance listener but Kafka complains, I figure this method must be called once the subscription process is already done:

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        List<TopicPartition> newPartitions = pickSomePartitions(partitions);
        logPartitions(newPartitions);
    }

I get the error

Caused by: java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive

Is there any way to do this? It is very useful to debug topics in production when our development machine has read access to the topic, but can't handle the full load.

diogenes1oliveira avatar Jun 01 '21 20:06 diogenes1oliveira

This is currently not possible, AFAICS. If it was enough for you to specify this in configuration, that should be relatively simple to implement. For example, instead of

mp.messaging.incoming.my-channel.topic=xxx
mp.messaging.incoming.my-channel.pattern=true|false

you'd write something like

mp.messaging.incoming.my-channel.partitions=xxx:1,yyy:2

Kafka topic name is [a-zA-Z0-9._-]+, so : should be OK to use.

WDYT?

Ladicek avatar Jun 02 '21 07:06 Ladicek

Indeed it would be good enough. The subscription seems to be implemented here: https://github.com/smallrye/smallrye-reactive-messaging/blob/a0554ca40c1677d6a8ec524724172c697a168b3a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java#L115-L119

I think we'd just had to add another branch in the if, is that correct? And the new config, of course.

diogenes1oliveira avatar Jun 02 '21 12:06 diogenes1oliveira

That is correct, yes.

Ladicek avatar Jun 02 '21 12:06 Ladicek

Cool! Perhaps I'll be opening a PR about it. By the way, where is KafkaConnectorIncomingConfiguration defined? I grepped the source code but couldn't find the definition.

diogenes1oliveira avatar Jun 02 '21 12:06 diogenes1oliveira

@diogenes1oliveira it's created from the @ConnectorAttribute annotations from the KafkaConnector class.

We have an annotation processor reading the attribute and generating the config class (as well as the asciidoc tables for the doc)

cescoffier avatar Jun 02 '21 12:06 cescoffier