多线程使用注意 (2)

而这个默认的静态全局的异常捕获方法是直接输出异常堆栈。
当然,我们可以覆盖此默认实现,只需要实现java.lang.Thread.UncaughtExceptionHandler接口即可

public interface UncaughtExceptionHandler { void uncaughtException(Thread t, Throwable e); } submit异常吞并

我们平时都是通过submit来提交一个Callable,那如果提交的是Runnable呢,为方便起见我们核心的代码都放在一起了

public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } //最终FutureTask中的callable指向的是一个RunnableAdapter,而RunnableAdapter的call方法也是调用了我们传进来的task的run方法,返回的是null static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }

那从这里我们就知道,我们通过submit传递进去的Runnale,最后在FutureTask的run方法里面调用的callable.call()实质上还是我们传递进去的runnable的run方法,在源码FutureTask的run方法的时候发现,FutureTask中执行任务如果出现异常,是不会抛出来的,必须通过get方法才可以获取到,当然也可以重写afterExecute()这个回调方法,在这个里面来调用get获取异常信息,
还是要重点强调下,我们在通过submit执行任务的时候,一定要调用get()方法

这里我们重写afterExecute()方法,来获取submit(Runnable task)的执行异常:

protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); //执行的Callable,对应的t一定是Null if (t == null && r instanceof Future) { try { Future future = (Future) r; if (future.isDone()){ // 判断任务是否执行完成 future.get(); } } catch (CancellationException ce) { t = ce; } catch (ExecutionException ee) { t = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } } CountDownLatch 丢失事件

我们在处理一批任务的时候,往往会把任务进行partition,然后再交给每个线程去处理,那主线程需要等待所有的线程处理完,来统计本次处理的时间,以及其他统计的数据,差不多和下面这段代码类似:

public void execute3(){ List<Integer> data = new ArrayList<Integer>(100); for (int i = 0; i < 100; i++) { data.add(i + 10); } List<List<Integer>> partition = Lists.partition(data, 20); final CountDownLatch countDownLatch = new CountDownLatch(partition.size()); for (final List<Integer> dataToHandle : partition) { threadPoolExecutor.execute(new Runnable() { @Override public void run() { try{ for (Integer i : dataToHandle) { doSomeThing(i); } }catch (Exception e){ logger.error(e.getMessage(), e); }finally { countDownLatch.countDown(); } } }); } try { countDownLatch.await(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } logger.info("任务执行结束..."); }

之前这么写代码没有出现过问题,直到最近出现问题才发现这么写会导致主线程无法结束的问题。我们看下,虽然在每个任务的finally中进行处理

countDownLatch.countDown();但是有一点忽视了,我们在异常那块其实有提到过,如果线程池满了,抛出RejectExecuteException的话,那这次任务的countDownLatch就会被忽视,当然我们这是在主线程里执行,直接会抛出异常导致主线程结束,但是如果和上面提到的在单独的子线程里面去执行这个线程池,那这样的话由于主线程无法捕获到子线程的异常,就会出现主线程无法结束的问题,所以我们在子线程中执行线程池一定要避免这点 即如果在子线程中执行,需要改为下面这样:

public void execute3(){ List<Integer> data = new ArrayList<Integer>(100); for (int i = 0; i < 100; i++) { data.add(i + 10); } final List<List<Integer>> partition = Lists.partition(data, 20); final CountDownLatch countDownLatch = new CountDownLatch(partition.size()); new Thread(new Runnable() { @Override public void run() { for (final List<Integer> dataToHandle : partition) { try { threadPoolExecutor.execute(new Runnable() { @Override public void run() { try{ for (Integer i : dataToHandle) { doSomeThing(i); } }catch (Exception e){ logger.error(e.getMessage(), e); }finally { countDownLatch.countDown(); } } }); } catch (RejectedExecutionException e) { logger.error(e.getMessage(), e); //处理完异常之后需要补充countDownLatch事件 countDownLatch.countDown(); } } } }).start(); try { countDownLatch.await(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } logger.info("任务执行结束..."); }

来源:https://www.cnblogs.com/guozp/p/10344446.html

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpxwjx.html