激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務器之家:專注于服務器技術(shù)及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - springboot+rabbitmq實現(xiàn)指定消費者才能消費的方法

springboot+rabbitmq實現(xiàn)指定消費者才能消費的方法

2022-03-08 00:52buguge Java教程

當項目部署到測試環(huán)境后,QA測試過程中,總是“莫名其妙”的發(fā)現(xiàn)所保存的用戶付款單數(shù)據(jù)有問題。這篇文章主要介紹了springboot+rabbitmq實現(xiàn)指定消費者才能消費,需要的朋友可以參考下

如何保證mq隊列里的消息只被測試服務器上的consumer消費,避免本地環(huán)境誤消費?

程序里有一個應用場景使用到了rabbitmq――當財務確認收到企業(yè)的打款金額后,系統(tǒng)會把企業(yè)訂單生成用戶付款單。由于訂單記錄數(shù)據(jù)量大,改為通過mq來異步實現(xiàn)。即財務確認收款操作后,將企業(yè)訂單數(shù)據(jù)放入mq,另一端監(jiān)聽mq消息隊列,將收到的企業(yè)訂單加工轉(zhuǎn)換成用戶付款單,并做持久化。

springboot+rabbitmq實現(xiàn)指定消費者才能消費的方法

本地開發(fā)環(huán)境與測試環(huán)境共用一套rabbitmq。當項目部署到測試環(huán)境后,QA測試過程中,總是“莫名其妙”的發(fā)現(xiàn)所保存的用戶付款單數(shù)據(jù)有問題。

當然,首先要排查程序,檢查Consumer的數(shù)據(jù)處理的邏輯是否有bug。單元測試后,發(fā)現(xiàn)并不存在測試環(huán)境的bug。

原來,消息隊列被“非正常”消費了!

Q: 什么情況?

A: 幾個伙伴一起參與的項目,大家總是要調(diào)試自己的程序的。而如果碰巧本地程序監(jiān)聽到消息隊列里有消息,那么,消息就被本地程序消費掉了。問題正是出現(xiàn)在這里!――――團隊開發(fā),大家并不會及時檢出git上最新的程序版本。如果本地的程序版本不是最新的正確的版本,勢必會出現(xiàn)bug。

那么,怎么辦?

每次你改了邏輯,告訴大家獲取最新?

不現(xiàn)實,約定的東西往往不奏效的。

如何保證mq隊列里的消息只被測試服務器上的consumer消費,避免本地環(huán)境誤消費? 或者說,如何實現(xiàn)消息的定向消費呢?

只要肯琢磨,辦法總比困難多!百思可得解!

我們知道,rabbitmq手動ack模式。這還不夠,因為我們怎么讓consumer來決定是否消費呢? 所以,我們需要一個標識――――producer設(shè)定一個標識,consumer如果匹配這個標識,則消費,否則予以reject放回消息隊列。

springboot+rabbitmq實現(xiàn)指定消費者才能消費的方法

通過查看spring-rabbit/spring-amqp的代碼,發(fā)現(xiàn)可以在spring-amqp里的MessageProperties上做文章。生產(chǎn)者與消費者每次消息傳輸都會攜帶一個MessageProperties,通常我們是不指定的,走MessageProperties的默認設(shè)置值。

我的策略:MessageProperties有一個屬性叫AppId。我們程序所部署的測試機器就一臺,即消息Producer和消息Consumer在一臺機器上。那么,我就可以利用機器的IP來識別消息。只有Producer與Consumer的IP匹配,才消費消息。程序員本機IP與測試服務器IP不一樣,就會拒絕接收消息,會把消息重新放回消息隊列,等待測試服務器的Consumer消費。

話不多說,上代碼吧,

生產(chǎn)者代碼:

package com.sboot.mq;

import org.junit.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import java.net.InetAddress;
import java.util.UUID;

public class MQProducerTest extends BaseTest {
  @Autowired
  RabbitTemplate rabbitTemplate;

  @Test
  public void test() throws Exception {
      for (int i = 1; i <= 5; i++) {
          MessageProperties messageProperties = new MessageProperties();
          String ip = InetAddress.getLocalHost().getHostAddress();
          messageProperties.setAppId(ip);
//            messageProperties.setUserId(String.valueOf(i));
          MessageConverter messageConverter = new SimpleMessageConverter();
          String msg = UUID.randomUUID().toString();
//            System.out.println(msg);
          Message message1 = messageConverter.toMessage(msg, messageProperties);
          rabbitTemplate.send(MessageQueueConstant.USER_SETTLEMENT_EXCHANGE, "UserSettlementRouting", message1);
          System.out.println("入隊完成");
          Thread.sleep(500L);
      }
  }
}

消費者手動ACK,要實現(xiàn)ChannelAwareMessageListener接口,感知rabbitmq.client.Channel實例,調(diào)用channel的basicAck、basicReject等方法:

package com.sboot.mq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

import java.net.InetAddress;

@Component
@Profile(value = "dev")
@Slf4j
public class UserSettlementDevConsumer implements ChannelAwareMessageListener {

  @RabbitHandler
  @RabbitListener(queues = MessageQueueConstant.USER_SETTLEMENT_QUEUE, ackMode = "MANUAL")
  @Override
  public void onMessage(Message message, Channel channel) throws Exception {
      Thread.currentThread().setName(UserSettlementDevConsumer.class.getSimpleName() + System.currentTimeMillis());

      long tag = message.getMessageProperties().getDeliveryTag();
      String appId = message.getMessageProperties().getAppId();
      log.info("{}-{}, 消息出隊", tag, appId);
      String receiveMsg = "";
      try {
          //核對標識,決定是否消費消息
          String ip = InetAddress.getLocalHost().getHostAddress();
          if (!ip.equals(appId)) {
              log.info("這不是我需要的消息。放回隊列。{}", receiveMsg);
//                channel.basicNack(tag, false, true);
              channel.basicReject(tag, true);
//                channel.basicRecover(true);
              return;
          }

          MessageConverter messageConverter = new SimpleMessageConverter();
          receiveMsg = String.valueOf(messageConverter.fromMessage(message));
          。。。。在這里消費消息
          log.info("success " + receiveMsg);
          channel.basicAck(tag, false);

      } catch (Exception e) {
          log.error("receive message has an error, ", e);
          channel.basicNack(tag, false, true);
      }
  }

}

說明一下依賴的spring-rabbit包的版本,我的是2.2.0.RELEASE。如果是2.1.4版本里,@RabbitListener注解沒有ackMode。

解決本案問題過程中的花絮:

springboot+rabbitmq實現(xiàn)指定消費者才能消費的方法

spring-rabbit-2.1.4.RELEASEspring-rabbit-2.2.0.RELEASE

springboot+rabbitmq實現(xiàn)指定消費者才能消費的方法

@RabbitListener的ackMode的值見枚舉org.springframework.amqp.core.AcknowledgeMode

NONE-- no acks(自動消費 autoAck)MANUAL --Manual acks - user must ack/nack via a channel aware listener.(手動消費,Consumer端必須顯式調(diào)用ack或nack)AUTO --

springboot+rabbitmq實現(xiàn)指定消費者才能消費的方法

設(shè)置了手動消費,上文消費端的deliveryTag會是不同的long值。自動消費的deliveryTag是重復的1和2這樣的。并且,自動消費時,如果要使用channel的ack/nack,會報異常:

2020-06-19 22:26:54.586 [AMQP Connection 192.168.40.20:5672] ERROR o.s.a.rabbit.connection.CachingConnectionFactory:1468 - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2020-06-19 22:26:54.599 [SimpleAsyncTaskExecutor-1] ERROR c.e.z.r.p.modules.mq.UserSettlementAckConsumer:49 -
org.springframework.amqp.AmqpException: PublisherCallbackChannel is closed
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1092)

到此這篇關(guān)于springboot+rabbitmq實現(xiàn)指定消費者才能消費的文章就介紹到這了,更多相關(guān)springboot rabbitmq消費內(nèi)容請搜索服務器之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持服務器之家!

原文鏈接:https://www.cnblogs.com/buguge/p/13183980.html

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 免费观看一区二区三区 | 亚洲午夜一区二区三区 | 欧美1区2区在线观看 | 成人黄色短视频在线观看 | 精品中文字幕视频 | 欧美老逼 | 18视频在线观看娇喘 | 中文字幕在线观看网址 | 国产欧美日韩在线不卡第一页 | 亚洲视频在线网 | 96视频在线免费观看 | 欧美日韩国产一区二区三区在线观看 | 成人午夜视频免费在线观看 | 深夜精品福利 | 99久久久精品免费观看国产 | 草久在线观看视频 | 一夜新娘第三季免费观看 | 视频国产一区二区 | 久久久综 | 久久久www成人免费毛片 | 成人情欲视频在线看免费 | 亚洲影视中文字幕 | 爱逼av | 欧美精品黄色 | 日韩av在线影院 | 港台三级在线观看 | 国产精品久久久久久久久久电影 | 天堂福利电影 | 视频一区二区三区在线播放 | 午夜a狂野欧美一区二区 | 亚洲一区二区三区精品在线观看 | 全黄性性激高免费视频 | 成人免费在线观看视频 | 国产一级毛片av | 久久精品首页 | 精品国产一区二区三区在线 | 久久精品久 | 午夜爽爽爽男女免费观看hd | 在线91视频 | 精国品产一区二区三区有限公司 | 国产精品久久久久久久久久东京 |