這篇文章主要講基本的整合。先把代碼跑起來,再說什么高級特性。
rabbitmq 中的一些術語
如果你打開 rabbitmq web 控制臺,你會發現其中有一個 exhanges 不好理解。下面簡單說明一下。
交換器(exchange)
交換器就像路由器,我們先是把消息發到交換器,然后交換器再根據路由鍵(routingkey)把消息投遞到對應的隊列。(明白這個概念很重要,后面的代碼里面充分體現了這一點)
隊列(queue)
隊列很好理解,就不用解釋了。
綁定(binding)
交換器怎么知道把這條消息投遞到哪個隊列呢?這就需要用到綁定了。大概就是:使用某個路由鍵(routingkey)把某個隊列(queue)綁定到某個交換器(exchange),這樣交換器就知道根據路由鍵把這條消息投遞到哪個隊列了。(后面的代碼里面充分體現了這一點)
加入 rabbitmq maven 依賴
1
2
3
4
|
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> </dependency> |
再加入另外一個依賴(這個依賴可省略,主要是用來簡化代碼)
1
2
3
4
5
|
<dependency> <groupid>cn.hutool</groupid> <artifactid>hutool-all</artifactid> <version> 4.0 . 2 </version> </dependency> |
rabbitmqconfig.java 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
@configuration public class rabbitmqconfig { public final static string queue_name = "spring-boot-queue" ; public final static string exchange_name = "spring-boot-exchange" ; public final static string routing_key = "spring-boot-key" ; // 創建隊列 @bean public queue queue() { return new queue(queue_name); } // 創建一個 topic 類型的交換器 @bean public topicexchange exchange() { return new topicexchange(exchange_name); } // 使用路由鍵(routingkey)把隊列(queue)綁定到交換器(exchange) @bean public binding binding(queue queue, topicexchange exchange) { return bindingbuilder.bind(queue).to(exchange).with(routing_key); } @bean public connectionfactory connectionfactory() { cachingconnectionfactory connectionfactory = new cachingconnectionfactory( "127.0.0.1" , 5672 ); connectionfactory.setusername( "guest" ); connectionfactory.setpassword( "guest" ); return connectionfactory; } @bean public rabbittemplate rabbittemplate(connectionfactory connectionfactory) { return new rabbittemplate(connectionfactory); } } |
生產者
直接調用 rabbittemplate 的 convertandsend 方法就可以了。從下面的代碼里也可以看出,我們不是把消息直接發送到隊列里面的,而是先發送到了交換器,交換器再根據路由鍵把我們的消息投遞到對應的隊列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
@restcontroller public class producercontroller { @autowired private rabbittemplate rabbittemplate; @getmapping ( "/sendmessage" ) public object sendmessage() { new thread(() -> { for ( int i = 0 ; i < 100 ; i++) { string value = new datetime().tostring( "yyyy-mm-dd hh:mm:ss" ); console.log( "send message {}" , value); rabbittemplate.convertandsend(rabbitmqconfig.exchange_name, rabbitmqconfig.routing_key, value); } }).start(); return "ok" ; } } |
消費者
消費者也很簡單,只需要對應的方法上加入 @rabbitlistener 注解,指定需要監聽的隊列名稱即可。
1
2
3
4
5
6
7
8
|
@component public class consumer { @rabbitlistener (queues = rabbitmqconfig.queue_name) public void consumemessage(string message) { console.log( "consume message {}" , message); } } |
運行項目
運行項目,然后打開瀏覽器,輸入 http://localhost:9999/sendmessage
。在控制臺就可以看到生產者在不停的的發送消息,消費者不斷的在消費消息。
打開 rabbitmq web 控制臺,也可以看到剛才我們在代碼里面配置的交換器和隊列,以及綁定信息。
點擊進入交換器的詳情
結語
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://my.oschina.net/u/3523423/blog/1618335