Java并发之ThreadPoolExecutor源码解析(三)

先前,笔者讲解到ThreadPoolExecutor.addWorker(Runnable firstTask, boolean core),在这个方法中工作线程可能创建成功,也可能创建失败,具体视线程池的边界条件,以及当前内存情况而定。

那么,如果线程池当前的状态,是允许创建Worker对象的,那么创建Worker的内部流程又是怎样呢?线程池为何要使用Worker包装Thread来创建一个线程,为何不直接使用原生的Thread来创建线程?如果创建Worker的firstTask不为空,那么Worker理所当然应该优先执行firstTask任务,如果firstTask为空,那Worker又要如何获取任务来执行呢?我们还有一堆亟待解决的问题。

首先我们来解决前两个问题,Worker的创建流程,以及为什么不使用原生Thread代替Worker?首先,Doug Lea用Worker包装Thread,意味着Worker比Thread拥有更多的功能。例如:Worker会统计它所对应的线程执行了多少任务、通过Worker可以知道线程是否已启动、线程是否正在执行任务?而这些信息都是原生Thread所没有的,所以需要一个Worker类来扩展Thread。

在创建Worker时,会先设置其state的值为-1,代表Worker所对应的线程尚未启动,即还没有调用Worker.thread.start(),之后会进行firstTask的赋值,向线程工厂申请创建线程,创建完毕后,等待外部调用Worker.thread.start()启动一个线程执行Worker.run()方法。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /* * 当初始化一个Worker时,会向线程工厂申请创建一个Thread对象 * 用来执行任务,为null代表线程工厂创建失败。 */ final Thread thread; //首要执行任务,该字段可能为null。 Runnable firstTask; //thread已完成任务数。 volatile long completedTasks; Worker(Runnable firstTask) { /* * state初始为-1,代表还未调用Worker.thread.start(), * Worker对应的线程尚未被创建,还不能中断。线程启动后, * 如果线程正在执行任务,state为1,如果线程启动后没在 * 执行任务的状态则state为0。 */ setState(-1);//<1> this.firstTask = firstTask; /* * 创建Thread对象的时候,会把Worker对象本身传入,而Worker * 本身实现了Runnable接口,当调用thead.start()启动一个线程 * 执行thread.run()时,会进而调用Worker.run()方法。 */ this.thread = getThreadFactory().newThread(this); } //Worker对象将任务的执行委托给ThreadPoolExecutor.runWorker(Worker w). public void run() { runWorker(this); } //判断Worker是否处于被某个线程持有状态。 protected boolean isHeldExclusively() { return getState() != 0;//<2> } /* * 线程尝试持有worker对象,如果worker没有被某个线程 * 持有,则state为0,则用CAS的方式将worker的state * 改为1,并设置exclusiveOwnerThread为当前线程。 */ protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) {//<3> setExclusiveOwnerThread(Thread.currentThread());//<4> return true; } return false; } /* * 线程释放worker对象,将state改为0,exclusiveOwnerThread * 改为null。 */ protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } /* * 调用父类acquire(int arg)时,会进而调用到Worker本身实现的 * tryAcquire(int unused)。 */ public void lock() { acquire(1);//<5> } public boolean tryLock() { return tryAcquire(1); } /* * 调用父类的release(int arg)时,会进而调用到Worker本身实现的 * tryRelease(int unused)。 */ public void unlock() { release(1);//<6> } public boolean isLocked() { return isHeldExclusively(); } /* * 尝试中断worker的对应线程,如果线程已经启动。创建 * worker时,state为-1,直到调用worker.thread.start() * 后,worker的state为0,如果worker的state>=0,则尝试 * 中断线程。 */ void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpdpws.html