Socket粘包问题终极解决方案—Netty版(2W字)! (7)

其中核心代码是 18 行和 20 行,通过 LengthFieldPrepender 实现编码(将消息打包成消息头 + 消息体),通过 LengthFieldBasedFrameDecoder 实现解码(从封装的消息中取出消息的内容)。

LengthFieldBasedFrameDecoder 的参数说明如下:

参数 1:maxFrameLength - 发送的数据包最大长度;

参数 2:lengthFieldOffset - 长度域偏移量,指的是长度域位于整个数据包字节数组中的下标;

参数 3:lengthFieldLength - 长度域自己的字节数长度;

参数 4:lengthAdjustment – 长度域的偏移量矫正。 如果长度域的值,除了包含有效数据域的长度外,还包含了其他域(如长度域自身)长度,那么,就需要进行矫正。矫正的值为:包长 - 长度域的值 – 长度域偏移 – 长度域长;

参数 5:initialBytesToStrip – 丢弃的起始字节数。丢弃处于有效数据前面的字节数量。比如前面有 4 个节点的长度域,则它的值为 4。

LengthFieldBasedFrameDecoder(1024,0,4,0,4) 的意思是:数据包最大长度为 1024,长度域占首部的四个字节,在读数据的时候去掉首部四个字节(即长度域)。

客户端的核心实现代码如下:

/** * 客户端通道初始化类 */ static class ClientInitializer extends ChannelInitializer<SocketChannel> { // 字符串编码器和解码器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 客户端连接成功之后业务处理 private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler(); /** * 初始化客户端通道 */ @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 消息解码:读取消息头和消息体 pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); // 消息编码:将消息封装为消息头和消息体,在响应字节数据前面添加消息体长度 pipeline.addLast(new LengthFieldPrepender(4)); // 设置(字符串)编码器和解码器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 客户端连接成功之后的业务处理 pipeline.addLast(CLIENT_HANDLER); } }

完整的服务器端和客户端的实现代码如下:

import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * 通过封装 Netty 来解决粘包 */ public class NettyExample { // 定义服务器的端口号 static final int PORT = 8007; /** * 服务器端 */ static class MyNettyServer { public static void main(String[] args) { // 创建一个线程组,用来负责接收客户端连接 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 创建另一个线程组,用来负责 I/O 的读写 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 创建一个 Server 实例(可理解为 Netty 的入门类) ServerBootstrap b = new ServerBootstrap(); // 将两个线程池设置到 Server 实例 b.group(bossGroup, workerGroup) // 设置 Netty 通道的类型为 NioServerSocket(非阻塞 I/O Socket 服务器) .channel(NioServerSocketChannel.class) // 设置建立连接之后的执行器(ServerInitializer 是我创建的一个自定义类) .childHandler(new NettyExample.ServerInitializer()); // 绑定端口并且进行同步 ChannelFuture future = b.bind(PORT).sync(); // 对关闭通道进行监听 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 资源关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } /** * 服务端通道初始化 */ static class ServerInitializer extends ChannelInitializer<SocketChannel> { // 字符串编码器和解码器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 服务器端连接之后的执行器(自定义的类) private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler(); /** * 初始化通道的具体执行方法 */ @Override public void initChannel(SocketChannel ch) { // 通道 Channel 设置 ChannelPipeline pipeline = ch.pipeline(); // 消息解码:读取消息头和消息体 pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); // 消息编码:将消息封装为消息头和消息体,在响应字节数据前面添加消息体长度 pipeline.addLast(new LengthFieldPrepender(4)); // 设置(字符串)编码器和解码器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 服务器端连接之后的执行器,接收到消息之后的业务处理 pipeline.addLast(SERVER_HANDLER); } } /** * 服务器端接收到消息之后的业务处理类 */ static class ServerHandler extends SimpleChannelInboundHandler<String> { /** * 读取到客户端的消息 */ @Override public void channelRead0(ChannelHandlerContext ctx, String request) { if (!request.isEmpty()) { System.out.println("接到客户端的消息:" + request); } } /** * 数据读取完毕 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } /** * 异常处理,打印异常并关闭通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } } /** * 客户端 */ static class MyNettyClient { public static void main(String[] args) { // 创建事件循环线程组(客户端的线程组只有一个) EventLoopGroup group = new NioEventLoopGroup(); try { // Netty 客户端启动对象 Bootstrap b = new Bootstrap(); // 设置启动参数 b.group(group) // 设置通道类型 .channel(NioSocketChannel.class) // 设置启动执行器(负责启动事件的业务执行,ClientInitializer 为自定义的类) .handler(new NettyExample.ClientInitializer()); // 连接服务器端并同步通道 Channel ch = b.connect("127.0.0.1", PORT).sync().channel(); // 发送消息 ChannelFuture lastWriteFuture = null; // 给服务器端发送 10 条消息 for (int i = 0; i < 10; i++) { // 发送给服务器消息 lastWriteFuture = ch.writeAndFlush("Hi,Java.\n"); } // 在关闭通道之前,同步刷新所有的消息 if (lastWriteFuture != null) { lastWriteFuture.sync(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { // 释放资源 group.shutdownGracefully(); } } } /** * 客户端通道初始化类 */ static class ClientInitializer extends ChannelInitializer<SocketChannel> { // 字符串编码器和解码器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 客户端连接成功之后业务处理 private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler(); /** * 初始化客户端通道 */ @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 消息解码:读取消息头和消息体 pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); // 消息编码:将消息封装为消息头和消息体,在响应字节数据前面添加消息体长度 pipeline.addLast(new LengthFieldPrepender(4)); // 设置(字符串)编码器和解码器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 客户端连接成功之后的业务处理 pipeline.addLast(CLIENT_HANDLER); } } /** * 客户端连接成功之后的业务处理 */ static class ClientHandler extends SimpleChannelInboundHandler<String> { /** * 读取到服务器端的消息 */ @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.err.println("接到服务器的消息:" + msg); } /** * 异常处理,打印异常并关闭通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzfpwx.html