ConcurrentHashMap源码解析(jdk1.8)

jdk8的ConcurrentHashMap改动非常大。放弃了之前segment锁,改用cas+synchronized来实现同步。

常量含义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* table数组最大容量。
*/
private stactic final int MAXIMUM_CAPACITY = 1 << 30;

/**
* 默认初始化容量,是2的次幕
*/
private static final int DEFAULT_CAPACITY = 16;

/**
* The largest possible (non-power of two) array size.
* Needed by toArray and related methods.
*/
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
* The default concurrency level for this table. Unused but
* defined for compatibility with previous versions of this class.
*/
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**
* 加载因子,用于判断是否需要扩容, 当哈希表中的条目数超出了加载因子与当前容量的乘积时.table长度扩容为原* 来的两倍。
*/
private static final float LOAD_FACTOR = 0.75f;

/**
* 链表转红黑树的阈值,链表长度超过这个值自动转为红黑树
*/
static final int TREEIFY_THRESHOLD = 8;

/**
* 红黑树元素个数少于这个值,转回链表
*/
static final int UNTREEIFY_THRESHOLD = 6;

/**
* 当桶中的bin被树化时最小的hash表容量。这个MIN_TREEIFY_CAPACITY的值至少是TREEIFY_THRESHOLD的
* 4倍。
*/
static final int MIN_TREEIFY_CAPACITY = 64;

/**
* 扩容线程每次最少要迁移16个hash桶
*/
private static final int MIN_TRANSFER_STRIDE = 16;

/**
* The number of bits used for generation stamp in sizeCtl.
* Must be at least 6 for 32bit arrays.
*/
private static int RESIZE_STAMP_BITS = 16;

/**
* 最多多少线程帮助扩容
*/
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

/**
* The bit shift for recording size stamp in sizeCtl.
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
/*
* 以下是节点和hash值
*/
static final int MOVED = -1; // forwarding nodes的hash值
static final int TREEBIN = -2; // 红黑树根节点的hash值
static final int RESERVED = -3; // transient reservations的hash值
static final int HASH_BITS = 0x7fffffff; // 正常节点的最大hash值

重要变量

table
用来存放Node节点数据的数组,默认为null,默认大小为16,每次扩容时大小总是2的幂次方;下文的table都是指这个属性

nextTable
扩容时新表,数组为table的两倍,扩容完毕会赋给table

baseCount
map的大小

sizeCtl
控制标识符,用来控制table初始化和扩容操作的,在不同的地方有不同的用途,其值也不同,所代表的含义也不同
负数代表正在进行初始化或扩容操作
-1代表正在初始化
-N 表示有N-1个线程正在进行扩容操作
正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小

transferIndex
表示已经分配给扩容线程的table数组索引位置。主要用来协调多个线程扩容。transferIndex初始化是指向table.length。当开始扩容时,首先要将transferIndex右移(以cas的方式修改 transferIndex=transferIndex-stride(要迁移hash桶的个数)),获取迁移任务。每个扩容线程都会通过for循环+CAS的方式设置transferIndex,因此可以确保多线程扩容的并发安全。

image

内部类

Node
节点,保存key-value的数据结构;value字段和next用volatile修饰,保障可见性。读数据时不需要加锁。可以看到Node不支持setValue。修改值直接node.val=xxx修改。

1
2
3
4
5
6
7
8
9
10
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
...
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
}

TreeNode
红黑树节点

1
2
3
4
5
6
7
8
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
...
}

TreeBin
用于封装红黑树,持有红黑树根节点的引用。包含一个读写锁用于写线程等待读线程完成,再tree重新构造之前。

1
2
3
4
5
6
7
8
9
10
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// 表示锁状态
static final int WRITER = 1; // 持有写锁
static final int WAITER = 2; // 等待写锁
static final int READER = 4; // 持有读锁
}

ForwardingNode
一个特殊的Node节点,hash值为-1(MOVED常量),其中存储nextTable的引用。只有table发生扩容的时候,ForwardingNode才会发挥作用,作为一个占位符放在table中表示当前节点为null或则已经被移动

1
2
3
4
5
6
  final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
....

主要方法解析

put方法

put方法调用了putVal方法。

1
2
3
public V put(K key, V value) {
return putVal(key, value, false);
}

putVal方法关键代码解析

  1. 计算hash值
    spread方法通过高16位异或低16位来散列。因为后面计算table坐标时是采用 hash&(length-1)的公式来计算。
    也就是说只会保留低位,这样大大加大了出现hash冲突的概率。这里用高位^低位,增加低位的随机性,减少hash冲突的次数。

    1
    2
    3
    static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
    }
  2. tab==null时,先进行table的初始化。

    1
    2
    if (tab == null || (n = tab.length) == 0)
    tab = initTable();
  3. 通过 (table.length-1)&hash算出脚标i,如果table脚标i的元素为null。说明不存在hash冲突。将key,val封装成Node,通过cas直接设置进table,跳出循环。cas失败说明其他线程修改了该脚标节点,重新开始循环。

    1
    2
    3
    4
    5
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null,
    new Node<K,V>(hash, key, value, null)))
    break; // no lock when adding to empty bin
    }
  4. 节点f是table中脚标为i,如果该节点hash==MOVE,说明该节点是ForwardingNode且其他线程正在对这个map进行扩容。当前线程也协助扩容。

    1
    2
    else if ((fh = f.hash) == MOVED)
    tab = helpTransfer(tab, f);
  5. 不是前面几种情况的话,就是说明存在hash冲突。将新节点加入以f为根节点的链表或红黑树。这里只需要锁住根节点,相比以前的分段锁粒度更小。红黑树用TreeBin对象封装,hash=-2。hash大于0即是链表。节点加入链表会记录一个链表长度binCount,如果binCount>=TREEIFY_THRESHOLD,链表会向红黑树转化。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    else{
    V oldVal = null;
    synchronized (f) {
    if (tabAt(tab, i) == f) {
    if (fh >= 0) {
    binCount = 1;
    for (Node<K,V> e = f;; ++binCount) {
    K ek;
    if (e.hash == hash &&
    ((ek = e.key) == key ||
    (ek != null && key.equals(ek)))) {
    oldVal = e.val;
    if (!onlyIfAbsent)
    e.val = value;
    break;
    }
    Node<K,V> pred = e;
    if ((e = e.next) == null) {
    pred.next = new Node<K,V>(hash, key,
    value, null);
    break;
    }
    }
    }
    else if (f instanceof TreeBin) {
    Node<K,V> p;
    binCount = 2;
    if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
    value)) != null) {
    oldVal = p.val;
    if (!onlyIfAbsent)
    p.val = value;
    }
    }
    }
    }
    if (binCount != 0) {
    if (binCount >= TREEIFY_THRESHOLD)
    treeifyBin(tab, i);
    if (oldVal != null)
    return oldVal;
    break;
    }
    }
  6. addCount方法分两部分,一、更新baseCount,二、判断是否扩容

    1
    addCount(1L, binCount);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
private final void addCount(long x, int check) {}
/*
*CounterCell是多线程的情况下辅助计算baseCount;
*多线程添加时,通过随机在CounterCell数组中选一个来记录添加的map大小,减少多线程的竞争。
*最后通过baseCount加上所有的CounterCell.value得出最终的baseCount。
*/
CounterCell[] as; long b, s;
/*baseCount的更新
* counterCells==null,通过cas更新baseCount。成功,更新完成。失败则进入if块处理。
* counterCells!=null,直接进入if块处理
*/
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
//cas失败进入fullAddCount方法循环cas
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
//将CounterCell数组记录的值加入baseCount中
s = sumCount();
}
//判断扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//s是上面计算的容量,s>sizeCtl是扩容。sizeCtl一般是0.75*table.length,表示扩容阈值。
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
/*sizeCtl<0
*代表正在进行初始化或扩容操作
*-1代表正在初始化
*-N 表示有N-1个线程正在进行扩容操作
*/
if (sc < 0) {
//扩容任务已经全部分配或者扩容已经完成,则当前线程不需要再扩容。
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//如果已经有其他线程在执行扩容操作,sizeCtl+1,参与扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//当前线程是唯一的或是第一个发起扩容的线程 此时nextTable=null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
//扩容方法
transfer(tab, null);
s = sumCount();
}
}
}

扩容transfer

transfer是可以多线程进行的。通过给每个线程分配迁移(将table中的hash桶迁移到新的nextTab中)的hash桶,每个线程最少迁移16个hash桶,用transferIndex来同步。一个map扩容,要迁移的hash桶是原来的table长度。分配时从后面的分配起。从下图看出,每个线程分配的hash桶脚标范围是transferIndex-stride(线程迁移的hash桶个数)到transferIndex-1。分配完一个线程后,通过cas将transferIndex设置为transferIndex-stride。

image
线程迁移的hash桶个数最少是16,图中为了方便没画这么多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//设置stride,stride是 分配给该线程迁移的hash桶个数,最小值是MIN_TRANSFER_STRIDE
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//nextTab为null时,这个第一个扩容的线程,初始化nextTab为原来table的2倍
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//初始transferIndex
transferIndex = n;
}
int nextn = nextTab.length;
//初始ForwardingNode节点,指向扩容的nextTab
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
//
if (--i >= bound || finishing)
advance = false;
//如果transferIndex<0,说明要迁移的hash桶都分配给线程执行了。
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//cas修改,transferIndex。分配迁移hash桶任务。该线程负责迁移的hash桶脚标范围 bound-i
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {

//该线程要迁移的hash桶的最小脚标
bound = nextBound;
//该线程要迁移的hash桶的最大脚标
i = nextIndex - 1;
advance = false;
}
}
//
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//所有线程都完成任务
finishing = advance = true;
i = n; // recheck before commit
}
}
//如果table[i]==mull,设置ForwardingNode节点,用于占位。
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//迁移操作,锁住要迁移的hash桶
synchronized (f) {
/**
* hash桶中的节点rehash有两种情况。因为节点的key相同,索引hash值也相同。算脚标是hash&
* (length-1),也就是只取hash值的前几位。扩容后length是之前的2倍。算脚标时,会取多一位。
* 所以根据hash&(length-1)的结果,
* 1.如果最高位是0,则脚本和之前一样,移到原来位置。
* 2.如果最高位是1,新的脚标位置就是 length+原来脚标。
* 下面链表和红黑树都是这样将hash桶分成两份,设置在新表的对应位置。
*/
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//hash桶是链表结构
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//hash桶是红黑树
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

get方法

get方法不需要加锁。因为Node的val是volatile修饰的,所以其他线程的修改对当前线程是可见的。

  1. 先获取key对应的脚标i,table[i]==null,返回null。table[i]节点的key是否和key是否相等。相等则返回val。
  2. table[i].hash<0,则说明节点是TreeBin或者ForwardingNode节点,通过该节点的find方法找出key对应的节点。
  3. 不是以上情况则是链表,遍历链表找到key对应的节点值。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
     public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
    (e = tabAt(tab, (n - 1) & h)) != null) {
    if ((eh = e.hash) == h) {
    if ((ek = e.key) == key || (ek != null && key.equals(ek)))
    return e.val;
    }
    else if (eh < 0)
    return (p = e.find(h, key)) != null ? p.val : null;
    while ((e = e.next) != null) {
    if (e.hash == h &&
    ((ek = e.key) == key || (ek != null && key.equals(ek))))
    return e.val;
    }
    }
    return null;
    }