Add support for mixed (imperative and reactive) functions
Hi,
In a streaming pipeline I'm trying to return a Flux stream from a processor component [1]. Error [0] occurs when the Flux is serialized. Is it even possible to send a Flux stream to Kafka in SCDF? If so, could someone explain what is going wrong?
Thanks in advance.
Regards, Juergen
[0]
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@49814cb5]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class reactor.core.publisher.FluxMap to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer, failedMessage=GenericMessage [payload=FluxMap, headers={id=4090a541-0106-0f9f-42be-4cd2e26f24ad, timestamp=1660150597971}]
[1] processor
@Bean
public Function<File, Flux<Observation>> transform() {
    LOG.info("emit csv values as observations");
    return fileHandle -> {
        String loggerName = fileHandle.getName().split("_")[0];
        Configuration loggerConfig = this.loggerConfigurations.getConfigurationByFilename(loggerName);
        CSVReader csvReader;
        try {
            csvReader = this.createCsvReader(fileHandle, loggerConfig);
        } catch (FileNotFoundException e) {
            throw new RuntimeException(e);
        }
        return Flux.using(() -> csvReader, (l) -> Flux.fromIterable(csvReader),
                (l) -> this.archiveAndDelete(fileHandle)).flatMap(a -> {
                    return Flux.fromIterable(this.createObservationsFromLine(a, loggerConfig));
                });
    };
}
[2] sink
@Bean
public Consumer<Flux<Observation>> importObservation() {
return obs -> obs.subscribe(
a -> FrostServerAdapterCache.getInstance().get(a.getUrl()).create(FrostEntitiyConverter.convert(a)));
}
What version of Spring Cloud Stream for Kafka are you using?
That will not work, because you are mixing imperative input with reactive output. This is not supported, and at the moment I can't see how it could be, given the different roles imperative and reactive functions play in the framework. We may consider it as a feature in the future, but I do not see high demand for it.
This is definitely low priority. If anyone wants to take a crack at it, it may actually need to be transferred to spring-cloud-function.
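For illustration only, and not something proposed in this thread: a minimal sketch of a processor that stays entirely on the imperative side, expanding one input into a `List` instead of returning a `Flux`. The `Observation` class, the semicolon-separated line format and the class name are hypothetical stand-ins, and how a binder would serialize or split the returned list is not covered here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ImperativeTransformSketch {

    // Hypothetical stand-in for the Observation type used in the thread.
    static final class Observation {
        final String logger;
        final double value;

        Observation(String logger, double value) {
            this.logger = logger;
            this.value = value;
        }
    }

    // Imperative in, imperative out: one CSV line becomes a list of observations.
    static Function<String, List<Observation>> transform() {
        return line -> {
            String[] cells = line.split(";");
            List<Observation> result = new ArrayList<>();
            // Assumption: the first cell is the logger name, the rest are readings.
            for (int i = 1; i < cells.length; i++) {
                result.add(new Observation(cells[0], Double.parseDouble(cells[i].trim())));
            }
            return result;
        };
    }

    public static void main(String[] args) {
        List<Observation> out = transform().apply("logger1;1.5;2.0");
        System.out.println(out.size() + " observations for " + out.get(0).logger);
    }
}
```

The point of the sketch is only the signature: neither the input nor the output type is a reactive publisher, so no `Flux` object is ever handed to a serializer.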
@olegz I think you're not focusing on the important point here. @borg1310 I recommend moving to the Reactor Kafka binder rather than expecting to mix two unrelated paradigms successfully. What wasn't mentioned here is that reactive Spring Cloud Stream is useless without a reactive binder. Hence the Kafka binder is useless here, and the Reactor Kafka binder is the right one for the job.
Thanks a lot for your answers and support. I have now changed my code to use reactive input/output only (see [0]), but now the following error occurs:
org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'messageChannelToInputFluxArgumentAdapter' defined in class path resource [org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.class]: Unsatisfied dependency expressed through method 'messageChannelToInputFluxArgumentAdapter' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.cloud.stream.converter.CompositeMessageConverterFactory' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {} at org.fzj.ibg.tsmms.stream.tests.TestEmitter.testUsageDetailSender(TestEmitter.java:52) Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.cloud.stream.converter.CompositeMessageConverterFactory' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {} at org.fzj.ibg.tsmms.stream.tests.TestEmitter.testUsageDetailSender(TestEmitter.java:52)
[0]
@Bean
public Supplier<Flux<File>> load() {
    LOG.info("reading directory: {}", this.directory);
    return () -> {
        try {
            return Flux.fromStream(Files.list(Paths.get(this.directory)).map(Path::toFile));
        } catch (IOException e) {
            throw new RuntimeException(String.format("exception when listing directory: %s", this.directory), e);
        }
    };
}
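A side note on the supplier above, unrelated to the reported exception: the `Stream` returned by `Files.list` holds an open directory handle, and its Javadoc asks for it to be closed, typically with try-with-resources. The following is a minimal plain-JDK sketch of that pattern. The class name and the `sampleDirectory` helper are hypothetical and exist only to make the example runnable; in a reactive supplier the same cleanup would have to be tied to the lifetime of the `Flux` instead, for example with `Flux.using`, as the processor in the first post does.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DirectoryListingSketch {

    // Lists the entries of a directory and closes the directory handle afterwards.
    static List<File> listFiles(Path directory) {
        try (Stream<Path> entries = Files.list(directory)) {
            return entries.map(Path::toFile).sorted().collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException("exception when listing directory: " + directory, e);
        }
    }

    // Hypothetical helper: creates a temporary directory containing one CSV file.
    static Path sampleDirectory() {
        try {
            Path dir = Files.createTempDirectory("emitter-sketch");
            Files.createFile(dir.resolve("logger1_2022.csv"));
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        for (File f : listFiles(sampleDirectory())) {
            System.out.println(f.getName());
        }
    }
}
```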
What version of Spring Cloud Stream for Kafka are you using?
spring-cloud-dependencies: 2021.0.3 spring-cloud-stream: 3.2.4 spring-cloud-stream-binder-kafka: 3.2.4
@migroskub You are correct about the reactive Kafka binder. My point about the mixed case is more general and is entirely outside of spring-cloud-stream, hence my comment about potentially transferring this issue to s-c-function, since that is where such support would come from.
@borg1310 the limited stack trace you have included clearly points to something in your package org.fzj.ibg.tsmms.stream.tests.TestEmitter.testUsageDetailSender(TestEmitter.java:52)...... Something that we don't see. You are also talking about changing the code to use reactive input and output, but all you are showing is a Supplier, which only has output (no input).
The best thing you can do is create a small sample that reproduces the issue and push it to GitHub so we can take a look.
the limited stack trace you have included clearly points to something in your package
org.fzj.ibg.tsmms.stream.tests.TestEmitter.testUsageDetailSender(TestEmitter.java:52)...... Something that we don't see
I am only running a test for the source app (the supplier). [0] is the source code of the test. The exception occurs in line 7 of the snippet (line 52 in the file). [2] lists all the Maven dependencies that are included.
When I include spring-cloud-stream-binder-kafka-reactive:4.0.0-M4 (as migroskub suggested, if that's what he meant), exception [3] occurs.
Thanks for your help. Regards, Juergen
[0] (see line 7 in the snippet below)
@Test
public void testUsageDetailSender() {
    try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
            TestChannelBinderConfiguration
                    .getCompleteConfiguration(FtpFileEmitterApplication.class))
            .web(WebApplicationType.NONE)
            .run()) { // line 52 in the file: the exception occurs here
        OutputDestination target = context.getBean(OutputDestination.class);
        Message<byte[]> sourceMessage = target.receive(10000);
        final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
        Flux<File> usageDetail = (Flux<File>) converter
                .fromMessage(sourceMessage, Flux.class);
        usageDetail.subscribe(a -> { System.out.println(a.getName()); });
    }
}
[1]
@Bean
public Supplier<Flux<File>> load() {
    LOG.info("reading directory: {}", this.directory);
    return () -> {
        try {
            return Flux.fromStream(Files.list(Paths.get(this.directory)).map(Path::toFile));
        } catch (IOException e) {
            throw new RuntimeException(String.format("exception when listing directory: %s", this.directory), e);
        }
    };
}
[2]
<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <type>test-jar</type>
        <classifier>test-binder</classifier>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-reactive</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
</dependencies>
[3]
[INFO] Running org.fzj.ibg.tsmms.stream.tests.TestEmitter
15:39:49.243 [main] ERROR org.springframework.boot.SpringApplication - Application run failed
java.lang.IllegalArgumentException: Unable to instantiate org.springframework.cloud.stream.binder.kafka.common.KafkaBinderEnvironmentPostProcessor [org.springframework.boot.env.EnvironmentPostProcessor]
at org.springframework.boot.util.Instantiator.lambda$static$0(Instantiator.java:52)
at org.springframework.boot.util.Instantiator.instantiate(Instantiator.java:152)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.springframework.boot.util.Instantiator.instantiate(Instantiator.java:140)
at org.springframework.boot.util.Instantiator.instantiate(Instantiator.java:125)
at org.springframework.boot.env.ReflectionEnvironmentPostProcessorsFactory.getEnvironmentPostProcessors(ReflectionEnvironmentPostProcessorsFactory.java:72)
at org.springframework.boot.env.EnvironmentPostProcessorApplicationListener.getEnvironmentPostProcessors(EnvironmentPostProcessorApplicationListener.java:122)
at org.springframework.boot.env.EnvironmentPostProcessorApplicationListener.onApplicationEnvironmentPreparedEvent(EnvironmentPostProcessorApplicationListener.java:100)
at org.springframework.boot.env.EnvironmentPostProcessorApplicationListener.onApplicationEvent(EnvironmentPostProcessorApplicationListener.java:87)
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:176)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:169)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:143)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:131)
at org.springframework.boot.context.event.EventPublishingRunListener.environmentPrepared(EventPublishingRunListener.java:85)
at org.springframework.boot.SpringApplicationRunListeners.lambda$environmentPrepared$2(SpringApplicationRunListeners.java:66)
at java.util.ArrayList.forEach(ArrayList.java:1255)
at org.springframework.boot.SpringApplicationRunListeners.doWithListeners(SpringApplicationRunListeners.java:120)
at org.springframework.boot.SpringApplicationRunListeners.doWithListeners(SpringApplicationRunListeners.java:114)
at org.springframework.boot.SpringApplicationRunListeners.environmentPrepared(SpringApplicationRunListeners.java:65)
at org.springframework.boot.SpringApplication.prepareEnvironment(SpringApplication.java:344)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:302)
at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:164)
at org.fzj.ibg.tsmms.stream.tests.TestEmitter.testUsageDetailSender(TestEmitter.java:52)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at java.util.ArrayList.forEach(ArrayList.java:1255)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at java.util.ArrayList.forEach(ArrayList.java:1255)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:150)
at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:124)
at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
Caused by: java.lang.ClassNotFoundException: org.springframework.cloud.stream.binder.kafka.common.KafkaBinderEnvironmentPostProcessor
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.springframework.util.ClassUtils.forName(ClassUtils.java:284)
at org.springframework.boot.util.Instantiator$TypeSupplier$1.get(Instantiator.java:232)
at org.springframework.boot.util.Instantiator.instantiate(Instantiator.java:147)
... 93 common frames omitted
@borg1310 Yes, using the Reactor Kafka binder is definitely ideal. This way you'll gain back-pressure support and more. To me this looks like a real bug, but it isn't really the subject of this issue, so I'd post it, along with any other details you can add, in a new dedicated issue for the Reactor Kafka binder.
Closing this issue due to no activity lately. Feel free to create any new reactive issues against the proper reactive Kafka binder in this repository.