含源码解析,深入Java 线程池原理 (3)

第二步:如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;

第三步:如果线程池中的线程数量大于等于corePoolSize,且队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务

第四步:如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;

流程图如下:

含源码解析,深入Java 线程池原理

4、ThreadPoolExecutor解析

ThreadPoolExecutor继承自AbstractExecutorService,同时实现了ExecutorService接口,也是Executor框架默认的线程池实现类,一般我们使用线程池,如没有特殊要求,直接创建ThreadPoolExecutor,初始化一个线程池,如果需要特殊的线程池,则直接继承ThreadPoolExecutor,并实现特定的功能,如ScheduledThreadPoolExecutor,它是一个具有定时执行任务的线程池。

(1)Executor框架

在深入源码之前先来看看J.U.C包中的线程池类图:

含源码解析,深入Java 线程池原理

它们的最顶层是一个Executor接口,它只有一个方法:

public interface Executor { void execute(Runnable command); }

它提供了一个运行新任务的简单方法,Java线程池也称之为Executor框架。

ExecutorService扩展了Executor,添加了操控线程池生命周期的方法,如shutDown(),shutDownNow()等,以及扩展了可异步跟踪执行任务生成返回值Future的方法,如submit()等方法。

(2)Worker解析

Worker类继承了AQS,并实现了Runnable接口,它有两个重要的成员变量:firstTask和thread。firstTask用于保存第一次新建的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }

需要注意workers的数据结构为HashSet,非线程安全,所以操作workers需要加同步锁。添加步骤做完后就启动线程来执行任务了。

/** * Set containing all worker threads in pool. Accessed only when * holding mainLock. */ private final HashSet<Worker> workers = new HashSet<Worker>(); (3)如何在线程池中添加任务

线程池要执行任务,那么必须先添加任务,execute()虽说是执行任务的意思,但里面也包含了添加任务的步骤在里面,下面源码:

public void execute(Runnable command) { // 如果添加订单任务为空,则空指针异常 if (command == null) throw new NullPointerException(); // 获取ctl值 int c = ctl.get(); // 1.如果当前有效线程数小于核心线程数,调用addWorker执行任务(即创建一条线程执行该任务) if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 2.如果当前有效线程大于等于核心线程数,并且当前线程池状态为运行状态,则将任务添加到阻塞队列中,等待空闲线程取出队列执行 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 3.如果阻塞队列已满,则调用addWorker执行任务(即创建一条线程执行该任务) else if (!addWorker(command, false)) // 如果创建线程失败,则调用线程拒绝策略 reject(command); }

addWorker添加任务,方法源码有点长,按照逻辑拆分成两部分讲解:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zgfyxx.html