ConcurrentHashMap源码剖析 (5)

通过以上源码,可以看到,当需要添加元素时,会针对当前元素所对应的桶位进行加锁操作,这样一方面保证元素添加时,多线程的安全,同时对某个桶位加锁不会影响其他桶位的操作,进一步提升多线程的并发效率

数组初始化,initTable方法

private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; //cas+自旋,保证线程安全,对数组进行初始化操作 while ((tab = table) == null || tab.length == 0) { //如果sizeCtl的值(-1)小于0,说明此时正在初始化, 让出cpu if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //cas修改sizeCtl的值为-1,修改成功,进行数组初始化,失败,继续自旋 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { //sizeCtl为0,取默认长度16,否则去sizeCtl的值 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") //基于初始长度,构建数组对象 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; //计算扩容阈值,并赋值给sc sc = n - (n >>> 2); } } finally { //将扩容阈值,赋值给sizeCtl sizeCtl = sc; } break; } } return tab; }

put加锁图解

ConcurrentHashMap源码剖析

2.3 jdk1.8扩容安全 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; //如果是多cpu,那么每个线程划分任务,最小任务量是16个桶位的迁移 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //如果是扩容线程,此时新数组为null if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") //两倍扩容创建新数组 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; //记录线程开始迁移的桶位,从后往前迁移 transferIndex = n; } //记录新数组的末尾 int nextn = nextTab.length; //已经迁移的桶位,会用这个节点占位(这个节点的hash值为-1--MOVED) ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; //i记录当前正在迁移桶位的索引值 //bound记录下一次任务迁移的开始桶位 //--i >= bound 成立表示当前线程分配的迁移任务还没有完成 if (--i >= bound || finishing) advance = false; //没有元素需要迁移 -- 后续会去将扩容线程数减1,并判断扩容是否完成 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //计算下一次任务迁移的开始桶位,并将这个值赋值给transferIndex else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } //如果没有更多的需要迁移的桶位,就进入该if if (i < 0 || i >= n || i + n >= nextn) { int sc; //扩容结束后,保存新数组,并重新计算扩容阈值,赋值给sizeCtl if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } //扩容任务线程数减1 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //判断当前所有扩容任务线程是否都执行完成 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; //所有扩容线程都执行完,标识结束 finishing = advance = true; i = n; // recheck before commit } } //当前迁移的桶位没有元素,直接在该位置添加一个fwd节点 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //当前节点已经被迁移 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { //当前节点需要迁移,加锁迁移,保证多线程安全 //此处迁移逻辑和jdk7的ConcurrentHashMap相同,不再赘述 synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }

示意图

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpsdsy.html