另一个关键的数据结构是线程安全的任务队列 Lockwise_Queue<>。
所有 Task_Wrapper 对象保存在 std::queue<> 中(#1)。互斥元和条件变量控制工作线程对任务队列的并发访问(#2)。为了提高并发程度,采用非阻塞自旋锁作为互斥元(#3)。任务的入队和出队操作,分别由支持移动语义的 push 函数(#4) 和 pop 函数(#5)完成。
验证

为了验证此线程池满足概要中描述的能力,设计了如下的各类可调用对象。
// Sample callables for the demonstration. Every flavour comes as a pair:
// a nullary version returning void and a unary version taking a long and
// returning bool. Each one prints a tag naming its own kind.

/// Free function, nullary overload.
void shoot()
{
    std::printf("\n\t[Free Function] Let an arrow fly...\n");
}

/// Free function, unary overload. Prints the count and returns false.
bool shoot(long n)
{
    std::printf("\n\t[Free Function] Let %ld arrows fly...\n", n);
    return false;
}

/// Capture-less lambda, nullary.
auto shootAnarrow = [] {
    std::printf("\n\t[Lambda] Let an arrow fly...\n");
};

/// Capture-less lambda, unary. Prints the count and returns true.
auto shootNarrows = [](long count) -> bool {
    std::printf("\n\t[Lambda] Let %ld arrows fly...\n", count);
    return true;
};

/// Stateless class that serves both as a functor (two operator() overloads)
/// and as a host for two overloaded member functions named shoot.
class Archer {
public:
    /// Functor call, nullary.
    void operator()()
    {
        std::printf("\n\t[Functor] Let an arrow fly...\n");
    }

    /// Functor call, unary. Prints the count and returns false.
    bool operator()(long n)
    {
        std::printf("\n\t[Functor] Let %ld arrows fly...\n", n);
        return false;
    }

    /// Member function, nullary.
    void shoot()
    {
        std::printf("\n\t[Member Function] Let an arrow fly...\n");
    }

    /// Member function, unary. Prints the count and returns true.
    bool shoot(long n)
    {
        std::printf("\n\t[Member Function] Let %ld arrows fly...\n", n);
        return true;
    }
};

// Next, each of these callables is wrapped with whatever arguments it needs
// and submitted to the thread pool.
atomic<bool> go(false); time_point<steady_clock> start = steady_clock::now(); minutes PERIOD(1); Thread_Pool pool; thread t1([&go, &pool, &PERIOD, start] { // test free function of void() while (!go.load(memory_order_acquire)) std::this_thread::yield(); void (*task)() = shoot; for (long x = 0; steady_clock::now() - start <= PERIOD; ++x) { pool.submit(task); //pool.submit(std::bind<void(*)()>(shoot)); std::this_thread::yield(); } }); thread t2([&go, &pool, &PERIOD, start] { // test free function of bool(long) while (!go.load(memory_order_acquire)) std::this_thread::yield(); bool (*task)(long) = shoot; for (long x = 2; steady_clock::now() - start <= PERIOD; ++x) { future<bool> r = pool.submit(std::bind(task, x)); //future<bool> r = pool.submit(std::bind<bool(*)(long)>(shoot, x)); std::this_thread::yield(); } }); thread t3([&go, &pool, &PERIOD, start] { // test lambda of void() while (!go.load(memory_order_acquire)) std::this_thread::yield(); for (long x = 0; steady_clock::now() - start <= PERIOD; ++x) { pool.submit(shootAnarrow); std::this_thread::yield(); } }); thread t4([&go, &pool, &PERIOD, start] { // test lambda of bool(long) while (!go.load(memory_order_acquire)) std::this_thread::yield(); for (long x = 2; steady_clock::now() - start <= PERIOD; ++x) { future<bool> r = pool.submit(std::bind(shootNarrows, x)); std::this_thread::yield(); } }); thread t5([&go, &pool, &PERIOD, start] { // test functor of void() while (!go.load(memory_order_acquire)) std::this_thread::yield(); Archer hoyt; for (long x = 0; steady_clock::now() - start <= PERIOD; ++x) { pool.submit(hoyt); std::this_thread::yield(); } }); thread t6([&go, &pool, &PERIOD, start] { // test functor of bool(long) while (!go.load(memory_order_acquire)) std::this_thread::yield(); Archer hoyt; for (long x = 2; steady_clock::now() - start <= PERIOD; ++x) { future<bool> r = pool.submit(std::bind(hoyt, x)); std::this_thread::yield(); } }); thread t7([&go, &pool, &PERIOD, start] { // test member function of void() 
while (!go.load(memory_order_acquire)) std::this_thread::yield(); Archer hoyt; for (long x = 0; steady_clock::now() - start <= PERIOD; ++x) { pool.submit(std::bind<void(Archer::*)()>(&Archer::shoot, &hoyt)); //pool.submit(std::bind(static_cast<void(Archer::*)()>(&Archer::shoot), &hoyt)); std::this_thread::yield(); } }); thread t8([&go, &pool, &PERIOD, start] { // test member function of bool(long) while (!go.load(memory_order_acquire)) std::this_thread::yield(); Archer hoyt; for (long x = 2; steady_clock::now() - start <= PERIOD; ++x) { future<bool> r = pool.submit(std::bind<bool(Archer::*)(long)>(&Archer::shoot, &hoyt, x)); //future<bool> r = pool.submit(std::bind(static_cast<bool(Archer::*)(long)>(&Archer::shoot), &hoyt, x)); std::this_thread::yield(); } }); thread t9([&go, &pool, &PERIOD, start] { // test std::function<> of void() while (!go.load(memory_order_acquire)) std::this_thread::yield(); std::function<void()> task = static_cast<void(*)()>(shoot); for (long x = 0; steady_clock::now() - start <= PERIOD; ++x) { pool.submit(task); std::this_thread::yield(); } }); thread t10([&go, &pool, &PERIOD, start] { // test std::function<> of bool(long) while (!go.load(memory_order_acquire)) std::this_thread::yield(); std::function<bool(long)> task = static_cast<bool(*)(long)>(shoot); for (long x = 2; steady_clock::now() - start <= PERIOD; ++x) { future<bool> r = pool.submit(std::bind(task, x)); std::this_thread::yield(); } });