Java并发编程-扩展可回调的Future

Java并发编程-扩展可回调的Future

前提

最近在看JUC线程池java.util.concurrent.ThreadPoolExecutor的源码实现,其中了解到java.util.concurrent.Future的实现原理。从目前java.util.concurrent.Future的实现来看,虽然实现了异步提交任务,但是任务结果的获取过程需要主动调用Future#get()或者Future#get(long timeout, TimeUnit unit),而前者是阻塞的,后者在异步任务执行时间不确定的情况下有可能需要进行轮询,这两种情况和异步调用的初衷有点相违背。于是笔者想结合目前了解到的Future实现原理的前提下扩展出支持(监听)回调的Future,思路上参考了Guava增强的ListenableFuture。本文编写的时候使用的JDK是JDK11,其他版本可能不适合。

简单分析Future的实现原理 虚拟例子推演

并发大师Doug Lea在设计JUC线程池的时候,提供了一个顶层执行器接口Executor:

public interface Executor { void execute(Runnable command); }

实际上,这里定义的方法Executor#execute()是整套线程池体系最核心的接口,也就是ThreadPoolExecutor定义的核心线程、额外创建的线程(线程池最大线程容量 - 核心线程数)都是在这个接口提交任务的时候懒创建的,也就是说ExecutorService接口扩展的功能都是基于Executor#execute()的基础进行扩展。Executor#execute()方法只是单纯地把任务实例Runnable对象投放到线程池中分配合适的线程执行,但是由于方法返回值是void类型,我们是无法感知任务什么时候执行完毕。这个时候就需要对Runnable任务实例进行包装(下面是伪代码 + 伪逻辑):

// 下面这个Wrapper和Status类是笔者虚构出来 @RequiredArgsConstructor class Wrapper implements Runnable{ private final Runnable target; private Status status = Status.of("初始化"); @Override public void run(){ try{ target.run(); status = Status.of("执行成功"); }catch(Throwable t){ status = Status.of("执行异常"); } } }

我们只需要把new Wrapper(原始Runnable实例)投放到线程池执行,那么通过定义好的Status状态记录变量就能得知异步任务执行的状态,以及什么时候执行完毕(包括正常的执行完毕和异常的执行完毕)。这里仅仅解决了任务执行的状态获取,但是Executor#execute()方法法返回值是void类型的特点使得我们无法回调Runnable对象执行的结果。这个时候需要定义一个可以回调执行结果的接口,其实已经有现成的接口Callable:

@FunctionalInterface public interface Callable<V> { V call() throws Exception; }

这里遇到了一个问题:由于Executor#execute()只接收Runnable参数,我们需要把Callable接口适配到Runnable接口,这个时候,做一次简单的委托即可:

@RequiredArgsConstructor class Wrapper implements Runnable{ private final Callable callable; private Status status = Status.of("初始化"); @Getter private Object outcome; @Override public void run(){ try{ outcome = callable.call(); status = Status.of("执行成功"); }catch(Throwable t){ status = Status.of("执行异常"); outcome = t; } } }

这里把Callable实例直接委托给Wrapper,而Wrapper实现了Runnable接口,执行结果直接存放在定义好的Object类型的对象outcome中即可。当我们感知到执行状态已经结束,就可以从outcome中提取到执行结果。

Future的实现

上面一个小结仅仅对Future实现做一个相对合理的虚拟推演,实际上,RunnableFuture才是JUC中常用的复合接口,它同时实现了Runnable和Future:

public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }

上一节提到的虚构出来的Wrapper类,在JUC中类似的实现是java.util.concurrent.FutureTask,它就是Callable和Runnable的适配器,FutureTask实现了RunnableFuture接口:

public class FutureTask<V> implements RunnableFuture<V> { private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** The underlying callable; nulled out after running */ private Callable<V> callable; /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** Treiber stack of waiting threads */ private volatile WaitNode waiters; // 省略其他代码 }

注意到核心属性state表示执行状态,outcome承载执行结果。接着看提交Callable类型任务的方法ExecutorService#submit():

public interface ExecutorService extends Executor { // 省略其他接口方法 <T> Future<T> submit(Callable<T> task); }

当我们通过上述ExecutorService#submit()方法提交Callable类型任务的时候,实际上做了如下的步骤:

检查入参task的存在性,如果为null抛出NullPointerException。

把Callable类型的task包装为FutureTask实例。

把新建的FutureTask实例放到线程池中执行,也就是调用Executor#execute(FutureTask实例)。

返回FutureTask实例的接口实例RunnableFuture(实际上是返回子接口Future实例)。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wppxsx.html