简单的线程池

此线程池拥有一个被所有工作线程共享的任务队列。线程池用户提交的任务,被线程池保存在任务队列中,工作线程从任务队列中获取任务并执行。

gist

任务是可拥有返回值的、无参数的可调用(callable)对象,或者是经 std::bind 绑定了可调用对象及其参数后的调用包装器。具体而言可以是

自由函数(也称为全局函数)

lambda

函数对象(也称为函数符)

类成员函数

包装了上述类型的 std::function

bind 调用包装器

该线程池异步地执行任务。当任务被提交进线程池后,用户不必等待任务执行和返回结果。

实现

以下代码给出了此线程池的实现。

class Thread_Pool { private: struct Task_Wrapper { ... }; atomic<bool> _done_; // #2 Lockwise_Queue<Task_Wrapper> _queue_; // #3 unsigned _workersize_; thread* _workers_; // #4 void work() { while (!_done_.load(memory_order_acquire)) { Task_Wrapper task; if (_queue_.pop(task)) task(); else std::this_thread::yield(); } } public: Thread_Pool() : _done_(false) { // #1 try { _workersize_ = thread::hardware_concurrency(); // #5 _workers_ = new thread[_workersize_]; for (unsigned i = 0; i < _workersize_; ++i) { _workers_[i] = thread(&Thread_Pool::work, this); // #6 } } catch (...) { // #7 _done_.store(true, memory_order_release); for (unsigned i = 0; i < _workersize_; ++i) { if (_workers_[i].joinable()) _workers_[i].join(); } delete[] _workers_; throw; } } ~Thread_Pool() { _done_.store(true, memory_order_release); for (unsigned i = 0; i < _workersize_; ++i) { if (_workers_[i].joinable()) _workers_[i].join(); } delete[] _workers_; } template<class Callable> future<typename std::result_of<Callable()>::type> submit(Callable c) { // #8 typedef typename std::result_of<Callable()>::type R; packaged_task<R()> task(c); future<R> r = task.get_future(); _queue_.push(std::move(task)); // #9 return r; // #10 } };

我们从构造 Thread_Pool 对象(#1)开始了解这个线程池。atomic<bool> 数据成员用于标志线程池是否结束,并强制同步内存顺序(#2);Task_Wrapper 具体化了线程安全的任务队列 Lockwise_Queue<>(#3);thread* 用于引用所有的工作线程对象(#4)。Task_Wrapper 和 Lockwise_Queue<> 稍后再做说明。

线程池通过 thread::hardware_concurrency() 获取当前硬件支持的并发线程数量(#5),并依据此数量创建出工作线程。Thread_Pool 对象的成员函数 work() 作为所有工作线程的初始函数(#6),这使得线程池中的任务队列能被所有工作线程共享。创建 thread 对象和 new 操作可能失败并引发异常,因此用 try-catch 捕获潜在的异常。处理异常过程中,需要标志线程池结束,保证任何创建的线程都能正常的停止,并回收内存资源(#7)。线程池对象析构时的工作与此一致。

Thread_Pool 对象构建完成后,任务通过 Thread_Pool::submit<>() 被提交进入线程池(#8)。为了支持任务的异步执行,任务先被封装在 std::packaged_task<> 中,再被放入线程安全的任务队列(#9)。任务执行结果被封装在返回的 std::future<> 对象中(#10),允许用户在未来需要结果时,等待任务结束并获取结果。


因为每一个任务都是一个特定类型的 std::packaged_task<> 对象,为了实现任务队列的泛型化,需要设计一个通用的数据结构 Task_Wrapper,用于封装特定类型的 std::packaged_task<> 对象。

struct Task_Wrapper { struct Task_Base { virtual ~Task_Base() {} virtual void call() = 0; }; template<class T> struct Task : Task_Base { // #5 T _t_; Task(T&& t) : _t_(std::move(t)) {} // #6 void call() { _t_(); } // #9 }; Task_Base* _ptr_; // #7 Task_Wrapper() : _ptr_(nullptr) {}; template<class T> Task_Wrapper(T&& t) : _ptr_(new Task<T>(std::move(t))) {} // #1 // support move Task_Wrapper(Task_Wrapper&& other) { // #2 _ptr_ = other._ptr_; other._ptr_ = nullptr; } Task_Wrapper& operator=(Task_Wrapper&& other) { // #3 _ptr_ = other._ptr_; other._ptr_ = nullptr; return *this; } // no copy Task_Wrapper(Task_Wrapper&) = delete; Task_Wrapper& operator=(Task_Wrapper&) = delete; ~Task_Wrapper() { if (_ptr_) delete _ptr_; } void operator()() const { // #4 _ptr_->call(); // #8 } };

std::packaged_task<> 的实例只是可移动的,而不可复制。Task_Wrapper 必须能移动封装 std::packaged_task<R()> 对象(#1)。为了保持一致性,Task_Wrapper 也实现了移动构造(#2)和移动赋值(#3),同时实现了 operator()(#4)。ABC 的继承结构(#5)用于支持泛型化地封装和调用 std::packaged_task<> 对象。std::packaged_task<> 封装在派生类 Task<> 中(#6),由指向非泛型的抽象基类 Task_Base 的指针引用派生类对象(#7)。对 Task_Wrapper 对象的调用由虚调用(#8)委托给派生类并执行实际的任务(#9)。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzszpy.html