接收端,首先从输入流中读出消息长度,然后堵塞的从输入流中读取数据,直到读取出的数据量达到消息长度,整个消息帧才读取结束。
package chapter_3.frame; import java.io.*; /** * 基于显式长度的方法来将实现消息成帧 * * @author fulv */ public class LengthFramer implements Framer { private static final int MESSAGEMAXLENGTH = 65536; private DataInputStream in; public LengthFramer(DataInputStream in) { this.in = in; } @Override public void frameMsg(byte[] msg, OutputStream out) throws IOException { if (msg.length > MESSAGEMAXLENGTH) { throw new IOException("传入的消息超出最大长度"); } int msgLength = msg.length; //将消息长度按照字节大端序写入输出流中 out.write((msgLength >> 8) & 0xFF); out.write(msgLength & 0xFF); //将消息写入输出流 out.write(msg); out.flush(); } @Override public byte[] nextMsg() throws IOException { int length; byte[] msg = null; try { //从输入流中读取两个字节,作为大端序的整型值解释,表示消息长度 length = in.readUnsignedShort(); } catch (EOFException e) { return null; } //存放从输入流中读取出的消息字节数组 msg = new byte[length]; //readFully多次调用read方法直到读取到指定长度的数组消息或者读取到-1返回 in.readFully(msg); return msg; } }测试
对两种消息分帧方式进行测试,开启两个线程分别表示client与server,测试消息的发送与接收。
package chapter_3.frame; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; import java.nio.charset.StandardCharsets; public class TestFramer { private static final String[] messages = {"Hello World!", "Hello China, 你好 中国", "世界人民大团结万岁", "在消息中发送分隔符\n和替换符}的情况"}; public static void main(String[] args) throws InterruptedException { Thread clientThread = new Thread(() -> { Socket socket = null; try { socket = new Socket(InetAddress.getLocalHost(), 8888); InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream(); //Framer framer = new DelimitFramer(in); DataInputStream dataInputStream = new DataInputStream(in); Framer framer = new LengthFramer(dataInputStream); for (String msg : messages) { byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8); framer.frameMsg(msgBytes, out); System.out.println(Thread.currentThread().getName() + " 发送消息: " + msg); } socket.close(); } catch (Exception e) { e.printStackTrace(); } }); Thread serverThread = new Thread(() -> { Socket socket = null; try (ServerSocket serverSocket = new ServerSocket(8888)) { while (true) { socket = serverSocket.accept(); System.out.println("获取到来自" + socket.getRemoteSocketAddress() + "的tcp连接"); InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream(); //Framer framer = new DelimitFramer(in); DataInputStream dataInputStream = new DataInputStream(in); Framer framer = new LengthFramer(dataInputStream); byte[] recvMsgBytes = null; do { recvMsgBytes = framer.nextMsg(); //System.out.println(Arrays.toString(recvMsgBytes)); if (recvMsgBytes != null) { System.out.println(Thread.currentThread().getName() + " 接收到的消息: " + new String(recvMsgBytes, StandardCharsets.UTF_8)); } } while (recvMsgBytes != null); } } catch (IOException e) { e.printStackTrace(); } }); serverThread.setName("server"); clientThread.setName("client"); serverThread.start(); Thread.sleep(3000); clientThread.start(); } }