从上面代码中,我们可以很清楚的知道Add()方法是如何运行的,其中的关键就是GetThreadList()方法,通过该方法可以获取当前线程的数据存储列表对象,假如不存在数据存储列表,它会自动创建或者通过GetUnownedList()方法来寻找那些被停止但是还存储有数据列表的线程,然后将数据列表返回给当前线程中,防止了内存泄漏。
在数据添加的过程中,实现了细颗粒度的lock同步锁,所以性能会很高。删除和其它操作与新增类似,本文不再赘述。
4. ConcurrentBag 如何实现迭代器模式看完上面的代码后,我很好奇ConcurrentBag<T>是如何实现IEnumerator来实现迭代访问的,因为ConcurrentBag<T>是通过分散在不同线程中的ThreadLocalList来存储数据的,那么在实现迭代器模式时,过程会比较复杂。
后面再查看了源码之后,发现ConcurrentBag<T>为了实现迭代器模式,将分在不同线程中的数据全都存到一个List<T>集合中,然后返回了该副本的迭代器。所以每次访问迭代器,它都会新建一个List<T>的副本,这样虽然浪费了一定的存储空间,但是逻辑上更加简单了。
/// <summary> /// 本地帮助器方法释放所有本地列表锁 /// </summary> private void ReleaseAllLocks() { // 该方法用于在执行线程同步以后 释放掉所有本地锁 // 通过遍历每个线程中存储的 ThreadLocalList对象 释放所占用的锁 ThreadLocalList currentList = m_headList; while (currentList != null) { if (currentList.m_lockTaken) { currentList.m_lockTaken = false; Monitor.Exit(currentList); } currentList = currentList.m_nextList; } } /// <summary> /// 从冻结状态解冻包的本地帮助器方法 /// </summary> /// <param>The lock taken result from the Freeze method</param> private void UnfreezeBag(bool lockTaken) { // 首先释放掉 每个线程中 本地变量的锁 // 然后释放全局锁 ReleaseAllLocks(); m_needSync = false; if (lockTaken) { Monitor.Exit(GlobalListsLock); } } /// <summary> /// 本地帮助器函数等待所有未同步的操作 /// </summary> private void WaitAllOperations() { Contract.Assert(Monitor.IsEntered(GlobalListsLock)); ThreadLocalList currentList = m_headList; // 自旋等待 等待其它操作完成 while (currentList != null) { if (currentList.m_currentOp != (int)ListOperation.None) { SpinWait spinner = new SpinWait(); // 有其它线程进行操作时,会将cuurentOp 设置成 正在操作的枚举 while (currentList.m_currentOp != (int)ListOperation.None) { spinner.SpinOnce(); } } currentList = currentList.m_nextList; } } /// <summary> /// 本地帮助器方法获取所有本地列表锁 /// </summary> private void AcquireAllLocks() { Contract.Assert(Monitor.IsEntered(GlobalListsLock)); bool lockTaken = false; ThreadLocalList currentList = m_headList; // 遍历每个线程的ThreadLocalList 然后获取对应ThreadLocalList的锁 while (currentList != null) { // 尝试/最后 bllock 以避免在获取锁和设置所采取的标志之间的线程港口 try { Monitor.Enter(currentList, ref lockTaken); } finally { if (lockTaken) { currentList.m_lockTaken = true; lockTaken = false; } } currentList = currentList.m_nextList; } } /// <summary> /// Local helper method to freeze all bag operations, it /// 1- Acquire the global lock to prevent any other thread to freeze the bag, and also new new thread can be added /// to the dictionary /// 2- Then Acquire all local lists locks to prevent steal and synchronized operations /// 3- Wait for all un-synchronized operations to be done /// </summary> /// <param>Retrieve the lock taken result for the global lock, to be passed to Unfreeze method</param> private void FreezeBag(ref bool lockTaken) { Contract.Assert(!Monitor.IsEntered(GlobalListsLock)); // 全局锁定可安全地防止多线程调用计数和损坏 m_needSync Monitor.Enter(GlobalListsLock, ref lockTaken); // 这将强制同步任何将来的添加/执行操作 m_needSync = true; // 获取所有列表的锁 AcquireAllLocks(); // 等待所有操作完成 WaitAllOperations(); } /// <summary> /// 本地帮助器函数返回列表中的包项, 这主要由 CopyTo 和 ToArray 使用。 /// 这不是线程安全, 应该被称为冻结/解冻袋块 /// 本方法是私有的 只有使用 Freeze/UnFreeze之后才是安全的 /// </summary> /// <returns>List the contains the bag items</returns> private List<T> ToList() { Contract.Assert(Monitor.IsEntered(GlobalListsLock)); // 创建一个新的List List<T> list = new List<T>(); ThreadLocalList currentList = m_headList; // 遍历每个线程中的ThreadLocalList 将里面的Node的数据 添加到list中 while (currentList != null) { Node currentNode = currentList.m_head; while (currentNode != null) { list.Add(currentNode.m_value); currentNode = currentNode.m_next; } currentList = currentList.m_nextList; } return list; } /// <summary> /// Returns an enumerator that iterates through the <see /// cref="ConcurrentBag{T}"/>. /// </summary> /// <returns>An enumerator for the contents of the <see /// cref="ConcurrentBag{T}"/>.</returns> /// <remarks> /// The enumeration represents a moment-in-time snapshot of the contents /// of the bag. It does not reflect any updates to the collection after /// <see cref="GetEnumerator"/> was called. The enumerator is safe to use /// concurrently with reads from and writes to the bag. /// </remarks> public IEnumerator<T> GetEnumerator() { // Short path if the bag is empty if (m_headList == null) return new List<T>().GetEnumerator(); // empty list bool lockTaken = false; try { // 首先冻结整个 ConcurrentBag集合 FreezeBag(ref lockTaken); // 然后ToList 再拿到 List的 IEnumerator return ToList().GetEnumerator(); } finally { UnfreezeBag(lockTaken); } }由上面的代码可知道,为了获取迭代器对象,总共进行了三步主要的操作。
使用FreezeBag()方法,冻结整个ConcurrentBag<T>集合。因为需要生成集合的List<T>副本,生成副本期间不能有其它线程更改损坏数据。