spring-cloud-function

Azure Function SB Trigger Batch - ClassCastException

Open • Gulslabs opened this issue 3 years ago • 1 comment

I am trying to process messages as a batch with an Azure Function Service Bus (SB) topic trigger, but the flow fails with a ClassCastException. I am unable to figure out the root cause of why the cast fails.

[2022-02-28T09:16:02.139Z] java.lang.ClassCastException: class org.springframework.messaging.support.GenericMessage cannot be cast to class java.util.List (org.springframework.messaging.support.GenericMessage is in unnamed module of loader java.net.URLClassLoader @126c314e; java.util.List is in module java.base of loader 'bootstrap')
[2022-02-28T09:16:02.139Z] 	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeFunctionAndEnrichResultIfNecessary(SimpleFunctionRegistry.java:896) ~[spring-cloud-function-context-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
[2022-02-28T09:16:02.139Z] 	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.lambda$invokeFunction$7(SimpleFunctionRegistry.java:848) ~[spring-cloud-function-context-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
[2022-02-28T09:16:02.147Z] 	at org.springframework.cloud.function.adapter.azure.FunctionInvoker.handleRequest(FunctionInvoker.java:136) ~[spring-cloud-function-adapter-azure-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]

FunctionInvoker

public class ReconcilerFunction extends FunctionInvoker<List<String>, String> {
	@FunctionName("reconcilerFunction")
	public void run(
			@ServiceBusTopicTrigger(name = "messages", topicName = "myTopic", subscriptionName = "my-conn", connection = "myConnString", cardinality = Cardinality.MANY, dataType = "string")
			final List<String> messages,
			final ExecutionContext context) {
		handleRequest(messages, context);
	}
}

Function Bean

@Bean("reconcilerFunction")
public Function<List<Message<String>>, String> reconcilerBean(
		ReconcilerService reconcilerService) {		
	return inputMessages -> {			
		try {
			ExecutionContext context = (ExecutionContext) inputMessages.get(0).getHeaders().get(EXECUTION_CONTEXT);			
			List<String> messages = new LinkedList<>(); 
			for (Message<String> message: inputMessages) {
				messages.add(message.getPayload());
			}
			reconcilerService.processMessagesBatch(messages, context);
			return "";
		} catch (Exception ex) {
			throw new FunctionFailureException("Error in function: ReconcilerFunction", ex);
		}
	};
}

Gulslabs, Feb 28 '22 14:02

Hi @Gulslabs

Currently, all Iterable inputs are converted to a Flux. This gives flexibility in that a single Function<String, String> can serve either a FunctionInvoker<List<String>, String> or a FunctionInvoker<String, String>. The downside is that only a single item is passed into the function at a time, not a batch (list) of them.
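
To make the per-item behavior concrete, here is a minimal stdlib-only sketch. It is an analogy, not the adapter's actual code: the class `PerItemDispatchSketch` and the method `invokePerItem` are hypothetical names, and a plain loop stands in for the Flux-based handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration only; not part of spring-cloud-function.
class PerItemDispatchSketch {

    // Stand-in for the adapter: applies the function to each element in turn,
    // so the function never sees the list as a whole.
    static <I, O> List<O> invokePerItem(Function<I, O> function, Iterable<I> input) {
        List<O> results = new ArrayList<>();
        for (I item : input) {
            results.add(function.apply(item));
        }
        return results;
    }

    public static void main(String[] args) {
        Function<String, String> uppercase = s -> s.toUpperCase();
        // The function is invoked three times, once per element.
        System.out.println(invokePerItem(uppercase, List.of("a", "b", "c")));
    }
}
```

Under this model a function written against a single String works for both single and list inputs, but there is no point at which it can act on the batch as one unit.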

Additionally, something else is going on in your case: when the input is a List<Message>, the auto-conversion to a list input does not occur, hence the ClassCastException (CCE).

If you change your code to use List<String> instead of List<Message<String>>, it will work, but with the caveat listed above: only a single item goes through your function at a time. You would also lose access to the execution context.

I am in the process of reworking the input handling so that the batch calls will work. I will list that ticket here shortly.

Workaround

As a workaround you can use a POJO that holds the list of items as the input type - something like this:

@FunctionName("reconcilerFunction")
public void run(
  @ServiceBusTopicTrigger(name = "messages", topicName = "myTopic", subscriptionName = "my-conn", connection = "myConnString", dataType = "string")
  final List<String> messages,
  final ExecutionContext context
) {
  // Wrap the whole list in a single POJO payload so it is passed through as one item.
  Message<Batch> batchItemsMessage = MessageBuilder.withPayload(Batch.of(messages)).build();
  handleRequest(batchItemsMessage, context);
}

@Bean("reconcilerFunction")
public Function<Message<ReconcilerHandler.Batch>, String> reconcilerBean() {
  return batchMessage -> {
    try {
      ExecutionContext context = (ExecutionContext) batchMessage.getHeaders().get("executionContext");
      reconcilerService.processMessagesBatch(batchMessage.getPayload().getItems(), context);
      return ""; // a Function must return a value; run() ignores the result
    } catch (Exception ex) {
      throw new RuntimeException("Error in function: ReconcilerFunction", ex);
    }
  };
}

ℹ️ The "Batch" class is just a simple POJO I created locally to hold a list of items. This is not a built-in class and you would need to do the same.
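
For reference, a minimal sketch of what such a holder might look like. Only the name `Batch` and the `Batch.of(...)` and `getItems()` calls come from the snippet above; the field, the setter, the no-args constructor, and the defensive copying are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical holder for a batch of message bodies; not a built-in class.
class Batch {

    private List<String> items = new ArrayList<>();

    // No-args constructor plus getter/setter keep this a plain JavaBean.
    Batch() {
    }

    // Factory method matching the Batch.of(messages) call in the workaround.
    static Batch of(List<String> items) {
        Batch batch = new Batch();
        batch.setItems(items);
        return batch;
    }

    // Returns a read-only view so callers cannot mutate the batch.
    List<String> getItems() {
        return Collections.unmodifiableList(items);
    }

    // Copies the input so later changes to the caller's list do not leak in.
    void setItems(List<String> items) {
        this.items = new ArrayList<>(items);
    }
}
```

In the workaround above the class is referenced as `ReconcilerHandler.Batch`, so it could equally be declared as a static nested class of the handler.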

Please let me know how it goes.

onobc, Mar 21 '22 13:03