spring.cloud.stream.sendto.destination not honoring explicit binder environment
spring.cloud.stream.sendto.destination is not honoring the explicit binder environment (child context); instead it tries to fetch the default binder assignment from the parent context, which results in the following exception.
2020-02-11 16:10:07.284 ERROR 59803 --- [oundedElastic-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'supplier_integrationflow.router#0' for component 'supplier_integrationflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0']; nested exception is java.lang.IllegalStateException: A default binder has been requested, but there is more than one binder available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : kafka2,kafka1, and no default binder has been set., failedMessage=GenericMessage [payload=byte[8], headers={id=8b7de95a-6f07-8ad5-b0f5-780297eed494, spring.cloud.stream.sendto.destination=topic-out, contentType=application/json, timestamp=1581466207264}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:187)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:219)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:57)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$DelegatingSubscriber.hookOnNext(ReactiveStreamsConsumer.java:165)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$DelegatingSubscriber.hookOnNext(ReactiveStreamsConsumer.java:148)
at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:426)
at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:268)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
at reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:63)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$2(FluxMessageChannel.java:83)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:189)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:398)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:484)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: A default binder has been requested, but there is more than one binder available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : kafka2,kafka1, and no default binder has been set.
at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:183)
at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:134)
at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:362)
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:257)
at org.springframework.cloud.stream.binding.BinderAwareChannelResolver.resolveDestination(BinderAwareChannelResolver.java:125)
at org.springframework.cloud.stream.function.FunctionConfiguration$1.lambda$afterPropertiesSet$0(FunctionConfiguration.java:188)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:97)
at org.springframework.integration.router.AbstractMessageProcessingRouter.getChannelKeys(AbstractMessageProcessingRouter.java:84)
at org.springframework.integration.router.AbstractMappingMessageRouter.determineTargetChannels(AbstractMappingMessageRouter.java:202)
at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:170)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170)
... 26 more
Sample Project: https://github.com/ncheema/spring-cloud-stream-bug
Spring cloud stream version: 3.0.2.BUILD-SNAPSHOT
@SpringBootApplication
public class Application {

    private EmitterProcessor<Message<String>> processor = EmitterProcessor.create(false);

    @Bean
    public Supplier<Flux<Message<String>>> supplier() {
        return () -> processor;
    }

    @Bean
    public Consumer<Flux<String>> consumer() {
        return msg -> msg
                .map(m -> MessageBuilder
                        .withPayload(m)
                        .setHeader("spring.cloud.stream.sendto.destination", "topic-out")
                        .build())
                .doOnNext(processor::onNext)
                .subscribe();
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
spring.cloud.stream:
  function.definition: supplier;consumer;
  bindings:
    supplier-out-0:
      destination: topic-out
      binder: kafka1
    consumer-in-0:
      destination: topic-in
      binder: kafka2
  binders:
    kafka1:
      type: kafka
      environment:
        spring:
          cloud:
            stream:
              kafka:
                binder:
                  brokers: localhost:9092
    kafka2:
      type: kafka
      environment:
        spring:
          cloud:
            stream:
              kafka:
                binder:
                  brokers: localhost:9092
Why do you have the value in quotes? - binder: "kafka2"
@ncheema When you have dynamic destinations, the destination itself becomes an output binding at runtime. Therefore, in the case of multi binders, you have to specify which binder to attach to that binding. I modified your configuration as below and the outbound topic (topic-out) now receives data.
bindings:
  supplier-out-0:
    destination: topic-out
    binder: kafka1
  consumer-in-0:
    destination: topic-in
    binder: kafka2
  topic-out:
    binder: kafka1
If the destination is truly dynamic, i.e. you don't know the Kafka topic at build time but rather depend on the provisioner to create the dynamic topic at runtime, then the above approach won't work (since you can't attach the binder name to the binding in advance). In that case, Spring Cloud Stream somehow needs to attach the proper binder based on some config. Anyway, the above config change should work in your situation.
@olegz I removed the quotes but the issue is still present. (also updated the code sample)
@sobychacko I agree the binder must be specified per binding. This issue is specifically about spring.cloud.stream.sendto.destination, which lets us alter the destination topic via headers.
The reported issue occurs with a multi-binder setup and the spring.cloud.stream.sendto.destination header
(sample of a consumer utilizing spring.cloud.stream.sendto.destination):
@Bean
public Consumer<Flux<String>> consumer() {
    return msg -> msg
            .map(m -> MessageBuilder
                    .withPayload(m)
                    .setHeader("spring.cloud.stream.sendto.destination", "topic-out")
                    .build())
            .doOnNext(processor::onNext)
            .subscribe();
}
The issue seems closely related to https://github.com/spring-cloud/spring-cloud-stream/issues/1880, where the parent context was used rather than the child context. The exception log is unchanged from the one posted above.
Taking another stab at explaining the problem; hopefully this will help clarify.
Given a multi-binder config where the destination topic for a supplier is set dynamically via the spring.cloud.stream.sendto.destination header, the setup doesn't function unless spring.cloud.stream.defaultBinder is set to the same binder as the destination binding's binder.
Breakdown:
The consumer adds the spring.cloud.stream.sendto.destination header to the message:
@SpringBootApplication
public class Application {

    private EmitterProcessor<Message<String>> processor = EmitterProcessor.create(false);

    @Bean
    public Supplier<Flux<Message<String>>> supplier() {
        return () -> processor;
    }

    @Bean
    public Consumer<Flux<String>> consumer() {
        return msg -> msg
                .map(m -> MessageBuilder
                        .withPayload(m)
                        .setHeader("spring.cloud.stream.sendto.destination", "topic-out")
                        .build())
                .doOnNext(processor::onNext)
                .subscribe();
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
Non-functional config
This config throws the following error:
A default binder has been requested, but there is more than one binder available
spring.cloud.stream:
  function.definition: supplier;consumer;
  bindings:
    supplier-out-0:
      # destination: topic-out
      # destination topic is set via the spring.cloud.stream.sendto.destination header
      binder: kafka1
    consumer-in-0:
      destination: topic-in
      binder: kafka2
  binders:
    kafka1:
      type: kafka
      environment:
        spring:
          cloud:
            stream:
              kafka:
                binder:
                  brokers: localhost:9092
    kafka2:
      type: kafka
      environment:
        spring:
          cloud:
            stream:
              kafka:
                binder:
                  brokers: localhost:9092
Working config (workaround)
This config works because we set spring.cloud.stream.defaultBinder to the same binder as the supplier's binder (kafka1):
spring.cloud.stream.defaultBinder: kafka1
spring.cloud.stream:
  function.definition: supplier;consumer;
  bindings:
    supplier-out-0:
      # destination: topic-out
      # destination topic is set via the spring.cloud.stream.sendto.destination header
      binder: kafka1
    consumer-in-0:
      destination: topic-in
      binder: kafka2
  binders:
    kafka1:
      type: kafka
      environment:
        spring:
          cloud:
            stream:
              kafka:
                binder:
                  brokers: localhost:9092
    kafka2:
      type: kafka
      environment:
        spring:
          cloud:
            stream:
              kafka:
                binder:
                  brokers: localhost:9092
Is your topic really dynamic? You seem to hard-code the value of sendto to be topic-out at all times.
Also, your bootstrap can be greatly simplified if you just use a single Function instead of a pair of Consumer and Supplier.
@SpringBootApplication
public class Application {

    @Bean
    public Function<Flux<String>, Flux<Message<String>>> function() {
        return flux -> flux.map(value -> MessageBuilder
                .withPayload(value)
                // .setHeader("spring.cloud.stream.sendto.destination", "topic-out") not sure if this is needed at all
                .build());
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
and then refer to input as function-in-0 and output as function-out-0
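The corresponding bindings could then be configured along these lines (a sketch only, adapted from the configs earlier in this thread; the topic and binder names are carried over from the reporter's setup, not prescribed):

```yaml
spring.cloud.stream:
  function.definition: function
  bindings:
    function-in-0:
      destination: topic-in
      binder: kafka2
    function-out-0:
      destination: topic-out
      binder: kafka1
```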
Is your topic really dynamic?
Yes, in our use case the topic is dynamic (as explained in the earlier issue https://github.com/spring-cloud/spring-cloud-stream/issues/1812). I am, however, hard-coding it in the sample code to keep things simple.
Thank you for the simplification. Our actual use case requires the EmitterProcessor, so I kept that in the sample as well.
If you and the team would like, I can change the sample to use a dynamic topic.
What do you mean by ...requires EmitterProcessor...? The use of EmitterProcessor is just a means to an end, primarily for cases where data arrives from an external (non-binder) system such as a REST endpoint. In your case you have two binders, one for input and one for output. You can simply connect them with a function (as described).
No, no need to change it; I understand the issue, so let me see what we can dig up.
OK, I confirmed this is a bug. Perfect timing though, as we were about to release 3.0.2, so we'll fix it before the release.
Actually, I changed it to an enhancement since it really never worked.
@ncheema Actually I just re-read what @sobychacko said earlier, and he is correct: you do have a misconfiguration, so let me try to explain it a bit differently.
When you specify spring.cloud.stream.bindings.<something>.*, the <something> is the name of the binding. You always send/receive messages to/from bindings. The bindings could further be mapped to destinations which correspond to the actual queues/exchanges/topics etc. in your broker.
This means that in your configuration:
...
bindings:
  supplier-out-0:
    destination: topic-out
    binder: kafka1
...
...the binding name is supplier-out-0, and it is further mapped to the broker destination topic-out in the kafka1 binder. So when you send a message to ("spring.cloud.stream.sendto.destination", "topic-out"), the system simply cannot resolve the binder for the dynamic binding topic-out.
If you were to change it to:
...
bindings:
  topic-out:
    binder: kafka1
...
(as Soby suggested earlier) then everything would work as expected.
So to summarise: dynamic binding resolution only works with a single binder, or, in the case of multiple binders, when the binding names are known ahead of time. If they are not, the behavior you describe is actually expected. That said, I am not against adding this as a feature, so feel free to change the description of the original issue, or let me know if you're OK with it and I'll do it. Basically something like "Add support for dynamic binding resolution in multiple binder scenarios when binding names are not known ahead of time".
We would probably require another header:
.setHeader("spring.cloud.stream.sendto.destination", "topic-out")
.setHeader("spring.cloud.stream.sendto.binder", "kafka1")
@olegz, @sobychacko's suggestion indeed solves the problem. I apologize for not thoroughly reading @sobychacko's suggestion earlier.
Regarding your comment:
What do you mean ...requires EmitterProcessor...? The use of EmitterProcessor is just means to an end primarily for cases where data arrives from external (non-binder) system such as rest endpoint etc. In your case you have two binders, one for input and one for output. You can simply connect it with function (as described).
Our use case: the input is of type List<String>, and the output publishes each element of the list as a separate message (Message<String>). Not sure how to accomplish this without the use of the EmitterProcessor (sample below).
@SpringBootApplication
public class Application {

    private EmitterProcessor<Message<String>> processor = EmitterProcessor.create(false);

    @Bean
    public Supplier<Flux<Message<String>>> supplier() {
        return () -> processor;
    }

    @Bean
    public Consumer<List<String>> consumer() {
        return msg -> msg.forEach(
                m -> processor.onNext(MessageBuilder
                        .withPayload(m)
                        .setHeader("spring.cloud.stream.sendto.destination", "topic-out")
                        .build()));
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
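For what it's worth, the list-flattening itself could, in principle, be done with a single reactive Function using flatMapIterable, without an EmitterProcessor. A minimal sketch (the FlattenFunction class name is illustrative, the topic name is the hard-coded one from this thread, and the multi-binder caveats discussed above still apply):

```java
import java.util.List;
import java.util.function.Function;

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import reactor.core.publisher.Flux;

public class FlattenFunction {

    // Flattens each incoming List<String> into individual Message<String> instances,
    // tagging each one with the sendto.destination header used elsewhere in this thread.
    public static Function<Flux<List<String>>, Flux<Message<String>>> flatten() {
        return flux -> flux
                .flatMapIterable(list -> list) // List<String> -> individual String elements
                .map(payload -> MessageBuilder
                        .withPayload(payload)
                        .setHeader("spring.cloud.stream.sendto.destination", "topic-out")
                        .build());
    }
}
```

Registered as a `@Bean`, such a function would be bound via function-in-0/function-out-0 as discussed above.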
@olegz I am using a dynamic topic in the header with multiple binders and getting the error
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: A default binder has been requested, but there is more than one binder available for
We would probably require another header as you suggested
.setHeader("spring.cloud.stream.sendto.destination", "topic-out")
.setHeader("spring.cloud.stream.sendto.binder", "kafka1")
Is there any other workaround for this ?
@natraj09 have you read the previous comments? It appears that there is a workaround or even a solution
@olegz I already read through the comments and tried out various options. My destination is dynamic and it cannot be statically mapped.