SpringBoot整合Netty并使用Protobuf进行数据传输(附工程) (3)

代码如下:

@Service("nettyServerHandler") public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** 空闲次数 */ private int idle_count = 1; /** 发送次数 */ private int count = 1; /** * 建立连接时,发送一条消息 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress()); UserInfo.UserMsg userMsg = UserInfo.UserMsg.newBuilder().setId(1).setAge(18).setName("xuwujing").setState(0) .build(); ctx.writeAndFlush(userMsg); super.channelActive(ctx); } /** * 超时处理 如果5秒没有接受客户端的心跳,就触发; 如果超过两次,则直接关闭; */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception { if (obj instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) obj; if (IdleState.READER_IDLE.equals(event.state())) { // 如果读通道处于空闲状态,说明没有接收到心跳命令 System.out.println("已经5秒没有接收到客户端的信息了"); if (idle_count > 1) { System.out.println("关闭这个不活跃的channel"); ctx.channel().close(); } idle_count++; } } else { super.userEventTriggered(ctx, obj); } } /** * 业务逻辑处理 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("第" + count + "次" + ",服务端接受的消息:" + msg); try { // 如果是protobuf类型的数据 if (msg instanceof UserMsg) { UserInfo.UserMsg userState = (UserInfo.UserMsg) msg; if (userState.getState() == 1) { System.out.println("客户端业务处理成功!"); } else if(userState.getState() == 2){ System.out.println("接受到客户端发送的心跳!"); }else{ System.out.println("未知命令!"); } } else { System.out.println("未知数据!" + msg); return; } } catch (Exception e) { e.printStackTrace(); } finally { ReferenceCountUtil.release(msg); } count++; } /** * 异常处理 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

还有个服务端的启动类,之前是通过main方法直接启动, 不过这里改成了通过springBoot进行启动,差别不大。
代码如下:

@SpringBootApplication public class NettyServerApp { public static void main(String[] args) { // 启动嵌入式的 Tomcat 并初始化 Spring 环境及其各 Spring 组件 ApplicationContext context = SpringApplication.run(NettyServerApp.class, args); NettyServer nettyServer = context.getBean(NettyServer.class); nettyServer.run(); } }

到这里服务端相应的代码就编写完毕了。

客户端

客户端这边的代码和服务端的很多地方都类似,我就不再过多细说了,主要将一些不同的代码拿出来简单的讲述下。
首先是客户端的主类,基本和服务端的差不多,也就是多了监听的端口和一个监听器(用来监听是否和服务端断开连接,用于重连)。
主要实现的代码逻辑如下:

public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) { ChannelFuture f = null; try { if (bootstrap != null) { bootstrap.group(eventLoopGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.handler(nettyClientFilter); bootstrap.remoteAddress(host, port); f = bootstrap.connect().addListener((ChannelFuture futureListener) -> { final EventLoop eventLoop = futureListener.channel().eventLoop(); if (!futureListener.isSuccess()) { System.out.println("与服务端断开连接!在10s之后准备尝试重连!"); eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS); } }); if(initFalg){ System.out.println("Netty客户端启动成功!"); initFalg=false; } // 阻塞 f.channel().closeFuture().sync(); } } catch (Exception e) { System.out.println("客户端连接失败!"+e.getMessage()); } }

注:监听器这块的实现用的是JDK1.8的写法。

客户端过滤其这块基本和服务端一直。不过需要注意的是,传输协议、编码和解码应该一致,还有心跳的读写时间应该小于服务端所设置的时间。
改动的代码如下:

ChannelPipeline ph = ch.pipeline(); /* * 解码和编码,应和服务端一致 * */ //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式 ph.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpgjps.html