spring-cloud-stream icon indicating copy to clipboard operation
spring-cloud-stream copied to clipboard

Spring cloud stream can not close MessageChannel

Open liuxuzxx opened this issue 2 years ago • 1 comments

Describe the issue When define dynamic-destination-cache-size ,the channelCache execute removeEldestEntry method,the bindingService can found key

The reason is ,when we create a new MessageChannel object put channelCache,the code is:

    this.bindingService.bindProducer(messageChannel, destinationName, true, binder);
    if (StringUtils.hasText(binderName)) {
        this.channelCache.put(binderName + ":" + destinationName, messageChannel);
    }
    else {
        this.channelCache.put(destinationName, messageChannel);
    }

the bindingService use key: desinationName(Topic,not include binderName) the channelCache use key: binderName + ":" + destinationName (When we use send set the binderName is not null)

but the remove method is,for example when we use StreamBridge.send("dev_create_just_1","nansha","The message body"):

protected boolean removeEldestEntry(Map.Entry<String, MessageChannel> eldest) {
    boolean remove = size() > bindingServiceProperties.getDynamicDestinationCacheSize();
    if (remove) {
        if (logger.isDebugEnabled()) {
	    logger.debug("Removing message channel from cache " + eldest.getKey());
	}
        //the eldest.getKey() return "nansha:dev_create_just_1" but the bindingService put key is: "dev_create_just_1"
	bindingService.unbindProducers(eldest.getKey());
    }
    return remove;
}

so the bindingService.unbindProducers execute is:

public void unbindProducers(String outputName) {
    Binding<?> binding = this.producerBindings.remove(outputName);//outputName is :"nansha:dev_create_just_1"
    //so the binding is null
    if (binding != null) {
        binding.stop();
	//then
	binding.unbind();
    }
    else if (this.log.isWarnEnabled()) {
        this.log.warn("Trying to unbind '" + outputName + "', but no binding found.");
    }
}

liuxuzxx avatar Dec 11 '23 06:12 liuxuzxx

So If you want to solve this problem,We must modify the resolveDestination method to :

from

    this.bindingService.bindProducer(messageChannel, destinationName, true, binder);
    if (StringUtils.hasText(binderName)) {
        this.channelCache.put(binderName + ":" + destinationName, messageChannel);
    }
    else {
        this.channelCache.put(destinationName, messageChannel);
}

to


    if (StringUtils.hasText(binderName)) {
        this.bindingService.bindProducer(messageChannel, binderName + ":" + destinationName, true, binder);
	this.channelCache.put(binderName + ":" + destinationName, messageChannel);                               
    }
    else {
        this.bindingService.bindProducer(messageChannel, destinationName, true, binder);
	this.channelCache.put(destinationName, messageChannel);                                
    }

liuxuzxx avatar Dec 11 '23 06:12 liuxuzxx