激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - Java并發編程Semaphore計數信號量詳解

Java并發編程Semaphore計數信號量詳解

2021-01-25 11:26IAMTJW Java教程

這篇文章主要介紹了Java并發編程Semaphore計數信號量詳解,具有一定參考價值,需要的朋友可以了解下。

Semaphore 是一個計數信號量,它的本質是一個共享鎖。信號量維護了一個信號量許可集。線程可以通過調用acquire()來獲取信號量的許可;當信號量中有可用的許可時,線程能獲取該許可;否則線程必須等待,直到有可用的許可為止。 線程可以通過release()來釋放它所持有的信號量許可(用完信號量之后必須釋放,不然其他線程可能會無法獲取信號量)。

簡單示例:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package me.socketthread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreLearn {
  //信號量總數
  private static final int SEM_MAX = 12;
  public static void main(String[] args) { 
    Semaphore sem = new Semaphore(SEM_MAX);
    //創建線程池
    ExecutorService threadPool = Executors.newFixedThreadPool(3);
    //在線程池中執行任務
    threadPool.execute(new MyThread(sem, 7));
    threadPool.execute(new MyThread(sem, 4));
    threadPool.execute(new MyThread(sem, 2));
    //關閉池
    threadPool.shutdown();
  }
}
  class MyThread extends Thread {
    private volatile Semaphore sem;  // 信號量
    private int count;    // 申請信號量的大小 
     
    MyThread(Semaphore sem, int count) {
      this.sem = sem;
      this.count = count;
    }
    public void run() {
      try {
       // 從信號量中獲取count個許可
        sem.acquire(count);
        Thread.sleep(2000);
        System.out.println(Thread.currentThread().getName() + " acquire count="+count);
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        // 釋放給定數目的許可,將其返回到信號量。
        sem.release(count);
        System.out.println(Thread.currentThread().getName() + " release " + count + "");
      }
    }
  }

執行結果:

?
1
2
3
4
5
6
pool-1-thread-2 acquire count=4
pool-1-thread-1 acquire count=7
pool-1-thread-1 release 7
pool-1-thread-2 release 4
pool-1-thread-3 acquire count=2
pool-1-thread-3 release 2

線程1和線程2會并發執行,因為兩者的信號量和沒有超過總信號量,當前兩個線程釋放掉信號量之后線程3才能繼續執行。

源碼分析:

1、構造函數

在構造函數中會初始化信號量值,這值最終是作為鎖標志位state的值

?
1
Semaphore sem = new Semaphore(12);//簡單來說就是給鎖標識位state賦值為12

2、Semaphore.acquire(n);簡單理解為獲取鎖資源,如果獲取不到線程阻塞

?
1
Semaphore.acquire(n);//從鎖標識位state中獲取n個信號量,簡單來說是state = state-n 此時state大于0表示可以獲取信號量,如果小于0則將線程阻塞
?
1
2
3
4
5
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    //獲取鎖
    sync.acquireSharedInterruptibly(permits);
  }

acquireSharedInterruptibly中的操作是獲取鎖資源,如果可以獲取則將state= state-permits,否則將線程阻塞

?
1
2
3
4
5
6
7
public final void acquireSharedInterruptibly(int arg)
      throws InterruptedException {
    if (Thread.interrupted())
      throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)//tryAcquireShared中嘗試獲取鎖資源
      doAcquireSharedInterruptibly(arg); //將線程阻塞
  }

tryAcquireShared中的操作是嘗試獲取信號量值,簡單來說就是state=state-acquires ,如果此時小于0則返回負值,否則返回大于新值,再判斷是否將當線程線程阻塞

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected int tryAcquireShared(int acquires) {
      for (;;) {
        if (hasQueuedPredecessors())
          return -1;
      //獲取state值
        int available = getState();
      //從state中獲取信號量
        int remaining = available - acquires;
        if (remaining < 0 ||
          compareAndSetState(available, remaining))
        //如果信號量小于0則直接返回,表示無法獲取信號量,否則將state值修改為新值
          return remaining;
      }
    }

doAcquireSharedInterruptibly中的操作簡單來說是將當前線程添加到FIFO隊列中并將當前線程阻塞。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/會將線程添加到FIFO隊列中,并阻塞 
private void doAcquireSharedInterruptibly(int arg) 
    throws InterruptedException { 
    //將線程添加到FIFO隊列中 
    final Node node = addWaiter(Node.SHARED); 
    boolean failed = true
    try
      for (;;) { 
        final Node p = node.predecessor(); 
        if (p == head) { 
          int r = tryAcquireShared(arg); 
          if (r >= 0) { 
            setHeadAndPropagate(node, r); 
            p.next = null; // help GC 
            failed = false
            return
          
        
        //parkAndCheckInterrupt完成線程的阻塞操作 
        if (shouldParkAfterFailedAcquire(p, node) && 
          parkAndCheckInterrupt()) 
          throw new InterruptedException(); 
      
    } finally
      if (failed) 
        cancelAcquire(node); 
    
  }

3、Semaphore.release(int permits),這個函數的實現操作是將state = state+permits并喚起處于FIFO隊列中的阻塞線程。

?
1
2
3
4
5
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
  //state = state+permits,并將FIFO隊列中的阻塞線程喚起
    sync.releaseShared(permits);
  }

releaseShared中的操作是將state = state+permits,并將FIFO隊列中的阻塞線程喚起。

?
1
2
3
4
5
6
7
8
9
public final boolean releaseShared(int arg) {
    //tryReleaseShared將state設置為state = state+arg
    if (tryReleaseShared(arg)) {
      //喚起FIFO隊列中的阻塞線程
      doReleaseShared();
      return true;
    }
    return false;
  }

tryReleaseShared將state設置為state = state+arg

?
1
2
3
4
5
6
7
8
9
10
11
protected final boolean tryReleaseShared(int releases) {
      for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
          throw new Error("Maximum permit count exceeded");
        //將state值設置為state=state+releases
        if (compareAndSetState(current, next))
          return true;
      }
    }

doReleaseShared()喚起FIFO隊列中的阻塞線程

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void doReleaseShared() { 
  
    for (;;) { 
      Node h = head; 
      if (h != null && h != tail) { 
        int ws = h.waitStatus; 
        if (ws == Node.SIGNAL) { 
          if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 
            continue;      // loop to recheck cases 
          //完成阻塞線程的喚起操作 
          unparkSuccessor(h); 
        
        else if (ws == 0 && 
             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 
          continue;        // loop on failed CAS 
      
      if (h == head)          // loop if head changed 
        break
    
  }

總結:Semaphore簡單來說設置了一個信號量池state,當線程執行時會從state中獲取值,如果可以獲取則線程執行,并且在執行后將獲取的資源返回到信號量池中,并喚起其他阻塞線程;如果信號量池中的資源無法滿足某個線程的需求則將此線程阻塞。

Semaphore源碼:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
public class Semaphore implements java.io.Serializable {
  private static final long serialVersionUID = -3222578661600680210L;
  private final Sync sync;
  abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;
    //設置鎖標識位state的初始值
    Sync(int permits) {
      setState(permits);
    }
    //獲取鎖標識位state的值,如果state值大于其需要的值則表示鎖可以獲取
    final int getPermits() {
      return getState();
    }
    //獲取state值減去acquires后的值,如果大于等于0則表示鎖可以獲取
    final int nonfairTryAcquireShared(int acquires) {
      for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
          compareAndSetState(available, remaining))
          return remaining;
      }
    }
    //釋放鎖
    protected final boolean tryReleaseShared(int releases) {
      for (;;) {
        int current = getState();
        //將state值加上release值
        int next = current + releases;
        if (next < current) // overflow
          throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
          return true;
      }
    }
    //將state的值減去reductions
    final void reducePermits(int reductions) {
      for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current) // underflow
          throw new Error("Permit count underflow");
        if (compareAndSetState(current, next))
          return;
      }
    }
    final int drainPermits() {
      for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0))
          return current;
      }
    }
  }
  //非公平鎖
  static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;
    NonfairSync(int permits) {
      super(permits);
    }
    protected int tryAcquireShared(int acquires) {
      return nonfairTryAcquireShared(acquires);
    }
  }
  //公平鎖
  static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;
    FairSync(int permits) {
      super(permits);
    }
    protected int tryAcquireShared(int acquires) {
      for (;;) {
        if (hasQueuedPredecessors())
          return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
          compareAndSetState(available, remaining))
          return remaining;
      }
    }
  }
  //設置信號量
  public Semaphore(int permits) {
    sync = new NonfairSync(permits);
  }
  public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  }
  //獲取鎖
  public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
  }
  public void acquireUninterruptibly() {
    sync.acquireShared(1);
  }
  public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
  }
  public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  }
  public void release() {
    sync.releaseShared(1);
  }
  //獲取permits值鎖
  public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
  }
  public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
  }
  public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
  }
  public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
  }
  //釋放
  public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
  }
  public int availablePermits() {
    return sync.getPermits();
  }
  public int drainPermits() {
    return sync.drainPermits();
  }
  protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
  }
  public boolean isFair() {
    return sync instanceof FairSync;
  }
  public final boolean hasQueuedThreads() {
    return sync.hasQueuedThreads();
  }
  public final int getQueueLength() {
    return sync.getQueueLength();
  }
  protected Collection<Thread> getQueuedThreads() {
    return sync.getQueuedThreads();
  }
  public String toString() {
    return super.toString() + "[Permits = " + sync.getPermits() + "]";
  }
}

總結

以上就是本文關于Java并發編程Semaphore計數信號量詳解的全部內容,希望對大家有所幫助。有什么問題,可以留言交流討論。感謝朋友們對本站的支持!

原文鏈接:http://blog.csdn.net/qq924862077/article/details/70224646

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: www.成人精品 | 午夜激情视频网站 | 色蜜桃av | 毛片一级网站 | 日韩视频―中文字幕 | 国产人成精品一区二区三 | 国产精品久久久久久久久久大牛 | 欧美一区二区三区不卡免费观看 | 精品中文一区 | 欧美色大成网站www永久男同 | 一级在线 | 亚洲欧美国产视频 | 91精品国| 亚洲一区免费电影 | 国产呻吟 | 精品国产一区二区三区四区阿崩 | 91av网址| 精品久久久久久综合日本 | 性少妇videosexfreexx | 一区二区三区欧美精品 | 欧美成人免费在线视频 | 黄色免费网站在线播放 | 九九热免费精品视频 | 国产毛片在线看 | 欧美一区二区三区四区夜夜大片 | 成人一级黄色大片 | 青青国产在线视频 | 激情97| mmmwww| 欧美成人二区 | 日本在线不卡一区二区 | 龙的两根好大拔不出去h | h视频免费看 | 中文在线观看视频 | 黄色成人在线 | 27xxoo无遮挡动态视频 | 欧美在线成人影院 | 九九热免费观看 | 久久久麻豆 | 久精品国产 | 一级黄色毛片播放 |