Spring functions don't work well with Spring Integration

Open Xtreme-Light opened this issue 3 years ago • 2 comments

I am using Spring Cloud Stream and am trying to use Spring Integration with the Java DSL. At first, I used a consumer bean to do everything I needed. After I picked up Spring Integration, I refactored so that the old consumer bean sent messages through a MessagingGateway. Then I asked myself why I needed this consumer bean at all, so I tried using just an IntegrationFlow to receive messages from a message channel that is also defined through Spring Cloud Stream. It worked in my demo, but I found that it does not work with spring-cloud-bus. After more testing, I found that it only works when the application has no Spring function beans. I searched Stack Overflow for this but found nothing, so I think it is an issue.

My core config:

spring:
  cloud:
    stream:
      input-bindings: amqpMessage
      bindings:
#        amqpMessage-in-0:
        AMQP_MESSAGE_INPUT:
          destination: AMQP_DESTINATION
          group: AMQP_INPUT
        AMQP_MESSAGE_OUTPUT:
          destination: AMQP_DESTINATION
      function:
        bindings:
          amqpMessage-in-0: AMQP_MESSAGE_INPUT
          amqpMessage-out-0: AMQP_MESSAGE_OUTPUT
      output-bindings: amqpMessage
      rabbit:
        bindings:
          AMQP_MESSAGE_OUTPUT:
            producer:
              routing-key-expression: ''''''
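
For readers unfamiliar with the names in this config: Spring Cloud Stream derives functional binding names as `<name>-in-<index>` and `<name>-out-<index>`, and `spring.cloud.stream.function.bindings` maps a derived name to a custom one. Below is a minimal plain-Java sketch of that convention only. The class `BindingNames` and its methods are hypothetical illustrations, not part of any Spring API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BindingNames {

  // Derived input binding name: <name>-in-<index>.
  static String inputBinding(String name, int index) {
    return name + "-in-" + index;
  }

  // Derived output binding name: <name>-out-<index>.
  static String outputBinding(String name, int index) {
    return name + "-out-" + index;
  }

  // Mimics a rename map such as spring.cloud.stream.function.bindings:
  // returns the configured alias, or the derived name if none is configured.
  static String resolve(String derivedName, Map<String, String> renames) {
    return renames.getOrDefault(derivedName, derivedName);
  }

  public static void main(String[] args) {
    Map<String, String> renames = new LinkedHashMap<>();
    renames.put("amqpMessage-in-0", "AMQP_MESSAGE_INPUT");
    renames.put("amqpMessage-out-0", "AMQP_MESSAGE_OUTPUT");

    System.out.println(resolve(inputBinding("amqpMessage", 0), renames));
    System.out.println(resolve(outputBinding("amqpMessage", 0), renames));
    // A bean named "temp" has no alias here, so the derived name is kept.
    System.out.println(resolve(inputBinding("temp", 0), renames));
  }
}
```

Under this convention, the channel names used by the flow and the gateway (`AMQP_MESSAGE_INPUT`, `AMQP_MESSAGE_OUTPUT`) are the aliases, while a separately declared functional bean would get its own derived binding name.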

My IntegrationFlow:

  @Bean
  public IntegrationFlow directFlow(
      SimpleEchoServiceActivator simpleEchoServiceActivator
  ) {
    return IntegrationFlows
        .from("AMQP_MESSAGE_INPUT")
        .handle(simpleEchoServiceActivator)
        .log().get();
  }

@MessageEndpoint
@Slf4j
public class SimpleEchoServiceActivator {

  @ServiceActivator
  public void handle(Cargo messages) {
    log.info("accept {}", messages);
  }
}

When I declare a consumer like this, even without any configuration for it in YAML, the integration flow defined above stops working. For the same reason, the flow does not work with Spring Cloud Bus, because of its springCloudBusInput consumer.

  @Bean
  public Consumer<String> temp() {
    return v -> log.info("============{}", v);
  }

Also, I checked the queue in RabbitMQ. When I add this bean, the messages in the queue are no longer consumed.
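
One thing that may be relevant here, offered as an assumption and not a confirmed cause: when `spring.cloud.function.definition` is not set and the context contains exactly one `Supplier`, `Function`, or `Consumer` bean, Spring Cloud Stream discovers that bean automatically and binds it (for `temp`, as `temp-in-0`). The plain-Java sketch below is a simplified model of that selection rule. The class `FunctionDiscoverySketch` and the method `activeFunctions` are hypothetical and do not reproduce the framework's actual code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class FunctionDiscoverySketch {

  // functionBeans: names of Supplier/Function/Consumer beans in the context.
  // definition: value of spring.cloud.function.definition, or null if unset.
  static List<String> activeFunctions(List<String> functionBeans, String definition) {
    // An explicit definition wins; several functions are separated by ';'.
    if (definition != null && !definition.trim().isEmpty()) {
      return Arrays.asList(definition.split(";"));
    }
    // With no definition, a single functional bean is picked up automatically.
    if (functionBeans.size() == 1) {
      return functionBeans;
    }
    // Zero or several candidates and no definition: nothing is bound.
    return Collections.emptyList();
  }

  public static void main(String[] args) {
    // No functional beans: only the explicitly declared bindings exist.
    System.out.println(activeFunctions(Collections.emptyList(), null));
    // Adding the single Consumer bean "temp" makes it an active function.
    System.out.println(activeFunctions(Arrays.asList("temp"), null));
  }
}
```

If this rule applies to the setup above, adding the `temp` bean changes which bindings the application creates even though the YAML is unchanged, which would be worth ruling out in a reproducer.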

Version of the framework:

<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.0</version>
<relativePath/>

<spring-cloud.version>2021.0.3</spring-cloud.version>

Xtreme-Light avatar Jun 18 '22 19:06 Xtreme-Light

Additional code: I send the messages through a gateway.

@MessagingGateway
public interface ICargoSingleGateway {

  @Gateway(requestChannel = "AMQP_MESSAGE_OUTPUT")
  void supplier(Cargo messages);

}

    iCargoSingleGateway.supplier(
        new Cargo(1, "AAAAAAAAAAAAAAAA", "Address1", 0.5, ShippingType.DOMESTIC, 1,
            1, ""));

    iCargoSingleGateway.supplier(
        new Cargo(2, "BBBBBBBBBBBBBBBB", "Address1", 0.5, ShippingType.DOMESTIC, 1,
            1, ""));

    iCargoSingleGateway.supplier(
        new Cargo(3, "CCCCCCCCCCCCCCC", "Address1", 0.5, ShippingType.DOMESTIC, 1,
            1, ""));

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Cargo {

  public enum ShippingType {
    DOMESTIC,
    INTERNATIONAL
  }

  private long trackingId;
  private String receiverName;
  private String deliveryAddress;
  private double weight;
  private ShippingType shippingType;
  private int deliveryDayCommitment;
  private int region;
  private String description;

}

Xtreme-Light avatar Jun 19 '22 06:06 Xtreme-Light

Can you please create a small application that reproduces the issue and push it to GitHub somewhere so we can take a look? It is hard to understand from your explanation what you are trying to accomplish.

olegz avatar Aug 03 '22 17:08 olegz

Closing it due to lack of follow-up from the reporter.

olegz avatar Sep 28 '22 12:09 olegz