在使用TCP协议进行消息发送时,对消息分帧

成帧技术(frame)是解决如何在接收端定位消息的首尾位置的问题。在进行数据收发时,必须指定消息接收者如何确定何时消息已经接收完整。

在TCP协议中,消息是按照字节来传输的,而且TCP协议中是没有消息边界的概念的。因为当client和server双方建立TCP连接后,双方可以自由发送字节数据。

为了能够在消息传输中确定消息的边界,需要引入额外的信息来标示消息边界。常用的办法有两种:

基于定界符与基于显式消息长度

基于定界符

我们在消息的末尾添加一个唯一标记作为消息结束符,这个唯一的标记一般是一个字节或者一组字节序列,并且在消息中不能出现这个标记。

基于定界符的方法一般用于以文本方式编码的消息中,定义一个特殊的字符作为分隔符来表示消息结束。但是这个分隔符也有可能作为普通字符可能会出现在消息中,导致消息解析出现错误。为了让消息中不出现分隔符,需要引入填充(stuff)技术,在发送端对消息进行扫描,如果碰到分隔符,就将这个分隔符用一个替换符和其他符号(比如将原始字符二进制中的第三位取反得到一个新的字节作为)替换,同样的,如果扫描中遇到替换符,将替换符也用一个替换付和其他符号替换。在消息的接收端,同样也对接收到的消息进行扫描,当碰到替换符时,说明该字符不是消息中的,要将后面一个字符进行还原得到相应的原始字符,这个才是消息中真正的字符。当遇到分隔符时,说明该消息已经结束

显式消息长度

在消息前面添加一个固定大小的字段(一个字节或者两个字节长度),用于表示消息包含的字节个数(也就是消息的长度)。在消息发送时,计算消息的长度(字节数),作为消息的前缀。如果使用一个字节保存长度,则消息长度最大为\(2^8=256\)个字节,如果是两个字节保存长度,则消息长度最大为\(2^{16}=65536\)个字节

消息成帧与解析的实现

在java中,当client和server之间建立tcp连接后,就可以通过输入输出流(I/O stream)来进行消息传输。发送消息时,将待发送的消息写入OutputStream流中,然后发送到接收端InputStream流;接收端则从InputStream流中读取出消息。如何实现将消息按帧发送与接收,就需要要利用我们上面提到的方法。

我们先定义一个Framer接口,来声明两个方法,消息成帧frameMsg()和消息抽取nextMsg()

package chapter_3.frame; import java.io.IOException; import java.io.OutputStream; /** * @author fulv * Framer接口声明了两个方法,用于消息成帧和解析将待发送消息封装成帧并输出到指定流 */ public interface Framer { /** * 将输入的消息msg封装成帧,然后输出到out流 * * @param msg 输入的消息 * @param out 消息输出流 */ void frameMsg(byte[] msg, OutputStream out); /** * 从指定流中读取下一个消息帧 * * @return byte[] */ byte[] nextMsg() throws IOException; }

然后分别使用基于分隔符和基于显式消息长度两种方法来实现Framer接口

基于分隔符:

在这里,我们使用字符'\n'作为消息分隔符,它对应的字节为0x0A;使用的替换符为0x7D。替换的策略是:当扫描到待发送的消息byte数组中有0x0A时,将其替换为(0x7D,0x2A),如果遇到0x7D,将其替换为(0x7D,0x5D)。这里面第二个字符通过将待替换字符从左向右数第三位取反获得。

在 接收端,从输入流中读取字节流数据,遇到0x7D时,说明后面一个字节对应的是特殊字节,需要转换得到原始字节。如果遇到0x0A说明到达消息帧末尾,完成了一个消息帧的读取。

package chapter_3.frame; import java.io.*; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; import java.nio.charset.StandardCharsets; /** * 采用界定符的方式来实现消息的封装成帧以及消息帧的解析 * * @author fulv */ public class DelimitFramer implements Framer { /** * 数据输入源,从中解析出消息帧 */ private InputStream in; /** * 消息帧的定界符 */ private static final byte DELIMITER = '\n'; /** * 替换字符,用于将出现在消息内部的'\n'进行替换,避免出现解析错误 */ private static final byte REPLACE_CHAR = (byte) 0x7d; private static final byte MASK = (byte) 0x20; public DelimitFramer(InputStream in) { this.in = in; } @Override public void frameMsg(byte[] msg, OutputStream out) { //向判断传入的消息中是否包含界定符与替换符,如果存在,执行相关字节填充操作 //将对应的界定符和替换符换成两个字符,其中第一个为替换符,第二个为将要替换的字符的从左到右的第二位取反形成的字符 int count = 0; for (byte b : msg) { if (DELIMITER == b || REPLACE_CHAR == b) { count++; } } byte[] extendMsg = new byte[msg.length + count]; for (int i = 0, j = 0; i < msg.length; i++) { if (DELIMITER == msg[i] || REPLACE_CHAR == msg[i]) { extendMsg[j++] = REPLACE_CHAR; extendMsg[j++] = byteStuff(msg[i]); } else { extendMsg[j++] = msg[i]; } } try { out.write(extendMsg); out.write(DELIMITER); out.flush(); } catch (IOException e) { e.printStackTrace(); System.out.println("消息写入流失败"); } } /** * 从消息输入流in中,取出下一个消息帧(以分隔符划分一个消息帧) * * @return */ @Override public byte[] nextMsg() throws IOException { ByteArrayOutputStream msgBuffer = new ByteArrayOutputStream(); int nextByte; while ((nextByte = in.read()) != DELIMITER) { //已经读完了输入流,这里分两种情况 if (-1 == nextByte) { //输入流中的字节已经全部读完 if (msgBuffer.size() == 0) { return null; } else { //读取了部分字节,但却没有遇到分隔符,说明输入的消息帧是不完整或者错误的,返回异常 throw new EOFException("读取到了不正确的消息帧"); } } //当前字符为替换字符,需要读取下一个字符并转换(将第三位取反)得到正确的字符 if (REPLACE_CHAR == nextByte) { nextByte = in.read() & 0xFF; nextByte = byteStuff((byte) nextByte); } msgBuffer.write(nextByte); } return msgBuffer.toByteArray(); } /** * 字节填充函数,将传入字节的从左到右数的第二位取反 * * @param originByte * @return */ private static byte byteStuff(byte originByte) { return (byte) ((originByte | MASK) & ~(originByte & MASK)); } }

基于显式消息长度方法:

使用两个字节无符号整型来表示待发送消息的长度,最长为65536。将消息长度按照字节大端序写入待发送的消息前,表示消息长度。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwyyfg.html