diff --git a/lib/rpc/axon.js b/lib/rpc/axon.js index 9ec1cb1..451036f 100644 --- a/lib/rpc/axon.js +++ b/lib/rpc/axon.js @@ -22,20 +22,50 @@ AxonAdapter.send = function(data) { } AxonAdapter.connect = function(provider) { + var self = this var name = provider.name var endpoint = 'tcp://' + provider.address + ':' + provider.rpcPort debug('[%s] connect to endpoint "%s"', name, endpoint) var socket = axon.socket('req') + var closing = false + + function closeSocket() { + if (!closing) { + closing = true + socket.close() + self.onProviderDisconnect(provider) + debug('[%s] provider "%s" closed', name, endpoint) + } + } - var self = this socket.on('close', function() { - debug('[%s] provider %s closed', name, endpoint) - self.onProviderDisconnect(provider) + debug('[%s] socket on close, provider "%s"', name, endpoint) + closeSocket() + }) + + socket.on('socket error', function(err) { + debug('[%s] socket on error [%s], provider "%s"', name, err.code, endpoint) + closeSocket() + }) + + socket.on('connect', function(sock) { + var closeListeners = sock.listeners('close') + sock.removeAllListeners('close') + + closeListeners.unshift(function() { + debug('[%s] sock on close, provider "%s"', name, endpoint) + closeSocket() + }) + + closeListeners.forEach(function(listener) { + sock.on('close', listener) + }) }) socket.connect(endpoint, function() { debug('[%s] connected to endpoint "%s"', name, endpoint) + // heartbeat var hid var lastPong = Date.now() @@ -43,14 +73,16 @@ AxonAdapter.connect = function(provider) { function heartbeat() { var now = Date.now() if (now - lastPong > 3000) { - debug('[%s] heartbeat timeout, close socket', name) // timeout, disconnect socket + debug('[%s] heartbeat timeout, close socket provider "%s"', name, endpoint) clearInterval(hid) - socket.close() + closeSocket() } else { socket.send('ping', function(msg) { - lastPong = Date.now() - // debug('[%s] client got ' + msg, name) + // debug('[%s] client got ' + msg, name) + if (msg === 'pong') { + lastPong = Date.now() + } }) } } @@ -73,12 +105,16 @@ AxonAdapter.startServer = function(port, host) { socket.on('message', function(msg, callback) { if (msg === 'ping') { + // debug('server got ping') callback('pong') - // debug('server got ping') } else { self.dispatch(msg, callback) } }) + + socket.on('connect', function(sock) { + debug('client connected from', sock._peername) + }) }) return promise