Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions lib/rpc-client/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,16 @@ pro.addProxies = function(records) {
}
};

/**
* Add new remote server to the rpc client.
*
* @param {Object} server new server information
*/
pro.addServer = function(server) {
this._station.addServer(server);
};


/**
* Replace remote servers.
*
Expand All @@ -130,7 +140,7 @@ pro.rpcInvoke = function(serverId, msg, cb) {
if(this.state !== STATE_STARTED) {
tracer.error('client', __filename, 'rpcInvoke', 'fail to do rpc invoke for client is not running');
logger.error('[pomelo-rpc] fail to do rpc invoke for client is not running');
cb(new Error('[pomelo-rpc] fail to do rpc invoke for client is not running'));
utils.invokeCallback(cb,new Error('[pomelo-rpc] fail to do rpc invoke for client is not running'));
return;
}
this._station.dispatch(tracer, serverId, msg, this.opts, cb);
Expand Down Expand Up @@ -237,13 +247,16 @@ var proxyCB = function(client, serviceName, methodName, args, attach, isToSpecif
logger.error('[pomelo-rpc] fail to invoke rpc proxy for client is not running');
return;
}
if(args.length < 2) {
logger.error('[pomelo-rpc] invalid rpc invoke, arguments length less than 2, namespace: %j, serverType, %j, serviceName: %j, methodName: %j',
if(args.length < 1) {
logger.error('[pomelo-rpc] invalid rpc invoke, arguments length less than 1, namespace: %j, serverType, %j, serviceName: %j, methodName: %j',
attach.namespace, attach.serverType, serviceName, methodName);
return;
}
var routeParam = args.shift();
var cb = args.pop();
var cb;
if(typeof args[args.length-1]=='function'){
cb = args.pop();
}
var serverType = attach.serverType;
var msg = {namespace: attach.namespace, serverType: serverType,
service: serviceName, method: methodName, args: args};
Expand Down
12 changes: 9 additions & 3 deletions lib/rpc-client/mailboxes/tcp-mailbox.js
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,15 @@ pro.connect = function(tracer, cb) {
return;
}

var id = this.curId++ & 0xffffffff;
this.requests[id] = cb;
setCbTimeout(this, id, tracer, cb);
var id = 0;
if(cb){
id = this.curId++ & 0xffffffff;
if(!id){
id = this.curId++ & 0xffffffff;
}
this.requests[id] = cb;
setCbTimeout(this, id, tracer, cb);
}
var pkg;

if(tracer.isEnabled) {
Expand Down
13 changes: 10 additions & 3 deletions lib/rpc-client/mailboxes/ws-mailbox.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,16 @@ pro.connect = function(tracer, cb) {
}

var self = this;
var id = this.curId++;
this.requests[id] = cb;
setCbTimeout(this, id, tracer, cb);
var id = 0;
if(cb){
id = this.curId++;
if(!id){
id = this.curId++;
}
this.requests[id] = cb;
setCbTimeout(this, id, tracer, cb);
}


var pkg;
if(tracer.isEnabled) {
Expand Down
32 changes: 28 additions & 4 deletions lib/rpc-client/mailstation.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,28 @@ pro.clearStation = function() {
this.serversMap = {};
};

/**
* Add a new server info into the mail station.
*
* @param {Object} serverInfo server info such as {id, host, port}
*/
pro.addServer = function(serverInfo) {
if(!serverInfo || !serverInfo.id) {
return;
}

var id = serverInfo.id;
var type = serverInfo.serverType;
this.servers[id] = serverInfo;

if(!this.serversMap[type]) {
this.serversMap[type] = [];
}
this.serversMap[type].push(id);
this.emit('addServer', id);
};


/**
* Replace remote servers info.
*
Expand Down Expand Up @@ -183,7 +205,7 @@ pro.dispatch = function(tracer, serverId, msg, opts, cb) {
self.emit('error', constants.RPC_ERROR.FAIL_FIND_MAILBOX, tracer, serverId, msg, opts);
return;
}
mailbox.send(tracer, msg, opts, function() {
var callback = function() {
var tracer_send = arguments[0];
var send_err = arguments[1];
if(!!send_err) {
Expand All @@ -198,7 +220,8 @@ pro.dispatch = function(tracer, serverId, msg, opts, cb) {
}
utils.applyCallback(cb, args);
});
});
};
mailbox.send(tracer, msg, opts,cb?callback:null);
};

doFilter(tracer, null, serverId, msg, opts, this.befores, 0, 'before', send);
Expand Down Expand Up @@ -255,8 +278,8 @@ pro.connect = function(tracer, serverId, cb) {
var mailbox = self.mailboxesMap[serverId];
mailbox.connect(tracer, function(err) {
if(!!err) {
tracer.error('client', __filename, 'lazyConnect', 'fail to connect to remote server: ' + serverId);
logger.error('[pomelo-rpc] mailbox fail to connect to remote server: ' + serverId);
tracer.error('client', __filename, 'lazyConnect', 'fail to connect to remote server: ' + serverId+' host:'+mailbox.host+' port:'+mailbox.port);
logger.error('[pomelo-rpc] mailbox fail to connect to remote server: ' + serverId +' host:'+mailbox.host+' port:'+mailbox.port);
if(!!self.mailboxesMap[serverId]) {
delete self.mailboxesMap[serverId];
}
Expand Down Expand Up @@ -322,6 +345,7 @@ var lazyConnect = function(tracer, station, serverId, factory, cb) {
var server = station.servers[serverId];
if(!server) {
logger.error('[pomelo-rpc] unknown server: %s', serverId);
return false;
}
var mailbox = factory.create(server, station.opts);
station.connecting[serverId] = true;
Expand Down
45 changes: 24 additions & 21 deletions lib/rpc-server/acceptors/tcp-acceptor.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pro.listen = function(port) {
} catch(err) { //json parse exception
if(err) {
socket.composer.reset();
logger.error(err);
logger.error(err.stack);
}
}
}
Expand Down Expand Up @@ -179,30 +179,33 @@ var cloneError = function(origin) {
};
return res;
};

var respCallback = function(socket, acceptor, pkg,tracer,id) {
var args = Array.prototype.slice.call(arguments, 5);
for(var i=0, l=args.length; i<l; i++) {
if(args[i] instanceof Error) {
args[i] = cloneError(args[i]);
}
}
var resp;
if(tracer.isEnabled) {
resp = {traceId: tracer.id, seqId: tracer.seq, source: tracer.source, id: pkg.id, resp: Array.prototype.slice.call(args, 0)};
}
else {
resp = {id: pkg.id, resp: Array.prototype.slice.call(args, 0)};
}
if(acceptor.bufferMsg) {
enqueue(socket, acceptor, resp);
} else {
socket.write(socket.composer.compose(RES_TYPE, JSON.stringify(resp), id));
}
};

//need to redefine response
var processMsg = function(socket, acceptor, pkg, id) {
var tracer = new Tracer(acceptor.rpcLogger, acceptor.rpcDebugLog, pkg.remote, pkg.source, pkg.msg, pkg.traceId, pkg.seqId);
tracer.info('server', __filename, 'processMsg', 'tcp-acceptor receive message and try to process message');
acceptor.cb.call(null, tracer, pkg.msg, function() {
var args = Array.prototype.slice.call(arguments, 0);
for(var i=0, l=args.length; i<l; i++) {
if(args[i] instanceof Error) {
args[i] = cloneError(args[i]);
}
}
var resp;
if(tracer.isEnabled) {
resp = {traceId: tracer.id, seqId: tracer.seq, source: tracer.source, id: pkg.id, resp: Array.prototype.slice.call(args, 0)};
}
else {
resp = {id: pkg.id, resp: Array.prototype.slice.call(args, 0)};
}
if(acceptor.bufferMsg) {
enqueue(socket, acceptor, resp);
} else {
socket.write(socket.composer.compose(RES_TYPE, JSON.stringify(resp), id));
}
});
acceptor.cb.call(null, tracer, pkg.msg,pkg.id?respCallback.bind(null,socket,acceptor,pkg,tracer,id):null);
};

var processMsgs = function(socket, acceptor, pkgs, id) {
Expand Down
43 changes: 23 additions & 20 deletions lib/rpc-server/acceptors/ws-acceptor.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,29 +153,32 @@ var cloneError = function(origin) {
return res;
};

var respCallback = function(socket,acceptor,pkg,tracer) {
var args = Array.prototype.slice.call(arguments, 4);
for(var i=0, l=args.length; i<l; i++) {
if(args[i] instanceof Error) {
args[i] = cloneError(args[i]);
}
}
var resp;
if(tracer.isEnabled) {
resp = {traceId: tracer.id, seqId: tracer.seq, source: tracer.source, id: pkg.id, resp: Array.prototype.slice.call(args, 0)};
}
else {
resp = {id: pkg.id, resp: Array.prototype.slice.call(args, 0)};
}
if(acceptor.bufferMsg) {
enqueue(socket, acceptor, resp);
} else {
socket.send(JSON.stringify(resp));
}
};

var processMsg = function(socket, acceptor, pkg) {
var tracer = new Tracer(acceptor.rpcLogger, acceptor.rpcDebugLog, pkg.remote, pkg.source, pkg.msg, pkg.traceId, pkg.seqId);
tracer.info('server', __filename, 'processMsg', 'ws-acceptor receive message and try to process message');
acceptor.cb.call(null, tracer, pkg.msg, function() {
var args = Array.prototype.slice.call(arguments, 0);
for(var i=0, l=args.length; i<l; i++) {
if(args[i] instanceof Error) {
args[i] = cloneError(args[i]);
}
}
var resp;
if(tracer.isEnabled) {
resp = {traceId: tracer.id, seqId: tracer.seq, source: tracer.source, id: pkg.id, resp: Array.prototype.slice.call(args, 0)};
}
else {
resp = {id: pkg.id, resp: Array.prototype.slice.call(args, 0)};
}
if(acceptor.bufferMsg) {
enqueue(socket, acceptor, resp);
} else {
socket.send(JSON.stringify(resp));
}
});

acceptor.cb.call(null, tracer, pkg.msg, pkg.id?respCallback.bind(null,socket,acceptor,pkg,tracer):null);
};

/**
Expand Down
4 changes: 3 additions & 1 deletion lib/rpc-server/dispatcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pro.route = function(tracer, msg, cb) {
}

var args = msg.args.slice(0);
args.push(cb);
if(cb){
args.push(cb);
}
method.apply(service, args);
};
6 changes: 3 additions & 3 deletions lib/rpc-server/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ var Gateway = function(opts) {
this.started = false;
this.stoped = false;
this.services = opts.services;
if(!!this.opts.reloadRemotes) {
watchServices(this, dispatcher);
}
var self = this;

this.acceptors = {};
Expand All @@ -35,6 +32,9 @@ var Gateway = function(opts) {
this.acceptor = this.acceptorFactory.create(opts, function(tracer, msg, cb) {
dispatcher.route(tracer, msg, cb);
});
if(!!this.opts.reloadRemotes) {
watchServices(this, dispatcher);
}
};

util.inherits(Gateway, EventEmitter);
Expand Down
15 changes: 11 additions & 4 deletions lib/util/proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,18 @@ exp.create = function(opts) {
var genObjectProxy = function(serviceName, origin, attach, proxyCB) {
//generate proxy for function field
var res = {};
for(var field in origin) {
if(typeof origin[field] === 'function') {
res[field] = genFunctionProxy(serviceName, field, origin, attach, proxyCB);
}
var keys = Object.getOwnPropertyNames(Object.getPrototypeOf(origin)).concat(Object.getOwnPropertyNames(origin));
for(var i in keys){
var field = keys[i];
if(typeof origin[field] === 'function') {
res[field] = genFunctionProxy(serviceName, field, origin, attach, proxyCB);
}
}
// for(var field in origin) {
// if(typeof origin[field] === 'function') {
// res[field] = genFunctionProxy(serviceName, field, origin, attach, proxyCB);
// }
// }

return res;
};
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "pomelo-rpc",
"version": "1.0.2",
"dependencies": {
"ws": "0.8.0",
"ws": "^0.8.0",
"crc": "0.2.0",
"pomelo-loader": "0.0.6",
"pomelo-logger": "0.1.7",
Expand Down
1 change: 1 addition & 0 deletions sample/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ client.start(function(err) {
}
console.log(resp);
});
client.proxies.user.test.service.onlynotify(routeParam, msg[Math.round(Math.random()*(10-1))] + '::' + id++);
}

setInterval(func, period);
Expand Down
2 changes: 1 addition & 1 deletion sample/config.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"protocol": "ws",
"protocol": "tcp",
"host": "127.0.0.1",
"port": "8080",
"interval": 1000,
Expand Down
3 changes: 3 additions & 0 deletions sample/remote/test/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ module.exports = function(context) {
return {
echo: function(msg, cb) {
cb(null, 'echo: ' + msg);
},
onlynotify:function(msg,cb){
console.log('receive notify..:',cb,msg);
}
};
};
2 changes: 1 addition & 1 deletion sample/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ var paths = [

var port = config.port || 8080;

var server = Server.create({paths: paths, port: port, acceptorName: acceptorName});
var server = Server.create({paths: paths, port: port, acceptorName: acceptorName,reloadRemotes:true});
server.start();
console.log('rpc server started.');

Expand Down