激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達(dá)式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務(wù)器之家 - 編程語言 - Java教程 - 微服務(wù)架構(gòu)設(shè)計RocketMQ進(jìn)階事務(wù)消息原理詳解

微服務(wù)架構(gòu)設(shè)計RocketMQ進(jìn)階事務(wù)消息原理詳解

2022-03-04 00:37飄渺Jam Java教程

這篇文章主要介紹了為大家介紹了微服務(wù)架構(gòu)中RocketMQ進(jìn)階層面事務(wù)消息的原理詳解,有需要的朋友可以借鑒參考下希望能夠有所幫助

前言

分布式消息選型的時候是否支持事務(wù)消息是一個很重要的考量點(diǎn),而目前只有RocketMQ對事務(wù)消息支持的最好。今天我們來嘮嘮如何實(shí)現(xiàn)RocketMQ的事務(wù)消息!

Apache RocketMQ在4.3.0版中已經(jīng)支持分布式事務(wù)消息,這里RocketMQ采用了2PC的思想來實(shí)現(xiàn)了提交事務(wù)消息,同時增加一個補(bǔ)償邏輯來處理二階段超時或者失敗的消息,如下圖所示。

微服務(wù)架構(gòu)設(shè)計RocketMQ進(jìn)階事務(wù)消息原理詳解

 

RocketMQ事務(wù)流程概要

RocketMQ實(shí)現(xiàn)事務(wù)消息主要分為兩個階段:正常事務(wù)的發(fā)送及提交、事務(wù)信息的補(bǔ)償流程 整體流程為:

正常事務(wù)發(fā)送與提交階段
1、生產(chǎn)者發(fā)送一個半消息給MQServer(半消息是指消費(fèi)者暫時不能消費(fèi)的消息)
2、服務(wù)端響應(yīng)消息寫入結(jié)果,半消息發(fā)送成功
3、開始執(zhí)行本地事務(wù)
4、根據(jù)本地事務(wù)的執(zhí)行狀態(tài)執(zhí)行Commit或者Rollback操作

事務(wù)信息的補(bǔ)償流程
1、如果MQServer長時間沒收到本地事務(wù)的執(zhí)行狀態(tài)會向生產(chǎn)者發(fā)起一個確認(rèn)回查的操作請求
2、生產(chǎn)者收到確認(rèn)回查請求后,檢查本地事務(wù)的執(zhí)行狀態(tài)
3、根據(jù)檢查后的結(jié)果執(zhí)行Commit或者Rollback操作
補(bǔ)償階段主要是用于解決生產(chǎn)者在發(fā)送Commit或者Rollback操作時發(fā)生超時或失敗的情況。

 

RocketMQ事務(wù)流程關(guān)鍵

1、事務(wù)消息在一階段對用戶不可見
事務(wù)消息相對普通消息最大的特點(diǎn)就是一階段發(fā)送的消息對用戶是不可見的,也就是說消費(fèi)者不能直接消費(fèi)。這里RocketMQ的實(shí)現(xiàn)方法是原消息的主題與消息消費(fèi)隊列,然后把主題改成RMQ_SYS_TRANS_HALF_TOPIC ,這樣由于消費(fèi)者沒有訂閱這個主題,所以不會被消費(fèi)。

2、如何處理第二階段的失敗消息?
在本地事務(wù)執(zhí)行完成后會向MQServer發(fā)送Commit或Rollback操作,此時如果在發(fā)送消息的時候生產(chǎn)者出故障了,那么要保證這條消息最終被消費(fèi),MQServer會像服務(wù)端發(fā)送回查請求,確認(rèn)本地事務(wù)的執(zhí)行狀態(tài)。
當(dāng)然了rocketmq并不會無休止的的信息事務(wù)狀態(tài)回查,默認(rèn)回查15次,如果15次回查還是無法得知事務(wù)狀態(tài),RocketMQ默認(rèn)回滾該消息。

3、消息狀態(tài) 事務(wù)消息有三種狀態(tài):

TransactionStatus.CommitTransaction:提交事務(wù)消息,消費(fèi)者可以消費(fèi)此消息

TransactionStatus.RollbackTransaction:回滾事務(wù),它代表該消息將被刪除,不允許被消費(fèi)。

TransactionStatus.Unknown :中間狀態(tài),它代表需要檢查消息隊列來確定狀態(tài)。

 

實(shí)現(xiàn)

我們構(gòu)建這樣一個需求:用戶請求訂單微服務(wù) order-service 接口刪除訂單(退貨),刪除訂單后需要發(fā)送消息給用戶服務(wù)account-service,用戶微服務(wù)收到消息后會給用戶賬戶增加余額。這個需求跟錢相關(guān),肯定要保證消息的事務(wù)性,接下來我們根據(jù)上面的原理實(shí)現(xiàn)整個流程。

基礎(chǔ)配置

生產(chǎn)者order-servcie和account-service都要引入RocketMQ相關(guān)依賴,增加RocketMQ的相關(guān)配置

引入組件

<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>

添加配置

# within rocketmq
rocketmq:
name-server: xxx.xx.x.xx:9876; xxx.xx.x.xx:9876
producer:
  group: cloud-group

發(fā)送半消息

order-service在執(zhí)行刪除訂單操作時發(fā)送一條半消息給MQServer,發(fā)送半消息主要是使用rocketMQTemplate.sendMessageInTransaction() 方法,發(fā)送事務(wù)消息。

@Override
public void delete(String orderNo) {
	Order order = orderMapper.selectByNo(orderNo);
	//如果訂單存在且狀態(tài)為有效,進(jìn)行業(yè)務(wù)處理
	if (order != null && CloudConstant.VALID_STATUS.equals(order.getStatus())) {
		String transactionId = UUID.randomUUID().toString();
		//如果可以刪除訂單則發(fā)送消息給rocketmq,讓用戶中心消費(fèi)消息
		rocketMQTemplate.sendMessageInTransaction("add-amount",
				MessageBuilder.withPayload(
						UserAddMoneyDTO.builder()
								.userCode(order.getAccountCode())
								.amount(order.getAmount())
								.build()
				)
				.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
				.setHeader("order_id",order.getId())
				.build()
				,order
		);
	}
}

首先先校驗一下訂單狀態(tài),然后發(fā)送消息給MQServer,這個邏輯大家都看得懂,

主要是關(guān)注sendMessageInTransaction() 方法,源碼如下:

public TransactionSendResult sendMessageInTransaction(String destination, Message<?> message, Object arg) throws MessagingException {
	try {
		if (((TransactionMQProducer)this.producer).getTransactionListener() == null) {
			throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
		} else {
			org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
			return this.producer.sendMessageInTransaction(rocketMsg, arg);
		}
	} catch (MQClientException var5) {
		throw RocketMQUtil.convert(var5);
	}
}

該方法有三個參數(shù):

destination:目的地(主題),這里發(fā)送給add-amount 這個主題

message:發(fā)送給消費(fèi)者的消息體,需要使用 MessageBuilder.withPayload() 來構(gòu)建消息

arg:參數(shù)

注意,這里我們生成了一個transactionId,并放在header中跟消息一起發(fā)送(這里實(shí)際也可以構(gòu)造成一個對象,放在arg里進(jìn)行發(fā)送),作用后面再講!

執(zhí)行本地事務(wù)與回查

MQServer收到半消息后會告訴生產(chǎn)者order-service確認(rèn)收到半消息,這時候order-service需要執(zhí)行本地事務(wù),執(zhí)行完本地事務(wù)后再告訴MQServer本地事務(wù)的執(zhí)行狀態(tài),確認(rèn)消息究竟是Commit還是Rollback。如果在告訴MQServer本地執(zhí)行狀態(tài)的時候出異常了還需要讓MQServer能夠回查到,怎么實(shí)現(xiàn)這一些列操作呢?

RocketMQ提供了 RocketMQLocalTransactionListener 接口,本地事務(wù)監(jiān)聽器,這個接口類的實(shí)現(xiàn)如下:

微服務(wù)架構(gòu)設(shè)計RocketMQ進(jìn)階事務(wù)消息原理詳解

第一個方法executeLocalTransaction 為執(zhí)行本地事務(wù);
第二個方法checkLocalTransaction 為檢查本地事務(wù)的執(zhí)行狀態(tài),也就是回查動作。
有了這個接口類我們的執(zhí)行邏輯清楚了,但是還有個問題:本地事務(wù)已經(jīng)執(zhí)行完成了,怎么去回查本地事務(wù)的執(zhí)行結(jié)果呢?

我們可以在執(zhí)行本地事務(wù)的時候同時生成一個事務(wù)日志,讓本地事務(wù)與日志事務(wù)在同一個方法中,同時添加@Transactional 注解,保證兩個操作事務(wù)是一個原子操作。這樣如果事務(wù)日志表中有這個本地事務(wù)的信息,那就代表本地事務(wù)執(zhí)行成功,需要Commit,相反如果沒有對應(yīng)的事務(wù)日志,則表示沒執(zhí)行成功,需要Rollback

思路既然理順了,咱們就開擼。

首先創(chuàng)建一個日志表

微服務(wù)架構(gòu)設(shè)計RocketMQ進(jìn)階事務(wù)消息原理詳解

很簡單的三個字段,主要是這個事務(wù)id,需要根據(jù)這個事務(wù)id回查事務(wù),還記得我們在發(fā)送半消息時生成的事務(wù)id嗎,就是干這個用的!

在生產(chǎn)者編寫方法實(shí)現(xiàn)RocketMQLocalTransactionListener

@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddUserAmountListener implements RocketMQLocalTransactionListener {
  private final OrderService orderService;
  private final RocketMqTransactionLogMapper rocketMqTransactionLogMapper;
  /**
   * 執(zhí)行本地事務(wù)
   */
  @Override
  public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
      log.info("執(zhí)行本地事務(wù)");
      MessageHeaders headers = message.getHeaders();
      //獲取事務(wù)ID
      String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
      Integer orderId = Integer.valueOf((String)headers.get("order_id"));
      log.info("transactionId is {}, orderId is {}",transactionId,orderId);

      try{
          //執(zhí)行本地事務(wù),并記錄日志
          orderService.changeStatuswithRocketMqLog(orderId, CloudConstant.INVALID_STATUS,transactionId);
          //執(zhí)行成功,可以提交事務(wù)
          return RocketMQLocalTransactionState.COMMIT;
      }catch (Exception e){
          return RocketMQLocalTransactionState.ROLLBACK;
      }
  }

  /**
   * 本地事務(wù)的檢查,檢查本地事務(wù)是否成功
   */
  @Override
  public RocketMQLocalTransactionState checkLocalTransaction(Message message) {

      MessageHeaders headers = message.getHeaders();
      //獲取事務(wù)ID
      String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
      log.info("檢查本地事務(wù),事務(wù)ID:{}",transactionId);
      //根據(jù)事務(wù)id從日志表檢索
      QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();
      queryWrapper.eq("transaction_id",transactionId);
      RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);
      if(null != rocketmqTransactionLog){
          return RocketMQLocalTransactionState.COMMIT;
      }
      return RocketMQLocalTransactionState.ROLLBACK;
  }
}

執(zhí)行本地事務(wù)的方法

@Transactional(rollbackFor = RuntimeException.class)
@Override
public void changeStatuswithRocketMqLog(Integer id,String status,String transactionId){
  //將訂單狀態(tài)置位無效
	orderMapper.changeStatus(id,status);
  //插入事務(wù)表
	rocketMqTransactionLogMapper.insert(
			RocketmqTransactionLog.builder()
					.transactionId(transactionId)
					.log("執(zhí)行刪除訂單操作")
			.build()
	);
}

這一塊的代碼邏輯都是在生產(chǎn)端,即Order-Server,大家不要搞錯了

消費(fèi)消息

Rollback的消息MQServer會給我們處理,我們只要關(guān)注Commit狀態(tài)時消費(fèi)端可以正常消費(fèi)即可。在account-service監(jiān)聽消息,如果收到消息則給用戶賬戶增加余額。

@Slf4j
@Service
@RocketMQMessageListener(topic = "add-amount",consumerGroup = "cloud-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired) )
public class AddUserAmountListener implements RocketMQListener<UserAddMoneyDTO> {
  private final AccountMapper accountMapper;
  /**
   * 收到消息的業(yè)務(wù)邏輯
   */
  @Override
  public void onMessage(UserAddMoneyDTO userAddMoneyDTO) {
      log.info("received message: {}",userAddMoneyDTO);
      accountMapper.increaseAmount(userAddMoneyDTO.getUserCode(),userAddMoneyDTO.getAmount());
      log.info("add money success");
  }
}

 

測試

微服務(wù)架構(gòu)設(shè)計RocketMQ進(jìn)階事務(wù)消息原理詳解

訂單表有這樣一條記錄,用戶為jianzh5,amount為200

微服務(wù)架構(gòu)設(shè)計RocketMQ進(jìn)階事務(wù)消息原理詳解

用戶表的記錄,執(zhí)行完成后jianzh5的賬戶應(yīng)該變成250

調(diào)用刪除訂單接口,刪除訂單

微服務(wù)架構(gòu)設(shè)計RocketMQ進(jìn)階事務(wù)消息原理詳解

發(fā)送半消息

微服務(wù)架構(gòu)設(shè)計RocketMQ進(jìn)階事務(wù)消息原理詳解

執(zhí)行本地事務(wù),并生成事務(wù)日志

微服務(wù)架構(gòu)設(shè)計RocketMQ進(jìn)階事務(wù)消息原理詳解

模擬異常情況 在發(fā)送Commit消息的時候我們用命令殺掉進(jìn)程taskkill /pid 19748 -t -f,模擬異常!

微服務(wù)架構(gòu)設(shè)計RocketMQ進(jìn)階事務(wù)消息原理詳解

重新啟動order-service,查看是否會執(zhí)行回查動作

微服務(wù)架構(gòu)設(shè)計RocketMQ進(jìn)階事務(wù)消息原理詳解

MQServer進(jìn)行回查,檢查事務(wù)日志,判斷是否可以提交事務(wù)

消費(fèi)者消費(fèi)事務(wù)消息,保證事務(wù)的一致性

微服務(wù)架構(gòu)設(shè)計RocketMQ進(jìn)階事務(wù)消息原理詳解

 

總結(jié)

使用RocketMQ實(shí)現(xiàn)事務(wù)消息的過程還是很復(fù)雜的,需要好好理解開頭的那張圖,只有理解了事務(wù)消息的交互過程才能編寫相應(yīng)的代碼!

好了,各位朋友們,本期的內(nèi)容到此就全部結(jié)束啦,能看到這里的同學(xué)都是優(yōu)秀的同學(xué),下一個升職加薪的就是你了!

以上就是微服務(wù)架構(gòu)RocketMQ進(jìn)階事務(wù)消息原理詳解的詳細(xì)內(nèi)容,更多關(guān)于微服務(wù)架構(gòu)RocketMQ事務(wù)消息的資料請關(guān)注服務(wù)器之家其它相關(guān)文章!

原文鏈接:https://jianzh5.blog.csdn.net/article/details/105283647

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 热99在线视频 | av在线免费观看网址 | 亚洲成人第一区 | 成年人高清视频在线观看 | 免费国产视频在线观看 | 国产成人av在线播放 | 日本精品久久久久久草草 | 一级外国毛片 | 免费一级特黄毛片 | 黄色大片在线观看 | 国产资源在线观看视频 | 久久精品性视频 | 欧日韩在线视频 | 亚洲视频在线一区二区 | 国产精品一区网站 | 一级做a爱视频 | 欧美成人午夜一区二区三区 | 国产精品一区2区3区 | 日韩电影av在线 | 成人免费电影在线观看 | 一区二区三区欧美在线观看 | 精品无吗乱吗av国产爱色 | 国产亚洲精品久久久久久久久久 | 成人免费看视频 | 黄色99视频| 99精品视频在线看 | 国产免费v片| 久久69精品久久久久久国产越南 | 欧美重口另类videos人妖 | 性欧美性欧美 | 欧美成网站 | 国产午夜亚洲精品午夜鲁丝片 | 国产免费视频一区二区裸体 | 精品一区二区三区中文字幕老牛 | 久久久久久久久久亚洲 | 国产成人自拍小视频 | 中国hdxxxx护士爽在线观看 | 特黄一区二区三区 | 曰批全过程120分钟免费69 | 亚洲第五色综合网 | 国产无区一区二区三麻豆 |