1什么是并發問題。
多個進程或線程同時(或著說在同一段時間內)訪問同一資源會產生并發問題。
銀行兩操作員同時操作同一賬戶就是典型的例子。比如A、B操作員同時讀取一余額為1000元的賬戶,A操作員為該賬戶增加100元,B操作員同時為該賬戶減去50元,A先提交,B后提交。最后實際賬戶余額為1000-50=950元,但本該為1000+100-50=1050。這就是典型的并發問題。如何解決?可以用鎖。
2java中synchronized的用法
用法1
1
2
3
4
5
|
public class Test{ public synchronized void print(){ ….; } } |
某線程執行print()方法,則該對象將加鎖。其它線程將無法執行該對象的所有synchronized塊。
用法2
1
2
3
4
5
6
7
8
|
public class Test{ public void print(){ synchronized ( this ){ //鎖住本對象 …; } } } |
同用法1, 但更能體現synchronized用法的本質。
用法3
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public class Test{ private String a = “test”; public void print(){ synchronized (a){ //鎖住a對象 …; } } public synchronized void t(){ …; //這個同步代碼塊不會因為print()而鎖定. } } |
執行print(),會給對象a加鎖,注意不是給Test的對象加鎖,也就是說Test對象的其它synchronized方法不會因為print()而被鎖。同步代碼塊執行完,則釋放對a的鎖。
為了鎖住一個對象的代碼塊而不影響該對象其它synchronized塊的高性能寫法:
1
2
3
4
5
6
7
8
9
10
11
|
public class Test{ private byte [] lock = new byte [ 0 ]; public void print(){ synchronized (lock){ …; } } public synchronized void t(){ …; } } |
靜態方法的鎖
1
2
3
4
5
|
public class Test{ public synchronized static void execute(){ …; } } |
效果同
1
2
3
4
5
6
7
|
public class Test{ public static void execute(){ synchronized (TestThread. class ){ …; } } } |
3 Java中的鎖與排隊上廁所。
鎖就是阻止其它進程或線程進行資源訪問的一種方式,即鎖住的資源不能被其它請求訪問。在JAVA中,sychronized關鍵字用來對一個對象加鎖。比如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public class MyStack { int idx = 0 ; char [] data = new char [ 6 ]; public synchronized void push( char c) { data[idx] = c; idx++; } public synchronized char pop() { idx--; return data[idx]; } public static void main(String args[]){ MyStack m = new MyStack(); /** 下面對象m被加鎖。嚴格的說是對象m的所有synchronized塊被加鎖。 如果存在另一個試圖訪問m的線程T,那么T無法執行m對象的push和 pop方法。 */ m.pop(); //對象m被加鎖。 } } |
Java的加鎖解鎖跟多個人排隊等一個公共廁位完全一樣。第一個人進去后順手把門從里面鎖住,其它人只好排隊等。第一個人結束后出來時,門才會打開(解鎖)。輪到第二個人進去,同樣他又會把門從里面鎖住,其它人繼續排隊等待。
用廁所理論可以很容易明白:一個人進了一個廁位,這個廁位就會鎖住,但不會導致另一個廁位也被鎖住,因為一個人不能同時蹲在兩個廁位里。對于Java就是說:Java中的鎖是針對同一個對象的,不是針對class的。看下例:
1
2
3
4
|
MyStatckm1=newMyStack(); MyStatckm2=newMystatck(); m1.pop(); m2.pop(); |
m1對象的鎖是不會影響m2的鎖的,因為它們不是同一個廁位。就是說,假設有3線程t1,t2,t3操作m1,那么這3個線程只可能在m1上排隊等,假設另2個線程t8,t9在操作m2,那么t8,t9只會在m2上等待。而t2和t8則沒有關系,即使m2上的鎖釋放了,t1,t2,t3可能仍要在m1上排隊。原因無它,不是同一個廁位耳。
Java不能同時對一個代碼塊加兩個鎖,這和數據庫鎖機制不同,數據庫可以對一條記錄同時加好幾種不同的鎖。
4何時釋放鎖?
一般是執行完畢同步代碼塊(鎖住的代碼塊)后就釋放鎖,也可以用wait()方式半路上釋放鎖。wait()方式就好比蹲廁所到一半,突然發現下水道堵住了,不得已必須出來站在一邊,好讓修下水道師傅(準備執行notify的一個線程)進去疏通馬桶,疏通完畢,師傅大喊一聲:“已經修好了”(notify),剛才出來的同志聽到后就重新排隊。注意啊,必須等師傅出來啊,師傅不出來,誰也進不去。也就是說notify后,不是其它線程馬上可以進入封鎖區域活動了,而是必須還要等notify代碼所在的封鎖區域執行完畢從而釋放鎖以后,其它線程才可進入。
這里是wait與notify代碼示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public synchronized char pop() { char c; while (buffer.size() == 0 ) { try { this .wait(); //從廁位里出來 } catch (InterruptedException e) { // ignore it… } } c = ((Character)buffer.remove(buffer.size()- 1 )). charValue(); return c; } public synchronized void push( char c) { this .notify(); //通知那些wait()的線程重新排隊。注意:僅僅是通知它們重新排隊。 Character charObj = new Character(c); buffer.addElement(charObj); } //執行完畢,釋放鎖。那些排隊的線程就可以進來了。 |
再深入一些。
由于wait()操作而半路出來的同志沒收到notify信號前是不會再排隊的,他會在旁邊看著這些排隊的人(其中修水管師傅也在其中)。注意,修水管的師傅不能插隊,也得跟那些上廁所的人一樣排隊,不是說一個人蹲了一半出來后,修水管師傅就可以突然冒出來然后立刻進去搶修了,他要和原來排隊的那幫人公平競爭,因為他也是個普通線程。如果修水管師傅排在后面,則前面的人進去后,發現堵了,就wait,然后出來站到一邊,再進去一個,再wait,出來,站到一邊,只到師傅進去執行notify.這樣,一會兒功夫,排隊的旁邊就站了一堆人,等著notify.
終于,師傅進去,然后notify了,接下來呢?
1.有一個wait的人(線程)被通知到。
2.為什么被通知到的是他而不是另外一個wait的人?取決于JVM.我們無法預先
判斷出哪一個會被通知到。也就是說,優先級高的不一定被優先喚醒,等待
時間長的也不一定被優先喚醒,一切不可預知!(當然,如果你了解該JVM的
實現,則可以預知)。
3.他(被通知到的線程)要重新排隊。
4.他會排在隊伍的第一個位置嗎?回答是:不一定。他會排最后嗎?也不一定。
但如果該線程優先級設的比較高,那么他排在前面的概率就比較大。
5.輪到他重新進入廁位時,他會從上次wait()的地方接著執行,不會重新執行。
惡心點說就是,他會接著拉巴巴,不會重新拉。
6.如果師傅notifyAll().則那一堆半途而廢出來的人全部重新排隊。順序不可知。
JavaDOC上說,Theawakenedthreadswillnotbeabletoproceeduntilthecurrentthreadrelinquishesthelockonthisobject(當前線程釋放鎖前,喚醒的線程不能去執行)。
這用廁位理論解釋就是顯而易見的事。
5Lock的使用
用synchronized關鍵字可以對資源加鎖。用Lock關鍵字也可以。它是JDK1.5中新增內容。用法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[ 100 ]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0 ; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0 ) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0 ; --count; notFull.signal(); return x; } finally { lock.unlock(); } } } |
(注:這是JavaDoc里的例子,是一個阻塞隊列的實現例子。所謂阻塞隊列,就是一個隊列如果滿了或者空了,都會導致線程阻塞等待。Java里的ArrayBlockingQueue提供了現成的阻塞隊列,不需要自己專門再寫一個了。)
一個對象的lock.lock()和lock.unlock()之間的代碼將會被鎖住。這種方式比起synchronize好在什么地方?簡而言之,就是對wait的線程進行了分類。用廁位理論來描述,則是那些蹲了一半而從廁位里出來等待的人原因可能不一樣,有的是因為馬桶堵了,有的是因為馬桶沒水了。通知(notify)的時候,就可以喊:因為馬桶堵了而等待的過來重新排隊(比如馬桶堵塞問題被解決了),或者喊,因為馬桶沒水而等待的過來重新排隊(比如馬桶沒水問題被解決了)。這樣可以控制得更精細一些。不像synchronize里的wait和notify,不管是馬桶堵塞還是馬桶沒水都只能喊:剛才等待的過來排隊!假如排隊的人進來一看,發現原來只是馬桶堵塞問題解決了,而自己渴望解決的問題(馬桶沒水)還沒解決,只好再回去等待(wait),白進來轉一圈,浪費時間與資源。
Lock方式與synchronized對應關系:
LockawaitsignalsignalAll
synchronizedwaitnotifynotifyAll
注意:不要在Lock方式鎖住的塊里調用wait、notify、notifyAll
6利用管道進行線程間通信
原理簡單。兩個線程,一個操作PipedInputStream,一個操作PipedOutputStream。PipedOutputStream寫入的數據先緩存在Buffer中,如果Buffer滿,此線程wait。PipedInputStream讀出Buffer中的數據,如果Buffer沒數據,此線程wait。
jdk1.5中的阻塞隊列可實現同樣功能。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
package io; import java.io.*; public class PipedStreamTest { public static void main(String[] args) { PipedOutputStream ops= new PipedOutputStream(); PipedInputStream pis= new PipedInputStream(); try { ops.connect(pis); //實現管道連接 new Producer(ops).run(); new Consumer(pis).run(); } catch (Exception e){ e.printStackTrace(); } } } //生產者 class Producer implements Runnable{ private PipedOutputStream ops; public Producer(PipedOutputStream ops) { this .ops=ops; } public void run() { try { ops.write( "hell,spell" .getBytes()); ops.close(); } catch (Exception e) { e.printStackTrace(); } } } //消費者 class Consumer implements Runnable{ private PipedInputStream pis; public Consumer(PipedInputStream pis) { this .pis=pis; } public void run() { try { byte [] bu= new byte [ 100 ]; int len=pis.read(bu); System.out.println( new String(bu, 0 ,len)); pis.close(); } catch (Exception e) { e.printStackTrace(); } } } |
例2 對上面的程序做少許改動就成了兩個線程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
package io; import java.io.*; public class PipedStreamTest { public static void main(String[] args) { PipedOutputStream ops= new PipedOutputStream(); PipedInputStream pis= new PipedInputStream(); try { ops.connect(pis); //實現管道連接 Producer p = new Producer(ops); new Thread(p).start(); Consumer c = new Consumer(pis); new Thread(c).start(); } catch (Exception e){ e.printStackTrace(); } } } //生產者 class Producer implements Runnable{ private PipedOutputStream ops; public Producer(PipedOutputStream ops) { this .ops=ops; } public void run() { try { for (;;){ ops.write( "hell,spell" .getBytes()); ops.close(); } } catch (Exception e) { e.printStackTrace(); } } } //消費者 class Consumer implements Runnable{ private PipedInputStream pis; public Consumer(PipedInputStream pis) { this .pis=pis; } public void run() { try { for (;;){ byte [] bu= new byte [ 100 ]; int len=pis.read(bu); System.out.println( new String(bu, 0 ,len)); } pis.close(); } catch (Exception e) { e.printStackTrace(); } } } |
例3. 這個例子更加貼進應用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
import java.io.*; public class PipedIO { //程序運行后將sendFile文件的內容拷貝到receiverFile文件中 public static void main(String args[]){ try { //構造讀寫的管道流對象 PipedInputStream pis= new PipedInputStream(); PipedOutputStream pos= new PipedOutputStream(); //實現關聯 pos.connect(pis); //構造兩個線程,并且啟動。 new Sender(pos,”c:\text2.txt”).start(); new Receiver(pis,”c:\text3.txt”).start(); } catch (IOException e){ System.out.println(“Pipe Error”+ e); } } } //線程發送 class Sender extends Thread{ PipedOutputStream pos; File file; //構造方法 Sender(PipedOutputStream pos, String fileName){ this .pos=pos; file= new File(fileName); } //線程運行方法 public void run(){ try { //讀文件內容 FileInputStream fs= new FileInputStream(file); int data; while ((data=fs.read())!=- 1 ){ //寫入管道始端 pos.write(data); } pos.close(); } catch (IOException e) { System.out.println(“Sender Error” +e); } } } //線程讀 class Receiver extends Thread{ PipedInputStream pis; File file; //構造方法 Receiver(PipedInputStream pis, String fileName){ this .pis=pis; file= new File(fileName); } //線程運行 public void run(){ try { //寫文件流對象 FileOutputStream fs= new FileOutputStream(file); int data; //從管道末端讀 while ((data=pis.read())!=- 1 ){ //寫入本地文件 fs.write(data); } pis.close(); } catch (IOException e){ System.out.println( "Receiver Error" +e); } } } |
7阻塞隊列
阻塞隊列可以代替管道流方式來實現進水管/排水管模式(生產者/消費者).JDK1.5提供了幾個現成的阻塞隊列.現在來看ArrayBlockingQueue的代碼如下:
這里是一個阻塞隊列
1
|
BlockingQueue blockingQ = new ArrayBlockingQueue 10 ; |
一個線程從隊列里取
1
2
3
|
for (;;){ Object o = blockingQ.take(); //隊列為空,則等待(阻塞) } |
另一個線程往隊列存
1
2
3
|
for (;;){ blockingQ.put( new Object()); //隊列滿,則等待(阻塞) } |
可見,阻塞隊列使用起來比管道簡單。
8使用Executors、Executor、ExecutorService、ThreadPoolExecutor
可以使用線程管理任務。還可以使用jdk1.5提供的一組類來更方便的管理任務。從這些類里我們可以體會一種面向任務的思維方式。這些類是:
Executor接口。使用方法:
1
2
|
Executor executor = anExecutor; //生成一個Executor實例。 executor.execute( new RunnableTask1()); |
用意:使用者只關注任務執行,不用操心去關注任務的創建、以及執行細節等這些第三方實現者關心的問題。也就是說,把任務的調用執行和任務的實現解耦。
實際上,JDK1.5中已經有該接口出色的實現。夠用了。
Executors是一個如同Collections一樣的工廠類或工具類,用來產生各種不同接口的實例。
ExecutorService接口它繼承自Executor.Executor只管把任務扔進executor()里去執行,剩余的事就不管了。而ExecutorService則不同,它會多做點控制工作。比如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
class NetworkService { private final ServerSocket serverSocket; private final ExecutorService pool; public NetworkService( int port, int poolSize) throws IOException { serverSocket = new ServerSocket(port); pool = Executors.newFixedThreadPool(poolSize); } public void serve() { try { for (;;) { pool.execute( new Handler(serverSocket.accept())); } } catch (IOException ex) { pool.shutdown(); //不再執行新任務 } } } class Handler implements Runnable { private final Socket socket; Handler(Socket socket) { this .socket = socket; } public void run() { // read and service request } } |
ExecutorService(也就是代碼里的pool對象)執行shutdown后,它就不能再執行新任務了,但老任務會繼續執行完畢,那些等待執行的任務也不再等待了。
任務提交者與執行者通訊
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public static void main(String args[]) throws Exception { ExecutorService executor = Executors.newSingleThreadExecutor(); Callable task = new Callable(){ public String call() throws Exception{ return “test”; } } ; Future f = executor.submit(task); String result = f.get(); //等待(阻塞)返回結果 System.out.println(result); executor.shutdown(); } |
Executors.newSingleThreadExecutor()取得的Executor實例有以下特性:
任務順序執行.比如:
1
2
|
executor.submit(task1); executor.submit(task2); |
必須等task1執行完,task2才能執行。
task1和task2會被放入一個隊列里,由一個工作線程來處理。即:一共有2個線程(主線程、處理任務的工作線程)。
其它的類請參考JavaDoc
9并發流程控制
本節例子來自溫少的Java并發教程,可能會有改動。向溫少致敬。
CountDownLatch門插銷計數器
啟動線程,然后等待線程結束。即常用的主線程等所有子線程結束后再執行的問題。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public static void main(String[] args) throws Exception { // TODO Auto-generated method stub final int count= 10 ; final CountDownLatch completeLatch = new CountDownLatch(count); //定義了門插銷的數目是10 for ( int i= 0 ;i<count;i++){ Thread thread = new Thread( "worker thread" +i){ public void run(){ //do xxxx completeLatch.countDown(); //減少一根門插銷 } } ; thread.start(); } completeLatch.await(); //如果門插銷還沒減完則等待。 } |
JDK1.4時,常用辦法是給子線程設置狀態,主線程循環檢測。易用性和效率都不好。
啟動很多線程,等待通知才能開始
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public static void main(String[] args) throws Exception { // TODO Auto-generated method stub final CountDownLatch startLatch = new CountDownLatch( 1 ); //定義了一根門插銷 for ( int i = 0 ; i < 10 ; i++) { Thread thread = new Thread( "worker thread" + i) { public void run() { try { startLatch.await(); //如果門插銷還沒減完則等待 } catch (InterruptedException e) { } // do xxxx } } ; thread.start(); } startLatch.countDown(); //減少一根門插銷 } |
CycliBarrier. 等所有線程都達到一個起跑線后才能開始繼續運行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
public class CycliBarrierTest implements Runnable { private CyclicBarrier barrier; public CycliBarrierTest(CyclicBarrier barrier) { this .barrier = barrier; } public void run() { //do xxxx; try { this .barrier.await(); //線程運行至此會檢查是否其它線程都到齊了,沒到齊就繼續等待。到齊了就執行barrier的run函數體里的內容 } catch (Exception e) { } } /** * @param args */ public static void main(String[] args) { //參數2代表兩個線程都達到起跑線才開始一起繼續往下執行 CyclicBarrier barrier = new CyclicBarrier( 2 , new Runnable() { public void run() { //do xxxx; } } ); Thread t1 = new Thread( new CycliBarrierTest(barrier)); Thread t2 = new Thread( new CycliBarrierTest(barrier)); t1.start(); t2.start(); } } |
這簡化了傳統的用計數器+wait/notifyAll來實現該功能的方式。
10并發3定律
Amdahl定律.給定問題規模,可并行化部分占12%,那么即使把并行運用到極致,系統的性能最多也只能提高1/(1-0.12)=1.136倍。即:并行對提高系統性能有上限。
Gustafson定律.Gustafson定律說Amdahl定律沒有考慮隨著cpu的增多而有更多的計算能力可被使用。其本質在于更改問題規模從而可以把Amdahl定律中那剩下的88%的串行處理并行化,從而可以突破性能門檻。本質上是一種空間換時間。
Sun-Ni定律.是前兩個定律的進一步推廣。其主要思想是計算的速度受限于存儲而不是CPU的速度.所以要充分利用存儲空間等計算資源,盡量增大問題規模以產生更好/更精確的解.
11由并發到并行
計算機識別物體需要飛速的計算,以至于芯片發熱發燙,而人在識別物體時卻一目了然,卻并不會導致某個腦細胞被燒熱燒焦(夸張)而感到不適,是由于大腦是一個分布式并行運行系統,就像google用一些廉價的linux服務器可以進行龐大復雜的計算一樣,大腦內部無數的神經元的獨自計算,互相分享成果,從而瞬間完成需要單個cpu萬億次運算才能有的效果。試想,如果在并行處理領域有所創建,將對計算機的發展和未來產生不可估量的影響。當然,其中的挑戰也可想而知:許多的問題是并不容易輕易就“分割”的了的。
總結
以上就是本文關于java并發問題概述的全部內容,希望對大家有所幫助。感興趣的朋友可以繼續參閱本站其他相關專題,如有不足之處,歡迎留言指出。感謝朋友們對本站的支持!
原文鏈接:http://blog.csdn.net/sinat_15274667/article/details/53591304