Callable、Future和FutureTask的实现(2)

RunnableFuture接口继承了Runnable和Future,所以它既是一个可以让线程执行的Runnable任务,又是一个可以获取Callable返回值的Future

FutureTask的属性 /** The run state of this task */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** The underlying callable; nulled out after running */ private Callable<V> callable; /** The result to return or exception to throw from get() */ private Object outcome; /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** Treiber stack of waiting threads */ private volatile WaitNode waiters;

state 是任务的运行状态

初始化时是NEW

任务终止的状态有NORMAL(正常结束)、EXCEPTIONAL(异常结束)、CANCELLED(被取消)、INTERRUPTED(执行中被中断),这些状态是通过set()setExceptioncancel()方法触发的

COMPLETING 和 INTERRUPTING是两个中间状态,当正常结束设置outcome属性前是COMPLETING,设置后变成NORMAL;当中断运行中线程前是INTERRUPTING,调用thread.interrupt()后是INTERRUPTED

可能的状态转换:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED

callable 是线程执行的有返回值的任务
outcome 是任务执行后的结果或异常
waiters 表示等待获取结果的阻塞线程,链表结构,后等待线程的会排在链表前面

FutureTask的构造方法

FutureTask有两个构造方法:
FutureTask(Callable callable)

public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }

构造方法参数是Callable定义的任务,并将state置为NEW,只有当state为NEW时,callable才能被执行

FutureTask(Runnable runnable, V result)

public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }

参数为Runnable和带泛型的result对象,由于Runnable本身是没有返回值的,故线程的执行结果通过result返回
可以看到通过runnable和result封装了个Callable,实际上是new RunnableAdapter<T>(task, result),这个Adapter适配器将Runnable和result转换成Callable,并返回result

FutureTask.run()的实现

线程运行时真正执行的方法,Callable.call()会在其中执行,并包含设置返回值或异常的逻辑

public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/874be636a1c974ddd6f7449a3e17fe1e.html