1. 前言
隨著數(shù)據(jù)量和調(diào)用量的增長,用戶對(duì)應(yīng)用的性能要求越來越高。另外,在實(shí)際的服務(wù)中,還存在著這樣的場景:系統(tǒng)在組裝數(shù)據(jù)的時(shí)候,對(duì)于數(shù)據(jù)的各個(gè)部分的獲取實(shí)際上是沒有前后依賴關(guān)系的。這些問題都很容易讓我們想到將這些同步調(diào)用全都改造為異步調(diào)用。不過自己實(shí)現(xiàn)起來比較麻煩,還容易出錯(cuò)。好在spring已經(jīng)提供了該問題的解決方案,而且使用起來十分方便。
2.spring異步執(zhí)行框架的使用方法
2.1 maven 依賴
spring異步執(zhí)行框架的相關(guān)bean包含在spring-context和spring-aop模塊中,所以只要引入上述的模塊即可。
2.2 開啟異步任務(wù)支持
spring提供了@enableasync的注解來標(biāo)注是否啟用異步任務(wù)支持。使用方式如下:
1
2
3
4
|
@configuration @enableasync public class appconfig { } |
note: @enableasync必須要配合@configuration使用,否則會(huì)不生效
2.3 方法標(biāo)記為異步調(diào)用
將同步方法的調(diào)用改為異步調(diào)用也很簡單。對(duì)于返回值為void的方法,直接加上@async注解即可。對(duì)于有返回值的方法,除了加上上述的注解外,還需要將方法的返回值修改為future類型和將返回值用asyncresult包裝起來。如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
|
// 無返回值的方法直接加上注解即可。 @async public void method1() { ... } // 有返回值的方法需要修改返回值。 @async public future<object> method2() { ... return new asyncresult<>(object); } |
2.4 方法調(diào)用
對(duì)于void的方法,和普通的調(diào)用沒有任何區(qū)別。對(duì)于非void的方法,由于返回值是future類型,所以需要用get()方法來獲取返回值。如下所示:
1
2
3
4
5
6
7
8
9
10
|
public static void main(string[] args) { service.method1(); future<object> futureresult = service.method2(); object result; try { result = futureresult.get(); } catch (interruptedexception | executionexception e) { ... } } |
3. 原理簡介
這塊的源碼的邏輯還是比較簡單的,主要是spring幫我們生成并管理了一個(gè)線程池,然后方法調(diào)用的時(shí)候使用動(dòng)態(tài)代理將方法的執(zhí)行包裝為callable類型并提交到線程池中執(zhí)行。核心的實(shí)現(xiàn)邏輯在asyncexecutioninterceptor類的invoke()方法中。如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
@override public object invoke( final methodinvocation invocation) throws throwable { class <?> targetclass = (invocation.getthis() != null ? aoputils.gettargetclass(invocation.getthis()) : null ); method specificmethod = classutils.getmostspecificmethod(invocation.getmethod(), targetclass); final method userdeclaredmethod = bridgemethodresolver.findbridgedmethod(specificmethod); asynctaskexecutor executor = determineasyncexecutor(userdeclaredmethod); if (executor == null ) { throw new illegalstateexception( "no executor specified and no default executor set on asyncexecutioninterceptor either" ); } callable<object> task = new callable<object>() { @override public object call() throws exception { try { object result = invocation.proceed(); if (result instanceof future) { return ((future<?>) result).get(); } } catch (executionexception ex) { handleerror(ex.getcause(), userdeclaredmethod, invocation.getarguments()); } catch (throwable ex) { handleerror(ex, userdeclaredmethod, invocation.getarguments()); } return null ; } }; return dosubmit(task, executor, invocation.getmethod().getreturntype()); } |
4.自定義taskexecutor及異常處理
4.1自定義taskexecutor
spring查找taskexecutor邏輯是:
1. 如果spring context中存在唯一的taskexecutor bean,那么就使用這個(gè)bean。
2. 如果1中的bean不存在,那么就會(huì)查找是否存在一個(gè)beanname為taskexecutor且是java.util.concurrent.executor實(shí)例的bean,有則使用這個(gè)bean。
3. 如果1、2中的都不存在,那么spring就會(huì)直接使用默認(rèn)的executor,即simpleasynctaskexecutor。
在第2節(jié)的實(shí)例中,我們直接使用的是spring默認(rèn)的taskexecutor。但是對(duì)于每一個(gè)新的任務(wù),simpleaysnctaskexecutor都是直接創(chuàng)建新的線程來執(zhí)行,所以無法重用線程。具體的執(zhí)行的代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
@override public void execute(runnable task, long starttimeout) { assert .notnull(task, "runnable must not be null" ); runnable tasktouse = ( this .taskdecorator != null ? this .taskdecorator.decorate(task) : task); if (isthrottleactive() && starttimeout > timeout_immediate) { this .concurrencythrottle.beforeaccess(); doexecute( new concurrencythrottlingrunnable(tasktouse)); } else { doexecute(tasktouse); } } protected void doexecute(runnable task) { thread thread = ( this .threadfactory != null ? this .threadfactory.newthread(task) : createthread(task)); thread.start(); } |
所以我們?cè)谑褂玫臅r(shí)候,最好是使用自定義的taskexecutor。結(jié)合上面描述的spring查找taskexecutor的邏輯,最簡單的自定義的方法是使用@bean注解。示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// threadpooltaskexecutor的配置基本等同于線程池 @bean ( "taskexecutor" ) public executor getasyncexecutor() { threadpooltaskexecutor taskexecutor = new threadpooltaskexecutor(); taskexecutor.setmaxpoolsize(max_pool_size); taskexecutor.setcorepoolsize(core_pool_size); taskexecutor.setqueuecapacity(core_pool_size * 10 ); taskexecutor.setthreadnameprefix( "wssys-async-task-thread-pool" ); taskexecutor.setwaitfortaskstocompleteonshutdown( true ); taskexecutor.setawaitterminationseconds( 60 * 10 ); taskexecutor.setrejectedexecutionhandler( new threadpoolexecutor.abortpolicy()); return taskexecutor; } |
另外,spring還提供了一個(gè)asyncconfigurer接口,通過實(shí)現(xiàn)該接口,除了可以實(shí)現(xiàn)自定義executor以外,還可以自定義異常的處理。代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
@configuration @slf4j public class asyncconfig implements asyncconfigurer { private static final int max_pool_size = 50 ; private static final int core_pool_size = 20 ; @override @bean ( "taskexecutor" ) public executor getasyncexecutor() { threadpooltaskexecutor taskexecutor = new threadpooltaskexecutor(); taskexecutor.setmaxpoolsize(max_pool_size); taskexecutor.setcorepoolsize(core_pool_size); taskexecutor.setqueuecapacity(core_pool_size * 10 ); taskexecutor.setthreadnameprefix( "async-task-thread-pool" ); taskexecutor.setwaitfortaskstocompleteonshutdown( true ); taskexecutor.setawaitterminationseconds( 60 * 10 ); taskexecutor.setrejectedexecutionhandler( new threadpoolexecutor.abortpolicy()); return taskexecutor; } @override public asyncuncaughtexceptionhandler getasyncuncaughtexceptionhandler() { return (ex, method, params) -> log.error( "invoke async method occurs error. method: {}, params: {}" , method.getname(), json.tojsonstring(params), ex); } } |
note:
spring還提供了一個(gè)asyncconfigurersupport類,該類也實(shí)現(xiàn)了asyncconfigurer接口,且方法的返回值都是null,旨在提供一個(gè)方便的實(shí)現(xiàn)。
當(dāng)getasyncexecutor()方法返回null的時(shí)候,spring會(huì)使用默認(rèn)的處理器(強(qiáng)烈不推薦)。
當(dāng)getasyncuncaughtexceptionhandler()返回null的時(shí)候,spring會(huì)使用simpleasyncuncaughtexceptionhandler來處理異常,該類會(huì)打印出異常的信息。
所以對(duì)該類的使用,最佳的實(shí)踐是繼承該類,并且覆蓋實(shí)現(xiàn)getasyncexecutor()方法。
4.2 異常處理
spring異步框架對(duì)異常的處理如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
// 所在類:asyncexecutionaspectsupport protected void handleerror(throwable ex, method method, object... params) throws exception { if (future. class .isassignablefrom(method.getreturntype())) { reflectionutils.rethrowexception(ex); } else { // could not transmit the exception to the caller with default executor try { this .exceptionhandler.handleuncaughtexception(ex, method, params); } catch (throwable ex2) { logger.error( "exception handler for async method '" + method.togenericstring() + "' threw unexpected exception itself" , ex2); } } } |
從代碼來看,如果返回值是future類型,那么直接將異常拋出。如果返回值不是future類型(基本上包含的是所有返回值void類型的方法,因?yàn)槿绻椒ㄓ蟹祷刂担仨氁胒uture包裝起來),那么會(huì)調(diào)用handleuncaughtexception方法來處理異常。
注意:在handleuncaughtexception()方法中拋出的任何異常,都會(huì)被spring catch住,所以沒有辦法將void的方法再次拋出并傳播到上層調(diào)用方的!!!
關(guān)于spring 這個(gè)設(shè)計(jì)的緣由我的理解是:既然方法的返回值是void,就說明調(diào)用方不關(guān)心方法執(zhí)行是否成功,所以也就沒有必要去處理方法拋出的異常。如果需要關(guān)心異步方法是否成功,那么返回值改為boolean就可以了。
4.4 最佳實(shí)踐的建議
- @async可以指定方法執(zhí)行的executor,用法:@async("mytaskexecutor")。推薦指定executor,這樣可以避免因?yàn)閑xecutor配置沒有生效而spring使用默認(rèn)的executor的問題。
- 實(shí)現(xiàn)接口asyncconfigurer的時(shí)候,方法getasyncexecutor()必須要使用@bean,并指定bean的name。如果不使用@bean,那么該方法返回的executor并不會(huì)被spring管理。用java doc api的原話是:is not a fully managed spring bean.(具體含義沒有太理解,不過親測不加這個(gè)注解無法正常使用)
- 由于其本質(zhì)上還是基于代理實(shí)現(xiàn)的,所以如果一個(gè)類中有a、b兩個(gè)異步方法,而a中存在對(duì)b的調(diào)用,那么調(diào)用a方法的時(shí)候,b方法不會(huì)去異步執(zhí)行的。
- 在異步方法上標(biāo)注@transactional是無效的。
- future.get()的時(shí)候,最好使用get(long timeout, timeunit unit)方法,避免長時(shí)間阻塞。
- listenablefuture和completablefuture也是推薦使用的,他們相比future,提供了對(duì)異步調(diào)用的各個(gè)階段或過程進(jìn)行介入的能力。
以上就是本文的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持服務(wù)器之家。
原文鏈接:https://juejin.im/post/5c3fe71ff265da61223a966c