worker执行方法:
//Woker类实现了Runnable接口 public void run() { runWorker(this); } //最终woker执行逻辑走到了这里 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); //task就是Woker构造函数入参指定的任务,即用户提交的任务 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { //一般情况下,task都不会为空(特殊情况上面注释中也说明了),因此会直接进入循环体中 //这里getTask方法是要重点说明的,它的实现跟我们构造参数设置存活时间有关 //我们都知道构造参数设置的时间代表了线程池中的线程,即woker线程的存活时间,如果到期则回收woker线程,这个逻辑的实现就在getTask中。 //来不及执行的任务,线程池会放入一个阻塞队列,getTask方法就是去阻塞队列中取任务,用户设置的存活时间,就是 //从这个阻塞队列中取任务等待的最大时间,如果getTask返回null,意思就是woker等待了指定时间仍然没有 //取到任务,此时就会跳过循环体,进入woker线程的销毁逻辑。 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //该方法是个空的实现,如果有需要用户可以自己继承该类进行实现 beforeExecute(wt, task); Throwable thrown = null; try { //真正的任务执行逻辑 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //该方法是个空的实现,如果有需要用户可以自己继承该类进行实现 afterExecute(task, thrown); } } finally { //这里设为null,也就是循环体再执行的时候会调用getTask方法 task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //当指定任务执行完成,阻塞队列中也取不到可执行任务时,会进入这里,做一些善后工作,比如在corePoolSize跟maximumPoolSize之间的woker会进行回收 processWorkerExit(w, completedAbruptly); } }woker线程的执行流程就是首先执行初始化时分配给的任务,执行完成以后会尝试从阻塞队列中获取可执行的任务,如果指定时间内仍然没有任务可以执行,则进入销毁逻辑。
注:这里只会回收corePoolSize与maximumPoolSize直接的那部分woker
理解了整个线程池的运行原理以后,再来看下JDK默认提供的线程池类型就会一目了然了:
public static ExecutorService newFixedThreadPool(int nThreads) { //corePoolSize跟maximumPoolSize值一样,同时传入一个无界阻塞队列 //根据上面分析的woker回收逻��,该线程池的线程会维持在指定线程数,不会进行回收 return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { //线程池中只有一个线程进行任务执行,其他的都放入阻塞队列 //外面包装的FinalizableDelegatedExecutorService类实现了finalize方法,在JVM垃圾回收的时候会关闭线程池 return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() { //这个线程池corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE,意思也就是说来一个任务就创建一个woker,回收时间是60s return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }最后再说说初始化线程池时线程数的选择:
如果任务是IO密集型,一般线程数需要设置2倍CPU数以上,以此来尽量利用CPU资源。
如果任务是CPU密集型,一般线程数量只需要设置CPU数加1即可,更多的线程数也只能增加上下文切换,不能增加CPU利用率。
上述只是一个基本思想,如果真的需要精确的控制,还是需要上线以后观察线程池中线程数量跟队列的情况来定。
线程池的分析就到这里,如果有什么意见或建议,欢迎给我留言。