一、背景
業務處理過程,發現了以下問題,代碼一是原代碼能正常執行,代碼二是經過迭代一次非正常執行代碼
- 代碼一:以下代碼開啟線程后,代碼正常執行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
ThreadPoolExecutor executor = new ThreadPoolExecutor( 5 , 10 , 200 , TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>( 5 )); @Transactional public Long test() { // ...... // 插入記錄 Long studentId = studentService.insert(student); // 異步線程 writeStatisticsData(studentId); return studentId; } private void writeStatisticsData(Long studentId) { executor.execute(() -> { Student student = studentService.findById(studentId); //........ }); } |
- 代碼二:以下代碼開啟線程后,代碼不正常執行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
@Transactional public Long test() { // ...... // 插入記錄 Long studentId = studentService.insert(student); // 異步線程 writeStatisticsData(studentId); // 插入學生地址記錄 Long addressId = addressService.insert(address); return studentId; } private void writeStatisticsData(Long studentId) { executor.execute(() -> { Student student = studentService.findById(studentId); //........ }); } |
二、問題分析
這里使用了spring事務,顯然需要考慮事務的隔離級別
2.1、mysql隔離級別
查看mysql隔離級別
1
2
|
SELECT @@tx_isolation; READ - COMMITTED |
讀提交,即在事務A插入數據過程中,事務B在A提交之前讀取A插入的數據讀取不到,而B在A提交之后再去讀就會讀取到A插入的數據,也即Read Committed不能保證在一個事務中每次讀都能讀到相同的數據,因為在每次讀數據之后其他并發事務可能會對剛才讀到的數據進行修改。
2.2、問題原因分析
- 代碼一正常運行的原因
由于mysql事務的隔離級別是讀提交,test方法在開啟異步線程后,異步線程也開啟了事務,同時以讀者身份去讀 test 方法中插入的 student 記錄,但此時 test 方法已經提交了事務,所以可以讀取到 student 記錄(即在異步方法中可以讀取到 student 記錄),但此代碼有風險,若事務提交的時間晚一點,異步線程也有可能讀取不到 student 記錄。
- 代碼二不能正常運行的原因
經過上面分析,很明顯異步方法中不能讀取到 student 記錄,由于代碼二在異步線程下面又執行了其他操作,延時了test方法中事務的提交,所以代碼二不能正常運行。
三、解決問題方案
解決思路是在事務提交后再做其他的處理(如異步發消息處理等),這里還是從Spring執行事務的過程中入手,Spring事務的處理過程不再分析,這里直接看Spring事務增強器TransactionInterceptor的核心處理流程,源碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // 獲取事務屬性 final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass); //加載配置中配置的TransactionManager final PlatformTransactionManager tm = determineTransactionManager(txAttr); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); // 聲明式事務的處理 if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null ; //...... retVal = invocation.proceedWithInvocation(); //...... commitTransactionAfterReturning(txInfo); return retVal; } else { // 編程式事務的處理...... } //...... } |
這里主要看聲明式事務的處理,因為編程式事務的處理及提交都是用戶在編碼中進行控制。在聲明式事務處理中,當方法執行完后,會執行 commitTransactionAfterReturning 方法來進行提交事務,該方法在 TransactionAspectSupport 類中,源碼如下:
1
2
3
4
5
|
protected void commitTransactionAfterReturning(TransactionInfo txInfo) { if (txInfo != null && txInfo.hasTransaction()) { txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } } |
再看 commit 方法,該方法在 AbstractPlatformTransactionManager 類中,源碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
public final void commit(TransactionStatus status) throws TransactionException { // 這里省略很多代碼,如事務回滾...... processCommit(defStatus); } private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false ; try { prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true ; boolean globalRollbackOnly = false ; if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { globalRollbackOnly = status.isGlobalRollbackOnly(); } if (status.hasSavepoint()) { status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) { // 提交事務 doCommit(status); } //...... } catch (......) { // 事務異常處理...... } try { // 事務提交成功后的處理-----這里是重點 triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { cleanupAfterCompletion(status); } } private void triggerAfterCommit(DefaultTransactionStatus status) { if (status.isNewSynchronization()) { TransactionSynchronizationUtils.triggerAfterCommit(); } } |
最終會走到 TransactionSynchronizationUtils.triggerAfterCommit() 方法中
1
2
3
4
5
6
7
8
9
10
11
|
public static void triggerAfterCommit() { invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations()); } public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) { if (synchronizations != null ) { for (TransactionSynchronization synchronization : synchronizations) { synchronization.afterCommit(); } } } |
上面會把緩存在 TransactionSynchronizationManager 中的 TransactionSynchronization 按順序來執行 afterCommit 方法,其中 TransactionSynchronization 以集合形式緩存在 TransactionSynchronizationManager 的 ThreadLocal 中。
3.1、方式一
經過上面分析,只需要代碼中重新生成個 TransactionSynchronization 并加入到 TransactionSynchronizationManager 的 TransactionSynchronization 集合中即可,所以有了解決方案,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
private void writeStatisticsData(Long studentId) { if (TransactionSynchronizationManager.isActualTransactionActive()) { // 當前存在事務 TransactionSynchronizationManager.registerSynchronization( new TransactionSynchronizationAdapter() { @Override public void afterCommit() { executor.execute(() -> {Student student = studentService.findById(studentId); //........ }); }}); } else { // 當前不存在事務 executor.execute(() -> {Student student = studentService.findById(studentId); //........ }); } } |
3.2、方式二
使用 @TransactionalEventListener 結合 Spring事件監聽機制,該注解自從Spring4.2版本開始有的,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
// 事件 public class StudentEvent extends ApplicationEvent { public StudentEvent(Long studentId) { super (studentId); } } // 監聽器 public class StudentEventListener{ @TransactionalEventListener (phase = TransactionPhase.AFTER_COMMIT) public void writeStatisticsData(StudentEvent studentEvent) { executor.execute(() -> { Student student = studentService.findById(studentEvent.getSource()); //........ }); } } @Service public class StudentService { // Spring4.2之后,ApplicationEventPublisher自動被注入到容器中,采用Autowired即可獲取 @Autowired private ApplicationEventPublisher applicationEventPublisher; @Transactional public Long test() { // ...... // 插入記錄 Long studentId = studentService.insert(student); // 發布事件 applicationEventPublisher.publishEvent( new StudentEvent(studentId)); // 插入學生地址記錄 Long addressId = addressService.insert(address); return studentId; } } |
原理分析
Spring Bean在加載配置文件時,會使用 AnnotationDrivenBeanDefinitionParser 來解析 annotation-driven 標簽,如下:
1
2
3
4
5
6
7
8
9
|
public class TxNamespaceHandler extends NamespaceHandlerSupport { //...... @Override public void init() { registerBeanDefinitionParser( "advice" , new TxAdviceBeanDefinitionParser()); registerBeanDefinitionParser( "annotation-driven" , new AnnotationDrivenBeanDefinitionParser()); registerBeanDefinitionParser( "jta-transaction-manager" , new JtaTransactionManagerBeanDefinitionParser()); } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
class AnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser { @Override public BeanDefinition parse(Element element, ParserContext parserContext) { // 重點——將TransactionalEventListenerFactory加入到容器中 registerTransactionalEventListenerFactory(parserContext); String mode = element.getAttribute( "mode" ); if ( "aspectj" .equals(mode)) { // mode="aspectj" registerTransactionAspect(element, parserContext); } else { // mode="proxy" AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext); } return null ; } private void registerTransactionalEventListenerFactory(ParserContext parserContext) { RootBeanDefinition def = new RootBeanDefinition(); def.setBeanClass(TransactionalEventListenerFactory. class ); parserContext.registerBeanComponent( new BeanComponentDefinition(def, TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME)); } } |
1
2
3
4
5
6
7
8
9
|
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered { //省略部分代碼...... @Override public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) { return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method); } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter { //省略部分代碼...... @Override public void onApplicationEvent(ApplicationEvent event) { if (TransactionSynchronizationManager.isSynchronizationActive()) { // 事務存在時,生成TransactionSynchronization并加入到 TransactionSynchronizationManager的緩存集合中 TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event); TransactionSynchronizationManager.registerSynchronization(transactionSynchronization); } else if ( this .annotation.fallbackExecution()) { //....... } processEvent(event); } else { // 當前不存在事務什么也不做 } } |
上述 @TransactionalEventListener 本質上是一個 @EventListener,TransactionalEventListenerFactory類會將每一個掃描到的方法有TransactionalEventListener注解包裝成ApplicationListenerMethodTransactionalAdapter對象,通過ApplicationListenerMethodTransactionalAdapter的onApplicationEvent方法可以看到若當前存在事務,就會生成TransactionSynchronization并加入到 TransactionSynchronizationManager的緩存ThreadLocal集合中,剩余流程同上述分析。(使用 @TransactionalEventListener 結合 Spring事件監聽機制,并使用到異步方式感覺有點別扭,這里是為了說明問題)。
四、使用案例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor( 5 , new ThreadFactoryBuilder().setDaemon( false ).setNamePrefix( "execApiCache" ).build()); @Override @Transactional (rollbackFor = Exception. class ) public ResultVO addApi(Api api, List<Header> headerList, List<Request> requestList, Response response, List<Script> scriptList, List<RespCodeMapping> respCodeMappingList) { // 數據庫代碼... // 異步代碼 TransactionSynchronizationManager.registerSynchronization( new TransactionSynchronizationAdapter() { @Override public void afterCommit() { log.warn( "afterCommit..." ); executorService.execute(() -> { // 異步業務 execApiCache(api); }); }}); return ResultUtil.buildSucc(); } |
Ps:setDaemon(false) 注意這里守護線程標記必須設置為 false,否則主線程執行完,異步線程沒執行完的話,異步線程會馬上被中斷、關閉,所以這里不能設置成守護(用戶)線程。
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持服務器之家。
原文鏈接:https://lux-sun.blog.csdn.net/article/details/109366633