Can't convert Binder to KafkaMessageChannelBinder!!!!!!!
Iunconvertible types; cannot cast 'org.springframework.cloud.stream.binder.Binder<capture<?>,org.springframework.cloud.stream.binder.ConsumerProperties,org.springframework.cloud.stream.binder.ProducerProperties>' to 'org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder'
@Bean
public BinderCustomizer kafkaBinderCustomizer() {
return (binder, binderName) -> {
if (binder instanceof KafkaMessageChannelBinder) {
KafkaMessageChannelBinder kafkaBinder = (KafkaMessageChannelBinder) binder;
}
};
}
When I debug the following code, the type of binder is KafkaMessageChannelBinder in debugger :)
@Bean
public BinderCustomizer kafkaBinderCustomizer() {
return (binder, binderName) -> {
System.out.println("hello world");
};
}
Can you please post the entire stack trace. These little bits don't help.
I have to convert binder to Object and then convert Object to KafkaMessageChannelBinder. The following code works correctly. But if I convert binder to KafkaMessageChannelBinder, this code can't pass compiling. And it reports "Inconvertible types; cannot cast 'org.springframework.cloud.stream.binder.Binder<capture<?>,org.springframework.cloud.stream.binder.ConsumerProperties,org.springframework.cloud.stream.binder.ProducerProperties>' to 'org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder'" Thanks a lot!!!
@Bean
public BinderCustomizer consumerOffsetInitialize() {
var startingOffsetModeProperties = this.startingOffsetModeProperties;
return (binder, binderName) -> {
// please check here!!!, This
KafkaMessageChannelBinder kafkaBinder = (KafkaMessageChannelBinder) (Object) binder;
kafkaBinder.setRebalanceListener(new KafkaBindingRebalanceListener() {
@Override
public void onPartitionsAssigned(String bindingName,
org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
Collection<TopicPartition> partitionsCollection,
boolean initial) {
if (!initial) {
return;
}
Map<String, List<TopicPartition>> partitions = new HashMap<>();
for (var p : partitionsCollection) {
if (!partitions.containsKey(p.topic())) {
partitions.put(p.topic(), new ArrayList<>());
}
partitions.get(p.topic()).add(p);
}
for (var topicAndPartitions : partitions.entrySet()) {
String topic = topicAndPartitions.getKey();
var config = startingOffsetModeProperties.getTopicConfig(topic);
if (config == null) {
continue;
}
List<TopicPartition> currentPartitions = topicAndPartitions.getValue();
String strategy = config.getStrategy();
if (strategy.equals(StartingOffsetModeProperties.EARLIEST)) {
consumer.seekToBeginning(currentPartitions);
} else if (strategy.equals(StartingOffsetModeProperties.LATEST)) {
consumer.seekToEnd(currentPartitions);
} else if (strategy.equals(StartingOffsetModeProperties.TIME)) {
Map<TopicPartition, Long> topicPartitionsTimestamps = new HashMap<>();
for (var p : currentPartitions) {
topicPartitionsTimestamps.put(p, StartingOffsetModeProperties.parseDateTime(config.getSeekTo()));
}
var topicPartitionsOffset = consumer.offsetsForTimes(topicPartitionsTimestamps);
for (var p : currentPartitions) {
var tsAndOffset = topicPartitionsOffset.getOrDefault(p, null);
if (tsAndOffset != null) {
consumer.seek(p, topicPartitionsOffset.get(p).offset());
} else {
consumer.seekToEnd(Collections.singleton(p));
}
}
} else {
throw new RuntimeException("Unreachable code");
}
}
}
});
};
}