At this point the register0() task has finished. But recall that while this task was running, it added another Runnable to the taskQueue:

    new Runnable() {
        @Override
        public void run() {
            pipeline.addLast(new ServerBootstrapAcceptor(
                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    }

The event loop now polls this newly added task, as the following code shows:
io.netty.util.concurrent.SingleThreadEventExecutor
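    protected boolean runAllTasks(long timeoutNanos) {
        // Excerpt: the initial pollTask(), the declaration of lastExecutionTime
        // and the timeout check are omitted here.
        for (;;) {
            safeExecute(task);

            // Pick up the next task, including any task enqueued by the one that just ran.
            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

After this newly added Runnable has run, the ChannelPipeline looks roughly like this:

    head --> ServerBootstrapAcceptor --> tail

At this point every task in the taskQueue has been executed, so the EventLoop thread calls selector.select() and waits for client connections.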
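The key point above is that a task may enqueue another task while it is running, and the same drain loop still picks the new task up. Below is a minimal sketch of that behaviour in plain Java. It is not Netty's code: TaskQueueSketch, its execute() and runAllTasks() methods, and the task labels are hypothetical stand-ins, and the timeout handling of the real method is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class TaskQueueSketch {
    // Hypothetical stand-in for the event loop's taskQueue.
    private final Queue<Runnable> taskQueue = new ArrayDeque<>();
    // Records the order in which tasks actually ran.
    final List<String> executed = new ArrayList<>();

    void execute(Runnable task) {
        taskQueue.add(task);
    }

    // Same loop shape as the excerpt: run a task, poll the next one,
    // and stop only when the queue is empty.
    boolean runAllTasks() {
        Runnable task = taskQueue.poll();
        if (task == null) {
            return false;
        }
        for (;;) {
            task.run();
            task = taskQueue.poll();
            if (task == null) {
                break;
            }
        }
        return true;
    }

    static List<String> demo() {
        TaskQueueSketch loop = new TaskQueueSketch();
        loop.execute(() -> {
            loop.executed.add("register0");
            // While running, the first task enqueues a follow-up task,
            // much like register0() queues the task that adds the acceptor.
            loop.execute(() -> loop.executed.add("addAcceptor"));
        });
        loop.runAllTasks();
        return loop.executed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [register0, addAcceptor]
    }
}
```

Because the loop polls again after every task, a single call to runAllTasks() is enough to run both the original task and the one it added.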
With that, the Server side has started successfully.
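Client startup process

Creating the Selector

This is exactly the same as on the Server side.

Creating the SocketChannel

The entry point is the same as on the Server side. The difference is that the Client uses bootstrap.channel(NioSocketChannel.class), so the implementation to read is NioSocketChannel. There is not much more to say here.

Initializing the SocketChannel

The Client side is much simpler, as shown below: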
io.netty.bootstrap.Bootstrap
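    @Override
    void init(Channel channel) {
        ChannelPipeline p = channel.pipeline();
        p.addLast(config.handler());
    }

Registering the SocketChannel with the Selector

The earlier steps are basically the same as on the Server side. After doRegister() completes, pipeline.invokeHandlerAddedIfNeeded() runs, and this step is less involved than on the Server side (where initializing the server channel also queued a task that adds ServerBootstrapAcceptor to the ChannelPipeline).

As analyzed earlier, this step triggers the initChannel call, so the user-written ChannelInitializer runs here. That is, ch.pipeline().addLast(new NettyClientHandler()) is executed, inserting the user-written NettyClientHandler into the ChannelPipeline.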
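The initializer mechanism described above can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: Handler, Pipeline, Initializer and NamedHandler are hypothetical simplified types, not Netty's classes, and the model assumes (as the analysis above describes) that the handlerAdded callback is deferred until registration and that the initializer removes itself after initChannel has run.

```java
import java.util.ArrayList;
import java.util.List;

public class InitializerSketch {
    /** Hypothetical handler contract, far smaller than Netty's ChannelHandler. */
    interface Handler {
        default void handlerAdded(Pipeline pipeline) { }
        String name();
    }

    /** Hypothetical pipeline: the ordered handlers between an implicit head and tail. */
    static class Pipeline {
        private final List<Handler> handlers = new ArrayList<>();
        private final List<Handler> pending = new ArrayList<>();
        private boolean registered;

        void addLast(Handler h) {
            handlers.add(h);
            if (registered) {
                h.handlerAdded(this);
            } else {
                pending.add(h); // callback deferred until registration completes
            }
        }

        void remove(Handler h) {
            handlers.remove(h);
        }

        // Modeled on the role of invokeHandlerAddedIfNeeded(): fire deferred callbacks once.
        void invokeHandlerAddedIfNeeded() {
            if (registered) {
                return;
            }
            registered = true;
            for (Handler h : new ArrayList<>(pending)) {
                h.handlerAdded(this);
            }
            pending.clear();
        }

        List<String> names() {
            List<String> out = new ArrayList<>();
            for (Handler h : handlers) {
                out.add(h.name());
            }
            return out;
        }
    }

    /** Plays the role of ChannelInitializer: runs initChannel, then removes itself. */
    abstract static class Initializer implements Handler {
        abstract void initChannel(Pipeline pipeline);

        @Override
        public void handlerAdded(Pipeline pipeline) {
            initChannel(pipeline);
            pipeline.remove(this);
        }

        @Override
        public String name() {
            return "initializer";
        }
    }

    static class NamedHandler implements Handler {
        private final String name;

        NamedHandler(String name) {
            this.name = name;
        }

        @Override
        public String name() {
            return name;
        }
    }

    static String demo() {
        Pipeline p = new Pipeline();
        p.addLast(new Initializer() {
            @Override
            void initChannel(Pipeline pipeline) {
                pipeline.addLast(new NamedHandler("NettyClientHandler"));
            }
        });
        String before = p.names().toString();
        p.invokeHandlerAddedIfNeeded(); // corresponds to the step right after doRegister()
        return "before=" + before + " after=" + p.names();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints before=[initializer] after=[NettyClientHandler]
    }
}
```

In this model the pipeline holds only the initializer before registration. Once the deferred callback fires, the user's handler takes its place.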
Connecting to the Server

Once registration succeeds, the callback that connects to the Server is executed.
io.netty.bootstrap.Bootstrap
    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();

        if (regFuture.isDone()) {
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                    // failure.
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

The method to follow is doResolveAndConnect0(), which in turn calls doConnect():
io.netty.bootstrap.Bootstrap
    private static void doConnect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

        // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        final Channel channel = connectPromise.channel();
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (localAddress == null) {
                    channel.connect(remoteAddress, connectPromise);
                } else {
                    channel.connect(remoteAddress, localAddress, connectPromise);
                }
                connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        });
    }

Ultimately this calls:
io.netty.channel.socket.nio.NioSocketChannel#doConnect()
    @Override
    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        boolean success = false;
        try {
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            if (!connected) {
                // The connect has not completed yet: ask the selector to report OP_CONNECT.
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }

Next, look at SocketUtils.connect(javaChannel(), remoteAddress):