线程池续:你必须要知道的线程池submit()实现原理之FutureTask!

FutureTask思维导图.png

前言

上一篇内容写了Java中线程池的实现原理及源码分析,说好的是实实在在的大满足,想通过一篇文章让大家对线程池有个透彻的了解,但是文章写完总觉得还缺点什么?

上篇文章只提到线程提交的execute()方法,并没有讲解线程提交的submit()方法,submit()有一个返回值,可以获取线程执行的结果Future<T>,这一讲就那深入学习下submit()和FutureTask实现原理

使用场景&示例 使用场景

我能想到的使用场景就是在并行计算的时候,例如一个方法中调用methodA()、methodB(),我们可以通过线程池异步去提交方法A、B,然后在主线程中获取组装方法A、B计算后的结果,能够大大提升方法的吞吐量。

FutureTask使用场景.png

使用示例 /** * @author wangmeng * @date 2020/5/28 15:30 */ public class FutureTaskTest { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService threadPool = Executors.newCachedThreadPool(); System.out.println("====执行FutureTask线程任务===="); Future<String> futureTask = threadPool.submit(new Callable<String>() { @Override public String call() throws Exception { System.out.println("FutureTask执行业务逻辑"); Thread.sleep(2000); System.out.println("FutureTask业务逻辑执行完毕!"); return "欢迎关注: 一枝花算不算浪漫!"; } }); System.out.println("====执行主线程任务===="); Thread.sleep(1000); boolean flag = true; while(flag){ if(futureTask.isDone() && !futureTask.isCancelled()){ System.out.println("FutureTask异步任务执行结果:" + futureTask.get()); flag = false; } } threadPool.shutdown(); } }

上面的使用很简单,submit()内部传递的实际上是个Callable接口,我们自己实现其中的call()方法,我们通过futureTask既可以获取到具体的返回值。

submit()实现原理

submit() 是也是提交任务到线程池,只是它可以获取任务返回结果,返回结果是通过FutureTask来实现的,先看下ThreadPoolExecutor中代码实现:

public class ThreadPoolExecutor extends AbstractExecutorService { public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } } public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } }

提交任务还是执行execute()方法,只是task被包装成了FutureTask ,也就是在excute()中启动线程后会执行FutureTask.run()方法。

再来具体看下它执行的完整链路图:

submit()全链路图.png

上图可以看到,执行任务并返回执行结果的核心逻辑实在FutureTask中,我们以FutureTask.run/get 两个方法为突破口,一点点剖析FutureTask的实现原理。

FutureTask源码初探

先看下FutureTask中部分属性:

FutureTask属性.png

public class FutureTask<V> implements RunnableFuture<V> { private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; private Callable<V> callable; private Object outcome; private volatile Thread runner; private volatile WaitNode waiters; }

state

当前task状态,共有7中类型。
NEW: 当前任务尚未执行
COMPLETING: 当前任务正在结束,尚未完全结束,一种临界状态
NORMAL:当前任务正常结束
EXCEPTIONAL: 当前任务执行过程中发生了异常。
CANCELLED: 当前任务被取消
INTERRUPTING: 当前任务中断中..
INTERRUPTED: 当前任务已中断

callble

用户提交任务传递的Callable,自定义call方法,实现业务逻辑

outcome

任务结束时,outcome保存执行结果或者异常信息。

runner

当前任务被线程执行期间,保存当前任务的线程对象引用

waiters

因为会有很多线程去get当前任务的结果,所以这里使用了一种stack数据结构来保存

FutureTask.run()实现原理

我们已经知道在线程池runWorker()中最终会调用到FutureTask.run()方法中,我们就来看下它的执行原理吧:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zywdpf.html