spring function don't work fine with spring integration
spring function don't work fine with spring integration In my case.I am use spring cloud stream , then I am try to use spring integration with java dsl.First time,I use a consumer bean to do all things that I need to do. when I got Spring Integration, I do a refactor by use the old consumer bean to send message by MessageGateway. Then I realized,why I needed this consumer bean? so I had a try to just use IntegrationFlow to accept message from a messageChannel that also defined with spring cloud stream. yes,It worked succes in my demo.but I found it don't work fine wit spring-cloud-bus. with the more test。I founded that,It just work fine without spring function beans。 I had try to find some thing about this in Stack Overflow,But not found.so,i think,it is a issue.
my core config
spring:
cloud:
stream:
input-bindings: amqpMessage
bindings:
# amqpMessage-in-0:
AMQP_MESSAGE_INPUT:
destination: AMQP_DESTINATION
group: AMQP_INPUT
AMQP_MESSAGE_OUTPUT:
destination: AMQP_DESTINATION
function:
bindings:
amqpMessage-in-0: AMQP_MESSAGE_INPUT
amqpMessage-out-0: AMQP_MESSAGE_OUTPUT
output-bindings: amqpMessage
rabbit:
bindings:
AMQP_MESSAGE_OUTPUT:
producer:
routing-key-expression: ''''''
my integrationflow:
@Bean
public IntegrationFlow directFlow(
SimpleEchoServiceActivator simpleEchoServiceActivator
) {
return IntegrationFlows
.from("AMQP_MESSAGE_INPUT")
.handle(simpleEchoServiceActivator)
.log().get();
}
@MessageEndpoint
@Slf4j
public class SimpleEchoServiceActivator {
@ServiceActivator
public void handle(Cargo messages) {
log.info("accept {}", messages);
}
}
when I declare a consumer like this,even don't do any config in yaml.my integration flow defined before,just don't work. so,same way,do not work fine with spring cloud bus,because of springCloudBusInput consumer.
@Bean
public Consumer<String> temp() {
return v -> log.info("============{}", v);
}
also ,I checked queue in rabbit mq. when i add this bean,the queue's message won't be consumed.
Version of the framework
external code. I send mssage by use gateway
@MessagingGateway
public interface ICargoSingleGateway {
@Gateway(requestChannel = "AMQP_MESSAGE_OUTPUT")
void supplier(Cargo messages);
}
iCargoSingleGateway.supplier(
new Cargo(1, "AAAAAAAAAAAAAAAA", "Address1", 0.5, ShippingType.DOMESTIC, 1,
1, ""));
iCargoSingleGateway.supplier(
new Cargo(2, "BBBBBBBBBBBBBBBB", "Address1", 0.5, ShippingType.DOMESTIC, 1,
1, ""));
iCargoSingleGateway.supplier(
new Cargo(3, "CCCCCCCCCCCCCCC", "Address1", 0.5, ShippingType.DOMESTIC, 1,
1, ""));
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Cargo {
public enum ShippingType {
DOMESTIC,
INTERNATIONAL
}
private long trackingId;
private String receiverName;
private String deliveryAddress;
private double weight;
private ShippingType shippingType;
private int deliveryDayCommitment;
private int region;
private String description;
}
Can you please create a small application that reproduces the issue and push it to GitHub somewhere so we can take a look. It is hard to understand based on your explanation what are you trying to accomplish.
Closing it due to lack of followup from the reporter