diff --git a/lib/rpc-client/client.js b/lib/rpc-client/client.js index 50ecf59..498116b 100644 --- a/lib/rpc-client/client.js +++ b/lib/rpc-client/client.js @@ -107,6 +107,16 @@ pro.addProxies = function(records) { } }; +/** + * Add new remote server to the rpc client. + * + * @param {Object} server new server information + */ +pro.addServer = function(server) { + this._station.addServer(server); +}; + + /** * Replace remote servers. * @@ -130,7 +140,7 @@ pro.rpcInvoke = function(serverId, msg, cb) { if(this.state !== STATE_STARTED) { tracer.error('client', __filename, 'rpcInvoke', 'fail to do rpc invoke for client is not running'); logger.error('[pomelo-rpc] fail to do rpc invoke for client is not running'); - cb(new Error('[pomelo-rpc] fail to do rpc invoke for client is not running')); + utils.invokeCallback(cb,new Error('[pomelo-rpc] fail to do rpc invoke for client is not running')); return; } this._station.dispatch(tracer, serverId, msg, this.opts, cb); @@ -237,13 +247,16 @@ var proxyCB = function(client, serviceName, methodName, args, attach, isToSpecif logger.error('[pomelo-rpc] fail to invoke rpc proxy for client is not running'); return; } - if(args.length < 2) { - logger.error('[pomelo-rpc] invalid rpc invoke, arguments length less than 2, namespace: %j, serverType, %j, serviceName: %j, methodName: %j', + if(args.length < 1) { + logger.error('[pomelo-rpc] invalid rpc invoke, arguments length less than 1, namespace: %j, serverType, %j, serviceName: %j, methodName: %j', attach.namespace, attach.serverType, serviceName, methodName); return; } var routeParam = args.shift(); - var cb = args.pop(); + var cb; + if(typeof args[args.length-1]=='function'){ + cb = args.pop(); + } var serverType = attach.serverType; var msg = {namespace: attach.namespace, serverType: serverType, service: serviceName, method: methodName, args: args}; diff --git a/lib/rpc-client/mailboxes/tcp-mailbox.js b/lib/rpc-client/mailboxes/tcp-mailbox.js index 015e233..91b6cc3 100644 --- a/lib/rpc-client/mailboxes/tcp-mailbox.js +++ b/lib/rpc-client/mailboxes/tcp-mailbox.js @@ -148,9 +148,15 @@ pro.connect = function(tracer, cb) { return; } - var id = this.curId++ & 0xffffffff; - this.requests[id] = cb; - setCbTimeout(this, id, tracer, cb); + var id = 0; + if(cb){ + id = this.curId++ & 0xffffffff; + if(!id){ + id = this.curId++ & 0xffffffff; + } + this.requests[id] = cb; + setCbTimeout(this, id, tracer, cb); + } var pkg; if(tracer.isEnabled) { diff --git a/lib/rpc-client/mailboxes/ws-mailbox.js b/lib/rpc-client/mailboxes/ws-mailbox.js index a336532..0d4824c 100644 --- a/lib/rpc-client/mailboxes/ws-mailbox.js +++ b/lib/rpc-client/mailboxes/ws-mailbox.js @@ -147,9 +147,16 @@ pro.connect = function(tracer, cb) { } var self = this; - var id = this.curId++; - this.requests[id] = cb; - setCbTimeout(this, id, tracer, cb); + var id = 0; + if(cb){ + id = this.curId++; + if(!id){ + id = this.curId++; + } + this.requests[id] = cb; + setCbTimeout(this, id, tracer, cb); + } + var pkg; if(tracer.isEnabled) { diff --git a/lib/rpc-client/mailstation.js b/lib/rpc-client/mailstation.js index 38b3b94..cda5d5b 100644 --- a/lib/rpc-client/mailstation.js +++ b/lib/rpc-client/mailstation.js @@ -106,6 +106,28 @@ pro.clearStation = function() { this.serversMap = {}; }; +/** + * Add a new server info into the mail station. + * + * @param {Object} serverInfo server info such as {id, host, port} + */ +pro.addServer = function(serverInfo) { + if(!serverInfo || !serverInfo.id) { + return; + } + + var id = serverInfo.id; + var type = serverInfo.serverType; + this.servers[id] = serverInfo; + + if(!this.serversMap[type]) { + this.serversMap[type] = []; + } + this.serversMap[type].push(id); + this.emit('addServer', id); +}; + + /** * Replace remote servers info. * @@ -183,7 +205,7 @@ pro.dispatch = function(tracer, serverId, msg, opts, cb) { self.emit('error', constants.RPC_ERROR.FAIL_FIND_MAILBOX, tracer, serverId, msg, opts); return; } - mailbox.send(tracer, msg, opts, function() { + var callback = function() { var tracer_send = arguments[0]; var send_err = arguments[1]; if(!!send_err) { @@ -198,7 +220,8 @@ pro.dispatch = function(tracer, serverId, msg, opts, cb) { } utils.applyCallback(cb, args); }); - }); + }; + mailbox.send(tracer, msg, opts,cb?callback:null); }; doFilter(tracer, null, serverId, msg, opts, this.befores, 0, 'before', send); @@ -255,8 +278,8 @@ pro.connect = function(tracer, serverId, cb) { var mailbox = self.mailboxesMap[serverId]; mailbox.connect(tracer, function(err) { if(!!err) { - tracer.error('client', __filename, 'lazyConnect', 'fail to connect to remote server: ' + serverId); - logger.error('[pomelo-rpc] mailbox fail to connect to remote server: ' + serverId); + tracer.error('client', __filename, 'lazyConnect', 'fail to connect to remote server: ' + serverId+' host:'+mailbox.host+' port:'+mailbox.port); + logger.error('[pomelo-rpc] mailbox fail to connect to remote server: ' + serverId +' host:'+mailbox.host+' port:'+mailbox.port); if(!!self.mailboxesMap[serverId]) { delete self.mailboxesMap[serverId]; } @@ -322,6 +345,7 @@ var lazyConnect = function(tracer, station, serverId, factory, cb) { var server = station.servers[serverId]; if(!server) { logger.error('[pomelo-rpc] unknown server: %s', serverId); + return false; } var mailbox = factory.create(server, station.opts); station.connecting[serverId] = true; diff --git a/lib/rpc-server/acceptors/tcp-acceptor.js b/lib/rpc-server/acceptors/tcp-acceptor.js index 820416c..d27412a 100644 --- a/lib/rpc-server/acceptors/tcp-acceptor.js +++ b/lib/rpc-server/acceptors/tcp-acceptor.js @@ -81,7 +81,7 @@ pro.listen = function(port) { } catch(err) { //json parse exception if(err) { socket.composer.reset(); - logger.error(err); + logger.error(err.stack); } } } @@ -179,30 +179,33 @@ var cloneError = function(origin) { }; return res; }; + +var respCallback = function(socket, acceptor, pkg,tracer,id) { + var args = Array.prototype.slice.call(arguments, 5); + for(var i=0, l=args.length; i