从同步容器到并发容器(2)

首先就是性能,在讲解ArrayList的文章中提到过,ArrayList的扩容由于使用了Arrays.copyOf每次都需要申请更大的空间以及复制现有的元素到新的数组,对性能存在一定影响。CopyOnWrite容器也不例外,每次修改操作都会申请新的数组空间,然后进行替换。所以在高并发频繁修改容器的情况下,会不断申请新的空间,同时会造成频繁的GC,这时使用CopyOnWrite容器并不是一个好的选择。

其次还有一个数据一致性问题,由于在修改中copy了新的数组进行替换,同时旧数组如果还在被使用,那么新的数据就不能被及时读取到,这样就造成了数据不一致,如果需要强数据一致性,CopyOnWrite容器也不太适合。

并发容器ConcurrentHashMap

ConcurrentHashMap容器相较于CopyOnWrite容器在并发加锁粒度上有了更大一步的优化,它通过修改对单个hash桶元素加锁的达到了更细粒度的并发控制。在了解ConcurrentHashMap容器之前,推荐大家先阅读我之前对HashMap源码分析的文章--Java集合(5)一 HashMap与HashSet,因为在底层数据结构上,ConcurrentHashMap和HashMap都使用了数组+链表+红黑树的方式,只是在HashMap的基础上添加了并发相关的一些控制,所以这里只对ConcurrentHashMap中并发相关代码做一些分析。

Java并发-从同步容器到并发容器


还是先从ConcurrentHashMap的写操作开始,这里就是put方法:

final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); //计算桶的hash值
int binCount = 0;
//循环插入元素,避免并发插入失败
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果当前桶无元素,则通过cas操作插入新节点
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
//如果当前桶正在扩容,则协助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//hash冲突时锁住当前需要添加节点的头元素,可能是链表头节点或者红黑树的根节点
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
在put元素的过程中,有几个并发处理的关键点:

如果当前桶对应的节点还没有元素插入,通过典型的无锁cas操作尝试插入新节点,减少加锁的概率,并发情况下如果插入不成功,很容易想到自旋,也就是for (Node<K,V>[] tab = table;;)。
如果当前桶正在扩容,则协助扩容((fh = f.hash) == MOVED)。这里是一个重点,ConcurrentHashMap的扩容和HashMap不一样,它在多线程情况下或使用多个线程同时扩容,每个线程扩容指定的一部分hash桶,当前线程扩容完指定桶之后会继续获取下一个扩容任务,直到扩容全部完成。扩容的大小和HashMap一样,都是翻倍,这样可以有效减少移动的元素数量,也就是使用2的幂次方的原因,在HashMap中也一样。
在发生hash冲突时仅仅只锁住当前需要添加节点的头元素即可,可能是链表头节点或者红黑树的根节点,其他桶节点都不需要加锁,大大减小了锁粒度。
通过ConcurrentHashMap添加元素的过程,知道了ConcurrentHashMap容器是通过CAS + synchronized一起来实现并发控制的。这里有个额外的问题:为什么使用synchronized而不使用ReentrantLock?前面我的文章也对synchronized以及ReentrantLock的实现方式和性能做过分析,在这里我的理解是synchronized在后期优化空间上比ReentrantLock更大。

并发容器ConcurrentSkipListMap

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/89e723e1969a769a22861105089581c6.html