0%

Apache Kafka 與 Electron-RabbitMQ Notification

上回用 Electron 與 RabbitMQ 寫了可以釘選在 Windonws Desktop 托盤上的應用程式 桌面應用程式 Electron-RabbitMQ Notification

有各種管道可以發送訊息,現在就來看看如何在 Apache Kafka 消費成功或失敗時發送即時訊息到這個 Notification 應用程式。

這是一個 Apache Kafka 消費者,這裡用 Node.js,所以可以用 Node.js 的 RabbitMQ 的程式庫 amqplib,但只要發送訊息到 RabbitMQ,可以直接使用 GraphQL API 會比較單純。

這裡會送到 RabbitMQ 的 Exchange electron.notification.admin,Routing Key 則是 dba。你可想成,這裡要將訊息發送給 admin 群組下的子群組 dba。 下面的 Queue 則綁定了這個 Exchange 與 Routing Key。

kafka2rabbit-api-notification.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
const kafka = require('kafka-node');
const fetch = require('node-fetch');

const Consumer = kafka.Consumer;
const client = new kafka.KafkaClient({kafkaHost: '10.11.xx.xxx:9093'});
const offset = new kafka.Offset(client);

const topic = 'testing';
const partition = 0;

offset.fetchLatestOffsets([topic], function (err, offsets) {
if (err) return err;

let lastestOffset = offsets[topic][partition];
console.log(`lastest Offset: ${lastestOffset}`);

consumer = new Consumer(
client,
[
{
topic: topic,
partition: partition,
offset: lastestOffset - 1
}
],
{
//groupId: "demo-employees-group-1",
fromOffset: true,
autoCommit: true
}
);

consumer.on('message', function (message) {
console.log(message);

let data = {
title: 'Kafka Consumer',
content: `Kafka Topic: ${message.topic}`,
description: [
`Offset: ${message.offset}`,
`highWaterOffset: ${message.highWaterOffset}`,
`timestamp: ${message.timestamp}`
],
created: (new Date()).toString(),
from: 'Apache Consumer',
link: {
text: 'Go Kafka',
url: 'http://10.11.xx.xxx:3030'
},
};

sendToExchange(data);
});

consumer.on('error', function(err) {
console.log(err);

let data = {
title: 'Kafka Consumer Error',
content: `Kafka Topic: ${err.message}`,
created: (new Date()).toString(),
from: 'Apache Consumer',
type: 'urgent',
link: {
text: 'Go Kafka',
url: 'http://10.11.xx.xxx:3030'
},
};

sendToExchange(data);
});

consumer.on('offsetOutOfRange', function(err) {
console.log(err);
});
});

function sendToExchange(value) {
const url = "http://10.11.xx.xxx:x000/v1/graphql";
const exchange = "electron.notification.admin";
const routingKey = "dba";
const query = `mutation sendToExchange(
$exchange: String!,
$routingKey: String,
$message: String!) {
sendToExchange(exchange: $exchange, routingKey: $routingKey, message: $message)
}
`;

let message = JSON.stringify({ ...value, created: (new Date()).toString() });

fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
"Accept": "application/json"
},
body: JSON.stringify({
query: query,
variables: { exchange, routingKey, message }
})
})
.then(r => r.json())
.then(data => {
console.log(`data returned: ${JSON.stringify(data)}`);
})
.catch(err => console.log(err.message));
}

  • 第 11 行 fetchLatestOffsets 方法可以抓取到這個 Topic 的最後 Offset,我們可以從這個 Offset 開始消費,這會跟第 28 行的 fromOffset 設定為 false 是一樣的,會從新進來的消息開始消費。

  • 第 23 行的 offset 現在設為 lastestOffset - 1,所以會從上次已消費的最後一筆開始,可視你的需求調整。

  • 第 28 行 fromOffset 因為我們有提供 offset,所以這裡要設為 true。

  • 第 52、70 行當 Apache Kafka 消費成功或失敗時發送訊息。

  • 第 78 行是發送訊息到 RabbitMQ API 的函式 sendToExchange。

  • 第 80 行發送的 RabbitMQ Exchange electron.notification.admin

  • 第 81 行定義 Routing Key dba

這裡的 Apache Kafka Topic testing 每小時會從 Oracle 資料庫發送訊息到這個 Apache Kafka Topic,消費完成後再轉發通知到 RabbitMQ 的 Exchange electron.notification.admin

看起來我們好像繞了一圈,你如要用微服務架構 (Microservices),這就是不可少的架構與技術。