前言
RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消 息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用于諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。
應用場景:
RabbitMQ無疑是目前最流行的消息隊列之一,對各種語言環境的支持也很豐富,作為一個.NET developer有必要學習和了解這一工具。消息隊列的使用場景大概有3種:
1、系統集成,分布式系統的設計。各種子系統通過消息來對接,這種解決方案也逐步發展成一種架構風格,即“通過消息傳遞的架構”。
2、當系統中的同步處理方式嚴重影響了吞吐量,比如日志記錄。假如需要記錄系統中所有的用戶行為日志,如果通過同步的方式記錄日志勢必會影響系統的響應速度,當我們將日志消息發送到消息隊列,記錄日志的子系統就會通過異步的方式去消費日志消息。
3、系統的高可用性,比如電商的秒殺場景。當某一時刻應用服務器或數據庫服務器收到大量請求,將會出現系統宕機。如果能夠將請求轉發到消息隊列,再由服務器去消費這些消息將會使得請求變得平穩,提高系統的可用性。
一、安裝環境
首先是在 Linux 上安裝 rabbitmq
1
|
2
3
4
5
|
# 環境為CentOS 7 yum install rabbitmq-server # 安裝RabbitMQ systemctl start rabbitmq-server # 啟動 systemctl enable rabbitmq-server # 開機自啟 systemctl stop firewall-cmd # 臨時關閉防火墻 |
然后用 pip 安裝 Python3 的開發包
1
|
|
pip3 install pika |
安裝好軟件之后可以訪問http://115.xx.xx.xx:15672/來訪問自帶的 web 頁面來查看和管理 RabbitMQ。默認管理員的用戶密碼都是guest
二、簡單的向隊列中加入消息
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
#!/usr/bin/env python3 # coding=utf-8 # @Time : 2017/6/13 19:25 # @Author : Shawn # @Blog : https://blog.just666.cn # @Email : [email protected] # @purpose : RabbitMQ_Producer import pika # 創建連接對象 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '115.xx.xx.xx' )) # 創建頻道對象 channel = connection.channel() # 指定一個隊列,如果該隊列不存在則創建 channel.queue_declare(queue = 'test_queue' ) # 提交消息 for i in range ( 10 ): channel.basic_publish(exchange = ' ', routing_key=' test_queue ', body=' hello,world' + str (i)) print ( "sent..." ) # 關閉連接 connection.close() |
三、簡單的從隊列中獲取消息
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
#!/usr/bin/env python3 # coding=utf-8 # @Time : 2017/6/13 19:40 # @Author : Shawn # @Blog : https://blog.just666.cn # @Email : [email protected] # @purpose : RabbitMQ_Consumer import pika credentials = pika.PlainCredentials( 'guest' , 'guest' ) # 連接到RabbitMQ服務器 connection = pika.BlockingConnection(pika.ConnectionParameters( '115.xx.xx.xx' , 5672 , '/' , credentials)) channel = connection.channel() # 指定一個隊列,如果該隊列不存在則創建 channel.queue_declare(queue = 'test_queue' ) # 定義一個回調函數 def callback(ch, method, properties, body): print (body.decode( 'utf-8' )) # 告訴RabbitMQ使用callback來接收信息 channel.basic_consume(callback, queue = 'test_queue' , no_ack = False ) print ( 'waiting...' ) # 開始接收信息,并進入阻塞狀態,隊列里有信息才會調用callback進行處理。按ctrl+c退出。 channel.start_consuming() |
四、萬一消費者掉線了
想象這樣一種情況:
消費者從消息隊列中獲取了 n 條數據,正要處理呢結果宕機了,那該怎么辦?在 RabbieMQ 中有一個 ACK 可以用來確認消費者處理結束。就有點類似網絡中的 ACK,消費者每次從隊列中獲取了數據之后隊列不會立刻將數據移除,而是等待對應的 ACK。消費者獲取到數據并處理完成之后會向隊列發送一個 ACK 包,通知 RabbitMQ 這堆消息已經處理妥當了,可以刪除了,這時候 RabbitMQ 才會將數據從隊列中移除。所以這種情況下即使消費者掉線也沒有什么問題,數據依舊會在隊列中存在,留給其他消費者處理。
在 Python 中這樣實現:
消費者有這樣一行代碼channel.basic_consume(callback, queue='test_queue', no_ack=False)
,其中no_ack=False
表示不發送確認包。將其修改為no_ack=True就會在每次處理完之后向 RabbitMQ 發送一個確認包,以確認消息處理完畢。
五、萬一 RabbitMQ 宕機了呢
雖然有了 ACK 包,但是萬一 RabbitMQ 掛了那數據還是會損失。所以我們可以給 RabbitMQ 設置一個數據持久化存儲。RabbitMQ 會將數據持久化存儲到磁盤上,保證下次再啟動的時候隊列還在。
在 Python 中這樣實現:
我們聲明一個隊列是這樣的channel.queue_declare(queue='test_queue')
,如果需要持久化一個隊列可以這樣聲明channel.queue_declare(queue='test_queue', durable=True)
。不過這行直接放在代碼中是不能執行的,因為以前已經有了一個名為test_queue的隊列,RabbitMQ 不允許用不同的方式聲明同一個隊列,所以可以換一個隊列名新建來指定數據持久化存儲。不過如果只是這樣聲明的話,在 RabbitMQ 宕機重啟后確實隊列還在,不過隊列里的數據就沒有了。除非我們這樣來聲明隊列channel.basic_publish(exchange='', routing_key="test_queue", body=message, properties=pika.BasicProperties(delivery_mode = 2,))
。
六、最簡單的發布訂閱
最簡單的發布訂閱在 RabbitMQ 中稱之為Fanout模式。也就是說訂閱者訂閱某個頻道,然后發布者向這個頻道中發布消息,所有訂閱者就都能接收到這條消息。不過因為發布者需要使用訂閱者創建的隨機隊列所以需要先啟動訂閱者才能啟動發布者。
發布者代碼:
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
#!/usr/bin/env python3 # coding=utf-8 # @Time : 2017/6/13 20:21 # @Author : Shawn # @Blog : https://blog.just666.cn # @Email : [email protected] # @purpose : RabbitMQ_Publisher import pika # 創建連接對象 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '115.xx.xx.xx' )) # 創建頻道對象 channel = connection.channel() # 定義交換機,exchange表示交換機名稱,type表示類型 channel.exchange_declare(exchange = 'my_fanout' , type = 'fanout' ) message = 'Hello Python' # 將消息發送到交換機 channel.basic_publish(exchange = 'my_fanout' , # 指定exchange routing_key = '', # fanout下不需要配置,配置了也不會生效 body = message) connection.close() |
訂閱者代碼:
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
#!/usr/bin/env python3 # coding=utf-8 # @Time : 2017/6/13 20:20 # @Author : Shawn # @Blog : https://blog.just666.cn # @Email : [email protected] # @purpose : RabbitMQ_Subscriber import pika credentials = pika.PlainCredentials( 'guest' , 'guest' ) # 連接到RabbitMQ connection = pika.BlockingConnection(pika.ConnectionParameters( '115.xx.xx.xx' , 5672 , '/' , credentials)) channel = connection.channel() # 定義交換機,進行exchange聲明,exchange表示交換機名稱,type表示類型 channel.exchange_declare(exchange = 'my_fanout' , type = 'fanout' ) # 隨機創建隊列 result = channel.queue_declare(exclusive = True ) # exclusive=True表示建立臨時隊列,當consumer關閉后,該隊列就會被刪除 queue_name = result.method.queue # 將隊列與exchange進行綁定 channel.queue_bind(exchange = 'my_fanout' , queue = queue_name) # 定義回調方法 def callback(ch, method, properties, body): print (body.decode( 'utf-8' )) # 從隊列獲取信息 channel.basic_consume(callback, queue = queue_name, no_ack = True ) channel.start_consuming() |
總結
以上就是這篇文章的全部內容,希望本文的內容對大家的學習或者工作能帶來一定的幫助,如果有疑問大家可以留言交流,謝謝大家對服務器之家的支持。
原文鏈接:https://blog.just666.cn/index.php/archives/63/