RabbitMQ flooded by new connections
Hi,
i'm using java api client with Automatic Recovery but every time it loses the connection, when it reconnects it creates a new connection on a different port, flooding my RabbitMQ with multiple connections.The environment I am working on loses the connection frequently.
Is it possibile use the same connection when it tries to reconnect?
Thanks in advance
What do you mean by flooding? How often does it loose it how what kind of problems do you see from that?
Remote port will always be 5672 (or 5671 for amqps), but local port will be generated by the OS for each new TCP connection. You should not have to change that behavior.
Hi,
thanks for the answer, with flooding I mean that the lost connection remain visible in the dashboard and it isn't automatically deleted after a timeout. The problem is that the lost connections are hundreds in my environment! At some point the connections are that much that the dashboard is totally not usable anymore, it slows down and it doesn't load.
Best regards
2017-07-03 14:44 GMT+02:00 Carl Hörberg [email protected]:
What do you mean by flooding? How often does it loose it how what kind of problems do you see from that?
Remote port will always be 5672 (or 5671 for amqps), but local port will be generated by the OS for each new TCP connection. You should not have to change that behavior.
— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/cloudamqp/android-example/issues/4#issuecomment-312635928, or mute the thread https://github.com/notifications/unsubscribe-auth/AVdMg3SYWYVvXtYnDzRvfHUn4WhpeTEbks5sKOIogaJpZM4OMOr9 .
Decrease TCP keep alive timeout or AMQP heartbeats timeout.
Hi,
I used Lyra library but when the client lost connection, it still create a new connections like attached picture.
This is my subiscribe method:
private void subscribe(final Handler handler) { subscribeThread = new Thread(new Runnable() { @Override public void run() { while(true) { try { //connection = connectionFactory.newConnection(); connection = MySingletonConnection.getInstance().getConnection(); //connection = Connections.create(options, config); Channel channel = connection.createChannel(); // Declare a queue and bind it to an exchange. com.rabbitmq.client.AMQP.Queue.DeclareOk q = channel.queueDeclare("manager2box-"+ROUTING_KEY, true, false, true, null); channel.queueBind(q.getQueue(), EXCHANGE, ROUTING_KEY); // Create the QueueingConsumer and have it consume from the queue QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(q.getQueue(), false, consumer);
while (true) {
QueueingConsumer.Delivery delivery =
consumer.nextDelivery(); String message = new String(delivery.getBody()); Log.d(TAG, message); Message msg = handler.obtainMessage(); Bundle bundle = new Bundle(); bundle.putString("commandBundle", message); msg.setData(bundle); handler.sendMessage(msg); } } catch (InterruptedException e) { Log.d(TAG, "InterruptedException"); e.printStackTrace(); break; } catch (Exception e1) { Log.d(TAG, "Connection broken: " + e1.getClass().getName()); e1.printStackTrace(); try { Thread.sleep(5000); //sleep and then try again } catch (InterruptedException e) { Log.d(TAG, "InterruptedException"); e.printStackTrace(); break; } } } } }); subscribeThread.start(); }
This is my SingletonConnection:
public class MySingletonConnection{ public static final MySingletonConnection INSTANCE = new MySingletonConnection(); private static Connection connection;
private final String TAG = "MySingletonConnection";
private final String EXCHANGE = "manager2box";
private final String USERNAME = "manager";
private final String PASSWORD = "MGXxXq72HscXzakR";
//private final String HOST = "192.168.1.147";
private final String HOST = "portal.seafy.me";
private final String ROUTING_KEY = Utils.getIPAddress();
private Config config;
private ConnectionOptions options;
private MySingletonConnection(){
//Lyra config
config = new Config()
.withConnectionListeners()
.withRecoveryPolicy(RecoveryPolicies.recoverAlways())
.withRetryPolicy(new RetryPolicy()
.withMaxAttempts(20)
.withInterval(Duration.seconds(5))
.withMaxDuration(Duration.minutes(5)));
//Lyra option config for connection
options = new ConnectionOptions().withUsername(USERNAME)
.withRequestedHeartbeat(Duration.seconds(30))
.withPassword(PASSWORD)
.withHost(HOST)
.withPort(5672);
try {
connection = Connections.create(options, config);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static MySingletonConnection getInstance(){
return INSTANCE;
}
public Connection getConnection( ) {
return connection;
}
}
2017-07-03 15:26 GMT+02:00 Carl Hörberg [email protected]:
Decrease TCP keep alive timeout or AMQP heartbeats timeout.
— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/cloudamqp/android-example/issues/4#issuecomment-312645215, or mute the thread https://github.com/notifications/unsubscribe-auth/AVdMg67bur2NYY2SMNxYWHXMqCFuqxvzks5sKOwbgaJpZM4OMOr9 .
Those methods are deprecated:
`
// Create the QueueingConsumer and have it consume from the queue
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(q.getQueue(), false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
Log.d(TAG, message);
Message msg = handler.obtainMessage();
Bundle bundle = new Bundle();
bundle.putString("commandBundle", message);
msg.setData(bundle);
handler.sendMessage(msg);
}
`