Java并发编程-扩展可回调的Future (2)

如果我们需要获取结果,可以Future#get()或者Future#get(long timeout, TimeUnit unit)获取,调用这两个方法的时候参看FutureTask里面的方法实现,得知步骤如下:

如果状态state小于等于COMPLETING(1),说明任务还在执行中,获取结果的请求线程会放入WaitNode类型的队列中进行阻塞。

如果任务执行完毕,不管异常完毕还是正常完毕,除了会更新状态state和把结果赋值到outcome之外,还会唤醒所有阻塞获取结果的线程,然后调用钩子方法FutureTask#done()(具体见源码FutureTask#finishCompletion())。

其实分析了这么多,笔者想指出的结论就是:Callable类型任务提交到线程池中执行完毕(包括正常执行完毕和异常执行完毕)之后,都会回调钩子方法FutureTask#done()。这个就是我们扩展可监听Future的理论依据。

扩展可回调的Future

先做一次编码实现,再简单测试其功能。

编码实现

先定义一个Future接口的子接口ListenableFuture,用于添加可监听的回调:

public interface ListenableFuture<V> extends Future<V> { void addCallback(ListenableFutureCallback<V> callback, Executor executor); }

ListenableFutureCallback是一个函数式回调接口:

@FunctionalInterface public interface ListenableFutureCallback<V> { void callback(V value, Throwable throwable); }

对于ListenableFutureCallback而言,回调的结果value和throwable是互斥的。正常执行完毕的情况下value将会是执行结果值,throwable为null;异常执行完毕的情况下,value将会是null,throwable将会是抛出的异常实例。如果更习惯于分开处理正常执行完毕的结果和异常执行完毕的结果,ListenableFutureCallback可以这样定义:

public interface ListenableFutureCallback<V> { void onSuccess(V value); void onError(Throwable throwable); }

接着定义ListenableExecutorService接口继承ExecutorService接口:

public interface ListenableExecutorService extends ExecutorService { <T> ListenableFuture<T> listenableSubmit(Callable<T> callable); /** * 定义这个方法是因为有些时候由于任务执行时间非常短,有可能通过返回的ListenableFuture实例添加回调之前已经执行完毕,因此可以支持显式传入回调 * * @param callable callable * @param callbacks callbacks * @param executor executor * @return ListenableFuture */ <T> ListenableFuture<T> listenableSubmit(Callable<T> callable, List<ListenableFutureCallback<T>> callbacks, Executor executor); }

然后添加一个执行单元适配器ListenableFutureCallbackRunnable,承载每次回调触发的调用(实现Runnable接口,从而支持异步执行):

@RequiredArgsConstructor public class ListenableFutureCallbackRunnable<V> implements Runnable { private final ListenableFutureCallback<V> callback; private final V value; private final Throwable throwable; @Override public void run() { callback.callback(value, throwable); } }

接着需要定义一个FutureTask的子类ListenableFutureTask,核心逻辑是覆盖FutureTask#done()方法触发回调:

// ListenableFutureTask public class ListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> { private final List<Execution<V>> executions = new ArrayList<>(); public ListenableFutureTask(Callable<V> callable) { super(callable); } public ListenableFutureTask(Runnable runnable, V result) { super(runnable, result); } public static <V> ListenableFutureTask<V> newTaskFor(Callable<V> callable) { return new ListenableFutureTask<>(callable); } @Override protected void done() { Iterator<Execution<V>> iterator = executions.iterator(); Throwable throwable = null; V value = null; try { value = get(); } catch (Throwable t) { throwable = t; } while (iterator.hasNext()) { Execution<V> execution = iterator.next(); ListenableFutureCallbackRunnable<V> callbackRunnable = new ListenableFutureCallbackRunnable<>(execution.getCallback(), value, throwable); // 异步回调 if (null != execution.getExecutor()) { execution.getExecutor().execute(callbackRunnable); } else { // 同步回调 callbackRunnable.run(); } } } @Override public void addCallback(ListenableFutureCallback<V> callback, Executor executor) { Execution<V> execution = new Execution<>(); execution.setCallback(callback); execution.setExecutor(executor); executions.add(execution); } } // Execution - 承载每个回调实例和对应的Executor,Executor实例为null则进行同步回调 @Data public class Execution <V>{ private Executor executor; private ListenableFutureCallback<V> callback; }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wppxsx.html