Node.js,Socket.io,Redis pub / sub高容量,低延迟困难

当连接socket.io/node.js和redis pub / sub以试图创build一个由可以处理多个传输的服务器事件驱动的实时Web广播系统时,似乎有三种方法:

  1. 'createClient'一个redis连接并订阅频道。 在socket.io客户机连接上,将客户机join到socket.io机房。 在redis.on(“message”,…)事件中,调用io.sockets.in(room).emit(“event”,data)分发给相关房间中的所有客户端。 像如何在socket.io中重用redis连接?

  2. 'createClient'是一个redis连接。 在socket.io客户端连接上,将客户端join到socket.io房间并订阅相关的redis通道。 在客户端连接closures和收到消息调用client.emit(“event”,data)中引入redis.on(“message”,…)来引发特定客户端上的事件。 就像在使用socket.io中的RedisStore的例子中的答案一样

  3. 按照socketio-spec协议,使用RedisStore烘焙到socket.io中并从Redis中的单个“dispatch”通道“广播”。

Number 1允许为所有客户端处理Redis子和关联事件一次。 Number 2提供了一个更直接的挂钩到Redis pub / sub。 3号更简单,但对消息传递事件的控制很less。

然而,在我的testing中,所有连接的客户端都超过1个,performance出意外的低性能。 有问题的服务器事件是尽快发布到redis通道的1,000条消息,尽快分发。 性能是通过连接客户端的时间来衡量的(socket.io-client基于日志时间戳进入Redis列表进行分析)。

我猜测,在选项1中,服务器接收到消息,然后将其顺序写入所有连接的客户端。 在选项2中,服务器多次接收每条消息(每个客户端订阅一次)并将其写入相关的客户端。 在任何情况下,服务器都不会到达第二个消息事件,直到它传达给所有连接的客户端。 情况明显加剧,并发性上升。

这似乎与堆栈function的智慧不一致。 我想相信,但我正在挣扎。

这种情况(大量消息的低延迟分布)只是没有这些工具的选项(还?),还是我错过了一个把戏?

我认为这是一个合理的问题,并在短时间内对其进行了研究。 我花了一点时间寻找可能能够从中获取一些有用提示的示例。

例子

我喜欢以直截了当的例子开始:

  • 光im示例代码
  • Node.js + Redis Pub / Sub + socket.io演示

光的样本是一个单一的页面(请注意,你要用 Matt Ranney的node_redis来代替redis-node-client:

/* * Mclarens Bar: Redis based Instant Messaging * Nikhil Marathe - 22/04/2010 * A simple example of an IM client implemented using * Redis PUB/SUB commands so that all the communication * is offloaded to Redis, and the node.js code only * handles command interpretation,presentation and subscribing. * * Requires redis-node-client and a recent version of Redis * http://code.google.com/p/redis * http://github.com/fictorial/redis-node-client * * Start the server then telnet to port 8000 * Register with NICK <nick>, use WHO to see others * Use TALKTO <nick> to initiate a chat. Send a message * using MSG <nick> <msg>. Note its important to do a * TALKTO so that both sides are listening. Use STOP <nick> * to stop talking to someone, and QUIT to exit. * * This code is in the public domain. */ var redis = require('./redis-node-client/lib/redis-client'); var sys = require('sys'); var net = require('net'); var server = net.createServer(function(stream) { var sub; // redis connection var pub; var registered = false; var nick = ""; function channel(a,b) { return [a,b].sort().join(':'); } function shareTable(other) { sys.debug(nick + ": Subscribing to "+channel(nick,other)); sub.subscribeTo(channel(nick,other), function(channel, message) { var str = message.toString(); var sender = str.slice(0, str.indexOf(':')); if( sender != nick ) stream.write("[" + sender + "] " + str.substr(str.indexOf(':')+1) + "\n"); }); } function leaveTable(other) { sub.unsubscribeFrom(channel(nick,other), function(err) { stream.write("Stopped talking to " + other+ "\n"); }); } stream.addListener("connect", function() { sub = redis.createClient(); pub = redis.createClient(); }); stream.addListener("data", function(data) { if( !registered ) { var msg = data.toString().match(/^NICK (\w*)/); if(msg) { stream.write("SERVER: Hi " + msg[1] + "\n"); pub.sadd('mclarens:inside', msg[1], function(err) { if(err) { stream.end(); } registered = true; nick = msg[1]; // server messages sub.subscribeTo( nick + ":info", function(nick, message) { var m = message.toString().split(' '); var cmd = m[0]; var who = m[1]; if( cmd == "start" ) { stream.write( who + " is now talking to you\n"); shareTable(who); } else if( cmd == "stop" ) { stream.write( who + " stopped talking to you\n"); leaveTable(who); } }); }); } else { stream.write("Please register with NICK <nickname>\n"); } return; } var fragments = data.toString().replace('\r\n', '').split(' '); switch(fragments[0]) { case 'TALKTO': pub.publish(fragments[1]+":info", "start " + nick, function(a,b) { }); shareTable(fragments[1]); break; case 'MSG': pub.publish(channel(nick, fragments[1]), nick + ':' +fragments.slice(2).join(' '), function(err, reply) { if(err) { stream.write("ERROR!"); } }); break; case 'WHO': pub.smembers('mclarens:inside', function(err, users) { stream.write("Online:\n" + users.join('\n') + "\n"); }); break; case 'STOP': leaveTable(fragments[1]); pub.publish(fragments[1]+":info", "stop " + nick, function() {}); break; case 'QUIT': stream.end(); break; } }); stream.addListener("end", function() { pub.publish(nick, nick + " is offline"); pub.srem('mclarens:inside', nick, function(err) { if(err) { sys.debug("Could not remove client"); } }); }); }); server.listen(8000, "localhost"); 

文件

这里有大量的文档,而且这种types的堆栈上的apis正在迅速改变,所以你必须权衡每个文档的时间相关性。

  • 节点活动stream
  • 云铸造的例子
  • 如何节点redis pubsub
  • redis延迟
  • redis cookbook使用Pub / Sub进行asynchronous通信
  • linkedin的通用技巧
  • 节点的redis绑定
  • 谷歌组nodejs的问题

相关问题

只是一些相关的问题,这是堆栈上的热门话题:

  • node.js中的聊天服务器的Redis pub / sub
  • 如何devise即时通讯系统的redis pub / sub?

值得注意的技巧(ymmv)

closures或优化套接字池,使用有效的绑定,监视延迟,并确保你没有复制工作(即不需要两次发布给所有听众)。