首先在FutureTask中有如下这些信息。
public class FutureTask<V> implements RunnableFuture<V> { // 任务的状态 private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; // 具体任务对象 private Callable<V> callable; // 任务返回结果或者异常时返回的异常对象 private Object outcome; // 当前正在运行的线程 private volatile Thread runner; // private volatile WaitNode waiters; private static final sun.misc.Unsafe UNSAFE; private static final long stateOffset; private static final long runnerOffset; private static final long waitersOffset; } public void run() { // 任务状态的校验 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 执行callable的call方法获取结果 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 有异常则设置返回值为ex setException(ex); } // 执行过程没有异常则将结果set if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }在这个方法中的核心逻辑就是执行callable的call()方法,将结果赋值,如果有异常则封装异常。
然后我们看一下get方法如何获取结果的。
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) // 这里会阻塞等待 s = awaitDone(false, 0L); // 返回结果 return report(s); } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) // 状态异常情况会抛出异常 throw new CancellationException(); throw new ExecutionException((Throwable)x); }在FutureTask中除了get()方法还提供有一些其他方法。
get(timeout,unit):获取结果,但只等待指定的时间;
cancel(boolean mayInterruptIfRunning):取消当前任务;
isDone():判断任务是否已完成。
CompletableFuture在使用FutureTask来完成异步任务,通过get()方法获取结果时,会让获取结果的线程进入阻塞等待,这种方式并不是最理想的状态。
在JDK8中引入了CompletableFuture,对Future进行了改进,可以在定义CompletableFuture传入回调对象,任务在完成或者异常时,自动回调。
public class CompletableFutureDemo { public static void main(String[] args) throws InterruptedException { // 创建CompletableFuture时传入Supplier对象 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new MySupplier()); //执行成功时 future.thenAccept(new MyConsumer()); // 执行异常时 future.exceptionally(new MyFunction()); // 主任务可以继续处理,不用等任务执行完毕 System.out.println("主线程继续执行"); Thread.sleep(5000); System.out.println("主线程执行结束"); } } class MySupplier implements Supplier<Integer> { @Override public Integer get() { try { // 任务睡眠3s TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } return 3 + 2; } } // 任务执行完成时回调Consumer对象 class MyConsumer implements Consumer<Integer> { @Override public void accept(Integer integer) { System.out.println("执行结果" + integer); } } // 任务执行异常时回调Function对象 class MyFunction implements Function<Throwable, Integer> { @Override public Integer apply(Throwable type) { System.out.println("执行异常" + type); return 0; } }以上代码可以通过lambda表达式进行简化。
public class CompletableFutureDemo { public static void main(String[] args) throws InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { try { // 任务睡眠3s TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } return 3 + 2; }); //执行成功时 future.thenAccept((x) -> { System.out.println("执行结果" + x); }); future.exceptionally((type) -> { System.out.println("执行异常" + type); return 0; }); System.out.println("主线程继续执行"); Thread.sleep(5000); System.out.println("主线程执行结束"); } }通过示例我们发现CompletableFuture的优点:
异步任务结束时,会自动回调某个对象的方法;
异步任务出错时,会自动回调某个对象的方法;
主线程设置好回调后,不再关心异步任务的执行。
当然这些优点还不足以体现CompletableFuture的强大,还有更厉害的功能。
串行执行