默认的情况下,每个ConcurrentHashMap 类会创建16个并发的 segment,每个 segment 里面包含多个 Hash表,每个 Hash 链都是由 HashEntry 节点组成的。
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { //默认初始容量为 16,即初始默认为 16 个桶 static final int DEFAULT_INITIAL_CAPACITY = 16; static final float DEFAULT_LOAD_FACTOR = 0.75f; //默认并发级别为 16。该值表示当前更新线程的估计数 static final int DEFAULT_CONCURRENCY_LEVEL = 16; static final int MAXIMUM_CAPACITY = 1 << 30; static final int MIN_SEGMENT_TABLE_CAPACITY = 2; static final int MAX_SEGMENTS = 1 << 16; // slightly conservative static final int RETRIES_BEFORE_LOCK = 2; final int segmentMask; //段掩码,主要为了定位Segment final int segmentShift; final Segment<K,V>[] segments; //主干就是这个分段锁数组 //构造器 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); //MAX_SEGMENTS 为1<<16=65536,也就是最大并发数为65536 if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // 2的sshif次方等于ssize,例:ssize=16,sshift=4;ssize=32,sshif=5 int sshift = 0; // ssize 为segments数组长度,根据concurrentLevel计算得出 int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // 创建segments数组并初始化第一个Segment,其余的Segment延迟初始化 Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; } } put() 方法**定位segment并确保定位的Segment已初始化 **
调用 Segment的 put 方法。
public V put(K key, V value) { Segment<K,V> s; //concurrentHashMap不允许key/value为空 if (value == null) throw new NullPointerException(); //hash函数对key的hashCode重新散列,避免差劲的不合理的hashcode,保证散列均匀 int hash = hash(key); //返回的hash值无符号右移segmentShift位与段掩码进行位运算,定位segment int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); } get() 方法get方法无需加锁,由于其中涉及到的共享变量都使用volatile修饰,volatile可以保证内存可见性,所以不会读取到过期数据
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; } JDK1.8 实现ConcurrentHashMap 在 JDK8 中进行了巨大改动,光是代码量就从1000多行增加到6000行!1.8摒弃了Segment(锁段)的概念,采用了 CAS + synchronized 来保证并发的安全性。
可以看到,和HashMap 1.8的数据结构很像。底层数据结构改变为采用数组+链表+红黑树的数据形式。
和HashMap1.8相同的一些地方底层数据结构一致
HashMap初始化是在第一次put元素的时候进行的,而不是init
HashMap的底层数组长度总是为2的整次幂
默认树化的阈值为 8,而链表化的阈值为 6
hash算法也很类似,但多了一步& HASH_BITS,该步是为了消除最高位上的负符号,hash的负在ConcurrentHashMap中有特殊意义表示在扩容或者是树节点
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; } 一些关键属性 private static final int MAXIMUM_CAPACITY = 1 << 30; //数组最大大小 同HashMap private static final int DEFAULT_CAPACITY = 16;//数组默认大小 static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //数组可能最大值,需要与toArray()相关方法关联 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; //兼容旧版保留的值,默认线程并发度,类似信号量 private static final float LOAD_FACTOR = 0.75f;//默认map扩容比例,实际用(n << 1) - (n >>> 1)代替了更高效 static final int TREEIFY_THRESHOLD = 8; // 链表转树阀值,大于8时 static final int UNTREEIFY_THRESHOLD = 6; //树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))。【仅在扩容tranfer时才可能树转链表】 static final int MIN_TREEIFY_CAPACITY = 64; private static final int MIN_TRANSFER_STRIDE = 16;//扩容转移时的最小数组分组大小 private static int RESIZE_STAMP_BITS = 16;//本类中没提供修改的方法 用来根据n生成位置一个类似时间戳的功能 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // 2^15-1,help resize的最大线程数 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; // 32-16=16,sizeCtl中记录size大小的偏移量 static final int MOVED = -1; // hash for forwarding nodes(forwarding nodes的hash值)、标示位 static final int TREEBIN = -2; // hash for roots of trees(树根节点的hash值) static final int RESERVED = -3; // ReservationNode的hash值 static final int HASH_BITS = 0x7fffffff; // 用在计算hash时进行安位与计算消除负hash static final int NCPU = Runtime.getRuntime().availableProcessors(); // 可用处理器数量 /* ---------------- Fields -------------- */ transient volatile Node<K,V>[] table; //装载Node的数组,作为ConcurrentHashMap的数据容器,采用懒加载的方式,直到第一次插入数据的时候才会进行初始化操作,数组的大小总是为2的幂次方。 private transient volatile Node<K,V>[] nextTable; //扩容时使用,平时为null,只有在扩容的时候才为非null /** * 实际上保存的是hashmap中的元素个数 利用CAS锁进行更新但它并不用返回当前hashmap的元素个数 */ private transient volatile long baseCount; /** *该属性用来控制table数组的大小,根据是否初始化和是否正在扩容有几种情况: *当值为负数时:如果为-1表示正在初始化,如果为-N则表示当前正有N-1个线程进行扩容操作; *当值为正数时:如果当前数组为null的话表示table在初始化过程中,sizeCtl表示为需要新建数组的长度;若已经初始化了,表示当前数据容器(table数组)可用容量也可以理解成临界值(插入节点数超过了该临界值就需要扩容),具体指为数组的长度n 乘以 加载因子loadFactor;当值为0时,即数组长度为默认初始值。 */ private transient volatile int sizeCtl; put() 方法首先会判断 key、value是否为空,如果为空就抛异常!
spread()方法获取hash,减小hash冲突
判断是否初始化table数组,没有的话调用initTable()方法进行初始化
判断是否能直接将新值插入到table数组中
判断当前是否在扩容,MOVED为-1说明当前ConcurrentHashMap正在进行扩容操作,正在扩容的话就进行协助扩容
当table[i]为链表的头结点,在链表中插入新值,通过synchronized (f)的方式进行加锁以实现线程安全性。
在链表中如果找到了与待插入的键值对的key相同的节点,就直接覆盖
如果没有找到的话,就直接将待插入的键值对追加到链表的末尾
当table[i]为红黑树的根节点,在红黑树中插入新值/覆盖旧值
根据当前节点个数进行调整,否需要转换成红黑树(个数大于等于8,就会调用treeifyBin方法将tabel[i]第i个散列桶拉链转换成红黑树)