简单的线程池 (2)


另一个关键的数据结构是线程安全的任务队列 Lockwise_Queue<>。

template<class T> class Lockwise_Queue { private: struct Spinlock_Mutex { // #3 atomic_flag _af_; Spinlock_Mutex() : _af_(false) {} void lock() { while (_af_.test_and_set(memory_order_acquire)); } void unlock() { _af_.clear(memory_order_release); } } mutable _m_; // #2 condition_variable _cv_; queue<T> _q_; // #1 public: Lockwise_Queue() {} void push(const T& element) { lock_guard<Spinlock_Mutex> lk(_m_); _q_.push(std::move(element)); _cv_.notify_one(); } void push(T&& element) { // #4 lock_guard<Spinlock_Mutex> lk(_m_); _q_.push(std::move(element)); _cv_.notify_one(); } bool pop(T& element) { // #5 lock_guard<Spinlock_Mutex> lk(_m_); if (_q_.empty()) return false; element = std::move(_q_.front()); _q_.pop(); return true; } bool empty() const { lock_guard<Spinlock_Mutex> lk(_m_); return _q_.empty(); } };

所有 Task_Wrapper 对象保存在 std::queue<> 中(#1)。互斥元和条件变量控制工作线程对任务队列的并发访问(#2)。为了提高并发程度,采用非阻塞自旋锁作为互斥元(#3)。任务的入队和出队操作,分别由支持移动语义的 push 函数(#4) 和 pop 函数(#5)完成。

验证

为了验证此线程池满足概要中描述的能力,设计了如下的各类可调用对象。

void shoot() { std::printf("\n\t[Free Function] Let an arrow fly...\n"); } bool shoot(long n) { std::printf("\n\t[Free Function] Let %ld arrows fly...\n", n); return false; } auto shootAnarrow = [] { std::printf("\n\t[Lambda] Let an arrow fly...\n"); }; auto shootNarrows = [](long n) -> bool { std::printf("\n\t[Lambda] Let %ld arrows fly...\n", n); return true; }; class Archer { public: void operator()() { std::printf("\n\t[Functor] Let an arrow fly...\n"); } bool operator()(long n) { std::printf("\n\t[Functor] Let %ld arrows fly...\n", n); return false; } void shoot() { std::printf("\n\t[Member Function] Let an arrow fly...\n"); } bool shoot(long n) { std::printf("\n\t[Member Function] Let %ld arrows fly...\n", n); return true; } };

对这些函数做好必要的参数封装,将其提交给线程池,

atomic<bool> go(false); time_point<steady_clock> start = steady_clock::now(); minutes PERIOD(1); Thread_Pool pool; thread t1([&go, &pool, &PERIOD, start] { // test free function of void() while (!go.load(memory_order_acquire)) std::this_thread::yield(); void (*task)() = shoot; for (long x = 0; steady_clock::now() - start <= PERIOD; ++x) { pool.submit(task); //pool.submit(std::bind<void(*)()>(shoot)); std::this_thread::yield(); } }); thread t2([&go, &pool, &PERIOD, start] { // test free function of bool(long) while (!go.load(memory_order_acquire)) std::this_thread::yield(); bool (*task)(long) = shoot; for (long x = 2; steady_clock::now() - start <= PERIOD; ++x) { future<bool> r = pool.submit(std::bind(task, x)); //future<bool> r = pool.submit(std::bind<bool(*)(long)>(shoot, x)); std::this_thread::yield(); } }); thread t3([&go, &pool, &PERIOD, start] { // test lambda of void() while (!go.load(memory_order_acquire)) std::this_thread::yield(); for (long x = 0; steady_clock::now() - start <= PERIOD; ++x) { pool.submit(shootAnarrow); std::this_thread::yield(); } }); thread t4([&go, &pool, &PERIOD, start] { // test lambda of bool(long) while (!go.load(memory_order_acquire)) std::this_thread::yield(); for (long x = 2; steady_clock::now() - start <= PERIOD; ++x) { future<bool> r = pool.submit(std::bind(shootNarrows, x)); std::this_thread::yield(); } }); thread t5([&go, &pool, &PERIOD, start] { // test functor of void() while (!go.load(memory_order_acquire)) std::this_thread::yield(); Archer hoyt; for (long x = 0; steady_clock::now() - start <= PERIOD; ++x) { pool.submit(hoyt); std::this_thread::yield(); } }); thread t6([&go, &pool, &PERIOD, start] { // test functor of bool(long) while (!go.load(memory_order_acquire)) std::this_thread::yield(); Archer hoyt; for (long x = 2; steady_clock::now() - start <= PERIOD; ++x) { future<bool> r = pool.submit(std::bind(hoyt, x)); std::this_thread::yield(); } }); thread t7([&go, &pool, &PERIOD, start] { // test member function of void() while (!go.load(memory_order_acquire)) std::this_thread::yield(); Archer hoyt; for (long x = 0; steady_clock::now() - start <= PERIOD; ++x) { pool.submit(std::bind<void(Archer::*)()>(&Archer::shoot, &hoyt)); //pool.submit(std::bind(static_cast<void(Archer::*)()>(&Archer::shoot), &hoyt)); std::this_thread::yield(); } }); thread t8([&go, &pool, &PERIOD, start] { // test member function of bool(long) while (!go.load(memory_order_acquire)) std::this_thread::yield(); Archer hoyt; for (long x = 2; steady_clock::now() - start <= PERIOD; ++x) { future<bool> r = pool.submit(std::bind<bool(Archer::*)(long)>(&Archer::shoot, &hoyt, x)); //future<bool> r = pool.submit(std::bind(static_cast<bool(Archer::*)(long)>(&Archer::shoot), &hoyt, x)); std::this_thread::yield(); } }); thread t9([&go, &pool, &PERIOD, start] { // test std::function<> of void() while (!go.load(memory_order_acquire)) std::this_thread::yield(); std::function<void()> task = static_cast<void(*)()>(shoot); for (long x = 0; steady_clock::now() - start <= PERIOD; ++x) { pool.submit(task); std::this_thread::yield(); } }); thread t10([&go, &pool, &PERIOD, start] { // test std::function<> of bool(long) while (!go.load(memory_order_acquire)) std::this_thread::yield(); std::function<bool(long)> task = static_cast<bool(*)(long)>(shoot); for (long x = 2; steady_clock::now() - start <= PERIOD; ++x) { future<bool> r = pool.submit(std::bind(task, x)); std::this_thread::yield(); } });

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzszpy.html