很神奇的一个接口,FutureTask 实现了 RunnableFuture 接口,而 RunnableFuture 接口又分别实现了 Runnable 和 Future 接口,所以可以推断出 FutureTask 具有这两种接口的特性:
有 Runnable 特性,所以可以用在 ExecutorService 中配合线程池使用
有 Future 特性,所以可以从中获取到执行结果
FutureTask源码分析如果你完整的看过 AQS 相关分析的文章,你也许会发现,阅读 Java 并发工具类源码,我们无非就是要关注以下这三点:
- 状态 (代码逻辑的主要控制) - 队列 (等待排队队列) - CAS (安全的set 值)脑海中牢记这三点,咱们开始看 FutureTask 源码,看一下它是如何围绕这三点实现相应的逻辑的
文章开头已经提到,实现 Runnable 接口形式创建的线程并不能获取到返回值,而实现 Callable 的才可以,所以 FutureTask 想要获取返回值,必定是和 Callable 有联系的,这个推断一点都没错,从构造方法中就可以看出来:
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }即便在 FutureTask 构造方法中传入的是 Runnable 形式的线程,该构造方法也会通过 Executors.callable 工厂方法将其转换为 Callable 类型:
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }但是 FutureTask 实现的是 Runnable 接口,也就是只能重写 run() 方法,run() 方法又没有返回值,那问题来了:
FutureTask 是怎样在 run() 方法中获取返回值的?
它将返回值放到哪里了?
get() 方法又是怎样拿到这个返回值的呢?
我们来看一下 run() 方法(关键代码都已标记注释)
public void run() { // 如果状态不是 NEW,说明任务已经执行过或者已经被取消,直接返回 // 如果状态是 NEW,则尝试把执行线程保存在 runnerOffset(runner字段),如果赋值失败,则直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { // 获取构造函数传入的 Callable 值 Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 正常调用 Callable 的 call 方法就可以获取到返回值 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 保存 call 方法抛出的异常 setException(ex); } if (ran) // 保存 call 方法的执行结果 set(result); } } finally { runner = null; int s = state; // 如果任务被中断,则执行中断处理 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }run() 方法没有返回值,至于 run() 方法是如何将 call() 方法的返回结果和异常都保存起来的呢?其实非常简单, 就是通过 set(result) 保存正常程序运行结果,或通过 setException(ex) 保存程序异常信息
/** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes // 保存异常结果 protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } // 保存正常结果 protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }setException 和 set 方法非常相似,都是将异常或者结果保存在 Object 类型的 outcome 变量中,outcome 是成员变量,就要考虑线程安全,所以他们要通过 CAS方式设置 outcome 变量的值,既然是在 CAS 成功后 更改 outcome 的值,这也就是 outcome 没有被 volatile 修饰的原因所在。
保存正常结果值(set方法)与保存异常结果值(setException方法)两个方法代码逻辑,唯一的不同就是 CAS 传入的 state 不同。我们上面提到,state 多数用于控制代码逻辑,FutureTask 也是这样,所以要搞清代码逻辑,我们需要先对 state 的状态变化有所了解
/* * * Possible state transitions: * NEW -> COMPLETING -> NORMAL //执行过程顺利完成 * NEW -> COMPLETING -> EXCEPTIONAL //执行过程出现异常 * NEW -> CANCELLED // 执行过程中被取消 * NEW -> INTERRUPTING -> INTERRUPTED //执行过程中,线程被中断 */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;7种状态,千万别慌,整个状态流转其实只有四种线路