UN2L5——分包、黏包

分包、黏包

分包、黏包指在网络通信中由于各种因素(网络环境、API规则等)造成的消息与消息之间出现的两种状态

  • 分包:一个消息分成了多个消息进行发送
  • 黏包:一个消息和另一个消息黏在了一起

image

注意:分包和黏包可能同时发生

如何解决分包、黏包的问题

​​

处理分包、黏包问题首先要了解什么是分包和黏包,解决该问题的逻辑实现的写法可能有很多种,我们采用最节约性能的方式解决问题就行

现在的处理:
我们收到的消息都是以字节数组的形式在程序中体现,目前我们的处理规则是默认传过来的消息就是正常情况
前4个字节是消息ID,后面的字节数组全部用来反序列化

如果出现分包、黏包会导致我们反序列化报错,因为数据可能不完整

思考:那么通过接收到的字节数组我们应该如何判断收到的字节数组处于以下状态

  1. 正常
  2. 分包
  3. 黏包

突破点:如何判断一个消息没有出现分包或者黏包呢?

答案:就是消息长度
我们可以如同处理 区分消息类型 的逻辑一样,为消息添加头部,头部记录消息的长度
当我们接收到消息时,通过消息长度来判断是否分包、黏包
对消息进行拆分处理、合并处理,我们每次只处理完整的消息

image

实践

  1. 为所有消息添加头部信息,用于存储其消息长度

    重新声明一个BaseMessage​,继承BaseData​,让用于消息传递的类继承该基类,
    规定继承BaseMessage​的类必须要有一个自己的唯一ID消息并可以被外部获取
    同时,继承BaseMessage​的类的Writeing​除了消息体以外,还需要序列化自己的ID和消息体的长度值,
    GetBytesNum​获取的消息长度还需要加上ID长度(int​)和消息体长度值的长度(int​),也就是多加一个8
    这样,所有的用于通信的数据就包括头数据和消息体,头数据固定8字节,存储ID和消息体长度

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class BaseMessage : BaseData
    {
    public override int GetBytesNum()
    {
    throw new System.NotImplementedException();
    }

    public override int Reading(byte[] bytes, int BeginIndex = 0)
    {
    throw new System.NotImplementedException();
    }

    public override byte[] Writeing()
    {
    throw new System.NotImplementedException();
    }

    public virtual int GetID()
    {
    return 0;
    }
    }

    继承BaseMessage​的类示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    using System.Text;

    public class PlayerMessage : BaseMessage
    {
    public int playerID;
    public PlayerData playerData;

    public override byte[] Writeing()
    {
    int index = 0;
    int bytesNum = GetBytesNum();
    byte[] bytes = new byte[bytesNum];
    WriteInt(bytes, GetID(), ref index); //先写消息ID
    WriteInt(bytes, bytesNum - 8, ref index);
    WriteInt(bytes, playerID, ref index); //再写消息的成员变量
    WriteData(bytes, playerData, ref index);
    return bytes;
    }

    public override int Reading(byte[] bytes, int beginIndex = 0)
    {
    //反序列化不需要去解析ID 因为在这一步之前 就应该把ID反序列化出来
    //用来判断到底使用哪一个自定义类来反序化
    int index = beginIndex;
    playerID = ReadInt(bytes, ref index);
    playerData = ReadData<PlayerData>(bytes, ref index);
    return index - beginIndex;
    }

    public override int GetBytesNum()
    {
    return 4 + //消息ID的长度
    4 + //消息体长度数值的长度
    4 + //playerID的字节数组长度
    playerData.GetBytesNum(); //playerData
    }

    //自定义的消息ID 主要是用于区分是哪一个消息类
    public override int GetID()
    {
    return 1001;
    }
    }

    //玩家数据类
    public class PlayerData : BaseData
    {
    public string name;
    public int atk;
    public int lev;

    public override int GetBytesNum()
    {
    return 4 + 4 + 4 + Encoding.UTF8.GetBytes(name).Length;
    }

    public override int Reading(byte[] bytes, int beginIndex = 0)
    {
    int index = beginIndex;
    name = ReadString(bytes, ref index);
    atk = ReadInt(bytes, ref index);
    lev = ReadInt(bytes, ref index);
    return index - beginIndex;
    }

    public override byte[] Writeing()
    {
    int index = 0;
    byte[] bytes = new byte[GetBytesNum()];
    WriteString(bytes, name, ref index);
    WriteInt(bytes, atk, ref index);
    WriteInt(bytes, lev, ref index);
    return bytes;
    }
    }
  2. 根据分包、黏包的表现情况,修改接收消息处的逻辑

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    //服务器端 ClientSocket类内部
    //是否连接
    public bool Connented => socket.Connected;

    public void Receive()
    {
    //如果没有连接就返回
    if (!Connented)
    return;
    try
    {
    //当需要接收消息时
    if (socket.Available > 0)
    {
    byte[] result = new byte[1024 * 5];
    int receiveNum = socket.Receive(result);
    //处理接收到的消息,需要传入接收到的数据长度
    HandleReceiveMsg(result, receiveNum);
    }
    }
    catch (Exception e)
    {
    Console.WriteLine($"从客户端{clientID}接收消息失败:{e.Message}");
    //如果解析错误,也认为要把这个socket断开
    Program.socket.AddDelSocket(this);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    //客户端 NetManager类内部

    private void ReceiveMessage(object obj)
    {
    while (isConnented)
    {
    if (socket.Available > 0)
    {
    byte[] receiveBytes = new byte[1024 * 1024];
    int receiveNum = socket.Receive(receiveBytes);
    //处理接收到的消息,需要传入接收到的数据长度,具体实现在下面写
    HandleReceiveMsg(receiveBytes, receiveNum);
    }
    }
    }

    处理分包、黏包的逻辑较为复杂,分析如下:

    • 首先,需要将接收到的字节数组内的数据全部转存到另一个缓存数组内,再通过读取该缓存数组内的数据处理数据包

    • 假设发来的数据是两个甚至是多个完整的包黏在一起了

      先反序列化缓存数组的前8个字节数据,确认ID和消息体长度,通过ID确定反序列化逻辑,通过消息体长度来确定反序列化多少字节数据,
      序列化一次后,发现缓存数组内还有数据未反序列化,就循环执行前面的步骤,最终所有数据被反序列化,清空缓存数组,并跳出循环

    • 假设发来的数据将一个完整的包分为两个甚至更多个

      若发现缓存数组内数据不足8字节,则无法反序列化出ID和消息体长度,直接跳出处理逻辑循环,
      缓存数组内数据保留,下一次收到消息时,会将数据放在缓存数组中原有数据的后面

      缓存数组内数据大于8字节后,反序列化缓存数组的前8个字节数据,确认ID和消息体长度,
      若发现剩余数据长度小于消息体长度,就跳出处理逻辑循环,
      缓存数组内数据保留,下一次收到消息时,会将数据放在缓存数组中原有数据的后面

      直到缓存数组内的数据大于或等于之前反序列化出来的消息体长度后,就将这部分数据反序列化出来
      如果缓存数组内未反序列化的数据没有了,就清空缓存数组,并跳出循环

    • 假设同时遇到分包黏包,即发来的数据是完整的包黏着被分开的包的一部分,或者被分开的包的后面部分黏在另一个被分开的包的前面部分

      先反序列化缓存数组前8个字节,若 不足8字节 或 不包括前8个字节的数据长度低于反序列化出来的消息体长度
      就等待下一次消息的接收,新接收到的数据放在缓存数组内原有数据的后面

      当 不包括前8个字节的数据长度 大于或等于 反序列化出来的消息体长度,就反序列化出消息体
      如果缓存数组内未反序列化的数据没有了,就清空缓存数组,并跳出循环
      如果缓存数组内还有数据,先将剩余的数据转移到前面,然后按照上面的步骤重新执行

    根据分析,用伪代码阐述代码思路:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    private byte[] 缓存数组 = new byte[1024 * 1024];
    private int 缓存数组内需要反序列化的数据长度 = 0;

    //每次接收到数据就调用
    void HandleReceiveMsg(接收到的数据, 接收到的数据的长度):
    var 消息的ID、消息的长度、当前反序列化了几个字节 = 0;
    将接收到的数据复制到缓存数组内;
    缓存数组内需要反序列化的数据长度 += 接收到的数据的长度;
    while (true) //当反序列化一次后,发现还能再序列化时就会继续循环
    消息的长度 = -1;
    if 缓存数组内需要反序列化的数据长度 - 当前反序列化了几个字节 >= 8: //说明剩余未反序列化数据大于8字节,可以序列化头数据
    消息的ID、消息的长度 = 从缓存数组反序列化8个字节;
    当前反序列化了几个字节 += 8;
    if 缓存数组内剩余数据大于消息的长度 && 消息的长度 != -1: //如果消息的长度等于-1说明连头数据都不能反序列化
    按照 消息的ID 和 消息的长度 从缓存数组内反序列化数据;
    当前反序列化了几个字节 += 消息的长度;
    if 当前反序列化了几个字节 == 缓存数组内需要反序列化的数据长度: //说明缓存数组内数据全部反序列化了
    缓存数组内需要反序列化的数据长度 = 0;
    break; //跳出循环,结束消息处理
    else: //说明无法继续反序列化
    if 消息的长度 == -1: //如果消息的长度不等于-1说明反序列化了头数据
    当前反序列化了几个字节 -= 8; //将反序列化的进度回退回去,头数据下一次继续反序列化
    根据 缓存数组内需要反序列化的数据长度 和 当前反序列化了几个字节 得到剩余未序列化数据长度
    将剩余未序列化数据移到缓存数组最前面
    缓存数组内需要反序列化的数据长度 -= 当前反序列化了几个字节
    break; //剩余数据等到下次接收到数据继续进行数据处理操作

    具体代码实现为:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    //服务器端 ClientSocket类内部或客户端 NetManager类内部,逻辑是一致的
    //用于处理分包时,缓存的字节数组和字节数组长度
    private byte[] cacheBytes = new byte[1024 * 1024];
    private int cacheNum = 0;

    private void HandleReceiveMsg(byte[] receiveBytes, int receiveNum)
    {
    int msgID = 0;
    int msgLength = 0;
    int nowIndex = 0;

    //收到消息后,将收到的消息直接拼接到缓存字节数组的后面
    receiveBytes.CopyTo(cacheBytes, cacheNum);
    cacheNum += receiveNum;
    while (true)
    {
    //每次将长度设置为-1,是避免上一次解析的数据影响这一次的判断
    msgLength = -1;
    //处理解析一条消息,当该消息大于8时,才可能包含完整头信息
    if (cacheNum - nowIndex >= 8)
    {
    msgID = BitConverter.ToInt32(cacheBytes, nowIndex); //解析ID
    nowIndex += 4;
    msgLength = BitConverter.ToInt32(cacheBytes, nowIndex); //解析长度
    nowIndex += 4;
    }

    //解析消息体
    if (cacheNum - nowIndex >= msgLength && msgLength != -1)
    {
    //消息体是完整的情况
    BaseMessage? baseMsg = null;
    switch (msgID)
    {
    case 1001:
    baseMsg = new PlayerMessage();
    baseMsg.Reading(cacheBytes, nowIndex);
    break;
    case 1003:
    //由于该消息都没有消息体,因此我们无需反序列化它
    baseMsg = new QuitMessage();
    break;
    }
    if (baseMsg != null)
    ThreadPool.QueueUserWorkItem(MsgHandle, baseMsg);
    nowIndex += msgLength;
    //解析完消息后正好缓存字节数组内的数据全部解析完毕了,说明数据解析完了,将缓存字节数组索引改为0
    if (nowIndex == cacheNum)
    {
    cacheNum = 0;
    break;
    }
    }
    else
    {
    //消息体不完整的情况,证明有分包,因此需要把当前收到的内容记录下来
    //需要等待下次接收到消息后再做处理
    //如果进行了id和长度的解析,但是没有成功解析消息体,那么我们需要减去nowIndex移动的位置
    if (msgLength != 1)
    nowIndex -= 8;
    //即将剩余没有解析的字节数组内容移到缓存数组前面,用于缓存下次继续解析
    Array.Copy(cacheBytes, nowIndex, cacheBytes, 0, cacheNum - nowIndex);
    cacheNum -= nowIndex;
    break;
    }
    }
    }