
Auto-register functional beans as Spring Cloud Stream bindings

Open ferblaca opened this issue 4 years ago • 19 comments

When an application is configured with spring cloud stream and the context contains a bean that implements a functional interface, spring cloud stream detects that bean and tries to provision a destination for it, even if the bean is meant neither to publish nor to subscribe to events. If the bean is a Supplier, it even sends events.

This happens whenever the spring context contains exactly one bean of type Function, Consumer or Supplier, even if you set the property "spring.cloud.function.scan.enabled" to false.

This can be a problem in applications where you only want to use StreamBridge to produce events and for other reasons there is a bean that implements a functional interface in the spring context.

Is this the desired behaviour?

To Reproduce: attached is an application that reproduces the problem (demoStreamKafka.zip).

Version of the framework: spring-cloud 2020.0.2, spring-cloud-stream 3.1.2

ferblaca avatar May 10 '21 09:05 ferblaca

The behavior is intentional but will only happen if you have a single bean of type Function, Consumer or Supplier. If you have multiple, none will be bound without providing the spring.cloud.function.definition property.
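The rule described above can be sketched as a toy model in plain Java. This is only an illustration of the intended behaviour as stated in this thread, not the framework's code: the class `AutodetectModel` and the method `beanToBind` are hypothetical names, a single (non-composite) definition is assumed, and the `autodetect` flag stands for spring.cloud.stream.function.autodetect.

```java
import java.util.List;
import java.util.Optional;

public class AutodetectModel {

    /**
     * Toy model of the binding rule described in this thread (hypothetical, not Spring code).
     *
     * @param functionalBeans names of the beans of type Function, Consumer or Supplier
     * @param definition      value of spring.cloud.function.definition, or null if unset
     * @param autodetect      value of spring.cloud.stream.function.autodetect
     * @return the bean that would be bound, if any
     */
    public static Optional<String> beanToBind(List<String> functionalBeans,
                                              String definition,
                                              boolean autodetect) {
        if (definition != null && !definition.isEmpty()) {
            // An explicit definition selects the bean to bind.
            return Optional.of(definition);
        }
        if (autodetect && functionalBeans.size() == 1) {
            // Exactly one functional bean and no definition: it is bound implicitly.
            return Optional.of(functionalBeans.get(0));
        }
        // No beans, several beans, or autodetect disabled: nothing is bound.
        return Optional.empty();
    }

    public static void main(String[] args) {
        // prints "Optional[servicesFunction]"
        System.out.println(beanToBind(List.of("servicesFunction"), null, true));
        // prints "Optional.empty"
        System.out.println(beanToBind(List.of("first", "second"), null, true));
        // prints "Optional.empty"
        System.out.println(beanToBind(List.of("servicesFunction"), null, false));
    }
}
```

The second call shows why adding a second functional bean stops the implicit binding, and the third shows what disabling autodetect is meant to achieve.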

You can disable it by using spring.cloud.stream.function.autodetect property - https://docs.spring.io/spring-cloud-stream/docs/3.0.12.RELEASE/reference/html/spring-cloud-stream.html#_overview

For future reference please use our dedicated Stack Overflow channel for questions

olegz avatar May 10 '21 09:05 olegz

Thank you very much for the information @olegz !

Yes, the behaviour is as you describe, but I'm afraid that the property "spring.cloud.stream.function.autodetect" has no effect in the latest version 3.1.2... but it does in 3.0.x.

Will it be available for version 3.1.3?

ferblaca avatar May 10 '21 11:05 ferblaca

I'll investigate. I am surprised that it has no effect. But yes it will be available in 3.1.3

olegz avatar May 10 '21 11:05 olegz

As a workaround you can always use spring.cloud.function.definition=blah where blah points to something that doesn't exist. You'll see a warning message, but that's ok.

olegz avatar May 10 '21 11:05 olegz

Actually this has indeed been addressed in 3.1, but has not been released yet - https://github.com/spring-cloud/spring-cloud-stream/issues/2035

olegz avatar May 10 '21 13:05 olegz

Hello @olegz! Just to inform you, in version 3.1.2 this workaround is no longer valid:

Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'supplierInitializer' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'blah' available
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean (AbstractAutowireCapableBeanFactory.java:1786)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean (AbstractAutowireCapableBeanFactory.java:602)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean (AbstractAutowireCapableBeanFactory.java:524)

To solve this for the moment I will define more than one dummy functional bean and leave the 'spring.cloud.function.definition' property empty.

ferblaca avatar May 11 '21 10:05 ferblaca

We are running into the same issue as ferblaca.

There exists a bean in the context of our application which implements a functional interface that has nothing to do with spring cloud stream. And we have a definition of spring.cloud.stream.source for the StreamBridge.

We also have spring.cloud.stream.function.autodetect=false.

The issue is within the SimpleFunctionRegistry#normalizeFunctionDefinition which replaces the functionDefinition which comes from spring.cloud.stream.source with the name of the single non-related bean.

Thus, our binding configuration is not applied.

As a workaround we define another non-related bean which implements a functional interface.

@olegz can we maybe reopen this issue?

LeovR avatar Aug 19 '21 12:08 LeovR

Sure, we can reopen if one of you provides us with a sample to reproduce it. A simple project with the bare minimum to reproduce the issue somewhere on GitHub will do.

olegz avatar Aug 19 '21 12:08 olegz

Alright, I will provide a sample application

LeovR avatar Aug 19 '21 13:08 LeovR

I created a sample project here: https://github.com/LeovR/spring-cloud-stream-issue-2170

LeovR avatar Aug 20 '21 05:08 LeovR

@LeovR why is there spring.cloud.stream.function.autodetect=false?

olegz avatar Aug 23 '21 13:08 olegz

Because the arbitraryFunctionalBean in the demo project is just an example of a bean implementing a functional interface. The only source for publishing messages is the StreamBridge.

In our application there is a bean of org.springframework.cloud.kubernetes.fabric8.discovery.KubernetesClientServicesFunction which extends Function<KubernetesClient, FilterWatchListDeletable<Service, ServiceList, Boolean, Watch>>.

We do not want this as a source for spring cloud stream. Thus, we disabled spring.cloud.stream.function.autodetect.

LeovR avatar Aug 23 '21 13:08 LeovR

So you just need StreamBridge, correct?

olegz avatar Aug 23 '21 13:08 olegz

Yes, correct.

LeovR avatar Aug 23 '21 13:08 LeovR

Well, perhaps we need to update the docs, but you really don't need the following

spring.cloud.stream.source=sourceSupplier
spring.cloud.stream.bindings.sourceSupplier-out-0.destination=sourcetopic

Basically inject StreamBridge wherever you need it and use it. For example

streamBridge.send("sourcetopic", "blah, blah");

Or, if you want your binding name to be different from the destination name, you can still use spring.cloud.stream.bindings.some_binding_name.destination=sourcetopic. For example

spring.cloud.stream.bindings.sourceSupplier-out-0.destination=sourcetopic
. . .
streamBridge.send("sourceSupplier-out-0", "blah, blah");

Basically the way StreamBridge works is it will send to the existing binding or will auto-provision one on the first send.
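The "existing binding or auto-provision on first send" behaviour can be pictured with a small plain-Java model. This is a sketch under assumptions, not the StreamBridge implementation: `LazyBridgeModel`, `send` and `isProvisioned` are hypothetical names, messages are only collected in memory, and the binding name is assumed to double as the destination when no destination is configured for it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LazyBridgeModel {

    // binding name -> destination, modelling spring.cloud.stream.bindings.<name>.destination
    private final Map<String, String> configuredDestinations;

    // binding name -> payloads sent so far; an entry exists only once the binding was created
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    public LazyBridgeModel(Map<String, String> configuredDestinations) {
        this.configuredDestinations = configuredDestinations;
    }

    /** Sends a payload and returns the destination it was routed to. */
    public String send(String bindingName, String payload) {
        // Assumption: without a configured destination, the binding name is the destination.
        String destination = configuredDestinations.getOrDefault(bindingName, bindingName);
        // The binding is created on the first send and reused afterwards.
        bindings.computeIfAbsent(bindingName, name -> new ArrayList<>()).add(payload);
        return destination;
    }

    public boolean isProvisioned(String bindingName) {
        return bindings.containsKey(bindingName);
    }

    public static void main(String[] args) {
        LazyBridgeModel bridge =
                new LazyBridgeModel(Map.of("sourceSupplier-out-0", "sourcetopic"));
        System.out.println(bridge.isProvisioned("sourceSupplier-out-0")); // prints "false"
        System.out.println(bridge.send("sourceSupplier-out-0", "blah, blah")); // prints "sourcetopic"
        System.out.println(bridge.isProvisioned("sourceSupplier-out-0")); // prints "true"
    }
}
```

In this model nothing exists for a binding until the first `send`, so a wrong binding name or destination would only show up at that point rather than at startup.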

Give it a try and let me know. I'll keep it open until you confirm

olegz avatar Aug 23 '21 14:08 olegz

In the docs there is the following chapter:

https://docs.spring.io/spring-cloud-stream/docs/3.1.3/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources

Spring Cloud Stream provides two mechanisms, so let’s look at them in more details

Basically you are suggesting to use this mode: https://docs.spring.io/spring-cloud-stream/docs/3.1.3/reference/html/spring-cloud-stream.html#_streambridge_and_dynamic_destinations

I think that would work for us but there is a small difference in those mechanisms. With the explicit definition of spring.cloud.stream.source the binding will be created at context creation.

Thus, we know at the start whether the configuration is correct, whereas in the dynamic mode the binding will only be created after using the StreamBridge for the first time.

So if the first mode is still a valid option then I think there is still an issue as demonstrated in the demo project.

LeovR avatar Aug 24 '21 04:08 LeovR

@olegz I have a GitHub project as I have suffered from the same issues above - see https://github.com/davidmelia/spring-cloud-function-source-only. This project is a simple spring cloud function and spring cloud stream kafka producer and therefore does not need a 'definition'.

You can run AwsLambdaRequestHandlerSandpit which will create a message in the kafka topic 'test'.

My observations are:

  1. spring.cloud.stream.source does not bootstrap at context creation, but I think it should (similar to spring.cloud.function.definition). It seems to be completely redundant.

  2. Because I only have a source (and not a spring.cloud.function.definition) I need spring.cloud.stream.function.autodetect=false. Is there anything clever that can be done to say if we only have a spring.cloud.stream.source then autodetect=false rather than having to specify it?

Thanks

davidmelia avatar Sep 17 '21 14:09 davidmelia

Hi!

I have found a related issue. I have a very simple project to reproduce it: demo.zip

It seems to be an issue when I try to use the coherence-spring-boot-starter and coherence together with spring-cloud-stream. The application cannot start correctly. The stack trace is:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionBindingRegistrar' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'getExtractor' defined in class path resource [com/oracle/coherence/spring/configuration/ExtractorConfiguration.class]: Unexpected exception during bean creation; nested exception is java.lang.IllegalStateException: No current InjectionPoint available for method 'getExtractor' parameter 0
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1755) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:604) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:526) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:326) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:324) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:928) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:926) ~[spring-context-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:592) ~[spring-context-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:740) ~[spring-boot-3.0.0-M3.jar:3.0.0-M3]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:436) ~[spring-boot-3.0.0-M3.jar:3.0.0-M3]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:309) ~[spring-boot-3.0.0-M3.jar:3.0.0-M3]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1312) ~[spring-boot-3.0.0-M3.jar:3.0.0-M3]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301) ~[spring-boot-3.0.0-M3.jar:3.0.0-M3]
    at com.example.demo.DemoApplication.main(DemoApplication.java:10) ~[classes/:na]
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'getExtractor' defined in class path resource [com/oracle/coherence/spring/configuration/ExtractorConfiguration.class]: Unexpected exception during bean creation; nested exception is java.lang.IllegalStateException: No current InjectionPoint available for method 'getExtractor' parameter 0
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:539) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:344) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1141) ~[spring-context-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry.discoverFunctionInBeanFactory(BeanFactoryAwareFunctionRegistry.java:206) ~[spring-cloud-function-context-4.0.0-M3.jar:4.0.0-M3]
    at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry.lookup(BeanFactoryAwareFunctionRegistry.java:129) ~[spring-cloud-function-context-4.0.0-M3.jar:4.0.0-M3]
    at org.springframework.cloud.function.context.FunctionCatalog.lookup(FunctionCatalog.java:42) ~[spring-cloud-function-context-4.0.0-M3.jar:4.0.0-M3]
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionBindingRegistrar.determineFunctionName(FunctionConfiguration.java:989) ~[spring-cloud-stream-4.0.0-M3.jar:4.0.0-M3]
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionBindingRegistrar.afterPropertiesSet(FunctionConfiguration.java:861) ~[spring-cloud-stream-4.0.0-M3.jar:4.0.0-M3]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1802) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1751) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4]
    ... 15 common frames omitted
Caused by: java.lang.IllegalStateException: No current InjectionPoint available for method 'getExtractor' parameter 0
    at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:858) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:767) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:525) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1324) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1161) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:566) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:526) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4]
    ... 25 common frames omitted

As you can see there is an IllegalStateException. This happens because spring cloud stream finds a bean factory method that would create a bean of a functional interface. But I think that this particular functional interface should not be picked up. The reason is that this is a prototype bean that can only be created when there is a provided injection point. My question is: is it a good idea to pick up a bean factory method that is prototype-scoped and takes an injection point as an input parameter?

What causes the exception is the following method in com.oracle.coherence.spring.configuration.ExtractorConfiguration:

    @Bean
    @Primary
    @Scope(BeanDefinition.SCOPE_PROTOTYPE)
    public ValueExtractor<?, ?> getExtractor(InjectionPoint injectionPoint) {
        return this.extractorService.getExtractor(injectionPoint);
    }

But I believe the real fix would be to not try to create beans whose factory method needs an InjectionPoint parameter. Please share your thoughts on this.

Thanks!

Drizzthsz avatar Jun 30 '22 11:06 Drizzthsz