前言:
Semaphore是計數信號量。Semaphore管理一系列許可證。每個acquire方法阻塞,直到有一個許可證可以獲得然后拿走一個許可證;每個release方法增加一個許可證,這可能會釋放一個阻塞的acquire方法。然而,其實并沒有實際的許可證這個對象,Semaphore只是維持了一個可獲得許可證的數量。
Semaphore可以維護當前訪問自身的線程個數,并提供了同步機制。使用Semaphore可以控制同時訪問資源的線程個數,例如,實現一個文件允許的并發訪問數。
1 Semaphore的主要方法
Semaphore(int permits):
構造方法,創建具有給定許可數的計數信號量并設置為非公平信號量。
Semaphore(int permits,boolean fair):
構造方法,當fair
等于true
時,創建具有給定許可數的計數信號量并設置為公平信號量。
void acquire():
當前線程嘗試去阻塞的獲取1個許可證。
此過程是阻塞的,它會一直等待許可證,直到發生以下任意一件事:
- 當前線程獲取了1個可用的許可證,則會停止等待,繼續執行。
-
當前線程被中斷,則會拋出
InterruptedException
異常,并停止等待,繼續執行。
void acquire(int n):
從此信號量獲取給定數目許可,在提供這些許可前一直將線程阻塞。
當前線程嘗試去阻塞的獲取多個許可證。
此過程是阻塞的,它會一直等待許可證,直到發生以下任意一件事:
- 當前線程獲取了n個可用的許可證,則會停止等待,繼續執行。
-
當前線程被中斷,則會拋出
InterruptedException
異常,并停止等待,繼續執行。
void release():
釋放一個許可,將其返回給信號量。
void release(int n):
釋放n個許可。
int availablePermits():
當前可用的許可數。
void acquierUninterruptibly():
當前線程嘗試去阻塞的獲取1個許可證(不可中斷的)。
此過程是阻塞的,它會一直等待許可證,直到發生以下任意一件事:
- 當前線程獲取了1個可用的許可證,則會停止等待,繼續執行。
void acquireUninterruptibly(permits):
當前線程嘗試去阻塞的獲取多個許可證。
此過程是阻塞的,它會一直等待許可證,直到發生以下任意一件事:
- 當前線程獲取了n個可用的許可證,則會停止等待,繼續執行。
boolean tryAcquire()
當前線程嘗試去獲取1個許可證。
- 此過程是非阻塞的,它只是在方法調用時進行一次嘗試。
-
如果當前線程獲取了1個可用的許可證,則會停止等待,繼續執行,并返回
true
。 -
如果當前線程沒有獲得這個許可證,也會停止等待,繼續執行,并返回
false
。
boolean tryAcquire(permits):
當前線程嘗試去獲取多個許可證。
- 此過程是非阻塞的,它只是在方法調用時進行一次嘗試。
-
如果當前線程獲取了
permits
個可用的許可證,則會停止等待,繼續執行,并返回true
。 -
如果當前線程沒有獲得
permits
個許可證,也會停止等待,繼續執行,并返回false
。
boolean tryAcquire(timeout,TimeUnit):
當前線程在限定時間內,阻塞的嘗試去獲取1個許可證。
此過程是阻塞的,它會一直等待許可證,直到發生以下任意一件事:
-
當前線程獲取了可用的許可證,則會停止等待,繼續執行,并返回
true
。 -
當前線程等待時間
timeout
超時,則會停止等待,繼續執行,并返回false
。 -
當前線程在
timeout
時間內被中斷,則會拋出InterruptedException
一次,并停止等待,繼續執行。
boolean tryAcquire(permits,timeout,TimeUnit):
當前線程在限定時間內,阻塞的嘗試去獲取permits
個許可證。
此過程是阻塞的,它會一直等待許可證,直到發生以下任意一件事:
-
當前線程獲取了可用的
permits
個許可證,則會停止等待,繼續執行,并返回true
。 -
當前線程等待時間
timeout
超時,則會停止等待,繼續執行,并返回false
。 -
當前線程在
timeout
時間內被中斷,則會拋出InterruptedException
一次,并停止等待,繼續執行。
2 實例講解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
public class SemaphoreTest { private static final Semaphore semaphore = new Semaphore( 3 ); public static void main(String[] args) { Executor executor = Executors.newCachedThreadPool(); String[] name = { "Jack" , "Pony" , "Larry" , "Martin" , "James" , "ZhangSan" , "Tree" }; int [] age = { 21 , 22 , 23 , 24 , 25 , 26 , 27 }; for ( int i= 0 ;i< 7 ;i++) { Thread t1= new InformationThread(name[i],age[i]); executor.execute(t1); } } private static class InformationThread extends Thread { private final String name; private final int age; public InformationThread(String name, int age) { this .name = name; this .age = age; } @Override public void run() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + ":大家好,我是" + name + "我今年" + age + "當前時間段為:" + System.currentTimeMillis()); Thread.sleep( 1000 ); System.out.println(name + "要準備釋放許可證了,當前時間為:" + System.currentTimeMillis()); System.out.println( "當前可使用的許可數為:" + semaphore.availablePermits()); System.out.println( "是否有正在等待許可證的線程:" + semaphore.hasQueuedThreads()); System.out.println( "正在等待許可證的隊列長度(線程數量):" + semaphore.getQueueLength()); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } } |
pool-1-thread-1:大家好,我是Jack我今年21當前時間段為:1543498535306
pool-1-thread-3:大家好,我是Larry我今年23當前時間段為:1543498535306
pool-1-thread-2:大家好,我是Pony我今年22當前時間段為:1543498535306
Pony要準備釋放許可證了,當前時間為:1543498536310
Jack要準備釋放許可證了,當前時間為:1543498536310
當前可使用的許可數為:0
Larry要準備釋放許可證了,當前時間為:1543498536310
是否有正在等待許可證的線程:true
當前可使用的許可數為:0
是否有正在等待許可證的線程:true
正在等待許可證的隊列長度(線程數量):4
正在等待許可證的隊列長度(線程數量):4
當前可使用的許可數為:0
pool-1-thread-4:大家好,我是Martin我今年24當前時間段為:1543498536311
是否有正在等待許可證的線程:true
pool-1-thread-5:大家好,我是James我今年25當前時間段為:1543498536311
正在等待許可證的隊列長度(線程數量):2
pool-1-thread-6:大家好,我是ZhangSan我今年26當前時間段為:1543498536312
James要準備釋放許可證了,當前時間為:1543498537315
Martin要準備釋放許可證了,當前時間為:1543498537315
當前可使用的許可數為:0
是否有正在等待許可證的線程:true
正在等待許可證的隊列長度(線程數量):1
當前可使用的許可數為:0
是否有正在等待許可證的線程:false
pool-1-thread-7:大家好,我是Tree我今年27當前時間段為:1543498537316
正在等待許可證的隊列長度(線程數量):0
ZhangSan要準備釋放許可證了,當前時間為:1543498537317
當前可使用的許可數為:1
是否有正在等待許可證的線程:false
正在等待許可證的隊列長度(線程數量):0
Tree要準備釋放許可證了,當前時間為:1543498538319
當前可使用的許可數為:2
是否有正在等待許可證的線程:false
正在等待許可證的隊列長度(線程數量):0
以上是非公平信號量,將建立Semaphore
對象的語句改為如下語句:
1
|
private static final Semaphore semaphore= new Semaphore( 3 , true ); |
pool-1-thread-1:大家好,我是Jack我今年21當前時間段為:1543498810563
pool-1-thread-3:大家好,我是Larry我今年23當前時間段為:1543498810564
pool-1-thread-2:大家好,我是Pony我今年22當前時間段為:1543498810563
Jack要準備釋放許可證了,當前時間為:1543498811564
當前可使用的許可數為:0
是否有正在等待許可證的線程:true
正在等待許可證的隊列長度(線程數量):4
pool-1-thread-4:大家好,我是Martin我今年24當前時間段為:1543498811564
Larry要準備釋放許可證了,當前時間為:1543498811568
當前可使用的許可數為:0
Pony要準備釋放許可證了,當前時間為:1543498811568
是否有正在等待許可證的線程:true
當前可使用的許可數為:0
正在等待許可證的隊列長度(線程數量):3
是否有正在等待許可證的線程:true
pool-1-thread-5:大家好,我是James我今年25當前時間段為:1543498811568
正在等待許可證的隊列長度(線程數量):2
pool-1-thread-6:大家好,我是ZhangSan我今年26當前時間段為:1543498811568
Martin要準備釋放許可證了,當前時間為:1543498812566
當前可使用的許可數為:0
是否有正在等待許可證的線程:true
正在等待許可證的隊列長度(線程數量):1
pool-1-thread-7:大家好,我是Tree我今年27當前時間段為:1543498812566
James要準備釋放許可證了,當前時間為:1543498812572
當前可使用的許可數為:0
是否有正在等待許可證的線程:false
正在等待許可證的隊列長度(線程數量):0
ZhangSan要準備釋放許可證了,當前時間為:1543498812572
當前可使用的許可數為:1
是否有正在等待許可證的線程:false
正在等待許可證的隊列長度(線程數量):0
Tree要準備釋放許可證了,當前時間為:1543498813568
當前可使用的許可數為:2
是否有正在等待許可證的線程:false
正在等待許可證的隊列長度(線程數量):0
實現單例模式
將創建信號量對象語句修改如下:
1
|
private static final Semaphore semaphore= new Semaphore( 1 ); |
運行程序,結果如下:
pool-1-thread-1:大家好,我是Jack我今年21當前時間段為:1543499053898
Jack要準備釋放許可證了,當前時間為:1543499054903
當前可使用的許可數為:0
是否有正在等待許可證的線程:true
正在等待許可證的隊列長度(線程數量):6
pool-1-thread-2:大家好,我是Pony我今年22當前時間段為:1543499054904
Pony要準備釋放許可證了,當前時間為:1543499055907
當前可使用的許可數為:0
是否有正在等待許可證的線程:true
正在等待許可證的隊列長度(線程數量):5
pool-1-thread-3:大家好,我是Larry我今年23當前時間段為:1543499055907
Larry要準備釋放許可證了,當前時間為:1543499056909
當前可使用的許可數為:0
是否有正在等待許可證的線程:true
正在等待許可證的隊列長度(線程數量):4
pool-1-thread-4:大家好,我是Martin我今年24當前時間段為:1543499056909
Martin要準備釋放許可證了,當前時間為:1543499057913
當前可使用的許可數為:0
是否有正在等待許可證的線程:true
正在等待許可證的隊列長度(線程數量):3
pool-1-thread-5:大家好,我是James我今年25當前時間段為:1543499057913
James要準備釋放許可證了,當前時間為:1543499058914
當前可使用的許可數為:0
是否有正在等待許可證的線程:true
正在等待許可證的隊列長度(線程數量):2
pool-1-thread-6:大家好,我是ZhangSan我今年26當前時間段為:1543499058915
ZhangSan要準備釋放許可證了,當前時間為:1543499059919
當前可使用的許可數為:0
是否有正在等待許可證的線程:true
正在等待許可證的隊列長度(線程數量):1
pool-1-thread-7:大家好,我是Tree我今年27當前時間段為:1543499059919
Tree要準備釋放許可證了,當前時間為:1543499060923
當前可使用的許可數為:0
是否有正在等待許可證的線程:false
正在等待許可證的隊列長度(線程數量):0
如上可知,如果將給定許可數設置為1,就如同一個單例模式,即單個停車位,只有一輛車進,然后這輛車出來后,下一輛車才能進。
3 源碼解析
Semaphore
有兩種模式,公平模式和非公平模式。公平模式就是調用acquire
的順序就是獲取許可證的順序,遵循FIFO
;而非公平模式是搶占式的,也就是有可能一個新的獲取線程恰好在一個許可證釋放時得到了這個許可證,而前面還有等待的線程。
構造方法
Semaphore有兩個構造方法,如下:
1
2
3
4
5
6
7
|
public Semaphore( int permits) { sync = new NonfairSync(permits); } public Semaphore( int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } |
獲取許可
先從獲取一個許可看起,并且先看非公平模式下的實現。首先看acquire
方法,acquire
方法有幾個重載,但主要是下面這個方法
1
2
3
4
|
public void acquire( int permits) throws InterruptedException { if (permits < 0 ) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } |
從上面可以看到,調用了Sync
的acquireSharedInterruptibly
方法,該方法在父類AQS
中,如下:
1
2
3
4
5
6
7
|
public final void acquireSharedInterruptibly( int arg) throws InterruptedException { if (Thread.interrupted()) //如果線程被中斷了,拋出異常 throw new InterruptedException(); if (tryAcquireShared(arg) < 0 ) //獲取許可失敗,將線程加入到等待隊列中 doAcquireSharedInterruptibly(arg); } |
AQS
子類如果要使用共享模式的話,需要實現tryAcquireShared
方法,下面看NonfairSync
的該方法實現:
1
2
3
|
protected int tryAcquireShared( int acquires) { return nonfairTryAcquireShared(acquires); } |
該方法調用了父類中的nonfairTyAcquireShared
方法,如下:
1
2
3
4
5
6
7
8
9
10
11
12
|
final int nonfairTryAcquireShared( int acquires) { for (;;) { //獲取剩余許可數量 int available = getState(); //計算給完這次許可數量后的個數 int remaining = available - acquires; //如果許可不夠或者可以將許可數量重置的話,返回 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } |
從上面可以看到,只有在許可不夠時返回值才會小于0,其余返回的都是剩余許可數量,這也就解釋了,一旦許可不夠,后面的線程將會阻塞。看完了非公平的獲取,再看下公平的獲取,
代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
protected int tryAcquireShared( int acquires) { for (;;) { //如果前面有線程再等待,直接返回-1 if (hasQueuedPredecessors()) return - 1 ; //后面與非公平一樣 int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } |
從上面可以看到,FairSync
與NonFairSync
的區別就在于會首先判斷當前隊列中有沒有線程在等待,如果有,就老老實實進入到等待隊列;而不像NonfairSync
一樣首先試一把,說不定就恰好獲得了一個許可,這樣就可以插隊了。
看完了獲取許可后,再看一下釋放許可。
釋放許可
釋放許可也有幾個重載方法,但都會調用下面這個帶參數的方法,
1
2
3
4
|
public void release( int permits) { if (permits < 0 ) throw new IllegalArgumentException(); sync.releaseShared(permits); } |
releaseShared
方法在AQS中,如下:
1
2
3
4
5
6
7
8
|
public final boolean releaseShared( int arg) { //如果改變許可數量成功 if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } |
AQS子類實現共享模式的類需要實現tryReleaseShared
類來判斷是否釋放成功,
實現如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
protected final boolean tryReleaseShared( int releases) { for (;;) { //獲取當前許可數量 int current = getState(); //計算回收后的數量 int next = current + releases; if (next < current) // overflow throw new Error( "Maximum permit count exceeded" ); //CAS改變許可數量成功,返回true if (compareAndSetState(current, next)) return true ; } } |
從上面可以看到,一旦CAS改變許可數量成功,那么就會調用doReleaseShared()
方法釋放阻塞的線程。
減小許可數量
Semaphore
還有減小許可數量的方法,該方法可以用于用于當資源用完不能再用時,這時就可以減小許可證。代碼如下:
1
2
3
4
|
protected void reducePermits( int reduction) { if (reduction < 0 ) throw new IllegalArgumentException(); sync.reducePermits(reduction); } |
可以看到,委托給了Sync,Sync的reducePermits方法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
final void reducePermits( int reductions) { for (;;) { //得到當前剩余許可數量 int current = getState(); //得到減完之后的許可數量 int next = current - reductions; if (next > current) // underflow throw new Error( "Permit count underflow" ); //如果CAS改變成功 if (compareAndSetState(current, next)) return ; } } |
從上面可以看到,就是CAS改變AQS中的state
變量,因為該變量代表許可證的數量。
獲取剩余許可數量
Semaphore
還可以一次將剩余的許可數量全部取走,該方法是drain方法,
如下:
1
2
3
|
public int drainPermits() { return sync.drainPermits(); } |
Sync的實現如下:
1
2
3
4
5
6
7
|
final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0 )) return current; } } |
可以看到,就是CAS將許可數量置為0。
4、總結
Semaphore
是信號量,用于管理一組資源。其內部是基于AQS
的共享模式,AQS
的狀態表示許可證的數量,在許可證數量不夠時,線程將會被掛起;而一旦有一個線程釋放一個資源,那么就有可能重新喚醒等待隊列中的線程繼續執行。
到此這篇關于Java多線程之Semaphore
實現信號燈的文章就介紹到這了,更多相關Java多線程 Semaphore實現信號燈內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!
原文鏈接:https://juejin.cn/post/7019623653927026719