C# Socket BeginReceive方法中参数byte[] buffer 的理解

TcpClient tcpClient;  

byte[] byteBuffer= new byte[1024*4];

tcpClient.Client.BeginReceive(byteBuffer, 0, byteBuffer.Length, SocketFlags.None, ReceiveCallBack, null);

    // 参数:
        //   buffer:
        //     类型的数组 System.Byte ,它是接收到的数据的存储位置。

如上代码片段所示:当发送端以10ms的间隔频率密集发送数据时,接收方利用Thread.Sleep(10000) 刻意等待10s 后观察测试结果:

发送方发了382次,此时接收方数据先写入socket系统缓冲区累加了,但我们程序读取延时,所以只用了6次就全部读取完了。

如果把上面伪代码的 byteBuffer 设置为发送数据长度(测试发送的数据长度为48:)的一半:即 byte[] byteBuffer=new byte[24];

再次测试发现接收次数为发送次数的两倍,这个好理解:接收容量小了一半,次数就要增加一倍,此时和是否有Sleep无关。Sleep等待只是使得接收数据变慢了。

由上可知:这个buffer参数 太小,则接收太慢,如果参数过大,则可以最大化利用网络I/O性能,但相应速度可能下降,通常要保持一个合理的值,如1024*2 或1024*4 等;

附加一个Socket异步接收模型:参考了这个Blog 的封装,但实际测试有些bug,做了些改进:

比如连接时,不小心触发了多次连接就可能报异常:

System.InvalidOperationException:“同一个套接字上正在进行另一个异步操作时,不能调用 BeginConnect

另外测试发现一个比较严重的bug,如果客户端Socket异步发送频繁:比如:

  for(int i=0;i<(int)this.numericUpDown1.Value;i++)
            {
                PipeReader l100 = new PipeReader { PipeCode ="#"+ (i + 1).ToString("0000"), ProductType= ProductType.S45, IsNormalCode=true };
                client.SendAsync(l100.ToBytes());
            }        

此Socket异步模型会导致数据漏收,比如发了50个,实际只收到了2,3个,问题发生的原因在线程同步锁ManualResetEvent的使用导致,去掉这个锁的逻辑,可以接受完整;或者在客户端发送消息时加上延时,则也可接收完整的回调;

//友情提示:下面的封装异步Tcp客户端,可以拿来做测试用,如果实际项目利用需要谨慎使用
public class AsynSocketClient
    {
        #region private
        /// 
        /// 用于控制异步接收消息
        /// 
        // private ManualResetEvent doReceive = new ManualResetEvent(false);
        private TcpClient tcpClient;

        //标识客户端是否接收数据
        private bool isStopWork = false;
        #endregion

        /// 
        /// 是否释放对象了
        /// 
        public bool IsClosed { get => isStopWork; set => isStopWork = value; }

        #region 事件
        /// 
        /// 客户端连接完成、发送完成、连接异常或者服务端关闭触发的事件
        /// 
        public event Action Completed;
        
        /// 
        /// 客户端接收消息触发的事件
        /// 
        public event EventHandler Received;

        #endregion
        
        public AsynSocketClient()
        {
            tcpClient = new TcpClient();    
          
        }

        #region 连接
        /// 
        /// 异步连接
        /// 
        /// 要连接的服务器的ip地址
        /// 要连接的服务器的端口
        public void ConnectAsync(string ip, int port)
        {
            IPAddress ipAddress = null;
            try
            {
                ipAddress = IPAddress.Parse(ip);
            }
            catch (Exception)
            {
                throw new Exception("ip地址格式不正确,请使用正确的ip地址!");
            }
            try
            {
                if (!tcpClient.Connected)
                {
                    tcpClient.BeginConnect(ipAddress, port, ConnectCallBack, tcpClient);
                }
                else if (isStopWork)
                {
                    isStopWork = false;
                    OnComplete(tcpClient, SocketAction.Connect);
                }
            }
            catch
            {
            }            
        }

        /// 
        /// 异步连接的回调函数
        /// 
        /// 
        private void ConnectCallBack(IAsyncResult ar)
        {
            try
            {
                TcpClient client = ar.AsyncState as TcpClient;
                client.EndConnect(ar);
                OnComplete(client, SocketAction.Connect);
            }
            catch
            { }           
        }
        #endregion

        #region 收发数据
        /// 
        /// 异步接收消息
        /// 
        private void ReceiveAsync()
        {
           // doReceive.Reset();
            StateObject obj = new StateObject();
            obj.TcpClient = tcpClient;

            tcpClient.Client.BeginReceive(obj.ListData, 0, obj.ListData.Length, SocketFlags.None, ReceiveCallBack, obj);
           // doReceive.WaitOne();
        }
        /// 
        /// 异步发送消息
        /// 
        /// 
        public void SendAsync(string msg)
        {
            byte[] listData = Encoding.UTF8.GetBytes(msg);
            SendAsync(listData);
        }

        public void SendAsync(byte[] bytes)
        {
            try
            {
                if (tcpClient.Client == null)
                {
                    throw new Exception("连接已经断开");
                }
                if (isStopWork)
                {
                    return;
                }
                tcpClient.Client.BeginSend(bytes, 0, bytes.Length, SocketFlags.None, SendCallBack, tcpClient);
            }
            catch (SocketException ex)
            {
                if (ex.ErrorCode == (int)SocketError.ConnectionAborted)
                {
                    throw new Exception("连接已经断开");
                }
            }
          
        }


        private void ReceiveCallBack(IAsyncResult ar)
        {
            StateObject state = ar.AsyncState as StateObject;
            int count = -1;
            try
            {
                if (isStopWork)
                {
                    return;
                }
                count = state.TcpClient.Client.EndReceive(ar);
                //doReceive.Set();
            }
            catch (Exception ex)
            {
                //如果发生异常,说明客户端失去连接,触发关闭事件
                Stop();
                OnComplete(state.TcpClient, SocketAction.Close);
            }
            if (count > 0)
            {
                Received?.Invoke(this, new DataEventArgs() { Data = state.ListData.Take(count).ToArray() });
            }
        }

        private void SendCallBack(IAsyncResult ar)
        {
            TcpClient client = ar.AsyncState as TcpClient;
            try
            {
                client.Client.EndSend(ar);
                OnComplete(client, SocketAction.SendMsg);
            }
            catch (Exception)
            {
                //如果发生异常,说明客户端失去连接,触发关闭事件
                Stop();
                OnComplete(client, SocketAction.Close);
            }
        }
        #endregion

        public virtual void OnComplete(TcpClient client, SocketAction action)
        {
            if (Completed != null)
                Completed(client, action);
            if (action == SocketAction.Connect)
            {
                // 接收数据
                ThreadPool.QueueUserWorkItem(x =>
                {
                    while (!isStopWork)
                    {
                        try
                        {
                            ReceiveAsync();
                            Thread.Sleep(20);
                        }
                        catch (Exception)
                        {
                            Stop();
                            OnComplete(client, SocketAction.Close);
                        }
                    }
                });
            }
            else if (action == SocketAction.Close)
            {
                try
                {
                    Thread.Sleep(50);
                    this.Received = null;
                    tcpClient.Close();
                }
                catch
                {
                }
            }
        }

        void Stop()
        {
            isStopWork = true;
        }

        public void Close()
        {
            if (tcpClient != null)
            {
                try
                {
                    Stop();
                    IsClosed = true;
                    Completed = null;
                    Received = null;
                    tcpClient.GetStream().Close();
                 
                }
                catch(Exception ex)
                {

                }
            }
        }

    }

    /// 
    /// 接收socket的行为
    /// 
    public enum SocketAction
    {
        /// 
        /// socket发生连接
        /// 
        Connect = 1,
        /// 
        /// socket发送数据
        /// 
        SendMsg = 2,
        /// 
        /// socket关闭
        /// 
        Close = 4
    }
    
    public class StateObject
    {
        public TcpClient TcpClient { get; set; }
        private byte[] listData = new byte[1024*4];
        /// 
        /// 接收的数据
        /// 
        public byte[] ListData
        {
            get
            {
                return listData;
            }
            set
            {
                listData = value;
            }
        }
    }

    public class DataEventArgs : EventArgs
    {
        public byte[] Data
        {
            get;
            set;
        }

        public int Offset
        {
            get;
            set;
        }

        public int Length
        {
            get;
            set;
        }
    }

 

你可能感兴趣的