/**
 * A {@link ThreadPoolExecutor} that also records which tasks are currently executing.
 *
 * <p>A task is added to the tracking queue in {@link #beforeExecute(Thread, Runnable)} and
 * removed again in {@link #afterExecute(Runnable, Throwable)}. Together with
 * {@link #getQueue()} (tasks still waiting) this lets callers ask whether a task is either
 * queued or running.
 */
public class MyExtendThreadPoolExecutor extends ThreadPoolExecutor {
    public static final Logger logger = LoggerFactory.getLogger(MyExtendThreadPoolExecutor.class);

    /**
     * Tasks that have passed {@code beforeExecute} but not yet {@code afterExecute},
     * i.e. the tasks that are currently running.
     */
    private final LinkedBlockingQueue<Runnable> workBlockingQueue = new LinkedBlockingQueue<Runnable>();

    /**
     * Creates the pool; all arguments are forwarded unchanged to
     * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue)}.
     *
     * @param corePoolSize    number of threads to keep in the pool
     * @param maximumPoolSize maximum number of threads allowed in the pool
     * @param keepAliveTime   idle time before excess threads are terminated
     * @param unit            time unit of {@code keepAliveTime}
     * @param workQueue       queue holding tasks before they are executed
     */
    public MyExtendThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Records {@code r} as running before it is executed.
     *
     * @param t the thread that will run the task
     * @param r the task about to be executed
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        // No downcast: the queue holds Runnable, and casting to a concrete task type would
        // throw ClassCastException for any other Runnable (e.g. the FutureTask created by
        // submit()), which would abort the task and kill the worker thread.
        workBlockingQueue.add(r);
        logger.info("Before the task execution");
        // The JDK recommends invoking super.beforeExecute at the end of an override.
        super.beforeExecute(t, r);
    }

    /**
     * Removes {@code r} from the set of running tasks once it has finished.
     *
     * @param r the task that has completed
     * @param t the exception that caused termination, or {@code null} if it completed normally
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Drop the finished task; no cast needed for the same reason as in beforeExecute.
        workBlockingQueue.remove(r);
        logger.info("After the task execution");
    }

    /**
     * Description: the tasks that are currently running.
     *
     * <p>The returned queue is the live internal instance, not a copy, so its contents
     * change as tasks start and finish.
     *
     * @return the queue of running tasks
     * @author lishun
     */
    public LinkedBlockingQueue<Runnable> getWorkBlockingQueue() {
        return workBlockingQueue;
    }
}
/**
 * Thread pool used for download tasks: 3 core threads, 3 maximum threads, a 60-second
 * keep-alive setting, and a {@link LinkedBlockingQueue} created without a capacity argument
 * as the waiting queue.
 */
MyExtendThreadPoolExecutor pool =
        new MyExtendThreadPoolExecutor(
                3,
                3,
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
/**
 * Submits a download task for {@code downloadRecord} to the pool unless an equal task is
 * already waiting in the pool's queue or currently running.
 *
 * <p>Duplicate detection uses {@code contains()} on both queues, which relies on
 * {@code equals()}; as the original note states, the task class must override
 * {@code equals()} for this check to be meaningful.
 *
 * <p>NOTE(review): the contains-check and the subsequent {@code execute()} are not performed
 * atomically in this method, so concurrent callers could both pass the check — confirm
 * whether callers are serialized elsewhere.
 *
 * @param downloadRecord the record describing the download to run
 */
public void addToThreadPool(DownloadRecord downloadRecord){
    // Tasks accepted by the pool but not yet started.
    BlockingQueue<Runnable> waitThreadQueue = pool.getQueue();
    // Tasks currently being executed by the pool's worker threads.
    LinkedBlockingQueue<Runnable> workThreadQueue = pool.getWorkBlockingQueue();
    // The task to execute for this record.
    GtdataBreakpointResumeDownloadThread downloadThread =
            new GtdataBreakpointResumeDownloadThread(downloadRecord);

    // Only add the task if it is neither waiting nor running.
    if (!waitThreadQueue.contains(downloadThread) && !workThreadQueue.contains(downloadThread)) {
        logger.info("a_workThread:recordId="+downloadRecord.getId()+",name="+downloadRecord.getName()+" add to workThreadQueue");
        downloadThread.setName("th_"+downloadRecord.getName());
        pool.execute(downloadThread); // hand the task to the thread pool
    } else {
        logger.info("i_workThread:recordId="+downloadRecord.getId()+",name="+downloadRecord.getName()+" in waitThreadQueue or workThreadQueue");
    }
}