嘿嘿,我就知道面试官接下来要问我 ConcurrentHashMap 底层原理了,看我怎么秀他 (5)

可能有的小伙伴就会打岔了,那如果两个 hash 值,低位和高位都相同,怎么办呢。如果是这样,我只能说,这个 hash 算法也太烂了吧。(这里的 hash 算法也会尽量避免这种情况,当然只是减少几率,并不能杜绝)

我有个大胆的想法,这里的高低位不同的计算方式,是不是后边 1.8 HashMap 让 hash 高低位做异或运算的引子呢?不得而知。。

put 方法比较简单,只要能看懂 HashMap 中的 put 方法,这里也没问题。主要是它调用的子方法比较复杂,下边一个一个讲解。

ensureSegment()方法

回到 Map的 put 方法,判断 j 下标的 Segment为空后,则需要调用此方法,初始化一个 Segment 对象,以确保拿到的对象一定是不为空的,否则无法执行s.put了。

//k为 (hash >>> segmentShift) & segmentMask 算法计算出来的值 private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; //u代表 k 的偏移量,用于通过 UNSAFE 获取主内存最新的实际 K 值 long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; //从内存中取到最新的下标位置的 Segment 对象,判断是否为空,(1) if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { //之前构造函数说了,s0是作为一个原型对象,用于创建新的 Segment 对象 Segment<K,V> proto = ss[0]; // use segment 0 as prototype //容量 int cap = proto.table.length; //加载因子 float lf = proto.loadFactor; //扩容阈值 int threshold = (int)(cap * lf); //把 Segment 对应的 HashEntry 数组先创建出来 HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; //再次检查 K 下标位置的 Segment 是否为空, (2) if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck //此处把 Segment 对象创建出来,并赋值给 s, Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); //循环检查 K 下标位置的 Segment 是否为空, (3) //若不为空,则说明有其它线程抢先创建成功,并且已经成功同步到主内存中了, //则把它取出来,并返回 while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { //CAS,若当前下标的Segment对象为空,就把它替换为最新创建出来的 s 对象。 //若成功,就跳出循环,否则,就一直自旋直到成功,或者 seg 不为空(其他线程成功导致)。 if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }

可以发现,我标注了上边 (1)(2)(3) 个地方,每次都判断最新的Segment是否为空。可能有的小伙伴就会迷惑,为什么做这么多次判断,我直接去自旋不就好了,反正最后都要自旋的。

我的理解是,在多线程环境下,因为不确定是什么时候会有其它线程 CAS 成功,有可能发生在以上的任意时刻。所以,只要发现一旦内存中的对象已经存在了,则说明已经有其它线程把Segment对象创建好,并CAS成功同步到主内存了。此时,就可以直接返回,而不需要往下执行了。这样做,是为了代码执行效率考虑。

scanAndLockForPut()方法

put 方法第一步抢锁失败之后,就会执行此方法,

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { //根据hash值定位到它对应的HashEntry数组的下标位置,并找到链表的第一个节点 //注意,这个操作会从主内存中获取到最新的状态,以确保获取到的first是最新值 HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; //重试次数,初始化为 -1 int retries = -1; // negative while locating node //若抢锁失败,就一直循环,直到成功获取到锁。有三种情况 while (!tryLock()) { HashEntry<K,V> f; // to recheck first below //1.若 retries 小于0, if (retries < 0) { if (e == null) { //若 e 节点和 node 都为空,则创建一个 node 节点。这里只是预测性的创建一个node节点 if (node == null) // speculatively create node node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } //如当前遍历到的 e 节点不为空,则判断它的key是否等于传进来的key,若是则把 retries 设为0 else if (key.equals(e.key)) retries = 0; //否则,继续向后遍历节点 else e = e.next; } //2.若是重试次数超过了最大尝试次数,则调用lock方法加锁。表明不再重试,我下定决心了一定要获取到锁。 //要么当前线程可以获取到锁,要么获取不到就去排队等待获取锁。获取成功后,再 break。 else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } //3.若 retries 的值为偶数,并且从内存中再次获取到最新的头节点,判断若不等于first //则说明有其他线程修改了当前下标位置的头结点,于是需要更新头结点信息。 else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { //更新头结点信息,并把重试次数重置为 -1,继续下一次循环,从最新的头结点遍历当前链表。 e = first = f; // re-traverse if entry changed retries = -1; } } return node; }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsxgxf.html