node-amqp
node-amqp copied to clipboard
How to swallow ECONNRESET error correctly?
In my app, the Sender which wrapped with amqp as following
Sender.prototype.createConnection_ = function () {
var deferred = Q.defer();
this.con_ = amqp.createConnection( this.connectOpt_, this.implementOpt_ );
deferred.resolve( this.con_ );
return deferred.promise;
}
Sender.prototype.connectionReady_ = function() {
var deferred = Q.defer(),
self = this;
self.con_.on('ready', function() {
console.log('connection is ok now');
deferred.resolve(self.con_);
});
return deferred.promise;
}
Sender.prototype.handleConnectionError_ = function() {
var self = this;
self.con_.on('error', function(err) {
console.log('Sender: Connection error' + err);
if (err.code === 'ECONNRESET')
console.log('Get ECONNRESET error');
});
return Q(self);
}
Sender.prototype.createExchange_ = function() {
var deferred = Q.defer(),
self = this;
this.con_.exchange( this.exchangeName_, this.exchangeOpt_, function( ex ) {
console.log('exchange is ok now');
self.ex_ = ex;
deferred.resolve(self.ex_);
});
return deferred.promise;
}
Sender.prototype.exchangeReady_ = function() {
var deferred = Q.defer(),
self = this;
this.ex_.on('open', function() {
console.log('Sender: exchange opened');
deferred.resolve(this.ex_);
});
return deferred.promise;
}
Sender.prototype.handleMessageReturn_ = function() {
var self = this;
self.ex_.on('basic-return', function( msg ) {
console.log('Message return ' + msg);
});
return Q(self);
}
Sender.prototype.handleExchangeError_ = function() {
var self = this;
self.ex_.on('error', function(err) {
console.log('Sender: Exchange Error' + err);
}) ;
return Q(self);
}
Sender.prototype.getExchange_ = function() {
var deferred = Q.defer();
if ( this.ex_ === null ) {
deferred.reject( new Error('Sender: Message Connection Failed Now...') );
} else {
deferred.resolve( this.ex_ );
}
return deferred.promise;
}
Sender.prototype.publish_ = function( ex, key, msg ) {
var self = this,
deferred = Q.defer();
console.log('publish message here...');
ex.publish( key, msg, this.publishOpt_, function(flag, err) {
if (flag) {
deferred.reject(new Error('Sender: publish message ' + err) );
self.cb_( key, err );
} else {
console.log('Sender: delivered message processed');
deferred.resolve();
}
});
return deferred.promise;
}
/**
*
* @public
*/
Sender.prototype.deliverMessage = function( key, message ) {
var self = this;
return self.getExchange_()
.then( self.publish_.call( self, self.ex_, key, message ) )
.catch( function(err) {
console.info(err);
self.cb_( key, err );
});
}
Sender.prototype.connect_ = function() {
var self = this;
return self.createConnection_()
.then( self.connectionReady_.bind(self) )
.then( self.handleConnectionError_.bind(self) )
.then( self.createExchange_.bind(self) )
.then( self.exchangeReady_.bind(self) )
.then( self.handleMessageReturn_.bind(self) )
.catch( function(err) {
console.info(err);
});
}
Now here is my test case. Send message every 3 seconds, and start or stop RabbitMQ service randomly. Now the ECONNRESET error comes up, and the log shows
Sender: Connection errorError: CONNECTION_FORCED - broker forced connection clos ure with reason 'shutdown'
It seems that this error can be caught by con_.on('error', however, there is still exception here. How to do that?