0%

Oracle CQN Using Node.js

WoT (Web of Things) 是 IoT (Internet of Things 物聯網) 的 Application 層,就是使用 Web 技術來打造 application。也就是說,IoT + Web-enabled technologies 就是 WoT。對 WoT 來說,最重要的觀念,就是以 URL 來表示 IoT 裝置;為 IoT 加入 URL 的觀念,就是 Google 提出的 Physical Web 計畫。

未來每個物件 (Objects) 全部都會和 Web 結合, 而首要條件就是所謂的 Things 必須是可以和 Web 溝通的裝置, 這些智慧裝置之間也能透過 Web 標準來互相溝通。WoT 重點在於使用 Web 的軟體標準和框架, 例如 REST, HTTP 和 URI 來建立應用程式以及服務, 並和其他裝置做溝通, 簡單來說就是日常生活所有的東西都可以存取 Web, 就是 WoT 的精隨。

除了能夠相互溝通,時間性當然也很重要,Real-time 數據越來越受歡迎。基本思路很簡單:
當物件發生變化時,任何查看物件數據的客戶端都應立即能看得到異動。最初,在 Web 應用程式中模擬 real-time 數據的唯一方法是從瀏覽器每隔一段固定的時間不斷循環的從 Web 服務器抓取資料 (Polling)。 這通常很複雜,難以擴大應用。WebSocket,改變了這種狀況,這是一種以高效率的方式在瀏覽器和 Web 服務器之間提供雙向通信的協議。

Oracle Database 的 UTL_HTTP 是傳統上與外界 Web 溝通的方式,後來加入了 ORDS,努力的讓資料庫裡的物件 (Things) 能與外界的物件溝通,但這仍無法主動的通知資料庫內資料的異動,你可以使用 Trigger 加上 AQ,但這些微的資料庫資料變化仍很難跨出資料庫,Continuous Query Notification (CQN) 則改變了這個狀況。

之前介紹了 Oracle Continuous Query Notification (CQN),使用 PL/SQL API 為事件處理程式 (Handler)。Oracle 在 Node.js 的程式庫 node-oracledb 則使其功能更強大,它讓 CQN 跨出了 Oracle 資料庫。

基本觀念及使用與 PL/SQL API 一樣。

/* Notification Handler 通常稱 為 callback */
function demoCallback(event) {
console.log(event);
}

/* 註冊 Query 與 callback 函數。註冊後,Node.js 程式會一直等待資料庫發出的事件通知 */
connection.subscribe('demosub', {
sql: "SELECT sal, comm FROM emp WHERE deptno = 10",
callback: demoCallback,
qos: oracledb.SUBSCR_QOS_QUERY | oracledb.SUBSCR_QOS_ROWIDS
});

相較於使用 Kafka 或 RabbitMQ,node-oracledb 的 CQN API,不需另外架構 Server,只需在一台不會關機的機器上啟動你的 Node.js Client 程式,就像 Kafka 或 RabbitMQ Consumer 一樣。

另外當然需要 JavaScript 技術,JavaScript 在這股 WoT 的潮流中佔有非常重要的地位,強烈建議,除了 PL/SQL,JavaScript 是你現在必須要熟悉的技術。

範例

程式碼看起來有點長,因為要示範的關係,將所有程式碼放在一個資料檔上,你可以將它模組化。

首先你需要建立一個 Node.js 專案資料夾,然後安裝 oracle node-oracledb

demo-cqn.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
const oracledb = require('oracledb');

const dbConfig = {
user: "demo",
password: "secret",
connectString: "10.11.25.139:1522/db99.example.com"
}

const oradb = database(); /* 這可以模組化,用來創建個 Oracle connection pool, 與執行 statement 的 API */

startup(); /* 啟動後 Node.js process 就會一直等待事件的通知,直到 Timeout (如果有設定) */

async function startup() {
await oradb.initialize(); /* Create Oracle Connection Pool */
await registerQuery(); /* 註冊 CQN Query */
}

async function registerQuery() {
const config = {
...dbConfig,
events: true /* 使用 CQN subscribe events 模式必須為 true, 預設值 false */
}

const options = {
sql: 'SELECT sal, comm FROM emp where deptno = 10', /* 要註冊的 Query */
callback: demoCallback, /* 事件處理程式 (Notification handler) 一般都稱呼為 Callback 函數 */
timeout: 600, /* 因是測試,設定 600 秒後會自動撤銷註冊 */
qos: oracledb.SUBSCR_QOS_QUERY | oracledb.SUBSCR_QOS_ROWIDS /* 註冊為 QRCN,加上 QOS_ROWIDS,事件物件才會包含 rowid */
};

let connection;

try {
connection = await oracledb.getConnection(config); /* 註冊時的 Connection 設定不一樣,使用一般的 Connection */

await connection.subscribe('demosub', options); /* 調用 subscribe() 註冊 */

console.log("Subscription created...");
} catch (err) {
console.error(err);
} finally {
if (connection) {
try {
await connection.close(); /* 註完冊後,就可以關閉這個 connection,資料庫會創建一個 OCI API 通道用來回呼你的 Callback 函數 */
} catch (err) {
console.error(err);
}
}
}
}

// 事件處理程式 (Notification handler) 一般都稱呼為 Callback 函數
async function demoCallback(event) {
const statement = 'select * from emp where rowid = :rid'; /* 事件物件(Event Object) 只會回傳受影響資料行的 rowid,這裡將使用此 rowid 實際到資料庫讀取資料 */

console.log("event type:", event.type);

if (event.type == oracledb.SUBSCR_EVENT_TYPE_DEREG) {
console.log("Deregistration has taken place..."); /* Subscription closed or timeout */
return;
}

// 解析事件物件,讀取所需的資料。
console.log("event database name:", event.dbName);
console.log("event transaction id:", event.txId);

const queries = event.queries || [{ tables: event.tables }]; /* OCN 與 QRCN 回傳的事件物件屬性不太一樣 */

for (const query of queries) {
const tables = query.tables;

for (const table of tables) {
console.log(`--> Table Name: ${table.name}`);
console.log(`--> Table Operation: ${operationName(table.operation)} Rows=${table.rows.length}`);

if (table.rows) {
for (const row of table.rows) {
console.log(`--> --> Row Rowid: ${row.rowid}`); /* 受影響的單筆資料 */
console.log(`--> --> Row Operation: ${operationName(row.operation)}`);

// 用 rowid 到資料庫讀取詳細的資料,這理應是你的應用程式感興趣的地方。
// 這裡的 connection 使用的是先前創建的 connection pool。這裡千萬要避免使用一般的 connection,
// 如果同時收到大量的資料,產生大量的 connections,資料庫有可能就掛掉了。
// 使用 connection pool 效能較佳,又可限制 connection 數量。
try {
const { rows } = await oradb.doExecute(statement, [row.rowid]);
console.log(rows);
} catch (err) {
console.log(err);
}
console.log(Array(61).join("-"));
}
}
console.log(Array(61).join("="));
}
}
}

// 非必要,只為好看些
function operationName(operation) {
let operationName;

switch (operation) {
case oracledb.CQN_OPCODE_INSERT:
operatonName = 'Records Inserted';
break;
case oracledb.CQN_OPCODE_UPDATE:
operatonName = 'Records Updated';
break;
case oracledb.CQN_OPCODE_DELETE:
operatonName = 'Records Deteled';
break;
case oracledb.CQN_OPCODE_ALTER:
operatonName = 'Table Altered';
break;
case oracledb.CQN_OPCODE_DROP:
operatonName = 'Table Dropped';
break;
default:
operatonName = 'Unknown Operation';
}
return operatonName;
}

// 抽象化資料庫層
function database() {
const config = {
...dbConfig,
poolAlias: 'db99demo',
poolMin: 1,
poolMax: 2,
poolIncrement: 1,
queueMax: -1
};

async function initialize() {
try {
await oracledb.createPool(config); /* 創建 Connection Pool */
}
catch (err) {
console.log(err);
}
}

async function close() {
try {
oracledb.getPool(config.poolAlias).close();
}
catch (err) {
console.log(err);
}
}

function doExecute(statement, binds = [], opts = {}) {
return new Promise(
async (resolve, reject) => {
let connection;
opts.outFormat = oracledb.OBJECT;
opts.autoCommit = true;

try {
connection = await oracledb.getConnection(config.poolAlias); /* 使用 Connection Pool */
const result = await connection.execute(statement, binds, opts);
resolve(result);
}
catch (err) { reject({ error: err }); }
finally {
if (connection) {
try {
await connection.close();
}
catch (err) { console.log(err); }
}
}
}
);
}

return {
initialize,
close,
doExecute
};
}

process
.on('SIGTERM', () => {
console.log("\nTerminating");
oradb.close();
process.exit(0);
})
.on('SIGINT', () => {
console.log("\nTerminating");
oradb.close();
process.exit(0);
})
.on('uncaughtException', err => {
console.log("\nUncaught exception");
oradb.close();
console.log(err);
process.exit(1);
});

啟動

$ node demo-cqn
Subscription created...

啟動並註冊後,Node.js process 會等待通知事件,並呼叫 Callback 函數。

SQL> select regid, callback, table_name, timeout from user_change_notification_regs;

REGID CALLBACK TABLE_NAME TIMEOUT
------- ------------------------------------------------- ---------- ----------
1536 net8://(ADDRESS=(PROTOCOL=tcp)(HOST=10.11.100.30) DEMO.EMP 200
(PORT=49919))?PR=0

資料庫會用 OCI API net8://(ADDRESS=(PROTOCOL=tcp)(HOST=10.11.100.30)(PORT=49919))?PR=0 回呼你的 Node.js callback 函數。所以你 Client 端程式的機器及 port 要保持暢通。TIMEOUT 顯示這個註冊的查詢還剩多少時間就會 timeout。如果註冊時沒有設定 timeout,這裡的值會是非常大的數字 4294967295。那你可能必須手動使用 DBMS_CQ_NOTIFICATION.DEREGISTER 註銷註冊的 Query。

SQL> select * from user_cq_notification_queries;

QUERYID QUERYTEXT REGID
-------- ------------------------------------------------------------ -------
71 SELECT DEMO.EMP.SAL , DEMO.EMP.COMM FROM DEMO.EMP WHERE D 1536
EMO.EMP.DEPTNO = 10
SQL> update emp set comm = comm + 1 where empno = 7934;

1 row updated.

SQL> commit;

Commit complete.
$ node demo-cqn
Subscription created...
event type: 7
event database name: lx26
event transaction id: <Buffer 06 00 0d 00 f4 a3 0a 00>
--> Table Name: DEMO.EMP
--> Table Operation: Records Updated Rows=1
--> --> Row Rowid: AACzQEAAEAAACAXAAN
--> --> Row Operation: Records Updated
[
{
EMPNO: 7934,
ENAME: '楊喆',
JOB: 'CLERK',
MGR: 7902,
HIREDATE: 1982-01-22T16:00:00.000Z,
SAL: 1500,
COMM: 13,
DEPTNO: 10
}
]
------------------------------------------------------------
============================================================

在 WoT 與 Real-time 的潮流中,Kafka 受到了非常多得關注,RabbitMQ 也有銀行界的加持,有幸為公司導入了這兩個架構,今又有 Oracle CQN 可以選擇,如何應用,應該看應用的範圍與規模,Kafka 與 RabbitMQ 可以是資訊策略的一環,CQN 則適合單打獨鬥,它不需要另外架構 Server,但他只適合用於較小量的資料異動,千萬不要用於大量資料的處理;Kafka 則彈性的多,它可處理一天多達 Petabyte 的資料。RabbitMQ 則有較佳的 GUI 管理介面。不管如何,都可以讓你有些新思維。

Go Real-time!