激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務器之家:專注于服務器技術(shù)及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - 使用RocketMQTemplate發(fā)送帶tags的消息

使用RocketMQTemplate發(fā)送帶tags的消息

2021-09-24 11:51wgslucky Java教程

這篇文章主要介紹了使用RocketMQTemplate發(fā)送帶tags的消息,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

RocketMQTemplate發(fā)送帶tags的消息

RocketMQTemplate是RocketMQ集成到Spring cloud之后提供的個方便發(fā)送消息的模板類,它是基本Spring 的消息機制實現(xiàn)的,對外只提供了Spring抽象出來的消息發(fā)送接口。

在單獨使用RocketMQ的時候,發(fā)送消息使用的Message是‘org.apache.rocketmq.common.message'包下面的Message,而使用RocketMQTemplate發(fā)送消息時,使用的Message是org.springframework.messaging的Message,猛一看,沒辦法發(fā)送帶tags的消息了,其實在RocketMQ集成的時候已經(jīng)解決了這個問題。

在RocketMQTemplate發(fā)送消息時,調(diào)用的方法是:

  1. public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
  2. if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
  3. log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
  4. throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
  5. }
  6. try {
  7. long now = System.currentTimeMillis();
  8. //在這里對消息進行了轉(zhuǎn)化,將Spring的message轉(zhuǎn)化為rocketmq自己的message
  9. org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
  10. charset, destination, message);
  11. SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
  12. long costTime = System.currentTimeMillis() - now;
  13. log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
  14. return sendResult;
  15. } catch (Exception e) {
  16. log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
  17. throw new MessagingException(e.getMessage(), e);
  18. }
  19. }

在上面的代碼中,對消息進行了轉(zhuǎn)化,將Spring的message轉(zhuǎn)化為rocketmq自己的message,在RocketMQUtil.convertToRocketMessage方法中有個地方就是獲取tags的:

  1. String[] tempArr = destination.split(":", 2);
  2. String topic = tempArr[0];
  3. String tags = "";
  4. if (tempArr.length > 1) {
  5. tags = tempArr[1];
  6. }

所以,在發(fā)送消息的時候,我們只要把tags使用":"添加到topic后面就可以了。

例如:xxxx:tag1 || tag2 || tag3

使用RocketMQ 處理消息

消息發(fā)送(生產(chǎn)者)

以maven + SpringBoot 工程為例,先在pom.xml增加依賴

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.0.1</version>
  5. </dependency>

由于,這個依賴是一個starter,直接引入依賴就可以開始寫投遞消息的代碼了。這個starter注冊了一個叫org.apache.rocketmq.spring.core.RocketMQTemplate的bean,用它就可以直接把消息投遞出去。 具體的API是這樣的

  1. XXXEvent xxxDto = new XXXEvent();
  2. Message<XXXEvent> message = MessageBuilder.withPayload(xxxDto).build();
  3. String dest = String.format("%s:%s",topic-name","tag-name");
  4. //默認投遞:同步發(fā)送 不會丟失消息。如果在投遞成功后發(fā)生網(wǎng)絡(luò)異常,客戶端會認為投遞失敗而回滾本地事務
  5. this.rocketMQTemplate.send(dest, xxxDto);

這種投遞方式能保證投遞成功的消息不會丟失,但是不能保證投遞一定成功。假設(shè)一次調(diào)用的流程是這樣的

使用RocketMQTemplate發(fā)送帶tags的消息

如果在步驟3的時候發(fā)生錯誤,因為出錯mqClient會認為消息投遞失敗而把事務回滾。如果消息已經(jīng)被消費,那就會導致業(yè)務錯誤。我們可以用事務消息解決這個問題。

以帶事務方式投遞的消息,正常情況下的處理流程是這樣的

使用RocketMQTemplate發(fā)送帶tags的消息

出錯的時候是這樣的

使用RocketMQTemplate發(fā)送帶tags的消息

由于普通消息沒有消息回查,普通消息用的producer不支持回查操作,不同業(yè)務的回查處理也不一樣,事務消息需要使用單獨的producer。消息發(fā)送代碼大概是這樣的

  1. //調(diào)用這段代碼之前別做會影響數(shù)據(jù)的操作
  2. XXXEvent xxxDto = new XXXEvent();
  3. Message<XXXEvent> message = MessageBuilder.withPayload(xxxDto).build();
  4. String dest = String.format("%s:%s",topic-name","tag-name");
  5. TransactionSendResult transactionSendResult = this.rocketMQTemplate.sendMessageInTransaction("poducer-name","topic-name:tag-name",message,"xxxid");
  6. if (LocalTransactionState.ROLLBACK_MESSAGE.equals(transactionSendResult.getLocalTransactionState())){
  7. throw new RuntimeException("事務消息投遞失敗");
  8. }
  9. //按照RocketMQ的寫法,這個地方不應該有別的代碼
  1. @RocketMQTransactionListener(txProducerGroup = "producer")
  2. class TransactionListenerImpl implements RocketMQLocalTransactionListener {
  3.  
  4. //消息投遞成功后執(zhí)行的邏輯(半消息)
  5. //原文:When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
  6. @Override
  7. public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  8. try{
  9. //
  10. xxxService.doSomething();
  11. return RocketMQLocalTransactionState.COMMIT;
  12. catch(IOException e){
  13. //不確定最終是否成功
  14. return RocketMQLocalTransactionState.UNKNOWN;
  15. }catch(Exception e){
  16. return RocketMQLocalTransactionState.ROLLBACK;
  17. }
  18. }
  19. //回查事務執(zhí)行狀態(tài)
  20. @Override
  21. public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
  22. Boolean result = xxxService.isSuccess(msg,arg);
  23. if(result != null){
  24. if(result){
  25. return RocketMQLocalTransactionState.COMMIT;
  26. }else{
  27. return RocketMQLocalTransactionState.ROLLBACK;
  28. }
  29. }
  30. return RocketMQLocalTransactionState.UNKNOWN;
  31. }
  32. }

處理消息(消費)

普通消息和事務消息的區(qū)別只在投遞的時候才明顯,對應的消費端代碼比較簡單

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  3. import org.apache.rocketmq.spring.core.RocketMQListener;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.data.redis.core.RedisTemplate;
  6. import org.springframework.data.redis.core.StringRedisTemplate;
  7. import org.springframework.stereotype.Component;
  8. @Slf4j
  9. @Component
  10. @RocketMQMessageListener(consumerGroup = "xxx-consumer", topic = "topic-name",selectorExpression = "tag-name")
  11. public class XXXEventMQListener implements RocketMQListener<XXXEvent> {
  12. private String repeatCheckRedisKeyTemplate = "topic-name:tag:repeat-check:%s";
  13. @Autowired private StringRedisTemplate redisTemplate;
  14. @Override
  15. public void onMessage(XXXEvent message) {
  16. log.info("consumer message {}",message);
  17. //處理消息
  18. try{
  19. xxxService.doSomething(message);
  20. }catch(Exception ex){
  21. log.warn(String.format("message [%s] 消費失敗",message),ex);
  22. //拋出異常后,MQClient會返回ConsumeConcurrentlyStatus.RECONSUME_LATER,這條消息會再次嘗試消費
  23. throw new RuntimException(ex);
  24. }
  25. }
  26. }

RocketMQ用ACK機制保證NameServer知道消息是否被消費在

org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer里是這么處理的

  1. public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
  2. @SuppressWarnings("unchecked")
  3. @Override
  4. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  5. for (MessageExt messageExt : msgs) {
  6. log.debug("received msg: {}", messageExt);
  7. try {
  8. long now = System.currentTimeMillis();
  9. rocketMQListener.onMessage(doConvertMessage(messageExt));
  10. long costTime = System.currentTimeMillis() - now;
  11. log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
  12. } catch (Exception e) {
  13. log.warn("consume message failed. messageExt:{}", messageExt, e);
  14. context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
  15. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  16. }
  17. }
  18. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  19. }
  20. }

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持我們。

原文鏈接:https://blog.csdn.net/youxijishu/article/details/105042136

延伸 · 閱讀

精彩推薦
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    這篇文章主要介紹了Java使用SAX解析xml的示例,幫助大家更好的理解和學習使用Java,感興趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程xml與Java對象的轉(zhuǎn)換詳解

    xml與Java對象的轉(zhuǎn)換詳解

    這篇文章主要介紹了xml與Java對象的轉(zhuǎn)換詳解的相關(guān)資料,需要的朋友可以參考下...

    Java教程網(wǎng)2942020-09-17
  • Java教程20個非常實用的Java程序代碼片段

    20個非常實用的Java程序代碼片段

    這篇文章主要為大家分享了20個非常實用的Java程序片段,對java開發(fā)項目有所幫助,感興趣的小伙伴們可以參考一下 ...

    lijiao5352020-04-06
  • Java教程Java8中Stream使用的一個注意事項

    Java8中Stream使用的一個注意事項

    最近在工作中發(fā)現(xiàn)了對于集合操作轉(zhuǎn)換的神器,java8新特性 stream,但在使用中遇到了一個非常重要的注意點,所以這篇文章主要給大家介紹了關(guān)于Java8中S...

    阿杜7472021-02-04
  • Java教程Java實現(xiàn)搶紅包功能

    Java實現(xiàn)搶紅包功能

    這篇文章主要為大家詳細介紹了Java實現(xiàn)搶紅包功能,采用多線程模擬多人同時搶紅包,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙...

    littleschemer13532021-05-16
  • Java教程升級IDEA后Lombok不能使用的解決方法

    升級IDEA后Lombok不能使用的解決方法

    最近看到提示IDEA提示升級,尋思已經(jīng)有好久沒有升過級了。升級完畢重啟之后,突然發(fā)現(xiàn)好多錯誤,本文就來介紹一下如何解決,感興趣的可以了解一下...

    程序猿DD9332021-10-08
  • Java教程小米推送Java代碼

    小米推送Java代碼

    今天小編就為大家分享一篇關(guān)于小米推送Java代碼,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧...

    富貴穩(wěn)中求8032021-07-12
  • Java教程Java BufferWriter寫文件寫不進去或缺失數(shù)據(jù)的解決

    Java BufferWriter寫文件寫不進去或缺失數(shù)據(jù)的解決

    這篇文章主要介紹了Java BufferWriter寫文件寫不進去或缺失數(shù)據(jù)的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望...

    spcoder14552021-10-18
主站蜘蛛池模板: 国产一国产一级毛片视频在线 | 高清久久久 | 在线成人毛片 | 在线成人免费观看www | 日本一区视频在线播放 | 久久91精品国产91久久yfo | 成人免费激情视频 | 欧美aaaaa一级毛片在线 | 成人久久免费 | 久久久婷婷一区二区三区不卡 | 久久久久久久免费看 | 激情欧美在线 | 91网址在线播放 | a一级黄色大片 | 高清在线国产 | 久久免费视频一区二区三区 | 久草在线视频福利 | 中文字幕综合在线观看 | 色的综合 | 久久精品亚洲一区二区 | 久久91亚洲精品久久91综合 | 毛片大全 | 国产18成人免费视频 | 91精品国产777在线观看 | 羞羞的小视频 | 成人做爰www免费看 成人午夜视频免费看 | 日产精品久久久一区二区开放时间 | 日日操夜夜操狠狠操 | 双性精h调教灌尿打屁股的文案 | 久久亚洲精品久久国产一区二区 | 亚洲男人的天堂在线视频 | 欧美一级免费在线观看 | 男女羞羞视频在线观看免费 | 国产99一区二区 | 国产精品久久999 | 亚洲国产精品一区二区久久 | 久久国产精品二国产精品中国洋人 | 日本a∨精品中文字幕在线 被啪羞羞视频在线观看 | 全黄裸片武则天艳史 | 日韩av电影在线免费观看 | 一级做受大片免费视频 |