不识Netty真面目,只缘未读此真经 (5)

io.netty.channel.nio.NioEventLoop

private SelectorTuple openSelector() { final Selector unwrappedSelector; try { unwrappedSelector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } if (DISABLE_KEY_SET_OPTIMIZATION) { return new SelectorTuple(unwrappedSelector); } // 省略其他代码... return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)); }

这里我们看到了provider.openSelector(),到这里,创建出来的Selector就与 EventLoop 关联在一起了。

同时在创建NioEventLoop时,看看super(parent, executor, false, newTaskQueue(queueFactory), ...)在父类SingleThreadEventLoop干了什么。

io.netty.channel.SingleThreadEventLoop

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler); tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue"); }

再往下;

io.netty.util.concurrent.SingleThreadEventExecutor

private final Queue<Runnable> taskQueue; protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS; this.executor = ThreadExecutorMap.apply(executor, this); this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue"); this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); }

这里我们看到了对Queue<Runnable> taskQueue的赋值。

创建ServerSocketChannel

AbstractBootstrap中的initAndRegister()方法是ServerSocketChannel的创建入口。

io.netty.bootstrap.AbstractBootstrap

final ChannelFuture initAndRegister() { Channel channel = null; try { // 1.创建ServerSocketChannel channel = channelFactory.newChannel(); // 2.初始化ServerSocketChannel init(channel); } catch (Throwable t) { } // 3.将ServerSocketChannel注册到Selector上 ChannelFuture regFuture = config().group().register(channel); return regFuture; }

Server端的启动最核心的也就是上面加注释的三步。按照顺序先从ServerSocketChannel的创建讲起。

ServerSocketChannel的创建用了工厂模式+反射机制。具体见ReflectiveChannelFactory

io.netty.channel.ReflectiveChannelFactory

/** * A {@link ChannelFactory} that instantiates a new {@link Channel} by invoking its default constructor reflectively. */ public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> { private final Constructor<? extends T> constructor; public ReflectiveChannelFactory(Class<? extends T> clazz) { this.constructor = clazz.getConstructor(); } @Override public T newChannel() { return constructor.newInstance(); } }

还记得在前面的bootstrap.channel(NioServerSocketChannel.class)这行代码吗?传入的Class就是用于反射生成Channel实例的。这里是Server端,显然需要进NioServerSocketChannel看如何创建的。

io.netty.channel.socket.nio.NioServerSocketChannel

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); private static ServerSocketChannel newSocket(SelectorProvider provider) { try { /** * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise. * * See <a href="http://github.com/netty/netty/issues/2308">#2308</a>. */ return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } } public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }

provider.openServerSocketChannel()这行代码也就创建出来了ServerSocketChannel。再往父类里面追,看做了些什么。super(null, channel, SelectionKey.OP_ACCEPT);

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsfjxs.html