线程池代码如下:
public class PausableThreadPoolExecutor extends ThreadPoolExecutor { private boolean isPaused; private ReentrantLock pauseLock = new ReentrantLock(); private Condition unpaused = pauseLock.newCondition(); public PausableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public PausableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } public PausableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public PausableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); pauseLock.lock(); try { while (isPaused) unpaused.await(); } catch (InterruptedException ie) { t.interrupt(); } finally { pauseLock.unlock(); } } public void pause() { pauseLock.lock(); try { isPaused = true; } finally { pauseLock.unlock(); } } public void resume() { pauseLock.lock(); try { isPaused = false; unpaused.signalAll(); } finally { pauseLock.unlock(); } } }写一段测试代码试试:
@Test public void test() throws InterruptedException { PausableThreadPoolExecutor threadPool = new PausableThreadPoolExecutor(1,1,0,TimeUnit.HOURS,new LinkedBlockingQueue<>()); IntStream.rangeClosed(1, 5).forEach(i->threadPool.submit(()->{ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } log.info("I'm done : {}", i); })); ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); scheduler.schedule(threadPool::pause, 2, TimeUnit.SECONDS); scheduler.schedule(threadPool::resume, 4, TimeUnit.SECONDS); threadPool.shutdown(); threadPool.awaitTermination(1, TimeUnit.HOURS); }在测试代码中,我们的线程池只有一个线程,我们提交五个任务进去,每一个任务需要执行1秒,我们使用另一个定时任务线程池来定时开关这个可暂停的线程池。运行结果如下:
可以看到,这个线程池在2秒后暂停了4秒后恢复了。 定时任务线程池