先retries64次,不行的话,才用ReentrantLock重入锁
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { //通过Segment和hash值寻找匹配的HashEntry HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; //重试次数 int retries = -1; // negative while locating node //循环尝试获取锁 while (!tryLock()) { HashEntry<K,V> f; // to recheck first below //步骤① if (retries < 0) { //情况① 没找到,之前表中不存在 if (e == null) { if (node == null) // speculatively create node //新建 HashEntry 备用,retries改成0 node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } //情况② 找到,刚好第一个节点就是,retries改成0 else if (key.equals(e.key)) retries = 0; //情况③ 第一个节点不是,移到下一个,retries还是-1,继续找 else e = e.next; } //步骤② //尝试了MAX_SCAN_RETRIES次还没拿到锁,简直B了dog! else if (++retries > MAX_SCAN_RETRIES) { //泉水挂机 lock(); break; } //步骤③ //在MAX_SCAN_RETRIES次过程中,key对应的entry发生了变化,则从头开始 else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node; } 最后的put流程rehash的话 同jdk8版本下的rehash
retries2次 如果还是不同,那么就reentranLock依次等待unlock计算每个tab的size
JDK8下的ConcurrentHashMap put public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { // key/value不能为空!!! if (key == null || value == null) throw new NullPointerException(); //计算hash值 int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //注释① 表为null则初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); //CAS方法判断指定位置是否为null,为空则通过创建新节点,通过CAS方法设置在指定位置 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //当前节点正在扩容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); //指定位置不为空 else { V oldVal = null; //注释② 加锁 synchronized (f) { if (tabAt(tab, i) == f) { //节点是链表的情况 if (fh >= 0) { binCount = 1; //遍历整体链 for (Node<K,V> e = f;; ++binCount) { K ek; //如果已存在,替换原值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //如果是新加节点,则以尾部插入实现添加 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //节点是红黑树的情况 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; //遍历红黑树 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } else if (f instanceof ReservationNode) throw new IllegalStateException("Recursive update"); } } if (binCount != 0) { //链表中节点个数超过8转成红黑树 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //注释③ 添加节点 addCount(1L, binCount); return null; } 为什么tab[hash(key)]用cas,但put里面的元素都需要用synchronized呢其实hash冲撞的几率蛮低的,所以synchronized调用的次数并不多,更多的是在cas那里...
然后就是cas比synchronized的优点...
每次put完毕,都会调用addCount方法
private final void addCount(long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }