很多時候我們都希望能夠最大的利用資源,比如在進行io操作的時候盡可能的避免同步阻塞的等待,因為這會浪費cpu的資源。如果在有可讀的數據的時候能夠通知程序執行讀操作甚至由操作系統內核幫助我們完成數據的拷貝,這再好不過了。從nio到completablefuture、lambda、fork/join,java一直在努力讓程序盡可能變的異步甚至擁有更高的并行度,這一點一些函數式語言做的比較好,因此java也或多或少的借鑒了某些特性。下面介紹一種非常常用的實現異步操作的方式。
考慮有一個耗時的操作,操作完后會返回一個結果(不管是正常結果還是異常),程序如果想擁有比較好的性能不可能由線程去等待操作的完成,而是應該采用listener模式。jdk并發包里的future代表了未來的某個結果,當我們向線程池中提交任務的時候會返回該對象。代碼例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
/** * jdk1.8之前的future * * @author administrator * */ public class javafuture { public static void main(string[] args) throws throwable, executionexception { executorservice executor = executors.newfixedthreadpool( 1 ); // future代表了線程執行完以后的結果,可以通過future獲得執行的結果 // 但是jdk1.8之前的future有點雞肋,并不能實現真正的異步,需要阻塞的獲取結果,或者不斷的輪詢 // 通常我們希望當線程執行完一些耗時的任務后,能夠自動的通知我們結果,很遺憾這在原生jdk1.8之前 // 是不支持的,但是我們可以通過第三方的庫實現真正的異步回調 future<string> f = executor.submit( new callable<string>() { @override public string call() throws exception { system.out.println( "task started!" ); thread.sleep( 3000 ); system.out.println( "task finished!" ); return "hello" ; } }); //此處阻塞main線程 system.out.println(f.get()); system.out.println( "main thread is blocked" ); } } |
如果想獲得耗時操作的結果,可以通過get方法獲取,但是該方法會阻塞當前線程,我們可以在做完剩下的某些工作的時候調用get方法試圖去獲取結果,也可以調用非阻塞的方法isdone來確定操作是否完成,這種方式有點兒類似下面的過程:
這種方式對流程的控制很混亂,但是在jdk1.8之前只提供了這種笨拙的實現方式,以至于很多高性能的框架都實現了自己的一套異步框架,比如netty和guava,下面分別介紹下這三種異步的實現方式(包括jdk1.8)。首先是guava中的實現方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
package guava; import java.util.concurrent.callable; import java.util.concurrent.executorservice; import java.util.concurrent.executors; import com.google.common.util.concurrent.futurecallback; import com.google.common.util.concurrent.futures; import com.google.common.util.concurrent.listenablefuture; import com.google.common.util.concurrent.listeningexecutorservice; import com.google.common.util.concurrent.moreexecutors; /** * guava中的future * * @author administrator * */ public class guavafuture { public static void main(string[] args) { executorservice executor = executors.newfixedthreadpool( 1 ); // 使用guava提供的moreexecutors工具類包裝原始的線程池 listeningexecutorservice listeningexecutor = moreexecutors.listeningdecorator(executor); //向線程池中提交一個任務后,將會返回一個可監聽的future,該future由guava框架提供 listenablefuture<string> lf = listeningexecutor.submit( new callable<string>() { @override public string call() throws exception { system.out.println( "task started!" ); //模擬耗時操作 thread.sleep( 3000 ); system.out.println( "task finished!" ); return "hello" ; } }); //添加回調,回調由executor中的線程觸發,但也可以指定一個新的線程 futures.addcallback(lf, new futurecallback<string>() { //耗時任務執行失敗后回調該方法 @override public void onfailure(throwable t) { system.out.println( "failure" ); } //耗時任務執行成功后回調該方法 @override public void onsuccess(string s) { system.out.println( "success " + s); } }); //主線程可以繼續做其他的工作 system.out.println( "main thread is running" ); } } |
guava提供了一套完整的異步框架,核心是可監聽的future,通過注冊監聽器或者回調方法實現及時獲取操作結果的能力。需要提一點的是,假設添加監聽的時候耗時操作已經執行完了,此時回調方法會被立即執行并不會丟失。想探究其實現方式的話可以跟一下源碼,底層的原理并不難。
談到異步編程就不得不提一下promise,很多函數式語言比如js原生支持promise,但是在java界也有一些promise框架,其中就有大名鼎鼎的netty。從future、callback到promise甚至線程池,netty實現了一套完整的異步框架,并且netty代碼中也大量使用了promise,下面是netty中的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
package netty_promise; import io.netty.util.concurrent.defaulteventexecutorgroup; import io.netty.util.concurrent.eventexecutorgroup; import io.netty.util.concurrent.future; import io.netty.util.concurrent.futurelistener; /** * netty中的promise * * @author administrator * */ public class promisetest { @suppresswarnings ({ "unchecked" , "rawtypes" }) public static void main(string[] args) throws throwable { //線程池 eventexecutorgroup group = new defaulteventexecutorgroup( 1 ); //向線程池中提交任務,并返回future,該future是netty自己實現的future //位于io.netty.util.concurrent包下,此處運行時的類型為promisetask future<?> f = group.submit( new runnable() { @override public void run() { system.out.println( "任務正在執行" ); //模擬耗時操作,比如io操作 try { thread.sleep( 1000 ); } catch (interruptedexception e) { e.printstacktrace(); } system.out.println( "任務執行完畢" ); } }); //增加監聽 f.addlistener( new futurelistener() { @override public void operationcomplete(future arg0) throws exception { system.out.println( "ok!!!" ); } }); system.out.println( "main thread is running." ); } } |
直到jdk1.8才算真正支持了異步操作,其中借鑒了某些框架的實現思想,但又有新的功能,同時在jdk1.8中提供了lambda表達式,使得java向函數式語言又靠近了一步。借助jdk原生的completablefuture可以實現異步的操作,同時結合lambada表達式大大簡化了代碼量。代碼例子如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
package netty_promise; import java.util.concurrent.completablefuture; import java.util.concurrent.executionexception; import java.util.concurrent.executorservice; import java.util.concurrent.executors; import java.util.function.supplier; /** * 基于jdk1.8實現任務異步處理 * * @author administrator * */ public class javapromise { public static void main(string[] args) throws throwable, executionexception { // 兩個線程的線程池 executorservice executor = executors.newfixedthreadpool( 2 ); //jdk1.8之前的實現方式 completablefuture<string> future = completablefuture.supplyasync( new supplier<string>() { @override public string get() { system.out.println( "task started!" ); try { //模擬耗時操作 thread.sleep( 2000 ); } catch (interruptedexception e) { e.printstacktrace(); } return "task finished!" ; } }, executor); //采用lambada的實現方式 future.thenaccept(e -> system.out.println(e + " ok" )); system.out.println( "main thread is running" ); } } |
上面的圖只是簡單的表示了一下異步的實現流程,實際的調用中看似順序的步驟會發生線程的切換。
以上所述是小編給大家介紹的java異步編程詳解整合,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對服務器之家網站的支持!