spring-cloud-aws

@SqsListener method: @SnsNotificationMessage parameter doesn't work in batching method signatures

Open EstelaGarcia opened this issue 1 year ago • 3 comments

Type: Bug

Component: SQS

Describe the bug

Using Spring Cloud AWS 3.1.1 (have also tried 3.2.0-M1).

The following method signatures result in an SQS listener that works and converts to the appropriate object:

SQS message, single object

    @SqsListener(value = "queuename")
    public void onMessage(CustomObjectType notification) throws Exception {...}

SQS message, List of objects

    @SqsListener(value = "queuename")
    public void onMessage(List<CustomObjectType> notifications) throws Exception {...}

SNS to SQS message, single object

    @SqsListener(value = "queuename")
    public void onMessage(@SnsNotificationMessage CustomObjectType notification) throws Exception {...}

However, the following does not work, as no converter can be found:

    @SqsListener(value = "queuename")
    public void onMessage(@SnsNotificationMessage List<CustomObjectType> notification) throws Exception {...}

The error occurs in the following method in io.awspring.cloud.sqs.listener.adapter.AbstractMethodInvokingListenerAdapter:

    protected final Object invokeHandler(Collection<Message<T>> messages) {
        try {
            return this.handlerMethod.invoke(MessageBuilder.withPayload(messages).build(), new Object[0]);
        } catch (Exception var3) {
            throw this.createListenerException((Collection)messages, var3);
        }
    }

CustomObjectType is a simple JSON object, and the conversion occurs correctly when handling a single message at a time.

From the comments under the PR that introduced the @SnsNotificationMessage annotation, we are not the only ones experiencing this problem.

EstelaGarcia avatar Apr 03 '24 14:04 EstelaGarcia

Hi @EstelaGarcia, thanks for opening the issue.

You're right that batch SNS messages are not currently supported.

Would you like to contribute a PR with that functionality?

Thanks

tomazfernandes avatar Apr 14 '24 23:04 tomazfernandes

Oh, my mistake, we have a PR already. Please ignore the last message.

Thanks.

tomazfernandes avatar Apr 14 '24 23:04 tomazfernandes

Hi, I'm looking for this feature. Any idea when this will be available?

In the meantime, I copied the NotificationMessageArgumentResolver and the SnsMessageConverter into my code base and added the configuration below to use the copied resolver and converter.

    @Configuration
    public class AWSSQSConfiguration {

        @Bean
        SqsListenerConfigurer configurer() {
            return registrar -> registrar.manageMethodArgumentResolvers(
                    handlerMethodArgumentResolvers -> handlerMethodArgumentResolvers.add(2,
                            new NotificationMessageArgumentResolver(messageConverter(), new ObjectMapper())));
        }

        @Bean
        public MessageConverter messageConverter() {
            return new MappingJackson2MessageConverter();
        }
    }

    @SqsListener(value = "${com.example.fto.aws.sqs.queue-url}",
                 maxMessagesPerPoll = "15",
                 pollTimeoutSeconds = "20",
                 messageVisibilitySeconds = "5",
                 maxConcurrentMessages = "15")
    public void listenForMultipleMessages(@SnsNotificationMessage List<FTOOrderEventDto> ftoOrderEventDtoList) {...}

However, the payload still isn't converted to the appropriate object. I can see that multiple messages are read from SQS and that a conversion to FTOOrderEventDto is attempted, but all the properties/attributes are null.

I'm not sure what I'm doing wrong here. Any suggestions?

nandeeshsu avatar May 29 '24 09:05 nandeeshsu

I am facing the issue below after adding the above configuration for SQS:

    @SqsListener(value = "${LISTENER_SQS:test-sqs}",
        factory = "factory", acknowledgementMode = SqsListenerAcknowledgementMode.ON_SUCCESS)
    public void processRequest(@SnsNotificationMessage final Class obj)
    {
    }

    org.springframework.context.ApplicationContextException: Failed to start bean 'io.awspring.cloud.messaging.internalEndpointRegistryBeanName'
        at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:186)
        at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:363)
        at java.base/java.lang.Iterable.forEach(Iterable.java:75)
        at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:160)
        at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:128)
        at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:968)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:618)
        at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146)
        at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:738)
        at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:440)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:324)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1317)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1306)
        at com.nilesh.central.Main.main(Main.java:33)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)
        at org.springframework.boot.loader.Launcher.launch(Launcher.java:95)
        at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)
        at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:65)
    Caused by: java.util.concurrent.CompletionException: io.awspring.cloud.sqs.QueueAttributesResolvingException: Error resolving attributes for queue QUEUE_NAME with strategy CREATE and queueAttributesNames []

nilesh-chordia avatar Aug 08 '24 11:08 nilesh-chordia

@EstelaGarcia and @nandeeshsu, have you faced the above issue?

nilesh-chordia avatar Aug 08 '24 11:08 nilesh-chordia

@nilesh-chordia, I ended up parsing the incoming messages myself (the method argument is a list of strings), as I didn't have time to figure out a cleaner way to do it (we had a deadline). However, your issue seems to be related to the queue name not being set correctly, although I am unsure why (you have a default value in the value attribute of the SqsListener annotation, so it should at least be picking that up):

Error resolving attributes for queue QUEUE_NAME with strategy CREATE and queueAttributesNames []
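A minimal sketch of the manual-parsing workaround mentioned in this comment, for readers on a version without batch support. It assumes raw message delivery is disabled on the SNS subscription, so each SQS body is the SNS JSON envelope with the real payload in its "Message" field. `SnsBatchUnwrapper` and both function parameters are hypothetical names for illustration, not part of Spring Cloud AWS, and this is not necessarily how the original poster implemented it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper (not a Spring Cloud AWS class): unwraps a batch of
// raw SQS bodies that each contain an SNS notification envelope.
final class SnsBatchUnwrapper<T> {

    // Extracts the "Message" field from one SNS envelope (assumed JSON).
    private final Function<String, String> messageFieldExtractor;
    // Deserializes the inner document into the target type.
    private final Function<String, T> payloadReader;

    SnsBatchUnwrapper(Function<String, String> messageFieldExtractor,
                      Function<String, T> payloadReader) {
        this.messageFieldExtractor = messageFieldExtractor;
        this.payloadReader = payloadReader;
    }

    // Unwraps every body in the batch, preserving the original order.
    List<T> unwrap(List<String> rawBodies) {
        List<T> payloads = new ArrayList<>(rawBodies.size());
        for (String body : rawBodies) {
            String innerJson = messageFieldExtractor.apply(body);
            payloads.add(payloadReader.apply(innerJson));
        }
        return payloads;
    }
}
```

In a listener declared as `public void onMessage(List<String> rawBodies)`, the two functions could be backed by Jackson, for example `objectMapper.readTree(body).get("Message").asText()` for the extractor and `objectMapper.readValue(json, CustomObjectType.class)` for the reader. Both calls throw checked exceptions, so each would need a small wrapper that rethrows them as unchecked. Keeping the JSON handling behind plain functions also makes the unwrapping logic easy to test without a queue.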

EstelaGarcia avatar Aug 14 '24 08:08 EstelaGarcia

@EstelaGarcia, thanks for your response. The issue is resolved. The problem was that I was using an older version of LocalStack; after switching to LocalStack 3.x, the issue went away.

nilesh-chordia avatar Aug 14 '24 09:08 nilesh-chordia

PR with the enhancement was merged: https://github.com/awspring/spring-cloud-aws/pull/1191

tomazfernandes avatar Aug 17 '24 18:08 tomazfernandes