aop icon indicating copy to clipboard operation
aop copied to clipboard

[BUG] Default echange consume failed, when broker restart.

Open autumnqfeng opened this issue 3 years ago • 0 comments

Describe the bug The queue corresponding to the default Exchange cannot consume messages when the broker restarts or the namespace migrates to another broker.

To Reproduce Steps to reproduce the behavior:

  1. producer code:
  ConnectionFactory connectionFactory = new ConnectionFactory();
  connectionFactory.setHost("127.0.0.1");
  connectionFactory.setPort(5682);
  connectionFactory.setVirtualHost("vhost-test");
  Connection connection = null;
  Channel channel = null;
  try {
      connection = connectionFactory.newConnection();
      channel = connection.createChannel();
      channel.queueDeclare("queue-test", true, true, true, null);
      for (int i = 0; i < 100; i++) {
          String message = "HelloWorld "+ i;
          channel.basicPublish("", "queue-test", null, message.getBytes());
          System.out.println("Send to mq: " + message);
      }
  } catch (Exception e) {
      e.printStackTrace();
  } finally {
      try {
          if (channel != null) {
              channel.close();
          }
          if (connection != null) {
              connection.close();
          }
      } catch (IOException | TimeoutException e) {
          e.printStackTrace();
      }
  }
  1. consumer code
  ConnectionFactory connectionFactory = new ConnectionFactory();
  connectionFactory.setHost("127.0.0.1");
  connectionFactory.setPort(5682);
  connectionFactory.setVirtualHost("vhost-test");
  try {
      Connection connection = connectionFactory.newConnection();
      Channel channel = connection.createChannel();
      channel.queueDeclare("queue-test", true, false, false, null);
      DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              super.handleDelivery(consumerTag, envelope, properties, body);
              String exchange = envelope.getExchange();
              String routingKey = envelope.getRoutingKey();
              long deliveryTag = envelope.getDeliveryTag();
              String message = new String(body, StandardCharsets.UTF_8);
              System.out.println("receive message: " + message);
          }
      };
      channel.basicConsume("queue-test", true, defaultConsumer);
  } catch (Exception e) {
      e.printStackTrace();
  }
  1. start consumer and producer client.
  2. unload vhost-test.
  3. restart producer client.
  4. In the observation consumer has been unable to consume.

Expected behavior Unload can still consume normally.

autumnqfeng avatar Jul 06 '22 03:07 autumnqfeng