线程池续:你必须要知道的线程池submit()实现原理之FutureTask! (2)

run()执行逻辑.png

具体代码如下:

public class FutureTask<V> implements RunnableFuture<V> { public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } }

首先是判断FutureTask中state状态,必须是NEW才可以继续执行。

然后通过CAS修改runner引用为当前线程。

接着执行用户自定义的call()方法,将返回结果设置到result中,result可能为正常返回也可能为异常信息。这里主要是调用set()/setException()

FutureTask.set()实现原理

set()方法的实现很简单,直接看下代码:

public class FutureTask<V> implements RunnableFuture<V> { protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); finishCompletion(); } } }

将call()返回的数据赋值给全局变量outcome上,然后修改state状态为NORMAL,最后调用finishCompletion()来做挂起线程的唤醒操作,这个方法等到get()后面再来讲解。

FutureTask.get()实现原理

接着看下代码:

public class FutureTask<V> implements RunnableFuture<V> { public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } }

如果FutureTask中state为NORMAL或者COMPLETING,说明当前任务并没有执行完成,调用get()方法会被阻塞,具体的阻塞逻辑在awaitDone()方法:

private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }

这个方法可以说是FutureTask中最核心的方法了,一步步来分析:

如果timed不为空,这说明指定nanos时间还未返回结果,线程就会退出。

q是一个WaitNode对象,是将当前引用线程封装在一个stack数据结构中,WaitNode对象属性如下:

static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }

接着判断当前线程是否中断,如果中断则抛出中断异常。

下面就进入一轮轮的if... else if...判断逻辑,我们还是采用分支的方式去分析。

分支一:if (s > COMPLETING) {

此时get()方法已经有结果了,无论是正常返回的结果,还是异常、中断、取消等,此时直接返回state状态,然后执行report()方法。

分支二:else if (s == COMPLETING)

条件成立,说明当前任务接近完成状态,这里让当前线程再释放cpu,进行下一轮抢占cpu。

分支三:else if (q == null)

第一次自旋执行,WaitNode还没有初始化,初始化q=new WaitNode();

分支四:else if (!queued){

queued代表当前线程是否入栈,如果没有入栈则进行入栈操作,顺便将全局变量waiters指向栈顶元素。

分支五/六:LockSupport.park

如果设置了超时时间,则使用parkNanos来挂起当前线程,否则使用park()

经过这么一轮自旋循环后,如果执行call()还没有返回结果,那么调用get()方法的线程都会被挂起。

被挂起的线程会等待run()返回结果后依次唤醒,具体的执行逻辑在finishCompletion()中。

最终stack结构中数据如下:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zywdpf.html