具体代码如下:
public class FutureTask<V> implements RunnableFuture<V> { public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } }首先是判断FutureTask中state状态,必须是NEW才可以继续执行。
然后通过CAS修改runner引用为当前线程。
接着执行用户自定义的call()方法,将返回结果设置到result中,result可能为正常返回也可能为异常信息。这里主要是调用set()/setException()
FutureTask.set()实现原理set()方法的实现很简单,直接看下代码:
public class FutureTask<V> implements RunnableFuture<V> { protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); finishCompletion(); } } }将call()返回的数据赋值给全局变量outcome上,然后修改state状态为NORMAL,最后调用finishCompletion()来做挂起线程的唤醒操作,这个方法等到get()后面再来讲解。
FutureTask.get()实现原理接着看下代码:
public class FutureTask<V> implements RunnableFuture<V> { public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } }如果FutureTask中state为NORMAL或者COMPLETING,说明当前任务并没有执行完成,调用get()方法会被阻塞,具体的阻塞逻辑在awaitDone()方法:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }这个方法可以说是FutureTask中最核心的方法了,一步步来分析:
如果timed不为空,这说明指定nanos时间还未返回结果,线程就会退出。
q是一个WaitNode对象,是将当前引用线程封装在一个stack数据结构中,WaitNode对象属性如下:
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }接着判断当前线程是否中断,如果中断则抛出中断异常。
下面就进入一轮轮的if... else if...判断逻辑,我们还是采用分支的方式去分析。
分支一:if (s > COMPLETING) {
此时get()方法已经有结果了,无论是正常返回的结果,还是异常、中断、取消等,此时直接返回state状态,然后执行report()方法。
分支二:else if (s == COMPLETING)
条件成立,说明当前任务接近完成状态,这里让当前线程再释放cpu,进行下一轮抢占cpu。
分支三:else if (q == null)
第一次自旋执行,WaitNode还没有初始化,初始化q=new WaitNode();
分支四:else if (!queued){
queued代表当前线程是否入栈,如果没有入栈则进行入栈操作,顺便将全局变量waiters指向栈顶元素。
分支五/六:LockSupport.park
如果设置了超时时间,则使用parkNanos来挂起当前线程,否则使用park()
经过这么一轮自旋循环后,如果执行call()还没有返回结果,那么调用get()方法的线程都会被挂起。
被挂起的线程会等待run()返回结果后依次唤醒,具体的执行逻辑在finishCompletion()中。
最终stack结构中数据如下: