上一篇說的CountDownLatch是一個(gè)計(jì)數(shù)器,類似線程的join方法,但是有一個(gè)缺陷,就是當(dāng)計(jì)數(shù)器的值到達(dá)0之后,再調(diào)用CountDownLatch的await和countDown方法就會(huì)立刻返回,就沒有作用了,那么反正是一個(gè)計(jì)數(shù)器,為什么不能重復(fù)使用呢?于是就出現(xiàn)了這篇說的CyclicBarrier,它的狀態(tài)可以被重用;
一.簡(jiǎn)單例子
用法其實(shí)和CountDownLatch差不多,也就是一個(gè)計(jì)數(shù)器,當(dāng)計(jì)數(shù)器的值變?yōu)?之后,就會(huì)把阻塞的線程喚醒:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
package com.example.demo.study; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Study0216 { // 注意這里的構(gòu)造器,第一個(gè)參數(shù)表示計(jì)數(shù)器初始值 // 第二個(gè)參數(shù)表示當(dāng)計(jì)數(shù)器的值變?yōu)?的時(shí)候就觸發(fā)的任務(wù) static CyclicBarrier cyclicBarrier = new CyclicBarrier( 2 , () -> { System.out.println( "cyclicBarrier task " ); }); public static void main(String[] args) { // 新建兩個(gè)線程的線程池 ExecutorService pool = Executors.newFixedThreadPool( 2 ); // 線程1放入線程池中 pool.submit(() -> { try { System.out.println( "Thread1----await-begin" ); cyclicBarrier.await(); System.out.println( "Thread1----await-end" ); } catch (Exception e) { e.printStackTrace(); } }); // 線程2放到線程池中 pool.submit(() -> { try { System.out.println( "Thread2----await-begin" ); cyclicBarrier.await(); System.out.println( "Thread2----await-end" ); } catch (Exception e) { e.printStackTrace(); } }); // 關(guān)閉線程池,此時(shí)還在執(zhí)行的任務(wù)會(huì)繼續(xù)執(zhí)行 pool.shutdown(); } } |
我們?cè)倏纯碈yclicBarrier的復(fù)用性,這里比如有一個(gè)任務(wù),有三部分組成,分別是A,B,C,然后創(chuàng)建兩個(gè)線程去執(zhí)行這個(gè)任務(wù),必須要等到兩個(gè)線程都執(zhí)行完成A部分,然后才能開始執(zhí)行B,只有兩個(gè)線程都執(zhí)行完成B部分,才能執(zhí)行C:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
package com.example.demo.study; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Study0216 { // 這里的構(gòu)造器,只有一個(gè)參數(shù),表示計(jì)數(shù)器初始值 static CyclicBarrier cyclicBarrier = new CyclicBarrier( 2 ); public static void main(String[] args) { // 新建兩個(gè)線程的線程池 ExecutorService pool = Executors.newFixedThreadPool( 2 ); // 線程1放入線程池中 pool.submit(() -> { try { System.out.println( "Thread1----stepA-start" ); cyclicBarrier.await(); System.out.println( "Thread1----stepB-start" ); cyclicBarrier.await(); System.out.println( "Thread1----stepC-start" ); } catch (Exception e) { e.printStackTrace(); } }); // 線程2放到線程池中 pool.submit(() -> { try { System.out.println( "Thread2----stepA-start" ); cyclicBarrier.await(); System.out.println( "Thread2----stepB-start" ); cyclicBarrier.await(); System.out.println( "Thread2----stepC-start" ); } catch (Exception e) { e.printStackTrace(); } }); // 關(guān)閉線程池,此時(shí)還在執(zhí)行的任務(wù)會(huì)繼續(xù)執(zhí)行 pool.shutdown(); } } |
二.基本原理
我們看看一些重要屬性:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public class CyclicBarrier { //這個(gè)內(nèi)部類只有一個(gè)boolean值 private static class Generation { boolean broken = false ; } //獨(dú)占鎖 private final ReentrantLock lock = new ReentrantLock(); //條件變量 private final Condition trip = lock.newCondition(); //保存線程的總數(shù) private final int parties; //這是一個(gè)任務(wù),通過構(gòu)造器傳遞一個(gè)任務(wù),當(dāng)計(jì)數(shù)器變?yōu)?之后,就可以執(zhí)行這個(gè)任務(wù) private final Runnable barrierCommand; //這類內(nèi)部之后一個(gè)boolean的值,表示屏障是否被打破 private Generation generation = new Generation(); //計(jì)數(shù)器 private int count; } |
構(gòu)造器:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
//我們的構(gòu)造器初始值設(shè)置的是parties public CyclicBarrier( int parties) { this (parties, null ); } //注意,這里開始的時(shí)候是count等于parties //為什么要有兩個(gè)變量呢?我們每次調(diào)用await方法的時(shí)候count減一,當(dāng)count的值變?yōu)?之后,怎么又還原成初始值呢? //直接就把parties的值賦值給count就行了呀,簡(jiǎn)單吧! public CyclicBarrier( int parties, Runnable barrierAction) { if (parties <= 0 ) throw new IllegalArgumentException(); this .parties = parties; this .count = parties; this .barrierCommand = barrierAction; } |
然后再看看await方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
public int await() throws InterruptedException, BrokenBarrierException { try { //調(diào)用的是dowait方法 return dowait( false , 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } //假設(shè)count等于3,有三個(gè)線程都在調(diào)用這個(gè)方法,默認(rèn)超時(shí)時(shí)間為0,那么首每次都只有一個(gè)線程可以獲取鎖,將count減一,不為0 //就會(huì)到下面的for循環(huán)中扔到條件隊(duì)列中掛起;直到第三個(gè)線程調(diào)用這個(gè)dowait方法,count減一等于0,那么當(dāng)前線程執(zhí)行任務(wù)之后, //就會(huì)喚醒條件變量中阻塞的線程,并重置count為初始值3 private int dowait( boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { //獲取鎖 final ReentrantLock lock = this .lock; lock.lock(); try { //g中只有一個(gè)boolean值 final Generation g = generation; //如果g中的值為true的時(shí)候,拋錯(cuò) if (g.broken) throw new BrokenBarrierException(); //如果當(dāng)前線程中斷,就拋錯(cuò) if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //count減一,再賦值給index int index = --count; //如果index等于0的時(shí)候,說明所有的線程已經(jīng)到屏障點(diǎn)了,就可以 if (index == 0 ) { // tripped boolean ranAction = false ; try { //執(zhí)行當(dāng)前線程的任務(wù) final Runnable command = barrierCommand; if (command != null ) command.run(); ranAction = true ; //喚醒其他因?yàn)檎{(diào)用了await方法阻塞的線程 nextGeneration(); return 0 ; } finally { if (!ranAction) breakBarrier(); } } //能到這里來,說明是count不等于0,也就是還有的線程沒有到屏障點(diǎn) for (;;) { try { //wait方法有兩種情況,一種是設(shè)置超時(shí)時(shí)間,一種是不設(shè)置超時(shí)時(shí)間 //這里就是對(duì)超時(shí)時(shí)間進(jìn)行的一個(gè)判斷,如果設(shè)置的超時(shí)時(shí)間為0,則會(huì)在條件隊(duì)列中無限的等待下去,直到被喚醒 //設(shè)置了超時(shí)時(shí)間,那就等待該時(shí)間 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { //釋放鎖 lock.unlock(); } } //喚醒其他因?yàn)檎{(diào)用了await方法阻塞的線程 private void nextGeneration() { //喚醒條件變量中所有線程 trip.signalAll(); //重置count的值 count = parties; generation = new Generation(); } private void breakBarrier() { generation.broken = true ; //重置count為初始值parties count = parties; //喚醒條件隊(duì)列中的所有線程 trip.signalAll(); } |
以上就是詳解Java回環(huán)屏障CyclicBarrier的詳細(xì)內(nèi)容,更多關(guān)于Java CyclicBarrier的資料請(qǐng)關(guān)注服務(wù)器之家其它相關(guān)文章!
原文鏈接:https://www.cnblogs.com/wyq1995/p/12317630.html