
Producer (fn) stops after there's an error [Kafka binder]

Open juliuskrah opened this issue 5 years ago • 6 comments

I have set up a stream producer using spring-cloud-function. It works fine until there's an exception, after which it stops producing.

Steps to reproduce the behavior:

class Producer {
    public static EmitterProcessor<String> produce = EmitterProcessor.create();
}

@Configuration
class BeanConfig {
    @Bean
    Supplier<Flux<String>> producer() {
        return () -> Producer.produce;
    }
}

@Component
class Listener {
    @EventListener
    void onSomeEvent(SomeEvent<String> event) {
        var dto = event.getEvent();
        Producer.produce.onNext(dto);
    }
}
spring:
  cloud:
    function:
      definition: producer
    stream:
      kafka:
        binder:
          brokers: localhost
      bindings:
        producer-out-0:
          destination: some-topic

Version of the framework

  • Spring-Boot: 2.3.0.RELEASE
  • Spring-Cloud: Hoxton.SR5 (spring-cloud-stream-binder-kafka)

Expected behavior

The application should keep producing after an exception is encountered. I added an error handler but it doesn't seem to fix it.

@Bean
Supplier<Flux<String>> producer() {
    return () -> Producer.produce.onErrorResume(e -> Mono.empty());
}

juliuskrah avatar Jul 11 '20 22:07 juliuskrah

For reactive functions we do not do anything with regard to error handling. We tried, but every time we did something we would inadvertently affect some other part of the functionality. So basically, once reactive functions are used, it is the user's responsibility to handle errors. Also see https://github.com/spring-cloud/spring-cloud-stream/issues/1937. That said, your issue doesn't have much information for us to diagnose the problem: no stack trace, no logs, nothing. Please follow up with details on how to reproduce the issue.
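This behavior follows from the Reactive Streams contract: once a Publisher signals onError, the stream is terminated and must emit nothing more, so the binding that subscribed to the supplier's Flux is effectively dead. Below is a minimal plain-Java simulation of that terminal-error rule (no Spring or Reactor involved; `TerminalEmitter` is a made-up illustration, not the binder's code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simulation of the Reactive Streams terminal-error rule: once onError is
// signalled, the stream is done and any later onNext calls are dropped.
class TerminalEmitter<T> {
    private final List<T> delivered = new ArrayList<>();
    private final Consumer<T> downstream;
    private boolean terminated = false;

    TerminalEmitter(Consumer<T> downstream) { this.downstream = downstream; }

    void onNext(T value) {
        if (terminated) return;   // spec: no signals after a terminal event
        downstream.accept(value);
        delivered.add(value);
    }

    void onError(Throwable t) {
        terminated = true;        // the stream is now permanently dead
    }

    List<T> delivered() { return delivered; }
}

public class TerminalDemo {
    public static void main(String[] args) {
        TerminalEmitter<String> emitter =
                new TerminalEmitter<>(v -> System.out.println("sent: " + v));
        emitter.onNext("a");
        emitter.onError(new RuntimeException("serialization failed"));
        emitter.onNext("b");                     // silently dropped: stream terminated
        System.out.println(emitter.delivered()); // [a]
    }
}
```

That is why an error surfacing inside the bound pipeline stops production for good: the Flux returned by the Supplier is subscribed once, and a terminal error ends that one subscription.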

olegz avatar Jul 12 '20 13:07 olegz

your issue doesn't have much information for us to diagnose what may be the issue

I will try to reproduce something for this ticket. Basically, after the first error, there's no more production from the Publisher.

So basically once reactive functions are used it's the responsibility of the user to handle errors

I tried this

@Bean // with .onErrorResume(...) I just skip the error; still didn't work
Supplier<Flux<String>> producer() {
    return () -> Producer.produce.onErrorResume(e -> Mono.empty());
}

Will this work instead?

try {
    Producer.produce.onNext(dto);
} catch(Exception e) {
    // ignore
}
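A try/catch around onNext is unlikely to help: the failure (e.g. the Avro NullPointerException) is typically raised downstream in the binder's pipeline, possibly on a different thread, after onNext has already returned; and once the error reaches the Flux, the stream is terminated regardless of what the caller catches. A plain-Java sketch of that thread handoff (all names here are hypothetical; no Spring involved):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class AsyncHandoffDemo {

    // Returns true if the failure was observed on the "binder" thread,
    // showing that a try/catch around the producer call cannot see it.
    static boolean produceAndObserve() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        CountDownLatch failed = new CountDownLatch(1);
        ExecutorService binderThread = Executors.newSingleThreadExecutor();

        // The "binder" consumes from the queue and serializes on its own thread.
        binderThread.submit(() -> {
            try {
                String payload = queue.take();
                if (payload.isEmpty()) {              // stand-in for the Avro NPE
                    throw new NullPointerException("serialization failed");
                }
            } catch (NullPointerException e) {
                failed.countDown();  // the error surfaces here, not at the producer
            } catch (InterruptedException ignored) {
            }
        });

        try {
            queue.put("");           // the producer's "onNext": returns immediately
        } catch (Exception e) {
            // never reached: the serialization error happens on the binder thread
        }
        boolean sawFailure = failed.await(2, TimeUnit.SECONDS);
        binderThread.shutdown();
        return sawFailure;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("failure seen on binder thread: " + produceAndObserve());
    }
}
```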

A little more context

@Data
class SomeDTO implements Serializable {
    private String id;
    @org.apache.avro.reflect.Nullable
    private org.locationtech.jts.geom.Point presentLocation;
}

@Component
class Listener {
    @EventListener
    void onSomeEvent(SomeEvent<SomeDTO> event) {
        var dto = event.getEvent();
        dto.setPresentLocation(null); // Avro workaround
        Producer.produce.onNext(dto);
    }
}

The errors are sporadic and come from framework code, not application code: a NullPointerException from Avro + Spring Cloud Stream Schema.

At the moment I have switched from content-type: application/*+avro to content-type: application/json and things are stable.
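For reference, that switch can be expressed per binding with the standard content-type property (a sketch extending the configuration above):

```yaml
spring:
  cloud:
    stream:
      bindings:
        producer-out-0:
          destination: some-topic
          content-type: application/json  # previously application/*+avro
```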

Should I return to the @EnableBinding approach until this is properly addressed?

juliuskrah avatar Jul 12 '20 14:07 juliuskrah

Hi Julius,

Have you tried setting up debugging and/or instrumenting your application or the affected method? If not, here are some helpful references to get you started:

https://spring.io/blog/2019/03/28/reactor-debugging-experience
https://github.com/reactor/reactor-tools

The second repo also points to the debug agent's new home in core.

This may help you capture more (and more focused) information to isolate the exception's origin and cause.

Cheers, Mark

mkheck avatar Jul 12 '20 16:07 mkheck

@mkheck I will debug and revert

juliuskrah avatar Jul 12 '20 23:07 juliuskrah

I think this is the same issue I am experiencing.

I have a reactive Function that acts as a processor. The function itself works, but if the returned object causes an exception later, it's as if the Function has been unsubscribed.

In my scenario, I use Azure Event Hubs with Kafka emulation (which means I have a hard limit of 1 MB per message). When my function returns a message larger than 1 MB, the send to the output queue fails with org.apache.kafka.common.errors.RecordTooLargeException and the following is logged:

...onfiguration$FunctionToDestinationBinder : Failure was detected during execution of the reactive function 'funcName'

Future incoming messages are no longer routed to the function and fail with the following exception:

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.input'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage

Is there a way to handle exceptions before the FunctionToDestinationBinder detects a failure?
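One workaround (assuming the 1 MB limit is the only failure mode here) is to reject oversized payloads inside the function itself, so the RecordTooLargeException never happens and the reactive pipeline is never terminated. A minimal size-guard sketch in plain Java (the 1 MB figure comes from the Event Hubs limit above; `fitsKafkaLimit` is a hypothetical helper, not a framework API):

```java
import java.nio.charset.StandardCharsets;

public class SizeGuard {
    // Azure Event Hubs' Kafka endpoint rejects records over roughly 1 MB.
    static final int MAX_BYTES = 1_048_576;

    // Hypothetical guard: check the serialized size before the message ever
    // reaches the output binding, instead of letting the send fail there.
    static boolean fitsKafkaLimit(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8).length <= MAX_BYTES;
    }

    public static void main(String[] args) {
        String small = "ok";
        String big = "x".repeat(MAX_BYTES + 1);
        System.out.println(fitsKafkaLimit(small)); // true
        System.out.println(fitsKafkaLimit(big));   // false
    }
}
```

In a reactive function, a predicate like this would sit in a filter step on the returned Flux, so oversized messages are dropped (or routed to a dead-letter destination) rather than erroring and terminating the stream.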

ziodave avatar Jun 22 '21 19:06 ziodave

This looks related as well https://github.com/spring-cloud/spring-cloud-stream/issues/1951

ziodave avatar Jun 22 '21 19:06 ziodave