激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - Java源碼重讀之ConcurrentHashMap詳解

Java源碼重讀之ConcurrentHashMap詳解

2023-05-09 01:04未知服務器之家 Java教程

目錄 0. 第一個屬性 serialPersistentFields 1. spread() 2. tabAt()、casTabAt()、setTabAt() 3. counterCells 4. keySet、values、entrySet 5. 構造方法 6. putAll() 7. tryPresize() 8. resizeStamp() 9.transfer() 10.putVal() 11.initTable() 12.helpTransfer() 13.addCount() 14.get()

目錄
  • 0. 第一個屬性 serialPersistentFields
  • 1. spread()
  • 2. tabAt()、casTabAt()、setTabAt()
  • 3. counterCells
  • 4. keySet、values、entrySet
  • 5. 構造方法
  • 6. putAll()
  • 7. tryPresize()
  • 8. resizeStamp()
  • 9.transfer()
  • 10.putVal()
  • 11.initTable()
  • 12.helpTransfer()
  • 13.addCount()
  • 14.get()

如果你沒有閱讀過 Java 源碼重讀系列之 HashMap 這篇文章的話,建議你先讀一下。以下所有內(nèi)容的前提是你已閱讀過以上的文章。

另外,凡是涉及到多線程、并發(fā)的東西從來就沒有簡單的,所以這次我們很難講清楚 ConcurrentHashMap 中的所有內(nèi)容,只能聚焦到以下幾個內(nèi)容

  • ConcurrentHashMap 的 get 操作
  • ConcurrentHashMap 的 put 操作
  • ConcurrentHashMap 的 resize() 操作
  • ConcurrentHashMap 是如何保證線程安全的

如果你想要了解的內(nèi)容不在以上范圍內(nèi),那就不用繼續(xù)閱讀了,以免浪費時間~

0. 第一個屬性 serialPersistentFields

因為 ConcurrentHashMap 的邏輯比較復雜,所以我們直接從 serialPersistentFields 屬性說起,它之前的這些屬性等用到的時候我們再看就好了,你只要知道 這個屬性之前還有一堆固定的屬性就好了。

serialPersistentFields 屬性是一個 ObjectStreamField 的數(shù)組,而且默認添加了三個元素。

    private static final ObjectStreamField[] serialPersistentFields = {
        new ObjectStreamField("segments", Segment[].class),
        new ObjectStreamField("segmentMask", Integer.TYPE),
        new ObjectStreamField("segmentShift", Integer.TYPE)
    };

我們點到 ObjectStreamField 類中去,它的類頭有一段這樣的描述:

?* A description of a Serializable field from a Serializable class. ?An array
?* of ObjectStreamFields is used to declare the Serializable fields of a class.

簡單翻譯一下就是,一個序列化類中可以序列化屬性的描述。ObjectStreamFields 數(shù)組聲明了這個類的可序列化的字段。

好了,這個類我們看到這就可以了,而且也知道了 ConcurrentHashMap 中 serialPersistentFields 屬性的作用。就是聲明了一下 ConcurrentHashMap 里有三個 屬性可以被序列化。這三個屬性分別是segments、segmentMask、segmentShift 。結束~

1. spread()

繼續(xù)往下是 Node 類的定義,沒什么好說的,我們遇到了第一個方法。

    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

都是一些位運算。解釋一下,這個方法會將 h 和 h 右移 16 位的數(shù)值進行異或(^)操作,得到的結果與 HASH_BITS 進行與(&)操作。和 HASH_BITS 進行與(&)操作,作用就是保證返回的結果的最高位一定是 0,也就是說,返回的結果一定是正數(shù)。(如果你對位運算沒有什么概念的話,也可以不用糾結這個方法,這個方法的作用就是,給一個數(shù),返回另外一個數(shù)。)

2. tabAt()、casTabAt()、setTabAt()

    @SuppressWarnings("unchecked")
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }

這幾個是其實是 ConcurrentHashMap 的核心操作方法。tabAt() 是獲取,casTabAt() 是更新,并且是基于 CAS 方式實現(xiàn)的更新。setTabAt() 是插入。這些實現(xiàn)都使用了大名鼎鼎的 sun.misc.Unsafe 類。

如果你對這個類不熟悉的話,其實可以簡單理解,這個類里的一些方法都是線程的。因為這個類提供的是一些硬件級別的原子操作。簡單來說,sun.misc.Unsafe 類提供的方法都是線程安全的。理解到這里就可以了,再深入的內(nèi)容,就不再本文范圍內(nèi)了。繼續(xù)往下。

3. counterCells

繼續(xù)往下的話,就看到了 tablenextTable,沒什么說的,這個就是存儲數(shù)據(jù)的數(shù)組了,至于 nextTable,通過注釋可以看到,這個變量應該是只在擴容時使用到了,等用到的時候再說。

繼續(xù)往下呢就是一些int 類型的值了,通過名字和注釋也看不出來什么,直接跳過。等用到的時候再說。繼續(xù)往下的話我們就看到了一個 CounterCell[] 數(shù)組了,點到這個類的定義,可以看到以下代碼。

    @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

好像也沒有多復雜,就一個使用了 volatile 標記的 數(shù)值。至于 sun.misc.Contended 注解,主要是解決 CPU 偽緩存 問題的,提高性能和效率使用的,可以先不用關注。

但是,如果你閱讀一下注釋的話,就會發(fā)現(xiàn)這里面大有文章。涉及到兩個非常復雜的東西:LongAdder and Striped64。關于 LongAdder and Striped64 的內(nèi)容也不在本文范圍內(nèi),有興趣的話可以搜一下相關的文章。不了解也沒有關系,不影響閱讀。我們繼續(xù)往下看。

4. keySet、values、entrySet

再往下就是幾個 view 變量了。

    // views
    private transient KeySetView<K,V> keySet;
    private transient ValuesView<K,V> values;
    private transient EntrySetView<K,V> entrySet;

看名字也應該能猜出來,這些變量應該是跟 HashMap 的 keySet()、values()、entrySet() 幾個方法的作用類似。如果你點到它的定義就會看到,這幾個類都繼承了 CollectionView 這個類。

abstract static class CollectionView<K,V,E>
        implements Collection<E>, java.io.Serializable {
        private static final long serialVersionUID = 7249069246763182397L;
        final ConcurrentHashMap<K,V> map;
        CollectionView(ConcurrentHashMap<K,V> map)  { this.map = map; }
        //... ...

只看前面幾行就可以了,內(nèi)部有一個 ConcurrentHashMap 類型的變量,而且 CollectionView 只有一個帶有 ConcurrentHashMap 參數(shù)的構造方法。盲猜也能猜到,上面的 xxxView 類內(nèi)部操作的也都是 ConcurrentHashMap 存儲的數(shù)據(jù)。了解這些就可以了,我們繼續(xù)往下看。

5. 構造方法

第一個是個空構造方法沒有什么好說的,先看第二個。

    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }

通過注釋和名稱我們應該能夠知道,這個構造方法可以初始化 Map 的容量。有意思的是,計算 cap 的方法。不知道你還記不記得 HashMap 的初始容量的構造方法是怎么計算容量的。代碼在下面

this.threshold = tableSizeFor(initialCapacity);

而 ConcurrentHashMap 則是將 initialCapacity 加上了 initialCapacity 的一半又加了 1 作為 tableSizeFor 的參數(shù)。其實就是為了解決 HashMap 存在的可能出現(xiàn)兩次擴容的問題。

注意,這里使用的是 >>>,不是 >>>>> 的含義是無符號右移。它會把最高位表示正負的值也會右移,然后補 0。 所以 >>> 之后,一定是正數(shù)。如果 >>> 之前是正數(shù)的話,結果跟 >> 一致。如果是負數(shù)的話,就會出現(xiàn)一個很奇怪的正數(shù)。這是因為最高位表示負數(shù)的 1 也跟著右移了。由于代碼里已經(jīng)判斷了小于 0 ,所以我們目前先按照除 2 理解即可。

還有一個點是,從代碼來看,ConcurrentHashMap 的最大容量 好像 是用 sizeCtl 表示的。但是,如果僅僅是表示最大容量,為什么會定義一個這么奇怪的名字呢? Ctl 的后綴應該是 control 的簡寫。具體是怎么控制的呢?

繼續(xù)往下,我們先跳過帶有 Map 參數(shù)的構造方法,因為這個涉及到 putAll() 方法。

    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

這兩個構造方法其實可以算做一個,我們直接看下面那個復雜的。

先判斷了一下參數(shù)的取值,然后更新了一下 initialCapacity 參數(shù),然后根據(jù)參數(shù)計算 size,考慮到 loadFactor 可能小于 1,導致 int 值越界,所以轉(zhuǎn)成了 long 類型。

關于 concurrencyLevel,給的注釋是:并發(fā)更新線程的預估數(shù)量。那上面那段判斷更新就不難理解了。假如我預估會有 20 個線程同時更新這個初始容量為 15 的 Map,這個時候的初始容量會自動的改為 20 。

好像沒有什么問題?有意思的是, loadFactor 這個參數(shù)竟然沒有保存!! 加載因子沒有保存,那什么時候觸發(fā)擴容呢?我們繼續(xù)往下看。

6. putAll()

回到帶有 Map 參數(shù)的構造方法。

    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }

沒有什么復雜的,指定了下默認的初始容量(16)就直接 putAll(m); 了。

    public void putAll(Map<? extends K, ? extends V> m) {
        tryPresize(m.size());
        for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
            putVal(e.getKey(), e.getValue(), false);
    }

好像也不難,先執(zhí)行 tryPresize(m.size()); 應該是初始擴容, 然后再 for 循環(huán)進行 putVal() 操作。

7. tryPresize()

先看下方法名。嘗試并行重置容量。里面的 P 應該是 parallel(并行) 的縮寫。

    private final void tryPresize(int size) {
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            else if (tab == table) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc ==   + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }

首先計算了下變量 c,這個是保存入?yún)?size 個元素時需要的最大容量。

然后是一個 whlie 循環(huán),因為我們是通過構造方法進來的,所以 sizeCtl 的值現(xiàn)在是默認值 16 ,table 現(xiàn)在是 null。這個時候就進入到了 if 的代碼里了。

if 的條件里是判斷了 compareAndSwapInt() 的結果。這里需要說一下,compareAndSwapInt 方法是 CAS 的一種實現(xiàn)。這個方法內(nèi)部做了兩件事情,首先是比較 this 這個對象的 SIZECTL 值是否跟 sc 相等,相等的話,把 SIZECTL 的值 改為 -1。而且 Unsafe 類還保證了線程的安全。如果有多個線程同時執(zhí)行這個方法的話,只會有一個線程成功。不會出現(xiàn)兩個線程都比較通過了,然后在賦值的時候產(chǎn)生覆蓋的問題。

好像也不難理解,其實就是把 sizeCtl 值改成了 -1,而且只有一個線程會成功。這里的 sizeCtl 更像是一把鎖,哪個線程改成了 -1 ,哪個線程就獲取到了鎖,那它就可以執(zhí)行后面的流程了。

繼續(xù),因為上面已經(jīng)對 tab = table 賦值了,所以下面的判斷也能通過。然后,就看到了數(shù)組初始化的過程了。直接 new 了一個長度為 n 的 Node[]。并賦值給了 table。如果你往上追一下 n 的賦值,就會知道,現(xiàn)在的 n 正好是 c。就是方法一開始計算的值。

table 數(shù)組都已經(jīng)初始化了,是不是結束了?并沒有。這個時候更新了一下 sc。 >>> 2 相當于除 4,其實就是 sc 現(xiàn)在的值是 n 的 3/4 。而且在 finally 塊中,更新了 sizeCtl。這個時候 sizeCtl 就不是 -1 了。根據(jù)我們之前的理解,這里更新 sizeCtl,應該是在釋放鎖。

然后,第一次 while 循環(huán)就結束了。再次進入 while 循環(huán),這次 sc 是 n 的 3/4 了,上一次循環(huán)已經(jīng)更新了 sizeCtl

這次 table 就不等于 null 了。而且根據(jù)我們之前的推斷,現(xiàn)在的 sc 應該等于 n 的 3/4 ,而 n 之前又等于 c。所以, c <= sc 這個條件也不成立。

而且 n >= MAXIMUM_CAPACITY 這個條件大概率是在擴容到最大的時候才會成立。所以,就走到了最后一個條件分支里了。因為 sc 現(xiàn)在是大于 0 的,所以直接走到了最后一個分支。

PS: if (sc < 0) 這個分支好像永遠不會執(zhí)行,因為 while 進入的條件要求 sc >= 0,而在判斷sc < 0 之前又沒有任何地方會把 sc 更新為負數(shù),所以好像一直都不會執(zhí)行。如果我理解錯了,希望有人能解惑一下~

8. resizeStamp()

首先執(zhí)行了一下 resizeStamp() 方法。這個方法也是一個位運算的方法。你可以直接使用 main() 方法跑一下,會返回一個很難理解的正數(shù)。簡單說一下這個數(shù)是怎么得出來的。

    static final int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

首先, numberOfLeadingZeros() 會返回 n 的二進制串中從最左邊算起連續(xù)的“0”的總個數(shù)。然后再跟 1 左移 15 位的值按位或(|)操作。 最終得到的就是一個在二進制中,第 16 位為 1 的正數(shù)。

繼續(xù)回到代碼,因為現(xiàn)在已經(jīng)確定 sc 是 n 的 3/4 了(PS:如果這個 3/4 不理解,那換成 0.75 是不是會好點 ,好像跟 HashMap 的擴容因子一樣,其實 sc 的值就是擴容閾值,這個后面會提到,現(xiàn)在不理解沒關系),所以也是大于 0 的。這里又進行了一次 compareAndSwapInt()。這個時候賦值的是把 rs 左移了 16 位。 還記得 resizeStamp() 返回的結果么,第 16 位是 1。所以 rs 右移 16 位之后,最高位就是 1 了,在 int 類型里,二進制的最高位表示正負,1 表示負數(shù)。

所以,這個時候,又把 sizeCtl 更新成負值了。根據(jù)我們之前的理解,這里應該還是獲取鎖的操作。獲取到鎖之后,一般就是需要操作資源了。但是 table 我們不是已經(jīng)初始化好了嗎?這次又要初始什么呢?

記住,現(xiàn)在 sizeCtl 是一個負值,并且 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2) 后面要用到!

9.transfer()

        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        //... ...

到這里還是比較好理解的。先初始了一下 striden 這兩個變量。然后,因為我們是初始化進來的,所以 nextTab 一定等于 null。這個時候會初始化 nextTab。在創(chuàng)建數(shù)組的時候捕獲了一個異常,這個異常出現(xiàn)的唯一情況就是內(nèi)存不夠了,分配不了 2 倍的 n 的數(shù)組。這個時候,將 sizeCtl 的值改為了 Integer.MAX_VALUE。然后就結束了。如果沒有拋異常,會更新 nextTabletransferIndex 的值。

我們需要回頭看下 tryPresize() 方法。如果在拋異常的時候結束,會出現(xiàn)什么情況。根據(jù)代碼,異常結束后,會進入第三次循環(huán),這次循環(huán)會進入第二個分支。因為 c <= sc 一定會成立。這里就會結束循環(huán)。

到這里,我們已經(jīng)把 tryPresize() 循環(huán)里的三個分支都走完了,下面繼續(xù)看 transfer() 這個方法。

nextTab 初始化之后,我們又看到了一個新的 Node 類:

    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);

點到這個類的定義里我們會發(fā)現(xiàn),這個類里面只有一個屬性 nextTable 和一個 find() 方法。關于 find() 是在獲取元素時才能用到,我們先不用關注。目前來看 ForwardingNode 其實就是對 nextTab 的一個封裝。然后繼續(xù)看 transfer()

兩個boolean 類型的值,默認一個 true,一個 false。

下面的代碼是一個 for 循環(huán),但是這個循環(huán)有差不多 100 多行的代碼(如果我在項目里遇到這種代碼估計會罵人的~)。我們一點點看,首先是一個while 循環(huán)。

        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            } else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, 
                    nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }

首先 i = 0, bound = 0 ,所以,第一次循環(huán)不會進入第一個分支。然后,根據(jù)之前 transferIndex = n; 的賦值,也不會進入第二個分支。

這樣就來到了第三個分支。compareAndSwapInt 會更新 transferIndex 的值,如果 CPU 的個數(shù)是 1,transferIndex 更新成 0 ,否則更新成 nextIndex - stride 。然后更新 bound、i、advance 的值,循環(huán)就結束了。

繼續(xù)往下,現(xiàn)在 i 的值是 n - 1,所以不會命中 if (i < 0 || i >= n || i + n >= nextn) 條件。

然后,因我們是初始化時進入的,素以 tab 里的所有元素都是 null,第二個條件就通過了。

    else if ((f = tabAt(tab, i)) == null)
        advance = casTabAt(tab, i, null, fwd);

其實就是把 tab 的 i 位置 初始化了一個 fwd 元素。 到這里,第一次 for 循環(huán)就結束了。

第二次循環(huán)其實也很簡單,首先 advance = false ,不會進入 while 循環(huán),然后就會進入下面的判斷

    else if ((f = tabAt(tab, i)) == null)
        advance = casTabAt(tab, i, null, fwd);
    else if ((fh = f.hash) == MOVED)
        advance = true; // already processed

首先,獲取了下 tab[i] 的值,因為上次循環(huán)已經(jīng)賦值過了,現(xiàn)在 f = fwd。然后,有意思的來了,先看下 MOVED 的定義

    static final int MOVED     = -1; // hash for forwarding nodes

沒錯,MOVED = -1,按我們正常的理解,一個對象的 hash 值,怎么也不會等于 -1 吧,我們再回頭看下 ForwardingNode 這個類

    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
        //... ...
    }
    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
        //... ... 
    }

順便貼了下父類的代碼,主要看構造函數(shù),看到了么? fwd 這個對象在初始化的時候,指定了 hash 值,就是 MOVED。OK,回到之前的循環(huán)。

這次循環(huán)就會把 advance 改為 true。第二次循環(huán)就結束了。

經(jīng)過上面的兩次循環(huán)之后,我們是其實只是執(zhí)行了一行代碼 tab[n-1] = fwd。現(xiàn)在進入第三次循環(huán),之前 i = n - 1 。現(xiàn)在又執(zhí)行了一次 if (--i >= bound || finishing),所以現(xiàn)在 i = n - 2 。但是 bound 可能有兩種情況,一種是 bound = n - stride,一種是 bound = 0。我們先假設 bound = n - stride; stride = 16 。所以,第一個條件是成立的,執(zhí)行 advance = false; ,然后 while 循環(huán)結束。

然后,第一個 if (i < 0 || i >= n || i + n >= nextn) 條件不會成立,又執(zhí)行到了賦值操作里。這時 tab[n-2] = fwd。第三次循環(huán)結束~

然后第四次循環(huán)又會把 advance 改為 true

好好回味下~~

其實,這一頓操作下來就是在執(zhí)行 tab[i] = fwd 這一行代碼。搞了這么多東西,其實就是在支持多線程并發(fā)擴容,簡單說下過程。

首先,while 循環(huán)會確定當前線程擴容的區(qū)間 [ bound,nextIndex ) 左開右閉。然后 while 循環(huán)下面的代碼其實就是在給 tabnextTab 賦值。設想下,如果 while 循環(huán)里的 compareAndSwapInt 執(zhí)行失敗,會是什么情況?沒錯,會空轉(zhuǎn)!結束只有兩種情況,一種是 transferIndex = 0。說明已經(jīng)有其他線程把所有的區(qū)間都認領了,另外一種情況是執(zhí)行 compareAndSwapInt。認領 [ bound,nextIndex ) 的區(qū)間,進行擴容。

其實,你可以直接驗證下的,打斷點也好,手寫也罷,假設n = 1024; NCPU = 4。這時 stride = 32。那么第一個線程會先對 tab[1024-32,1024-1]進行賦值。如果這時有其他線程進來,在 while 循環(huán)的時候,就會認領 tab[1024-32-32,1024-32-1] 的區(qū)間進行賦值。如果有更多的線程進來的話,就會加快這個過程。這個就是所謂的 并發(fā)擴容 ,也有叫 輔助擴容 的。

然后,我們來看下通過 synchronized 加鎖的這段代碼。能執(zhí)行到這里的話只能是 tab[i].hash != MOVED。那就說明這里記錄的是一個正常的數(shù)據(jù)。

首先判斷了下 if (tabAt(tab, i) == f) 沒什么說的,肯定成立,不成立就結束了,然后判斷了下 if (fh >= 0)。有點奇怪,正常數(shù)據(jù)的 hash 還能小于 0 ?那我們先看下不成立的情況

 else if (f instanceof TreeBin) 

明白了吧,當發(fā)生 hash 沖突時,并且鏈表已經(jīng)轉(zhuǎn)成紅黑樹了,這時 tab[i] = TreeBin,那我們看下 TreeBin 的定義。

static final class TreeBin<K,V> extends Node<K,V> {
    TreeBin(TreeNode<K,V> b) {
        super(TREEBIN, null, null, null);
        //... ...
    }
}
 static final int TREEBIN   = -2; // hash for roots of trees

真相了,TreeBin 的 hash 值是 -2,就小于 0。后面的代碼我們就不說了,其實跟HashMap是一樣的,如果當前節(jié)點是鏈表,那就采用高低位鏈表的形式對 nextTab 賦值,如果是 TreeBin 那就采用紅黑樹的方式進行賦值。而且,我們對 tab[i] 加了 synchronized 鎖,也不會有線程競爭,老老實實賦值就好了。

最后,transfer 里的代碼基本上都看完了,就剩下面這段了

    if (i < 0 || i >= n || i + n >= nextn) {
        int sc;
        if (finishing) {
            nextTable = null;
            table = nextTab;
            sizeCtl = (n << 1) - (n >>> 1);
            return;
        }
        if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
            if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                return;
            finishing = advance = true;
            i = n; // recheck before commit
        }
    }

while 循環(huán)里,有這么一行代碼 i = -1; 執(zhí)行了這個之后,就會進入上面的代碼里。其實就是 tab 初始化完成之后,即 nextIndex = 0 的時候,就會執(zhí)行 i = -1; ,然后就會進入上面的代碼了。我們看下上面的代碼。

首先,finishing 現(xiàn)在應該是等于 false 的,直接進入第二個 if。這個也很簡單,首先 sc = sizeCtl,賦值,然后通過 CAS 將 sizeCtl 的值改為 sc - 1。還記得 sizeCtl 的值是多少么?? 我直接粘貼一下 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)。是不是跟 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) 這個判斷邏輯一致?如果不相等,說明有其他線程修改了sizeCtl,這同時說明有其他線程還在執(zhí)行擴容的動作,即還在執(zhí)行 tryPresize() 或者是 transfer() 方法。那么,因為當前線程已經(jīng)執(zhí)行完了,所以直接 return; 結束,讓其他線程繼續(xù)執(zhí)行就好了。

如果相等,執(zhí)行 finishing = advance = true; i = n。進入下一次 for 循環(huán)。

一頓判斷之后,你會發(fā)現(xiàn),還是會進入到上面的代碼,而這時,finishing == true 了!下面的代碼就不難理解了吧,更新 table ,相當于使用了新的數(shù)組了,而 sizeCtl 也更新了一下。都是位運算,如果你看不明白,可以用 main 方法跑一下。我們假設 n = 1024,那么 table 現(xiàn)在的大小也就是之前 nextTab 的大小,就是2048,然后,我們用 main 跑一下 sizeCtl 的值,不出意外的話應該等于 1536 。如果還看不明白,那么你再計算下 1536 / 2048。結果是 0.75 ,這個數(shù)字熟悉吧? HashMap 的默認加載因子!。沒錯,sizeCtl 其實就是下次需要擴容的閾值。

到這里,我們就把 transfer() 方法看完了。然后,我們重點總結下 sizeCtl 這個屬性,不得不承認,這個設計非常的巧妙!

首先,通常情況下 sizeCtl 應該是等于下次的擴容閾值的。但是在擴容期間,有兩個狀態(tài),一個是 -1,一個是非常大的一個負值。等于 -1 很好理解,相當于是一個鎖,哪個線程更新成功,就可以進行數(shù)組的初始化動作。那么,就剩最后一種情況了。直接用下面的 main() 方法跑一下

    public static void main(String[] args) throws IllegalAccessException {
        int rs = Integer.numberOfLeadingZeros(1024) | (1 << (16 - 1));
        int sizeCtl = (rs << 16) + 2;
        System.out.println(sizeCtl);
        System.out.println((sizeCtl<<16)>>16);
    }

會得到下面的結果

-2146107390
2

有意思了,(sizeCtl<<16)>>16,這個操作是先左移 16 位,再右移 16 位,相當于把 sizeCtl 的高 16 位都置為 0 了。得到了一個 2,其實,這個 2 就是說當前有 (2 - 1) 個線程在進行擴容操作。(PS: sizeCtl 注釋里有寫~)。具體是為什么,我們繼續(xù)往下看。

transfer() 執(zhí)行完,就回到了 tryPresize()。然后繼續(xù)返回就到了 putAll()。繼續(xù)往下執(zhí)行就是 putVal() 方法了。

10.putVal()

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
        //... ...

首先判斷了下 null,然后計算了下 hash 哈希值。就進入 for 循環(huán)了。首先是第一個分支。其實就是 tab 還沒有初始化的時候會進入這個分支。

11.initTable()

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

好像也沒有多少復雜的,因為我們之前已經(jīng)對 sizeCtl 做了充分的解釋,這里如果 sc < 0 的話,說明在擴容或者是初始化中,然后當前線程直接執(zhí)行了 Thread.yield();,就是放棄 CPU 執(zhí)行權等待下次分配 CPU 時間片,如果不小于 0 ,并且 tab = null ,那說明現(xiàn)在還沒有線程執(zhí)行擴容,那當前線程就會更新 sizeCtl,然后自己開始執(zhí)行初始化動作,初始化好后直接返回 tab

繼續(xù)回到 putVal() 方法,如果執(zhí)行第二個分支,說明 tab[i] == null,這個位置還沒有元素,直接通過 casTabAt() 方法進行賦值。如果這個位置有值,并且 (fh = f.hash) == MOVED 說明在擴容或者是在初始化,這個時候當前線程會進行 輔助擴容

12.helpTransfer()

    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

首先在 if 條件里獲取到 nextTab 如果不為 null,就會進入 while 循環(huán),首先 sc < 0 說還在擴容或者是初始化中,while 循環(huán)里的第一個分支是不需要輔助擴容或者是已經(jīng)達到最大的輔助線程數(shù)量,或者是已經(jīng)剩最后一個線程在擴容了,其他的線程都結束了。所以直接 break; 就可以了。

第二個分支,首先會執(zhí)行 sizeCtl + 1 的操作,執(zhí)行成功就會執(zhí)行 transfer() 方法,這個方法我們之前已經(jīng)看過了,就不看了,需要注意的是,我們之前說過,sizeCtl 的低 16 位代表目前正在擴容的線程數(shù)減一。因為這里新加入一個線程參與擴容,所以對 sizeCtl 進行了加一的操作。如果還有線程進來,那么 sizeCtl 還會加一。這里就解釋清楚 sizeCtl 的另外的一個用法了。擴容結束直接 break;。繼續(xù)回到 putVal()

繼續(xù)下一次 for,如果 tab[i] 還不等于 null,那就說明發(fā)生哈希沖突了,并且當前已經(jīng)不在擴容了。就走到了最后一個分支,使用 synchronized 加鎖的這一段代碼里,這段代碼其實并不復雜,發(fā)生沖突之后無非就兩種情況,鏈表或者是紅黑樹。你可以看下 TreeBin 的構造方法。它的哈希值是 -2。

    static final int TREEBIN   = -2; // hash for roots of trees
    //... ...
    TreeBin(TreeNode<K,V> b) {
        super(TREEBIN, null, null, null);
        //... ... 
    }

所以才有了 if (fh >= 0) 的判斷,如果首節(jié)點的哈希值大于 0 ,那一定是鏈表。

最后還有一段進行樹化的判斷操作。

    static final int TREEIFY_THRESHOLD = 8;
    //... ...
    if (binCount != 0) {
        if (binCount >= TREEIFY_THRESHOLD)
            treeifyBin(tab, i);
        if (oldVal != null)
            return oldVal;
        break;
    }

鏈表的節(jié)點數(shù)超過 8 就會進行樹化操作。到這里其實 putVal() 的相關操作基本上已經(jīng)結束了,就剩最后一個 addCount() 方法了,看名稱也知道這個是更新計數(shù)器的,盲猜也能猜到應該跟元素數(shù)量有關系。不過,好像有點問題啊,你有沒有發(fā)現(xiàn),在整個 putVal() 方法里面,好像沒有觸發(fā)擴容的邏輯!!

13.addCount()

其實這個方法除了操作我們之前見到的 counterCells 屬性外,還會判斷是否需要進行擴容。因為只有在知道具體的元素數(shù)量后,才能判斷出是否需要擴容。我們先看這個方法的第一段代碼。

    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        //... ...

首先,我們假設目前是第一次執(zhí)行這個方法,那么 counterCells == null,然后就會使用 CAS 執(zhí)行 baseCount = b + x失敗之后,就開始執(zhí)行 fullAddCount() 方法了,因為現(xiàn)在 as == null 是成立的。

fullAddCount() 方法與 Striped64.longAccumulate() 方法基本上是一模一樣的,因為之前已經(jīng)跳過了 Striped64,所以這里也不打算去細看。直接總結下 counterCells 的作用。其實 counterCells 就是在多個線程同時更新 baseCount 失敗時記錄下新增的元素數(shù)量。

舉個例子就明白了,假設 ConcurrentHashMap 初始化完成之后,有 2 個線程,同時執(zhí)行了 addCount(),那么 baseCount 會更新成 1,counterCells 會初始化為一個大小為 2 的數(shù)組,且一個元素是 null,另外一個元素的 counterCells[i].value 值是 1。

如果這個時候又來了一個線程,會有 3 種情況,

  • CAS 更新 baseCount 成功,baseCount = 2。第三個線程結束
  • CAS 失敗且 counterCells[ThreadLocalRandom.getProbe() & m] == null。繼續(xù)初始化 counterCells 的另一個為 null 的元素,值為 1。
  • CAS 失敗且 counterCells[ThreadLocalRandom.getProbe() & m] != null,那么 counterCells[i].value 值更新成 2。

ThreadLocalRandom.getProbe() 方法其實就是取了個隨機值。就是說,如果有多個線程同時更新的話,失敗的線程會隨機的從 counterCells 取一個元素,將新增的數(shù)量保存進去。

其實很簡單,能進入到 fullAddCount() 方法的條件只有一種,counterCells == null 并且 CAS 更新 baseCount 失敗,這種情況就是有多個線程同時執(zhí)行了 addCount() 方法,比如,有兩線程同時執(zhí)行 putVal(),那么必然有一個線程在 CAS 更新 baseCount 時會失敗,這個時候就進入到 fullAddCount() 方法。這個方法內(nèi)部就是在操作 counterCells 數(shù)組。操作的行為基本上就是下面這幾種

要么是初始化 counterCells 數(shù)組

if (counterCells == as) {
    CounterCell[] rs = new CounterCell[2];
    rs[h & 1] = new CounterCell(x);
    counterCells = rs;
    init = true;
}

要么就是初始化 counterCells 數(shù)組元素

    CounterCell r = new CounterCell(x); // Optimistic create
    if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
        boolean created = false;
        try {               // Recheck under lock
            CounterCell[] rs; int m, j;
            if ((rs = counterCells) != null &&
                    (m = rs.length) > 0 &&
                    rs[j = (m - 1) & h] == null) {
                rs[j] = r;
                created = true;
            }
        } finally {
            cellsBusy = 0;
        }
        if (created)
            break;
        continue;           // Slot is now non-empty
    }

要么就是更新 counterCells 數(shù)組元素的值

    else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
        break;

還有一種操作就是 擴容 ,對 counterCells 進行擴容。

    if (cellsBusy == 0 &&
            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
        try {
            if (counterCells == as) {// Expand table unless stale
                CounterCell[] rs = new CounterCell[n << 1];
                for (int i = 0; i < n; ++i)
                    rs[i] = as[i];
                counterCells = rs;
            }
        } finally {
            cellsBusy = 0;
        }
        collide = false;
        continue;                   // Retry with expanded table
    }

fullAddCount() 方法里,每次循環(huán)都會重新隨機取元素 h = ThreadLocalRandom.advanceProbe(h);。如果執(zhí)行循環(huán)了多次,都沒有保存成功,說明 counterCells 的容量不夠用了,就會觸發(fā)擴容。從上面的代碼里也能看到,counterCells 的擴容非常簡單,數(shù)組直接翻倍,元素直接賦值到新數(shù)組里,位置都沒有變。

繼續(xù)回到 addCount() 方法,之后的邏輯就是判斷了下 check 參數(shù)。其實這里的邏輯是,如果有多個線程同時操作,只要沒有發(fā)生哈希沖突,就不進行擴容檢查。你往回翻一下就可以看到 check 參數(shù)其實就是 tab[i] 位置的元素數(shù)量。

如果發(fā)生了哈希沖突,或者說沒有多個線程同時操作(這個時候就進入不了當前的分支,更新 baseCount 不會失敗),就會執(zhí)行 s = sumCount(); 這個方法非常簡單,就是對 baseCountcounterCells 里的數(shù)值進行了一下求和,然后就開始執(zhí)行下面的代碼。

    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                    (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }

首先 while 循環(huán)判斷了下當前的元素數(shù)量是否超過了 sizeCtl,即便是在擴容期間,sizeCtl 小于 0 的時候,也算成立。然后判斷了下 tab ,基本上也會成立。直接進入循環(huán)內(nèi)部

第一個分支 if (sc < 0) 說明已經(jīng)有線程開始執(zhí)行擴容動作了,這個時候更新 sizeCtl 的值加一,當前線程參與 輔助擴容

第二個分支是目前還沒有線程進行擴容操作,那么當前線程開始執(zhí)行擴容,(rs << RESIZE_STAMP_SHIFT) + 2) 這個數(shù)值我們之前已經(jīng)看到過了,就不再贅述了。

循環(huán)最后重新計算 s ,擴容結束后 s 就不會小于 sizeCtl,方法結束。

好了,到這里我們基本上就把 ConcurrentHashMap 的 put 操作的邏輯看完了。其實整體上跟 HashMap還是比較類似的,基本上就是把所有對 tab 的操作都使用 Unsafe 包裝了一下,解決多線程操作的問題,而發(fā)生哈希沖突時也是使用了 synchronized 進行了加鎖,解決了多線程操作鏈表的覆蓋問題。比較難的反而是元素數(shù)量的問題。因為 ConcurrentHashMap 一定要保證元素保存到 tab 成功后,元素數(shù)量一定也要加成功!不能因為元素數(shù)量的值更新失敗了,再把保存到 tab 里的元素刪除掉吧。所以呢 ConcurrentHashMap 就使用 counterCells 數(shù)組來保存那些更新 baseCount 失敗的數(shù)量。

14.get()

下面我們看下 get() 方法

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

好像也不復雜,第一個分支是一次就從 tab[i] 位置找到了元素,直接返回。最后一個 while 循環(huán)是 tab[i] 位置發(fā)生了哈希沖突,且當前位置是鏈表,通過 while 循環(huán)遍歷尋找。

重點說一下第二個分支吧 else if (eh < 0) 有兩種情況,一種是 tab[i] 位置發(fā)生了哈希沖突,且當前位置是紅黑樹,這時 eTreeBin 類型的,因為涉及到紅黑樹,我們直接跳過,有興趣的可以自己研究下。另外一種情況是在擴容期間,當前元素已經(jīng)轉(zhuǎn)移到新的 nextTable 上了,這時 e 的類型是 ForwardingNode 類型,我們直接看下 ForwardingNode 類的 find() 代碼

    Node<K,V> find(int h, Object k) {
        // loop to avoid arbitrarily deep recursion on forwarding nodes
        outer: for (Node<K,V>[] tab = nextTable;;) {
            Node<K,V> e; int n;
            if (k == null || tab == null || (n = tab.length) == 0 ||
                    (e = tabAt(tab, (n - 1) & h)) == null)
                return null;
            for (;;) {
                int eh; K ek;
                if ((eh = e.hash) == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                if (eh < 0) {
                    if (e instanceof ForwardingNode) {
                        tab = ((ForwardingNode<K,V>)e).nextTable;
                        continue outer;
                    }
                    else
                        return e.find(h, k);
                }
                if ((e = e.next) == null)
                    return null;
            }
        }
    }

也不復雜,就是直接在 nextTable 找元素,如果 nextTable[i] 位置為 null 直接返回,否則就進入了 for (;;) 循環(huán)里,跟之前類似,第一個分支里是直接找到了元素,而 if (eh < 0) 也有兩種情況,一個是擴容時轉(zhuǎn)移到新的 nextTable,一個就是紅黑樹。最后就是鏈表了。

好了,到這里基本上所有的內(nèi)容都結束了,最后還剩一點有意思的東西,就是 Traverser 類,這個類其實實現(xiàn)了在擴容期間,也能使 ConcurrentHashMap 可以高效的(不使用鎖)遍歷。代碼不多,有興趣的話可以讀一下~

以下是遺留的一些內(nèi)容,有機會再補吧

  • sun.misc.Unsafe 類。
  • LongAdder and Striped64
  • sun.misc.Contended 注解
  • TreeBin

以上就是Java源碼重讀之ConcurrentHashMap詳解的詳細內(nèi)容,更多關于Java ConcurrentHashMap的資料請關注其它相關文章!

原文地址:https://juejin.cn/post/7230450989591986236

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 91aa.app| chengrenyingshi | 免费观看三级毛片 | 日本黄色一级电影 | 国产精品午夜未成人免费观看 | 成人9禁啪啪无遮挡免费 | 黄网站在线免费看 | 久久99国产综合精品 | 国产精品久久久久久久久久尿 | 啪啪激情 | 一区二区三区日韩电影 | 亚洲片在线 | 久久在草 | 亚洲午夜久久久精品一区二区三区 | 羞羞视频免费网站日本动漫 | 国产成人av免费观看 | 91精品国产乱码久久久久久久久 | 国产日韩免费观看 | 欧美大胆xxxx肉体摄影 | 成人毛片免费视频 | 色视频在线 | 成人一区二区在线观看视频 | 日韩在线欧美在线 | 91网视频| 日韩黄色精品视频 | 国产一级免费在线视频 | 国内免费视频成人精品 | 国产欧美日韩视频在线观看 | 黄色网战入口 | 大学生一级毛片在线视频 | 国产精品国产三级国产aⅴ无密码 | 久久久久成人免费 | 日韩蜜桃视频 | 免费a级观看 | h视频在线免费看 | 双性帝王调教跪撅打屁股 | 欧美三级毛片 | 男女无遮挡羞羞视频 | 成人一级黄色片 | 亚洲一区国产一区 | www.69色 |