客户端的业务代码逻辑。
主要实现的几点逻辑是心跳按时发送以及解析服务发送的protobuf格式的数据。
这里比服务端多个个注解, 该注解Sharable主要是为了多个handler可以被多个channel安全地共享,也就是保证线程安全。
废话就不多说了,代码如下:
@Service("nettyClientHandler")
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Autowired
private NettyClient nettyClient;
/** 循环次数 */
private int fcount = 1;
/**
* 建立连接时
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("建立连接时:" + new Date());
ctx.fireChannelActive();
}
/**
* 关闭连接时
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("关闭连接时:" + new Date());
final EventLoop eventLoop = ctx.channel().eventLoop();
nettyClient.doConnect(new Bootstrap(), eventLoop);
super.channelInactive(ctx);
}
/**
* 心跳请求处理 每4秒发送一次心跳请求;
*
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
System.out.println("循环请求的时间:" + new Date() + ",次数" + fcount);
if (obj instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) obj;
if (IdleState.WRITER_IDLE.equals(event.state())) { // 如果写通道处于空闲状态,就发送心跳命令
UserMsg.Builder userState = UserMsg.newBuilder().setState(2);
ctx.channel().writeAndFlush(userState);
fcount++;
}
}
}
/**
* 业务逻辑处理
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 如果不是protobuf类型的数据
if (!(msg instanceof UserMsg)) {
System.out.println("未知数据!" + msg);
return;
}
try {
// 得到protobuf的数据
UserInfo.UserMsg userMsg = (UserInfo.UserMsg) msg;
// 进行相应的业务处理。。。
// 这里就从简了,只是打印而已
System.out.println(
"客户端接受到的用户信息。编号:" + userMsg.getId() + ",姓名:" + userMsg.getName() + ",年龄:" + userMsg.getAge());
// 这里返回一个已经接受到数据的状态
UserMsg.Builder userState = UserMsg.newBuilder().setState(1);
ctx.writeAndFlush(userState);
System.out.println("成功发送给服务端!");
} catch (Exception e) {
e.printStackTrace();
} finally {
ReferenceCountUtil.release(msg);
}
}
}
那么到这里客户端的代码也编写完毕了。
功能测试
首先启动服务端,然后再启动客户端。
我们来看看结果是否如上述所说。
服务端输出结果:
服务端启动成功,端口是:9876
连接的客户端地址:/127.0.0.1:53319
第1次,服务端接受的消息:state: 1
客户端业务处理成功!
第2次,服务端接受的消息:state: 2
接受到客户端发送的心跳!
第3次,服务端接受的消息:state: 2
接受到客户端发送的心跳!
第4次,服务端接受的消息:state: 2
接受到客户端发送的心跳!
客户端输入结果:
Netty客户端启动成功!
建立连接时:Mon Jul 16 23:31:58 CST 2018
客户端接受到的用户信息。编号:1,姓名:xuwujing,年龄:18
成功发送给服务端!
循环请求的时间:Mon Jul 16 23:32:02 CST 2018,次数1
循环请求的时间:Mon Jul 16 23:32:06 CST 2018,次数2
循环请求的时间:Mon Jul 16 23:32:10 CST 2018,次数3
循环请求的时间:Mon Jul 16 23:32:14 CST 2018,次数4
通过打印信息可以看出如上述所说。
接下来我们再来看看客户端是否能够实现重连。
先启动客户端,再启动服务端。
客户端输入结果:
Netty客户端启动成功!
与服务端断开连接!在10s之后准备尝试重连!
客户端连接失败!AbstractChannel$CloseFuture@1fbaa3ac(incomplete)
建立连接时:Mon Jul 16 23:41:33 CST 2018
客户端接受到的用户信息。编号:1,姓名:xuwujing,年龄:18
成功发送给服务端!
循环请求的时间:Mon Jul 16 23:41:38 CST 2018,次数1
循环请求的时间:Mon Jul 16 23:41:42 CST 2018,次数2
循环请求的时间:Mon Jul 16 23:41:46 CST 2018,次数3
服务端输出结果:
服务端启动成功,端口是:9876
连接的客户端地址:/127.0.0.1:53492
第1次,服务端接受的消息:state: 1
客户端业务处理成功!
第2次,服务端接受的消息:state: 2
接受到客户端发送的心跳!
第3次,服务端接受的消息:state: 2
接受到客户端发送的心跳!
第4次,服务端接受的消息:state: 2
结果也如上述所说!
其它
关于SpringBoot整合Netty使用Protobuf进行数据传输到这里就结束了。
SpringBoot整合Netty使用Protobuf进行数据传输的项目工程地址:
https://github.com/xuwujing/springBoot-study/tree/master/springboot-netty-protobuf
对了,也有不使用springBoot整合的Netty项目工程地址:
https://github.com/xuwujing/Netty-study/tree/master/Netty-protobuf