ConsurrentDictionary并发字典知多少? (2)

开始操作前会声明一个tables变量来存储操作开始前的m_tables,在正式开始操作后(进入lock)的时候,会检查tables在准备工作阶段是否别的线程改变,如果改变了,则重新开始准备工作并从新开始.

通过GetBucketAndLockNo方法获取bucket索引以及lock索引,其内部就是取余操作.

private void GetBucketAndLockNo( int hashcode, out int bucketNo, out int lockNo, int bucketCount, int lockCount) { //0x7FFFFFFF 是long int的最大值 与它按位与数据小于等于这个最大值 bucketNo = (hashcode & 0x7fffffff) % bucketCount; lockNo = bucketNo % lockCount; }

对数据进行操作前会从m_locks取出第lockNo个对象最为lock,操作完成后释放该lock.多个lock一定程度上减少了阻塞的可能性.

在对数据进行更新时,如果该Value的Type为允许原子性写入的,则直接更新该Value,否则创建一个新的node进行覆盖.

/// <summary> /// Determines whether type TValue can be written atomically /// </summary> private static bool IsValueWriteAtomic() { Type valueType = typeof(TValue); // // Section 12.6.6 of ECMA CLI explains which types can be read and written atomically without // the risk of tearing. // // See // if (valueType.IsClass) { return true; } switch (Type.GetTypeCode(valueType)) { case TypeCode.Boolean: case TypeCode.Byte: case TypeCode.Char: case TypeCode.Int16: case TypeCode.Int32: case TypeCode.SByte: case TypeCode.Single: case TypeCode.UInt16: case TypeCode.UInt32: return true; case TypeCode.Int64: case TypeCode.Double: case TypeCode.UInt64: return IntPtr.Size == 8; default: return false; } }

该方法依据CLI规范进行编写,简单来说,32位的计算机,对32字节以下的数据类型写入时可以一次写入的而不需要移动内存指针,64位计算机对64位以下的数据可一次性写入,不需要移动内存指针.保证了写入的安全.
详见12.6.6

private bool TryAddInternal(TKey key, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue) { while (true) { int bucketNo, lockNo; int hashcode; //https://www.cnblogs.com/blurhkh/p/10357576.html //需要了解一下值传递和引用传递 Tables tables = m_tables; IEqualityComparer<TKey> comparer = tables.m_comparer; hashcode = comparer.GetHashCode(key); GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length); bool resizeDesired = false; bool lockTaken = false; try { if (acquireLock) Monitor.Enter(tables.m_locks[lockNo], ref lockTaken); //如果表刚刚调整了大小,我们可能没有持有正确的锁,必须重试。 //当然这种情况很少见 if (tables != m_tables) continue; Node prev = null; for (Node node = tables.m_buckets[bucketNo]; node != null; node = node.m_next) { if (comparer.Equals(node.m_key, key)) { //key在字典里找到了。如果允许更新,则更新该key的值。 //我们需要为更新创建一个node,以支持不能以原子方式写入的TValue类型,因为free-lock 读取可能同时发生。 if (updateIfExists) { if (s_isValueWriteAtomic) { node.m_value = value; } else { Node newNode = new Node(node.m_key, value, hashcode, node.m_next); if (prev == null) { tables.m_buckets[bucketNo] = newNode; } else { prev.m_next = newNode; } } resultingValue = value; } else { resultingValue = node.m_value; } return false; } prev = node; } //key没有在bucket中找到,则插入该数据 Volatile.Write(ref tables.m_buckets[bucketNo], new Node(key, value, hashcode, tables.m_buckets[bucketNo])); //当m_countPerLock超过Int Max时会抛出OverflowException checked { tables.m_countPerLock[lockNo]++; } // // 如果m_countPerLock[lockNo] > m_budget,则需要调整buckets的大小。 // GrowTable也可能会增加m_budget,但不会调整bucket table的大小。. // 如果发现bucket table利用率很低,也会发生这种情况。 // if (tables.m_countPerLock[lockNo] > m_budget) { resizeDesired = true; } } finally { if (lockTaken) Monitor.Exit(tables.m_locks[lockNo]); } if (resizeDesired) { GrowTable(tables, tables.m_comparer, false, m_keyRehashCount); } resultingValue = value; return true; } } Get

从Table中获取元素的的流程与前文介绍ConcurrentDictionary工作原理时一致,但有以下亮点值得关注.

读取bucket[i]在Volatile.Read()方法中进行,该方法会自动对读取出来的数据加锁,避免在读取的过程中,数据被其他线程remove了.

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpjdjz.html