1. 生產者模塊通過publisher confirm機制實現消息可靠性
1.1 生產者模塊導入rabbitmq相關依賴
1
2
3
4
5
6
7
8
9
10
|
<!--AMQP依賴,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--用于mq消息的序列化與反序列化--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> |
1.2 配置文件中進行mq的相關配置
1
2
3
4
5
6
7
8
9
|
spring.rabbitmq.host= 10.128 . 240.183 spring.rabbitmq.port= 5672 spring.rabbitmq.virtual-host=/ spring.rabbitmq.publisher-confirm-type=correlated spring.rabbitmq.publisher-returns= true spring.rabbitmq.template.mandatory= true |
- publish-confirm-type:開啟publisher-confirm,有以下可選值
simple:同步等待confirm結果,直到超時
correlated:異步回調,定義ConfirmCallback。mq返回結果時會回調這個ConfirmCallback
- publish-returns:開啟publish-return功能。可以定義ReturnCallback
- template.mandatory: 定義消息路由失敗的策略
true:調用ReturnCallback
false:直接丟棄消息
1.3 定義ReturnCallback(消息投遞到隊列失敗觸發此回調)
- 每個RabbitTemplate只能配置一個ReturnCallback。
- 當消息投遞失敗,就會調用生產者的returnCallback中定義的處理邏輯
- 可以在容器啟動時就配置這個回調
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
@Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 獲取RabbitTemplate對象 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate. class ); // 配置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 判斷是否是延遲消息 Integer receivedDelay = message.getMessageProperties().getReceivedDelay(); if (receivedDelay != null && receivedDelay > 0 ) { // 是一個延遲消息,忽略這個錯誤提示 return ; } // 記錄日志 log.error( "消息發送到隊列失敗,響應碼:{}, 失敗原因:{}, 交換機: {}, 路由key:{}, 消息: {}" , replyCode, replyText, exchange, routingKey, message.toString()); // 如果有需要的話,重發消息 }); } } |
1.4 定義ConfirmCallback(消息到達交換機觸發此回調)
可以為redisTemplate指定一個統一的確認回調
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
@Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 獲取RabbitTemplate對象 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate. class ); // 配置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 判斷是否是延遲消息 Integer receivedDelay = message.getMessageProperties().getReceivedDelay(); if (receivedDelay != null && receivedDelay > 0 ) { // 是一個延遲消息,忽略這個錯誤提示 return ; } // 記錄日志 log.error( "消息發送到隊列失敗,響應碼:{}, 失敗原因:{}, 交換機: {}, 路由key:{}, 消息: {}" , replyCode, replyText, exchange, routingKey, message.toString()); // 如果有需要的話,重發消息 }); // 設置統一的confirm回調。只要消息到達broker就ack=true rabbitTemplate.setConfirmCallback( new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println( "這是統一的回調" ); System.out.println( "correlationData:" + correlationData); System.out.println( "ack:" + b); System.out.println( "cause:" + s); } }); } } |
也可以為特定的消息定制回調
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
@Autowired private RabbitTemplate rabbitTemplate; @Test public void testmq() throws InterruptedException { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); correlationData.getFuture().addCallback(result->{ if (result.isAck()) { // ACK log.debug( "消息成功投遞到交換機!消息ID: {}" , correlationData.getId()); } else { // NACK log.error( "消息投遞到交換機失敗!消息ID:{}" , correlationData.getId()); // 重發消息 } },ex->{ // 記錄日志 log.error( "消息發送失敗!" , ex); // 重發消息 }); rabbitTemplate.convertAndSend( "example.direct" , "blue" , "hello,world" ,correlationData); } |
2. 消費者模塊開啟消息確認
2.1 添加配置
1
2
|
# 手動ack消息,不使用默認的消費端確認 spring.rabbitmq.listener.simple.acknowledge-mode=manual |
- none:關閉ack,消息投遞時不可靠的,可能丟失
- auto:類似事務機制,出現異常時返回nack,消息回滾到mq,沒有異常,返回
- ackmanual:我們自己指定什么時候返回ack
2.2 manual模式在監聽器中自定義返回ack
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@RabbitListener (queues = "order.release.order.queue" ) @Service public class OrderCloseListener { @Autowired private OrderService orderService; @RabbitHandler private void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException { System.out.println( "收到過期的訂單信息,準備關閉訂單" + orderEntity.getOrderSn()); try { orderService.closeOrder(orderEntity); // 第二個參數為false則表示僅確認此條消息。如果為true則表示對收到的多條消息同時確認 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } catch (Exception e) { // 第二個參數為ture表示將這個消息重新加入隊列 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true ); } } } |
3. 消費者模塊開啟消息失敗重試機制
3.1 配置文件添加配置,開啟本地重試
1
2
3
4
5
6
7
8
9
10
|
spring: rabbitmq: listener: simple: retry: enabled: true # 開啟消費者失敗重試 initial-interval: 1000 # 初識的失敗等待時長為 1 秒 multiplier: 1 # 失敗的等待時長倍數,下次等待時長 = multiplier * last-interval max-attempts: 3 # 最大重試次數 stateless: true # true 無狀態; false 有狀態。如果業務中包含事務,這里改為 false |
- 開啟本地重試,如果消息處理過程總拋出異常,不會requeue到隊列,而是在消費者本地重試
- 重試達到最大次數后,spring會返回ack,消息會被丟棄
4. 消費者模塊添加失敗策略(用于開啟失敗本地重試功能后)
- 當開啟本地重試后,重試最大次數后消息直接丟棄。
- 三種策略,都繼承于MessageRecovery接口
- RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認就是這種方式
- ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊
- RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機
4.2 定義處理失敗消息的交換機和隊列 沒有會自動創建相應的隊列、交換機與綁定關系,有了就啥也不做
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
@Bean public DirectExchange errorMessageExchange(){ return new DirectExchange( "error.direct" ); } @Bean public Queue errorQueue(){ return new Queue( "error.queue" , true ); } // 路由鍵為key @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with( "error" ); } |
4.3 向容器中添加一個失敗策略組件
1
2
3
4
5
|
@Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ // error為路由鍵 return new RepublishMessageRecoverer(rabbitTemplate, "error.direct" , "error" ); } |
到此這篇關于springboot中rabbitmq實現消息可靠性的文章就介紹到這了,更多相關springboot rabbitmq消息可靠性內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!
原文鏈接:https://blog.csdn.net/weixin_44390164/article/details/120455793