精通并发与 Netty (一)如何使用

精通并发与 Netty

Netty 是一个异步的,事件驱动的网络通信框架,用于高性能的基于协议的客户端和服务端的开发。

异步指的是会立即返回,并不知道到底发送过去没有,成功没有,一般都会使用监听器来监听返回。

事件驱动是指开发者只需要关注事件对应的回调方法即可,比如 channel active,inactive,read 等等。

网络通信框架就不用解释了,很多你非常熟悉的组件都使用了 netty,比如 spark,dubbo 等等。

初步了解 Netty

第一个简单的例子,使用 Netty 实现一个 http 服务器,客户端调用一个没有参数的方法,服务端返回一个 hello world。

Netty 里面大量的代码都是对线程的处理和 IO 的异步的操作。

package com.paul; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; public class Server { public static void main(String[] args) throws InterruptedException { //定义两个线程组,事件循环组,可以类比与 Tomcat 就是死循环,不断接收客户端的连接 // boss 线程组不断从客户端接受连接,但不处理,由 worker 线程组对连接进行真正的处理 // 一个线程组其实也能完成,推荐使用两个 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 服务端启动器,可以轻松的启动服务端的 channel ServerBootstrap serverBootstrap = new ServerBootstrap(); //group 方法有两个,一个接收一个参数,另一个接收两个参数 // childhandler 是我们自己写的请求处理器 serverBootstrap.group(bossGroup, workerGroup).channel(NioSocketChannel.class) .childHandler(new ServerInitializer()); //绑定端口 ChannelFuture future = serverBootstrap.bind(8011).sync(); //channel 关闭的监听 future.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } package com.paul; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpServerCodec; public class ServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //管道,管道里面可以有很多 handler,一层层过滤的柑橘 ChannelPipeline pipeline = socketChannel.pipeline(); //HttpServerCodec 是 HttpRequestDecoder 和 HttpReponseEncoder 的组合,编码和解码的 h handler pipeline.addLast("httpServerCodec", new HttpServerCodec()); pipeline.addLast("handler", new ServerHandler()); } } package com.paul; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import io.netty.util.CharsetUtil; public class ServerHandler extends SimpleChannelInboundHandler<HttpObject> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception { if(httpObject instanceof HttpRequest) { ByteBuf content = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain"); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); //单纯的调用 write 只会放到缓存区,不会真的发送 channelHandlerContext.writeAndFlush(response); } } }

我们在 SimpleChannelInboundHandler 里分析一下,先看它继承的 ChannelInboundHandlerAdapter 里面的事件回调方法,包括通道注册,解除注册,Active,InActive等等。

public void channelRegistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelRegistered(); } public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); } public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); } public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); } public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); } public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); } public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); }

执行顺序为 handler added->channel registered->channel active->channelRead0->channel inactive->channel unregistered。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpfpsy.html