0%

Oracle Advanced Queuing Using Node.js

是的! 你沒有看錯標題。Oracle AQ 也跨出了資料庫。在 node-oracledb 4.0 中引入了用於 AQ 的 Node-oracledb API。

Oracle AQ 的運作已有一段時間了,基本運作方式大家都很熟了,這裡直接看範例,讓 AQ 走出 Oracle 資料庫。

Create and Start Queue

1
2
3
4
5
6
/* For the data we want to queue */
CREATE OR REPLACE TYPE demo_msg_type AS OBJECT (
subject VARCHAR2(30),
text VARCHAR2(100)
)
/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
-- Create and start a queue
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
QUEUE_TABLE => 'DEMO.DEMO_MSG_QUEUE_TAB',
QUEUE_PAYLOAD_TYPE => 'DEMO.DEMO_MSG_TYPE');

DBMS_AQADM.CREATE_QUEUE(
QUEUE_NAME => 'DEMO.DEMO_MSG_QUEUE',
QUEUE_TABLE => 'DEMO.DEMO_MSG_QUEUE_TAB');

DBMS_AQADM.START_QUEUE(
QUEUE_NAME => 'DEMO.DEMO_MSG_QUEUE',
ENQUEUE => TRUE);
END;
/

Enqueue

demo-enq.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
const oracledb = require('oracledb');
const dbConfig = {
user: "demo",
password: "secret",
connectString: "10.11.25.139:1522/db99.example.com"
}
const queueName = "DEMO_MSG_QUEUE";

enqueue();

async function enqueue() {
let connection;

try {
connection = await oracledb.getConnection(dbConfig);

const queue = await connection.getQueue(queueName, { payloadType: "DEMO.DEMO_MSG_TYPE" });

const message = new queue.payloadTypeClass(
{
SUBJECT: "Node.js AQ 範例",
TEXT: `Hello Tainan, 台南! ${Date()}`
}
);
console.log(`Enqueuing: ${message}`);

await queue.enqOne(message);
await connection.commit();
} catch (err) {
console.error(err);
} finally {
if (connection) {
try {
await connection.close();
} catch (err) {
console.error(err);
}
}
}
}

Dequeue

demo-deq.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
const oracledb = require('oracledb');
const dbConfig = {
user: "demo",
password: "secret",
connectString: "10.11.25.139:1522/db99.example.com"
}
const queueName = "DEMO_MSG_QUEUE";

dequeue();

async function dequeue() {
let connection;

try {
connection = await oracledb.getConnection(dbConfig);

const queue = await connection.getQueue(queueName, { payloadType: "DEMO.DEMO_MSG_TYPE" });

const msg = await queue.deqOne();
if (msg) {
// do something here.
console.log('Dequeued: ' + msg.payload);
}

await connection.commit();
} catch (err) {
console.error(err);
} finally {
if (connection) {
try {
await connection.close();
} catch (err) {
console.error(err);
}
}
}
}

現在可以傳送資料了。

$ node demo-enq
Enqueuing: [DEMO.DEMO_MSG_TYPE] {
SUBJECT: 'Node.js AQ 範例',
TEXT: 'Hello Tainan, 台南! Tue Dec 29 2020 09:52:19 GMT+0800 (GMT+08:00)'
}

接收資料:

$ node demo-deq
Dequeued: [DEMO.DEMO_MSG_TYPE] {
SUBJECT: 'Node.js AQ 範例',
TEXT: 'Hello Tainan, 台南! Tue Dec 29 2020 09:52:19 GMT+0800 (GMT+08:00)'
}

如果你先 run dequeue, 預設情況下,deqOne( ) 將等待直到接收到消息。

這裡的 dequeue 一次接收一筆資料,我們也可以註冊一個 notification。

Register notification

這裡的 Notification 就像 Apache Kafka 與 RabbitMQ 的 Consumer 一樣。註冊方式與使用 Oracle CQN 一樣,細節請參考 Oracle CQN Using Node.js。註冊時一樣要設定事件處理程式 (Notification handler) 一般都稱呼為 Callback 函數。這觀念與使用 PL/SQL 是一樣的。

demo-notify.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
const oracledb = require('oracledb');

const dbConfig = {
user: "demo",
password: "secret",
connectString: "10.11.25.139:1522/db99.example.com",
poolAlias: 'db99demo',
poolMin: 1,
poolMax: 2,
poolIncrement: 1,
queueMax: -1
}

startup();

async function startup() {
try {
await oracledb.createPool(dbConfig);

registerAndNotify();
}
catch (err) {
console.log(err);
}
}

async function registerAndNotify() {
const config = {
user: dbConfig.user,
password: dbConfig.password,
connectString: dbConfig.connectString,
events: true /* 與使用 CQN 一樣,必須為 true, 預設值 false */
}
const queueName = "DEMO_MSG_QUEUE";

const subscrOptions = {
namespace: oracledb.SUBSCR_NAMESPACE_AQ,
callback: notifyCallback, /* 事件處理程式 (Notification handler) 一般都稱呼為 Callback 函數 */
timeout: 600 /* 因是測試,設定 600 秒後會自動撤銷註冊 */
};

let connection;

try {
connection = await oracledb.getConnection(config);

await connection.subscribe(queueName, subscrOptions); /* 調用 subscribe() 註冊 */

console.log("AQ notification created...");
} catch (err) {
console.error(err);
} finally {
if (connection) {
try {
await connection.close(); /* 註完冊後,就可以關閉這個 connection */
} catch (err) {
console.error(err);
}
}
}
}

async function notifyCallback(event) {
console.log("event type:", event.type);

if (event.type == oracledb.SUBSCR_EVENT_TYPE_DEREG) {
console.log("Deregistration has taken place..."); /* Subscription closed or timeout */
return;
}

try {
connection = await oracledb.getConnection(dbConfig.poolAlias); /* 使用先前創建的 connection pool */

const queue = await connection.getQueue(event.queueName, { payloadType: "DEMO.DEMO_MSG_TYPE" });

const msg = await queue.deqOne();
if (msg) {
// do something here.
console.log('Dequeued: ' + msg.payload);
}
await connection.commit(); /* 成功處理後,記得要 commit,才會從 Queue 移除 */
}
catch (err) { console.log(err) }
finally {
if (connection) {
try {
await connection.close();
}
catch (err) { console.log(err); }
}
}
}

process
.on('SIGTERM', () => {
console.log("\nTerminating");
oracledb.getPool(dbConfig.poolAlias).close();
process.exit(0);
})
.on('SIGINT', () => {
console.log("\nTerminating");
oracledb.getPool(dbConfig.poolAlias).close();
process.exit(0);
})
.on('unhandledRejection', err => {
console.log("\nUnhandledRejection exception");
oracledb.getPool(dbConfig.poolAlias).close();
console.log(err);
process.exit(1);
})
.on('uncaughtException', err => {
console.log("\nUncaught exception");
oracledb.getPool(dbConfig.poolAlias).close();
console.log(err);
process.exit(1);
});

現在啟動 notification,它會一直等待接收消息。

$ node demo-notify
AQ notification created...

Enqueue 一筆資料:

$ node demo-enq
Enqueuing: [DEMO.DEMO_MSG_TYPE] {
SUBJECT: 'Node.js AQ 範例',
TEXT: 'Hello Tainan, 台南! Tue Dec 29 2020 10:33:36 GMT+0800 (GMT+08:00)'
}

事件處理程式馬上收到消息,然後繼續等待消息。

$ node demo-notify
AQ notification created...
Dequeued: [DEMO.DEMO_MSG_TYPE] {
SUBJECT: 'Node.js AQ 範例',
TEXT: 'Hello Tainan, 台南! Tue Dec 29 2020 10:33:36 GMT+0800 (GMT+08:00)'
}

現在從 Oracle 資料庫使用 PLSQL Enqueue 一筆資料。

demo_msg_enqueue.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CREATE OR REPLACE PROCEDURE demo_msg_enqueue( msg_in IN VARCHAR2 )
AS
enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle RAW(16);
message DEMO.DEMO_MSG_TYPE;
BEGIN
message := DEMO.DEMO_MSG_TYPE('Node.js AQ 範例 from PLSQL', msg_in || ' ' || TO_CHAR(sysdate, 'yyyy-mm-dd hh24:mi:ss'));

DBMS_AQ.ENQUEUE(
queue_name => 'DEMO.DEMO_MSG_QUEUE',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle
);
END;
/
demo_msg_enqueue.sql
1
2
3
4
5
6
7
SQL> exec demo_msg_enqueue('Hello Tainan, 台南!');

PL/SQL procedure successfully completed.

SQL> commit;

Commit complete.
$ node demo-notify
AQ notification created...
Dequeued: [DEMO.DEMO_MSG_TYPE] {
SUBJECT: 'Node.js AQ 範例',
TEXT: 'Hello Tainan, 台南! Tue Dec 29 2020 10:33:36 GMT+0800 (GMT+08:00)'
}
Dequeued: [DEMO.DEMO_MSG_TYPE] {
SUBJECT: 'Node.js AQ 範例 from PLSQL',
TEXT: 'Hello Tainan, 台南! 2020-12-29 10:46:52'
}

現在你可以即時的將資料庫外的資料用 Node.js enqueue,然後用 PLSQL dequeue 傳入 Oracle 資料庫。相反的,可以用 PLSQL enqueue,然後使用 Node.js dequeue 即時的讓資料庫內的資料傳出資料庫。

Oracle 努力的讓資料庫裡的物件 (Things) 能與外界的物件溝通,使用傳統的 UTL_HTTP 在組合 JSON 格式的資料,實在是一種夢靨。現在你有了另一種選擇了。

除了 Apache Kafka、RabbitMQ 之外,你又多了一些選擇可以讓 Oracle 資料庫中的物件 (Things) 即時 (Read-Time) 的走出 Oracle 資料庫。你一定得試試看!