我们可以调用 this_thread::yield() 将当前调用者线程切换到重新等待调度,但是不能对非调用者线程进行调度切换,也不能让非调用者线程休眠(这是操作系统调度器干的活)。
清单 10.例子 thread_yield.ccvoid threadYield(void){ unsigned int procs = thread::hardware_concurrency(), // 获取物理线程数目 i = 0; thread* ta = new thread( [](){ struct timeval t1, t2; gettimeofday(&t1, NULL); for(int i = 0, m = 13; i < COUNT; i++, m *= 17){ this_thread::yield(); } gettimeofday(&t2, NULL); print_time(t1, t2, " with yield"); } ); thread** tb = new thread*[ procs ]; for( i = 0; i < procs; i++){ tb[i] = new thread( [](){ struct timeval t1, t2; gettimeofday(&t1, NULL); for(int i = 0, m = 13; i < COUNT; i++, m *= 17){ do_nothing(); } gettimeofday(&t2, NULL); print_time(t1, t2, "without yield"); }); } ta->join(); delete ta; for( i = 0; i < procs; i++){ tb[i]->join(); delete tb[i]; }; delete tb; }
ta 线程因为需要经常切换去重新等待调度,它运行的时间要比 tb 要多,比如在作者的机器上运行得到如下结果:
$time ./a.out without yield elapse 0.050199s without yield elapse 0.051042s without yield elapse 0.05139s without yield elapse 0.048782s with yield elapse 1.63366s real 0m1.643s user 0m1.175s sys 0m0.611s
ta 线程即使扣除系统调用运行时间 0.611s 之后,它的运行时间也远大于没有进行切换的线程。
C++11 没有提供调整线程的调度策略或者优先级的能力,如果需要,只能通过调用相关的 pthread 函数来进行,需要的时候,可以通过调用 thread 类实例的 native_handle() 方法或者操作系统 API pthread_self() 来获得 pthread 线程 id,作为 pthread 函数的参数。
线程间的数据交互和数据争用 (Data Racing)同一个进程内的多个线程之间多是免不了要有数据互相来往的,队列和共享数据是实现多个线程之间的数据交互的常用方式,封装好的队列使用起来相对来说不容易出错一些,而共享数据则是最基本的也是较容易出错的,因为它会产生数据争用的情况,即有超过一个线程试图同时抢占某个资源,比如对某块内存进行读写等,如下例所示:
清单 11.例子 thread_data_race.ccstatic void inc(int *p ){ for(int i = 0; i < COUNT; i++){ (*p)++; } } void threadDataRacing(void){ int a = 0; thread ta( inc, &a); thread tb( inc, &a); ta.join(); tb.join(); cout << "a=" << a << endl; }
这是简化了的极端情况,我们可以一眼看出来这是两个线程在同时对&a 这个内存地址进行写操作,但是在实际工作中,在代码的海洋中发现它并不一定容易。从表面看,两个线程执行完之后,最后的 a 值应该是 COUNT * 2,但是实际上并非如此,因为简单如 (*p)++这样的操作并不是一个原子动作,要解决这个问题,对于简单的基本类型数据如字符、整型、指针等,C++提供了原子模版类 atomic,而对于复杂的对象,则提供了最常用的锁机制,比如互斥类 mutex,门锁 lock_guard,唯一锁 unique_lock,条件变量 condition_variable 等。
现在我们使用原子模版类 atomic 改造上述例子得到预期结果:
清单 12.例子 thread_atomic.ccstatic void inc(atomic<int> *p ){ for(int i = 0; i < COUNT; i++){ (*p)++; } } void threadDataRacing(void){ atomic<int> a(0) ; thread ta( inc, &a); thread tb( inc, &a); ta.join(); tb.join(); cout << "a=" << a << endl; }
我们也可以使用 lock_guard,lock_guard 是一个范围锁,本质是 RAII(Resource Acquire Is Initialization),在构建的时候自动加锁,在析构的时候自动解锁,这保证了每一次加锁都会得到解锁。即使是调用函数发生了异常,在清理栈帧的时候也会调用它的析构函数得到解锁,从而保证每次加锁都会解锁,但是我们不能手工调用加锁方法或者解锁方法来进行更加精细的资源占用管理,使用 lock_guard 示例如下:
清单 13.例子 thread_lock_guard.ccstatic mutex g_mutex; static void inc(int *p ){ for(int i = 0; i < COUNT; i++){ lock_guard<mutex> _(g_mutex); (*p)++; } } void threadLockGuard(void){ int a = 0; thread ta( inc, &a); thread tb( inc, &a); ta.join(); tb.join(); cout << "a=" << a << endl; }
如果要支持手工加锁,可以考虑使用 unique_lock 或者直接使用 mutex。unique_lock 也支持 RAII,它也可以一次性将多个锁加锁;如果使用 mutex 则直接调用 mutex 类的 lock, unlock, trylock 等方法进行更加精细的锁管理:
清单 14.例子 thread_mutex.ccstatic mutex g_mutex; static void inc(int *p ){ thread_local int i; // TLS 变量 for(; i < COUNT; i++){ g_mutex.lock(); (*p)++; g_mutex.unlock(); } } void threadMutex(void){ int a = 0; thread ta( inc, &a); thread tb( inc, &a); ta.join(); tb.join(); cout << "a=" << a << endl; }
在上例中,我们还使用了线程本地存储 (TLS) 变量,我们只需要在变量前面声明它是 thread_local 即可。TLS 变量在线程栈内分配,线程栈只有在线程创建之后才生效,在线程退出的时候销毁,需要注意不同系统的线程栈的大小是不同的,如果 TLS 变量占用空间比较大,需要注意这个问题。TLS 变量一般不能跨线程,其初始化在调用线程第一次使用这个变量时进行,默认初始化为 0。
对于线程间的事件通知,C++11 提供了条件变量类 condition_variable,可视为 pthread_cond_t 的封装,使用条件变量可以让一个线程等待其它线程的通知 (wait,wait_for,wait_until),也可以给其它线程发送通知 (notify_one,notify_all),条件变量必须和锁配合使用,在等待时因为有解锁和重新加锁,所以,在等待时必须使用可以手工解锁和加锁的锁,比如 unique_lock,而不能使用 lock_guard,示例如下:
清单 15.例子 thread_cond_var.cc#include <thread> #include <iostream> #include <condition_variable> using namespace std; mutex m; condition_variable cv; void threadCondVar(void){ # define THREAD_COUNT 10 thread** t = new thread*[THREAD_COUNT]; int i; for(i = 0; i < THREAD_COUNT; i++){ t[i] = new thread( [](int index){ unique_lock<mutex> lck(m); cv.wait_for(lck, chrono::hours(1000)); cout << index << endl; }, i ); this_thread::sleep_for( chrono::milliseconds(50)); } for(i = 0; i < THREAD_COUNT; i++){ lock_guard<mutex> _(m); cv.notify_one(); } for(i = 0; i < THREAD_COUNT; i++){ t[i]->join(); delete t[i]; } delete t; }
从上例的运行结果也可以看到,条件变量是不保证次序的,即首先调用 wait 的不一定首先被唤醒。