精通并发与 Netty (一)如何使用 (3)

集群之间各个节点的通信,主从节点之间需要进行数据同步,每当主节点的数据发生变化时,通过异步的方式将数据同步到从节点,同步方式可以用日志等等,因此主从节点之间不是实时一致性而是最终一致性。

节点与节点之间如何进行通信那?这种主从模式是需要互相之间有长连接的,这样来确定对方还活着,实现方式是互相之间定时发送心跳数据包。如果发送几次后对方还是没有响应的话,就可以认为对方已经挂掉了。

回到客户端与服务端的模式,有人可能会想,客户端断开连接后服务端的 handlerRemoved 等方法不是能感知吗?还要心跳干什么哪?

真实情况其实非常复杂,比如手机客户端和服务端进行一个长连接,客户端没有退出应用,客户端开了飞行模型,或者强制关机,此时双方是感知不到连接已经断掉了,或者说需要非常长的时间才能感知到,这是我们不想看到的,这时就需要心跳了。

来看一个示例:

其他的代码还是和上面的一样,我们就不列出来了,直接进入主题,看不同的地方:

服务端

// Netty 为了支持心跳的 IdleStateHandler,空闲状态监测处理器。 pipeline.addLast(new IdleStateHandler(5,7,10,TimeUnit.SECONDS));

来看看 IdleStateHandler 的说明

/* * Triggers an IdleStateEvent when a Channel has not performed read, write, or both * operation for a while * 当一个 channel 一断时间没有进行 read,write 就触发一个 IdleStateEvent */ public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) { this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS); //三个参数分别为多长时间没进行读,写或者读写操作则触发 event。 }

触发 event 后我们编写这个 event 对应的处理器。

public class MyHandler extends ChannelInboundHandlerAdapter{ //触发某个事件后这个方法就会被调用 //一个 channelhandlerContext 上下文对象,另一个是事件 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{ if(evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; String eventType = null; switch(event.state()){ case READER_IDLE: eventType = "读空闲"; case WRITER_IDLE: eventType = "写空闲"; case ALL_IDLE: eventType = "读写空闲"; } }else{ //继续将事件向下一个 handler 传递 ctx. } } } WebSocket 实现与原理分析

WebSocket 是一种规范,是 HTML5 规范的一部分,主要是解决 Http 协议本身存在的问题。可以实现浏览器和服务端的长连接,连接头信息只在建立连接时发送一次。是在 Http 协议之上构建的,比如请求连接其实是一个 Http 请求,只不过里面加了一些 WebSocket 信息。也可以用在非浏览器场合,比如 app 上。

Http 是一种无状态的基于请求和响应的协议,意思是一定是客户端想服务端发送一个请求,服务端给客户端一个响应。Http 1.0 在服务端给客户端响应后连接就断了。Http 1.1 增加可 keep-alive,服务端可以和客户端在短时间之内保持一个连接,某个事件之内服务端和客户端可以复用这个链接。在这种情况下,网页聊天就是实现不了的,服务端的数据推送是无法实现的。

以前有一些假的长连接技术,比如轮询,缺点和明显,这里就不细说了。

Http 2.0 实现了长连接,但是这不在我们讨论范围之内。

针对服务端,Tomcat 新版本,Spring,和 Netty 都实现了对 Websocket 的支持。

使用 Netty 对 WebSocket 的支持来实现长连接

其他的部分还是一样的,先来看服务端的 WebSocketChannelInitializer。

public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel>{ //需要支持 websocket,我们在 initChannel 是做一点改动 @Override protected void initChannel(SocketChannel ch) throws Exception{ ChannelPipeline pipeline = ch.pipeline(); //因为 websocket 是基于 http 的,所以要加入 http 相应的编解码器 pipeline.addLast(new HttpServerCodec()); //以块的方式进行写的处理器 pipeline.addLast(new ChunkedWriteHandler()); // 进行 http 聚合的处理器,将 HttpMessage 和 HttpContent 聚合到 FullHttpRequest 或者 // FullHttpResponse //HttpObjectAggregator 在基于 netty 的 http 编程使用的非常多,粘包拆包。 pipeline.addLast(new HttpObjectAggregator(8192)); // 针对 websocket 的类,完成 websocket 构建的所有繁重工作,负责握手,以及心跳(close,ping, // pong)的处理, websocket 通过 frame 帧来传递数据。 // BinaryWebSocketFrame,CloseWebSocketFrame,ContinuationWebSocketFrame, // PingWebSocketFrame,PongWebSocketFrame,TextWebSocketFrame。 // /ws 是 context_path,websocket 协议标准,ws://server:port/context_path pipeline.addLast(new WebSocketServerProcotolHandler("/ws")); pipeline.addLast(new TextWebSocketFrameHandler()); } } // websocket 协议需要用帧来传递参数 public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception{ System.out.println("收到消息:"+ msg.text()); ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器返回")); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception{ System.out.println("handlerAdded" + ctx.channel().id.asLongText()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{ System.out.println("handlerRemoved" + ctx.channel().id.asLongText()); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpfpsy.html