激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - Spring TransactionalEventListener事務未提交讀取不到數據的解決

Spring TransactionalEventListener事務未提交讀取不到數據的解決

2022-01-05 10:46程序員牧碼 Java教程

這篇文章主要介紹了Spring TransactionalEventListener事務未提交讀取不到數據的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

一、背景

業務處理過程,發現了以下問題,代碼一是原代碼能正常執行,代碼二是經過迭代一次非正常執行代碼

  • 代碼一:以下代碼開啟線程后,代碼正常執行
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));
 
@Transactional
public Long test() {
  // ......
  // 插入記錄
  Long studentId = studentService.insert(student);
  // 異步線程
  writeStatisticsData(studentId);
  return studentId;
}
 
private void writeStatisticsData(Long studentId) {
  executor.execute(() -> {
    Student student = studentService.findById(studentId);
    //........
  });
}
  • 代碼二:以下代碼開啟線程后,代碼不正常執行
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Transactional
public Long test() {
  // ......
  // 插入記錄
  Long studentId = studentService.insert(student);
   // 異步線程
  writeStatisticsData(studentId);
  // 插入學生地址記錄
  Long addressId = addressService.insert(address);
  return studentId;
}
 
private void writeStatisticsData(Long studentId) {
  executor.execute(() -> {
    Student student = studentService.findById(studentId);
    //........
  });
}

二、問題分析

這里使用了spring事務,顯然需要考慮事務的隔離級別

2.1、mysql隔離級別

查看mysql隔離級別

?
1
2
SELECT @@tx_isolation;
READ-COMMITTED

讀提交,即在事務A插入數據過程中,事務B在A提交之前讀取A插入的數據讀取不到,而B在A提交之后再去讀就會讀取到A插入的數據,也即Read Committed不能保證在一個事務中每次讀都能讀到相同的數據,因為在每次讀數據之后其他并發事務可能會對剛才讀到的數據進行修改。

2.2、問題原因分析

  • 代碼一正常運行的原因

由于mysql事務的隔離級別是讀提交,test方法在開啟異步線程后,異步線程也開啟了事務,同時以讀者身份去讀 test 方法中插入的 student 記錄,但此時 test 方法已經提交了事務,所以可以讀取到 student 記錄(即在異步方法中可以讀取到 student 記錄),但此代碼有風險,若事務提交的時間晚一點,異步線程也有可能讀取不到 student 記錄。

  • 代碼二不能正常運行的原因

經過上面分析,很明顯異步方法中不能讀取到 student 記錄,由于代碼二在異步線程下面又執行了其他操作,延時了test方法中事務的提交,所以代碼二不能正常運行。

三、解決問題方案

解決思路是在事務提交后再做其他的處理(如異步發消息處理等),這里還是從Spring執行事務的過程中入手,Spring事務的處理過程不再分析,這里直接看Spring事務增強器TransactionInterceptor的核心處理流程,源碼如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
  // 獲取事務屬性
  final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
  //加載配置中配置的TransactionManager
  final PlatformTransactionManager tm = determineTransactionManager(txAttr);
  final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
  
  // 聲明式事務的處理
  if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
    TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
    Object retVal = null;
    //......
    retVal = invocation.proceedWithInvocation();
    //......
    commitTransactionAfterReturning(txInfo);
    return retVal;
  } else {
    // 編程式事務的處理......
  }
  //......
}

這里主要看聲明式事務的處理,因為編程式事務的處理及提交都是用戶在編碼中進行控制。在聲明式事務處理中,當方法執行完后,會執行 commitTransactionAfterReturning 方法來進行提交事務,該方法在 TransactionAspectSupport 類中,源碼如下:

?
1
2
3
4
5
protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
  if (txInfo != null && txInfo.hasTransaction()) {
    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
  }
}

再看 commit 方法,該方法在 AbstractPlatformTransactionManager 類中,源碼如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public final void commit(TransactionStatus status) throws TransactionException {
    // 這里省略很多代碼,如事務回滾......
        processCommit(defStatus);
}
 
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
            boolean beforeCompletionInvoked = false;
            try {
        prepareForCommit(status);
                triggerBeforeCommit(status);
                triggerBeforeCompletion(status);
                beforeCompletionInvoked = true;
                boolean globalRollbackOnly = false;
                if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                    globalRollbackOnly = status.isGlobalRollbackOnly();
                }
                if (status.hasSavepoint()) {
                    status.releaseHeldSavepoint();
                } else if (status.isNewTransaction()) {
          // 提交事務
                    doCommit(status);
                }
                //......
            } catch (......) {
                // 事務異常處理......
            }
 
            
            try {
        // 事務提交成功后的處理-----這里是重點
                triggerAfterCommit(status);
            } finally {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
            }
        }
        finally {
            cleanupAfterCompletion(status);
    }
}
 
private void triggerAfterCommit(DefaultTransactionStatus status) {
  if (status.isNewSynchronization()) {
    TransactionSynchronizationUtils.triggerAfterCommit();
  }
}

最終會走到 TransactionSynchronizationUtils.triggerAfterCommit() 方法中

?
1
2
3
4
5
6
7
8
9
10
11
public static void triggerAfterCommit() {
  invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
}
 
public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) {
  if (synchronizations != null) {
      for (TransactionSynchronization synchronization : synchronizations) {
    synchronization.afterCommit();
      }
  }
}

上面會把緩存在 TransactionSynchronizationManager 中的 TransactionSynchronization 按順序來執行 afterCommit 方法,其中 TransactionSynchronization 以集合形式緩存在 TransactionSynchronizationManager 的 ThreadLocal 中。

3.1、方式一

經過上面分析,只需要代碼中重新生成個 TransactionSynchronization 并加入到 TransactionSynchronizationManager 的 TransactionSynchronization 集合中即可,所以有了解決方案,如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void writeStatisticsData(Long studentId) {
  if(TransactionSynchronizationManager.isActualTransactionActive()) {
            // 當前存在事務
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
              @Override
              public void afterCommit() {
                executor.execute(() -> {Student student = studentService.findById(studentId);
                //........
                });
              }});
        } else {
            // 當前不存在事務
            executor.execute(() -> {Student student = studentService.findById(studentId);
                //........
                });
        }
}

3.2、方式二

使用 @TransactionalEventListener 結合 Spring事件監聽機制,該注解自從Spring4.2版本開始有的,如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 事件
public class StudentEvent extends ApplicationEvent {
    public StudentEvent(Long studentId) {
        super(studentId);
    }
}
 
// 監聽器
public class StudentEventListener{
  @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
  public void writeStatisticsData(StudentEvent studentEvent) {
    executor.execute(() -> {
      Student student = studentService.findById(studentEvent.getSource());
      //........
    });
  }
}
 
@Service
public class StudentService {
  // Spring4.2之后,ApplicationEventPublisher自動被注入到容器中,采用Autowired即可獲取
  @Autowired
  private ApplicationEventPublisher applicationEventPublisher;
  
  @Transactional
  public Long test() {
    // ......
    // 插入記錄
    Long studentId = studentService.insert(student);
    // 發布事件
    applicationEventPublisher.publishEvent(new StudentEvent(studentId));
    // 插入學生地址記錄
    Long addressId = addressService.insert(address);
    return studentId;
  }
}

原理分析

Spring Bean在加載配置文件時,會使用 AnnotationDrivenBeanDefinitionParser 來解析 annotation-driven 標簽,如下:

?
1
2
3
4
5
6
7
8
9
public class TxNamespaceHandler extends NamespaceHandlerSupport {
  //......
    @Override
    public void init() {
        registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
        registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
        registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
    }
}
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class AnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser {
  
  @Override
    public BeanDefinition parse(Element element, ParserContext parserContext) {
    // 重點——將TransactionalEventListenerFactory加入到容器中
        registerTransactionalEventListenerFactory(parserContext);
        String mode = element.getAttribute("mode");
        if ("aspectj".equals(mode)) {
            // mode="aspectj"
            registerTransactionAspect(element, parserContext);
        }
        else {
            // mode="proxy"
            AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
        }
        return null;
    }
  
  private void registerTransactionalEventListenerFactory(ParserContext parserContext) {
        RootBeanDefinition def = new RootBeanDefinition();
        def.setBeanClass(TransactionalEventListenerFactory.class);
        parserContext.registerBeanComponent(new BeanComponentDefinition(def,
                TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME));
    }
}
?
1
2
3
4
5
6
7
8
9
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {
  
  //省略部分代碼......
  
    @Override
    public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
        return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
    }
}
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {
  
   //省略部分代碼......
  
    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
      // 事務存在時,生成TransactionSynchronization并加入到 TransactionSynchronizationManager的緩存集合中
            TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
            TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
        } else if (this.annotation.fallbackExecution()) {
            //.......
            }
            processEvent(event);
        } else {
            // 當前不存在事務什么也不做
        }
    }

上述 @TransactionalEventListener 本質上是一個 @EventListener,TransactionalEventListenerFactory類會將每一個掃描到的方法有TransactionalEventListener注解包裝成ApplicationListenerMethodTransactionalAdapter對象,通過ApplicationListenerMethodTransactionalAdapter的onApplicationEvent方法可以看到若當前存在事務,就會生成TransactionSynchronization并加入到 TransactionSynchronizationManager的緩存ThreadLocal集合中,剩余流程同上述分析。(使用 @TransactionalEventListener 結合 Spring事件監聽機制,并使用到異步方式感覺有點別扭,這里是為了說明問題)。

四、使用案例

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5,
        new ThreadFactoryBuilder().setDaemon(false).setNamePrefix("execApiCache").build());
 
@Override
@Transactional(rollbackFor = Exception.class)
public ResultVO addApi(Api api, List<Header> headerList, List<Request> requestList, Response response, List<Script> scriptList, List<RespCodeMapping> respCodeMappingList) {
 
    // 數據庫代碼...
 
    // 異步代碼
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            log.warn("afterCommit...");
            executorService.execute(() -> {
                // 異步業務
                execApiCache(api);
            });
    }});
 
    return ResultUtil.buildSucc();
}

Ps:setDaemon(false) 注意這里守護線程標記必須設置為 false,否則主線程執行完,異步線程沒執行完的話,異步線程會馬上被中斷、關閉,所以這里不能設置成守護(用戶)線程。

以上為個人經驗,希望能給大家一個參考,也希望大家多多支持服務器之家。

原文鏈接:https://lux-sun.blog.csdn.net/article/details/109366633

延伸 · 閱讀

精彩推薦
  • Java教程升級IDEA后Lombok不能使用的解決方法

    升級IDEA后Lombok不能使用的解決方法

    最近看到提示IDEA提示升級,尋思已經有好久沒有升過級了。升級完畢重啟之后,突然發現好多錯誤,本文就來介紹一下如何解決,感興趣的可以了解一下...

    程序猿DD9332021-10-08
  • Java教程20個非常實用的Java程序代碼片段

    20個非常實用的Java程序代碼片段

    這篇文章主要為大家分享了20個非常實用的Java程序片段,對java開發項目有所幫助,感興趣的小伙伴們可以參考一下 ...

    lijiao5352020-04-06
  • Java教程xml與Java對象的轉換詳解

    xml與Java對象的轉換詳解

    這篇文章主要介紹了xml與Java對象的轉換詳解的相關資料,需要的朋友可以參考下...

    Java教程網2942020-09-17
  • Java教程Java BufferWriter寫文件寫不進去或缺失數據的解決

    Java BufferWriter寫文件寫不進去或缺失數據的解決

    這篇文章主要介紹了Java BufferWriter寫文件寫不進去或缺失數據的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望...

    spcoder14552021-10-18
  • Java教程Java8中Stream使用的一個注意事項

    Java8中Stream使用的一個注意事項

    最近在工作中發現了對于集合操作轉換的神器,java8新特性 stream,但在使用中遇到了一個非常重要的注意點,所以這篇文章主要給大家介紹了關于Java8中S...

    阿杜7482021-02-04
  • Java教程小米推送Java代碼

    小米推送Java代碼

    今天小編就為大家分享一篇關于小米推送Java代碼,小編覺得內容挺不錯的,現在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧...

    富貴穩中求8032021-07-12
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    這篇文章主要介紹了Java使用SAX解析xml的示例,幫助大家更好的理解和學習使用Java,感興趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程Java實現搶紅包功能

    Java實現搶紅包功能

    這篇文章主要為大家詳細介紹了Java實現搶紅包功能,采用多線程模擬多人同時搶紅包,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙...

    littleschemer13532021-05-16
主站蜘蛛池模板: 国产99久久精品一区二区 | 国产精品嘿咻嘿咻在线播放 | 成人一级视频在线观看 | 中文字幕一区2区 | 国产69精品久久久久久野外 | 国产深夜福利视频在线播放 | 久久久久久久久久性 | 久久96国产精品久久久 | 亚洲视频综合网 | 天天看夜夜爽 | 精品一区二区免费视频视频 | 久久亚洲成人 | 久久精品视频日本 | 制服下着マ○コ航空5 | 国产精品久久久久久影视 | 性爱在线免费视频 | 国产福利视频在线观看 | 一级在线观看视频 | 另类亚洲孕妇分娩网址 | 亚洲一区二区中文 | 干色视频| 黄免费在线| av日韩一区二区三区 | 草久影视 | 欧美三级一级 | 特级西西444www大精品视频免费看 | 午夜色片 | 欧美一级黄 | 国产一区二区三区四区波多野结衣 | 国产精品久久久久久久午夜片 | 欧美精品成人一区二区三区四区 | 日韩中字幕 | 国产在线看一区 | 一区二区三区视频在线观看 | 精品亚洲免费 | 久久国产精品区 | 91久久久国产精品 | 国产成人精品区 | 91午夜少妇三级全黄 | 亚洲卡通动漫在线观看 | 日本高清一级片 |