《Unity3D網絡遊戲實踐》(第2版)要點摘錄 – 「異步和多路復用」

書名:《Unity3D網絡遊戲實踐》(第2版)

作者:羅培羽

所讀版本:機械工業出版社

異步客戶端

  • 連接

    • IAsyncResult BeginConnect(string host, int port, AsyncCallback
                               requestCallback, object state)
      void EndConnect(IAsyncResult asyncResult)
    • AsyncCallback requestCallback:回調函數,函數返回值必須為空,接受一個IAsyncResult類型的參數

      • object state:該對象會回傳給回調函數

      • 在BeginConnect的回調函數中調用EndConnect可以完成連接

    • public void Connect()
      {
         socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
         socket.BeginConnect("127.0.0.1", 8888, ConnectCallback, socket);
      }

      public void ConnectCallback(IAsyncResult result)
      {
         try
        {
             Socket socket = (Socket)result.AsyncState;
             socket.EndConnect(result);
        }
         catch (SocketException ex)
        {
             Debug.Log("Socket Connect Failed: " + ex.ToString());
        }
      }
  • 接收

    • IAsyncResult BeginReceive(byte[] buffer, int offset, int size,          SocketFlags socketFlags, AsyncCallback callback, object state)
      int EndReceive(IAsyncResult asyncResult)
      • byte[] readBuff:接收緩沖區

      • int offset, int size:從第offset位開始接收數據,最多接收size個字節的數據

    • public void ConnectCallback(IAsyncResult result)
      {
         try
        {
             //...
             //確認連接後開始接收數據
             socket.BeginReceive(readBuff, 0, 1024, 0, ReceiveCallback, socket);
        }
         catch(SocketException ex)
        {
             // ...
        }
      }

      public void ReceiveCallback(IAsyncResult result)
      {
         try
        {
             Socket socket = (Socket)result.AsyncState;
             int count = socket.EndReceive(result);
             recvStr = System.Text.Encoding.UTF8.GetString(readBuff, 0, count);
             Debug.Log("Receive Success");
             //結束接受一次後,再開始準備接收下一串的數據
             socket.BeginReceive(readBuff, 0, 1024, 0, ReceiveCallback, socket);
        }
         catch (SocketException ex)
        {
             Debug.Log("Socket Receive Failed: " + ex.ToString());
        }
      }
  • 發送

    • Send方法是一個阻塞方法

      • 由於TCP是可靠連接,當接收方沒有收到數據時,發送方會重新發送數據,直到確認接收方接收到數據為止

      • 在OS內部,每個Socket都有一個發送緩沖區,用來保存那些接收方未確認的數據

      • 調用Send時,程序將要發送的字節寫入緩沖區,再由OS完成數據的發送和確認

      • 緩沖區長度為8KB,滿了之後Send方法就會阻塞

      • Send只是把數據寫入發送緩沖區,由OS負責重傳,確認

      • Send方法返回只代表成功將數據寫入緩沖區

    • 異步Send:當數據成功寫入緩沖區時會調用回調函數

      • IAsyncResult BeginSend(byte[] buffer, int offset, int size, SocketFlags socketFlags, AsyncCallback callback, object state)
        int EndSend(IAsyncResult asyncResult)
      • public void Send()
        {
           //Send Req 異步
           string sendStr = reqField.text;
           byte[] sendBytes = System.Text.Encoding.UTF8.GetBytes(sendStr);
           socket.BeginSend(sendBytes, 0, sendBytes.Length, 0, SendCallback, socket);
        }

        public void SendCallback(IAsyncResult result)
        {
           try
          {
               Socket socket = (Socket)result.AsyncState;
               int count = socket.EndSend(result);
               Debug.Log("Socket Send Success: " + count);
          }
           catch(SocketException ex)
          {
               Debug.Log("Socket Send Failed: " + ex.ToString());
          }
        }

異步服務端

  • 接收連接

    • IAsyncResult BeginAccept(AsyncCallback callback, object state)
      Socket EndAccept(IAsyncResult asyncResult)
    • static void Main(string[] args)
      {
         //Create Socket
         listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

         //Bind
         IPAddress ipAdr = IPAddress.Parse("192.168.1.5");
         IPEndPoint ipEp = new IPEndPoint(ipAdr, 8888); //綁定IP和端口
         listenSocket.Bind(ipEp);

         //Listen
         listenSocket.Listen(0);
         Console.WriteLine("Server Activated");

         //Async Accept
         listenSocket.BeginAccept(AcceptCallback, listenSocket);
         Console.ReadLine();
      }

      public static void AcceptCallback(IAsyncResult result)
      {
         try
        {
             Console.WriteLine("Server Accepted");
             Socket listenfd = (Socket)result.AsyncState;

             //為連接分配一個ClientState
             Socket clientfd = listenfd.EndAccept(result); //返回連接到服務端的客戶端Socket
             ClientState state = new ClientState();
             state.socket = clientfd;
             clients.Add(clientfd, state);

             //異步接收客戶端數據
             clientfd.BeginReceive(state.readBuff, 0, 1024, 0, ReceiveCallback, state);

             //重新開始接收連接
             listenfd.BeginAccept(AcceptCallback, listenfd);
        }
         catch(SocketException ex)
        {
             Console.WriteLine("Socket Accept Failed: " + ex.ToString());
        }
      }
  • 接收數據

    • 使用與客戶端相同的BeginReceive/EndReceive

    • public static void AcceptCallback(IAsyncResult result)
      {
         try
        {
             //...
             //異步接收客戶端數據
             clientfd.BeginReceive(state.readBuff, 0, 1024, 0, ReceiveCallback, state);
             //...
        }
         //...
      }

      public static void ReceiveCallback(IAsyncResult result)
      {
         try
        {
             ClientState state = (ClientState)result.AsyncState;
             Socket clientfd = state.socket;
             int count = clientfd.EndReceive(result); //數據接收完畢,返回值<=0代表Socket連接斷開

             if(count == 0) //客戶端關閉
            {
                 clientfd.Close();
                 clients.Remove(clientfd);
                 Console.WriteLine("Socket Closed");
                 return;
            }

             //處理數據
             string recvStr = System.Text.Encoding.UTF8.GetString(state.readBuff, 0, count);
             byte[] sendBytes = System.Text.Encoding.UTF8.GetBytes(recvStr);
             clientfd.Send(sendBytes);

             //重新開始接收數據
             clientfd.BeginReceive(state.readBuff, 0, 1024, 0, ReceiveCallback, state);
        }
         catch(SocketException ex)
        {
             Console.WriteLine("Socket Receive Failed: " + ex.ToString());
        }
      }

狀態檢測(Poll)

  • Poll

    • public bool Pool(int microSeconds, SelectMode mode)
      • microSeconds:等待回應的時間(微秒),如果參數為-1,表示無限期等待響應(阻塞);如果參數為0,表示不阻塞

      • mode:

        • SelectMode.SelectRead:如果Socket可讀(緩沖區有數據可以讀取),返回true

        • SelectMode.SelectWrite:如果Socket可寫(緩沖區有數據可以發送),返回true

        • SelectMode.SelectError:連接失敗返回true

    • 原理:

      • 通過SelectMode檢查Socket的狀態是否可讀/可寫/連接失敗

      • 在指定時段內阻止執行

      • 目的:解決同步程序的阻塞問題,在阻塞方法前做判斷,有數據讀時才Receive,有數據寫時才Send

  • 客戶端Poll

    • private void Update()
      {
         if (socket == null) { return; }

         //有可讀數據
         if(socket.Poll(0, SelectMode.SelectRead))
        {
             byte[] readBuff = new byte[1024];
             int count = socket.Receive(readBuff);
             string recvStr = System.Text.Encoding.UTF8.GetString(readBuff, 0, count);
             responseText.text = recvStr;
        }
      }
  • 服務端Poll

    • static void Main(string[] args)
      {
         //Create Socket ...
         //Bind ...
         //Listen ...

         while (true)
        {
             //檢查有沒有客戶端連接
             if(listenSocket.Poll(0, SelectMode.SelectRead))
            {
                 ReadListenfd(listenSocket);
            }

             foreach (ClientState state in clients.Values)
            {
                 Socket clientfd = state.socket;
                 //檢查客戶端Socket有沒有可讀的信息
                 if(clientfd.Poll(0, SelectMode.SelectRead))
                {
                     if (!ReadClientfd(clientfd))
                    {
                         break;
                    }
                }
            }
             System.Threading.Thread.Sleep(1);
        }
      }

      public static void ReadListenfd(Socket listenSocket)
      {
         Console.WriteLine("Accept");
         Socket clientfd = listenSocket.Accept();
         ClientState cs = new ClientState();
         cs.socket = clientfd;
         clients.Add(clientfd, cs);
      }

      public static bool ReadClientfd(Socket clientSocket)
      {
         ClientState cs = clients[clientSocket];

         //接收數據
         int count = 0;
         try
        {
             count = clientSocket.Receive(cs.readBuff);
        }
         catch(SocketException ex)
        {
             clientSocket.Close();
             clients.Remove(clientSocket);
             Console.WriteLine("Receive Socket Exception: " + ex.ToString());
             return false;
        }

         //客戶端關閉
         if(count == 0)
        {
             clientSocket.Close();
             clients.Remove(clientSocket);
             Console.WriteLine("Socket Closed");
             return false;
        }

         //廣播信息至所有客戶端
         string recvStr = System.Text.Encoding.UTF8.GetString(cs.readBuff, 0, count);
         Console.WriteLine("Receive: " + recvStr);
         string sendStr = clientSocket.RemoteEndPoint.ToString() + ":" + recvStr;
         byte[] sendBytes = System.Text.Encoding.UTF8.GetBytes(sendStr);
         foreach (ClientState state in clients.Values)
        {
             state.socket.Send(sendBytes);
        }

         return true;
      }
  • 弊端

    • 如果沒有收到數據,Poll客戶端/服務端的循環也會一直在做檢測,造成浪費

多路複用(Select)

  • 多路複用:同時處理多路信號(如多個Socket的狀態)

    • 設置要監聽的Socket列表,如果有Socket可讀/可寫/報錯,就返回這些Socket列表,如果沒有就阻塞程序,防止CPU資源的消耗

  • public static void Select(IList checkRead, IList checkWrite, IList, checkError, int microSeconds)
    • 調用Select後,Select會根據是checkRead(檢查可讀性)還是checkWrite(檢查可寫性)或者checkError(檢查錯誤條件),對每個列表進行檢查和修改

    • 當有某個/多個Socket滿足條件時,會修改這些列表

  • Select服務端

    • //Select模式
      List<Socket> checkRead = new List<Socket>();
      while (true)
      {
         checkRead.Clear();
         checkRead.Add(listenSocket);
         foreach (ClientState clientState in clients.Values)
        {
             checkRead.Add(clientState.socket);
        }
         Socket.Select(checkRead, null, null, 1000);
         foreach (Socket socket in checkRead)
        {
             if(socket == listenSocket)
            {
                 ReadListenfd(socket);
            }
             else
            {
                 ReadClientfd(socket);
            }
        }
      }
  • Select客戶端

    • private void Update()
      {
         if (socket == null) { return; }
         //Select
         checkRead.Clear();
         checkRead.Add(socket);
         Socket.Select(checkRead, null, null, 0);
         foreach (Socket s in checkRead)
        {
             byte[] readBuff = new byte[1024];
             int count = socket.Receive(readBuff);
             string recvStr =
                 System.Text.Encoding.UTF8.GetString(readBuff, 0, count);
             responseText.text = recvStr;
        }
      }