Producer (fn) stops after there's an error [Kafka binder]
I have set up a stream producer using spring-cloud-function. It works fine until an exception occurs, then it stops producing.
Steps to reproduce the behavior:
class Producer {
    public static EmitterProcessor<String> produce = EmitterProcessor.create();
}

@Configuration
class BeanConfig {
    @Bean
    Supplier<Flux<String>> producer() {
        return () -> Producer.produce;
    }
}

@Component
class Listener {
    @EventListener
    void onSomeEvent(SomeEvent<String> event) {
        var dto = event.getEvent();
        Producer.produce.onNext(dto);
    }
}
spring:
  cloud:
    function:
      definition: producer
    stream:
      kafka:
        binder:
          brokers: localhost
      bindings:
        producer-out-0:
          destination: some-topic
Version of the framework
- Spring-Boot: 2.3.0.RELEASE
- Spring-Cloud: Hoxton.SR5 (spring-cloud-stream-binder-kafka)
Expected behavior
The application should keep producing after an exception is encountered. I added an error handler but it doesn't seem to fix it.
@Bean
Supplier<Flux<String>> producer() {
    return () -> Producer.produce.onErrorResume(e -> Mono.empty());
}
For reactive functions we do not do anything with regard to error handling. We tried, but every time we did something we would inadvertently affect some other part of the functionality. So basically once reactive functions are used it's the responsibility of the user to handle errors. Also see https://github.com/spring-cloud/spring-cloud-stream/issues/1937
That said, your issue doesn't have much information for us to diagnose what may be the issue: no stack trace, no logs, nothing. Please follow up with details on how to reproduce the issue.
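To illustrate what handling errors yourself can look like for a single step, here is a minimal JDK-only sketch. It is not Spring Cloud Stream or Reactor API: `GuardedStep`, `guard`, and `applyAll` are hypothetical names made up for this example. The idea is to catch the exception inside the step that may throw, so that one bad element is reported and skipped instead of surfacing as an error signal. It only protects code you call yourself; it cannot catch a failure raised later inside the binder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper, not part of any Spring or Reactor library.
final class GuardedStep {
    private GuardedStep() {}

    // Wraps a step so that a RuntimeException is handed to onError and
    // turned into an empty result instead of propagating to the caller.
    static <T, R> Function<T, Optional<R>> guard(Function<T, R> step,
                                                 Consumer<RuntimeException> onError) {
        return input -> {
            try {
                // A null result is treated as "nothing to emit".
                return Optional.ofNullable(step.apply(input));
            } catch (RuntimeException e) {
                onError.accept(e);
                return Optional.empty();
            }
        };
    }

    // Applies the guarded step to every input and keeps only the successes.
    static <T, R> List<R> applyAll(List<T> inputs, Function<T, R> step,
                                   Consumer<RuntimeException> onError) {
        Function<T, Optional<R>> guarded = guard(step, onError);
        List<R> results = new ArrayList<>();
        for (T input : inputs) {
            guarded.apply(input).ifPresent(results::add);
        }
        return results;
    }
}
```

For example, `GuardedStep.applyAll(Arrays.asList("1", "x", "3"), s -> Integer.parseInt(s), e -> {})` returns `[1, 3]`: the failing element is skipped and the remaining ones are still processed.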
> your issue doesn't have much information for us to diagnose what may be the issue
I will try to reproduce something for this ticket. Basically, after the first error, there's no more production from the Publisher.
> So basically once reactive functions are used it's the responsibility of the user to handle errors
I tried this
@Bean // .onErrorResume(...) I just skip the error, still didn't work
Supplier<Flux<String>> producer() {
    return () -> Producer.produce.onErrorResume(e -> Mono.empty());
}
Will this work instead?
try {
    Producer.produce.onNext(dto);
} catch (Exception e) {
    // ignore
}
A little more context
@Data
class SomeDTO implements Serializable {
    private String id;
    @org.apache.avro.reflect.Nullable
    private org.locationtech.jts.geom.Point presentLocation;
}

@Component
class Listener {
    @EventListener
    void onSomeEvent(SomeEvent<SomeDTO> event) {
        var dto = event.getEvent();
        dto.setPresentLocation(null); // Avro workaround
        Producer.produce.onNext(dto);
    }
}
The errors are sporadic and come from framework code, not application code: a NullPointerException from Avro + Spring Cloud Stream Schema.
At the moment I have switched from content-type: application/*+avro to content-type: application/json and things are stable.
Should I return to the @EnableBinding approach until this is properly addressed?
Hi Julius,
Have you tried setting up debugging and/or instrumenting your application or the affected method? If not, here are some helpful references to get you started:
https://spring.io/blog/2019/03/28/reactor-debugging-experience
https://github.com/reactor/reactor-tools
The second repo also points to the debug agent's new home in core.
This may help capture a bit more (and more focused) information to isolate exception origins+causes.
Cheers, Mark
@mkheck I will debug and revert
I think this is the same issue I am experiencing.
I have a reactive Function that acts as processor. The function per se works but if the returned object creates an exception later, it's as if the Function is unsubscribed.
In my scenario, I use Azure Event Hubs with Kafka emulation (which means I have a hard limit of 1 MB per message). When my function returns a message larger than 1 MB, the send to the output queue fails (org.apache.kafka.common.errors.RecordTooLargeException) and the following error is logged:
...onfiguration$FunctionToDestinationBinder : Failure was detected during execution of the reactive function 'funcName'
Future incoming messages aren't routed anymore to the function and yield the following exception:
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.input'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage
Is there a way to handle exceptions before the FunctionToDestinationBinder detects a failure?
This looks related as well https://github.com/spring-cloud/spring-cloud-stream/issues/1951
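For the RecordTooLargeException case described above, one option is to keep oversized payloads from reaching the binder at all, so the send never fails. The following is a minimal JDK-only sketch under stated assumptions: payloads are strings encoded as UTF-8, and `PayloadSizeGuard`, `fitsWithin`, and `keepSendable` are hypothetical names, not library API. The byte budget is passed in by the caller; since a Kafka record also carries key, header, and framing overhead, the budget should probably sit somewhat below the broker limit.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical helper, not part of any Spring, Reactor, or Kafka library.
final class PayloadSizeGuard {
    private PayloadSizeGuard() {}

    // True when the UTF-8 encoded payload is at most maxBytes long.
    static Predicate<String> fitsWithin(int maxBytes) {
        return payload -> payload.getBytes(StandardCharsets.UTF_8).length <= maxBytes;
    }

    // Drops every payload that exceeds the byte budget and keeps the rest in order.
    static List<String> keepSendable(List<String> payloads, int maxBytes) {
        return payloads.stream()
                .filter(fitsWithin(maxBytes))
                .collect(Collectors.toList());
    }
}
```

In a reactive function the same predicate could be applied with a filter step on the returned Flux, so that oversized messages are dropped (or logged and rerouted) before the framework tries to send them. Whether dropping is acceptable depends on the application.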