How do I write a scalable TCP/IP based server?
I am in the design phase of writing a new Windows service application that accepts TCP/IP connections for long-running connections (i.e., this is not like HTTP, where there are many short connections, but rather a client connects and stays connected for hours or days or even weeks).
I'm looking for ideas on the best way to design the network architecture. I will need to start at least one thread for the service. I am considering using the async API (BeginReceive, etc.) since I don't know how many clients I will have connected at any given time (possibly hundreds). I definitely do not want to start a thread for each connection.
Data will mostly flow out from my server to the clients, but there will occasionally be some commands coming from the clients. This is primarily a monitoring application, in which my server sends status data to the clients periodically.
Any suggestions on the best way to make this as scalable as possible? Basic workflow? Thanks.
EDIT: To be clear, I'm looking for .NET-based solutions (C# if possible, but any .NET language will work).
BOUNTY NOTE: To be awarded the bounty, I expect more than a simple answer. I would need a working example of a solution, either as a pointer to something I could download or a short example in-line. And it must be .NET and Windows based (any .NET language is acceptable).
EDIT: I'd like to thank everyone who gave great answers. Unfortunately, I could only accept one, and I chose to accept the more well-known Begin/End method. Esac's solution may well be better, but it's still new enough that I don't know for sure how it will work out.
I have upvoted all the answers I thought were good. I wish I could do more for you guys. Thanks again.
I've written something similar to this in the past. From my research years ago, writing your own socket implementation using the asynchronous sockets was the best bet. This meant that clients not really doing anything actually required relatively few resources. Anything that does occur is handled by the .NET thread pool.
I wrote it as a class that manages all of the connections for the server.
I simply used a list to hold all the client connections, but if you need faster lookups for larger lists, you can write it however you want.
private List<xConnection> _sockets;
Additionally, you need the socket that is actually listening for incoming connections.
private System.Net.Sockets.Socket _serverSocket;
The Start method actually starts the server socket and begins listening for any incoming connections.
public bool Start()
{
    System.Net.IPHostEntry localhost = System.Net.Dns.GetHostEntry(System.Net.Dns.GetHostName());
    System.Net.IPEndPoint serverEndPoint;
    try
    {
        serverEndPoint = new System.Net.IPEndPoint(localhost.AddressList[0], _port);
    }
    catch (System.ArgumentOutOfRangeException e)
    {
        throw new ArgumentOutOfRangeException("Port number entered would seem to be invalid, should be between 1024 and 65000", e);
    }
    try
    {
        _serverSocket = new System.Net.Sockets.Socket(serverEndPoint.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
    }
    catch (System.Net.Sockets.SocketException e)
    {
        throw new ApplicationException("Could not create socket, check to make sure not duplicating port", e);
    }
    try
    {
        _serverSocket.Bind(serverEndPoint);
        _serverSocket.Listen(_backlog);
    }
    catch (Exception e)
    {
        throw new ApplicationException("Error occured while binding socket, check inner exception", e);
    }
    try
    {
        //warning, only call this once, this is a bug in .net 2.0 that breaks if
        // you're running multiple asynch accepts, this bug may be fixed, but
        // it was a major pain in the ass previously, so make sure there is only one
        //BeginAccept running
        _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
    }
    catch (Exception e)
    {
        throw new ApplicationException("Error occured starting listeners, check inner exception", e);
    }
    return true;
}
I'd just like to note that the exception handling code looks bad, but the reason for it is that I had exception suppression code in there, so that any exceptions would be suppressed and return false if a configuration option was set, but I removed it for brevity's sake.
The _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket) above essentially sets our server socket to call the acceptCallback method whenever a user connects. This method runs from the .NET thread pool, which automatically handles creating additional worker threads if you have many blocking operations. This should optimally handle any load on the server.
private void acceptCallback(IAsyncResult result)
{
    xConnection conn = new xConnection();
    try
    {
        //Finish accepting the connection
        System.Net.Sockets.Socket s = (System.Net.Sockets.Socket)result.AsyncState;
        conn = new xConnection();
        conn.socket = s.EndAccept(result);
        conn.buffer = new byte[_bufferSize];
        lock (_sockets)
        {
            _sockets.Add(conn);
        }
        //Queue recieving of data from the connection
        conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
        //Queue the accept of the next incomming connection
        _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
    }
    catch (SocketException e)
    {
        if (conn.socket != null)
        {
            conn.socket.Close();
            lock (_sockets)
            {
                _sockets.Remove(conn);
            }
        }
        //Queue the next accept, think this should be here, stop attacks based on killing the waiting listeners
        _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
    }
    catch (Exception e)
    {
        if (conn.socket != null)
        {
            conn.socket.Close();
            lock (_sockets)
            {
                _sockets.Remove(conn);
            }
        }
        //Queue the next accept, think this should be here, stop attacks based on killing the waiting listeners
        _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
    }
}
The code above essentially just finishes accepting the connection that comes in, queues a BeginReceive, which is a callback that will run when the client sends data, and then queues the next acceptCallback, which will accept the next client connection that comes in.
The BeginReceive method call is what tells the socket what to do when it receives data from the client. For BeginReceive, you need to give it a byte array, which is where it will copy the data when the client sends it. The ReceiveCallback method will then get called, which is how we handle receiving the data.
private void ReceiveCallback(IAsyncResult result)
{
    //get our connection from the callback
    xConnection conn = (xConnection)result.AsyncState;
    //catch any errors, we'd better not have any
    try
    {
        //Grab our buffer and count the number of bytes receives
        int bytesRead = conn.socket.EndReceive(result);
        //make sure we've read something, if we haven't it supposadly means that the client disconnected
        if (bytesRead > 0)
        {
            //put whatever you want to do when you receive data here

            //Queue the next receive
            conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
        }
        else
        {
            //Callback run but no data, close the connection
            //supposadly means a disconnect
            //and we still have to close the socket, even though we throw the event later
            conn.socket.Close();
            lock (_sockets)
            {
                _sockets.Remove(conn);
            }
        }
    }
    catch (SocketException e)
    {
        //Something went terribly wrong
        //which shouldn't have happened
        if (conn.socket != null)
        {
            conn.socket.Close();
            lock (_sockets)
            {
                _sockets.Remove(conn);
            }
        }
    }
}
EDIT: I forgot to mention that in this pattern, in this area of the code:
//put whatever you want to do when you receive data here

//Queue the next receive
conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
What I would typically do is, in that spot, do the reassembly of packets into messages, and then create them as jobs on the thread pool. This way the BeginReceive of the next block from the client isn't delayed while whatever message processing code is running.
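As a rough illustration of that hand-off (this is my own sketch, not part of the original code; ProcessMessage is a hypothetical placeholder for your application-level handling), a helper called from ReceiveCallback once a complete message has been reassembled could look like this:

private void QueueMessage(xConnection conn, byte[] message)
{
    // Hand the reassembled message to the .NET thread pool so this callback
    // can return immediately and the next BeginReceive isn't delayed.
    System.Threading.ThreadPool.QueueUserWorkItem(state =>
    {
        byte[] msg = (byte[])state;
        // ProcessMessage is a placeholder for your own handling logic.
        ProcessMessage(conn, msg);
    }, message);
}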
The receive callback finishes reading the data from the socket by calling EndReceive. This fills the buffer provided in the BeginReceive call. Once you do whatever you want where I left the comment, we call the next BeginReceive method, which will run the callback again if the client sends any more data. Now here's the really tricky part: when the client sends data, your receive callback may only be called with part of the message. Reassembly can become very complicated. I used my own method and created a sort of proprietary protocol to do this. I left it out, but if you request it, I can add it in. This handler was actually the most complicated piece of code I had ever written.
public bool Send(byte[] message, xConnection conn)
{
    if (conn != null && conn.socket.Connected)
    {
        lock (conn.socket)
        {
            //we use a blocking mode send, no async on the outgoing
            //since this is primarily a multithreaded application, shouldn't cause problems to send in blocking mode
            conn.socket.Send(message, message.Length, SocketFlags.None);
        }
    }
    else
        return false;
    return true;
}
The Send method above actually uses a synchronous Send call. For me that was fine, due to the message sizes and the multithreaded nature of my application. If you want to send to every client, you simply need to loop through the _sockets list (a broadcast helper is sketched below).
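For example, a broadcast helper built on top of the Send method above might look roughly like this (a sketch of my own, not part of the original code; it snapshots the list under the lock so the actual sends happen outside of it):

public void Broadcast(byte[] message)
{
    // Copy the list under the lock, then send outside of it so a slow client
    // doesn't hold up additions/removals of other connections.
    List<xConnection> snapshot;
    lock (_sockets)
    {
        snapshot = new List<xConnection>(_sockets);
    }

    foreach (xConnection conn in snapshot)
    {
        Send(message, conn);
    }
}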
The xConnection class referenced above is basically a simple wrapper for a socket that includes the byte buffer, and in my implementation, some extras.
public class xConnection : xBase { public byte[] buffer; public System.Net.Sockets.Socket socket; }
Also, for reference, here are the usings I include, since I always get annoyed when they aren't included.
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
I hope that's helpful. It may not be the cleanest code, but it works. There are also some nuances to the code that you should be wary of changing. For one, only ever have a single BeginAccept called at any one time. There used to be a very annoying .NET bug around this, which was years ago, so I don't recall the details.
Also, in the ReceiveCallback code, we process everything received from the socket before we queue the next receive. This means that for a single socket, we're only actually ever in ReceiveCallback once at any point in time, and we don't need to use thread synchronization. However, if you reorder this to call the next receive immediately after pulling the data, which might be a little faster, you will need to make sure you correctly synchronize the threads.
Also, I hacked out a lot of my code but left the essence of what's happening in place. This should be a good start for your design. Leave a comment if you have any more questions about this.
There are many ways of doing network operations in C#. All of them use different mechanisms under the hood and thus suffer major performance problems at high concurrency. Begin* operations are one of those that many people often mistake for being the faster/fastest way of doing networking.
In order to solve these problems, they introduced the *Async set of methods. From MSDN: http://MSdn.microsoft.com/en-us/library/system.net.sockets.socketasynceventargs.aspx
The SocketAsyncEventArgs class is part of a set of enhancements to the System.Net.Sockets.Socket class that provide an alternative asynchronous pattern that can be used by specialized high-performance socket applications. This class was specifically designed for network server applications that require high performance. An application can use the enhanced asynchronous pattern exclusively or only in targeted hot areas (for example, when receiving large amounts of data).
The main feature of these enhancements is the avoidance of the repeated allocation and synchronization of objects during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the System.Net.Sockets.Socket class requires a System.IAsyncResult object be allocated for each asynchronous socket operation.
Under the covers, the *Async APIs use I/O completion ports, which is the fastest way of performing network operations; see http://msdn.microsoft.com/en-us/magazine/cc302334.aspx
And just to help you out, I am including the source code of a telnet server I wrote using the *Async APIs. I am only including the relevant portions. Also to note, instead of processing the data inline, I instead opt to push it onto a lock-free (wait-free) queue that is processed on a separate thread. Note that I am not including the corresponding Pool class, which is just a simple pool that creates a new object if it is empty, or the Buffer class, which is just a self-expanding buffer that is not really needed unless you are receiving an indeterminate amount of data. If you would like more information, feel free to send me a message.
public class Telnet { private readonly Pool<SocketAsyncEventArgs> m_EventArgsPool; private Socket m_ListenSocket; /// <summary> /// This event fires when a connection has been established. /// </summary> public event EventHandler<SocketAsyncEventArgs> Connected; /// <summary> /// This event fires when a connection has been shutdown. /// </summary> public event EventHandler<SocketAsyncEventArgs> Disconnected; /// <summary> /// This event fires when data is received on the socket. /// </summary> public event EventHandler<SocketAsyncEventArgs> DataReceived; /// <summary> /// This event fires when data is finished sending on the socket. /// </summary> public event EventHandler<SocketAsyncEventArgs> DataSent; /// <summary> /// This event fires when a line has been received. /// </summary> public event EventHandler<LineReceivedEventArgs> LineReceived; /// <summary> /// Specifies the port to listen on. /// </summary> [DefaultValue(23)] public int ListenPort { get; set; } /// <summary> /// Constructor for Telnet class. /// </summary> public Telnet() { m_EventArgsPool = new Pool<SocketAsyncEventArgs>(); ListenPort = 23; } /// <summary> /// Starts the telnet server listening and accepting data. /// </summary> public void Start() { IPEndPoint endpoint = new IPEndPoint(0, ListenPort); m_ListenSocket = new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); m_ListenSocket.Bind(endpoint); m_ListenSocket.Listen(100); // // Post Accept // StartAccept(null); } /// <summary> /// Not Yet Implemented. Should shutdown all connections gracefully. /// </summary> public void Stop() { //throw (new NotImplementedException()); } // // ACCEPT // /// <summary> /// Posts a requests for Accepting a connection. If it is being called from the completion of /// an AcceptAsync call, then the AcceptSocket is cleared since it will create a new one for /// the new user. /// </summary> /// <param name="e">null if posted from startup, otherwise a <b>SocketAsyncEventArgs</b> for reuse.</param> private void StartAccept(SocketAsyncEventArgs e) { if (e == null) { e = m_EventArgsPool.Pop(); e.Completed += Accept_Completed; } else { e.AcceptSocket = null; } if (m_ListenSocket.AcceptAsync(e) == false) { Accept_Completed(this, e); } } /// <summary> /// Completion callback routine for the AcceptAsync post. This will verify that the Accept occured /// and then setup a Receive chain to begin receiving data. /// </summary> /// <param name="sender">object which posted the AcceptAsync</param> /// <param name="e">Information about the Accept call.</param> private void Accept_Completed(object sender, SocketAsyncEventArgs e) { // // Socket Options // e.AcceptSocket.NoDelay = true; // // Create and setup a new connection object for this user // Connection connection = new Connection(this, e.AcceptSocket); // // Tell the client that we will be echo'ing data sent // DisableEcho(connection); // // Post the first receive // SocketAsyncEventArgs args = m_EventArgsPool.Pop(); args.UserToken = connection; // // Connect Event // if (Connected != null) { Connected(this, args); } args.Completed += Receive_Completed; PostReceive(args); // // Post another accept // StartAccept(e); } // // RECEIVE // /// <summary> /// Post an asynchronous receive on the socket. 
/// </summary> /// <param name="e">Used to store information about the Receive call.</param> private void PostReceive(SocketAsyncEventArgs e) { Connection connection = e.UserToken as Connection; if (connection != null) { connection.ReceiveBuffer.EnsureCapacity(64); e.SetBuffer(connection.ReceiveBuffer.DataBuffer, connection.ReceiveBuffer.Count, connection.ReceiveBuffer.Remaining); if (connection.Socket.ReceiveAsync(e) == false) { Receive_Completed(this, e); } } } /// <summary> /// Receive completion callback. Should verify the connection, and then notify any event listeners /// that data has been received. For now it is always expected that the data will be handled by the /// listeners and thus the buffer is cleared after every call. /// </summary> /// <param name="sender">object which posted the ReceiveAsync</param> /// <param name="e">Information about the Receive call.</param> private void Receive_Completed(object sender, SocketAsyncEventArgs e) { Connection connection = e.UserToken as Connection; if (e.BytesTransferred == 0 || e.SocketError != SocketError.Success || connection == null) { Disconnect(e); return; } connection.ReceiveBuffer.UpdateCount(e.BytesTransferred); OnDataReceived(e); HandleCommand(e); Echo(e); OnLineReceived(connection); PostReceive(e); } /// <summary> /// Handles Event of Data being Received. /// </summary> /// <param name="e">Information about the received data.</param> protected void OnDataReceived(SocketAsyncEventArgs e) { if (DataReceived != null) { DataReceived(this, e); } } /// <summary> /// Handles Event of a Line being Received. /// </summary> /// <param name="connection">User connection.</param> protected void OnLineReceived(Connection connection) { if (LineReceived != null) { int index = 0; int start = 0; while ((index = connection.ReceiveBuffer.IndexOf('\n', index)) != -1) { string s = connection.ReceiveBuffer.GetString(start, index - start - 1); s = s.Backspace(); LineReceivedEventArgs args = new LineReceivedEventArgs(connection, s); Delegate[] delegates = LineReceived.GetInvocationList(); foreach (Delegate d in delegates) { d.DynamicInvoke(new object[] { this, args }); if (args.Handled == true) { break; } } if (args.Handled == false) { connection.CommandBuffer.Enqueue(s); } start = index; index++; } if (start > 0) { connection.ReceiveBuffer.Reset(0, start + 1); } } } // // SEND // /// <summary> /// Overloaded. Sends a string over the telnet socket. /// </summary> /// <param name="connection">Connection to send data on.</param> /// <param name="s">Data to send.</param> /// <returns>true if the data was sent successfully.</returns> public bool Send(Connection connection, string s) { if (String.IsNullOrEmpty(s) == false) { return Send(connection, Encoding.Default.GetBytes(s)); } return false; } /// <summary> /// Overloaded. Sends an array of data to the client. /// </summary> /// <param name="connection">Connection to send data on.</param> /// <param name="data">Data to send.</param> /// <returns>true if the data was sent successfully.</returns> public bool Send(Connection connection, byte[] data) { return Send(connection, data, 0, data.Length); } public bool Send(Connection connection, char c) { return Send(connection, new byte[] { (byte)c }, 0, 1); } /// <summary> /// Sends an array of data to the client. 
/// </summary> /// <param name="connection">Connection to send data on.</param> /// <param name="data">Data to send.</param> /// <param name="offset">Starting offset of date in the buffer.</param> /// <param name="length">Amount of data in bytes to send.</param> /// <returns></returns> public bool Send(Connection connection, byte[] data, int offset, int length) { bool status = true; if (connection.Socket == null || connection.Socket.Connected == false) { return false; } SocketAsyncEventArgs args = m_EventArgsPool.Pop(); args.UserToken = connection; args.Completed += Send_Completed; args.SetBuffer(data, offset, length); try { if (connection.Socket.SendAsync(args) == false) { Send_Completed(this, args); } } catch (ObjectDisposedException) { // // return the SocketAsyncEventArgs back to the pool and return as the // socket has been shutdown and disposed of // m_EventArgsPool.Push(args); status = false; } return status; } /// <summary> /// Sends a command telling the client that the server WILL echo data. /// </summary> /// <param name="connection">Connection to disable echo on.</param> public void DisableEcho(Connection connection) { byte[] b = new byte[] { 255, 251, 1 }; Send(connection, b); } /// <summary> /// Completion callback for SendAsync. /// </summary> /// <param name="sender">object which initiated the SendAsync</param> /// <param name="e">Information about the SendAsync call.</param> private void Send_Completed(object sender, SocketAsyncEventArgs e) { e.Completed -= Send_Completed; m_EventArgsPool.Push(e); } /// <summary> /// Handles a Telnet command. /// </summary> /// <param name="e">Information about the data received.</param> private void HandleCommand(SocketAsyncEventArgs e) { Connection c = e.UserToken as Connection; if (c == null || e.BytesTransferred < 3) { return; } for (int i = 0; i < e.BytesTransferred; i += 3) { if (e.BytesTransferred - i < 3) { break; } if (e.Buffer[i] == (int)TelnetCommand.IAC) { TelnetCommand command = (TelnetCommand)e.Buffer[i + 1]; TelnetOption option = (TelnetOption)e.Buffer[i + 2]; switch (command) { case TelnetCommand.DO: if (option == TelnetOption.Echo) { // ECHO } break; case TelnetCommand.WILL: if (option == TelnetOption.Echo) { // ECHO } break; } c.ReceiveBuffer.Remove(i, 3); } } } /// <summary> /// Echoes data back to the client. /// </summary> /// <param name="e">Information about the received data to be echoed.</param> private void Echo(SocketAsyncEventArgs e) { Connection connection = e.UserToken as Connection; if (connection == null) { return; } // // backspacing would cause the cursor to proceed beyond the beginning of the input line // so prevent this // string bs = connection.ReceiveBuffer.ToString(); if (bs.CountAfterBackspace() < 0) { return; } // // find the starting offset (first non-backspace character) // int i = 0; for (i = 0; i < connection.ReceiveBuffer.Count; i++) { if (connection.ReceiveBuffer[i] != '\b') { break; } } string s = Encoding.Default.GetString(e.Buffer, Math.Max(e.Offset, i), e.BytesTransferred); if (connection.Secure) { s = s.ReplaceNot("\r\n\b".ToCharArray(), '*'); } s = s.Replace("\b", "\b \b"); Send(connection, s); } // // DISCONNECT // /// <summary> /// Disconnects a socket. /// </summary> /// <remarks> /// It is expected that this disconnect is always posted by a failed receive call. Calling the public /// version of this method will cause the next posted receive to fail and this will cleanup properly. /// It is not advised to call this method directly. 
/// </remarks> /// <param name="e">Information about the socket to be disconnected.</param> private void Disconnect(SocketAsyncEventArgs e) { Connection connection = e.UserToken as Connection; if (connection == null) { throw (new ArgumentNullException("e.UserToken")); } try { connection.Socket.Shutdown(SocketShutdown.Both); } catch { } connection.Socket.Close(); if (Disconnected != null) { Disconnected(this, e); } e.Completed -= Receive_Completed; m_EventArgsPool.Push(e); } /// <summary> /// Marks a specific connection for graceful shutdown. The next receive or send to be posted /// will fail and close the connection. /// </summary> /// <param name="connection"></param> public void Disconnect(Connection connection) { try { connection.Socket.Shutdown(SocketShutdown.Both); } catch (Exception) { } } /// <summary> /// Telnet command codes. /// </summary> internal enum TelnetCommand { SE = 240, NOP = 241, DM = 242, BRK = 243, IP = 244, AO = 245, AYT = 246, EC = 247, EL = 248, GA = 249, SB = 250, WILL = 251, WONT = 252, DO = 253, DONT = 254, IAC = 255 } /// <summary> /// Telnet command options. /// </summary> internal enum TelnetOption { Echo = 1, SuppressGoAhead = 3, Status = 5, TimingMark = 6, TerminalType = 24, WindowSize = 31, TerminalSpeed = 32, RemoteFlowControl = 33, LineMode = 34, EnvironmentVariables = 36 } }
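As a rough usage sketch (hedged, since the Connection, Pool and Buffer helper classes referenced above are not included, and the exact members of LineReceivedEventArgs are assumptions on my part), starting the server and reacting to received lines might look like this:

class TelnetHost
{
    static void Main()
    {
        Telnet server = new Telnet();
        server.ListenPort = 23;

        // React to complete lines pushed up by the Telnet class.
        server.LineReceived += delegate(object sender, LineReceivedEventArgs e)
        {
            Console.WriteLine("Line received: " + e.ToString());
        };

        server.Start();

        Console.WriteLine("Telnet server listening on port 23. Press Enter to exit.");
        Console.ReadLine();
    }
}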
There used to be a really good discussion of scalable TCP/IP using .NET, written by Chris Mullins of Coversant. Unfortunately, it appears his blog has disappeared from its prior location, so I will try to piece together his advice from memory (some useful comments of his appear in this thread: C++ vs. C#: Develop a highly scalable IOCP server).
First and foremost, note that both the Begin/End and the Async methods on the Socket class make use of I/O completion ports (IOCP) to provide scalability. This makes a much bigger difference (when used correctly; see below) to scalability than which of the two methods you actually pick to implement your solution.
Chris Mullins' posts were based on using Begin/End, which is the approach I personally have experience with. Note that Chris put together a solution based on this that scaled up to 10,000 concurrent client connections on a 32-bit machine with 2 GB of memory, and well into 100,000 on a 64-bit platform with sufficient memory. From my own experience with this technique, I have no reason to doubt these indicative numbers.
IOCP versus thread-per-connection or 'select' primitives
The reason you want to use a mechanism that uses IOCP under the hood is that it uses a very low-level Windows thread pool that does not wake up any threads until there is actual data on the I/O channel you are trying to read from (note that IOCP can also be used for file I/O). The benefit of this is that Windows does not have to switch to a thread only to find that there is no data anyway, so this reduces the number of context switches your server has to make to the bare minimum required.
Context switches are what will definitely kill the 'thread-per-connection' mechanism, although it is a viable solution if you are only dealing with a few dozen connections. This mechanism is, however, by no stretch of the imagination 'scalable'.
Important considerations when using IOCP
Memory
First and foremost, it is critical to understand that IOCP can easily result in memory issues under .NET if your implementation is too naive. Every IOCP BeginReceive call will result in 'pinning' of the buffer you are reading into. For a good explanation of why this is a problem, see Yun Jin's blog: OutOfMemoryException and Pinning.
Luckily this problem can be avoided, but it requires a bit of a trade-off. The suggested solution is to allocate a big byte[] buffer at application start-up (or close thereto), of at least 90 KB or so (as of .NET 2; the required size may be larger in later versions). The reason to do this is that large memory allocations automatically end up in a non-compacting memory segment (the Large Object Heap) that is effectively automatically pinned. By allocating one large buffer at start-up, you make sure that this block of unmovable memory is at a relatively 'low address' where it will not get in the way and cause fragmentation.
You can then use offsets to segment this one big buffer into separate areas for each connection that needs to read some data. This is where a trade-off comes into play: because this buffer needs to be pre-allocated, you will have to decide how much buffer space you need per connection, and what upper limit you want to set on the number of connections you want to scale to (or you can implement an abstraction that can hand out additional pinned buffers once you need them).
The simplest solution would be to assign every connection a single byte at a unique offset within this buffer. Then you can make a BeginReceive call for a single byte to be read, and perform the rest of the reading as a result of the callback you get.
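A minimal sketch of that idea (my own illustration, not Chris's actual code, and assuming the usual System and System.Collections.Generic usings): allocate one large buffer up front so it lands on the Large Object Heap, and hand each connection a fixed slice of it by offset.

public class SegmentedBufferPool
{
    private readonly byte[] _buffer;                 // one big allocation, ends up on the LOH
    private readonly int _segmentSize;
    private readonly Stack<int> _freeOffsets = new Stack<int>();

    public SegmentedBufferPool(int segmentSize, int maxConnections)
    {
        _segmentSize = segmentSize;
        _buffer = new byte[segmentSize * maxConnections];
        for (int i = maxConnections - 1; i >= 0; i--)
            _freeOffsets.Push(i * segmentSize);
    }

    public byte[] Buffer { get { return _buffer; } }
    public int SegmentSize { get { return _segmentSize; } }

    // Hands out the offset of a free segment when a connection is accepted.
    public int Rent()
    {
        lock (_freeOffsets)
        {
            if (_freeOffsets.Count == 0)
                throw new InvalidOperationException("No buffer segments left");
            return _freeOffsets.Pop();
        }
    }

    // Returns the segment when the connection closes.
    public void Return(int offset)
    {
        lock (_freeOffsets)
        {
            _freeOffsets.Push(offset);
        }
    }
}

// Usage with BeginReceive (conn and ReceiveCallback as in the earlier answer):
//   int offset = pool.Rent();
//   conn.socket.BeginReceive(pool.Buffer, offset, pool.SegmentSize, SocketFlags.None,
//                            new AsyncCallback(ReceiveCallback), conn);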
Processing
When you get the callback from the Begin call you made, it is very important to realize that the code in the callback will execute on the low-level IOCP thread. It is absolutely essential that you avoid lengthy operations in this callback. Using these threads for complex processing will kill your scalability just as effectively as using 'thread-per-connection'.
The suggested solution is to use the callback only to queue up a work item to process the incoming data, which will be executed on some other thread. Avoid any potentially blocking operations inside the callback so that the IOCP thread can return to its pool as quickly as possible. In .NET 4.0, I'd suggest the easiest solution is to spawn a Task, giving it a reference to the client socket and a copy of the first byte that was already read by the BeginReceive call. This task is then responsible for reading all the data from the socket that represents the request you are processing, executing it, and then making a new BeginReceive call to queue the socket for IOCP once more. Pre .NET 4.0, you can use the ThreadPool, or create your own threaded work-queue implementation.
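A hedged sketch of that hand-off in .NET 4.0 terms (my own illustration, reusing the xConnection type from the earlier answer; HandleRequest is a hypothetical placeholder for reading and executing the rest of the request):

private void ReceiveCallback(IAsyncResult result)
{
    xConnection conn = (xConnection)result.AsyncState;
    int bytesRead = conn.socket.EndReceive(result);

    if (bytesRead > 0)
    {
        // Copy what the IOCP thread already read so the buffer can be reused.
        byte firstByte = conn.buffer[0];

        // Get off the IOCP thread as quickly as possible.
        Task.Factory.StartNew(() =>
        {
            // Placeholder: synchronously read the rest of the request, process it,
            // then hand the socket back to IOCP with a new BeginReceive.
            HandleRequest(conn, firstByte);
            conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None,
                                     new AsyncCallback(ReceiveCallback), conn);
        });
    }
}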
Summary
Basically, I would suggest using Kevin's sample code for this solution, with the following added warnings:
- Make sure the buffer you pass to BeginReceive is already 'pinned'
- Make sure the callback you pass to BeginReceive does no more than queue up the task that handles the actual processing of the incoming data
When you do that, I have no doubt you could replicate Chris's results in scaling up to potentially tens of thousands of simultaneous clients (given the right hardware and an efficient implementation of your own processing code, of course).
You already got most of the answer via the code samples above. Using asynchronous IO operations is absolutely the way to go here. Async IO is the way Win32 is designed internally to scale. The best possible performance you can get is achieved using completion ports: binding your sockets to completion ports and having a thread pool waiting for completion-port completions. The common wisdom is to have 2-4 threads per CPU (core) waiting for completion. I highly recommend going over these three articles by Rick Vicik from the Windows Performance team:
- Designing Applications for Performance – Part 1
- Designing Applications for Performance – Part 2
- Designing Applications for Performance – Part 3
The said articles cover mostly the native Windows API, but they are a must-read for anyone trying to get a grasp on scalability and performance. They do have some briefs on the managed side of things too.
Second thing you'll need to do is make sure you go over the Improving .NET Application Performance and Scalability book, that is available online. You will find pertinent and valid advice around the use of threads, asynchronous calls and locks in Chapter 5. But the real gems are in Chapter 17 where you'll find such goodies as practical guidance on tuning your thread pool. My apps had some serious problems until I adjusted the maxIothreads/maxWorkerThreads as per the recommendations in this chapter.
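If you want to experiment with those limits from code rather than through configuration, a hedged sketch is shown below. The numbers are placeholders only; the per-CPU guidance from that chapter should take precedence over anything hard-coded here.

using System;
using System.Threading;

class ThreadPoolTuning
{
    static void Main()
    {
        int workerThreads, ioThreads;
        ThreadPool.GetMaxThreads(out workerThreads, out ioThreads);
        Console.WriteLine("Max worker: {0}, max IO completion: {1}", workerThreads, ioThreads);

        // Create enough threads eagerly so bursty load doesn't stall...
        ThreadPool.SetMinThreads(8, 8);
        // ...and cap the totals so the pool can't grow without bound.
        ThreadPool.SetMaxThreads(100, 100);
    }
}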
You say that you want to do a pure TCP server, so my next point is spurious. However, if you find yourself cornered and use the WebRequest class and its derivatives, be warned that there is a dragon guarding that door: the ServicePointManager. This is a configuration class that has one purpose in life: to ruin your performance. Make sure you free your server from the artificially imposed ServicePoint.ConnectionLimit or your application will never scale (I'll let you discover yourself what the default value is…). You may also reconsider the default policy of sending an Expect100Continue header in HTTP requests.
Now about the core socket managed API things are fairly easy on the Send side, but they are significantly more complex on the Receive side. In order to achieve high throughput and scale you must ensure that the socket is not flow controlled because you do not have a buffer posted for receive. Ideally for high performance you should post ahead 3-4 buffers and post new buffers as soon as you get one back ( before you process the one got back) so you ensure that the socket always has somewhere to deposit the data coming from the network. You'll see why you probably won't be able to achieve this shortly.
After you're done playing with the BeginRead/BeginWrite API and start the serious work, you'll realize that you need security on your traffic, i.e. NTLM/Kerberos authentication and traffic encryption, or at least traffic tampering protection. The way you do this is you use the built-in System.Net.Security.NegotiateStream (or SslStream if you need to go across disparate domains). This means that instead of relying on straight socket asynchronous operations, you will rely on the AuthenticatedStream asynchronous operations. As soon as you obtain a socket (either from connect on the client or from accept on the server), you create a stream on the socket and submit it for authentication, by calling either BeginAuthenticateAsClient or BeginAuthenticateAsServer. After the authentication completes (at least you're safe from the native InitiateSecurityContext/AcceptSecurityContext madness…) you will do your authorization by checking the RemoteIdentity property of your authenticated stream and doing whatever ACL verification your product must support. After that, you will send messages using BeginWrite and you'll be receiving them with BeginRead. This is the problem I was talking about before, that you won't be able to post multiple receive buffers, because the AuthenticateStream classes don't support this. The BeginRead operation manages all the IO internally until you have received an entire frame; otherwise, it could not handle the message authentication (decrypt the frame and validate the signature on the frame). Though in my experience the job done by the AuthenticatedStream classes is fairly good, and you shouldn't have any problem with it. I.e. you should be able to saturate a GB network with only 4-5% CPU. The AuthenticatedStream classes will also impose on you the protocol-specific frame size limitations (16k for SSL, 12k for Kerberos).
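A rough sketch of the server-side flow described above (my own illustration; error handling and the read loop are omitted):

static void OnAccepted(Socket clientSocket)
{
    // Wrap the accepted socket in a NegotiateStream and authenticate asynchronously.
    NetworkStream netStream = new NetworkStream(clientSocket, true);
    NegotiateStream authStream = new NegotiateStream(netStream, false);

    authStream.BeginAuthenticateAsServer(ar =>
    {
        NegotiateStream stream = (NegotiateStream)ar.AsyncState;
        stream.EndAuthenticateAsServer(ar);

        // Authorization: check who connected before doing any work.
        Console.WriteLine("Authenticated client: " + stream.RemoteIdentity.Name);

        // From here on, use stream.BeginRead / stream.BeginWrite for the traffic.
    }, authStream);
}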
This should get you started on the right track. I'm not going to post code here; there is a perfectly good example on MSDN. I've done many projects like this and I was able to scale to about 1000 users connected without problems. Above that, you'll need to modify registry keys to allow the kernel more socket handles. And make sure you deploy on a server OS, that is W2K3, not XP or Vista (i.e. client OS); it makes a big difference.
BTW, if you have database operations on the server or file IO, make sure you also use the async flavor for them, or you'll drain the thread pool in no time. For SQL Server connections, make sure you add 'Asynchronous Processing=true' to the connection string.
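For example (a hedged sketch with a placeholder connection string and query), the async ADO.NET pattern looks roughly like this:

static void QueryAsync()
{
    // 'Asynchronous Processing=true' is required for the Begin/EndExecute* calls to work.
    string connStr = "Data Source=.;Initial Catalog=MyDb;Integrated Security=true;" +
                     "Asynchronous Processing=true";

    SqlConnection conn = new SqlConnection(connStr);
    conn.Open();

    SqlCommand cmd = new SqlCommand("SELECT COUNT(*) FROM SomeTable", conn);
    cmd.BeginExecuteReader(ar =>
    {
        SqlCommand command = (SqlCommand)ar.AsyncState;
        using (SqlDataReader reader = command.EndExecuteReader(ar))
        {
            while (reader.Read())
            {
                Console.WriteLine(reader.GetInt32(0));
            }
        }
        command.Connection.Close();
    }, cmd);
}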
I've got such a server running in some of my solutions. Here is a very detail explanation of the different ways to do it in .net: Get Closer to the Wire with High-Performance Sockets in .NET
Lately I've been looking for ways to improve our code and will be looking into this: " Socket Performance Enhancements in Version 3.5 " that was included specifically "for use by applications that use asynchronous network I/O to achieve the highest performance".
"The main feature of these enhancements is the avoidance of the repeated allocation and synchronization of objects during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the Socket class for asynchronous socket I/O requires a System.IAsyncResult object be allocated for each asynchronous socket operation."
You can keep reading if you follow the link. I personally will be testing their sample code tomorrow to benchmark it against what i've got.
Edit: Here you can find working code for both client and server using the new 3.5 SocketAsyncEventArgs, so you can test it within a couple of minutes and go through the code. It is a simple approach, but it is the basis for starting a much larger implementation. Also, this article from almost two years ago in MSDN Magazine was an interesting read.
Have you considered just using a WCF net TCP binding and a publish/subscribe pattern? WCF would allow you to focus [mostly] on your domain instead of plumbing.
There are lots of WCF samples, and even a publish/subscribe framework, available in IDesign's download section, which may be useful: http://www.idesign.net
I am wondering about one thing:
I definitely do not want to start a thread for each connection.
Why is that? Windows could handle hundreds of threads in an application since at least Windows 2000. I've done it, and it's really easy to work with if the threads don't need to be synchronized. Especially given that you're doing a lot of I/O (so you're not CPU-bound, and a lot of threads would be blocked on either disk or network communication), I don't understand this restriction.
Have you tested the multi-threaded way and found it lacking in something? Do you intend to also have a database connection for each thread (that would kill the database server, so it's a bad idea, but it's easily solved with a 3-tier design). Are you worried that you'll have thousands of clients instead of hundreds, and then you'll really have problems? (Though I'd try a thousand threads or even ten thousand if I had 32+ GB of RAM – again, given that you're not CPU bound, thread switch time should be absolutely irrelevant.)
Here is the code – to see how this looks running, go to http://mdpopescu.blogspot.com/2009/05/multi-threaded-server.html and click on the picture.
Server class:
public class Server
{
    private static readonly TcpListener listener = new TcpListener(IPAddress.Any, 9999);

    public Server()
    {
        listener.Start();
        Console.WriteLine("Started.");

        while (true)
        {
            Console.WriteLine("Waiting for connection...");
            var client = listener.AcceptTcpClient();
            Console.WriteLine("Connected!");

            // each connection has its own thread
            new Thread(ServeData).Start(client);
        }
    }

    private static void ServeData(object clientSocket)
    {
        Console.WriteLine("Started thread " + Thread.CurrentThread.ManagedThreadId);

        var rnd = new Random();
        try
        {
            var client = (TcpClient) clientSocket;
            var stream = client.GetStream();
            while (true)
            {
                if (rnd.NextDouble() < 0.1)
                {
                    var msg = Encoding.ASCII.GetBytes("Status update from thread " + Thread.CurrentThread.ManagedThreadId);
                    stream.Write(msg, 0, msg.Length);

                    Console.WriteLine("Status update from thread " + Thread.CurrentThread.ManagedThreadId);
                }

                // wait until the next update - I made the wait time so small 'cause I was bored :)
                Thread.Sleep(new TimeSpan(0, 0, rnd.Next(1, 5)));
            }
        }
        catch (SocketException e)
        {
            Console.WriteLine("Socket exception in thread {0}: {1}", Thread.CurrentThread.ManagedThreadId, e);
        }
    }
}
Server main program:
namespace ManyThreadsServer
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            new Server();
        }
    }
}
Client class:
public class Client
{
    public Client()
    {
        var client = new TcpClient();
        client.Connect(IPAddress.Loopback, 9999);

        var msg = new byte[1024];
        var stream = client.GetStream();
        try
        {
            while (true)
            {
                int i;
                while ((i = stream.Read(msg, 0, msg.Length)) != 0)
                {
                    var data = Encoding.ASCII.GetString(msg, 0, i);
                    Console.WriteLine("Received: {0}", data);
                }
            }
        }
        catch (SocketException e)
        {
            Console.WriteLine("Socket exception in thread {0}: {1}", Thread.CurrentThread.ManagedThreadId, e);
        }
    }
}
Client main program:
using System;
using System.Threading;

namespace ManyThreadsClient
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            // first argument is the number of threads
            for (var i = 0; i < Int32.Parse(args[0]); i++)
                new Thread(RunClient).Start();
        }

        private static void RunClient()
        {
            new Client();
        }
    }
}
Using .NET's integrated Async IO ( BeginRead
, etc) is a good idea if you can get all the details right. When you properly set up your socket/file handles it will use the OS's underlying IOCP implementation, allowing your operations to complete without using any threads (or, in the worst case, using a thread that I believe comes from the kernel's IO thread pool instead of .NET's thread pool, which helps alleviate threadpool congestion.)
The main gotcha is to make sure that you open your sockets/files in non-blocking mode. Most of the default convenience functions (like File.OpenRead
) don't do this, so you'll need to write your own.
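For instance, a hedged way to open a file so the handle actually participates in overlapped (IOCP) I/O, rather than relying on File.OpenRead, is:

static void ReadFileAsync(string path)
{
    // FileOptions.Asynchronous makes the underlying handle overlapped, so
    // BeginRead/EndRead complete via IOCP instead of blocking a pool thread.
    FileStream fs = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read,
                                   4096, FileOptions.Asynchronous);
    byte[] buffer = new byte[4096];

    fs.BeginRead(buffer, 0, buffer.Length, ar =>
    {
        int bytesRead = fs.EndRead(ar);
        Console.WriteLine("Read {0} bytes", bytesRead);
        fs.Close();
    }, null);
}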
One of the other main concerns is error handling – properly handling errors when writing asynchronous I/O code is much, much harder than doing it in synchronous code. It's also very easy to end up with race conditions and deadlocks even though you may not be using threads directly, so you need to be aware of this.
If possible, you should try and use a convenience library to ease the process of doing scalable asynchronous IO.
Microsoft's Concurrency Coordination Runtime is one example of a .NET library designed to ease the difficulty of doing this kind of programming. It looks great, but as I haven't used it, I can't comment on how well it would scale.
For my personal projects that need to do asynchronous network or disk I/O, I use a set of .NET concurrency/IO tools that I've built over the past year, called Squared.Task . It's inspired by libraries like imvu.task and twisted , and I've included some working examples in the repository that do network I/O. I also have used it in a few applications I've written – the largest publicly released one being NDexer (which uses it for threadless disk I/O). The library was written based on my experience with imvu.task and has a set of fairly comprehensive unit tests, so I strongly encourage you to try it out. If you have any issues with it, I'd be glad to offer you some assistance.
In my opinion, based on my experience using asynchronous/threadless IO instead of threads is a worthwhile endeavor on the .NET platform, as long as you're ready to deal with the learning curve. It allows you to avoid the scalability hassles imposed by the cost of Thread objects, and in many cases, you can completely avoid the use of locks and mutexes by making careful use of concurrency primitives like Futures/Promises.
You can find a nice overview of techniques at the C10k problem page .
I used Kevin's solution but he says that solution lacks code for reassembly of messages. Developers can use this code for reassembly of messages:
private static void ReceiveCallback(IAsyncResult asyncResult)
{
    ClientInfo cInfo = (ClientInfo)asyncResult.AsyncState;
    cInfo.BytesReceived += cInfo.Soket.EndReceive(asyncResult);
    if (cInfo.RcvBuffer == null)
    {
        // First 2 byte is lenght
        if (cInfo.BytesReceived >= 2)
        {
            //this calculation depends on format which your client use for lenght info
            byte[] len = new byte[2];
            len[0] = cInfo.LengthBuffer[1];
            len[1] = cInfo.LengthBuffer[0];
            UInt16 length = BitConverter.ToUInt16(len, 0);

            // buffering and nulling is very important
            cInfo.RcvBuffer = new byte[length];
            cInfo.BytesReceived = 0;
        }
    }
    else
    {
        if (cInfo.BytesReceived == cInfo.RcvBuffer.Length)
        {
            //Put your code here, use bytes comes from "cInfo.RcvBuffer"

            //Send Response but don't use async send , otherwise your code will not work ( RcvBuffer will be null prematurely and it will ruin your code)
            int sendLenghts = cInfo.Soket.Send(sendBack, sendBack.Length, SocketFlags.None);

            // buffering and nulling is very important
            //Important , set RcvBuffer to null because code will decide to get data or 2 bte lenght according to RcvBuffer's value(null or initialized)
            cInfo.RcvBuffer = null;
            cInfo.BytesReceived = 0;
        }
    }
    ContinueReading(cInfo);
}

private static void ContinueReading(ClientInfo cInfo)
{
    try
    {
        if (cInfo.RcvBuffer != null)
        {
            cInfo.Soket.BeginReceive(cInfo.RcvBuffer, cInfo.BytesReceived, cInfo.RcvBuffer.Length - cInfo.BytesReceived, SocketFlags.None, ReceiveCallback, cInfo);
        }
        else
        {
            cInfo.Soket.BeginReceive(cInfo.LengthBuffer, cInfo.BytesReceived, cInfo.LengthBuffer.Length - cInfo.BytesReceived, SocketFlags.None, ReceiveCallback, cInfo);
        }
    }
    catch (SocketException se)
    {
        //Handle exception and Close socket here, use your own code
        return;
    }
    catch (Exception ex)
    {
        //Handle exception and Close socket here, use your own code
        return;
    }
}

class ClientInfo
{
    private const int BUFSIZE = 1024;  // Max size of buffer , depends on solution
    private const int BUFLENSIZE = 2;  // lenght of lenght , depends on solution
    public int BytesReceived = 0;
    public byte[] RcvBuffer { get; set; }
    public byte[] LengthBuffer { get; set; }
    public Socket Soket { get; set; }

    public ClientInfo(Socket clntSock)
    {
        Soket = clntSock;
        RcvBuffer = null;
        LengthBuffer = new byte[BUFLENSIZE];
    }
}

public static void AcceptCallback(IAsyncResult asyncResult)
{
    Socket servSock = (Socket)asyncResult.AsyncState;
    Socket clntSock = null;
    try
    {
        clntSock = servSock.EndAccept(asyncResult);
        ClientInfo cInfo = new ClientInfo(clntSock);
        Receive(cInfo);
    }
    catch (SocketException se)
    {
        clntSock.Close();
    }
}

private static void Receive(ClientInfo cInfo)
{
    try
    {
        if (cInfo.RcvBuffer == null)
        {
            cInfo.Soket.BeginReceive(cInfo.LengthBuffer, 0, 2, SocketFlags.None, ReceiveCallback, cInfo);
        }
        else
        {
            cInfo.Soket.BeginReceive(cInfo.RcvBuffer, 0, cInfo.BytesReceived, SocketFlags.None, ReceiveCallback, cInfo);
        }
    }
    catch (SocketException se)
    {
        return;
    }
    catch (Exception ex)
    {
        return;
    }
}
I would use SEDA or a lightweight threading library (erlang or newer linux see NTPL scalability on the server side ). Async coding is very cumbersome if your communication isn't 🙂
Well, .NET sockets seem to provide select() – that's best for handling input. For output I'd have a pool of socket-writer threads listening on a work queue, accepting socket descriptor/object as part of the work item, so you don't need a thread per socket.
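A very rough sketch of that output side (my own illustration, not tied to any particular library): a small pool of writer threads draining a shared queue of socket/data work items.

public class SocketWriterPool
{
    private class WorkItem { public Socket Socket; public byte[] Data; }

    private readonly Queue<WorkItem> _queue = new Queue<WorkItem>();
    private readonly object _lock = new object();

    public SocketWriterPool(int threads)
    {
        for (int i = 0; i < threads; i++)
        {
            Thread t = new Thread(Worker);
            t.IsBackground = true;
            t.Start();
        }
    }

    // Any thread can queue outgoing data; no thread is tied to a particular socket.
    public void Enqueue(Socket socket, byte[] data)
    {
        lock (_lock)
        {
            _queue.Enqueue(new WorkItem { Socket = socket, Data = data });
            Monitor.Pulse(_lock);
        }
    }

    private void Worker()
    {
        while (true)
        {
            WorkItem item;
            lock (_lock)
            {
                while (_queue.Count == 0)
                    Monitor.Wait(_lock);
                item = _queue.Dequeue();
            }

            try { item.Socket.Send(item.Data); }
            catch (SocketException) { /* drop or clean up the dead connection here */ }
        }
    }
}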
I would use the AcceptAsync/ConnectAsync/ReceiveAsync/SendAsync methods that were added in .Net 3.5. I have done a benchmark and they are approximately 35% faster (response time and bitrate) with 100 users constantly sending and receiving data.
To people copy-pasting the accepted answer: you can rewrite the acceptCallback method, removing all calls of _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket); and putting it in a finally{} clause, this way:
private void acceptCallback(IAsyncResult result)
{
    xConnection conn = new xConnection();
    try
    {
        //Finish accepting the connection
        System.Net.Sockets.Socket s = (System.Net.Sockets.Socket)result.AsyncState;
        conn = new xConnection();
        conn.socket = s.EndAccept(result);
        conn.buffer = new byte[_bufferSize];
        lock (_sockets)
        {
            _sockets.Add(conn);
        }
        //Queue recieving of data from the connection
        conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
    }
    catch (SocketException e)
    {
        if (conn.socket != null)
        {
            conn.socket.Close();
            lock (_sockets)
            {
                _sockets.Remove(conn);
            }
        }
    }
    catch (Exception e)
    {
        if (conn.socket != null)
        {
            conn.socket.Close();
            lock (_sockets)
            {
                _sockets.Remove(conn);
            }
        }
    }
    finally
    {
        //Queue the next accept, think this should be here, stop attacks based on killing the waiting listeners
        _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
    }
}
You could even remove the first catch, since its content is the same, but this is a template method: you should use typed exceptions to better handle the exceptions and understand what caused the error, so just implement those catches with some useful code.
You could try using a framework called ACE (Adaptive Communications Environment) which is a generic C++ framework for network servers. It's a very solid, mature product and is designed to support high-reliability, high-volume applications up to telco-grade.
The framework deals with quite a wide range of concurrency models and probably has one suitable for your application out of the box. This should make the system easier to debug as most of the nasty concurrency issues have already been sorted out. The trade-off here is that the framework is written in C++ and is not the most warm and fluffy of code bases. On the other hand, you get tested, industrial grade network infrastructure and a highly scalable architecture out of the box.
I would recommend to read these books on ACE
- C++ Network Programming: Mastering Complexity Using ACE and Patterns
- C++ Network Programming: Systematic Reuse with ACE and Frameworks
to get ideas about patterns allowing you to create an efficient server.
Although ACE is implemented in C++ the books cover a lot of useful patterns that can be used in any programming language.
To be clear, i'm looking for .net based solutions (C# if possible, but any .net language will work)
You are not going to get the highest level of scalability if you go purely with .NET. GC pauses can hamper the latency.
I'm going to need to start at least one thread for the service. I am considering using the Asynch API (BeginRecieve, etc..) since I don't know how many clients I will have connected at any given time (possibly hundreds). I definitely do not want to start a thread for each connection.
Overlapped IO is generally considered to be Windows' fastest API for network communication. I don't know if this is the same as your Asynch API. Do not use select, as each call needs to check every socket that is open instead of having callbacks on active sockets.
You can use the Push Framework, an open-source framework for high-performance server development. It is built on IOCP and is suitable for push scenarios and message broadcasting.