激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務(wù)器之家 - 編程語言 - Java教程 - Java多線程之同步工具類CyclicBarrier

Java多線程之同步工具類CyclicBarrier

2022-03-03 12:56冬日毛毛雨 Java教程

這篇文章主要介紹Java多線程之同步工具類CyclicBarrier,它是一個同步工具類,它允許一組線程互相等待,直到達到某個公共屏障點,支持一個可選的Runnable命令,在一組線程中的最后一個線程到達之后,該命令只在每個屏障點運行一次

前言:

CyclicBarrier是一個同步工具類,它允許一組線程互相等待,直到達到某個公共屏障點。與CountDownLatch不同的是該barrier在釋放線程等待后可以重用,所以它稱為循環(huán)(Cyclic)的屏障(Barrier)。
CyclicBarrier支持一個可選的Runnable命令,在一組線程中的最后一個線程到達之后(但在釋放所有線程之前),該命令只在每個屏障點運行一次。若再繼續(xù)所有的參與線程之前更新共享狀態(tài),此屏蔽操作很有用。

1 CyclicBarrier方法說明

CyclicBarrier提供的方法有:

  • CyclicBarrier(parties):初始化相互等待的線程數(shù)量的構(gòu)造方法。
  • CyclicBarrier(parties,Runnable barrierAction):初始化相互等待的線程數(shù)量以及屏障線程的構(gòu)造方法。

屏障線程的運行時機:

等待的線程數(shù)量=parties之后,CyclicBarrier打開屏障之前。
舉例:在分組計算中,每個線程負責一部分計算,最終這些線程計算結(jié)束之后,交由屏障線程進行匯總計算。

int getParties():獲取CyclicBarrier打開屏障的線程數(shù)量,也成為方數(shù)。

int getNumberWaiting():獲取正在CyclicBarrier上等待的線程數(shù)量。

int await():CyclicBarrier上進行阻塞等待,直到發(fā)生以下情形之一:

  • CyclicBarrier上等待的線程數(shù)量達到parties,則所有線程被釋放,繼續(xù)執(zhí)行。
  • 當前線程被中斷,則拋出InterruptedException異常,并停止等待,繼續(xù)執(zhí)行。
  • 其他等待的線程被中斷,則當前線程拋出BrokenBarrierException異常,并停止等待,繼續(xù)執(zhí)行。
  • 其他等待的線程超時,則當前線程拋出BrokenBarrierException異常,并停止等待,繼續(xù)執(zhí)行。
  • 其他線程調(diào)用CyclicBarrier.reset()方法,則當前線程拋出BrokenBarrierException異常,并停止等待,繼續(xù)執(zhí)行。

int await(timeout,TimeUnit):CyclicBarrier上進行限時的阻塞等待,直到發(fā)生以下情形之一:

  • CyclicBarrier上等待的線程數(shù)量達到parties,則所有線程被釋放,繼續(xù)執(zhí)行。
  • 當前線程被中斷,則拋出InterruptedException異常,并停止等待,繼續(xù)執(zhí)行。
  • 當前線程等待超時,則拋出TimeoutException異常,并停止等待,繼續(xù)執(zhí)行。
  • 其他等待的線程被中斷,則當前線程拋出BrokenBarrierException異常,并停止等待,繼續(xù)執(zhí)行。
  • 其他等待的線程超時,則當前線程拋出BrokenBarrierException異常,并停止等待,繼續(xù)執(zhí)行。
  • 其他線程調(diào)用CyclicBarrier.reset()方法,則當前線程拋出BrokenBarrierException異常,并停止等待,繼續(xù)執(zhí)行。

boolean isBroken():獲取是否破損標志位broken的值,此值有以下幾種情況:

  • CyclicBarrier初始化時,broken=false,表示屏障未破損。
  • 如果正在等待的線程被中斷,則broken=true,表示屏障破損。
  • 如果正在等待的線程超時,則broken=true,表示屏障破損。
  • 如果有線程調(diào)用CyclicBarrier.reset()方法,則broken=false,表示屏障回到未破損狀態(tài)。

void reset():使得CyclicBarrier回歸初始狀態(tài),直觀來看它做了兩件事:

  • 如果有正在等待的線程,則會拋出BrokenBarrierException異常,且這些線程停止等待,繼續(xù)執(zhí)行。
  • 將是否破損標志位broken置為false

2 CyclicBarrier實例

假若有若干個線程都要進行寫數(shù)據(jù)操作,并且只有所有線程都完成寫數(shù)據(jù)操作之后,這些線程才能繼續(xù)做后面的事情,此時就可以利用CyclicBarrier了:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static void main(String[] args) {
       int N = 4;
       CyclicBarrier barrier  = new CyclicBarrier(N);
       for(int i=0;i<N;i++)
           new Writer(barrier).start();
   }
   static class Writer extends Thread{
       private CyclicBarrier cyclicBarrier;
       public Writer(CyclicBarrier cyclicBarrier) {
           this.cyclicBarrier = cyclicBarrier;
       }
 
       @Override
       public void run() {
           System.out.println("線程"+Thread.currentThread().getName()+"正在寫入數(shù)據(jù)...");
           try {
               Thread.sleep(5000);      //以睡眠來模擬寫入數(shù)據(jù)操作
               System.out.println("線程"+Thread.currentThread().getName()+"寫入數(shù)據(jù)完畢,等待其他線程寫入完畢");
               cyclicBarrier.await();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }catch(BrokenBarrierException e){
               e.printStackTrace();
           }
           System.out.println("所有線程寫入完畢,繼續(xù)處理其他任務(wù)...");
       }
   }

線程Thread-0正在寫入數(shù)據(jù)...
線程Thread-3正在寫入數(shù)據(jù)...
線程Thread-1正在寫入數(shù)據(jù)...
線程Thread-2正在寫入數(shù)據(jù)...
線程Thread-1寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-3寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-2寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-0寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
所有線程寫入完畢,繼續(xù)處理其他任務(wù)...

從上面輸出結(jié)果可以看出,每個寫入線程執(zhí)行完寫數(shù)據(jù)操作之后,就在等待其他線程寫入操作完畢。

當所有線程線程寫入操作完畢之后,所有線程就繼續(xù)進行后續(xù)的操作了。

如果想在所有線程寫入操作完之后,進行額外的其他操作可以為CyclicBarrier提供Runnable參數(shù):

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class CyclicBarrierTest {
 
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N,new Runnable() {
            @Override
            public void run() {
                System.out.println("當前線程"+Thread.currentThread().getName());
            }
        });
 
        for(int i=0;i<N;i++)
            new Writer(barrier).start();
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {
            System.out.println("線程"+Thread.currentThread().getName()+"正在寫入數(shù)據(jù)...");
            try {
                Thread.sleep(3000);      //以睡眠來模擬寫入數(shù)據(jù)操作
                System.out.println("線程"+Thread.currentThread().getName()+"寫入數(shù)據(jù)完畢,等待其他線程寫入完畢");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println("所有線程寫入完畢,繼續(xù)處理其他任務(wù)...");
        }
    }
 
}

線程Thread-0正在寫入數(shù)據(jù)...
線程Thread-3正在寫入數(shù)據(jù)...
線程Thread-2正在寫入數(shù)據(jù)...
線程Thread-1正在寫入數(shù)據(jù)...
線程Thread-1寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-3寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-0寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-2寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
當前線程Thread-2
所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
所有線程寫入完畢,繼續(xù)處理其他任務(wù)...

從結(jié)果可以看出,當四個線程都到達barrier狀態(tài)后,會從四個線程中選擇一個線程去執(zhí)行Runnable

await指定時間的效果:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class CyclicBarrierTest {
 
 
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier = new CyclicBarrier(N);
 
        for (int i = 0; i < N; i++) {
            if (i < N - 1)
                new Writer(barrier).start();
            else {
                try {
                    //運行時間遠小于2000(cyclicBarrier.await 指定時間) 就不會拋出TimeoutException
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                new Writer(barrier).start();
            }
            
        }
    }
 
    static class Writer extends Thread {
        private CyclicBarrier cyclicBarrier;
 
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {
            System.out.println("線程" + Thread.currentThread().getName() + "正在寫入數(shù)據(jù)...");
            try {
                Thread.sleep(3000);      //以睡眠來模擬寫入數(shù)據(jù)操作
                System.out.println("線程" + Thread.currentThread().getName() + "寫入數(shù)據(jù)完畢,等待其他線程寫入完畢");
                try {
                    cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "所有線程寫入完畢,繼續(xù)處理其他任務(wù)...");
        }
    }
}

線程Thread-0正在寫入數(shù)據(jù)...
線程Thread-2正在寫入數(shù)據(jù)...
線程Thread-1正在寫入數(shù)據(jù)...
線程Thread-0寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-2寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-1寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-3正在寫入數(shù)據(jù)...
java.util.concurrent.TimeoutException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:257)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
    at CyclicBarrierTest$Writer.run(CyclicBarrierTest.java:43)
Thread-0所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
    at CyclicBarrierTest$Writer.run(CyclicBarrierTest.java:43)
Thread-1所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
    at CyclicBarrierTest$Writer.run(CyclicBarrierTest.java:43)
Thread-2所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
線程Thread-3寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
    at CyclicBarrierTest$Writer.run(CyclicBarrierTest.java:43)
Thread-3所有線程寫入完畢,繼續(xù)處理其他任務(wù)...

上面的代碼在main方法的for循環(huán)中,故意讓最后一個線程啟動延遲,因為在前面三個線程都達到barrier之后,等待了指定的時間發(fā)現(xiàn)第四個線程還沒有達到barrier,就拋出異常并繼續(xù)執(zhí)行后面的任務(wù)。

另外CyclicBarrier是可以重用的,看下面這個例子:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class CyclicBarrierTest {
 
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
 
        for(int i=0;i<N;i++) {
            new Writer(barrier).start();
        }
 
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
        System.out.println("CyclicBarrier重用");
 
        for(int i=0;i<N;i++) {
            new Writer(barrier).start();
        }
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {
            System.out.println("線程"+Thread.currentThread().getName()+"正在寫入數(shù)據(jù)...");
            try {
                Thread.sleep(3000);      //以睡眠來模擬寫入數(shù)據(jù)操作
                System.out.println("線程"+Thread.currentThread().getName()+"寫入數(shù)據(jù)完畢,等待其他線程寫入完畢");
 
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"所有線程寫入完畢,繼續(xù)處理其他任務(wù)...");
        }
    }
}

線程Thread-0正在寫入數(shù)據(jù)...
線程Thread-3正在寫入數(shù)據(jù)...
線程Thread-2正在寫入數(shù)據(jù)...
線程Thread-1正在寫入數(shù)據(jù)...
線程Thread-1寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-0寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-3寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-2寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
Thread-2所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
Thread-1所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
Thread-3所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
Thread-0所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
CyclicBarrier重用
線程Thread-4正在寫入數(shù)據(jù)...
線程Thread-5正在寫入數(shù)據(jù)...
線程Thread-6正在寫入數(shù)據(jù)...
線程Thread-7正在寫入數(shù)據(jù)...
線程Thread-5寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-4寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-7寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-6寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
Thread-6所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
Thread-5所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
Thread-4所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
Thread-7所有線程寫入完畢,繼續(xù)處理其他任務(wù)...

從執(zhí)行結(jié)果可以看出,在初次的4個線程越過barrier狀態(tài)后,又可以用來進行新一輪的使用。而CountDownLatch無法進行重復(fù)使用。

3 CyclicBarrier源碼解析

先看一下CyclicBarrier中成員變量的組成:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;//攔截的線程數(shù)量
/* The command to run when tripped */
private final Runnable barrierCommand; //當屏障撤銷時,需要執(zhí)行的屏障操作
//當前的Generation。每當屏障失效或者開閘之后都會自動替換掉。從而實現(xiàn)重置的功能。
private Generation generation = new Generation();
 
/**
 * Number of parties still waiting. Counts down from parties to 0
 * on each generation.  It is reset to parties on each new
 * generation or when broken.
 */
private int count;

可以看出,CyclicBarrier是由ReentrantLockCondition來實現(xiàn)的。具體每個變量都有什么意義,我們在分析源碼的時候具體說。
我們主要從CyclicBarrier的構(gòu)造方法和它的await方法分析說起。

CyclicBarrier構(gòu)造函數(shù)

CyclicBarrier有兩個構(gòu)造函數(shù):

?
1
2
3
4
5
6
7
8
9
10
11
12
//帶Runnable參數(shù)的函數(shù)
 public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;//有幾個運動員要參賽
        this.count = parties;//目前還需要幾個運動員準備好
        //你要在所有線程都繼續(xù)執(zhí)行下去之前要執(zhí)行什么操作,可以為空
        this.barrierCommand = barrierAction;
    }
//不帶Runnable參數(shù)的函數(shù)
 public CyclicBarrier(int parties) {
     this(parties, null);
 }

其中,第二個構(gòu)造函數(shù)調(diào)用的是第一個構(gòu)造函數(shù),這個 Runnable barrierAction 參數(shù)是什么呢?其實在上面的小示例中我們就用到了這個Runnable參數(shù),它就是在所有線程都準備好之后,滿足Barrier條件時,并且在所有線程繼續(xù)執(zhí)行之前,我們可以執(zhí)行這個Runnable。但是值得注意的是,這不是新起了一個線程,而是通過最后一個準備好的(也就是最后一個到達Barrier的)線程承擔啟動的。這一點我們在上面示例中打印的運行結(jié)果中也可以看出來:Thread-2線程是最后一個準備好的,就是它執(zhí)行的這個barrierAction
這里partiescount不要混淆,parties是表示必須有幾個線程要到達Barrier,而count是表示目前還有幾個線程未到達Barrier。也就是說,只有當count參數(shù)為0時,Barrier條件即滿足,所有線程可以繼續(xù)執(zhí)行。
count變量是怎么減少到0的呢?是通過Barrier執(zhí)行的await方法。下面我們就看一下await方法。

await方法

?
1
2
3
4
5
6
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }

await方法調(diào)用的dowait方法:

 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
private int dowait(boolean timed, long nanos)
     throws InterruptedException, BrokenBarrierException,
            TimeoutException {
     final ReentrantLock lock = this.lock;
     lock.lock();//獲取ReentrantLock互斥鎖
     try {
         final Generation g = generation;//獲取generation對象
 
         if (g.broken)//如果generation損壞,拋出異常
             throw new BrokenBarrierException();
 
         if (Thread.interrupted()) {
             //如果當前線程被中斷,則調(diào)用breakBarrier方法,停止CyclicBarrier,并喚醒所有線程
             breakBarrier();
             throw new InterruptedException();
         }
 
         int index = --count;// 看到這里了吧,count減1
         //index=0,也就是說,有0個線程未滿足CyclicBarrier條件,也就是條件滿足,
         //可以喚醒所有的線程了
         if (index == 0) {  // tripped
             boolean ranAction = false;
             try {
                //這就是構(gòu)造器的第二個參數(shù),如果不為空的話,就執(zhí)行這個Runnable的run方法,
                //你看,這里是執(zhí)行的是run方法,也就是說,并沒有新起一個另外的線程,
                //而是最后一個執(zhí)行await操作的線程執(zhí)行的這個run方法。
                 final Runnable command = barrierCommand;
                 if (command != null)
                     command.run(); //同步執(zhí)行barrierCommand
                 ranAction = true;
                 nextGeneration(); //執(zhí)行成功設(shè)置下一個nextGeneration
                 return 0;
             } finally {
                 if (!ranAction) . //如果barrierCommand執(zhí)行失敗,進行屏障破壞處理
                     breakBarrier();
             }
         }
         //如果當前線程不是最后一個到達的線程
         // loop until tripped, broken, interrupted, or timed out
         for (;;) {
             try {
                 if (!timed)
                     trip.await(); //調(diào)用Condition的await()方法阻塞
                 else if (nanos > 0L)
                     nanos = trip.awaitNanos(nanos); //調(diào)用Condition的awaitNanos()方法阻塞
             } catch (InterruptedException ie) {
             //如果當前線程被中斷,則判斷是否有其他線程已經(jīng)使屏障破壞。若沒有則進行屏障破壞處理,并拋出異常;否則再次中斷當前線程
                 if (g == generation && ! g.broken) {
                     breakBarrier();//執(zhí)行breakBarrier,喚醒所有線程
                     throw ie;
                 } else {
                     // We're about to finish waiting even if we had not
                     // been interrupted, so this interrupt is deemed to
                     // "belong" to subsequent execution.
                     Thread.currentThread().interrupt();
                 }
             }
 
             if (g.broken)//如果當前generation已經(jīng)損壞,拋出異常
                 throw new BrokenBarrierException();
 
             if (g != generation)//如果generation已經(jīng)更新?lián)Q代,則返回index
                 return index;
             //如果是參數(shù)是超時等待,并且已經(jīng)超時,則執(zhí)行breakBarrier()方法
             //喚醒所有等待線程。
             if (timed && nanos <= 0L) {
                 breakBarrier();
                 throw new TimeoutException();
             }
         }
     } finally {
         lock.unlock();
     }
 }

簡單來說,如果不發(fā)生異常,線程不被中斷,那么dowait方法會調(diào)用Conditionawait方法(具體Condition的原理請看前面的文章),直到所有線程都準備好,即都執(zhí)行了dowait方法,(做count的減操作,直到count=0),即CyclicBarrier條件已滿足,就會執(zhí)行喚醒線程操作,也就是上面的nextGeneration()方法。可能大家會有疑惑,這個Generation是什么東西呢?其實這個Generation定義的很簡單,就一個布爾值的成員變量:

?
1
2
3
4
5
private Generation generation = new Generation();
 
private static class Generation {
    boolean broken = false;
}

Generation 可以理解成“代”,我們要知道,CyclicBarrier是可以重復(fù)使用的,CyclicBarrier中的同一批線程屬于同一“代”,當所有線程都滿足了CyclicBarrier條件,執(zhí)行喚醒操作nextGeneration()方法時,會新new 出一個Generation,代表一下“代”。

nextGeneration的源碼

?
1
2
3
4
5
6
7
8
private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();//調(diào)用Condition的signalAll方法,喚醒所有await的線程
    // set up next generation
    count = parties;//重置count值
    //生成新的Generation,表示上一代的所有線程已經(jīng)喚醒,進行更新?lián)Q代
    generation = new Generation();
}

breakBarrier源碼

再來看一下breakBarrier的代碼,breakBarrier方法是在當前線程被中斷時執(zhí)行的,用來喚醒所有的等待線程:

?
1
2
3
4
5
private void breakBarrier() {
    generation.broken = true;//表示當代因為線程被中斷,已經(jīng)發(fā)成損壞了
    count = parties;//重置count值
    trip.signalAll();//調(diào)用Condition的signalAll方法,喚醒所有await的線程
}

isBroken方法

?
1
2
3
4
5
6
7
8
9
public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
}

判斷此屏障是否處于中斷狀態(tài)。如果因為構(gòu)造或最后一次重置而導(dǎo)致中斷或超時,從而使一個或多個參與者擺脫此屏障點,或者因為異常而導(dǎo)致某個屏障操作失敗,則返回true;否則返回false

reset方法

?
1
2
3
4
5
6
7
8
9
10
11
12
13
//將屏障重置為其初始狀態(tài)。
public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //喚醒所有等待的線程繼續(xù)執(zhí)行,并設(shè)置屏障中斷狀態(tài)為true
        breakBarrier();   // break the current generation
        //喚醒所有等待的線程繼續(xù)執(zhí)行,并設(shè)置屏障中斷狀態(tài)為false
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

getNumberWaiting方法

?
1
2
3
4
5
6
7
8
9
10
//返回當前在屏障處等待的參與者數(shù)目,此方法主要用于調(diào)試和斷言。
public int getNumberWaiting() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return parties - count;
    } finally {
        lock.unlock();
    }
}

總結(jié):

1.CyclicBarrier可以用于多線程計算數(shù)據(jù),最后合并計算結(jié)果的應(yīng)用場景。
2.這個等待的await方法,其實是使用ReentrantLockCondition控制實現(xiàn)的。
3.CyclicBarrier可以重復(fù)使用。

到此這篇關(guān)于Java多線程之同步工具類CyclicBarrier的文章就介紹到這了,更多相關(guān)Java多線程 CyclicBarrier內(nèi)容請搜索服務(wù)器之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持服務(wù)器之家!

原文鏈接:https://juejin.cn/post/7020282803896418312

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 午夜色视频在线观看 | 免费观看一区二区三区视频 | 91精品久久久久久久久网影视 | 欧洲精品久久 | 久久91亚洲人成电影网站 | 欧美日本91精品久久久久 | 久久不射电影 | 国产亚洲精品久久久久久网站 | 久久一区二区三区av | 一区二区高清视频在线观看 | 亚洲网站免费观看 | av电影在线免费 | 91专区在线观看 | www.99av| 久久久www免费看片 日本视频网 | 国产精品成人亚洲一区二区 | 毛片免费在线观看视频 | 亚洲生活片 | 日韩视频一区二区三区在线观看 | 日韩精品一二三区 | 欧美一区二区三区久久精品视 | 成人午夜免费国产 | 欧美精品一区二区性色 | 91精品福利视频 | 999久久久精品 | 国产精品久久久久久久久岛 | 97中文| 羞羞视频入口 | 亚洲性生活免费视频 | 日韩午夜一区二区三区 | 黄色视屏免费在线观看 | 九色com| 日韩电影一区二区 | 深夜视频在线 | 国产精品成aⅴ人片在线观看 | 中文字幕在线永久 | 国产二区三区在线播放 | 一区二区三区四区高清视频 | 一级片九九| 亚州综合图片 | 日韩精品中文字幕在线播放 |