Java并发编程-扩展可回调的Future (3)

最后一步就是编写线程池ListenableThreadPoolExecutor,继承自ThreadPoolExecutor并且实现ListenableExecutorService接口:

public class ListenableThreadPoolExecutor extends ThreadPoolExecutor implements ListenableExecutorService { public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } @Override public <T> ListenableFuture<T> listenableSubmit(Callable<T> callable) { if (null == callable) { throw new IllegalArgumentException("callable"); } ListenableFutureTask<T> listenableFutureTask = ListenableFutureTask.newTaskFor(callable); execute(listenableFutureTask); return listenableFutureTask; } @Override public <T> ListenableFuture<T> listenableSubmit(Callable<T> callable, List<ListenableFutureCallback<T>> callbacks, Executor executor) { if (null == callable) { throw new IllegalArgumentException("callable"); } if (null == callbacks) { throw new IllegalArgumentException("callbacks"); } ListenableFutureTask<T> listenableFutureTask = ListenableFutureTask.newTaskFor(callable); for (ListenableFutureCallback<T> callback : callbacks) { listenableFutureTask.addCallback(callback, executor); } execute(listenableFutureTask); return listenableFutureTask; } } 测试

引入junit,编写测试类如下:

public class ListenableFutureTest { private static ListenableExecutorService EXECUTOR; private static Executor E; @BeforeClass public static void before() { EXECUTOR = new ListenableThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadFactory() { private final AtomicInteger counter = new AtomicInteger(); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName(String.format("ListenableWorker-%d", counter.getAndIncrement())); return thread; } }); E = Executors.newFixedThreadPool(3); } @Test public void testListenableFuture1() throws Exception { ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> { Thread.sleep(1000); return "message"; }); future.addCallback((v, t) -> { System.out.println(String.format("Value = %s,Throwable = %s", v, t)); }, null); Thread.sleep(2000); } @Test public void testListenableFuture2() throws Exception { ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> { Thread.sleep(1000); throw new RuntimeException("exception"); }); future.addCallback((v, t) -> { System.out.println(String.format("Value = %s,Throwable = %s", v, t)); }, null); Thread.sleep(2000); } @Test public void testListenableFuture3() throws Exception { ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> { Thread.sleep(1000); return "message"; }); future.addCallback((v, t) -> { System.out.println(String.format("Value = %s,Throwable = %s", v, t)); }, E); System.out.println("testListenableFuture3 end..."); Thread.sleep(2000); } @Test public void testListenableFuture4() throws Exception { ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> { Thread.sleep(1000); throw new RuntimeException("exception"); }); future.addCallback((v, t) -> { System.out.println(String.format("Value = %s,Throwable = %s", v, t)); }, E); System.out.println("testListenableFuture4 end..."); Thread.sleep(2000); } }

执行结果:

// testListenableFuture1 Value = message,Throwable = null // testListenableFuture2 Value = null,Throwable = java.util.concurrent.ExecutionException: java.lang.RuntimeException: exception // testListenableFuture3 testListenableFuture3 end... Value = message,Throwable = null // testListenableFuture4 testListenableFuture4 end... Value = null,Throwable = java.util.concurrent.ExecutionException: java.lang.RuntimeException: exception

和预期的结果一致,注意一下如果Callable执行抛出异常,异常被包装为ExecutionException,要调用Throwable#getCause()才能得到原始的异常实例。

小结

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wppxsx.html