Netty源码—一、server启动(1) (2)

Netty源码—一、server启动(1)

NioEventLoopGroup的重要属性为:

// 包含的EventExecutor数组 private final EventExecutor[] children; // 选择哪一个EventExecutor执行task的选择器,不同的选择器有不同的策略 private final EventExecutorChooserFactory.EventExecutorChooser chooser;

重要方法有:

// 选择下一个执行任务的线程 io.netty.util.concurrent.MultithreadEventExecutorGroup#next // 创建EventLoop io.netty.channel.nio.NioEventLoopGroup#newChild // 在线程池中执行注册channel的任务 io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel) // 创建默认的threadFactory io.netty.channel.MultithreadEventLoopGroup#newDefaultThreadFactory

线程池初始化的代码为

EventLoopGroup workerGroup = new NioEventLoopGroup();

如果使用无参的构造方法的话,最后会执行下面这个构造方法,这里面做要做了以下几件事

如果executor没有初始化,使用默认的executor初始化

初始化线程池中每个EventLoop

如果其中一个初始化过程中抛出异常,关闭所有的NioEventLoop

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // 创建EventLoop children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // 初始化chooser,决定选择下一个线程的策略 chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }

使用默认参数构造参数的话,上面这个构造方法的入参的值分别是

nThreads // 默认的线程池大小 private static final int DEFAULT_EVENT_LOOP_THREADS; static { // 如果配置了io.netty.eventLoopThreads参数的话,先取该参数的值 // 如果没有配置上面的参数,则取机器处理器个数的2倍 // 如果上面算出的结果小于1则取1 DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } } // 默认没有指定线程池大小,取DEFAULT_EVENT_LOOP_THREADS protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); } executor

默认没有指定executor,为null

chooserFactory protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); } // io.netty.util.concurrent.DefaultEventExecutorChooserFactory

使用默认的chooser,该类的主要功能是提供选择下一个线程的策略

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory { // 单例 public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory(); private DefaultEventExecutorChooserFactory() { } @SuppressWarnings("unchecked") @Override public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { // 如果是2的幂次则使用这个chooser return new PowerOfTowEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } } private static boolean isPowerOfTwo(int val) { // 判断一个数是否2的幂,方法很巧妙 return (val & -val) == val; } private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTowEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { // 如果是2的幂次个线程,可以使用位运算计算出下一个选出的线程的index return executors[idx.getAndIncrement() & executors.length - 1]; } } private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { // 使用求余的方法计算出下一个线程的index return executors[Math.abs(idx.getAndIncrement() % executors.length)]; } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsffps.html