基于C++11实现线程池的工作原理. 不久前写过一篇线程池,那时候刚用C++写东西不久,很多C++标准库里面的东西没怎么用,今天基于C++11重新实现了一个线程池。 简介
线程池(thread pool):一种线程的使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的组成 1、线程池管理器创建一定数量的线程,启动线程,调配任务,管理着线程池。
本篇线程池目前只需要启动(start()),停止方法(stop()),及任务添加方法(addTask).
start()创建一定数量的线程池,进行线程循环.
stop()停止所有线程循环,回收所有资源.
addTask()添加任务.
线程池中线程,在线程池中等待并执行分配的任务.
本篇选用条件变量实现等待与通知机制.
添加任务的接口,以供工作线程调度任务的执行。
4、任务队列用于存放没有处理的任务。提供一种缓冲机制
同时任务队列具有调度功能,高优先级的任务放在任务队列前面;本篇选用priority_queue 与pair的结合用作任务优先队列的结构.
假设我们的线程池大小为3,任务队列目前不做大小限制.
1、主程序当前没有任务要执行,线程池中的任务队列为空闲状态.此情况下所有工作线程处于空闲的等待状态,任务缓冲队列为空.
2、主程序添加小于等于线程池中线程数量的任务.此情况基于情形1,所有工作线程已处在等待状态,主线程开始添加三个任务,添加后通知(notif())唤醒线程池中的线程开始取(take())任务执行. 此时的任务缓冲队列还是空。
3、主程序添加任务数量大于当前线程池中线程数量的任务.此情况发生情形2后面,所有工作线程都在工作中,主线程开始添加第四个任务,添加后发现现在线程池中的线程用完了,于是存入任务缓冲队列。工作线程空闲后主动从任务队列取任务执行.
此情况发生情形3且设置了任务缓冲队列大小后面,主程序添加第N个任务,添加后发现池子中的线程用完了,任务缓冲队列也满了,于是进入等待状态、等待任务缓冲队列中的任务腾空通知。
但是要注意这种情形会阻塞主线程,本篇暂不限制任务队列大小,必要时再来优化.
等待通知机制通过条件变量实现,Logger和CurrentThread,用于调试,可以无视.
#ifndef _THREADPOOL_HH #define _THREADPOOL_HH #include <vector> #include <utility> #include <queue> #include <thread> #include <functional> #include <mutex> #include "Condition.hh" > ThreadPool{ public: static const int kInitThreadsSize = 3; enum taskPriorityE { level0, level1, level2, }; typedef std::function<void()> Task; typedef std::pair<taskPriorityE, Task> TaskPair; ThreadPool(); ~ThreadPool(); void start(); void stop(); void addTask(const Task&); void addTask(const TaskPair&); private: ThreadPool(const ThreadPool&);//禁止复制拷贝. const ThreadPool& operator=(const ThreadPool&); struct TaskPriorityCmp { bool operator()(const ThreadPool::TaskPair p1, const ThreadPool::TaskPair p2) { return p1.first > p2.first; //first的小值优先 } }; void threadLoop(); Task take(); typedef std::vector<std::thread*> Threads; typedef std::priority_queue<TaskPair, std::vector<TaskPair>, TaskPriorityCmp> Tasks; Threads m_threads; Tasks m_tasks; std::mutex m_mutex; Condition m_cond; bool m_isStarted; }; #endif //Cpp #include <assert.h> #include "Logger.hh" // debug #include "CurrentThread.hh" // debug #include "ThreadPool.hh" ThreadPool::ThreadPool() :m_mutex(), m_cond(m_mutex), m_isStarted(false) { } ThreadPool::~ThreadPool() { if(m_isStarted) { stop(); } } void ThreadPool::start() { assert(m_threads.empty()); m_isStarted = true; m_threads.reserve(kInitThreadsSize); for (int i = 0; i < kInitThreadsSize; ++i) { m_threads.push_back(new std::thread(std::bind(&ThreadPool::threadLoop, this))); } } void ThreadPool::stop() { LOG_TRACE << "ThreadPool::stop() stop."; { std::unique_lock<std::mutex> lock(m_mutex); m_isStarted = false; m_cond.notifyAll(); LOG_TRACE << "ThreadPool::stop() notifyAll()."; } for (Threads::iterator it = m_threads.begin(); it != m_threads.end() ; ++it) { (*it)->join(); delete *it; } m_threads.clear(); } void ThreadPool::threadLoop() { LOG_TRACE << "ThreadPool::threadLoop() tid : " << CurrentThread::tid() << " start."; while(m_isStarted) { Task task = take(); if(task) { task(); } } LOG_TRACE << "ThreadPool::threadLoop() tid : " << CurrentThread::tid() << " exit."; } void ThreadPool::addTask(const Task& task) { std::unique_lock<std::mutex> lock(m_mutex); /*while(m_tasks.isFull()) {//when m_tasks have maxsize cond2.wait(); } */ TaskPair taskPair(level2, task); m_tasks.push(taskPair); m_cond.notify(); } void ThreadPool::addTask(const TaskPair& taskPair) { std::unique_lock<std::mutex> lock(m_mutex); /*while(m_tasks.isFull()) {//when m_tasks have maxsize cond2.wait(); } */ m_tasks.push(taskPair); m_cond.notify(); } ThreadPool::Task ThreadPool::take() { std::unique_lock<std::mutex> lock(m_mutex); //always use a while-loop, due to spurious wakeup while(m_tasks.empty() && m_isStarted) { LOG_TRACE << "ThreadPool::take() tid : " << CurrentThread::tid() << " wait."; m_cond.wait(lock); } LOG_TRACE << "ThreadPool::take() tid : " << CurrentThread::tid() << " wakeup."; Task task; Tasks::size_type size = m_tasks.size(); if(!m_tasks.empty() && m_isStarted) { task = m_tasks.top().second; m_tasks.pop(); assert(size - 1 == m_tasks.size()); /*if (TaskQueueSize_ > 0) { cond2.notify(); }*/ } return task; } 测试程序 start() 、stop()