激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達(dá)式|C/C++|IOS|C#|Swift|Android|VB|R語(yǔ)言|JavaScript|易語(yǔ)言|vb.net|

服務(wù)器之家 - 編程語(yǔ)言 - Java教程 - 基于RocketMQ推拉模式詳解

基于RocketMQ推拉模式詳解

2021-09-25 01:08mingxungu Java教程

這篇文章主要介紹了RocketMQ推拉模式的使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

消費(fèi)者客戶端有兩種方式從消息中間件獲取消息并消費(fèi)。嚴(yán)格意義上來(lái)講,RocketMQ并沒(méi)有實(shí)現(xiàn)PUSH模式,而是對(duì)拉模式進(jìn)行一層包裝,名字雖然是 Push 開(kāi)頭,實(shí)際在實(shí)現(xiàn)時(shí),使用 Pull 方式實(shí)現(xiàn)。

通過(guò) Pull 不斷輪詢 Broker 獲取消息。當(dāng)不存在新消息時(shí),Broker 會(huì)掛起請(qǐng)求,直到有新消息產(chǎn)生,取消掛起,返回新消息。

1、概述

1.1、PULL方式

由消費(fèi)者客戶端主動(dòng)向消息中間件(MQ消息服務(wù)器代理)拉取消息;采用Pull方式,如何設(shè)置Pull消息的拉取頻率需要重點(diǎn)去考慮,舉個(gè)例子來(lái)說(shuō),可能1分鐘內(nèi)連續(xù)來(lái)了1000條消息,然后2小時(shí)內(nèi)沒(méi)有新消息產(chǎn)生(概括起來(lái)說(shuō)就是“消息延遲與忙等待”)。

如果每次Pull的時(shí)間間隔比較久,會(huì)增加消息的延遲,即消息到達(dá)消費(fèi)者的時(shí)間加長(zhǎng),MQ中消息的堆積量變大;若每次Pull的時(shí)間間隔較短,但是在一段時(shí)間內(nèi)MQ中并沒(méi)有任何消息可以消費(fèi),那么會(huì)產(chǎn)生很多無(wú)效的Pull請(qǐng)求的RPC開(kāi)銷,影響MQ整體的網(wǎng)絡(luò)性能;

1.2、PUSH方式

由消息中間件(MQ消息服務(wù)器代理)主動(dòng)地將消息推送給消費(fèi)者;采用Push方式,可以盡可能實(shí)時(shí)地將消息發(fā)送給消費(fèi)者進(jìn)行消費(fèi)。

但是,在消費(fèi)者的處理消息的能力較弱的時(shí)候(比如,消費(fèi)者端的業(yè)務(wù)系統(tǒng)處理一條消息的流程比較復(fù)雜,其中的調(diào)用鏈路比較多導(dǎo)致消費(fèi)時(shí)間比較久。

概括起來(lái)地說(shuō)就是“慢消費(fèi)問(wèn)題”),而MQ不斷地向消費(fèi)者Push消息,消費(fèi)者端的緩沖區(qū)可能會(huì)溢出,導(dǎo)致異常;

2、PUSH模式

主動(dòng)推送的模式實(shí)現(xiàn)起來(lái)簡(jiǎn)單,避免了拉取的消費(fèi)端業(yè)務(wù)邏輯的復(fù)雜度,消息的消費(fèi)可以認(rèn)為是實(shí)時(shí)的,同時(shí)也存在一定的弊端,要求消費(fèi)端要有很強(qiáng)的消費(fèi)能力。

2.1、代碼實(shí)現(xiàn)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Consumer1 {   
    public static void main(String[] args){
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
            consumer.setConsumerGroup("consumer_push");
            consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
            consumer.subscribe("TopicTest", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently(){
 
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
                        ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
                    try {
                        for(MessageExt msg : paramList){
                            String msgbody = new String(msg.getBody(), "utf-8");
                            SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
                            Date date = new Date(msg.getStoreTimestamp());
                            System.out.println("Consumer1===  存入時(shí)間 :  "+ sd.format(date) +" == MessageBody: "+ msgbody);//輸出消息內(nèi)容
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費(fèi)成功
                }
            });
            consumer.start();
            System.out.println("Consumer1===啟動(dòng)成功!");
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

PUSH消費(fèi)方式,需要注冊(cè)一個(gè)監(jiān)聽(tīng)器Listener,,用來(lái)監(jiān)聽(tīng)最新的消息,進(jìn)行業(yè)務(wù)處理,同時(shí)反饋消息的消費(fèi)狀態(tài),消費(fèi)成功(CONSUME_SUCCESS)、消費(fèi)重試(RECONSUME_LATER),消息重試會(huì)根據(jù)配置的消息的延遲等級(jí)的時(shí)間間隔,定時(shí)重新發(fā)送消費(fèi)失敗的記錄。(PS:延遲消息中會(huì)重點(diǎn)討論)

PUSH消息方式由于返回了消息的狀態(tài),服務(wù)端會(huì)維護(hù)每個(gè)消費(fèi)端的消費(fèi)進(jìn)度,內(nèi)部會(huì)記錄消費(fèi)進(jìn)度,消息發(fā)送成功后會(huì)更新消費(fèi)進(jìn)度。

PUSH消息方式的局限性,是在HOLD住Consumer請(qǐng)求的時(shí)候需要占用資源,它適合用在消息隊(duì)列這種客戶端連接數(shù)可控的場(chǎng)景中。

上一個(gè)章節(jié)說(shuō)明了服務(wù)端存儲(chǔ)的每個(gè)主題對(duì)應(yīng)的消費(fèi)組的每個(gè)消息隊(duì)列的偏移量

查看服務(wù)器文件上的消費(fèi)進(jìn)度信息:

/usr/local/rocketmq-all-4.2.0/store/config/consumerOffset.json

基于RocketMQ推拉模式詳解

3、PULL模式

3.1、代碼實(shí)現(xiàn)(1)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class PullConsumer {
    private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();
 
    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("pullConsumer");
        consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
        consumer.start();
 
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
        for (MessageQueue mq : mqs) {
            
            SINGLE_MQ: while (true) {
                try {
                    PullResult pullResult =
                            consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.println("=============================================================");
                    System.out.println("Consume from the queue: " + mq + "offset:" + getMessageQueueOffset(mq) + "結(jié)果:" + pullResult.getPullStatus());
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                    case FOUND:
                        List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                        for (MessageExt m : messageExtList) {
                            System.out.print(new String(m.getBody()) +" == ");
                        }
                        System.out.println("");
                    case NO_MATCHED_MSG:
                        break;
                    case NO_NEW_MSG:
                        break SINGLE_MQ;
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }
 
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offseTable.put(mq, offset);
    }
 
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offseTable.get(mq);
        if (offset != null)
            return offset;
        return 0;
    }
}

結(jié)果:

基于RocketMQ推拉模式詳解

每次拉取消息的時(shí)候需要提供偏移量和拉取的消息的個(gè)數(shù),需要自己業(yè)務(wù)實(shí)現(xiàn)每個(gè)主題下的隊(duì)列的消費(fèi)進(jìn)度。

代碼實(shí)現(xiàn)(1)這種方式只能拉取歷史的消息,最新的消息拉取不了,也可以進(jìn)行改造,來(lái)實(shí)現(xiàn)一直拉取。

3.2、代碼實(shí)現(xiàn)(2)

在MQPullConsumer這個(gè)類里面,有一個(gè)MessageQueueListener,它的目的就是當(dāng)queue發(fā)生變化的時(shí)候,通知Consumer。也正是這個(gè)借口,幫助我們?cè)赑ull模式里面,實(shí)現(xiàn)負(fù)載均衡。

注意,這個(gè)接口在MQPushConsumer里面是沒(méi)有的,那里面有的是上面代碼里的MessageListener。

?
1
2
3
4
5
void registerMessageQueueListener(final String topic, final MessageQueueListener listener);
public interface MessageQueueListener {
    void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
                             final Set<MessageQueue> mqDivided);
}

有了這個(gè)Listener,我們就可以動(dòng)態(tài)的知道當(dāng)前的Consumer分?jǐn)偟搅藥讉€(gè)MessageQueue。然后對(duì)這些MessageQueue,我們可以開(kāi)個(gè)線程池來(lái)消費(fèi)。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class PullConsumerExtend {
    public static void main(String[] args) throws MQClientException {
           //消費(fèi)組
            final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("pullConsumer");
           //MQ NameService地址
            scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
           //負(fù)載均衡模式
            scheduleService.setMessageModel(MessageModel.CLUSTERING);
           //需要處理的消息topic
            scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {
 
                @Override
                public void doPullTask(MessageQueue mq, PullTaskContext context) {
                    MQPullConsumer consumer = context.getPullConsumer();
                    try {
                        
                        long offset = consumer.fetchConsumeOffset(mq, false);
                        if (offset < 0)
                            offset = 0;
                        PullResult pullResult = consumer.pull(mq, "*", offset, 32);
                        System.out.println("");
                        System.out.println("Consume from the queue: " + mq + "offset:" + offset + "結(jié)果:" + pullResult.getPullStatus());
                        switch (pullResult.getPullStatus()) {
                            case FOUND:
                                List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                                for (MessageExt m : messageExtList) {
                                    System.out.print(new String(m.getBody()) +" == ");
                                }
                                break;
                            case NO_MATCHED_MSG:
                                break;
                            case NO_NEW_MSG:
                            case OFFSET_ILLEGAL:
                                break;
                            default:
                                break;
                        }
                        consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
                        //設(shè)置下一下拉取的間隔時(shí)間
                        context.setPullNextDelayTimeMillis(10000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            scheduleService.start();
    }
}

結(jié)果:

基于RocketMQ推拉模式詳解

比較**代碼實(shí)現(xiàn)(1)**這種方式改進(jìn)了很多,不需要業(yè)務(wù)維護(hù)每個(gè)消費(fèi)隊(duì)列的消費(fèi)進(jìn)度,可以更新到服務(wù)端的。

弊端也很明顯就是每次隊(duì)列拉取消息的時(shí)間間隔,時(shí)間長(zhǎng)導(dǎo)致消息擠壓,時(shí)間段消息少,影響服務(wù)端性能。

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持服務(wù)器之家。

原文鏈接:https://my.oschina.net/mingxungu/blog/3083956

延伸 · 閱讀

精彩推薦
  • Java教程Java實(shí)現(xiàn)搶紅包功能

    Java實(shí)現(xiàn)搶紅包功能

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)搶紅包功能,采用多線程模擬多人同時(shí)搶紅包,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙...

    littleschemer13532021-05-16
  • Java教程升級(jí)IDEA后Lombok不能使用的解決方法

    升級(jí)IDEA后Lombok不能使用的解決方法

    最近看到提示IDEA提示升級(jí),尋思已經(jīng)有好久沒(méi)有升過(guò)級(jí)了。升級(jí)完畢重啟之后,突然發(fā)現(xiàn)好多錯(cuò)誤,本文就來(lái)介紹一下如何解決,感興趣的可以了解一下...

    程序猿DD9332021-10-08
  • Java教程xml與Java對(duì)象的轉(zhuǎn)換詳解

    xml與Java對(duì)象的轉(zhuǎn)換詳解

    這篇文章主要介紹了xml與Java對(duì)象的轉(zhuǎn)換詳解的相關(guān)資料,需要的朋友可以參考下...

    Java教程網(wǎng)2942020-09-17
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    這篇文章主要介紹了Java使用SAX解析xml的示例,幫助大家更好的理解和學(xué)習(xí)使用Java,感興趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程20個(gè)非常實(shí)用的Java程序代碼片段

    20個(gè)非常實(shí)用的Java程序代碼片段

    這篇文章主要為大家分享了20個(gè)非常實(shí)用的Java程序片段,對(duì)java開(kāi)發(fā)項(xiàng)目有所幫助,感興趣的小伙伴們可以參考一下 ...

    lijiao5352020-04-06
  • Java教程小米推送Java代碼

    小米推送Java代碼

    今天小編就為大家分享一篇關(guān)于小米推送Java代碼,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧...

    富貴穩(wěn)中求8032021-07-12
  • Java教程Java8中Stream使用的一個(gè)注意事項(xiàng)

    Java8中Stream使用的一個(gè)注意事項(xiàng)

    最近在工作中發(fā)現(xiàn)了對(duì)于集合操作轉(zhuǎn)換的神器,java8新特性 stream,但在使用中遇到了一個(gè)非常重要的注意點(diǎn),所以這篇文章主要給大家介紹了關(guān)于Java8中S...

    阿杜7472021-02-04
  • Java教程Java BufferWriter寫文件寫不進(jìn)去或缺失數(shù)據(jù)的解決

    Java BufferWriter寫文件寫不進(jìn)去或缺失數(shù)據(jù)的解決

    這篇文章主要介紹了Java BufferWriter寫文件寫不進(jìn)去或缺失數(shù)據(jù)的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望...

    spcoder14552021-10-18
主站蜘蛛池模板: 日韩视频在线观看免费视频 | 国产精品亚洲综合一区二区三区 | 欧美3p激情一区二区三区猛视频 | 欧美三区在线 | 欧美 中文字幕 | 伦理三区 | 久久综合爱 | 91久久国产露脸精品国产护士 | 中国7777高潮网站 | xxx日本视频| 美国一级毛片片aa久久综合 | 久久网日本| 91亚洲精品一区二区福利 | 免费看一级视频 | 亚洲第一页夜 | 午夜生活理论片 | 亚洲成人高清在线观看 | 91高清免费在线观看 | 久久久久久麻豆 | 精品三区视频 | av在线直播观看 | 亚洲人成在线播放 | 免费观看黄色一级视频 | 亚洲精品久久久久久久久久 | 国产精品视频久久久 | 91精品最新国内在线播放 | 日本看片一区二区三区高清 | 国产99久久久久久免费看 | 久久精品欧美视频 | 欧美三级一级 | 19禁国产精品福利视频 | 欧美黑人伦理 | 在线看国产视频 | 91av视频大全 | 欧美性猛交xxx乱大交3蜜桃 | 毛片在哪里看 | 中文字幕综合在线观看 | 成人一级黄色 | 红杏亚洲影院一区二区三区 | 狠狠操电影| 亚洲精品成人av在线 |