精通并发与 Netty
Netty 是一个异步的,事件驱动的网络通信框架,用于高性能的基于协议的客户端和服务端的开发。
异步指的是会立即返回,并不知道到底发送过去没有,成功没有,一般都会使用监听器来监听返回。
事件驱动是指开发者只需要关注事件对应的回调方法即可,比如 channel active,inactive,read 等等。
网络通信框架就不用解释了,很多你非常熟悉的组件都使用了 netty,比如 spark,dubbo 等等。
初步了解 Netty第一个简单的例子,使用 Netty 实现一个 http 服务器,客户端调用一个没有参数的方法,服务端返回一个 hello world。
Netty 里面大量的代码都是对线程的处理和 IO 的异步的操作。
package com.paul; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; public class Server { public static void main(String[] args) throws InterruptedException { //定义两个线程组,事件循环组,可以类比与 Tomcat 就是死循环,不断接收客户端的连接 // boss 线程组不断从客户端接受连接,但不处理,由 worker 线程组对连接进行真正的处理 // 一个线程组其实也能完成,推荐使用两个 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 服务端启动器,可以轻松的启动服务端的 channel ServerBootstrap serverBootstrap = new ServerBootstrap(); //group 方法有两个,一个接收一个参数,另一个接收两个参数 // childhandler 是我们自己写的请求处理器 serverBootstrap.group(bossGroup, workerGroup).channel(NioSocketChannel.class) .childHandler(new ServerInitializer()); //绑定端口 ChannelFuture future = serverBootstrap.bind(8011).sync(); //channel 关闭的监听 future.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } package com.paul; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpServerCodec; public class ServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //管道,管道里面可以有很多 handler,一层层过滤的柑橘 ChannelPipeline pipeline = socketChannel.pipeline(); //HttpServerCodec 是 HttpRequestDecoder 和 HttpReponseEncoder 的组合,编码和解码的 h handler pipeline.addLast("httpServerCodec", new HttpServerCodec()); pipeline.addLast("handler", new ServerHandler()); } } package com.paul; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import io.netty.util.CharsetUtil; public class ServerHandler extends SimpleChannelInboundHandler<HttpObject> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception { if(httpObject instanceof HttpRequest) { ByteBuf content = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain"); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); //单纯的调用 write 只会放到缓存区,不会真的发送 channelHandlerContext.writeAndFlush(response); } } }我们在 SimpleChannelInboundHandler 里分析一下,先看它继承的 ChannelInboundHandlerAdapter 里面的事件回调方法,包括通道注册,解除注册,Active,InActive等等。
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelRegistered(); } public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); } public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); } public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); } public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); } public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); } public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); }执行顺序为 handler added->channel registered->channel active->channelRead0->channel inactive->channel unregistered。