一、容器初始化
1、源碼分析
在jdk8的ConcurrentHashMap中一共有5個(gè)構(gòu)造方法,這四個(gè)構(gòu)造方法中都沒有對內(nèi)部的數(shù)組做初始化, 只是對一些變量的初始值做了處理
jdk8的ConcurrentHashMap的數(shù)組初始化是在第一次添加元素時(shí)完成
1
2
3
|
// 沒有維護(hù)任何變量的操作,如果調(diào)用該方法,數(shù)組長度默認(rèn)是16 public ConcurrentHashMap() { } |
1
2
3
4
5
6
7
8
9
|
// 傳遞進(jìn)來一個(gè)初始容量,ConcurrentHashMap會基于這個(gè)值計(jì)算一個(gè)比這個(gè)值大的2的冪次方數(shù)作為初始容量 public ConcurrentHashMap( int initialCapacity) { if (initialCapacity < 0 ) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1 )) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1 ) + 1 )); // 此處的初始容量計(jì)算結(jié)果為傳入的容量 + 傳入的容量的一半 + 1 this .sizeCtl = cap; } |
注意,調(diào)用這個(gè)方法,得到的初始容量和HashMap以及jdk7的ConcurrentHashMap不同,即使你傳遞的是一個(gè)2的冪次方數(shù),該方法計(jì)算出來的初始容量依然是比這個(gè)值大的2的冪次方數(shù)
1
2
3
4
|
// 調(diào)用四個(gè)參數(shù)的構(gòu)造 public ConcurrentHashMap( int initialCapacity, float loadFactor) { this (initialCapacity, loadFactor, 1 ); } |
1
2
3
4
5
6
7
8
9
10
11
12
|
// 計(jì)算一個(gè)大于或者等于給定的容量值,該值是2的冪次方數(shù)作為初始容量 public ConcurrentHashMap( int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0 .0f) || initialCapacity < 0 || concurrencyLevel <= 0 ) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = ( long )( 1.0 + ( long )initialCapacity / loadFactor); int cap = (size >= ( long )MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor(( int )size); this .sizeCtl = cap; } |
1
2
3
4
5
6
|
// 基于一個(gè)Map集合,構(gòu)建一個(gè)ConcurrentHashMap // 初始容量為16 public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this .sizeCtl = DEFAULT_CAPACITY; putAll(m); } |
2、sizeCtl
含義解釋
注意:以上這些構(gòu)造方法中,都涉及到一個(gè)變量
sizeCtl
,這個(gè)變量是一個(gè)非常重要的變量,而且具有非常豐富的含義,它的值不同,對應(yīng)的含義也不一樣,這里我們先對這個(gè)變量不同的值的含義做一下說明,后續(xù)源碼分析過程中,進(jìn)一步解釋
sizeCtl
為0,代表數(shù)組未初始化, 且數(shù)組的初始容量為16
sizeCtl
為正數(shù),如果數(shù)組未初始化,那么其記錄的是數(shù)組的初始容量,如果數(shù)組已經(jīng)初始化,那么其記錄的是數(shù)組的擴(kuò)容閾值
sizeCtl
為-1,表示數(shù)組正在進(jìn)行初始化
sizeCtl
小于0,并且不是-1,表示數(shù)組正在擴(kuò)容, -(1+n),表示此時(shí)有n個(gè)線程正在共同完成數(shù)組的擴(kuò)容操作
3、其他屬性含義
代表整個(gè)哈希表
1
|
transient volatile Node<K,V>[] table; |
用于哈希表擴(kuò)容,擴(kuò)容完成后會被重置為null。
1
|
private transient volatile Node<K,V>[] nextTable; |
baseCount和counterCells一起保存著整個(gè)哈希表中存儲的所有的結(jié)點(diǎn)的個(gè)數(shù)總和。
1
2
|
private transient volatile long baseCount; private transient volatile CounterCell[] counterCells; |
二、添加安全
1、源碼分析
1.1、添加元素put/putVal方法
1
2
3
|
public V put(K key, V value) { return putVal(key, value, false ); } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
final V putVal(K key, V value, boolean onlyIfAbsent) { // 如果有空值或者空鍵,直接拋異常 if (key == null || value == null ) throw new NullPointerException(); // 基于key計(jì)算hash值,并進(jìn)行一定的擾動,這里計(jì)算的hash一定是正數(shù),因?yàn)榕c7FFFFFFF進(jìn)行了位與運(yùn)算,負(fù)數(shù)的hash值另有他用 int hash = spread(key.hashCode()); // 記錄某個(gè)桶上元素的個(gè)數(shù),如果超過8個(gè)(并且table長度>=64),會轉(zhuǎn)成紅黑樹 int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 如果數(shù)組還未初始化,先對數(shù)組進(jìn)行初始化 if (tab == null || (n = tab.length) == 0 ) tab = initTable(); // 如果hash計(jì)算得到的桶位置沒有元素,利用cas將元素添加 else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { // cas+自旋(和外側(cè)的for構(gòu)成自旋循環(huán)),保證元素添加安全 if (casTabAt(tab, i, null , new Node<K,V>(hash, key, value, null ))) break ; // no lock when adding to empty bin } // 如果hash計(jì)算得到的桶位置元素的hash值為MOVED(-1),證明正在擴(kuò)容,那么協(xié)助擴(kuò)容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { // hash計(jì)算的桶位置元素不為空,且當(dāng)前沒有處于擴(kuò)容操作,進(jìn)行元素添加 V oldVal = null ; // 對當(dāng)前數(shù)組的第一個(gè)結(jié)點(diǎn)進(jìn)行加鎖,執(zhí)行添加操作,這里不僅保證了線程安全而且使得鎖的粒度相對較小 synchronized (f) { if (tabAt(tab, i) == f) { // 普通鏈表節(jié)點(diǎn) if (fh >= 0 ) { binCount = 1 ; for (Node<K,V> e = f;; ++binCount) { K ek; // 鏈表的遍歷找到最后一個(gè)結(jié)點(diǎn)進(jìn)行尾插法(如果找到相同的key則會覆蓋) if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break ; } Node<K,V> pred = e; // 找到了最后一個(gè)結(jié)點(diǎn),尾插法插入新結(jié)點(diǎn)在最后 if ((e = e.next) == null ) { pred.next = new Node<K,V>(hash, key, value, null ); break ; } } } // 樹節(jié)點(diǎn),將元素添加到紅黑樹中 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2 ; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null ) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0 ) { // 鏈表長度>=8,將鏈表轉(zhuǎn)成紅黑樹 if (binCount >= TREEIFY_THRESHOLD) // (在該方法中會對table也就是數(shù)組長度進(jìn)行判斷,>=64時(shí)才會進(jìn)行轉(zhuǎn)樹,否則為數(shù)組擴(kuò)容) treeifyBin(tab, i); // 如果是重復(fù)鍵,直接將舊值返回 if (oldVal != null ) return oldVal; break ; } } } // 添加的是新元素,維護(hù)集合長度,并判斷是否要進(jìn)行擴(kuò)容操作 addCount(1L, binCount); return null ; } |
通過以上源碼,我們可以看到,當(dāng)需要添加元素時(shí),會針對當(dāng)前元素所對應(yīng)的桶位進(jìn)行加鎖操作,這樣一方面保證元素添加時(shí),多線程的安全,同時(shí)對某個(gè)桶位加鎖不會影響其他桶位的操作,進(jìn)一步提升多線程的并發(fā)效率
1.2、數(shù)組初始化,initTable方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; // cas+自旋,保證線程安全,對數(shù)組進(jìn)行初始化操作 while ((tab = table) == null || tab.length == 0 ) { // 如果sizeCtl的值(-1)小于0,說明此時(shí)正在初始化, 讓出cpu if ((sc = sizeCtl) < 0 ) Thread.yield(); // lost initialization race; just spin // cas修改sizeCtl的值為-1,修改成功,進(jìn)行數(shù)組初始化,失敗,繼續(xù)自旋 else if (U.compareAndSwapInt( this , SIZECTL, sc, - 1 )) { try { // double checking,防止重復(fù)初始化 if ((tab = table) == null || tab.length == 0 ) { // sizeCtl為0,取默認(rèn)長度16,否則去sizeCtl的值 int n = (sc > 0 ) ? sc : DEFAULT_CAPACITY; @SuppressWarnings ( "unchecked" ) // 基于初始長度,構(gòu)建數(shù)組對象 Node<K,V>[] nt = (Node<K,V>[]) new Node<?,?>[n]; table = tab = nt; // 計(jì)算擴(kuò)容閾值,并賦值給sc // n就是當(dāng)前數(shù)組的長度,當(dāng)初始化完成后,sc記錄的是下次需要擴(kuò)容的閾值 // n >>> 2 就相當(dāng)于 n / 4 // 所以 n - (n >>> 2) 就相當(dāng)于 n - n / 4 = n * 0.75,而0.75就是默認(rèn)的加載因子 sc = n - (n >>> 2 ); } } finally { //將擴(kuò)容閾值,賦值給sizeCtl sizeCtl = sc; } break ; } } return tab; } |
2、圖解
2.1、put加鎖圖解
三、擴(kuò)容安全
1、源碼分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; // 如果是多cpu,那么每個(gè)線程劃分任務(wù),最小任務(wù)量是16個(gè)桶位的遷移 // 如果是單cpu,則沒必要?jiǎng)澐?/code> if ((stride = (NCPU > 1 ) ? (n >>> 3 ) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range // 如果是擴(kuò)容線程,此時(shí)新數(shù)組為null if (nextTab == null ) { // initiating try { @SuppressWarnings ( "unchecked" ) // 兩倍擴(kuò)容創(chuàng)建新數(shù)組 Node<K,V>[] nt = (Node<K,V>[]) new Node<?,?>[n << 1 ]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return ; } nextTable = nextTab; // 記錄線程開始遷移的桶位,從后往前遷移 transferIndex = n; } // 記錄新數(shù)組的末尾 int nextn = nextTab.length; // 已經(jīng)遷移的桶位,會用這個(gè)節(jié)點(diǎn)占位(這個(gè)節(jié)點(diǎn)的hash值為-1——MOVED) ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true ; boolean finishing = false ; // to ensure sweep before committing nextTab for ( int i = 0 , bound = 0 ;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; // i記錄當(dāng)前正在遷移桶位的索引值 // bound記錄下一次任務(wù)遷移的開始桶位 // --i >= bound 成立表示當(dāng)前線程分配的遷移任務(wù)還沒有完成 if (--i >= bound || finishing) advance = false ; // 沒有元素需要遷移 -- 后續(xù)會去將擴(kuò)容線程數(shù)減1,并判斷擴(kuò)容是否完成 else if ((nextIndex = transferIndex) <= 0 ) { i = - 1 ; advance = false ; } // 計(jì)算下一次任務(wù)遷移的開始桶位,并將這個(gè)值賦值給transferIndex else if (U.compareAndSwapInt ( this , TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0 ))) { bound = nextBound; i = nextIndex - 1 ; advance = false ; } } // 如果沒有更多的需要遷移的桶位,就進(jìn)入該if if (i < 0 || i >= n || i + n >= nextn) { int sc; //擴(kuò)容結(jié)束后,保存新數(shù)組,并重新計(jì)算擴(kuò)容閾值,賦值給sizeCtl if (finishing) { nextTable = null ; table = nextTab; sizeCtl = (n << 1 ) - (n >>> 1 ); return ; } // 擴(kuò)容任務(wù)線程數(shù)減1 if (U.compareAndSwapInt( this , SIZECTL, sc = sizeCtl, sc - 1 )) { // 判斷當(dāng)前所有擴(kuò)容任務(wù)線程是否都執(zhí)行完成 if ((sc - 2 ) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return ; // 所有擴(kuò)容線程都執(zhí)行完,標(biāo)識結(jié)束 finishing = advance = true ; i = n; // recheck before commit } } // 當(dāng)前遷移的桶位沒有元素,直接在該位置添加一個(gè)fwd節(jié)點(diǎn) else if ((f = tabAt(tab, i)) == null ) advance = casTabAt(tab, i, null , fwd); // 當(dāng)前節(jié)點(diǎn)已經(jīng)被遷移 else if ((fh = f.hash) == MOVED) advance = true ; // already processed else { // 當(dāng)前節(jié)點(diǎn)需要遷移,加鎖遷移,保證多線程安全 // 此處的遷移與hashmap類似 synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0 ) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null ; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0 ) { ln = lastRun; hn = null ; } else { hn = lastRun; ln = null ; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0 ) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true ; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null , loTail = null ; TreeNode<K,V> hi = null , hiTail = null ; int lc = 0 , hc = 0 ; for (Node<K,V> e = t.first; e != null ; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null , null ); if ((h & n) == 0 ) { if ((p.prev = loTail) == null ) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null ) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0 ) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0 ) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true ; } } } } } } |
2、圖解
四、多線程擴(kuò)容效率改進(jìn)(協(xié)助擴(kuò)容)
多線程協(xié)助擴(kuò)容的操作會在兩個(gè)地方被觸發(fā):
① 當(dāng)添加元素時(shí),發(fā)現(xiàn)添加的元素對用的桶位為fwd節(jié)點(diǎn),就會先去協(xié)助擴(kuò)容,然后再添加元素
② 當(dāng)添加完元素后,判斷當(dāng)前元素個(gè)數(shù)達(dá)到了擴(kuò)容閾值,此時(shí)發(fā)現(xiàn)sizeCtl的值小于0,并且新數(shù)組不為空,這個(gè)時(shí)候,會去協(xié)助擴(kuò)容
每當(dāng)有一個(gè)線程幫助擴(kuò)容時(shí),sc就會+1,有一個(gè)線程擴(kuò)容結(jié)束時(shí),sc就會-1,當(dāng)sc重新回到(rs << RESIZE_STAMP_SHIFT) + 2這個(gè)值時(shí),代表當(dāng)前線程是最后一個(gè)擴(kuò)容的線程,則擴(kuò)容結(jié)束。
1、源碼分析
1.1、元素未添加,先協(xié)助擴(kuò)容,擴(kuò)容完后再添加元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null , new Node<K,V>(hash, key, value, null ))) break ; // no lock when adding to empty bin } // 發(fā)現(xiàn)此處為fwd節(jié)點(diǎn),協(xié)助擴(kuò)容,擴(kuò)容結(jié)束后,再循環(huán)回來添加元素 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); // 省略代碼 |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null ) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0 ) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0 ) break ; if (U.compareAndSwapInt( this , SIZECTL, sc, sc + 1 )) { // 擴(kuò)容,傳遞一個(gè)不是null的nextTab transfer(tab, nextTab); break ; } } return nextTab; } return table; } |
1.2、先添加元素,再協(xié)助擴(kuò)容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
private final void addCount( long x, int check) { // 省略代碼 if (check >= 0 ) { Node<K,V>[] tab, nt; int n, sc; // 元素個(gè)數(shù)達(dá)到擴(kuò)容閾值 while (s >= ( long )(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); // sizeCtl小于0,說明正在執(zhí)行擴(kuò)容,那么協(xié)助擴(kuò)容 if (sc < 0 ) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0 ) break ; if (U.compareAndSwapInt( this , SIZECTL, sc, sc + 1 )) transfer(tab, nt); } else if (U.compareAndSwapInt( this , SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2 )) transfer(tab, null ); s = sumCount(); } } } |
注意:擴(kuò)容的代碼都在transfer方法中,這里不再贅述
2、圖解
五、集合長度的累計(jì)方式
1、源碼分析
1.1、addCount方法
① CounterCell數(shù)組不為空,優(yōu)先利用數(shù)組中的CounterCell記錄數(shù)量
② 如果數(shù)組為空,嘗試對baseCount進(jìn)行累加,失敗后,會執(zhí)行fullAddCount邏輯
③ 如果是添加元素操作,會繼續(xù)判斷是否需要擴(kuò)容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
private final void addCount( long x, int check) { CounterCell[] as; long b, s; // 當(dāng)CounterCell數(shù)組不為空,則優(yōu)先利用數(shù)組中的CounterCell記錄數(shù)量 // 或者當(dāng)baseCount的累加操作失敗,會利用數(shù)組中的CounterCell記錄數(shù)量 if ((as = counterCells) != null || !U.compareAndSwapLong( this , BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; // 標(biāo)識是否有多線程競爭 boolean uncontended = true ; // 當(dāng)as數(shù)組為空 // 或者當(dāng)as長度為0 // 或者當(dāng)前線程對應(yīng)的as數(shù)組桶位的元素為空 // 或者當(dāng)前線程對應(yīng)的as數(shù)組桶位不為空,但是累加失敗 if (as == null || (m = as.length - 1 ) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { // 以上任何一種情況成立,都會進(jìn)入該方法,傳入的uncontended是false fullAddCount(x, uncontended); return ; } if (check <= 1 ) return ; // 計(jì)算元素個(gè)數(shù) s = sumCount(); } if (check >= 0 ) { Node<K,V>[] tab, nt; int n, sc; // 當(dāng)元素個(gè)數(shù)達(dá)到擴(kuò)容閾值 // 并且數(shù)組不為空 // 并且數(shù)組長度小于限定的最大值 // 滿足以上所有條件,執(zhí)行擴(kuò)容 while (s >= ( long )(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { // 這個(gè)是一個(gè)很大的正數(shù) int rs = resizeStamp(n); // sc小于0,說明有線程正在擴(kuò)容,那么會協(xié)助擴(kuò)容 if (sc < 0 ) { // 擴(kuò)容結(jié)束或者擴(kuò)容線程數(shù)達(dá)到最大值或者擴(kuò)容后的數(shù)組為null或者沒有更多的桶位需要轉(zhuǎn)移,結(jié)束操作 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0 ) break ; // 擴(kuò)容線程加1,成功后,進(jìn)行協(xié)助擴(kuò)容操作 if (U.compareAndSwapInt( this , SIZECTL, sc, sc + 1 )) // 協(xié)助擴(kuò)容,newTable不為null transfer(tab, nt); } // 沒有其他線程在進(jìn)行擴(kuò)容,達(dá)到擴(kuò)容閾值后,給sizeCtl賦了一個(gè)很大的負(fù)數(shù) // 1+1=2 --》 代表此時(shí)有一個(gè)線程在擴(kuò)容 // rs << RESIZE_STAMP_SHIFT)是一個(gè)很大的負(fù)數(shù) else if (U.compareAndSwapInt( this , SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2 )) // 擴(kuò)容,newTable為null transfer(tab, null ); s = sumCount(); } } } |
1.2、fullAddCount方法
① 當(dāng)CounterCell數(shù)組不為空,優(yōu)先對CounterCell數(shù)組中的CounterCell的value累加
② 當(dāng)CounterCell數(shù)組為空,會去創(chuàng)建CounterCell數(shù)組,默認(rèn)長度為2,并對數(shù)組中的CounterCell的value累加
③ 當(dāng)數(shù)組為空,并且此時(shí)有別的線程正在創(chuàng)建數(shù)組,那么嘗試對baseCount做累加,成功即返回,否則自旋
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
private final void fullAddCount( long x, boolean wasUncontended) { int h; // 獲取當(dāng)前線程的hash值 if ((h = ThreadLocalRandom.getProbe()) == 0 ) { ThreadLocalRandom.localInit(); // force initialization h = ThreadLocalRandom.getProbe(); wasUncontended = true ; } // 標(biāo)識是否有沖突,如果最后一個(gè)桶不是null,那么為true boolean collide = false ; // True if last slot nonempty for (;;) { CounterCell[] as; CounterCell a; int n; long v; // 數(shù)組不為空,優(yōu)先對數(shù)組中CouterCell的value累加 if ((as = counterCells) != null && (n = as.length) > 0 ) { // 線程對應(yīng)的桶位為null if ((a = as[(n - 1 ) & h]) == null ) { if (cellsBusy == 0 ) { // Try to attach new Cell // 創(chuàng)建CounterCell對象 CounterCell r = new CounterCell(x); // Optimistic create // 利用CAS修改cellBusy狀態(tài)為1,成功則將剛才創(chuàng)建的CounterCell對象放入數(shù)組中 if (cellsBusy == 0 && U.compareAndSwapInt( this , CELLSBUSY, 0 , 1 )) { boolean created = false ; try { // Recheck under lock CounterCell[] rs; int m, j; // 桶位為空, 將CounterCell對象放入數(shù)組 if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1 ) & h] == null ) { rs[j] = r; // 表示放入成功 created = true ; } } finally { cellsBusy = 0 ; } if (created) //成功退出循環(huán) break ; // 桶位已經(jīng)被別的線程放置了已給CounterCell對象,繼續(xù)循環(huán) continue ; // Slot is now non-empty } } collide = false ; } // 桶位不為空,重新計(jì)算線程hash值,然后繼續(xù)循環(huán) else if (!wasUncontended) // CAS already known to fail wasUncontended = true ; // Continue after rehash // 重新計(jì)算了hash值后,對應(yīng)的桶位依然不為空,對value累加 // 成功則結(jié)束循環(huán) // 失敗則繼續(xù)下面判斷 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break ; // 數(shù)組被別的線程改變了,或者數(shù)組長度超過了可用cpu大小,重新計(jì)算線程hash值,否則繼續(xù)下一個(gè)判斷 else if (counterCells != as || n >= NCPU) collide = false ; // At max size or stale // 當(dāng)沒有沖突,修改為有沖突,并重新計(jì)算線程hash,繼續(xù)循環(huán) else if (!collide) collide = true ; // 如果CounterCell的數(shù)組長度沒有超過cpu核數(shù),對數(shù)組進(jìn)行兩倍擴(kuò)容 // 并繼續(xù)循環(huán) else if (cellsBusy == 0 && U.compareAndSwapInt( this , CELLSBUSY, 0 , 1 )) { try { if (counterCells == as) { // Expand table unless stale CounterCell[] rs = new CounterCell[n << 1 ]; for ( int i = 0 ; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0 ; } collide = false ; continue ; // Retry with expanded table } h = ThreadLocalRandom.advanceProbe(h); } // CounterCell數(shù)組為空,并且沒有線程在創(chuàng)建數(shù)組,修改標(biāo)記,并創(chuàng)建數(shù)組 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt( this , CELLSBUSY, 0 , 1 )) { boolean init = false ; try { // Initialize table if (counterCells == as) { CounterCell[] rs = new CounterCell[ 2 ]; rs[h & 1 ] = new CounterCell(x); counterCells = rs; init = true ; } } finally { cellsBusy = 0 ; } if (init) break ; } // 數(shù)組為空,并且有別的線程在創(chuàng)建數(shù)組,那么嘗試對baseCount做累加,成功就退出循環(huán),失敗就繼續(xù)循環(huán) else if (U.compareAndSwapLong( this , BASECOUNT, v = baseCount, v + x)) break ; // Fall back on using base } } |
2、圖解
fullAddCount方法中,當(dāng)as數(shù)組不為空的邏輯圖解
六、集合長度獲取
1、源碼分析
1.1、size方法
1
2
3
4
5
6
|
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > ( long )Integer.MAX_VALUE) ? Integer.MAX_VALUE : ( int )n); } |
1.2、sumCount方法
1
2
3
4
5
6
7
8
9
10
11
12
13
|
final long sumCount() { CounterCell[] as = counterCells; CounterCell a; // 獲取baseCount的值 long sum = baseCount; if (as != null ) { // 遍歷CounterCell數(shù)組,累加每一個(gè)CounterCell的value值 for ( int i = 0 ; i < as.length; ++i) { if ((a = as[i]) != null ) sum += a.value; } } return sum; } |
注意:這個(gè)方法并不是線程安全的
七、get方法
這個(gè)就很簡單了,獲得hash值,然后判斷存在與否,遍歷鏈表即可,注意get沒有任何鎖操作!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; // 計(jì)算key的hash值 int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1 ) & h)) != null ) { // 表不為空并且表的長度大于0并且key所在的桶不為空 if ((eh = e.hash) h) { // 表中的元素的hash值與key的hash值相等 if ((ek = e.key) key || (ek != null && key.equals(ek))) // 鍵相等 // 返回值 return e.val; } else if (eh < 0 ) // 是個(gè)TreeBin hash = -2 // 在紅黑樹中查找,因?yàn)榧t黑樹中也保存這一個(gè)鏈表順序 return (p = e.find(h, key)) != null ? p.val : null ; while ((e = e.next) != null ) { // 對于結(jié)點(diǎn)hash值大于0的情況鏈表 if (e.hash h && ((ek = e.key) key || (ek != null && key.equals(ek)))) return e.val; } } return null ; } |
到此這篇關(guān)于JDK1.8中的ConcurrentHashMap源碼分析的文章就介紹到這了,更多相關(guān)JDK1.8 ConcurrentHashMap源碼內(nèi)容請搜索服務(wù)器之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持服務(wù)器之家!
原文鏈接:https://blog.csdn.net/weixin_42856430/article/details/115420011