激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|JAVA教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|JavaScript|易語言|

服務器之家 - 編程語言 - JAVA教程 - 淺談使用java實現(xiàn)阿里云消息隊列簡單封裝

淺談使用java實現(xiàn)阿里云消息隊列簡單封裝

2021-04-09 11:45狂盜一枝梅 JAVA教程

這篇文章主要介紹了淺談使用java實現(xiàn)阿里云消息隊列簡單封裝,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

一、前言

最近公司有使用阿里云消息隊列的需求,為了更加方便使用,本人用了幾天時間將消息隊列封裝成api調(diào)用方式以方便內(nèi)部系統(tǒng)的調(diào)用,現(xiàn)在已經(jīng)完成,特此記錄其中過程和使用到的相關技術,與君共勉。

現(xiàn)在阿里云提供了兩種消息服務:mns服務和ons服務,其中我認為mns是簡化版的ons,而且mns的消息消費需要自定義輪詢策略的,相比之下,ons的發(fā)布與訂閱模式功能更加強大(比如相對于mns,ons提供了消息追蹤、日志、監(jiān)控等功能),其api使用起來更加方便,而且聽聞阿里內(nèi)部以后不再對mns進行新的開發(fā),只做維護,ons服務則會逐步替代mns服務成為阿里消息服務的主打產(chǎn)品,所以,如果有使用消息隊列的需求,建議不要再使用mns,使用ons是最好的選擇。

涉及到的技術:Spring,反射、動態(tài)代理、Jackson序列化和反序列化

在看下面的文章之前,需要先看上面的文檔以了解相關概念(Topic、Consumer、Producer、Tag等)以及文檔中提供的簡單的發(fā)送和接收代碼實現(xiàn)。

該博文只針對有消息隊列知識基礎的朋友看,能幫上大家的忙我自然很高興,看不懂的也不要罵,說明你路子不對。

二、設計方案

1.消息發(fā)送

在一個簡單的cs架構(gòu)中,假設server會監(jiān)聽一個Topic的Producer發(fā)送的消息,那么它首先應該提供client一個api,client只需要簡單的調(diào)用該api,就可以通過producer來生產(chǎn)消息

2.消息接收

由于api是server制定的,所以server當然也知道如何消費這些消息

在這個過程中,server實際充當著消費者的角色,client實際充當著生產(chǎn)者的角色,但是生產(chǎn)者生產(chǎn)消息的規(guī)則則由消費者制定以滿足消費者消費需求。

3.最終目標

我們要創(chuàng)建一個單獨的jar包,起名為queue-core為生產(chǎn)者和消費者提供依賴和發(fā)布訂閱的具體實現(xiàn)。

三、消息發(fā)送

1.消費者提供接口

java" id="highlighter_443537">
?
1
2
3
4
5
6
7
8
9
@Topic(name="kdyzm",producerId="kdyzm_producer")
public interface UserQueueResource {
  
  @Tag("test1")
  public void handleUserInfo(@Body @Key("userInfoHandler") UserModel user);
  
  @Tag("test2")
  public void handleUserInfo1(@Body @Key("userInfoHandler1") UserModel user);
}

由于Topic和producer之間是N:1的關系,所以這里直接將producerId作為Topic的一個屬性;Tag是一個很關鍵的過濾條件,消費者通過它進行消息的分類做不同的業(yè)務處理,所以,這里使用Tag作為路由條件。

2.生產(chǎn)者使用消費者提供的api發(fā)送消息

由于消費者只提供了接口給生產(chǎn)者使用,接口是沒有辦法直接使用的,因為沒有辦法實例化,這里使用動態(tài)代理生成對象,在消費者提供的api中,添加如下config,以方便生產(chǎn)者直接導入config即可使用,這里使用了基于java的spring config,請知悉。

?
1
2
3
4
5
6
7
8
9
@Configuration
public class QueueConfig {
 
  @Autowired
  @Bean
  public UserQueueResource userQueueResource() {
    return QueueResourceFactory.createProxyQueueResource(UserQueueResource.class);
  }
}

3.queue-core對生產(chǎn)者發(fā)送消息的封裝

以上1中所有的注解(Topic、Tag、Body 、Key)以及2中使用到的QueueResourceFactory類都要在queue-core中定義,其中注解的定義只是定義了規(guī)則,真正的實現(xiàn)實際上是在QueueResourceFactory中

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.wy.queue.core.api.MQConnection;
import com.wy.queue.core.utils.JacksonSerializer;
import com.wy.queue.core.utils.MQUtils;
import com.wy.queue.core.utils.QueueCoreSpringUtils;
 
public class QueueResourceFactory implements InvocationHandler {
 
  private static final Logger logger=LoggerFactory.getLogger(QueueResourceFactory.class);
  
  private String topicName;
 
  private String producerId;
  
  private JacksonSerializer serializer=new JacksonSerializer();
  
  private static final String PREFIX="PID_";
  
  public QueueResourceFactory(String topicName,String producerId) {
    this.topicName = topicName;
    this.producerId=producerId;
  }
 
  public static <T> T createProxyQueueResource(Class<T> clazz) {
    String topicName = MQUtils.getTopicName(clazz);
    String producerId = MQUtils.getProducerId(clazz);
    T target = (T) Proxy.newProxyInstance(QueueResourceFactory.class.getClassLoader(),
        new Class<?>[] { clazz }, new QueueResourceFactory(topicName,producerId));
    return target;
  }
 
  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    if(args.length == 0 || args.length>1){
      throw new RuntimeException("only accept one param at queueResource interface.");
    }
    String tagName=MQUtils.getTagName(method);
    ProducerFactory producerFactory = QueueCoreSpringUtils.getBean(ProducerFactory.class);
    MQConnection connectionInfo = QueueCoreSpringUtils.getBean(MQConnection.class);
    
    Producer producer = producerFactory.createProducer(PREFIX+connectionInfo.getPrefix()+"_"+producerId);
    
    //發(fā)送消息
    Message msg = new Message( //
        // 在控制臺創(chuàng)建的 Topic,即該消息所屬的 Topic 名稱
        connectionInfo.getPrefix()+"_"+topicName,
        // Message Tag,
        // 可理解為 Gmail 中的標簽,對消息進行再歸類,方便 Consumer 指定過濾條件在 MQ 服務器過濾
        tagName,
        // Message Body
        // 任何二進制形式的數(shù)據(jù), MQ 不做任何干預,
        // 需要 Producer 與 Consumer 協(xié)商好一致的序列化和反序列化方式
        serializer.serialize(args[0]).getBytes());
    SendResult sendResult = producer.send(msg);
    logger.info("Send Message success. Message ID is: " + sendResult.getMessageId());
    return null;
  
}

這里特意將自定義包和第三方使用的包名都貼過來了,以便于區(qū)分。

這里到底做了哪些事情呢?

發(fā)送消息的過程就是動態(tài)代理創(chuàng)建一個代理對象,該對象調(diào)用方法的時候會被攔截,首先解析所有的注解,比如topicName、producerId、tag等關鍵信息從注解中取出來,然后調(diào)用阿里sdk發(fā)送消息,過程很簡單,但是注意,這里發(fā)送消息的時候是分環(huán)境的,一般來講現(xiàn)在企業(yè)中會區(qū)分QA、staging、product三種環(huán)境,其中QA和staging是測試環(huán)境,對于消息隊列來講,也是會有三種環(huán)境的,但是QA和staging環(huán)境往往為了降低成本使用同一個阿里賬號,所以創(chuàng)建的topic和productId會放到同一個區(qū)域下,這樣同名的TopicName是不允許存在的,所以加上了環(huán)境前綴加以區(qū)分,比如QA_TopicName,PID_Staging_ProducerId等等;另外,queue-core提供了MQConnection接口,以獲取配置信息,生產(chǎn)者服務只需要實現(xiàn)該接口即可。

4.生產(chǎn)者發(fā)送消息

?
1
2
3
4
5
6
7
8
9
10
@Autowired
private UserQueueResource userQueueResource;
 
@Override
public void sendMessage() {
  UserModel userModel=new UserModel();
  userModel.setName("kdyzm");
  userModel.setAge(25);
  userQueueResource.handleUserInfo(userModel);
}

只需要數(shù)行代碼即可將消息發(fā)送到指定的Topic,相對于原生的發(fā)送代碼,精簡了太多。

四、消息消費

相對于消息發(fā)送,消息的消費要復雜一些。

1.消息消費設計

由于Topic和Consumer之間是N:N的關系,所以將ConsumerId放到消費者具體實現(xiàn)的方法上

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Controller
@QueueResource
public class UserQueueResourceImpl implements UserQueueResource {
 
  private Logger logger = LoggerFactory.getLogger(this.getClass());
 
  @Override
  @ConsumerAnnotation("kdyzm_consumer")
  public void handleUserInfo(UserModel user) {
    logger.info("收到消息1:{}", new Gson().toJson(user));
  }
 
  @Override
  @ConsumerAnnotation("kdyzm_consumer1")
  public void handleUserInfo1(UserModel user) {
    logger.info("收到消息2:{}", new Gson().toJson(user));
  }
}

這里又有兩個新的注解@QueueResource和@ConsumerAnnotation,這兩個注解后續(xù)會討論如何使用。有人會問我為什么要使用ConsumerAnnotation這個名字而不使用Consumer這個名字,因為Consumer這個名字和aliyun提供的sdk中的名字沖突了。。。。

在這里, 消費者提供api 接口給生產(chǎn)者以方便生產(chǎn)者發(fā)送消息,消費者則實現(xiàn)該接口以消費生產(chǎn)者發(fā)送的消息,如何實現(xiàn)api接口就實現(xiàn)了監(jiān)聽,這點是比較關鍵的邏輯。

2.queue-core實現(xiàn)消息隊列監(jiān)聽核心邏輯

第一步:使用sping 容器的監(jiān)聽方法獲取所有加上QueueResource注解的Bean

第二步:分發(fā)處理Bean

如何處理這些Bean呢,每個Bean實際上都是一個對象,有了對象,比如上面例子中的UserQueueResourceImpl 對象,我們可以拿到該對象實現(xiàn)的接口字節(jié)碼對象,進而可以拿到該接口UserQueueRerousce上的注解以及方法上和方法中的注解,當然UserQueueResourceImpl實現(xiàn)方法上的注解也能拿得到,這里我將獲取到的信息以consumerId為key,其余相關信息封裝為Value緩存到了一個Map對象中,核心代碼如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Class<?> clazz = resourceImpl.getClass();
    Class<?> clazzIf = clazz.getInterfaces()[0];
    Method[] methods = clazz.getMethods();
    String topicName = MQUtils.getTopicName(clazzIf);
    for (Method m : methods) {
      ConsumerAnnotation consumerAnno = m.getAnnotation(ConsumerAnnotation.class);
 
      if (null == consumerAnno) {
//        logger.error("method={} need Consumer annotation.", m.getName());
        continue;
      }
      String consuerId = consumerAnno.value();
      if (StringUtils.isEmpty(consuerId)) {
        logger.error("method={} ConsumerId can't be null", m.getName());
        continue;
      }
      Class<?>[] parameterTypes = m.getParameterTypes();
      Method resourceIfMethod = null;
      try {
        resourceIfMethod = clazzIf.getMethod(m.getName(), parameterTypes);
      } catch (NoSuchMethodException | SecurityException e) {
        logger.error("can't find method={} at super interface={} .", m.getName(), clazzIf.getCanonicalName(),
            e);
        continue;
      }
      String tagName = MQUtils.getTagName(resourceIfMethod);
      consumersMap.put(consuerId, new MethodInfo(topicName, tagName, m));
    }

第三步:通過反射實現(xiàn)消費的動作

首先,先確定好反射動作執(zhí)行的時機,那就是監(jiān)聽到了新的消息

其次,如何執(zhí)行反射動作?不贅述,有反射相關基礎的童鞋都知道怎么做,核心代碼如下所示:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
MQConnection connectionInfo = QueueCoreSpringUtils.getBean(MQConnection.class);
    String topicPrefix=connectionInfo.getPrefix()+"_";
    String consumerIdPrefix=PREFIX+connectionInfo.getPrefix()+"_";
    for(String consumerId:consumersMap.keySet()){
      MethodInfo methodInfo=consumersMap.get(consumerId);
      Properties connectionProperties=convertToProperties(connectionInfo);
      // 您在控制臺創(chuàng)建的 Consumer ID
      connectionProperties.put(PropertyKeyConst.ConsumerId, consumerIdPrefix+consumerId);
      Consumer consumer = ONSFactory.createConsumer(connectionProperties);
      consumer.subscribe(topicPrefix+methodInfo.getTopicName(), methodInfo.getTagName(), new MessageListener() { //訂閱多個Tag
        public Action consume(Message message, ConsumeContext context) {
          try {
            String messageBody=new String(message.getBody(),"UTF-8");
            logger.info("receive message from topic={},tag={},consumerId={},message={}",topicPrefix+methodInfo.getTopicName(),methodInfo.getTagName(),consumerIdPrefix+consumerId,messageBody);
            Method method=methodInfo.getMethod();
            Class<?> parameType = method.getParameterTypes()[0];
            Object arg = jacksonSerializer.deserialize(messageBody, parameType);
            Object[] args={arg};
            method.invoke(resourceImpl, args);
          } catch (Exception e) {
            logger.error("",e);
          }
          return Action.CommitMessage;
        }
      });
      consumer.start();
      logger.info("consumer={} has started.",consumerIdPrefix+consumerId);
    }

五、完整代碼見下面的git鏈接

 https://github.com/kdyzm/queue-core.git

 以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

原文鏈接:http://www.cnblogs.com/kuangdaoyizhimei/p/8508357.html

延伸 · 閱讀

精彩推薦
Weibo Article 1 Weibo Article 2 Weibo Article 3 Weibo Article 4 Weibo Article 5 Weibo Article 6 Weibo Article 7 Weibo Article 8 Weibo Article 9 Weibo Article 10 Weibo Article 11 Weibo Article 12 Weibo Article 13 Weibo Article 14 Weibo Article 15 Weibo Article 16 Weibo Article 17 Weibo Article 18 Weibo Article 19 Weibo Article 20 Weibo Article 21 Weibo Article 22 Weibo Article 23 Weibo Article 24 Weibo Article 25
主站蜘蛛池模板: 91在线观看| 欧美精品18 | 成年性羞羞视频免费观看 | 免费国产一级特黄久久 | 一级毛片免费高清 | 成年片在线观看 | 免费的性生活视频 | 欧美一级毛片特黄黄 | 中文字幕在线观看1 | 亚洲第五色综合网 | 婷婷久久网 | 久久久久免费电影 | 一区二区三区四区在线 | 91 免费看片 | 李宗瑞国产福利视频一区 | 欧美女优一区 | 2级毛片 | 91网站永久免费看 | 色av网址| 欧美福利视频一区二区三区 | 国产一区二区三区在线免费观看 | 国产99久久精品一区二区 | 久久人人av | 桥本有菜免费av一区二区三区 | 一本一本久久a久久精品综合小说 | 免费1级做55爰片l在线观看 | 精品国产欧美一区二区 | 欧美精品成人一区二区在线观看 | 九九热国产在线 | 国产精品久久久久久久久久10秀 | 精品一区二区久久久久久按摩 | 国产毛毛片一区二区三区四区 | 黑人一区二区三区四区五区 | 亚洲第一成人在线观看 | 爽爽视频免费看 | 男人的天堂视频网站 | 欧美无极品 | 最新一区二区三区 | 亚洲精品7777xxxx青睐 | 日本精品久久久久 | 自拍偷拍亚洲图片 |