Websocket传输可靠性(重新连接期间的Socket.io数据丢失)

用过的

NodeJS,Socket.io

问题

想象一下,有2个用户U1U2 ,通过Socket.io连接到一个应用程序。 该algorithm如下:

  1. U1完全失去互联网连接(例如切断互联网)
  2. U2发送消息给U1
  3. U1还没有收到消息,因为互联网已closures
  4. 服务器通过心跳超时检测到U1断开连接
  5. U1重新连接到socket.io
  6. U1永远不会收到来自U2的消息 – 我猜在步骤4中丢失了。

可能的解释

我想我明白为什么会发生:

  • 在步骤4中服务器也将套接字实例和消息队列杀死到U1
  • 而且,在步骤5中, U1服务器创build新的连接(不重用),所以即使消息仍在排队,以前的连接仍然丢失。

需要帮忙

我怎样才能防止这种数据丢失? 我必须使用听觉,因为我不会永远挂在应用程序。 此外,我还必须给出一个重新连接的可能性,因为当我部署一个新版本的应用程序,我想零宕机。

PS我称之为“消息”的东西不仅仅是一个我可以存储在数据库中的文本消息,而是一个有价值的系统消息,必须保证这个消息是传递的,还是UI的紧密结合。

谢谢!


另外1

我已经有一个用户帐户系统。 而且,我的应用程序已经很复杂了。 添加离线/在线状态将无济于事,因为我已经有了这样的东西。 问题是不同的。

退出步骤2.在这一步,我们在技术上不能说,如果U1下线 ,他只是失去了连接说2秒,可能是因为互联网不好。 所以U2发给他一个消息,但是U1没有收到,因为互联网对他来说还是很糟(步骤3)。 第4步是需要检测离线用户,可以说,超时是60秒。 最终在10秒内,U1的互联网连接已经启动,他重新连接到socket.io。 但是来自U2的消息在空间中丢失,因为服务器U1由于超时而断开连接。

这是问题,我不能100%交货。


  1. 在{}用户收集发射(发射名称和数据),由随机发射ID标识。 发送发射
  2. 确认在客户端发射(发送发射回服务器与emitID)
  3. 如果确认 – 从由emitID标识的{}中删除对象
  4. 如果用户重新连接 – 请检查此用户的{},然后循环执行步骤1中的每个对象{}
  5. 如果需要,断开用户连接或/和连接flush {}

其他人已经在其他答案和评论中暗示了这一点,但根本的问题是,Socket.IO只是一个交付机制,你不能单靠它来获得可靠的交付。 唯一知道消息已成功传送给客户的人是客户本身 。 对于这种系统,我会build议做出以下断言:

  1. 消息不会直接发送给客户; 相反,他们被发送到服务器,并存储在某种数据存储。
  2. 客户端负责在重新连接时询问“我错过了什么”,并将查询存储在数据存储中的消息以更新其状态。
  3. 如果在收件人客户端连接时将消息发送到服务器,则该消息将实时发送到客户端。

当然,根据您的应用程序的需要,您可以调整这些内容 – 例如,您可以使用Redis列表或sorting的消息集,如果您知道客户端已启动至今。


这里有几个例子:

快乐的path

  • U1和U2都连接到系统。
  • U2发送一条消息给U1应该接收的服务器。
  • 服务器将消息存储在某种持久存储中,用某种时间戳或顺序标识将其标记为U1。
  • 服务器通过Socket.IO发送消息给U1。
  • U1的客户端确认(也许通过Socket.IOcallback)它收到了消息。
  • 服务器从数据存储中删除持久消息。

离线path

  • U1失去互联网连接。
  • U2发送一条消息给U1应该接收的服务器。
  • 服务器将消息存储在某种持久存储中,用某种时间戳或顺序标识将其标记为U1。
  • 服务器通过Socket.IO发送消息给U1。
  • U1的客户端确认收据,因为他们是离线的。
  • U2可能会向U1发送更多的消息; 它们都以相同的方式存储在数据存储中。
  • 当U1重新连接时,它询问服务器“我看到的最后一条消息是X /我有状态X,我错过了什么。
  • 服务器根据U1的请求向U1发送从数据存储器中遗漏的所有消息
  • U1的客户端确认收到,服务器从数据存储中删除这些消息。

如果你绝对想要保证交付,那么devise你的系统是非常重要的,这样连接并不重要,而实时交付只是一个奖励 。 这几乎总是涉及某种数据存储。 正如user568109在评论中提到的那样,有消息传递系统可以抽象出所述消息的存储和传递,而且可能值得研究这种预先构build的解决scheme。 (您可能仍然需要自己编写Socket.IO集成。)

如果您不希望将消息存储在数据库中,则可以将其存储在本地数组中; 服务器尝试发送U1消息,并将其存储在“待处理消息”列表中,直到U1的客户端确认收到它。 如果客户端处于脱机状态,那么当它回来时,它可以告诉服务器“嗨,我已经断开连接,请给我任何我错过”,服务器可以遍历这些消息。

幸运的是,Socket.IO提供了一种机制,允许客户端“回应”看起来像本地JScallback的消息。 这是一些伪代码:

// server pendingMessagesForSocket = []; function sendMessage(message) { pendingMessagesForSocket.push(message); socket.emit('message', message, function() { pendingMessagesForSocket.remove(message); } }; socket.on('reconnection', function(lastKnownMessage) { // you may want to make sure you resend them in order, or one at a time, etc. for (message in pendingMessagesForSocket since lastKnownMessage) { socket.emit('message', message, function() { pendingMessagesForSocket.remove(message); } } }); // client socket.on('connection', function() { if (previouslyConnected) { socket.emit('reconnection', lastKnownMessage); } else { // first connection; any further connections means we disconnected previouslyConnected = true; } }); socket.on('message', function(data, callback) { // Do something with `data` lastKnownMessage = data; callback(); // confirm we received the message }); 

这与上一个build议非常相似,只是没有持久的数据存储。


您也可能对事件采购的概念感兴趣。

似乎你已经有用户帐户系统。 您知道哪个帐户在线/离线,您可以处理连接/断开事件:

因此,解决scheme是在每个用户的数据库上添加在线/离线和离线消息:

 chatApp.onLogin(function (user) { user.readOfflineMessage(function (msgs) { user.sendOfflineMessage(msgs, function (err) { if (!err) user.clearOfflineMessage(); }); }) }); chatApp.onMessage(function (fromUser, toUser, msg) { if (user.isOnline()) { toUser.sendMessage(msg, function (err) { // alert CAN NOT SEND, RETRY? }); } else { toUser.addToOfflineQueue(msg); } }) 

看这里: 处理浏览器重新加载socket.io 。

我想你可以使用我提出的解决scheme。 如果你修改它,它应该工作,你想要的。

我想你想要的是为每个用户有一个可重用的套接字,如下所示:

客户:

 socket.on("msg", function(){ socket.send("msg-conf"); }); 

服务器:

 // Add this socket property to all users, with your existing user system user.socket = { messages:[], io:null } user.send = function(msg){ // Call this method to send a message if(this.socket.io){ // this.io will be set to null when dissconnected // Wait For Confirmation that message was sent. var hasconf = false; this.socket.io.on("msg-conf", function(data){ // Expect the client to emit "msg-conf" hasconf = true; }); // send the message this.socket.io.send("msg", msg); // if connected, call socket.io's send method setTimeout(function(){ if(!hasconf){ this.socket = null; // If the client did not respond, mark them as offline. this.socket.messages.push(msg); // Add it to the queue } }, 60 * 1000); // Make sure this is the same as your timeout. } else { this.socket.messages.push(msg); // Otherwise, it's offline. Add it to the message queue } } user.flush = function(){ // Call this when user comes back online for(var msg in this.socket.messages){ // For every message in the queue, send it. this.send(msg); } } // Make Sure this runs whenever the user gets logged in/comes online user.onconnect = function(socket){ this.socket.io = socket; // Set the socket.io socket this.flush(); // Send all messages that are waiting } // Make sure this is called when the user disconnects/logs out user.disconnect = function(){ self.socket.io = null; // Set the socket to null, so any messages are queued not send. } 

然后套接字队列被保留在断开连接之间。

确保它将每个用户socket属性保存到数据库,并使这些方法成为用户原型的一部分。 数据库不重要,只要保存它,但是你一直在保存你的用户。

这将避免在Additon 1中提到的问题,因为在将消息标记为已发送之前,需要客户端的确认。 如果你真的想,你可以给每个消息一个id,并让客户端发送消息id到msg-conf ,然后检查它。

在这个例子中, user是所有用户被复制的模板用户,或者像用户原型一样。

注意:这还没有经过testing。

一直在看这个东西,认为不同的道路可能会更好。

尝试看看Azure服务总线,问题和话题照顾离线状态。 消息等待用户回来,然后他们得到消息。

是一个运行队列的成本,但是对于一个基本的队列来说,每百万次操作的成本是0.05美元,所以开发成本会更高,因为需要编写排队系统。 https://azure.microsoft.com/en-us/pricing/details/service-bus/

天青总线有PHP,C#,Xarmin,Anjular,爪哇脚本等库和例子。

所以服务器发送消息,并不需要担心跟踪他们。 客户端也可以使用消息发回,如果需要的话可以处理负载均衡。