从上面的代码中我们可以更加验证之前的观点,就是ConcurentBag<T>在一个线程中存储数据时,使用的是双向链表,ThreadLocalList实现了一组对链表增删改查的方法。
3. ConcurrentBag实现新增元素接下来我们看一看ConcurentBag<T>是如何新增元素的。
/// <summary> /// 尝试获取无主列表,无主列表是指线程已经被暂停或者终止,但是集合中的部分数据还存储在那里 /// 这是避免内存泄漏的方法 /// </summary> /// <returns></returns> private ThreadLocalList GetUnownedList() { //此时必须持有全局锁 Contract.Assert(Monitor.IsEntered(GlobalListsLock)); // 从头线程列表开始枚举 找到那些已经被关闭的线程 // 将它所在的列表对象 返回 ThreadLocalList currentList = m_headList; while (currentList != null) { if (currentList.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped) { currentList.m_ownerThread = Thread.CurrentThread; // the caller should acquire a lock to make this line thread safe return currentList; } currentList = currentList.m_nextList; } return null; } /// <summary> /// 本地帮助方法,通过线程对象检索线程线程本地列表 /// </summary> /// <param>如果列表不存在,那么创建新列表</param> /// <returns>The local list object</returns> private ThreadLocalList GetThreadList(bool forceCreate) { ThreadLocalList list = m_locals.Value; if (list != null) { return list; } else if (forceCreate) { // 获取用于更新操作的 m_tailList 锁 lock (GlobalListsLock) { // 如果头列表等于空,那么说明集合中还没有元素 // 直接创建一个新的 if (m_headList == null) { list = new ThreadLocalList(Thread.CurrentThread); m_headList = list; m_tailList = list; } else { // ConcurrentBag内的数据是以双向链表的形式分散存储在各个线程的本地区域中 // 通过下面这个方法 可以找到那些存储有数据 但是已经被停止的线程 // 然后将已停止线程的数据 移交到当前线程管理 list = GetUnownedList(); // 如果没有 那么就新建一个列表 然后更新尾指针的位置 if (list == null) { list = new ThreadLocalList(Thread.CurrentThread); m_tailList.m_nextList = list; m_tailList = list; } } m_locals.Value = list; } } else { return null; } Debug.Assert(list != null); return list; } /// <summary> /// Adds an object to the <see cref="ConcurrentBag{T}"/>. /// </summary> /// <param>The object to be added to the /// <see cref="ConcurrentBag{T}"/>. The value can be a null reference /// (Nothing in Visual Basic) for reference types.</param> public void Add(T item) { // 获取该线程的本地列表, 如果此线程不存在, 则创建一个新列表 (第一次调用 add) ThreadLocalList list = GetThreadList(true); // 实际的数据添加操作 在AddInternal中执行 AddInternal(list, item); } /// <summary> /// </summary> /// <param></param> /// <param></param> private void AddInternal(ThreadLocalList list, T item) { bool lockTaken = false; try { #pragma warning disable 0420 Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Add); #pragma warning restore 0420 // 同步案例: // 如果列表计数小于两个, 因为是双向链表的关系 为了避免与任何窃取线程发生冲突 必须获取锁 // 如果设置了 m_needSync, 这意味着有一个线程需要冻结包 也必须获取锁 if (list.Count < 2 || m_needSync) { // 将其重置为None 以避免与窃取线程的死锁 list.m_currentOp = (int)ListOperation.None; // 锁定当前对象 Monitor.Enter(list, ref lockTaken); } // 调用 ThreadLocalList.Add方法 将数据添加到双向链表中 // 如果已经锁定 那么说明线程安全 可以更新Count 计数 list.Add(item, lockTaken); } finally { list.m_currentOp = (int)ListOperation.None; if (lockTaken) { Monitor.Exit(list); } } }