Netty入门一:服务端应用搭建 & 启动过程源码分析 (2)

对于服务端Channel,Netty框架提供了用于接受连接的Handler,我们可以不用设置;但是对于服务端Channel创建的每个子Channel我们需要为其配置Handler,以处理I/O事件。

首先:解码器是必须的。我们业务逻辑中流转的一般是对象,通过配置解码器将字节转换成Java对象(解码器同时需要处理TCP拆包、粘包)

然后:自定义业务处理器用于处理具体的业务逻辑,如上面的BusinessHandler。

最后:需要对结果进行返回时需要配置编码器,用于将输出对象编码成可用于通道传输的ByteBuf对象

对于这个例子:

LineBasedFrameDecoder和StringDecoder是解码器:将一行数据解码成Java中的String对象

BusinessHandler是业务处理器:处理具体的业务逻辑(获取当前日期或者时间)

StringEncoder是编码器:将String对象编码成ByteBuf对象,用于通道传输。

绑定本地端口并启动服务

配置就绪后直接绑定本地端口启动服务

ChannelFuture future = bootstrap.bind(7001).syncUninterruptibly();

到这里通过Netty创建一个服务端应用程序就完成了,下面我们从源码成面看看Netty的启动过程

Netty服务端启动过程源码分析

源码基于4.1分支:做了部分简化,只保留了核心逻辑

从bind方法开始 public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); } public ChannelFuture bind(SocketAddress localAddress) { this.validate(); return this.doBind(localAddress); } private ChannelFuture doBind(final SocketAddress localAddress) { //1. 初始化NioServerSocketChannel,并且注册到EventLoopGroup final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); //2.1. 注册失败,直接返回 if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); //2.2. 注册成功,直接bind本地端口 doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); //3.如果注册还未知(因为是异步操作),添加listener到regFuture对象上用于注册完成后进行回调处理 regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { promise.setFailure(cause); } else { promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }

整个bind方法比较简单, 核心逻辑都在doBind方法里面,doBind里的逻辑主要有三步

initAndRegister:实例化ServerSocketChannel(这里是NioServerSocketChannel)并注册到事件循环(EventLoopGroup)

如果第一步失败,直接返回;如果注册成功,调用doBind方法绑定本地端口启动服务器

如果注册结果还未知(reg是异步操作),添加ChannelFutureListener到regFuture对象上用于注册完成后的回调处理

第二和第三个步都比较简单,我们主要需要看下第一步--initAndRegister

initAndRegister(初始化NioServerSocketChannel并注册到EventLoopGroup)

initAndRegister其实是个模版方法,也可以分成三步来分析

实例化,这里其实是通过基于反射的工厂方法实例化

初始化(由子类实现)

注册到EventLoopGroup

final ChannelFuture initAndRegister() { Channel channel = null; try { //1. 实例化,基于反射的工厂方法 channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { // } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { // } return regFuture; }

第一步和第三步这里我们不做展开,主要看下第二步init做了什么事

init(初始化通道)

下面是ServerBootstrap中的init方法的源码

void init(Channel channel) { setChannelOptions(channel, newOptionsArray(), logger); setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY)); ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY); } final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY); p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }

归纳起来其实是把我们通过ServerBootstarp引导类配置的一些参填充到NioServerSocketChannel实例中去了,没有问题。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zywfyf.html