spring-cloud-stream icon indicating copy to clipboard operation
spring-cloud-stream copied to clipboard

Can't convert Binder to KafkaMessageChannelBinder!!!!!!!

Open Haibarapink opened this issue 6 months ago • 2 comments

Iunconvertible types; cannot cast 'org.springframework.cloud.stream.binder.Binder<capture<?>,org.springframework.cloud.stream.binder.ConsumerProperties,org.springframework.cloud.stream.binder.ProducerProperties>' to 'org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder'

@Bean
public BinderCustomizer kafkaBinderCustomizer() {
    return (binder, binderName) -> {
        if (binder instanceof KafkaMessageChannelBinder) {
            KafkaMessageChannelBinder kafkaBinder = (KafkaMessageChannelBinder) binder;
        }
    };
}

When I debug the following code, the type of binder is KafkaMessageChannelBinder in debugger :)

@Bean
public BinderCustomizer kafkaBinderCustomizer() {
    return (binder, binderName) -> {
        System.out.println("hello world");
    };
}

Haibarapink avatar Jul 22 '25 07:07 Haibarapink

Can you please post the entire stack trace. These little bits don't help.

olegz avatar Jul 22 '25 09:07 olegz

I have to convert binder to Object and then convert Object to KafkaMessageChannelBinder. The following code works correctly. But if I convert binder to KafkaMessageChannelBinder, this code can't pass compiling. And it reports "Inconvertible types; cannot cast 'org.springframework.cloud.stream.binder.Binder<capture<?>,org.springframework.cloud.stream.binder.ConsumerProperties,org.springframework.cloud.stream.binder.ProducerProperties>' to 'org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder'" Thanks a lot!!!

@Bean
    public BinderCustomizer consumerOffsetInitialize() {
        var startingOffsetModeProperties = this.startingOffsetModeProperties;
        return (binder, binderName) -> {
            // please check here!!!, This
            KafkaMessageChannelBinder kafkaBinder = (KafkaMessageChannelBinder) (Object) binder;
            kafkaBinder.setRebalanceListener(new KafkaBindingRebalanceListener() {
                @Override
                public void onPartitionsAssigned(String bindingName,
                                                 org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
                                                 Collection<TopicPartition> partitionsCollection,
                                                 boolean initial) {
                    
                    if (!initial) {
                        return;
                    }

                    Map<String, List<TopicPartition>> partitions = new HashMap<>();
                    for (var p : partitionsCollection) {
                        if (!partitions.containsKey(p.topic())) {
                            partitions.put(p.topic(), new ArrayList<>());
                        }
                        partitions.get(p.topic()).add(p);
                    }

                    for (var topicAndPartitions : partitions.entrySet()) {
                        String topic = topicAndPartitions.getKey();
                        var config = startingOffsetModeProperties.getTopicConfig(topic);
                        if (config == null) {
                            continue;
                        }
                        List<TopicPartition> currentPartitions = topicAndPartitions.getValue();
                        String strategy = config.getStrategy();
                        if (strategy.equals(StartingOffsetModeProperties.EARLIEST)) {
                            consumer.seekToBeginning(currentPartitions);
                        } else if (strategy.equals(StartingOffsetModeProperties.LATEST)) {
                            consumer.seekToEnd(currentPartitions);
                        } else if (strategy.equals(StartingOffsetModeProperties.TIME)) {
                             Map<TopicPartition, Long> topicPartitionsTimestamps = new HashMap<>();
                             for (var p : currentPartitions) {
                                 topicPartitionsTimestamps.put(p, StartingOffsetModeProperties.parseDateTime(config.getSeekTo()));
                             }
                             var topicPartitionsOffset = consumer.offsetsForTimes(topicPartitionsTimestamps);
                             for (var p : currentPartitions) {
                                 var tsAndOffset = topicPartitionsOffset.getOrDefault(p, null);
                                 if (tsAndOffset != null) {
                                    consumer.seek(p, topicPartitionsOffset.get(p).offset());
                                 } else {
                                     consumer.seekToEnd(Collections.singleton(p));
                                 }
                             }
                        } else {
                            throw new RuntimeException("Unreachable code");
                        }
                    }
                }

            });
        };
    }

Haibarapink avatar Jul 24 '25 02:07 Haibarapink