FutureTask相关

上周因为项目中的线程池参数设置的不合理,引发了一些问题,看了下代码,发现对JUC中的一些概念需要再清晰些。

Runnable @FunctionalInterface public interface Runnable { /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see java.lang.Thread#run() */ public abstract void run(); }

Runable是一个interface,定义了run()方法,The Runnable interface should be implemented by any class whose instances are intended to be executed by a thread。如果想在其他线程中执行你的task,需要实现这个接口。

Callable @FunctionalInterface public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }

有了Runnable,为啥还需要Callable呢,可以看到Runnable和Callable的两个不同,第一,Runnable是没有返回值的,第二,Runnable是不会抛出checked exception的,而有时候我们需要知道任务执行之后的返回,同时也希望利用异常机制完成一些逻辑。所以有了Callable。

JUC中的Executors这个Factory类,提供了Runnable转Callable的方法。

Future

future 是一个inteface,提供了一系列方法,来帮助我们获取异步执行的task的执行状况和执行结果。

FutureTask

FutureTask实现了RunnableFuture接口,即既实现了Runnable接口,又实现了Future接口。所以他有两个功能,第一,作为一个task,提交到别的线程中异步执行,第二,通过future提供的一些接口,获取task的异步执行状态。

/** * The run state of this task, initially NEW. The run state * transitions to a terminal state only in methods set, * setException, and cancel. During completion, state may take on * transient values of COMPLETING (while outcome is being set) or * INTERRUPTING (only while interrupting the runner to satisfy a * cancel(true)). Transitions from these intermediate to final * states use cheaper ordered/lazy writes because values are unique * and cannot be further modified. * * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** The underlying callable; nulled out after running */ private Callable<V> callable; /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** Treiber stack of waiting threads */ private volatile WaitNode waiters;

看下FutureTask的几个属性,首先state表示当前task的执行状态,其中,开始状态位NEW表示task还没开始执行。NORMAL,CANCELLED,INTERRUPTED为终态,COMPLETING和INTERRUPTING为临时状态,最终会通过上面的几个状态转移路径,转移到终态。

callable,表示具体执行的任务。

outcome, task 执行的返回结果

runner,执行这个task的线程

waiters,通过get方法获取此task执行结果被阻塞的线程。

看下几个核心的方法,我们知道,futuretask提交到别的线程里后,最终会调用task的run方法执行具体逻辑。

public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }

run方法执行时,首先检查当前的状态是否是NEW,如果不是NEW说明已经被执行过了。开始执行之前,标记执行当前task的线程到runner。

调用callable的run方法,执行。抛异常时,设置setException。正常结束时,set结果。看下这两步里都会调到的finishCompletion方法。

/** * Removes and signals all waiting threads, invokes done(), and * nulls out callable. */ private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }

这里主要是在通知所有阻塞在watch这个task结果的线程,通知他们当前task已经执行结束了。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zysfxg.html