Java并发编程:Java线程池核心ThreadPoolExecutor的使用和原理分析 (4)

从Worker类的构造函数可以看出,当实例化一个Worker对象时,Worker对象会把传进来的Runnable参数firstTask赋值给自己的同名属性,并且用线程工厂也就是当前的ThreadFactory来新建一个线程。

同时,因为Worker实现了Runnable接口,所以当Worker类中的线程启动时,调用的其实是run()方法。run方法中调用的是runWorker方法,我们来看下它的具体实现:

final void runWorker(Worker w) { Thread wt = Thread.currentThread(); //获取第一个任务 Runnable task = w.firstTask; w.firstTask = null; //允许中断 w.unlock(); // allow interrupts //是否因为异常退出循环的标志,processWorkerExit方法会对该参数做判断 boolean completedAbruptly = true; try { //判断task是否为null,是的话通过getTask()从队列中获取任务 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt /* 这里的判断主要逻辑是这样: * 如果线程池正在停止,那么就确保当前线程是中断状态; * 如果不是的话,就要保证不是中断状态 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //用于记录任务执行前需要做哪些事,属于ThreadPoolExecutor类中的方法, //是空的,需要子类具体实现 beforeExecute(wt, task); Throwable thrown = null; try { //执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

总结一下runWorker方法的运行逻辑:

1、通过while循环不断地通过getTask()方法从队列中获取任务;

2、如果线程池正在停止状态,确保当前的线程是中断状态,否则确保当前线程不中断;

3、调用task的run()方法执行任务,执行完毕后需要置为null;

4、循环调用getTask()取不到任务了,跳出循环,执行processWorkerExit()方法。

过完runWorker()的运行流程,我们来看下getTask()是怎么实现的。

getTask方法

getTask()方法的作用是从队列中获取任务,下面是该方法的源码:

private Runnable getTask() { //记录上次从队列获取任务是否超时 boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //将workerCount减1 decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? /* timed变量用于判断线程的操作是否需要进行超时判断 * allowCoreThreadTimeOut不管它,默认是false * wc > corePoolSize,当前线程是如果大于核心线程数corePoolSize */ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { /* 根据timed变量判断,如果为true,调用workQueue的poll方法获取任务, * 如果在keepAliveTime时间内没有获取到任务,则返回null; * timed为false的话,就调用workQueue的take方法阻塞队列, * 直到队列中有任务可取。 */ Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //r为null,说明time为true,超时了,把timedOut也设置为true timedOut = true; } catch (InterruptedException retry) { //发生异常,把timedOut也设置为false,重新跑循环 timedOut = false; } } }

getTask的代码看上去比较简单,但其实内有乾坤,我们来重点分析一下两个if判断的逻辑:

1、当进入getTask方法后,先判断当前线程池状态,如果线程池状态rs >= SHUTDOWN,再进行以下判断:

1)rs 的状态是否大于STOP;2)队列是否为空;

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpxdgz.html