SpringBoot整合Netty并使用Protobuf进行数据传输(附工程) (4)

客户端的业务代码逻辑。
主要实现的几点逻辑是心跳按时发送以及解析服务发送的protobuf格式的数据。
这里比服务端多个个注解, 该注解Sharable主要是为了多个handler可以被多个channel安全地共享,也就是保证线程安全。
废话就不多说了,代码如下:

@Service("nettyClientHandler") @ChannelHandler.Sharable public class NettyClientHandler extends ChannelInboundHandlerAdapter { @Autowired private NettyClient nettyClient; /** 循环次数 */ private int fcount = 1; /** * 建立连接时 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("建立连接时:" + new Date()); ctx.fireChannelActive(); } /** * 关闭连接时 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("关闭连接时:" + new Date()); final EventLoop eventLoop = ctx.channel().eventLoop(); nettyClient.doConnect(new Bootstrap(), eventLoop); super.channelInactive(ctx); } /** * 心跳请求处理 每4秒发送一次心跳请求; * */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception { System.out.println("循环请求的时间:" + new Date() + ",次数" + fcount); if (obj instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) obj; if (IdleState.WRITER_IDLE.equals(event.state())) { // 如果写通道处于空闲状态,就发送心跳命令 UserMsg.Builder userState = UserMsg.newBuilder().setState(2); ctx.channel().writeAndFlush(userState); fcount++; } } } /** * 业务逻辑处理 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 如果不是protobuf类型的数据 if (!(msg instanceof UserMsg)) { System.out.println("未知数据!" + msg); return; } try { // 得到protobuf的数据 UserInfo.UserMsg userMsg = (UserInfo.UserMsg) msg; // 进行相应的业务处理。。。 // 这里就从简了,只是打印而已 System.out.println( "客户端接受到的用户信息。编号:" + userMsg.getId() + ",姓名:" + userMsg.getName() + ",年龄:" + userMsg.getAge()); // 这里返回一个已经接受到数据的状态 UserMsg.Builder userState = UserMsg.newBuilder().setState(1); ctx.writeAndFlush(userState); System.out.println("成功发送给服务端!"); } catch (Exception e) { e.printStackTrace(); } finally { ReferenceCountUtil.release(msg); } } }

那么到这里客户端的代码也编写完毕了。

功能测试

首先启动服务端,然后再启动客户端。
我们来看看结果是否如上述所说。

服务端输出结果:

服务端启动成功,端口是:9876 连接的客户端地址:/127.0.0.1:53319 第1次,服务端接受的消息:state: 1 客户端业务处理成功! 第2次,服务端接受的消息:state: 2 接受到客户端发送的心跳! 第3次,服务端接受的消息:state: 2 接受到客户端发送的心跳! 第4次,服务端接受的消息:state: 2 接受到客户端发送的心跳!

客户端输入结果:

Netty客户端启动成功! 建立连接时:Mon Jul 16 23:31:58 CST 2018 客户端接受到的用户信息。编号:1,姓名:xuwujing,年龄:18 成功发送给服务端! 循环请求的时间:Mon Jul 16 23:32:02 CST 2018,次数1 循环请求的时间:Mon Jul 16 23:32:06 CST 2018,次数2 循环请求的时间:Mon Jul 16 23:32:10 CST 2018,次数3 循环请求的时间:Mon Jul 16 23:32:14 CST 2018,次数4

通过打印信息可以看出如上述所说。

接下来我们再来看看客户端是否能够实现重连。
先启动客户端,再启动服务端。

客户端输入结果:

Netty客户端启动成功! 与服务端断开连接!在10s之后准备尝试重连! 客户端连接失败!AbstractChannel$CloseFuture@1fbaa3ac(incomplete) 建立连接时:Mon Jul 16 23:41:33 CST 2018 客户端接受到的用户信息。编号:1,姓名:xuwujing,年龄:18 成功发送给服务端! 循环请求的时间:Mon Jul 16 23:41:38 CST 2018,次数1 循环请求的时间:Mon Jul 16 23:41:42 CST 2018,次数2 循环请求的时间:Mon Jul 16 23:41:46 CST 2018,次数3

服务端输出结果:

服务端启动成功,端口是:9876 连接的客户端地址:/127.0.0.1:53492 第1次,服务端接受的消息:state: 1 客户端业务处理成功! 第2次,服务端接受的消息:state: 2 接受到客户端发送的心跳! 第3次,服务端接受的消息:state: 2 接受到客户端发送的心跳! 第4次,服务端接受的消息:state: 2

结果也如上述所说!

其它

关于SpringBoot整合Netty使用Protobuf进行数据传输到这里就结束了。
SpringBoot整合Netty使用Protobuf进行数据传输的项目工程地址:
https://github.com/xuwujing/springBoot-study/tree/master/springboot-netty-protobuf

对了,也有不使用springBoot整合的Netty项目工程地址:
https://github.com/xuwujing/Netty-study/tree/master/Netty-protobuf

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpgjps.html