1.1項(xiàng)目背景:做一個(gè)災(zāi)情預(yù)警的消息平臺(tái),災(zāi)情檢查系統(tǒng)需要向消息平臺(tái)里面推送消息,這里是典型的異構(gòu)系統(tǒng)的消息傳遞,我們需要選擇一個(gè)中間件作為消息隊(duì)列,調(diào)研分析了rabbitmq,zeromq,activemq,kafka等消息中間件,綜合性能,安全,可持久化等角度果斷選擇了rabbitmq作為我們的消息中間件 (其實(shí)這里是因?yàn)閞abbitmq 是spring官方支持的,開發(fā)起來方便)。需求上我們有多種類型的消息,這里有緊急推送的和一般的等區(qū)分,高并發(fā)時(shí),就會(huì)有對(duì)消息進(jìn)行優(yōu)先推送的情況出現(xiàn),于是rabbitmq消息隊(duì)優(yōu)先級(jí)的推送功能是我們需要解決的首個(gè)技術(shù)點(diǎn).
1.2技術(shù)調(diào)研:這里一個(gè)概念需要說明,為什么說是消息隊(duì)列的優(yōu)先級(jí)而不是消息的優(yōu)先級(jí),來看下消息隊(duì)列的工作原理
生產(chǎn)者生成消息打到交換機(jī)里面(如果沒有聲明交換機(jī),會(huì)打到default exchange里面),交換機(jī)綁定一個(gè)或多個(gè)隊(duì)列,消息進(jìn)入隊(duì)列里面,消費(fèi)者一直在監(jiān)聽隊(duì)列,發(fā)現(xiàn)隊(duì)列里面有消息就開始消費(fèi),這里就是一個(gè)消息傳遞的過程,queue是一個(gè)棧隊(duì)列,棧是先進(jìn)先出的,就是說消息來了依次排隊(duì),一個(gè)隊(duì)列并不能實(shí)現(xiàn)消息的插隊(duì)和優(yōu)先推送的功能。但是如果說我們的多個(gè)隊(duì)列有不同的優(yōu)先級(jí),不同優(yōu)先級(jí)的消息通過roatingkey進(jìn)入不同的隊(duì)列,優(yōu)先級(jí)高的隊(duì)列消息被優(yōu)先消費(fèi),這樣也能形成一個(gè)相對(duì)意義上的優(yōu)先級(jí),所以說這里不是消息的優(yōu)先級(jí)而是隊(duì)列的優(yōu)先級(jí).
1.2.1 為什么說是相對(duì)意義上的優(yōu)先級(jí)
有并發(fā)才有優(yōu)先級(jí),如果每個(gè)消息都能被瞬間處理也不會(huì)有消息優(yōu)先推送的需求,那我們看看消息會(huì)在哪里阻塞
1,queue,很明顯高并發(fā)的時(shí)候隊(duì)列里面是會(huì)存在很多消息的,2,eschange ,高并發(fā)的時(shí)候producer發(fā)送給exchange的時(shí)候也會(huì)產(chǎn)生阻塞。
第一種情況由于我們隊(duì)列已經(jīng)定義優(yōu)先級(jí)了,所以進(jìn)入隊(duì)列的消息都是同種優(yōu)先級(jí)別的,并不需要插隊(duì)。而對(duì)于第二種情況,消息在exchange時(shí)阻塞時(shí)并不能實(shí)現(xiàn)消息優(yōu)先進(jìn)入隊(duì)列,依然是一個(gè)依次處理的情景,但是由于exchang到queue的處理速度極快,所有我們忽略了這塊的優(yōu)先級(jí)。
1.2.3 代碼實(shí)現(xiàn)
在rabbitmq3.5版本之前,官方并沒有實(shí)現(xiàn)隊(duì)列優(yōu)先級(jí)的功能,但論壇里面有一些插件可以實(shí)現(xiàn)(末尾附鏈接),這里我們主要說3.5版本之后的實(shí)現(xiàn)
1.2.3.1 Java代碼
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
Connectionconn =RabbitMQConnectionUtil.getRabbitmqConnection(); //創(chuàng)建連接 Channelchannel = conn.createChannel(); //創(chuàng)建channel Map<String,Object> arg = newHashMap<String, Object>(); arg.put( "x-max-priority" , 10 ); //隊(duì)列的屬性參數(shù) 有10個(gè)優(yōu)先級(jí)別 // 聲明(創(chuàng)建)隊(duì)列 //channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueDeclare(QUEUE_NAME, true , false , false , arg); // 消息內(nèi)容 String message = "Hello World!" ; channel.basicPublish( "" ,QUEUE_NAME, null , message.getBytes()); BasicPropertiesprop = new BasicProperties( null , null , null , null , 1 , null , null , null , null , null , null , null , null , null ); //消息的參數(shù),聲明該消息的優(yōu)先級(jí)是1 channel.basicPublish( "" ,QUEUE_NAME, prop, message.getBytes()); //消息發(fā)布 System.out.println( "[x] Sent '" + message + "'" ); //關(guān)閉通道和連接 channel.close(); conn.close(); |
客戶端看下結(jié)果:
1.2.3.2結(jié)合spring實(shí)現(xiàn):
1.2.3.2.1 xml配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
< beans xmlns = "http://www.springframework.org/schema/beans" xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit = "http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" > < description >rabbitmq 連接服務(wù)配置</ description > <!-- 連接配置 --> < rabbit:connection-factory id = "connectionFactory" host = "${rabbit.ip}" username = "${rabbit.username}" password = "${rabbit.password}" port = "${rabbit.port}" virtual-host = "${rabbit.vhost}" /> < rabbit:admin connection-factory = "connectionFactory" /> < rabbit:template id = "amqpTemplate" connection-factory = "connectionFactory" /> <!-- spring template聲明--> <!-- 聲明一個(gè)隊(duì)列 --> < rabbit:queue id = "test_queue_key" name = "test_queue_key" durable = "true" auto-delete = "false" exclusive = "false" > < rabbit:queue-arguments > < entry key = "x-max-priority" > < value type = "java.lang.Integer" >10</ value >//這個(gè)地方一定是integer的,別的不好使!! </ entry > </ rabbit:queue-arguments > </ rabbit:queue > <!-- 監(jiān)聽配置queues:監(jiān)聽的隊(duì)列,多個(gè)的話用逗號(hào)(,)分隔 ref:監(jiān)聽器--> < rabbit:listener-container connection-factory = "connectionFactory" acknowledge = "auto" > < rabbit:listener queue-names = "test_queue_key" ref = "queueListenter" method = "onMessage" /> </ rabbit:listener-container > < bean id = "queueListenter" class = "com.DF.spring.springAMQP.QueueListener" /> |
1.2.3.2.2代碼部分:
producter:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
AbstractApplicationContext ctx = new ClassPathXmlApplicationContext( "classpath:/spring/rabbitmq-contextDemo2.xml" ); RabbitTemplate amqpTemplate = ctx.getBean(RabbitTemplate. class ); Random random = new Random(); for ( int i= 0 ; i< 1000 ; i++){ final int priority = random.nextInt( 10 - 1 + 1 ) + 1 ; //隨機(jī)的優(yōu)先級(jí) amqpTemplate.convertAndSend( "test_queue_key" , (Object)( "hello world" ), new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setPriority(priority); return message; } }); } |
customer:
1
2
3
4
5
6
7
8
9
10
11
|
public class QueueListener implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.print( "[x] 接收到的消息:" + new String(message.getBody(), "utf-8" )+ "&&&" + "優(yōu)先級(jí)" +message.getMessageProperties().getPrority()); Thread.sleep( 1000 ); } catch (Exception e){ e.printStackTrace(); } } } |
從客戶端看下隊(duì)列里面的消息:
我們發(fā)送隨機(jī)優(yōu)先級(jí)的消息進(jìn)入隊(duì)列,看看消費(fèi)端打印出來的消息:
到這里,rabbitmq結(jié)合spring的demo功能實(shí)現(xiàn)......
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持服務(wù)器之家。
原文鏈接:http://blog.csdn.net/qq_33994587/article/details/52689527?locationNum=4&fps=1