Azure Function SB Trigger Batch - ClassCastException
I am trying to process messages as a batch with Azure Function SB Topic Trigger; but the flow fails with class cast exception. Unable to figure the root cause on why the casting fails.
[2022-02-28T09:16:02.139Z] java.lang.ClassCastException: class org.springframework.messaging.support.GenericMessage cannot be cast to class java.util.List (org.springframework.messaging.support.GenericMessage is in unnamed module of loader java.net.URLClassLoader @126c314e; java.util.List is in module java.base of loader 'bootstrap')
[2022-02-28T09:16:02.139Z] at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeFunctionAndEnrichResultIfNecessary(SimpleFunctionRegistry.java:896) ~[spring-cloud-function-context-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
2022-02-28T09:16:02.139Z] at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.lambda$invokeFunction$7(SimpleFunctionRegistry.java:848) ~[spring-cloud-function-context-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT].
[2022-02-28T09:16:02.147Z] at org.springframework.cloud.function.adapter.azure.FunctionInvoker.handleRequest(FunctionInvoker.java:136) ~[spring-cloud-function-adapter-azure-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
FunctionInvoker
public class ReconcilerFunction extends FunctionInvoker<List<String>, String> {
@FunctionName("reconcilerFunction")
public void run(@ServiceBusTopicTrigger(name = "messages", topicName = "myTopic", subscriptionName = "my-conn", connection = "myConnString", cardinality = Cardinality.MANY, dataType = "string") final List<String> messages,
final ExecutionContext context) {
handleRequest(messages,context);
}
}
Function Bean
@Bean("reconcilerFunction")
public Function<List<Message<String>>, String> reconcilerBean(
ReconcilerService reconcilerService) {
return inputMessages -> {
try {
ExecutionContext context = (ExecutionContext) inputMessages.get(0).getHeaders().get(EXECUTION_CONTEXT);
List<String> messages = new LinkedList<>();
for (Message<String> message: inputMessages) {
messages.add(message.getPayload());
}
reconcilerService.processMessagesBatch(messages, context);
return "";
} catch (Exception ex) {
throw new FunctionFailureException("Error in function: ReconcilerFunction", ex);
}
};
}
Hi @Gulslabs
Currently all Iterable inputs are converted to a Flux. This gives flexibility in that a single Function<String, String> can serve both a FunctionInvoker<List<String>, String> or FunctionInvoker<String, String>. The downside is that it results in a single item being passed into the function - not a batch (list) of them.
Additionally, there is something going on in your case when the input is a List<Message> the auto-conversion to a list input is not occurring and hence the CCE.
If you changed your code to use List<String> instead of List<Message<String>> it would work, but w/ the caveat listed above (only a single item would go through your function at once). Also, you would not be able to get at the execution context.
I am in the process of reworking the input handling so that the batch calls will work. I will list that ticket here shortly.
Workaround
As a workaround you can use a POJO that holds the list of items as the input type - something like this:
@FunctionName("reconcilerFunction")
public void run(
@ServiceBusTopicTrigger(name = "messages", topicName = "myTopic", subscriptionName = "my-conn", connection = "myConnString", dataType = "string")
final List<String> messages,
final ExecutionContext context
) {
Message<Batch> batchItemsMessage = MessageBuilder.withPayload(Batch.of(messages)).build();
handleRequest(batchItemsMessage, context);
}
@Bean("reconcilerFunction")
public Function<Message<ReconcilerHandler.Batch>, String> reconcilerBean() {
return batchMessage -> {
try {
ExecutionContext context = (ExecutionContext) batchMessage.getHeaders().get("executionContext");
reconcilerService.processMessagesBatch(batchMessage.getPayload().getItems(), context);
} catch (Exception ex) {
throw new RuntimeException("Error in function: ReconcilerFunction", ex);
}
};
}
ℹ️ The "Batch" class is just a simple POJO I created locally to hold a list of items. This is not a built-in class and you would need to do the same.
Please let me know how it goes.