不识Netty真面目,只缘未读此真经 (6)

io.netty.channel.nio.AbstractNioChannel

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; ch.configureBlocking(false); }

this.readInterestOp = readInterestOp把感兴趣的操作赋值给readInterestOp,上面传过来的是SelectionKey.OP_ACCEPT。

ch.configureBlocking(false)把刚才创建出来的channel设置为非阻塞。继续往父类追。

io.netty.channel.AbstractChannel

protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); }

这里看到创建了ChannelPipeline,并关联到Channel上。再往下走一步。

io.netty.channel.DefaultChannelPipeline

protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }

此时ChannelPipeline大致如下:

head --> tail 初始化ServerSocketChannel

回到上面提到的重要的第2步: init(channel); 注意,实现类为ServerBootstrap,因为是Server端嘛。

io.netty.bootstrap.ServerBootstrap

@Override void init(Channel channel) { ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }

在ChannelPipeline加了一个ChannelHandler。此时ChannelPipeline大致如下:

head --> ChannelInitializer --> tail

一旦serverSocketChannel注册到EventLoop(或者说Selector)上,便会触发这里initChannel的调用。避免绕晕了,这里暂时不去探究具体的调用逻辑。后面调用到这里的时候,再回过头来仔细探究。

ServerSocketChannel注册到Selector上

回到上面提到的重要的第3步:config().group().register(channel);

通过分析类的继承层次(或者debug也行)可以跟踪调用到SingleThreadEventLoop的register方法。

io.netty.channel.SingleThreadEventLoop

@Override public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }

再往下跟,最终调用的是AbstractChannel的register方法,如下:

io.netty.channel.AbstractChannel

@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this.eventLoop = eventLoop; eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); }

往下跟eventLoop.execute()

io.netty.util.concurrent.SingleThreadEventExecutor

private void execute(Runnable task, boolean immediate) { addTask(task); startThread(); }

addTask(task)把上面的Runnable放入到上面提到的Queue<Runnable> taskQueue,过程见如下代码:

io.netty.util.concurrent.SingleThreadEventExecutor

/** * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown * before. */ protected void addTask(Runnable task) { ObjectUtil.checkNotNull(task, "task"); if (!offerTask(task)) { reject(task); } } final boolean offerTask(Runnable task) { if (isShutdown()) { reject(); } return taskQueue.offer(task); }

把task放入taskQueue后,就到startThread()这行代码了,进去瞧瞧。

io.netty.util.concurrent.SingleThreadEventExecutor

private void startThread() { doStartThread(); } private void doStartThread() { executor.execute(new Runnable() { @Override public void run() { SingleThreadEventExecutor.this.run(); } }); }

继续追executor.execute,到这里才真正创建新的线程执行SingleThreadEventExecutor.this.run(), thread名称大致为nioEventLoopGroup-2-1,见如下代码:

io.netty.util.concurrent.ThreadPerTaskExecutor

@Override public void execute(Runnable command) { threadFactory.newThread(command).start(); }

SingleThreadEventExecutor.this.run()实际执行的代码如下:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsfjxs.html