源码:
// todo 在这个构造方法中,完成了一些属性的赋值, 彻底构造完成 事件循环组对象 // todo Object... args 是 selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()的简写 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (executor == null) { // todo 下面需要的参数,一开始使用无参的构造方法时, 传递进来的 就是null ,执行这一行代码, 创建默认的线程工厂 /// todo ThreadPerTaskExecutor 意味为当前的事件循环组 创建Executor , 用于 针对每一个任务的Executor 线程的执行器 // todo newDefaultThreadFactory根据它的特性,可以给线程加名字等, // todo 比传统的好处是 把创建线程和 定义线程需要做的任务分开, 我们只关心任务, 两者解耦 // todo 每次执行任务都会创建一个线程实体 // todo NioEventLoop 线程命名规则 nioEventLoop-1-XX 1代表是第几个group XX第几个eventLoop executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } children = new EventExecutor[nThreads]; // todo 循环 for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // todo 创建EventLoop children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // todo chooser 在这里 初始化了 chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }这个过程中的细节:
Netty的executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
默认的线程工厂,创建出的Thread,并不是JDK原生的Thread,而是Netty自己封装的
protected Thread newThread(Runnable r, String name) { // todo threadGroup是 System.out.println(threadGroup+" threadGroup"); return new FastThreadLocalThread(threadGroup, r, name); }Netty的ThreadPerTaskExecutor源码如下, 可以看到,它的execute直接关联着Thread.start()方法, 一旦执行它就会开启新的线程, 当然源码看到这里时,它是没有没执行的,因为线程和NioEventLoop关联着,再往下就看NioEventLoop的实现
* todo 这里实际上使用了设计模式 * todo 1. command是用户定义的任务, 命令模式; 直观的 我定义一种任务, 程序不需要知道我执行的命令是什么,但是当我把任务扔给你, 你帮我执行就好了 * todo 2. 代理设计模型, 代理了ThreadFactory , 把本来给ThreadPerTaskExecutor执行的任务给了ThreadFactory */ public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.threadFactory = threadFactory; } // todo 必须实现 Executor 里面唯一的抽象方法, execute , 执行性 任务 @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); } }其次,上面的newChild(executor, args);方法其实是抽象方法,真正运行时会执行子类NioEventLoopGroup的实现, 如下:
@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { // todo !! 真正创建事件循环组的逻辑在这里!!! return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); } // todo 这里是 它的构造方法 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { // todo 进入到父类, 着重看他是如何创建出 TaskQueue的 super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } // todo 常用的 属性 provider = selectorProvider; // todo 获取Selector选择器 final SelectorTuple selectorTuple = openSelector(); // todo SelectorTuple是netty维护 jdk 原生的Selector的包装类, 下面看,他有两个Selector, 一个是经过包装的,一个是未经过包装的 selector = selectorTuple.selector; // unwrappedSelector = selectorTuple.unwrappedSelector; // todo Jdk 原生的Selector selectStrategy = strategy; }