不识Netty真面目,只缘未读此真经 (9)

到这里,register0()这个task就执行完了。但是还记得这个task执行过程中,又往taskQueue中添加了一个Runnable吗?

new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }

此时会poll到新加的task,见如下代码:

io.netty.util.concurrent.SingleThreadEventExecutor

protected boolean runAllTasks(long timeoutNanos) { for (;;) { safeExecute(task); task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }

执行完这个新增的Runnable后,此时ChannelPipeline大致如下:

head --> ServerBootstrapAcceptor --> tail

此时,taskQueue中的task都执行完了,EventLoop线程执行selector.select(),等待客户端的连接。

到这里,Server端也就成功启动了。

Client端启动过程 创建Selector

与Server端完全一致。

创建SocketChannel

入口与Server端一样,不一样的地方在于Client端是bootstrap.channel(NioSocketChannel.class),所以需要看NioSocketChannel的实现。这里也不必多说。

初始化SocketChannel

Client端的就比较简单了,如下:

io.netty.bootstrap.Bootstrap

@Override void init(Channel channel) { ChannelPipeline p = channel.pipeline(); p.addLast(config.handler()); } SocketChannel注册到Selector上

前面的过程与Server端基本一样,执行完doRegister(),执行pipeline.invokeHandlerAddedIfNeeded()时,没有Server端复杂(因为Server端初始化SocketChannel,加了个添加ServerBootstrapAcceptor到ChannelPipeline的task)。

前面分析过,这个过程会触发initChannel调用,所以这时会执行用户编写的ChannelInitializer,也就是会执行ch.pipeline().addLast(new NettyClientHandler()),将用户编写的NettyClientHandler插入到ChannelPipeline中。

连接Server

注册成功后,会执行连接Server的回调。

io.netty.bootstrap.Bootstrap

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.isDone()) { return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise()); } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { // Directly obtain the cause and do a null check so we only need one volatile read in case of a // failure. Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doResolveAndConnect0(channel, remoteAddress, localAddress, promise); } } }); return promise; } }

需要看doResolveAndConnect0(), 里面又调用的是doConnect()

io.netty.bootstrap.Bootstrap

private static void doConnect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. final Channel channel = connectPromise.channel(); channel.eventLoop().execute(new Runnable() { @Override public void run() { if (localAddress == null) { channel.connect(remoteAddress, connectPromise); } else { channel.connect(remoteAddress, localAddress, connectPromise); } connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } }); }

最终调用的是:

io.netty.channel.socket.nio.NioSocketChannel#doConnect()

@Override protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { boolean success = false; try { boolean connected = SocketUtils.connect(javaChannel(), remoteAddress); if (!connected) { selectionKey().interestOps(SelectionKey.OP_CONNECT); } success = true; return connected; } finally { if (!success) { doClose(); } } }

再看SocketUtils.connect(javaChannel(), remoteAddress)

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsfjxs.html