Spring cloud stream can not close MessageChannel
Describe the issue When define dynamic-destination-cache-size ,the channelCache execute removeEldestEntry method,the bindingService can found key
The reason is ,when we create a new MessageChannel object put channelCache,the code is:
this.bindingService.bindProducer(messageChannel, destinationName, true, binder);
if (StringUtils.hasText(binderName)) {
this.channelCache.put(binderName + ":" + destinationName, messageChannel);
}
else {
this.channelCache.put(destinationName, messageChannel);
}
the bindingService use key: desinationName(Topic,not include binderName) the channelCache use key: binderName + ":" + destinationName (When we use send set the binderName is not null)
but the remove method is,for example when we use StreamBridge.send("dev_create_just_1","nansha","The message body"):
protected boolean removeEldestEntry(Map.Entry<String, MessageChannel> eldest) {
boolean remove = size() > bindingServiceProperties.getDynamicDestinationCacheSize();
if (remove) {
if (logger.isDebugEnabled()) {
logger.debug("Removing message channel from cache " + eldest.getKey());
}
//the eldest.getKey() return "nansha:dev_create_just_1" but the bindingService put key is: "dev_create_just_1"
bindingService.unbindProducers(eldest.getKey());
}
return remove;
}
so the bindingService.unbindProducers execute is:
public void unbindProducers(String outputName) {
Binding<?> binding = this.producerBindings.remove(outputName);//outputName is :"nansha:dev_create_just_1"
//so the binding is null
if (binding != null) {
binding.stop();
//then
binding.unbind();
}
else if (this.log.isWarnEnabled()) {
this.log.warn("Trying to unbind '" + outputName + "', but no binding found.");
}
}
So If you want to solve this problem,We must modify the resolveDestination method to :
from
this.bindingService.bindProducer(messageChannel, destinationName, true, binder);
if (StringUtils.hasText(binderName)) {
this.channelCache.put(binderName + ":" + destinationName, messageChannel);
}
else {
this.channelCache.put(destinationName, messageChannel);
}
to
if (StringUtils.hasText(binderName)) {
this.bindingService.bindProducer(messageChannel, binderName + ":" + destinationName, true, binder);
this.channelCache.put(binderName + ":" + destinationName, messageChannel);
}
else {
this.bindingService.bindProducer(messageChannel, destinationName, true, binder);
this.channelCache.put(destinationName, messageChannel);
}