目錄
- 0. 第一個屬性 serialPersistentFields
- 1. spread()
- 2. tabAt()、casTabAt()、setTabAt()
- 3. counterCells
- 4. keySet、values、entrySet
- 5. 構造方法
- 6. putAll()
- 7. tryPresize()
- 8. resizeStamp()
- 9.transfer()
- 10.putVal()
- 11.initTable()
- 12.helpTransfer()
- 13.addCount()
- 14.get()
如果你沒有閱讀過 Java 源碼重讀系列之 HashMap 這篇文章的話,建議你先讀一下。以下所有內(nèi)容的前提是你已閱讀過以上的文章。
另外,凡是涉及到多線程、并發(fā)的東西從來就沒有簡單的,所以這次我們很難講清楚 ConcurrentHashMap 中的所有內(nèi)容,只能聚焦到以下幾個內(nèi)容
- ConcurrentHashMap 的 get 操作
- ConcurrentHashMap 的 put 操作
- ConcurrentHashMap 的 resize() 操作
- ConcurrentHashMap 是如何保證線程安全的
如果你想要了解的內(nèi)容不在以上范圍內(nèi),那就不用繼續(xù)閱讀了,以免浪費時間~
0. 第一個屬性 serialPersistentFields
因為 ConcurrentHashMap 的邏輯比較復雜,所以我們直接從 serialPersistentFields
屬性說起,它之前的這些屬性等用到的時候我們再看就好了,你只要知道 這個屬性之前還有一堆固定的屬性就好了。
serialPersistentFields
屬性是一個 ObjectStreamField
的數(shù)組,而且默認添加了三個元素。
private static final ObjectStreamField[] serialPersistentFields = { new ObjectStreamField("segments", Segment[].class), new ObjectStreamField("segmentMask", Integer.TYPE), new ObjectStreamField("segmentShift", Integer.TYPE) };
我們點到 ObjectStreamField
類中去,它的類頭有一段這樣的描述:
?* A description of a Serializable field from a Serializable class. ?An array
?* of ObjectStreamFields is used to declare the Serializable fields of a class.
簡單翻譯一下就是,一個序列化類中可以序列化屬性的描述。ObjectStreamFields 數(shù)組聲明了這個類的可序列化的字段。
好了,這個類我們看到這就可以了,而且也知道了 ConcurrentHashMap 中 serialPersistentFields
屬性的作用。就是聲明了一下 ConcurrentHashMap 里有三個 屬性可以被序列化。這三個屬性分別是segments、segmentMask、segmentShift
。結束~
1. spread()
繼續(xù)往下是 Node 類的定義,沒什么好說的,我們遇到了第一個方法。
static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
都是一些位運算。解釋一下,這個方法會將 h 和 h 右移 16 位的數(shù)值進行異或(^)操作,得到的結果與 HASH_BITS 進行與(&)操作。和 HASH_BITS 進行與(&)操作,作用就是保證返回的結果的最高位一定是 0,也就是說,返回的結果一定是正數(shù)。(如果你對位運算沒有什么概念的話,也可以不用糾結這個方法,這個方法的作用就是,給一個數(shù),返回另外一個數(shù)。)
2. tabAt()、casTabAt()、setTabAt()
@SuppressWarnings("unchecked") static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }
這幾個是其實是 ConcurrentHashMap 的核心操作方法。tabAt()
是獲取,casTabAt()
是更新,并且是基于 CAS 方式實現(xiàn)的更新。setTabAt()
是插入。這些實現(xiàn)都使用了大名鼎鼎的 sun.misc.Unsafe
類。
如果你對這個類不熟悉的話,其實可以簡單理解,這個類里的一些方法都是線程的。因為這個類提供的是一些硬件級別的原子操作。簡單來說,sun.misc.Unsafe
類提供的方法都是線程安全的。理解到這里就可以了,再深入的內(nèi)容,就不再本文范圍內(nèi)了。繼續(xù)往下。
3. counterCells
繼續(xù)往下的話,就看到了 table
和 nextTable
,沒什么說的,這個就是存儲數(shù)據(jù)的數(shù)組了,至于 nextTable
,通過注釋可以看到,這個變量應該是只在擴容時使用到了,等用到的時候再說。
繼續(xù)往下呢就是一些int
類型的值了,通過名字和注釋也看不出來什么,直接跳過。等用到的時候再說。繼續(xù)往下的話我們就看到了一個 CounterCell[]
數(shù)組了,點到這個類的定義,可以看到以下代碼。
@sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } }
好像也沒有多復雜,就一個使用了 volatile
標記的 數(shù)值。至于 sun.misc.Contended
注解,主要是解決 CPU 偽緩存 問題的,提高性能和效率使用的,可以先不用關注。
但是,如果你閱讀一下注釋的話,就會發(fā)現(xiàn)這里面大有文章。涉及到兩個非常復雜的東西:LongAdder and Striped64
。關于 LongAdder and Striped64
的內(nèi)容也不在本文范圍內(nèi),有興趣的話可以搜一下相關的文章。不了解也沒有關系,不影響閱讀。我們繼續(xù)往下看。
4. keySet、values、entrySet
再往下就是幾個 view 變量了。
// views private transient KeySetView<K,V> keySet; private transient ValuesView<K,V> values; private transient EntrySetView<K,V> entrySet;
看名字也應該能猜出來,這些變量應該是跟 HashMap 的 keySet()、values()、entrySet()
幾個方法的作用類似。如果你點到它的定義就會看到,這幾個類都繼承了 CollectionView
這個類。
abstract static class CollectionView<K,V,E> implements Collection<E>, java.io.Serializable { private static final long serialVersionUID = 7249069246763182397L; final ConcurrentHashMap<K,V> map; CollectionView(ConcurrentHashMap<K,V> map) { this.map = map; } //... ...
只看前面幾行就可以了,內(nèi)部有一個 ConcurrentHashMap
類型的變量,而且 CollectionView
只有一個帶有 ConcurrentHashMap
參數(shù)的構造方法。盲猜也能猜到,上面的 xxxView 類內(nèi)部操作的也都是 ConcurrentHashMap 存儲的數(shù)據(jù)。了解這些就可以了,我們繼續(xù)往下看。
5. 構造方法
第一個是個空構造方法沒有什么好說的,先看第二個。
public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; }
通過注釋和名稱我們應該能夠知道,這個構造方法可以初始化 Map 的容量。有意思的是,計算 cap
的方法。不知道你還記不記得 HashMap 的初始容量的構造方法是怎么計算容量的。代碼在下面
this.threshold = tableSizeFor(initialCapacity);
而 ConcurrentHashMap 則是將 initialCapacity 加上了 initialCapacity 的一半又加了 1 作為 tableSizeFor
的參數(shù)。其實就是為了解決 HashMap 存在的可能出現(xiàn)兩次擴容的問題。
注意,這里使用的是 >>>
,不是 >>
。>>>
的含義是無符號右移。它會把最高位表示正負的值也會右移,然后補 0。 所以 >>>
之后,一定是正數(shù)。如果 >>>
之前是正數(shù)的話,結果跟 >>
一致。如果是負數(shù)的話,就會出現(xiàn)一個很奇怪的正數(shù)。這是因為最高位表示負數(shù)的 1 也跟著右移了。由于代碼里已經(jīng)判斷了小于 0 ,所以我們目前先按照除 2 理解即可。
還有一個點是,從代碼來看,ConcurrentHashMap 的最大容量 好像 是用 sizeCtl
表示的。但是,如果僅僅是表示最大容量,為什么會定義一個這么奇怪的名字呢? Ctl
的后綴應該是 control
的簡寫。具體是怎么控制的呢?
繼續(xù)往下,我們先跳過帶有 Map 參數(shù)的構造方法,因為這個涉及到 putAll()
方法。
public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); } public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; }
這兩個構造方法其實可以算做一個,我們直接看下面那個復雜的。
先判斷了一下參數(shù)的取值,然后更新了一下 initialCapacity
參數(shù),然后根據(jù)參數(shù)計算 size
,考慮到 loadFactor
可能小于 1,導致 int 值越界,所以轉(zhuǎn)成了 long 類型。
關于 concurrencyLevel
,給的注釋是:并發(fā)更新線程的預估數(shù)量。那上面那段判斷更新就不難理解了。假如我預估會有 20 個線程同時更新這個初始容量為 15 的 Map,這個時候的初始容量會自動的改為 20 。
好像沒有什么問題?有意思的是, loadFactor
這個參數(shù)竟然沒有保存!! 加載因子沒有保存,那什么時候觸發(fā)擴容呢?我們繼續(xù)往下看。
6. putAll()
回到帶有 Map 參數(shù)的構造方法。
public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); }
沒有什么復雜的,指定了下默認的初始容量(16)就直接 putAll(m);
了。
public void putAll(Map<? extends K, ? extends V> m) { tryPresize(m.size()); for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) putVal(e.getKey(), e.getValue(), false); }
好像也不難,先執(zhí)行 tryPresize(m.size());
應該是初始擴容, 然后再 for 循環(huán)進行 putVal()
操作。
7. tryPresize()
先看下方法名。嘗試并行重置容量。里面的 P 應該是 parallel(并行) 的縮寫。
private final void tryPresize(int size) { int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } } } else if (c <= sc || n >= MAXIMUM_CAPACITY) break; else if (tab == table) { int rs = resizeStamp(n); if (sc < 0) { Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } } }
首先計算了下變量 c,這個是保存入?yún)?size
個元素時需要的最大容量。
然后是一個 whlie 循環(huán),因為我們是通過構造方法進來的,所以 sizeCtl
的值現(xiàn)在是默認值 16 ,table
現(xiàn)在是 null
。這個時候就進入到了 if
的代碼里了。
在 if
的條件里是判斷了 compareAndSwapInt()
的結果。這里需要說一下,compareAndSwapInt
方法是 CAS 的一種實現(xiàn)。這個方法內(nèi)部做了兩件事情,首先是比較 this 這個對象的 SIZECTL 值是否跟 sc 相等,相等的話,把 SIZECTL 的值 改為 -1。而且 Unsafe 類還保證了線程的安全。如果有多個線程同時執(zhí)行這個方法的話,只會有一個線程成功。不會出現(xiàn)兩個線程都比較通過了,然后在賦值的時候產(chǎn)生覆蓋的問題。
好像也不難理解,其實就是把 sizeCtl
值改成了 -1,而且只有一個線程會成功。這里的 sizeCtl
更像是一把鎖,哪個線程改成了 -1 ,哪個線程就獲取到了鎖,那它就可以執(zhí)行后面的流程了。
繼續(xù),因為上面已經(jīng)對 tab = table
賦值了,所以下面的判斷也能通過。然后,就看到了數(shù)組初始化的過程了。直接 new
了一個長度為 n 的 Node[]
。并賦值給了 table
。如果你往上追一下 n 的賦值,就會知道,現(xiàn)在的 n 正好是 c。就是方法一開始計算的值。
table 數(shù)組都已經(jīng)初始化了,是不是結束了?并沒有。這個時候更新了一下 sc。 >>> 2
相當于除 4,其實就是 sc 現(xiàn)在的值是 n 的 3/4 。而且在 finally
塊中,更新了 sizeCtl
。這個時候 sizeCtl
就不是 -1 了。根據(jù)我們之前的理解,這里更新 sizeCtl
,應該是在釋放鎖。
然后,第一次 while
循環(huán)就結束了。再次進入 while
循環(huán),這次 sc 是 n 的 3/4 了,上一次循環(huán)已經(jīng)更新了 sizeCtl
。
這次 table
就不等于 null
了。而且根據(jù)我們之前的推斷,現(xiàn)在的 sc 應該等于 n 的 3/4 ,而 n 之前又等于 c。所以, c <= sc
這個條件也不成立。
而且 n >= MAXIMUM_CAPACITY
這個條件大概率是在擴容到最大的時候才會成立。所以,就走到了最后一個條件分支里了。因為 sc
現(xiàn)在是大于 0 的,所以直接走到了最后一個分支。
PS: if (sc < 0)
這個分支好像永遠不會執(zhí)行,因為 while
進入的條件要求 sc >= 0
,而在判斷sc < 0
之前又沒有任何地方會把 sc
更新為負數(shù),所以好像一直都不會執(zhí)行。如果我理解錯了,希望有人能解惑一下~
8. resizeStamp()
首先執(zhí)行了一下 resizeStamp()
方法。這個方法也是一個位運算的方法。你可以直接使用 main()
方法跑一下,會返回一個很難理解的正數(shù)。簡單說一下這個數(shù)是怎么得出來的。
static final int resizeStamp(int n) { return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); }
首先, numberOfLeadingZeros()
會返回 n 的二進制串中從最左邊算起連續(xù)的“0”的總個數(shù)。然后再跟 1 左移 15 位的值按位或(|)操作。 最終得到的就是一個在二進制中,第 16 位為 1 的正數(shù)。
繼續(xù)回到代碼,因為現(xiàn)在已經(jīng)確定 sc
是 n 的 3/4 了(PS:如果這個 3/4 不理解,那換成 0.75 是不是會好點 ,好像跟 HashMap
的擴容因子一樣,其實 sc 的值就是擴容閾值,這個后面會提到,現(xiàn)在不理解沒關系),所以也是大于 0 的。這里又進行了一次 compareAndSwapInt()
。這個時候賦值的是把 rs 左移了 16 位。 還記得 resizeStamp()
返回的結果么,第 16 位是 1。所以 rs 右移 16 位之后,最高位就是 1 了,在 int 類型里,二進制的最高位表示正負,1 表示負數(shù)。
所以,這個時候,又把 sizeCtl
更新成負值了。根據(jù)我們之前的理解,這里應該還是獲取鎖的操作。獲取到鎖之后,一般就是需要操作資源了。但是 table 我們不是已經(jīng)初始化好了嗎?這次又要初始什么呢?
記住,現(xiàn)在 sizeCtl
是一個負值,并且 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
后面要用到!
9.transfer()
int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } //... ...
到這里還是比較好理解的。先初始了一下 stride
和 n
這兩個變量。然后,因為我們是初始化進來的,所以 nextTab
一定等于 null
。這個時候會初始化 nextTab
。在創(chuàng)建數(shù)組的時候捕獲了一個異常,這個異常出現(xiàn)的唯一情況就是內(nèi)存不夠了,分配不了 2 倍的 n 的數(shù)組。這個時候,將 sizeCtl
的值改為了 Integer.MAX_VALUE
。然后就結束了。如果沒有拋異常,會更新 nextTable
和 transferIndex
的值。
我們需要回頭看下 tryPresize()
方法。如果在拋異常的時候結束,會出現(xiàn)什么情況。根據(jù)代碼,異常結束后,會進入第三次循環(huán),這次循環(huán)會進入第二個分支。因為 c <= sc
一定會成立。這里就會結束循環(huán)。
到這里,我們已經(jīng)把 tryPresize()
循環(huán)里的三個分支都走完了,下面繼續(xù)看 transfer()
這個方法。
nextTab
初始化之后,我們又看到了一個新的 Node 類:
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
點到這個類的定義里我們會發(fā)現(xiàn),這個類里面只有一個屬性 nextTable
和一個 find()
方法。關于 find()
是在獲取元素時才能用到,我們先不用關注。目前來看 ForwardingNode
其實就是對 nextTab
的一個封裝。然后繼續(xù)看 transfer()
。
兩個boolean
類型的值,默認一個 true,一個 false。
下面的代碼是一個 for
循環(huán),但是這個循環(huán)有差不多 100 多行的代碼(如果我在項目里遇到這種代碼估計會罵人的~)。我們一點點看,首先是一個while
循環(huán)。
while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } }
首先 i = 0, bound = 0
,所以,第一次循環(huán)不會進入第一個分支。然后,根據(jù)之前 transferIndex = n;
的賦值,也不會進入第二個分支。
這樣就來到了第三個分支。compareAndSwapInt
會更新 transferIndex
的值,如果 CPU 的個數(shù)是 1,transferIndex
更新成 0 ,否則更新成 nextIndex - stride
。然后更新 bound、i、advance
的值,循環(huán)就結束了。
繼續(xù)往下,現(xiàn)在 i 的值是 n - 1
,所以不會命中 if (i < 0 || i >= n || i + n >= nextn)
條件。
然后,因我們是初始化時進入的,素以 tab 里的所有元素都是 null
,第二個條件就通過了。
else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd);
其實就是把 tab 的 i 位置 初始化了一個 fwd 元素。 到這里,第一次 for
循環(huán)就結束了。
第二次循環(huán)其實也很簡單,首先 advance = false
,不會進入 while
循環(huán),然后就會進入下面的判斷
else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) advance = true; // already processed
首先,獲取了下 tab[i]
的值,因為上次循環(huán)已經(jīng)賦值過了,現(xiàn)在 f = fwd
。然后,有意思的來了,先看下 MOVED
的定義
static final int MOVED = -1; // hash for forwarding nodes
沒錯,MOVED = -1
,按我們正常的理解,一個對象的 hash 值,怎么也不會等于 -1 吧,我們再回頭看下 ForwardingNode
這個類
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } //... ... } static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } //... ... }
順便貼了下父類的代碼,主要看構造函數(shù),看到了么? fwd
這個對象在初始化的時候,指定了 hash 值,就是 MOVED
。OK,回到之前的循環(huán)。
這次循環(huán)就會把 advance
改為 true
。第二次循環(huán)就結束了。
經(jīng)過上面的兩次循環(huán)之后,我們是其實只是執(zhí)行了一行代碼 tab[n-1] = fwd
。現(xiàn)在進入第三次循環(huán),之前 i = n - 1
。現(xiàn)在又執(zhí)行了一次 if (--i >= bound || finishing)
,所以現(xiàn)在 i = n - 2
。但是 bound
可能有兩種情況,一種是 bound = n - stride
,一種是 bound = 0
。我們先假設 bound = n - stride; stride = 16
。所以,第一個條件是成立的,執(zhí)行 advance = false;
,然后 while
循環(huán)結束。
然后,第一個 if (i < 0 || i >= n || i + n >= nextn)
條件不會成立,又執(zhí)行到了賦值操作里。這時 tab[n-2] = fwd
。第三次循環(huán)結束~
然后第四次循環(huán)又會把 advance
改為 true
。
好好回味下~~
其實,這一頓操作下來就是在執(zhí)行 tab[i] = fwd
這一行代碼。搞了這么多東西,其實就是在支持多線程并發(fā)擴容,簡單說下過程。
首先,while
循環(huán)會確定當前線程擴容的區(qū)間 [ bound,nextIndex ) 左開右閉。然后 while
循環(huán)下面的代碼其實就是在給 tab
和 nextTab
賦值。設想下,如果 while
循環(huán)里的 compareAndSwapInt
執(zhí)行失敗,會是什么情況?沒錯,會空轉(zhuǎn)!結束只有兩種情況,一種是 transferIndex = 0
。說明已經(jīng)有其他線程把所有的區(qū)間都認領了,另外一種情況是執(zhí)行 compareAndSwapInt
。認領 [ bound,nextIndex ) 的區(qū)間,進行擴容。
其實,你可以直接驗證下的,打斷點也好,手寫也罷,假設n = 1024; NCPU = 4
。這時 stride = 32
。那么第一個線程會先對 tab[1024-32,1024-1]
進行賦值。如果這時有其他線程進來,在 while
循環(huán)的時候,就會認領 tab[1024-32-32,1024-32-1]
的區(qū)間進行賦值。如果有更多的線程進來的話,就會加快這個過程。這個就是所謂的 并發(fā)擴容 ,也有叫 輔助擴容 的。
然后,我們來看下通過 synchronized
加鎖的這段代碼。能執(zhí)行到這里的話只能是 tab[i].hash != MOVED
。那就說明這里記錄的是一個正常的數(shù)據(jù)。
首先判斷了下 if (tabAt(tab, i) == f)
沒什么說的,肯定成立,不成立就結束了,然后判斷了下 if (fh >= 0)
。有點奇怪,正常數(shù)據(jù)的 hash 還能小于 0 ?那我們先看下不成立的情況
else if (f instanceof TreeBin)
明白了吧,當發(fā)生 hash 沖突時,并且鏈表已經(jīng)轉(zhuǎn)成紅黑樹了,這時 tab[i] = TreeBin
,那我們看下 TreeBin
的定義。
static final class TreeBin<K,V> extends Node<K,V> { TreeBin(TreeNode<K,V> b) { super(TREEBIN, null, null, null); //... ... } } static final int TREEBIN = -2; // hash for roots of trees
真相了,TreeBin
的 hash 值是 -2,就小于 0。后面的代碼我們就不說了,其實跟HashMap
是一樣的,如果當前節(jié)點是鏈表,那就采用高低位鏈表的形式對 nextTab
賦值,如果是 TreeBin
那就采用紅黑樹的方式進行賦值。而且,我們對 tab[i]
加了 synchronized
鎖,也不會有線程競爭,老老實實賦值就好了。
最后,transfer
里的代碼基本上都看完了,就剩下面這段了
if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } }
在 while
循環(huán)里,有這么一行代碼 i = -1;
執(zhí)行了這個之后,就會進入上面的代碼里。其實就是 tab
初始化完成之后,即 nextIndex = 0
的時候,就會執(zhí)行 i = -1;
,然后就會進入上面的代碼了。我們看下上面的代碼。
首先,finishing
現(xiàn)在應該是等于 false
的,直接進入第二個 if
。這個也很簡單,首先 sc = sizeCtl
,賦值,然后通過 CAS 將 sizeCtl
的值改為 sc - 1
。還記得 sizeCtl
的值是多少么?? 我直接粘貼一下 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
。是不是跟 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
這個判斷邏輯一致?如果不相等,說明有其他線程修改了sizeCtl
,這同時說明有其他線程還在執(zhí)行擴容的動作,即還在執(zhí)行 tryPresize()
或者是 transfer()
方法。那么,因為當前線程已經(jīng)執(zhí)行完了,所以直接 return;
結束,讓其他線程繼續(xù)執(zhí)行就好了。
如果相等,執(zhí)行 finishing = advance = true; i = n
。進入下一次 for
循環(huán)。
一頓判斷之后,你會發(fā)現(xiàn),還是會進入到上面的代碼,而這時,finishing == true
了!下面的代碼就不難理解了吧,更新 table
,相當于使用了新的數(shù)組了,而 sizeCtl
也更新了一下。都是位運算,如果你看不明白,可以用 main
方法跑一下。我們假設 n = 1024
,那么 table
現(xiàn)在的大小也就是之前 nextTab
的大小,就是2048,然后,我們用 main
跑一下 sizeCtl
的值,不出意外的話應該等于 1536 。如果還看不明白,那么你再計算下 1536 / 2048
。結果是 0.75 ,這個數(shù)字熟悉吧? HashMap
的默認加載因子!。沒錯,sizeCtl
其實就是下次需要擴容的閾值。
到這里,我們就把 transfer()
方法看完了。然后,我們重點總結下 sizeCtl
這個屬性,不得不承認,這個設計非常的巧妙!
首先,通常情況下 sizeCtl
應該是等于下次的擴容閾值的。但是在擴容期間,有兩個狀態(tài),一個是 -1,一個是非常大的一個負值。等于 -1 很好理解,相當于是一個鎖,哪個線程更新成功,就可以進行數(shù)組的初始化動作。那么,就剩最后一種情況了。直接用下面的 main()
方法跑一下
public static void main(String[] args) throws IllegalAccessException { int rs = Integer.numberOfLeadingZeros(1024) | (1 << (16 - 1)); int sizeCtl = (rs << 16) + 2; System.out.println(sizeCtl); System.out.println((sizeCtl<<16)>>16); }
會得到下面的結果
-2146107390
2
有意思了,(sizeCtl<<16)>>16
,這個操作是先左移 16 位,再右移 16 位,相當于把 sizeCtl
的高 16 位都置為 0 了。得到了一個 2,其實,這個 2 就是說當前有 (2 - 1) 個線程在進行擴容操作。(PS: sizeCtl
注釋里有寫~)。具體是為什么,我們繼續(xù)往下看。
transfer()
執(zhí)行完,就回到了 tryPresize()
。然后繼續(xù)返回就到了 putAll()
。繼續(xù)往下執(zhí)行就是 putVal()
方法了。
10.putVal()
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); //... ...
首先判斷了下 null
,然后計算了下 hash
哈希值。就進入 for
循環(huán)了。首先是第一個分支。其實就是 tab
還沒有初始化的時候會進入這個分支。
11.initTable()
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
好像也沒有多少復雜的,因為我們之前已經(jīng)對 sizeCtl
做了充分的解釋,這里如果 sc < 0
的話,說明在擴容或者是初始化中,然后當前線程直接執(zhí)行了 Thread.yield();
,就是放棄 CPU 執(zhí)行權等待下次分配 CPU 時間片,如果不小于 0 ,并且 tab = null
,那說明現(xiàn)在還沒有線程執(zhí)行擴容,那當前線程就會更新 sizeCtl
,然后自己開始執(zhí)行初始化動作,初始化好后直接返回 tab
。
繼續(xù)回到 putVal()
方法,如果執(zhí)行第二個分支,說明 tab[i] == null
,這個位置還沒有元素,直接通過 casTabAt()
方法進行賦值。如果這個位置有值,并且 (fh = f.hash) == MOVED
說明在擴容或者是在初始化,這個時候當前線程會進行 輔助擴容。
12.helpTransfer()
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }
首先在 if
條件里獲取到 nextTab
如果不為 null
,就會進入 while
循環(huán),首先 sc < 0
說還在擴容或者是初始化中,while
循環(huán)里的第一個分支是不需要輔助擴容或者是已經(jīng)達到最大的輔助線程數(shù)量,或者是已經(jīng)剩最后一個線程在擴容了,其他的線程都結束了。所以直接 break;
就可以了。
第二個分支,首先會執(zhí)行 sizeCtl + 1
的操作,執(zhí)行成功就會執(zhí)行 transfer()
方法,這個方法我們之前已經(jīng)看過了,就不看了,需要注意的是,我們之前說過,sizeCtl
的低 16 位代表目前正在擴容的線程數(shù)減一。因為這里新加入一個線程參與擴容,所以對 sizeCtl
進行了加一的操作。如果還有線程進來,那么 sizeCtl
還會加一。這里就解釋清楚 sizeCtl
的另外的一個用法了。擴容結束直接 break;
。繼續(xù)回到 putVal()
。
繼續(xù)下一次 for
,如果 tab[i]
還不等于 null
,那就說明發(fā)生哈希沖突了,并且當前已經(jīng)不在擴容了。就走到了最后一個分支,使用 synchronized
加鎖的這一段代碼里,這段代碼其實并不復雜,發(fā)生沖突之后無非就兩種情況,鏈表或者是紅黑樹。你可以看下 TreeBin
的構造方法。它的哈希值是 -2。
static final int TREEBIN = -2; // hash for roots of trees //... ... TreeBin(TreeNode<K,V> b) { super(TREEBIN, null, null, null); //... ... }
所以才有了 if (fh >= 0)
的判斷,如果首節(jié)點的哈希值大于 0 ,那一定是鏈表。
最后還有一段進行樹化的判斷操作。
static final int TREEIFY_THRESHOLD = 8; //... ... if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; }
鏈表的節(jié)點數(shù)超過 8 就會進行樹化操作。到這里其實 putVal()
的相關操作基本上已經(jīng)結束了,就剩最后一個 addCount()
方法了,看名稱也知道這個是更新計數(shù)器的,盲猜也能猜到應該跟元素數(shù)量有關系。不過,好像有點問題啊,你有沒有發(fā)現(xiàn),在整個 putVal()
方法里面,好像沒有觸發(fā)擴容的邏輯!!
13.addCount()
其實這個方法除了操作我們之前見到的 counterCells
屬性外,還會判斷是否需要進行擴容。因為只有在知道具體的元素數(shù)量后,才能判斷出是否需要擴容。我們先看這個方法的第一段代碼。
private final void addCount(long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } //... ...
首先,我們假設目前是第一次執(zhí)行這個方法,那么 counterCells == null
,然后就會使用 CAS 執(zhí)行 baseCount = b + x
。失敗之后,就開始執(zhí)行 fullAddCount()
方法了,因為現(xiàn)在 as == null
是成立的。
fullAddCount()
方法與 Striped64.longAccumulate()
方法基本上是一模一樣的,因為之前已經(jīng)跳過了 Striped64
,所以這里也不打算去細看。直接總結下 counterCells
的作用。其實 counterCells
就是在多個線程同時更新 baseCount
失敗時記錄下新增的元素數(shù)量。
舉個例子就明白了,假設 ConcurrentHashMap
初始化完成之后,有 2 個線程,同時執(zhí)行了 addCount()
,那么 baseCount
會更新成 1,counterCells
會初始化為一個大小為 2 的數(shù)組,且一個元素是 null
,另外一個元素的 counterCells[i].value
值是 1。
如果這個時候又來了一個線程,會有 3 種情況,
- CAS 更新
baseCount
成功,baseCount = 2
。第三個線程結束 - CAS 失敗且
counterCells[ThreadLocalRandom.getProbe() & m] == null
。繼續(xù)初始化counterCells
的另一個為null
的元素,值為 1。 - CAS 失敗且
counterCells[ThreadLocalRandom.getProbe() & m] != null
,那么counterCells[i].value
值更新成 2。
ThreadLocalRandom.getProbe()
方法其實就是取了個隨機值。就是說,如果有多個線程同時更新的話,失敗的線程會隨機的從 counterCells
取一個元素,將新增的數(shù)量保存進去。
其實很簡單,能進入到 fullAddCount()
方法的條件只有一種,counterCells == null
并且 CAS 更新 baseCount
失敗,這種情況就是有多個線程同時執(zhí)行了 addCount()
方法,比如,有兩線程同時執(zhí)行 putVal()
,那么必然有一個線程在 CAS 更新 baseCount
時會失敗,這個時候就進入到 fullAddCount()
方法。這個方法內(nèi)部就是在操作 counterCells
數(shù)組。操作的行為基本上就是下面這幾種
要么是初始化 counterCells
數(shù)組
if (counterCells == as) { CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; }
要么就是初始化 counterCells
數(shù)組元素
CounterCell r = new CounterCell(x); // Optimistic create if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false; try { // Recheck under lock CounterCell[] rs; int m, j; if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty }
要么就是更新 counterCells
數(shù)組元素的值
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break;
還有一種操作就是 擴容 ,對 counterCells
進行擴容。
if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == as) {// Expand table unless stale CounterCell[] rs = new CounterCell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table }
在 fullAddCount()
方法里,每次循環(huán)都會重新隨機取元素 h = ThreadLocalRandom.advanceProbe(h);
。如果執(zhí)行循環(huán)了多次,都沒有保存成功,說明 counterCells
的容量不夠用了,就會觸發(fā)擴容。從上面的代碼里也能看到,counterCells
的擴容非常簡單,數(shù)組直接翻倍,元素直接賦值到新數(shù)組里,位置都沒有變。
繼續(xù)回到 addCount()
方法,之后的邏輯就是判斷了下 check
參數(shù)。其實這里的邏輯是,如果有多個線程同時操作,只要沒有發(fā)生哈希沖突,就不進行擴容檢查。你往回翻一下就可以看到 check
參數(shù)其實就是 tab[i]
位置的元素數(shù)量。
如果發(fā)生了哈希沖突,或者說沒有多個線程同時操作(這個時候就進入不了當前的分支,更新 baseCount
不會失敗),就會執(zhí)行 s = sumCount();
這個方法非常簡單,就是對 baseCount
和 counterCells
里的數(shù)值進行了一下求和,然后就開始執(zhí)行下面的代碼。
if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } }
首先 while
循環(huán)判斷了下當前的元素數(shù)量是否超過了 sizeCtl
,即便是在擴容期間,sizeCtl
小于 0 的時候,也算成立。然后判斷了下 tab
,基本上也會成立。直接進入循環(huán)內(nèi)部
第一個分支 if (sc < 0)
說明已經(jīng)有線程開始執(zhí)行擴容動作了,這個時候更新 sizeCtl
的值加一,當前線程參與 輔助擴容。
第二個分支是目前還沒有線程進行擴容操作,那么當前線程開始執(zhí)行擴容,(rs << RESIZE_STAMP_SHIFT) + 2)
這個數(shù)值我們之前已經(jīng)看到過了,就不再贅述了。
循環(huán)最后重新計算 s
,擴容結束后 s
就不會小于 sizeCtl
,方法結束。
好了,到這里我們基本上就把 ConcurrentHashMap
的 put 操作的邏輯看完了。其實整體上跟 HashMap
還是比較類似的,基本上就是把所有對 tab
的操作都使用 Unsafe
包裝了一下,解決多線程操作的問題,而發(fā)生哈希沖突時也是使用了 synchronized
進行了加鎖,解決了多線程操作鏈表的覆蓋問題。比較難的反而是元素數(shù)量的問題。因為 ConcurrentHashMap
一定要保證元素保存到 tab
成功后,元素數(shù)量一定也要加成功!不能因為元素數(shù)量的值更新失敗了,再把保存到 tab
里的元素刪除掉吧。所以呢 ConcurrentHashMap
就使用 counterCells
數(shù)組來保存那些更新 baseCount
失敗的數(shù)量。
14.get()
下面我們看下 get()
方法
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
好像也不復雜,第一個分支是一次就從 tab[i]
位置找到了元素,直接返回。最后一個 while
循環(huán)是 tab[i]
位置發(fā)生了哈希沖突,且當前位置是鏈表,通過 while
循環(huán)遍歷尋找。
重點說一下第二個分支吧 else if (eh < 0)
有兩種情況,一種是 tab[i]
位置發(fā)生了哈希沖突,且當前位置是紅黑樹,這時 e
是 TreeBin
類型的,因為涉及到紅黑樹,我們直接跳過,有興趣的可以自己研究下。另外一種情況是在擴容期間,當前元素已經(jīng)轉(zhuǎn)移到新的 nextTable
上了,這時 e
的類型是 ForwardingNode
類型,我們直接看下 ForwardingNode
類的 find()
代碼
Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else return e.find(h, k); } if ((e = e.next) == null) return null; } } }
也不復雜,就是直接在 nextTable
找元素,如果 nextTable[i]
位置為 null
直接返回,否則就進入了 for (;;)
循環(huán)里,跟之前類似,第一個分支里是直接找到了元素,而 if (eh < 0)
也有兩種情況,一個是擴容時轉(zhuǎn)移到新的 nextTable
,一個就是紅黑樹。最后就是鏈表了。
好了,到這里基本上所有的內(nèi)容都結束了,最后還剩一點有意思的東西,就是 Traverser
類,這個類其實實現(xiàn)了在擴容期間,也能使 ConcurrentHashMap
可以高效的(不使用鎖)遍歷。代碼不多,有興趣的話可以讀一下~
以下是遺留的一些內(nèi)容,有機會再補吧
-
sun.misc.Unsafe
類。 -
LongAdder
andStriped64
-
sun.misc.Contended
注解 TreeBin
以上就是Java源碼重讀之ConcurrentHashMap詳解的詳細內(nèi)容,更多關于Java ConcurrentHashMap的資料請關注其它相關文章!
原文地址:https://juejin.cn/post/7230450989591986236