消費(fèi)者客戶端有兩種方式從消息中間件獲取消息并消費(fèi)。嚴(yán)格意義上來(lái)講,RocketMQ并沒(méi)有實(shí)現(xiàn)PUSH模式,而是對(duì)拉模式進(jìn)行一層包裝,名字雖然是 Push 開(kāi)頭,實(shí)際在實(shí)現(xiàn)時(shí),使用 Pull 方式實(shí)現(xiàn)。
通過(guò) Pull 不斷輪詢 Broker 獲取消息。當(dāng)不存在新消息時(shí),Broker 會(huì)掛起請(qǐng)求,直到有新消息產(chǎn)生,取消掛起,返回新消息。
1、概述
1.1、PULL方式
由消費(fèi)者客戶端主動(dòng)向消息中間件(MQ消息服務(wù)器代理)拉取消息;采用Pull方式,如何設(shè)置Pull消息的拉取頻率需要重點(diǎn)去考慮,舉個(gè)例子來(lái)說(shuō),可能1分鐘內(nèi)連續(xù)來(lái)了1000條消息,然后2小時(shí)內(nèi)沒(méi)有新消息產(chǎn)生(概括起來(lái)說(shuō)就是“消息延遲與忙等待”)。
如果每次Pull的時(shí)間間隔比較久,會(huì)增加消息的延遲,即消息到達(dá)消費(fèi)者的時(shí)間加長(zhǎng),MQ中消息的堆積量變大;若每次Pull的時(shí)間間隔較短,但是在一段時(shí)間內(nèi)MQ中并沒(méi)有任何消息可以消費(fèi),那么會(huì)產(chǎn)生很多無(wú)效的Pull請(qǐng)求的RPC開(kāi)銷,影響MQ整體的網(wǎng)絡(luò)性能;
1.2、PUSH方式
由消息中間件(MQ消息服務(wù)器代理)主動(dòng)地將消息推送給消費(fèi)者;采用Push方式,可以盡可能實(shí)時(shí)地將消息發(fā)送給消費(fèi)者進(jìn)行消費(fèi)。
但是,在消費(fèi)者的處理消息的能力較弱的時(shí)候(比如,消費(fèi)者端的業(yè)務(wù)系統(tǒng)處理一條消息的流程比較復(fù)雜,其中的調(diào)用鏈路比較多導(dǎo)致消費(fèi)時(shí)間比較久。
概括起來(lái)地說(shuō)就是“慢消費(fèi)問(wèn)題”),而MQ不斷地向消費(fèi)者Push消息,消費(fèi)者端的緩沖區(qū)可能會(huì)溢出,導(dǎo)致異常;
2、PUSH模式
主動(dòng)推送的模式實(shí)現(xiàn)起來(lái)簡(jiǎn)單,避免了拉取的消費(fèi)端業(yè)務(wù)邏輯的復(fù)雜度,消息的消費(fèi)可以認(rèn)為是實(shí)時(shí)的,同時(shí)也存在一定的弊端,要求消費(fèi)端要有很強(qiáng)的消費(fèi)能力。
2.1、代碼實(shí)現(xiàn)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
public class Consumer1 { public static void main(String[] args){ try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setConsumerGroup( "consumer_push" ); consumer.setNamesrvAddr( "10.10.12.203:9876;10.10.12.204:9876" ); consumer.subscribe( "TopicTest" , "*" ); consumer.registerMessageListener( new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList, ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) { try { for (MessageExt msg : paramList){ String msgbody = new String(msg.getBody(), "utf-8" ); SimpleDateFormat sd = new SimpleDateFormat( "YYYY-MM-dd HH:mm:ss" ); Date date = new Date(msg.getStoreTimestamp()); System.out.println( "Consumer1=== 存入時(shí)間 : " + sd.format(date) + " == MessageBody: " + msgbody); //輸出消息內(nèi)容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費(fèi)成功 } }); consumer.start(); System.out.println( "Consumer1===啟動(dòng)成功!" ); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } |
PUSH消費(fèi)方式,需要注冊(cè)一個(gè)監(jiān)聽(tīng)器Listener,,用來(lái)監(jiān)聽(tīng)最新的消息,進(jìn)行業(yè)務(wù)處理,同時(shí)反饋消息的消費(fèi)狀態(tài),消費(fèi)成功(CONSUME_SUCCESS)、消費(fèi)重試(RECONSUME_LATER),消息重試會(huì)根據(jù)配置的消息的延遲等級(jí)的時(shí)間間隔,定時(shí)重新發(fā)送消費(fèi)失敗的記錄。(PS:延遲消息中會(huì)重點(diǎn)討論)
PUSH消息方式由于返回了消息的狀態(tài),服務(wù)端會(huì)維護(hù)每個(gè)消費(fèi)端的消費(fèi)進(jìn)度,內(nèi)部會(huì)記錄消費(fèi)進(jìn)度,消息發(fā)送成功后會(huì)更新消費(fèi)進(jìn)度。
PUSH消息方式的局限性,是在HOLD住Consumer請(qǐng)求的時(shí)候需要占用資源,它適合用在消息隊(duì)列這種客戶端連接數(shù)可控的場(chǎng)景中。
上一個(gè)章節(jié)說(shuō)明了服務(wù)端存儲(chǔ)的每個(gè)主題對(duì)應(yīng)的消費(fèi)組的每個(gè)消息隊(duì)列的偏移量
查看服務(wù)器文件上的消費(fèi)進(jìn)度信息:
/usr/local/rocketmq-all-4.2.0/store/config/consumerOffset.json
3、PULL模式
3.1、代碼實(shí)現(xiàn)(1)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
public class PullConsumer { private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>(); public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer( "pullConsumer" ); consumer.setNamesrvAddr( "10.10.12.203:9876;10.10.12.204:9876" ); consumer.start(); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues( "TopicTest" ); for (MessageQueue mq : mqs) { SINGLE_MQ: while ( true ) { try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null , getMessageQueueOffset(mq), 32 ); System.out.println( "=============================================================" ); System.out.println( "Consume from the queue: " + mq + "offset:" + getMessageQueueOffset(mq) + "結(jié)果:" + pullResult.getPullStatus()); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: List<MessageExt> messageExtList = pullResult.getMsgFoundList(); for (MessageExt m : messageExtList) { System.out.print( new String(m.getBody()) + " == " ); } System.out.println( "" ); case NO_MATCHED_MSG: break ; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break ; default : break ; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } private static void putMessageQueueOffset(MessageQueue mq, long offset) { offseTable.put(mq, offset); } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = offseTable.get(mq); if (offset != null ) return offset; return 0 ; } } |
結(jié)果:
每次拉取消息的時(shí)候需要提供偏移量和拉取的消息的個(gè)數(shù),需要自己業(yè)務(wù)實(shí)現(xiàn)每個(gè)主題下的隊(duì)列的消費(fèi)進(jìn)度。
代碼實(shí)現(xiàn)(1)這種方式只能拉取歷史的消息,最新的消息拉取不了,也可以進(jìn)行改造,來(lái)實(shí)現(xiàn)一直拉取。
3.2、代碼實(shí)現(xiàn)(2)
在MQPullConsumer這個(gè)類里面,有一個(gè)MessageQueueListener,它的目的就是當(dāng)queue發(fā)生變化的時(shí)候,通知Consumer。也正是這個(gè)借口,幫助我們?cè)赑ull模式里面,實(shí)現(xiàn)負(fù)載均衡。
注意,這個(gè)接口在MQPushConsumer里面是沒(méi)有的,那里面有的是上面代碼里的MessageListener。
1
2
3
4
5
|
void registerMessageQueueListener( final String topic, final MessageQueueListener listener); public interface MessageQueueListener { void messageQueueChanged( final String topic, final Set<MessageQueue> mqAll, final Set<MessageQueue> mqDivided); } |
有了這個(gè)Listener,我們就可以動(dòng)態(tài)的知道當(dāng)前的Consumer分?jǐn)偟搅藥讉€(gè)MessageQueue。然后對(duì)這些MessageQueue,我們可以開(kāi)個(gè)線程池來(lái)消費(fèi)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
public class PullConsumerExtend { public static void main(String[] args) throws MQClientException { //消費(fèi)組 final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService( "pullConsumer" ); //MQ NameService地址 scheduleService.getDefaultMQPullConsumer().setNamesrvAddr( "10.10.12.203:9876;10.10.12.204:9876" ); //負(fù)載均衡模式 scheduleService.setMessageModel(MessageModel.CLUSTERING); //需要處理的消息topic scheduleService.registerPullTaskCallback( "TopicTest" , new PullTaskCallback() { @Override public void doPullTask(MessageQueue mq, PullTaskContext context) { MQPullConsumer consumer = context.getPullConsumer(); try { long offset = consumer.fetchConsumeOffset(mq, false ); if (offset < 0 ) offset = 0 ; PullResult pullResult = consumer.pull(mq, "*" , offset, 32 ); System.out.println( "" ); System.out.println( "Consume from the queue: " + mq + "offset:" + offset + "結(jié)果:" + pullResult.getPullStatus()); switch (pullResult.getPullStatus()) { case FOUND: List<MessageExt> messageExtList = pullResult.getMsgFoundList(); for (MessageExt m : messageExtList) { System.out.print( new String(m.getBody()) + " == " ); } break ; case NO_MATCHED_MSG: break ; case NO_NEW_MSG: case OFFSET_ILLEGAL: break ; default : break ; } consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset()); //設(shè)置下一下拉取的間隔時(shí)間 context.setPullNextDelayTimeMillis( 10000 ); } catch (Exception e) { e.printStackTrace(); } } }); scheduleService.start(); } } |
結(jié)果:
比較**代碼實(shí)現(xiàn)(1)**這種方式改進(jìn)了很多,不需要業(yè)務(wù)維護(hù)每個(gè)消費(fèi)隊(duì)列的消費(fèi)進(jìn)度,可以更新到服務(wù)端的。
弊端也很明顯就是每次隊(duì)列拉取消息的時(shí)間間隔,時(shí)間長(zhǎng)導(dǎo)致消息擠壓,時(shí)間段消息少,影響服務(wù)端性能。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持服務(wù)器之家。
原文鏈接:https://my.oschina.net/mingxungu/blog/3083956