嘿嘿,我就知道面试官接下来要问我 ConcurrentHashMap 底层原理了,看我怎么秀他 (4)

找到合适的位置插入元素

//这是Map的put方法 public V put(K key, V value) { Segment<K,V> s; //不支持value为空 if (value == null) throw new NullPointerException(); //通过 Wang/Jenkins 算法的一个变种算法,计算出当前key对应的hash值 int hash = hash(key); //上边我们计算出的 segmentShift为28,因此hash值右移28位,说明此时用的是hash的高4位, //然后把它和掩码15进行与运算,得到的值一定是一个 0000 ~ 1111 范围内的值,即 0~15 。 int j = (hash >>> segmentShift) & segmentMask; //这里是用Unsafe类的原子操作找到Segment数组中j下标的 Segment 对象 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment //初始化j下标的Segment s = ensureSegment(j); //在此Segment中添加元素 return s.put(key, hash, value, false); }

上边有一个这样的方法, UNSAFE.getObject (segments, (j << SSHIFT) + SBASE。它是为了通过Unsafe这个类,找到 j 最新的实际值。这个计算 (j << SSHIFT) + SBASE ,在后边非常常见,我们只需要知道它代表的是 j 的一个偏移量,通过偏移量,就可以得到 j 的实际值。可以类比,AQS 中的 CAS 操作。 Unsafe中的操作,都需要一个偏移量,看下图,

嘿嘿,我就知道面试官接下来要问我 ConcurrentHashMap 底层原理了,看我怎么秀他

(j << SSHIFT) + SBASE 就相当于图中的 stateOffset偏移量。只不过图中是 CAS 设置新值,而我们这里是取 j 的最新值。 后边很多这样的计算方式,就不赘述了。接着看 s.put 方法,这才是最终确定元素位置的方法。

//Segment中的 put 方法 final V put(K key, int hash, V value, boolean onlyIfAbsent) { //这里通过tryLock尝试加锁,如果加锁成功,返回null,否则执行 scanAndLockForPut方法 //这里说明一下,tryLock 和 lock 是 ReentrantLock 中的方法, //区别是 tryLock 不会阻塞,抢锁成功就返回true,失败就立马返回false, //而 lock 方法是,抢锁成功则返回,失败则会进入同步队列,阻塞等待获取锁。 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { //当前Segment的table数组 HashEntry<K,V>[] tab = table; //这里就是通过hash值,与tab数组长度取模,找到其所在HashEntry数组的下标 int index = (tab.length - 1) & hash; //当前下标位置的第一个HashEntry节点 HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { //如果第一个节点不为空 if (e != null) { K k; //并且第一个节点,就是要插入的节点,则替换value值,否则继续向后查找 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { //替换旧值 oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } //说明当前index位置不存在任何节点,此时first为null, //或者当前index存在一条链表,并且已经遍历完了还没找到相等的key,此时first就是链表第一个元素 else { //如果node不为空,则直接头插 if (node != null) node.setNext(first); //否则,创建一个新的node,并头插 else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; //如果当前Segment中的元素大于阈值,并且tab长度没有超过容量最大值,则扩容 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); //否则,就把当前node设置为index下标位置新的头结点 else setEntryAt(tab, index, node); ++modCount; //更新count值 count = c; //这种情况说明旧值肯定为空 oldValue = null; break; } } } finally { //需要注意ReentrantLock必须手动解锁 unlock(); } //返回旧值 return oldValue; }

这里说明一下计算 Segment 数组下标和计算 HashEntry 数组下标的不同点:

//下边的hash值是通过哈希运算后的hash值,不是hashCode //计算 Segment 下标 (hash >>> segmentShift) & segmentMask //计算 HashEntry 数组下标 (tab.length - 1) & hash

思考一下,为什么它们的算法不一样呢? 计算 Segment 数组下标是用的 hash值高几位(这里以高 4 位为例)和掩码做与运算,而计算 HashEntry 数组下标是直接用的 hash 值和数组长度减1做与运算。

我的理解是,这是为了尽量避免当前 hash 值计算出来的 Segment 数组下标和计算出来的 HashEntry 数组下标趋于相同。简单说,就是为了避免分配到同一个 Segment 中的元素扎堆现象,即避免它们都被分配到同一条链表上,导致链表过长。同时,也是为了减少并发。下面做一个运算,帮助理解一下(假设不用高 4 位运算,而是正常情况都用低位做运算)。

//我们以并发级别16,HashEntry数组容量 4 为例,则它们参与运算的掩码分别为 15 和 3 //hash值 0110 1101 0110 1111 0110 1110 0010 0010 //segmentMask = 15 ,标记为 (1) 0000 0000 0000 0000 0000 0000 0000 1111 //tab.length - 1 = 3 ,标记为 (2) 0000 0000 0000 0000 0000 0000 0000 0011 //用 hash 分别和 15 ,3 做与运算,会发现得到的结果是一样,都是十进制 2. //这表明,当前 hash值被分配到下标为 2 的 Segment 中,同时,被分配到下标为 2 的 HashEntry 数组中 //现在若有另外一个 hash 值 h2,和第一个hash值,高位不同,但是低4位相同, 1010 1101 0110 1111 0110 1110 0010 0010 //我们会发现,最后它也会被分配到下标为 2 的 Segment 和 HashEntry 数组,就会和第一个元素形成链表。 //所以,为了避免这种扎堆现象,让元素尽量均匀分配,就让 hash 的高 4 位和 (1)处做与 运算,而用低位和 (2)处做与运算 //这样计算后,它们所在的Segment下标分别为 6(0110), 10(1010),即使它们在HashEntry数组中的下标都为 2(0010),也无所谓 //因为它们并不在一个 Segment 中,也就不会在同一个 HashEntry 数组中,更不会形成链表。 //更重要的是,它们不会有并发,因为在各自不同的 Segment 自己操作自己的加锁解锁,互不影响

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsxgxf.html