C++11 提供了若干多线程编程的高级概念:promise/future, packaged_task, async,来简化多线程编程,尤其是线程之间的数据交互比较简单的情况下,让我们可以将注意力更多地放在业务处理上。
promise/future 可以用来在线程之间进行简单的数据交互,而不需要考虑锁的问题,线程 A 将数据保存在一个 promise 变量中,另外一个线程 B 可以通过这个 promise 变量的 get_future() 获取其值,当线程 A 尚未在 promise 变量中赋值时,线程 B 也可以等待这个 promise 变量的赋值:
清单 16.例子 thread_promise_future.ccpromise<string> val; static void threadPromiseFuture(){ thread ta([](){ future<string> fu = val.get_future(); cout << "waiting promise->future" << endl; cout << fu.get() << endl; }); thread tb([](){ this_thread::sleep_for( chrono::milliseconds(100) ); val.set_value("promise is set"); }); ta.join(); tb.join(); }
一个 future 变量只能调用一次 get(),如果需要多次调用 get(),可以使用 shared_future,通过 promise/future 还可以在线程之间传递异常。
如果将一个 callable 对象和一个 promise 组合,那就是 packaged_task,它可以进一步简化操作:
清单 17.例子 thread_packaged_task.ccstatic mutex g_mutex; static void threadPackagedTask(){ auto run = [=](int index){ { lock_guard<mutex> _(g_mutex); cout << "tasklet " << index << endl; } this_thread::sleep_for( chrono::seconds(10) ); return index * 1000; }; packaged_task<int(int)> pt1(run); packaged_task<int(int)> pt2(run); thread t1([&](){pt1(2);} ); thread t2([&](){pt2(3);} ); int f1 = pt1.get_future().get(); int f2 = pt2.get_future().get(); cout << "task result=" << f1 << endl; cout << "task result=" << f2 << endl; t1.join(); t2.join(); }
我们还可以试图将一个 packaged_task 和一个线程组合,那就是 async() 函数。使用 async() 函数启动执行代码,返回一个 future 对象来保存代码返回值,不需要我们显式地创建和销毁线程等,而是由 C++11 库的实现决定何时创建和销毁线程,以及创建几个线程等,示例如下:
清单 18.例子 thread_async.ccstatic long do_sum(vector<long> *arr, size_t start, size_t count){ static mutex _m; long sum = 0; for(size_t i = 0; i < count; i++){ sum += (*arr)[start + i]; } { lock_guard<mutex> _(_m); cout << "thread " << this_thread::get_id() << ", count=" << count << ", sum=" << sum << endl; } return sum; } static void threadAsync(){ # define COUNT 1000000 vector<long> data(COUNT); for(size_t i = 0; i < COUNT; i++){ data[i] = random() & 0xff; } // vector< future<long> > result; size_t ptc = thread::hardware_concurrency() * 2; for(size_t batch = 0; batch < ptc; batch++){ size_t batch_each = COUNT / ptc; if (batch == ptc - 1){ batch_each = COUNT - (COUNT / ptc * batch); } result.push_back(async(do_sum, &data, batch * batch_each, batch_each)); } long total = 0; for(size_t batch = 0; batch < ptc; batch++){ total += result[batch].get(); } cout << "total=" << total << endl; }
如果是在多核或者多 CPU 的环境上面运行上述例子,仔细观察输出结果,可能会发现有些线程 ID 是重复的,这说明重复使用了线程,也就是说,通过使用 async() 还可达到一些线程池的功能。