1、任务执行状态不是NEW,直接返回;将runner属性从null->当前线程不成功,直接返回
2、调用call()方法,调用成功,使用set()设置返回值
3、调用过程发生异常,使用setException()保存异常
set() 和 setException()
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }set()和setException()的实现基本一样,都是先将任务运行状态从NEW->COMPLETING,分别设置返回值或异常给outcome,再将状态分别置为NORMAL和EXCEPTIONAL,最后调用finishCompletion()依次唤醒等待获取结果的阻塞线程
finishCompletion()实现
/** * Removes and signals all waiting threads, invokes done(), and nulls out callable. */ private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { //将成员变量waiters置为null if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { //循环唤醒WaitNode中的等待线程 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } //由子类实现的方法 done(); callable = null; // to reduce footprint }1、执行FutureTask类的get方法时,会把主线程封装成WaitNode节点并保存在waiters链表中
2、FutureTask任务执行完成后,通过UNSAFE设置waiters的值为null,并通过LockSupport.unpark方法依次唤醒等待获取结果的线程
get()方法有两个实现,一个是一直等待获取结果,直到任务执行完;一个是等待指定时间,超时后任务还未完成会上抛TimeoutException
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }内部通过awaitDone()对主线程进行阻塞,具体实现如下:
/** * Awaits completion or aborts on interrupt or timeout. * * @param timed true if use timed waits * @param nanos time to wait, if timed * @return state upon completion */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; //截止时间 WaitNode q = null; boolean queued = false; for (;;) { //如果主线程已经被中断,removeWaiter(),并上抛InterruptedException //注意:Thread.interrupted()后会导致线程的中断状态为false if (Thread.interrupted()) { removeWaiter(q); //线程被中断的情况下,从waiters链表中删除q throw new InterruptedException(); } int s = state; //如果任务已经完成(可能是正常完成、异常、中断),直接返回,即还没有开始等待,任务已经完成了 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } //如果任务正在完成,让出CPU资源,等待state变成NORMAL或EXCEPTIONAL else if (s == COMPLETING) // cannot time out yet Thread.yield(); //s<COMPLETING 且 还没有创建WaitNode else if (q == null) q = new WaitNode(); //s<COMPLETING 且 已经创建WaitNode,但还没有入队 else if (!queued) /** * 1、将当前waiters赋值给q.next,即“q-->当前waiters” * 2、CAS,将waiters属性,从“当前waiters-->q” * 所以后等待的会排在链表的前面,而任务完成时会从链表前面开始依次唤醒等待线程 */ queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); //所有准备工作完成,判断等待是否需要计时 else if (timed) { nanos = deadline - System.nanoTime(); //如果已经等待超时,remove当前WaiterNode if (nanos <= 0L) { removeWaiter(q); //等待超时的情况下,从waiters链表中删除q return state; } LockSupport.parkNanos(this, nanos); //挂起一段时间 } else LockSupport.park(this); //一直挂起,等待唤醒 } }