0%

RabbitMQ with C#

RabbitMQ AMQP

RabbitMQ AMQP 是最流行的開源和跨平台消息代理伺服器。

RabbitMQ 也是一種在應用程式之間交換數據的方式,可在不同平台之間交換訊息。例如 .Net 應用程式發送的消息可以由 Node.js 應用程式或 java 應用程式讀取。

RabbitMQ 被設計為通用消息代理,採用點對點,請求/回覆和 pub-sub 通信模式的多種變體。它使用智能代理/啞消費者模型,專注於向消費者提供一致的消息傳遞,消費者的消費速度與經紀人跟踪消費者狀態的速度大致相似。它是成熟的,在正確配置時表現良好。並且有許多可用的插件可以將它擴展到更多的用例和場景。當應用程式需要訪問流 (Stream)歷史時,RabbitMQ 通常與 Apache Cassandra 一起使用,對於需要 “無限” 佇列(queue)的應用程式,RabbitMQ 通常與 LevelDB 插件一起使用,但這兩個功能都不附帶在 RabbitMQ 本身。

AMQP

一般來說,訊息佇列會被應用在所有無法接受訊息漏失的環境,也就是極為重要的應用程式,例如銀行業務或財務系統。這通常表示,典型的企業級訊息佇列其實是相當複雜的軟體設計,會使用堅固的協定及可靠的存儲,來確保即便有任何故障都能夠完成訊息的交換。因此企業級的訊息中介軟體多年來一直由 Oracle 與 IBM 這類軟體巨人所獨佔,而它們多半是實做了專有的通訊協定,嚴厲的綁住了客戶的選擇。所幸這幾年來,訊息系統已突破這道障礙而蓬勃發展,這歸功於 AMQP、STOMP 與 MQTT 這類的開放協定。為了瞭解訊息佇列系統的運作方式,以下我們會針對 AMQP 進行概述,藉此初步瞭解如何使用基於這類協定的 API。

AMQP(Advanced Message Queuing Protocol)是一項開放標準的協定,許多訊息佇列系統都支援了這項協定。事實上它除了是通訊協定外,還是一個涉及路由、過濾、佇列、可靠性與安全性的模型。

在 AMQP 裡,共有三個基礎元件:

  • 佇列(Queue): 負責儲存訊息的資料架構,裡頭的訊息將由客戶端消費。佇列訊息會被推送(或拉取)至一或多個消費者,也就是我們的應用程式。若有多個消費者連接至相同的佇列,則訊息會是負載平衡的。

佇列可以是以下幾種類型:

  • 可延續性(Durable): 意即若中介者重新啟動,則佇列也會自動建立。可延續性佇列並不表示先前的內容一定會被保留,只有被標示為需要保存的訊息,才會存入磁碟,並於重啟時復原。
  • 專用性(Exclusive): 意即佇列綁定於特定的訂閱者。若彼此的連線關閉,則佇列就會被銷毀。
  • 自動刪除(Auto-delete): 當沒有任何訂閱者連線時,便刪除佇列
  • 交換(Exchange): 訊息發佈之處。依據所實作的演算法,將訊息輸送到一個或多個佇列。
  • 直接交換(Direct exchange): 比對整個路由鍵(例如 chat.msg)是否相符來輸送訊息。
  • 主題交換(Topic exchange): 使用萬用模式比對路由鍵(例如 chat.# 便吻合所有以 chat 為開頭的路由鍵)。
  • 擴散交換(Fanout exchange): 忽略任何的路由鍵,廣播訊息至所有連線的佇列。
  • 綁定(Binding): 交換元件與佇列之間的連結。這裡也定義了路由鍵以過濾來自交換元件的訊息。

以上這些元件是由一個中介者 (broker) 進行管理,它會揭露一個 API,用於相關的建置及處理。當連線到中介者時,客戶端會建立一個抽象化的通道 (channel),用於維護與中介者之間的通訊狀態。而在 AMQP 裡,「專用性」或「自動刪除」以外的佇列都可用於實作可延續性訂閱者。

RabbitMQ with C#

這裡我們會使用 C# 來實作最簡單的發送與接收訊息。我們將使用 Visual Studio Code.Net Code SDK。安裝完成後,開啟 VSCode,打開一個終端視窗或命令提示字元輸入:

1
dotnet --version

如果一卻正常會返回 .NET Code 的版本訊息,那我們就可以開始了。

解決方案與專案設定

我們先在電腦裡面用一個空的資料夾建立 amqp-sample 的 .NET 解決方案。然後使用 VSCode 打開解決方案資料夾 amqp-sample,開啟一個終端視窗,我們需要用 dotnet 初始化解決方案:

1
dotnet new sln

範本 “Solution File” 建立成功後,目錄下會產生一個 amqp-sample.sln 檔案。

接著我們要在這個解決方案目錄下產生兩個專案 Send 與 Receive 分別擺放資料的送出與接收程式。

1
2
3
dotnet new console --name Send

dotnet new console --name Receive

這會在解決方案目錄下產生兩個專案目錄 Send 與 Receive,首先我們來看看 Send 專案。切換到 Send 目錄測試一下專案。

1
2
3
cd Send
move Program.cs Send.cs
dotnet run

切換到 Send 目錄,將 Program.cs 改名為 Send.cs,使用 dotnet run 測試它,你應該會看到 Hello World!。

以同樣的方式測試一下 Receive 專案:

1
2
3
cd ../Receive
move Program.cs Receive.cs
dotnet run

接著我們需要安裝 C# 的 RabbitMQ Client,在兩個專案中分別安裝 RabbitMQ 的客戶端依賴:

1
2
3
4
cd Send
dotnet add package RabbitMQ.Client
cd ../Receive
dotnet add package RabbitMQ.Client

現在我們已經設置好 .NET 專案了,可以開始編寫代碼。

Sending

我們將調用我們的消息發佈者 publisher (sender) Send.cs 和消息消費者 consumer (receiver) Receive.cs。 從消息發佈者開始。

發布者將連接到 RabbitMQ,發送單個消息,然後退出。

在 Send.cs 中,我們需要使用一些命名空間,也將 class 名稱修改為 Send,靜態方法 Main() 是整個專案程式碼的進入點,我們就從這裡開始:

Send.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
using System;
using RabbitMQ.Client;
using System.Text;

namespace Send
{
class Send
{
static void Main(string[] args)
{
Console.WriteLine("Hello World!");
}
}
}

首先我們創建一個 RabbitMQ 伺服器的連接製造工廠:

Send.cs
1
2
3
4
5
6
7
8
9
10
11
12
class Send
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "10.11.xxx.xxx" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
...
}
}
}

使用製造工廠創建一個連接到 RabbitMQ 伺服器的連接 (connection),接下來,創建一個頻道 (channel),這是完成任務的 API 所在的通道。

這裡使用的 using 與名稱空間 (namespace) 完全沒有關係。這裡的 using 語句可以確保在實現 IDisposable 介面的物件的引用超出作用域時,在該物件上自動調用 Dispose( ) 方法。using 語句的後面是一對小括號,其中是引用變數的聲明和實例化(connection,channel),該語句使變數的作用域限制在隨後的語句塊中,因此在超出此作用域時就會自動關閉連線 (connection)與頻道 (channel)。另外,在變數超出作用域時,即使出現異常,也會自動調用其 Dispose( ) 方法。這也可省去使用 try/finally 語句處理異常狀況。

這裡我們宣告變數也使用了一個關鍵字 var,這是一種類型推斷關鍵字,使用 var 關鍵字替代實際的類型。編譯器可以根據變數的初始化值推斷變數的類型,但需要遵循以下一些規則:

  • 變數必須初始化。否則,編譯器就沒有推斷變數類型的依據。
  • 初始化器不能為空。
  • 初始化器必須放在表達式中。
  • 不能把初始化器設置為一個物件,除非在初始化器中創建一個新物件。

聲明了變數且推斷出類型後,就不能再改變變數的類型了。變數的類型確定後,對該變數進行任何賦值時,其強類型化規則必須以推斷出的類型為基礎。

初始化 ConnectionFactory 建構式中,我們使用預設的 port 5672 與預設的使用者名稱與密碼 guest, 如果使用的不是預設值,則建構實例時得顯式的加上這些值:

Send.cs
1
2
3
4
5
6
7
var factory = new ConnectionFactory()
{
HostName = "10.11.xx.xxx",
Port = 5672,
UserName = "yourname",
Password = "yourpassword"
};

要發送訊息,我們必須聲明一個佇列(Queue)供我們發送,然後我們就可以向佇列發佈消息,以下是 Send.cs 程式碼:

Send.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
using System;
using RabbitMQ.Client;
using System.Text;

namespace Send
{
class Send
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "10.11.xx.xxx" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(
queue: "dotnetDemoQueue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);

string message = $"哈囉 from C# send 消息 {DateTime.Now.ToString()}";
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();

properties.Persistent = true;

channel.BasicPublish(
exchange: "",
routingKey: "dotnetDemoQueue",
basicProperties: properties,
body: body
);
Console.WriteLine($" [x] Send {message}");
}
Console.WriteLine("Press [enter] to exit.");
Console.ReadLine();
}
}
}

QueueDeclare( ) 方法聲明一個 queue 佇列來儲存我們的消息,在這個簡單的範例中,我們會跳過交換器 (exchange) 直接將消息送到 queue 中,其實 RabbitMQ 伺服器會使用一個內建的交換器幫我們把消息直接送到 queue 中。此範例,我們可以省掉交換器的聲明與路由鍵對佇列的綁定 (Binding)。

QueueDeclare( ) 聲明佇列是可冪性的(idempotent),只有在它不存在的情況下才會創建它,否則就使用已存在的佇列。 durable 引數目前設為 false,表示如果 RabbitMQ 伺服器重新啟動或崩潰,這個佇列將不復存在。

消息內容是一個字節陣列 (byte array),因此您可以編碼您喜歡的任何內容。這裡則使用 Encoding.UTF8.GetBytes( ) 將字串序列化。

properties.Persistent 這裡設定為 true,這會指示 RabbitMQ 伺服器將佇列的訊息存入磁碟中,預設值是 false,這只會將佇列訊息存在記憶體中,如果 RabbitMQ 伺服器重新啟動或崩潰,這些訊息將會消失。在這個範例中雖然設定為 true,但它不會有作用,因為我們在佇列聲明時 durable 設定為 false,這會在伺服器重啟時將整個佇列刪除。

BasicPublish( ) 方法讓我們送出消息。引數 exchange 這裡設定為空字串,RabbitMQ 伺服器會使用內建的交換器幫我們送出消息,此時引數 routingKey 可直接設定為佇列名稱。接下來就可以測試送出消息了。

1
2
3
dotnet run
[x] Send 哈囉 from C# send 消息 2019/7/11 上午 08:57:47
Press [enter] to exit.

在我們尚未編寫消費者端時,可以前往 RabbitMQ 的 Management UI 查詢佇列的狀況。

Receiving

消費者,它會監聽來自 RabbitMQ 的消息。 因此,與發佈單個消息的發佈者不同,我們將使消費者保持持續運行以偵聽消息並將其顯示出來。

在 Receive.cs 中,我們需要使用一些命名空間,並將 class 名稱修改為 Receive:

Receive.cs
1
2
3
4
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

開始的設置與發佈布者 (Send.cs)相同。我們打開一個連接(connection)和一個通道(channel),並聲明我們要消耗的佇列。請注意,這與 Send 發佈者的佇列要相匹配。

在這裡,我們也在此處聲明佇列。因為我們可能在發佈者之前啟動消費者,所以我們希望在嘗試使用消息之前確保佇列的存在。以下是 Receive.cs 的程式碼:

Receive.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Receive
{
class Receive
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "10.11.xx.xxx" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(
queue: "dotnetDemoQueue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);

var consumer = new EventingBasicConsumer(channel);

consumer.Received += (model, e) =>
{
var body = e.Body;
var message = Encoding.UTF8.GetString(body);

Console.WriteLine($" [x] Received {message}");
};

channel.BasicConsume(
queue: "dotnetDemoQueue",
autoAck: true,
consumer: consumer
);

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}

RabbitMQ 伺服器是以非同步的方式傳遞消息的,所以我們需要提供回呼函式(callback)處理器。這就是 EventingBasicConsumer.Received 事件處理程序所做的事情。

C# 的事件基於委託(Delegate),為委托提供了一種發佈/訂閱機制。consumer 實例訂閱了 EventingBasicConsumer 發射的事件,因此我們必須註冊一個處理函式(callback),在 C# 即是要註冊一個委托(Delegate),這裡我們使用 “+=” 註冊一個處理器(或稱監聽器)。

事件一般使用的方法有兩個參數: 其中第一個參數是一個物件,包含事件的發送者,第二個參數提供了事件相關的訊息。

我們透過 e.Body 取得發佈的消息,它是一個字節陣列 (byte array),我們使用 Encoding.UTF8.GetString() 作反敘列化,並顯示在終端機上。

BasicConsume() 中的引數 autoAck 設定為 true,在消費成功後會自動通知伺服器將此訊息從佇列中去除,如果沒有送回 Ack 訊息,消息將會保留在佇列中,有可能會被重覆消費。

現在將它們兜在一起,首先啟動消費者 Receive:

1
2
cd Receive
dotnet run

接著從發布者 Send 送出一個訊息。因消費者將透過 RabbitMQ 發佈者處獲得的消息,消費者將會持續運行,等待消息 (使用 Ctrl-C 停止消費),因此請嘗試從另一個終端視窗執行發佈者。

1
2
cd Send
dotnet run

持續運行中的消費者視窗將會即時顯示訊息。

1
2
3
dotnet run
Press [enter] to exit.
[x] Received 哈囉 from C# send 消息 2019/7/11 上午 11:56:27