C#中的高性能TCP服务器

我是一个经验丰富的C#开发人员,但到目前为止我还没有开发TCP服务器应用程序。 现在我必须开发一个高度可扩展和高性能的服务器,可以处理至less5-10个并发连接:从GPS设备通过GPRS获取字节数据。

一个共同的沟通过程应该是这样的:

  • GPS设备启动到我的服务器的连接
  • 我的服务器应答,如果我想获得数据
  • 设备发送GPS数据
  • 我的服务器向设备发送报告(比如校验和)
  • 从GPS获取新的数据,报告和这一再发生
  • 稍后GPS DEVICEclosures连接

所以,在我的服务器,我需要

  • 跟踪连接/活动客户端
  • 从服务器端closures任何客户端
  • 捕捉事件,当一个设备closures连接
  • 获取字节数据
  • 发送数据给客户

我开始通过互联网阅读这个话题,但对我来说似乎是一场噩梦。 有很多方法,但我找不出哪一个是最好的。

asynchronous套接字方法似乎对我来说是最好的,但编写这种asynchronous样式的代码是可怕的,不容易debugging。

所以我的问题是:你认为在C#中实现高性能TCP服务器的最好方法是哪一个? 你知道有这么好的开源组件吗? (我尝试了几个,但我找不到一个好的。)

它必须是asynchronous的,这是没有办法的。 高性能和可扩展性不能与单线程每插槽混用。 你可以看看自己在做什么StackExchange ,看看asynchronousRedis等待BookSleeve利用下一个C#版本的CTP特性(所以在边缘,可能会有变化,但它很酷)。 为了获得更大的优势,解决scheme围绕着使用SocketAsyncEventArgs类进行演变,通过消除与“经典”C#asynchronous处理相关的asynchronous处理程序的频繁分配,

SocketAsyncEventArgs类是System.Net.Sockets.Socket类的一组增强的一部分,它提供了可供专门的高性能套接字应用程序使用的备用asynchronous模式。 该课程专为需要高性能的networking服务器应用程序而devise。 应用程序可以专门使用增强的asynchronous模式,也可以只使用目标热点区域(例如,在接收大量数据时)。

长话短说:学习asynchronous或死亡尝试…

顺便说一句,如果你问为什么asynchronous,然后阅读这篇文章链接的三篇文章: 高性能Windows程序 。 最终的答案是:底层的操作系统devise需要它。

正如Remus上面所说的,你必须使用asynchronous来保持高性能。 这是.NET中的Begin … / End …方法。

在套接字引擎下,这些方法使用IO完成端口,这似乎是在Windows操作系统上处理多个套接字的最高性能方式。

正如Jim所说,TcpClient类可以在这里帮助,而且非常易于使用。 下面是使用TcpListener监听传入连接和TcpClient处理它们的示例,初始BeginAccept和BeginRead调用是asynchronous的。

这个例子假设一个基于消息的协议被使用在套接字上,除了每个传输的前4个字节是长度以外,它是可以省略的,但是这样你可以使用一个同步的Read来获得剩下的数据那已经被缓冲了。

这里是代码:

class ClientContext { public TcpClient Client; public Stream Stream; public byte[] Buffer = new byte[4]; public MemoryStream Message = new MemoryStream(); } class Program { static void OnMessageReceived(ClientContext context) { // process the message here } static void OnClientRead(IAsyncResult ar) { ClientContext context = ar.AsyncState as ClientContext; if (context == null) return; try { int read = context.Stream.EndRead(ar); context.Message.Write(context.Buffer, 0, read); int length = BitConverter.ToInt32(context.Buffer, 0); byte[] buffer = new byte[1024]; while (length > 0) { read = context.Stream.Read(buffer, 0, Math.Min(buffer.Length, length)); context.Message.Write(buffer, 0, read); length -= read; } OnMessageReceived(context); } catch (System.Exception) { context.Client.Close(); context.Stream.Dispose(); context.Message.Dispose(); context = null; } finally { if (context != null) context.Stream.BeginRead(context.Buffer, 0, context.Buffer.Length, OnClientRead, context); } } static void OnClientAccepted(IAsyncResult ar) { TcpListener listener = ar.AsyncState as TcpListener; if (listener == null) return; try { ClientContext context = new ClientContext(); context.Client = listener.EndAcceptTcpClient(ar); context.Stream = context.Client.GetStream(); context.Stream.BeginRead(context.Buffer, 0, context.Buffer.Length, OnClientRead, context); } finally { listener.BeginAcceptTcpClient(OnClientAccepted, listener); } } static void Main(string[] args) { TcpListener listener = new TcpListener(new IPEndPoint(IPAddress.Any, 20000)); listener.Start(); listener.BeginAcceptTcpClient(OnClientAccepted, listener); Console.Write("Press enter to exit..."); Console.ReadLine(); listener.Stop(); } } 

它演示了如何处理asynchronous调用,但是需要添加error handling来确保TcpListener始终接受新的连接,并在客户端意外断开连接时处理更多的错误。 而且,似乎也有一些情况,并不是所有的数据都是一次性的到达,这也需要处理。

你可以用TcpClient类来做到这一点,虽然说实话我不知道你是否可以有一万个开放的套接字。 这是相当多的。 但我经常使用TcpClient来处理几十个并发套接字。 而asynchronous模型实际上是非常好的使用。

你最大的问题是不会让TcpClient工作。 随着10万并发连接,我认为带宽和可扩展性将成为问题。 我甚至不知道一台机器是否可以处理所有的stream量。 我想这取决于数据包的大小以及它们进入的频率。但是,在您承诺在一台计算机上实现这一切之前,您最好做一些后台估计。

我想你也在寻找UDP技术。 对于10k客户端,速度很快,但问题是您必须对收到该消息的每封邮件实施确认。 在UDP中,您不需要为每个客户端打开一个套接字,但需要在x秒后执行心跳/ ping机制来检查哪个客户端连接与否。

你可以使用我的TCP CSharpServer,这是非常简单的实现,只是在你的一个类上实现IClientRequest。

 using System; using System.Collections.Generic; using System.Linq; namespace cSharpServer { public interface IClientRequest { /// <summary> /// this needs to be set, otherwise the server will not beable to handle the request. /// </summary> byte IdType { get; set; } // This is used for Execution. /// <summary> /// handle the process by the client. /// </summary> /// <param name="data"></param> /// <param name="client"></param> /// <returns></returns> byte[] Process(BinaryBuffer data, Client client); } } 

BinaryBuffer允许你读取发送到服务器的数据真的很容易。

 using System; using System.Collections.Generic; using System.Linq; using System.Runtime.CompilerServices; using System.Text; using System.Threading.Tasks; namespace cSharpServer { public class BinaryBuffer { private const string Str0001 = "You are at the End of File!"; private const string Str0002 = "You are Not Reading from the Buffer!"; private const string Str0003 = "You are Currenlty Writing to the Buffer!"; private const string Str0004 = "You are Currenlty Reading from the Buffer!"; private const string Str0005 = "You are Not Writing to the Buffer!"; private const string Str0006 = "You are trying to Reverse Seek, Unable to add a Negative value!"; private bool _inRead; private bool _inWrite; private List<byte> _newBytes; private int _pointer; public byte[] ByteBuffer; [MethodImpl(MethodImplOptions.AggressiveInlining)] public override string ToString() { return Helper.DefaultEncoding.GetString(ByteBuffer, 0, ByteBuffer.Length); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public BinaryBuffer(string data) : this(Helper.DefaultEncoding.GetBytes(data)) { } [MethodImpl(MethodImplOptions.AggressiveInlining)] public BinaryBuffer() { } [MethodImpl(MethodImplOptions.AggressiveInlining)] public BinaryBuffer(byte[] data) : this(ref data) { } [MethodImpl(MethodImplOptions.AggressiveInlining)] public BinaryBuffer(ref byte[] data) { ByteBuffer = data; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void IncrementPointer(int add) { if (add < 0) { throw new Exception(Str0006); } _pointer += add; if (EofBuffer()) { throw new Exception(Str0001); } } [MethodImpl(MethodImplOptions.AggressiveInlining)] public int GetPointer() { return _pointer; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public static string GetString(ref byte[] buffer) { return Helper.DefaultEncoding.GetString(buffer, 0, buffer.Length); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public static string GetString(byte[] buffer) { return GetString(ref buffer); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void BeginWrite() { if (_inRead) { throw new Exception(Str0004); } _inWrite = true; _newBytes = new List<byte>(); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Write(float value) { if (!_inWrite) { throw new Exception(Str0005); } _newBytes.AddRange(BitConverter.GetBytes(value)); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Write(byte value) { if (!_inWrite) { throw new Exception(Str0005); } _newBytes.Add(value); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Write(int value) { if (!_inWrite) { throw new Exception(Str0005); } _newBytes.AddRange(BitConverter.GetBytes(value)); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Write(long value) { if (!_inWrite) { throw new Exception(Str0005); } byte[] byteArray = new byte[8]; unsafe { fixed (byte* bytePointer = byteArray) { *((long*)bytePointer) = value; } } _newBytes.AddRange(byteArray); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public int UncommitedLength() { return _newBytes == null ? 0 : _newBytes.Count; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void WriteField(string value) { Write(value.Length); Write(value); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Write(string value) { if (!_inWrite) { throw new Exception(Str0005); } byte[] byteArray = Helper.DefaultEncoding.GetBytes(value); _newBytes.AddRange(byteArray); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Write(decimal value) { if (!_inWrite) { throw new Exception(Str0005); } int[] intArray = decimal.GetBits(value); Write(intArray[0]); Write(intArray[1]); Write(intArray[2]); Write(intArray[3]); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void SetInt(int value, int pos) { byte[] byteInt = BitConverter.GetBytes(value); for (int i = 0; i < byteInt.Length; i++) { _newBytes[pos + i] = byteInt[i]; } } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void SetLong(long value, int pos) { byte[] byteInt = BitConverter.GetBytes(value); for (int i = 0; i < byteInt.Length; i++) { _newBytes[pos + i] = byteInt[i]; } } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Write(byte[] value) { Write(ref value); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Write(ref byte[] value) { if (!_inWrite) { throw new Exception(Str0005); } _newBytes.AddRange(value); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void EndWrite() { if (ByteBuffer != null) { _newBytes.InsertRange(0, ByteBuffer); } ByteBuffer = _newBytes.ToArray(); _newBytes = null; _inWrite = false; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void EndRead() { _inRead = false; _pointer = 0; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void BeginRead() { if (_inWrite) { throw new Exception(Str0003); } _inRead = true; _pointer = 0; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public byte ReadByte() { if (!_inRead) { throw new Exception(Str0002); } if (EofBuffer()) { throw new Exception(Str0001); } return ByteBuffer[_pointer++]; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public int ReadInt() { if (!_inRead) { throw new Exception(Str0002); } if (EofBuffer(4)) { throw new Exception(Str0001); } int startPointer = _pointer; _pointer += 4; return BitConverter.ToInt32(ByteBuffer, startPointer); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public float[] ReadFloatArray() { float[] dataFloats = new float[ReadInt()]; for (int i = 0; i < dataFloats.Length; i++) { dataFloats[i] = ReadFloat(); } return dataFloats; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public float ReadFloat() { if (!_inRead) { throw new Exception(Str0002); } if (EofBuffer(sizeof(float))) { throw new Exception(Str0001); } int startPointer = _pointer; _pointer += sizeof(float); return BitConverter.ToSingle(ByteBuffer, startPointer); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public decimal ReadDecimal() { if (!_inRead) { throw new Exception(Str0002); } if (EofBuffer(16)) { throw new Exception(Str0001); } return new decimal(new[] { ReadInt(), ReadInt(), ReadInt(), ReadInt() }); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public long ReadLong() { if (!_inRead) { throw new Exception(Str0002); } if (EofBuffer(8)) { throw new Exception(Str0001); } int startPointer = _pointer; _pointer += 8; return BitConverter.ToInt64(ByteBuffer, startPointer); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public string ReadString(int size) { return Helper.DefaultEncoding.GetString(ReadByteArray(size), 0, size); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public byte[] ReadByteArray(int size) { if (!_inRead) { throw new Exception(Str0002); } if (EofBuffer(size)) { throw new Exception(Str0001); } byte[] newBuffer = new byte[size]; Array.Copy(ByteBuffer, _pointer, newBuffer, 0, size); _pointer += size; return newBuffer; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool EofBuffer(int over = 1) { return ByteBuffer == null || ((_pointer + over) > ByteBuffer.Length); } } } 

完整的项目在GitHub CSharpServer上

Interesting Posts