在上一篇文章你真的了解字典吗?一文中我介绍了Hash Function和字典的工作的基本原理.
有网友在文章底部评论,说我的Remove和Add方法没有考虑线程安全问题.
https://docs.microsoft.com/en-us/dotnet/api/system.collections.generic.dictionary-2?redirectedfrom=MSDN&view=netframework-4.7.2
查阅相关资料后,发现字典.net中Dictionary本身时不支持线程安全的,如果要想使用支持线程安全的字典,那么我们就要使用ConcurrentDictionary了.
在研究ConcurrentDictionary的源码后,我觉得在ConcurrentDictionary的线程安全的解决思路很有意思,其对线程安全的处理对对我们项目中的其他高并发场景也有一定的参考价值,在这里再次分享我的一些学习心得和体会,希望对大家有所帮助.
ConcurrentDictionary是Dictionary的线程安全版本,位于System.Collections.Concurrent的命名空间下,该命名空间下除了有ConcurrentDictionary,还有以下Class都是我们常用的那些类库的线程安全版本.
BlockingCollection:为实现 IProducerConsumerCollection 的线程安全集合提供阻塞和限制功能。
ConcurrentBag:表示对象的线程安全的无序集合.
ConcurrentQueue:表示线程安全的先进先出 (FIFO) 集合。
如果读过我上一篇文章你真的了解字典吗?的小伙伴,对这个ConcurrentDictionary的工作原理应该也不难理解,它是简简单单地在读写方法加个lock吗?
工作原理 Dictionary如下图所示,在字典中,数组entries用来存储数据,buckets作为桥梁,每次通过hash function获取了key的哈希值后,对这个哈希值进行取余,即hashResult%bucketsLength=bucketIndex,余数作为buckets的index,而buckets的value就是这个key对应的entry所在entries中的索引,所以最终我们就可以通过这个索引在entries中拿到我们想要的数据,整个过程不需要对所有数据进行遍历,的时间复杂度为1.
ConcurrentDictionaryConcurrentDictionary的数据存储类似,只是buckets有个更多的职责,它除了有dictionary中的buckets的桥梁的作用外,负责了数据存储.
key的哈希值与buckets的length取余后hashResult%bucketsLength=bucketIndex,余数作为buckets的索引就能找到我们要的数据所存储的块,当出现两个key指向同一个块时,即上图中的John Smith和Sandra Dee他同时指向152怎么办呢?存储节点Node具有Next属性执行下个Node,上图中,node 152的Next为154,即我们从152开始找Sandra Dee,发现不是我们想要的,再到154找,即可取到所需数据.
由于官方原版的源码较为复杂,理解起来有所难度,我对官方源码做了一些精简,下文将围绕这个精简版的ConcurrentDictionary展开叙述.
https://github.com/liuzhenyulive/DictionaryMini
ConcurrentDictionary中的每个数据存储在一个Node中,它除了存储value信息,还存储key信息,以及key对应的hashcode
private class Node { internal TKey m_key; //数据的key internal TValue m_value; //数据值 internal volatile Node m_next; //当前Node的下级节点 internal int m_hashcode; //key的hashcode //构造函数 internal Node(TKey key, TValue value, int hashcode, Node next) { m_key = key; m_value = value; m_next = next; m_hashcode = hashcode; } } Table而整个ConcurrentDictionary的数据存储在这样的一个Table中,其中m_buckets的Index负责映射key,m_locks是线程锁,下文中会有详细介绍,m_countPerLock存储每个lock锁负责的node数量.
private class Tables { internal readonly Node[] m_buckets; //上文中提到的buckets internal readonly object[] m_locks; //线程锁 internal volatile int[] m_countPerLock; //索格锁所管理的数据数量 internal readonly IEqualityComparer<TKey> m_comparer; //当前key对应的type的比较器 //构造函数 internal Tables(Node[] buckets, object[] locks, int[] countPerlock, IEqualityComparer<TKey> comparer) { m_buckets = buckets; m_locks = locks; m_countPerLock = countPerlock; m_comparer = comparer; } }ConcurrentDictionary会在构造函数中创建Table,这里我对原有的构造函数进行了简化,通过默认值进行创建,其中DefaultConcurrencyLevel默认并发级别为当前计算机处理器的线程数.
//构造函数 public ConcurrentDictionaryMini() : this(DefaultConcurrencyLevel, DEFAULT_CAPACITY, true, EqualityComparer<TKey>.Default) { } /// <summary> /// /// </summary> /// <param>并发等级,默认为CPU的线程数</param> /// <param>默认容量,31,超过31后会自动扩容</param> /// <param>时否动态扩充锁的数量</param> /// <param>key的比较器</param> internal ConcurrentDictionaryMini(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer<TKey> comparer) { if (concurrencyLevel < 1) { throw new Exception("concurrencyLevel 必须为正数"); } if (capacity < 0) { throw new Exception("capacity 不能为负数."); } if (capacity < concurrencyLevel) { capacity = concurrencyLevel; } object[] locks = new object[concurrencyLevel]; for (int i = 0; i < locks.Length; i++) { locks[i] = new object(); } int[] countPerLock = new int[locks.Length]; Node[] buckets = new Node[capacity]; m_tables = new Tables(buckets, locks, countPerLock, comparer); m_growLockArray = growLockArray; m_budget = buckets.Length / locks.Length; } 方法ConcurrentDictionary中较为基础重点的方法分别位Add,Get,Remove,Grow Table方法,其他方法基本上是建立在这四个方法的基础上进行的扩充.
Add向Table中添加元素有以下亮点值得我们关注.