《Unity3D網絡遊戲實踐》(第2版)要點摘錄 – 「數據流收發」

書名:《Unity3D網絡遊戲實踐》(第2版)

作者:羅培羽

所讀版本:機械工業出版社

TCP數據流

  • 在之前的章節中說過,在OS內部,每個Socket都有一個發送緩沖區,用來保存那些接收方未確認的數據,而Send方法只是把數據寫入到發送緩沖區裡,具體的發送過程由操作系統負責;當發送緩沖區滿了,Send方法就會阻塞

  • 與Send對應,Socket的Receive也只是把接收緩沖區中的數據提取出來,當系統的接收緩沖區為空,Receive方法就會阻塞

粘包半包現象

  • 當發送端連續發送多條數據,而接收端沒有及時Receive,就會導致「粘包」的出現。.

    • //連續發送兩條消息
      string msg1 = "Msg1";
      byte[] b1 = System.Text.Encoding.UTF8.GetBytes(msg1);

      string msg2 = "Msg2";
      byte[] b2 = System.Text.Encoding.UTF8.GetBytes(msg2);

      socket.Send(b1);
      socket.Send(b2);
    • 預期是,接收端會分別獨立讀取到這兩條數據的內容,並分別輸出Msg1和Msg2

      • 但實際上,連續調用兩次發送,會把數據快速寫入到發送緩沖區,然後快速發送到接收端。數據就會在接收端的接收緩沖區中累積

      • 而此時接收端在調用Receive從接收緩沖區中提取數據,導致最終的輸出是”Msg1Msg2″

  • 解決方案

    • 長度信息法(主要方案)

      • 在每個數據包前加上長度信息,每次接收到數據後,先讀取表示長度的字節,如果緩沖區的數據長度大於要取的字節數,則取出相應的字節,否則等待下一次數據接收

        • public void Send()
          {
             //長度信息法
             string sendStr = reqField.text;
             byte[] bodyByte = System.Text.Encoding.UTF8.GetBytes(sendStr);
             Int16 len = (Int16)bodyByte.Length;
             byte[] lenByte = BitConverter.GetBytes(len);
             byte[] sendByte = lenByte.Concat(bodyByte).ToArray();
             socket.Send(sendByte);
          }
        • sendStr = “HelloWorld”

        • byteBytes =

          • CharacterHelloWorld
        • len = 10(16字節長度)

        • lenByte =

          • Character0A
        • sendByte =

          • Character0AHelloWorld
      • 接收方需要定義一個緩沖區指示緩沖區有效數據長度的變量

        • byte[] readBuff = new byte[1024];
          int buffCount = 0;
        • 接收數據時不再從緩沖區開頭的位置寫入,而是把新數據放在有效數據之後

        • 每次寫入後,根據接收到的長度信息更新當前該變量

        • 該變量指示了Receive讀取數據時的起點

          • socket.BeginReceive(readBuff, buffCount, 1024 - buffCount, 0 ,ReceiveCallback, socket);
        • 收到數據後,buffCount需要更新,使下一次接收數據時,寫入到緩沖區有效數據的末尾

          • public void ReceiveCallback(IAsyncResult ar)
            {
               Socket socket = (Socket) ar.AsyncState;
               int count = socket.EndReceive(ar);
               buffCount += count;
            }
      • 數據處理

        • 緩沖區數據長度 >= 一條完整信息時進行數據時,將數據取出,然後

          • OptionA: 將緩沖區後面的數據向前移動(Array.Copy)

          • OptionB: 使用雙指針,標志緩沖區數據當前的讀取範圍

            • 數據處理完後,將起點索引向後移動[上一個數據長度]位,並根據長度更新終點索引

            • 緩沖區長度不夠時再做一次Array.Copy

            • 有效減少OptionA的複雜度

        • 如果緩沖區數據長度 <= 長度信息數據長度 或 < 一條完整信息都不處理,等待下一次接收

    • 固定長度法

      • 每次都以相同的長度發送數據

    • 結束符號法

      • 規定一個結束符號,作為消息間的分隔符

大端小端問題

  • 上面使用的信息長度計算方法為:BitConverter.ToInt16(buffer, offset)

    • 該方法會根據計算機是「大端編碼」還是「小端編碼」而使用不同的轉換算法

      • 大端

        • return (short)((*pbyte) | (*(pbyte + 1) << 8));
      • 小端

        • return (short)((*pbyte << 8) | (*(pbyte + 1)));
  • 計算機的大端模式和小端模式

  • 大小端兼容

    • public void Send()
      {
         string sendStr = reqField.text;
         byte[] bodyByte = System.Text.Encoding.UTF8.GetBytes(sendStr);        
         Int16 len = (Int16)bodyByte.Length;        
         byte[] lenByte = BitConverter.GetBytes(len);
         if (!BitConverter.IsLittleEndian)
        {
             //大端轉小端
             lenByte.Reverse();
        }
         byte[] sendByte = lenByte.Concat(bodyByte).ToArray();
         socket.Send(sendByte);
      }
    • 手動兼容

      • 16位

        • Int16 bodyLength = (short)((readBuff[1] << 8) | readBuff[0]);
      • 32位

        • Int32 bodyLength = (short)((readBuff[3] << 24) |
                                   readBuff[2] << 16 |
                                   readBuff[1] << 8 |
                                   readBuff[0]);
        • ‘|’ 符相當於「相加」

        • <<24/<<16/<<8 => 字節左移24位/16位/8位

          • readBuff[1]左移8位等同於 「readBuff[1] * 2^8 = readBuff[1] * 256」

數據發送不完整問題

  • 數據在網絡擁擠,且發送緩沖區飽和的情況下,有可能導致發送出去的數據解析失敗

  • 要確保發送數據的完整,需要在發送前將數據保存,如果發送不完整,在Send回調中繼續發送

    • int readIdx = 0; //緩沖區偏移值
      int length = 0; //緩沖區剩餘長度

      public void Send()
      {
         //sendBytes = 要發送的數據
         length = sendBytes.Length;
         readIdx = 0;
         socket.BeginSend(sendBytes, 0, length, 0, SendCallback, socket);
      }

      public void SendCallback(IASyncResult ar)
      {
         Socket socket = (Socket)ar.AsyncState;
         int count = socket.EndSend(ar); //返回已發送數據的長度
         readIdx += count; //將readIdx向後移動至已發送數據長度的末尾
         length -= count;
         if(length > 0) //如果全部數據正常發送,length - count應該 == 0
        {
             //如果>0,代表有數據需要重新發送
        socket.BeginSend(sendBytes, readIdx, length, 0, SendCallback, socket);    
        }
      }

ByteArray和Queue

  • 上述方案雖然可以確保數據在最終能正常被完整發送出去,但是如果在Callback之前再次調用Send,就會把length和readIdx修改,把這個保障機制干碎。

    • 因此我們需要一個封裝好readIdx、length以及具體數據bytes的數據結構,然後維護一個隊列(Queue<ByteArray>);每次調用Send時把這個數據入隊,Callback時只修改它自身的length和readIdx,處理完成後出隊。

  • ByteArray

    • using System;

      public class ByteArray
      {
         //默認大小
         const int DEFAULT_SIZE = 1024;
         //初始大小
         int initSize = 0;

         //緩沖區
         public byte[] bytes;

         //讀寫位置
         public int readIdx = 0;
         public int writeIdx = 0;

         //容量
         private int capicity = 0;
         //剩餘空間
         public int remain { get { return capicity - writeIdx; } }

         //數據長度
         public int length { get { return writeIdx - readIdx; } }

         public ByteArray(int size = DEFAULT_SIZE)
        {
             bytes = new byte[size];
             capicity = size;
             initSize = size;
             readIdx = 0;
             writeIdx = 0;
        }

         public ByteArray(byte[] defaultBytes)
        {
             bytes = defaultBytes;
             capicity = defaultBytes.Length;
             initSize = defaultBytes.Length;
             readIdx = 0;
             writeIdx = defaultBytes.Length;
        }

         //重設尺寸
         public void ReSize(int size)
        {
             if (size < length) { return; }
             if (size < initSize) { return; }
             int n = 1;
             while(n < size) //計算新數組的尺寸, 為避免頻繁重設,一般直接翻倍
            {
                 n *= 2;
            }
             capicity = n;

             //申請新的byte數組並把原數據複製過去
             byte[] newBytes = new byte[capicity];
             Array.Copy(bytes, readIdx, newBytes, 0, writeIdx - readIdx);        
             bytes = newBytes;

             //重設讀寫索引
             writeIdx = length;
             readIdx = 0;
        }

         //檢查並移動數據
         public void CheckAndMoveBytes()
        {
             if(length < 8)
            {
                 MoveBytes();
            }
        }

         //移動數據
         public void MoveBytes()
        {
             if(length > 0)
            {
                 Array.Copy(bytes, readIdx, bytes, 0, length);
            }
             writeIdx = length;
             readIdx = 0;
        }

         //寫入數據
         public int Write(byte[] bs, int offset, int count)
        {
             if(remain < count)
            {
                 //檢查緩沖區剩餘空間
                 ReSize(length + count);
            }
             Array.Copy(bs, offset, bytes, writeIdx, count); //從offset位置寫入
             writeIdx += count;
             return count; //返回新的有效數據長度
        }

         //讀取數據
         public int Read(byte[] bs, int offset, int count)
        {
             count = Math.Min(count, length);
             Array.Copy(bytes, readIdx, bs, offset, count); //把數據從bytes的readIdx讀取,保存在bs的offset到count的空間中
             readIdx += count; //根據已讀取的數據量更新readIdx
             CheckAndMoveBytes(); //如果剩餘的有效數據很小,可以將數據前移
             return count;
        }
      }
  • 具體應用

    • public void Send()
      {
         //byte[] sendByte = 發送的數據字符串
         ByteArray ba = new ByteArray(sendByte);
         int count = 0;
         //BeginSend和回調函數可能於不同線程執行,可能出現多個線程同時操作writeQueue的情況,因此需要加鎖
         lock (writeQueue)
        {
             writeQueue.Enqueue(ba);
             count = writeQueue.Count;
        }

         if(count == 1)
        {
             socket.BeginSend(sendByte, 0, sendByte.Length, 0, SendCallback, socket);
        }
      }

      public void SendCallback(IAsyncResult result)
      {
         Socket socket = (Socket)result.AsyncState;
         int count = socket.EndSend(result);
         ByteArray ba;

         //BeginSend和回調函數可能於不同線程執行,可能出現多個線程同時操作writeQueue的情況,因此需要加鎖
         lock (writeQueue)
        {
             ba = writeQueue.First();
        }

         ba.readIdx += count;
         if(ba.length == 0) //ba.length = ba.writeIdx - ba.readIdx
        {
             //Send Finished
             //BeginSend和回調函數可能於不同線程執行,可能出現多個線程同時操作writeQueue的情況,因此需要加鎖
             lock (writeQueue)
            {
                 writeQueue.Dequeue();
                 ba = writeQueue.First(); //Get Next Send ByteArray
            }                
        }

         if(ba != null)
        {
             //Send Remain ByteArray
             socket.BeginSend(ba.bytes, ba.readIdx, ba.length, 0, SendCallback, socket);
        }
      }