RabbitMQ AMQP
RabbitMQ AMQP 是最流行的開源和跨平台消息代理伺服器。
RabbitMQ 也是一種在應用程式之間交換數據的方式,可在不同平台之間交換訊息。例如 .Net 應用程式發送的消息可以由 Node.js 應用程式或 java 應用程式讀取。
RabbitMQ 被設計為通用消息代理,採用點對點,請求/回覆和 pub-sub 通信模式的多種變體。它使用智能代理/啞消費者模型,專注於向消費者提供一致的消息傳遞,消費者的消費速度與經紀人跟踪消費者狀態的速度大致相似。它是成熟的,在正確配置時表現良好。並且有許多可用的插件可以將它擴展到更多的用例和場景。當應用程式需要訪問流 (Stream)歷史時,RabbitMQ 通常與 Apache Cassandra 一起使用,對於需要 “無限” 佇列(queue)的應用程式,RabbitMQ 通常與 LevelDB 插件一起使用,但這兩個功能都不附帶在 RabbitMQ 本身。
AMQP
一般來說,訊息佇列會被應用在所有無法接受訊息漏失的環境,也就是極為重要的應用程式,例如銀行業務或財務系統。這通常表示,典型的企業級訊息佇列其實是相當複雜的軟體設計,會使用堅固的協定及可靠的存儲,來確保即便有任何故障都能夠完成訊息的交換。因此企業級的訊息中介軟體多年來一直由 Oracle 與 IBM 這類軟體巨人所獨佔,而它們多半是實做了專有的通訊協定,嚴厲的綁住了客戶的選擇。所幸這幾年來,訊息系統已突破這道障礙而蓬勃發展,這歸功於 AMQP、STOMP 與 MQTT 這類的開放協定。為了瞭解訊息佇列系統的運作方式,以下我們會針對 AMQP 進行概述,藉此初步瞭解如何使用基於這類協定的 API。
AMQP(Advanced Message Queuing Protocol)是一項開放標準的協定,許多訊息佇列系統都支援了這項協定。事實上它除了是通訊協定外,還是一個涉及路由、過濾、佇列、可靠性與安全性的模型。
在 AMQP 裡,共有三個基礎元件:
- 佇列(Queue): 負責儲存訊息的資料架構,裡頭的訊息將由客戶端消費。佇列訊息會被推送(或拉取)至一或多個消費者,也就是我們的應用程式。若有多個消費者連接至相同的佇列,則訊息會是負載平衡的。
佇列可以是以下幾種類型:
- 可延續性(Durable): 意即若中介者重新啟動,則佇列也會自動建立。可延續性佇列並不表示先前的內容一定會被保留,只有被標示為需要保存的訊息,才會存入磁碟,並於重啟時復原。
- 專用性(Exclusive): 意即佇列綁定於特定的訂閱者。若彼此的連線關閉,則佇列就會被銷毀。
- 自動刪除(Auto-delete): 當沒有任何訂閱者連線時,便刪除佇列
- 交換(Exchange): 訊息發佈之處。依據所實作的演算法,將訊息輸送到一個或多個佇列。
- 直接交換(Direct exchange): 比對整個路由鍵(例如 chat.msg)是否相符來輸送訊息。
- 主題交換(Topic exchange): 使用萬用模式比對路由鍵(例如 chat.# 便吻合所有以 chat 為開頭的路由鍵)。
- 擴散交換(Fanout exchange): 忽略任何的路由鍵,廣播訊息至所有連線的佇列。
- 綁定(Binding): 交換元件與佇列之間的連結。這裡也定義了路由鍵以過濾來自交換元件的訊息。
以上這些元件是由一個中介者 (broker) 進行管理,它會揭露一個 API,用於相關的建置及處理。當連線到中介者時,客戶端會建立一個抽象化的通道 (channel),用於維護與中介者之間的通訊狀態。而在 AMQP 裡,「專用性」或「自動刪除」以外的佇列都可用於實作可延續性訂閱者。
RabbitMQ with C#
這裡我們會使用 C# 來實作最簡單的發送與接收訊息。我們將使用 Visual Studio Code 與 .Net Code SDK。安裝完成後,開啟 VSCode,打開一個終端視窗或命令提示字元輸入:
1 | dotnet --version |
如果一卻正常會返回 .NET Code 的版本訊息,那我們就可以開始了。
解決方案與專案設定
我們先在電腦裡面用一個空的資料夾建立 amqp-sample 的 .NET 解決方案。然後使用 VSCode 打開解決方案資料夾 amqp-sample,開啟一個終端視窗,我們需要用 dotnet 初始化解決方案:
1 | dotnet new sln |
範本 “Solution File” 建立成功後,目錄下會產生一個 amqp-sample.sln 檔案。
接著我們要在這個解決方案目錄下產生兩個專案 Send 與 Receive 分別擺放資料的送出與接收程式。
1 | dotnet new console --name Send |
這會在解決方案目錄下產生兩個專案目錄 Send 與 Receive,首先我們來看看 Send 專案。切換到 Send 目錄測試一下專案。
1 | cd Send |
切換到 Send 目錄,將 Program.cs 改名為 Send.cs,使用 dotnet run 測試它,你應該會看到 Hello World!。
以同樣的方式測試一下 Receive 專案:
1 | cd ../Receive |
接著我們需要安裝 C# 的 RabbitMQ Client,在兩個專案中分別安裝 RabbitMQ 的客戶端依賴:
1 | cd Send |
現在我們已經設置好 .NET 專案了,可以開始編寫代碼。
Sending
我們將調用我們的消息發佈者 publisher (sender) Send.cs 和消息消費者 consumer (receiver) Receive.cs。 從消息發佈者開始。
發布者將連接到 RabbitMQ,發送單個消息,然後退出。
在 Send.cs 中,我們需要使用一些命名空間,也將 class 名稱修改為 Send,靜態方法 Main() 是整個專案程式碼的進入點,我們就從這裡開始:
1 | using System; |
首先我們創建一個 RabbitMQ 伺服器的連接製造工廠:
1 | class Send |
使用製造工廠創建一個連接到 RabbitMQ 伺服器的連接 (connection),接下來,創建一個頻道 (channel),這是完成任務的 API 所在的通道。
這裡使用的 using 與名稱空間 (namespace) 完全沒有關係。這裡的 using 語句可以確保在實現 IDisposable 介面的物件的引用超出作用域時,在該物件上自動調用 Dispose( ) 方法。using 語句的後面是一對小括號,其中是引用變數的聲明和實例化(connection,channel),該語句使變數的作用域限制在隨後的語句塊中,因此在超出此作用域時就會自動關閉連線 (connection)與頻道 (channel)。另外,在變數超出作用域時,即使出現異常,也會自動調用其 Dispose( ) 方法。這也可省去使用 try/finally 語句處理異常狀況。
這裡我們宣告變數也使用了一個關鍵字 var,這是一種類型推斷關鍵字,使用 var 關鍵字替代實際的類型。編譯器可以根據變數的初始化值推斷變數的類型,但需要遵循以下一些規則:
- 變數必須初始化。否則,編譯器就沒有推斷變數類型的依據。
- 初始化器不能為空。
- 初始化器必須放在表達式中。
- 不能把初始化器設置為一個物件,除非在初始化器中創建一個新物件。
聲明了變數且推斷出類型後,就不能再改變變數的類型了。變數的類型確定後,對該變數進行任何賦值時,其強類型化規則必須以推斷出的類型為基礎。
初始化 ConnectionFactory 建構式中,我們使用預設的 port 5672 與預設的使用者名稱與密碼 guest, 如果使用的不是預設值,則建構實例時得顯式的加上這些值:
1 | var factory = new ConnectionFactory() |
要發送訊息,我們必須聲明一個佇列(Queue)供我們發送,然後我們就可以向佇列發佈消息,以下是 Send.cs 程式碼:
1 | using System; |
QueueDeclare( ) 方法聲明一個 queue 佇列來儲存我們的消息,在這個簡單的範例中,我們會跳過交換器 (exchange) 直接將消息送到 queue 中,其實 RabbitMQ 伺服器會使用一個內建的交換器幫我們把消息直接送到 queue 中。此範例,我們可以省掉交換器的聲明與路由鍵對佇列的綁定 (Binding)。
QueueDeclare( ) 聲明佇列是可冪性的(idempotent),只有在它不存在的情況下才會創建它,否則就使用已存在的佇列。 durable 引數目前設為 false,表示如果 RabbitMQ 伺服器重新啟動或崩潰,這個佇列將不復存在。
消息內容是一個字節陣列 (byte array),因此您可以編碼您喜歡的任何內容。這裡則使用 Encoding.UTF8.GetBytes( ) 將字串序列化。
properties.Persistent 這裡設定為 true,這會指示 RabbitMQ 伺服器將佇列的訊息存入磁碟中,預設值是 false,這只會將佇列訊息存在記憶體中,如果 RabbitMQ 伺服器重新啟動或崩潰,這些訊息將會消失。在這個範例中雖然設定為 true,但它不會有作用,因為我們在佇列聲明時 durable 設定為 false,這會在伺服器重啟時將整個佇列刪除。
BasicPublish( ) 方法讓我們送出消息。引數 exchange 這裡設定為空字串,RabbitMQ 伺服器會使用內建的交換器幫我們送出消息,此時引數 routingKey 可直接設定為佇列名稱。接下來就可以測試送出消息了。
1 | dotnet run |
在我們尚未編寫消費者端時,可以前往 RabbitMQ 的 Management UI 查詢佇列的狀況。
Receiving
消費者,它會監聽來自 RabbitMQ 的消息。 因此,與發佈單個消息的發佈者不同,我們將使消費者保持持續運行以偵聽消息並將其顯示出來。
在 Receive.cs 中,我們需要使用一些命名空間,並將 class 名稱修改為 Receive:
1 | using System; |
開始的設置與發佈布者 (Send.cs)相同。我們打開一個連接(connection)和一個通道(channel),並聲明我們要消耗的佇列。請注意,這與 Send 發佈者的佇列要相匹配。
在這裡,我們也在此處聲明佇列。因為我們可能在發佈者之前啟動消費者,所以我們希望在嘗試使用消息之前確保佇列的存在。以下是 Receive.cs 的程式碼:
1 | using System; |
RabbitMQ 伺服器是以非同步的方式傳遞消息的,所以我們需要提供回呼函式(callback)處理器。這就是 EventingBasicConsumer.Received 事件處理程序所做的事情。
C# 的事件基於委託(Delegate),為委托提供了一種發佈/訂閱機制。consumer 實例訂閱了 EventingBasicConsumer 發射的事件,因此我們必須註冊一個處理函式(callback),在 C# 即是要註冊一個委托(Delegate),這裡我們使用 “+=” 註冊一個處理器(或稱監聽器)。
事件一般使用的方法有兩個參數: 其中第一個參數是一個物件,包含事件的發送者,第二個參數提供了事件相關的訊息。
我們透過 e.Body 取得發佈的消息,它是一個字節陣列 (byte array),我們使用 Encoding.UTF8.GetString() 作反敘列化,並顯示在終端機上。
BasicConsume() 中的引數 autoAck 設定為 true,在消費成功後會自動通知伺服器將此訊息從佇列中去除,如果沒有送回 Ack 訊息,消息將會保留在佇列中,有可能會被重覆消費。
現在將它們兜在一起,首先啟動消費者 Receive:
1 | cd Receive |
接著從發布者 Send 送出一個訊息。因消費者將透過 RabbitMQ 發佈者處獲得的消息,消費者將會持續運行,等待消息 (使用 Ctrl-C 停止消費),因此請嘗試從另一個終端視窗執行發佈者。
1 | cd Send |
持續運行中的消費者視窗將會即時顯示訊息。
1 | dotnet run |