激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - springboot中rabbitmq實現消息可靠性機制詳解

springboot中rabbitmq實現消息可靠性機制詳解

2022-01-18 16:52每一個不曾起舞的日子, Java教程

這篇文章主要介紹了springboot中rabbitmq實現消息可靠性機制詳解,本文通過實例代碼給大家介紹的非常詳細,需要的朋友可以參考下

1. 生產者模塊通過publisher confirm機制實現消息可靠性

 1.1 生產者模塊導入rabbitmq相關依賴

?
1
2
3
4
5
6
7
8
9
10
<!--AMQP依賴,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--用于mq消息的序列化與反序列化-->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

1.2 配置文件中進行mq的相關配置

?
1
2
3
4
5
6
7
8
9
spring.rabbitmq.host=10.128.240.183
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
        
spring.rabbitmq.publisher-confirm-type=correlated
        
spring.rabbitmq.publisher-returns=true
        
spring.rabbitmq.template.mandatory=true
  • publish-confirm-type:開啟publisher-confirm,有以下可選值

simple:同步等待confirm結果,直到超時
correlated:異步回調,定義ConfirmCallback。mq返回結果時會回調這個ConfirmCallback

  • publish-returns:開啟publish-return功能。可以定義ReturnCallback
  • template.mandatory: 定義消息路由失敗的策略

true:調用ReturnCallback
false:直接丟棄消息

1.3 定義ReturnCallback(消息投遞到隊列失敗觸發此回調)

  • 每個RabbitTemplate只能配置一個ReturnCallback。
  • 當消息投遞失敗,就會調用生產者的returnCallback中定義的處理邏輯
  • 可以在容器啟動時就配置這個回調
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
 
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 獲取RabbitTemplate對象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 判斷是否是延遲消息
            Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
            if (receivedDelay != null && receivedDelay > 0) {
                // 是一個延遲消息,忽略這個錯誤提示
                return;
            }
            // 記錄日志
            log.error("消息發送到隊列失敗,響應碼:{}, 失敗原因:{}, 交換機: {}, 路由key:{}, 消息: {}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的話,重發消息
        });
    }
}

1.4 定義ConfirmCallback(消息到達交換機觸發此回調)

可以為redisTemplate指定一個統一的確認回調

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
 
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 獲取RabbitTemplate對象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 判斷是否是延遲消息
            Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
            if (receivedDelay != null && receivedDelay > 0) {
                // 是一個延遲消息,忽略這個錯誤提示
                return;
            }
            // 記錄日志
            log.error("消息發送到隊列失敗,響應碼:{}, 失敗原因:{}, 交換機: {}, 路由key:{}, 消息: {}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的話,重發消息
        });
 
        
        // 設置統一的confirm回調。只要消息到達broker就ack=true
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("這是統一的回調");
                System.out.println("correlationData:" + correlationData);
                System.out.println("ack:" + b);
                System.out.println("cause:" + s);
            }
        });
    }
}

也可以為特定的消息定制回調

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Autowired
   private RabbitTemplate rabbitTemplate;
 
   @Test
   public void testmq() throws InterruptedException {
 
 
       CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 
       correlationData.getFuture().addCallback(result->{
           if (result.isAck()) {
               // ACK
               log.debug("消息成功投遞到交換機!消息ID: {}", correlationData.getId());
           } else {
               // NACK
               log.error("消息投遞到交換機失敗!消息ID:{}", correlationData.getId());
               // 重發消息
           }
       },ex->{
           // 記錄日志
           log.error("消息發送失敗!", ex);
           // 重發消息
       });
       rabbitTemplate.convertAndSend("example.direct","blue","hello,world",correlationData);
   }

2. 消費者模塊開啟消息確認

2.1 添加配置

?
1
2
# 手動ack消息,不使用默認的消費端確認
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • none:關閉ack,消息投遞時不可靠的,可能丟失
  • auto:類似事務機制,出現異常時返回nack,消息回滾到mq,沒有異常,返回
  • ackmanual:我們自己指定什么時候返回ack

2.2 manual模式在監聽器中自定義返回ack

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@RabbitListener(queues = "order.release.order.queue")
@Service
public class OrderCloseListener {
 
    @Autowired
    private OrderService orderService;
 
 
    @RabbitHandler
    private void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
        System.out.println("收到過期的訂單信息,準備關閉訂單" + orderEntity.getOrderSn());
 
        try {
            orderService.closeOrder(orderEntity);
            // 第二個參數為false則表示僅確認此條消息。如果為true則表示對收到的多條消息同時確認
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 第二個參數為ture表示將這個消息重新加入隊列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
}

3. 消費者模塊開啟消息失敗重試機制

3.1 配置文件添加配置,開啟本地重試

?
1
2
3
4
5
6
7
8
9
10
spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 開啟消費者失敗重試
          initial-interval: 1000 # 初識的失敗等待時長為1
          multiplier: 1 # 失敗的等待時長倍數,下次等待時長 = multiplier * last-interval
          max-attempts: 3 # 最大重試次數
          stateless: true # true無狀態;false有狀態。如果業務中包含事務,這里改為false
  • 開啟本地重試,如果消息處理過程總拋出異常,不會requeue到隊列,而是在消費者本地重試
  • 重試達到最大次數后,spring會返回ack,消息會被丟棄

4.  消費者模塊添加失敗策略(用于開啟失敗本地重試功能后)

  • 當開啟本地重試后,重試最大次數后消息直接丟棄。
  • 三種策略,都繼承于MessageRecovery接口
  • RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認就是這種方式
  • ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊
  • RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機

4.2 定義處理失敗消息的交換機和隊列 沒有會自動創建相應的隊列、交換機與綁定關系,有了就啥也不做

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
 
// 路由鍵為key
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

4.3 向容器中添加一個失敗策略組件

?
1
2
3
4
5
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    // error為路由鍵
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

到此這篇關于springboot中rabbitmq實現消息可靠性的文章就介紹到這了,更多相關springboot rabbitmq消息可靠性內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!

原文鏈接:https://blog.csdn.net/weixin_44390164/article/details/120455793

延伸 · 閱讀

精彩推薦
  • Java教程小米推送Java代碼

    小米推送Java代碼

    今天小編就為大家分享一篇關于小米推送Java代碼,小編覺得內容挺不錯的,現在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧...

    富貴穩中求8032021-07-12
  • Java教程xml與Java對象的轉換詳解

    xml與Java對象的轉換詳解

    這篇文章主要介紹了xml與Java對象的轉換詳解的相關資料,需要的朋友可以參考下...

    Java教程網2942020-09-17
  • Java教程Java BufferWriter寫文件寫不進去或缺失數據的解決

    Java BufferWriter寫文件寫不進去或缺失數據的解決

    這篇文章主要介紹了Java BufferWriter寫文件寫不進去或缺失數據的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望...

    spcoder14552021-10-18
  • Java教程Java實現搶紅包功能

    Java實現搶紅包功能

    這篇文章主要為大家詳細介紹了Java實現搶紅包功能,采用多線程模擬多人同時搶紅包,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙...

    littleschemer13532021-05-16
  • Java教程20個非常實用的Java程序代碼片段

    20個非常實用的Java程序代碼片段

    這篇文章主要為大家分享了20個非常實用的Java程序片段,對java開發項目有所幫助,感興趣的小伙伴們可以參考一下 ...

    lijiao5352020-04-06
  • Java教程Java8中Stream使用的一個注意事項

    Java8中Stream使用的一個注意事項

    最近在工作中發現了對于集合操作轉換的神器,java8新特性 stream,但在使用中遇到了一個非常重要的注意點,所以這篇文章主要給大家介紹了關于Java8中S...

    阿杜7482021-02-04
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    這篇文章主要介紹了Java使用SAX解析xml的示例,幫助大家更好的理解和學習使用Java,感興趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程升級IDEA后Lombok不能使用的解決方法

    升級IDEA后Lombok不能使用的解決方法

    最近看到提示IDEA提示升級,尋思已經有好久沒有升過級了。升級完畢重啟之后,突然發現好多錯誤,本文就來介紹一下如何解決,感興趣的可以了解一下...

    程序猿DD9332021-10-08
主站蜘蛛池模板: 一区二区高清视频在线观看 | 国产精品高潮99久久久久久久 | 国产成人av免费观看 | 九九热精彩视频 | 久久精品一二三区 | 久久美女色视频 | 成人小视频在线播放 | 成人羞羞视频在线观看免费 | 成人在线视频国产 | 成人性视频免费网站下载软件 | 久久精品一级片 | 91精品国产福利尤物免费 | 欧美一级黄色网 | 欧美国产一区二区三区激情无套 | 99亚洲国产精品 | 男人的天堂色偷偷 | 亚洲第一男人天堂 | 一级@片 | 国产精品久久久久久婷婷天堂 | 久久毛片 | 久久久久久久久久久久免费 | 国产一级在线看 | 免费日本一区二区 | 美国av片在线观看 | 在线亚州 | 91 视频网站 | 毛片免费视频 | 黄在线看 | 日韩精品免费看 | 91久久国产综合精品女同国语 | 亚洲精品免费播放 | av免播放| 欧美视频一级 | 日本一区二区三区视频在线 | 黄色欧美精品 | 国产日韩久久久久69影院 | 黄色片网站免费 | 日韩一级网站 | 污视频在线免费 | 狠狠操夜夜爱 | 欧美不卡 |