0%

在 Node.js 模塊系統的基本屬性 require( )是同步工作的,且 module.exports 不能設置為非同步。在 Node.js 核心模塊和許多 npm 軟件包中有些存在有同步 API 的主要原因之一,是在提供一種方便的替代方法,而這個替代方法主要用於初始化任務,而不是要取代原有的非同步 API 。

不幸的是,這並不總是可能的。 同步 API 可能並不總是可取得,尤其是對於在使用網絡組件的初始化階段,例如,執行握手協議(handshake protoclos)或檢索配置參數(configuration parameters)。 許多中間件系統的資料庫驅動程序和客戶端(例如消息隊列 message queues)也都有初始化階段的情況。

經典解決方案

讓我們舉個例子:一個名為 db 的模塊,該模塊連接到遠程資料庫。 只有在完成與資料庫伺服器的連接和握手之後,db 模塊才能接受請求。 在這種情況下,我們通常有兩個選擇:

  • 在開始使用模塊之前,確保已對其進行了初始化,否則需等待其初始化。 每當我們要在非同步模塊上調用操作時,都必須完成此過程:
1
2
3
4
5
6
7
8
9
10
11
12
13
const db = require('aDb');   //非同步模塊

module.exports = function findAll(type, callback) {
if (db.connected) { //is it initialized?
runFind();
} else {
db.once('connected', runFind);
}

function runFind() {
db.findAll(type, callback);
}
}
  • 使用 Dependency Injection(DI)而不是直接使用非同步模塊。這樣做,我們可以延遲某些模塊的初始化,直到它們的非同步依賴關係被完全初始化為止。 這種技術將管理模塊初始化的複雜性轉移到了另一個組件(通常是父組件)上。 在以下示例中,此組件是 app.js:
app.js
1
2
3
4
5
6
const db = require('aDB');  //非同步模塊
const findAllFactory = require('./findAll');

db.on('connected', function() {
const findAll = findAllFactory(db);
});
findAll.js
1
2
3
4
5
6
module.exports = function(db) {
//db is guaranteed to be initialized
return function findAll(type, callback) {
db.findAll(type, callback)
};
}

第一種選擇將涉及大量的代碼數量,可能不會是好的選擇。另外,使用 DI 的第二個選項有時也不可取,在大型專案中,它可能很快變得過於復雜,尤其是如果手動完成並使用非同步初始化的模塊。

但是,我們將看到,還有第三種選擇,它使我們能夠輕鬆地將模塊與其依賴項的初始化狀態隔離開來。

預初始化隊列(Preinitialization queues)

使模塊與依賴項的初始化狀態脫鉤的簡單模式包括使用隊列(queues)和命令模式(Command pattern)。 這個想法是保存尚未初始化的模塊所接收的所有操作(operations),然後在完成所有初始化步驟後立即執行它們。

實現非同步初始化的模塊

為了展示這種簡單而有效的技術,讓我們構建一個小型測試應用程序; 沒什麼花哨的,只是可以驗證我們假設的東西,讓我們從創建一個非同步初始化的模塊 asyncModule.js 開始;稍後,我們將會將它實際用在 Oracle 資料庫上。

asyncModule.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
const asyncModule = module.exports;

asyncModule.initialized = false;

asyncModule.initialize = (callback) => {
setTimeout(function () {
asyncModule.initialized = true;
callback();
}, 10000);
};

asyncModule.tellMeSomething = (callback) => {
process.nextTick(() => {
if(!asyncModule.initialized) {
return callback(new Error("I don't have anythins to say right now"));
}
callback(null, `Current time is: ${new Date()}`);
});
};

在前面的代碼中,asynModule 嘗試展示非同步初始化的模塊的工作方式。它公開了 initialize( )方法,該方法在延遲 10 秒後將初始化變數設置為 true 並通知其 callback(對於實際情況來說 10 秒是很長的時間) 應用程序,但是對我們來說,可以突顯出任何的 race conditions)。另一個方法 tellMeSomething( ) 返回當前時間,但是如果模塊尚未初始化,則會產生錯誤。

下一步是根據我們剛剛創建的服務創建另一個模塊。 讓我們考慮一下在一個名為 routes.js 的文件中實現的簡單 HTTP 請求處理程序:

routes.js
1
2
3
4
5
6
7
8
9
10
11
12
const asyncModule = require('./asyncModule');

module.exports.say = (req, res) => {
asyncModule.tellMeSomething((err, something) => {
if (err) {
res.writeHead(500);
return res.end(`Error: ${err.message}`);
}
res.writeHead(200);
res.end(`I say: ${something}`);
});
};

處理程序調用 asyncModule的tellMeSomething( )方法,然後將結果寫入 HTTP 回應中。 如我們所看到的,我們沒有對 asyncModule 的初始化狀態執行任何檢查,可以想像,這可能將導致問題。

現在,讓我們創建一個非常基本的 HTTP 服務器,僅使用核心 http 模塊。

app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const http = require('http');
const routes = require('./routes');
const asyncModule = require('./asyncModule');

asyncModule.initialize(() => {
console.log('Async module initialized.');
});

http.createServer((req, res) => {
if(req.method === 'GET' && req.url === '/say') {
return routes.say(req, res);
}
res.writeHead(404);
res.end('Not found');
}).listen(8000, () => console.log('Started'));

前面的小模塊是我們應用程序的入口,它所做的只是觸發 asyncModule 的初始化,並創建一個 HTTP 服務器,該服務器利用我們先前創建的 routes.say( ) 的請求處理程序。

現在,我們可以像往常一樣通過執行 app.js 模塊來嘗試啟動服務器。

$ node app.js

服務器啟動後,我們可以嘗試使用瀏覽器訪問 URL http://localhost:8000/say ,並查看 asyncModule 的返回結果。

Error: I don’t have anythins to say right now

這意味著 asyncModule 尚未初始化,但我們仍然嘗試使用它。根據非同步初始化的 Module 的實現細節,我們可能會收到一個錯誤,丟失重要信息甚至崩潰整個應用程序。

通常,必須完全避免剛剛描述的情況。在大多數情況下,初始化可能非常快速,不必擔心幾個失敗的請求,甚至在實踐中它可能永遠不會發生。但是,對於旨在自動擴展的高負載應用程序和雲服務器,這兩種不太可能發生錯誤的假設都必須完全避免。

用預初始化隊列包裝模塊

為了增加伺務器的穩健性,我們現在將重構它。我們將對尚未初始化而在 asyncModule 上調用的所有操作進行隊列化(queue),直到準備好處理它們時再將隊列刷新。 這看起來像是 State 模式的絕佳應用! 我們將需要兩種狀態,

  • 一種狀態是在模塊尚未初始化時將所有操作送入隊列。
  • 另一種狀態是在初始化完成後將隊列中的每個操作直接執行原始 asyncModule 模塊中的操作。

通常,我們沒有機會修改非同步模塊的代碼。 因此,要添加我們的隊列層,我們將需要圍繞原始 asyncModule 模塊創建一個代理 Proxy。

讓我們創建一個名為 asyncModuleWrapper.js 的新檔案,然後開始逐段構建它。 我們需要做的第一件事是創建將操作委託給活動狀態的物件

1
2
3
4
5
6
7
8
9
10
11
12
const asyncModule = require('./asyncModule');

const asyncModuleWrapper = module.exports;

asyncModuleWrapper.initialized = false;
asyncModuleWrapper.initialize = function () {
activeState.initialize.apply(activeState, arguments);
};

asyncModuleWrapper.tellMeSomething = function () {
activeState.tellMeSomething.apply(activeState, arguments);
};

在前面的代碼中,asyncModuleWrapper 簡單地將其每個方法委託給當前活動狀態(currently active state)。 從 notInitializedState 開始,讓我們看看這兩種狀態的樣子:

13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
let pending = [];
let notInitializedState = {
initialize: function (callback) {
asyncModule.initialize(() => {
asyncModuleWrapper.initalized = true;
activeState = initializedState;

pending.forEach(req => {
asyncModule[req.method].apply(null, req.args);
});
pending = [];
callback();
});
},

tellMeSomething: function (callback) {
return pending.push({
method: 'tellMeSomething',
args: arguments
});
}
};

當調用 initialize( )方法時,我們觸發原始 asyncModule 模塊的初始化(第 16 行),並提供回調代理。 這使我們的包裝器知道何時初始化了原始模塊,並因此觸發了以下操作:

  1. 第 18 行使用流程中的下一個狀態物件 initializedState 更新 activeState 變數。
  2. 第 20 行執行之前存儲在 pending 隊列中的所有操作指令。
  3. 第 24 行調用原始回調(callback)。

開始時由於模塊尚未初始化,因此此狀態(notInitializedState)的 tellMeSomething( )方法簡單地創建一個新的 Command 物件將其添加到 pending 隊列的操作中。

此時,當原始 asyncModule 模塊尚未初始化時,模式應該已經很清楚了,我們的包裝器簡單地將所有接收到的請求送入 pending 隊列排隊。 然後,當通知我們初始化已完成時,我們將執行所有排隊的操作,然後將內部狀態切換為 initializedState。讓我們看看包裝的最後一部分是什麼樣子:

35
let initializedState = asyncModule;

毫不奇怪,initializedState 物件只是對原始 asyncModule 的引用! 實際上,初始化完成後,我們可以安全地將任何請求直接路由到原始模塊。 不需要什麼 Proxy 代理了。

最後,我們必須設置啟動這個模組的初始活動狀態,該狀態當然是 notInitializedState:

36
let activeState = notInitializedState;

現在,我們可以嘗試再次啟動測試服務器,但首先,請不要忘記用新的 asyncModuleWrapper 物件替換對原始 asyncModule 模塊的引用; 這必須修改 app.js 和 routes.js 模塊。

1
2
3
...
const asyncModule = require('./asyncModuleWrapper');
...

完成此操作後,如果嘗試再次向服務器發送請求,我們將看到在 asyncModule 模塊尚未初始化的時間內,請求不會失敗; 相反,它們將等待直到初始化完成,然後才實際執行。 我們可以肯定地這是一個更加穩健的伺服器。

現在,我們的服務器可以在啟動後立即開始接受請求,並保證這些請求不會因其模塊的初始化狀態而失敗。 我們能夠獲得此結果,而無需使用 DI 或進行冗長且易於出錯的檢查來驗證非同步混淆的狀態。

這種模式,如果模塊是非同步初始化的,將所有操作送入隊列,直到模塊完全初始化為止。許多資料庫驅動程序和 ORM 程式庫都使用該模式。不必等待資料庫的連接就可以發送查詢,因為每個操作都會先存到隊列中,然後在與資料庫的連接完全建立後再執行。

Oracle 應用實例

這裡使用 Oracle XE,可以設定多個 Oracle Connection Pools。建議盡量使用 Connection Pools,效能較好,同時可以控制連線的數量。直接使用 Connection 方式,當同時湧入大量的需求,資料庫有可能就會崩潰。

dbconfig.js
1
2
3
4
5
6
7
8
9
10
11
12
module.exports = {
xepdb1: {
user: "demo",
password: "demo6460",
connectString: "localhost:1521/xepdb1",
poolAlias: 'xepdb1demo',
poolMin: 2,
poolMax: 5,
poolIncrement: 1,
queueMax: -1
}
};

這裡將 Oracle 資料庫驅動程式 node-oracledb 包裝在我們的模塊中。這與之前的 asyncModuleWrapper 模塊有相同的概念。這個模塊也分為兩種狀態,一種狀態是在模塊尚未初始化時將所有操作送入隊列,另一種狀態是在初始化完成後將每個操作直接執行原始 oracledb 模塊中的操作。

database.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
const { EventEmitter } = require('events');
const oracledb = require('oracledb');
const dbconfig = require('./dbconfig');

const METHODS_REQUIRING_CONNECTION = ['doExecute', 'doExecuteMany'];
const deactivate = Symbol('deactivate');

class InitializedState {
constructor(db) {
this.dbsPool = db.dbsPool;
}

async doExecute(poolAlias, statement, binds = [], opts = {}) {
return new Promise(async (resolve, reject) => {
let conn;
opts.outFormat = oracledb.OBJECT;
opts.autoCommit = true;
try {
conn = await oracledb.getConnection(this.dbsPool[poolAlias]);
const result = await conn.execute(statement, binds, opts);
resolve(result);
}
catch (err) { reject({ error: err }); }
finally {
if (conn) {
try {
await conn.close();
} catch (err) {
console.log(err);
}
}
}
}
);
}

async doExecuteMany(poolAlias, statement, binds = [], opts = {}) {
return new Promise(async (resolve, reject) => {
let conn;
opts.outFormat = oracledb.OBJECT;
opts.autoCommit = true;
opts.batchErrors = true;
try {
conn = await oracledb.getConnection(this.dbsPool[poolAlias]);
const result = await conn.executeMany(statement, binds, opts);
resolve(result);
}
catch (err) { reject(err); }
finally {
if (conn) {
try {
await conn.close();
} catch (err) {
console.log(err);
}
}
}
});
}
}

class QueuingState {
constructor(db) {
this.db = db;
this.commandsQueue = [];

METHODS_REQUIRING_CONNECTION.forEach(methodName => {
this[methodName] = function (...args) {
console.log('Command queued:', methodName, args);
return new Promise((resolve, reject) => {
const command = () => {
db[methodName](...args)
.then(resolve, reject);
};
this.commandsQueue.push(command);
});
};
});
}

[deactivate]() {
this.commandsQueue.forEach(command => command());
this.commandsQueue = [];
}
}

class DB extends EventEmitter {
constructor() {
super();
this.dbsPool = {};
this.state = new QueuingState(this);
}

async initialize() {
try {
for (let db of Object.keys(dbconfig)) {
let config = dbconfig[db];
this.dbsPool[config.poolAlias] = await oracledb.createPool(config);
}

this.initialized = true;
this.emit('initialized');
const oldState = this.state;
this.state = new InitializedState(this);
oldState[deactivate] && oldState[deactivate]();
} catch (err) {
console.log(err);
}
}

async doExecute(poolAlias, statement, binds = [], opts = {}) {
return this.state.doExecute(poolAlias, statement, binds, opts);
}

async doExecuteMany(poolAlias, statement, binds = [], opts = {}) {
return this.state.doExecuteMany(poolAlias, statement, binds, opts);
}

async close() {
try {
for (let db of Object.keys(dbconfig)) {
await oracledb.getPool(dbconfig[db].poolAlias).close();
}
}
catch (err) {
console.log(err);
}
}
}

module.exports = new DB();

第 8 行的 InitializedState 類與第 62 行的 QueuingState 類分別代表初始化完成後與尚未初始化的兩種狀態。

第 87 行的 DB 類是維一對外揭露的類,在建構式中第 91 行,開始是處於尚未初始化的 QueuingState。在這個狀態下,把所有的需求操作都送入 commandsQueue 陣列中,尚未實際調用資料庫驅動程式對資料庫提出需求,因為此時尚未完成 Connection Pools 的創建。

第 94 行 initialize( ) 方法開始創建 Oracle Connection Pools,在創建 Connection Pools 完成後,在第 104 行將狀態切換到初始化完成後的 InitializedState。在這之後所有的需求操作都會直接調用資料庫驅動程式 oracledb 對資料庫提出需求,不再需要使用 QueuingState。

先來測試看看,第 3 行在開始初始化時,我們並沒有等到 Connection Pools 完成,主程式就繼續往下執行,馬上對資料庫提出需求操作。

simpleTest.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
const oradb = require('./database');

oradb.initialize();
oradb.on('initialized', () => console.log('Database initialized.'));

const statement = 'select * from emp where empno = :empno';

doExecute("xepdb1demo", statement, [7788]);
doExecute("xepdb1demo", statement, [7934]);
//employees();

setTimeout(() => {
doExecute("xepdb1demo", statement, [7782]);
doExecute("xepdb1demo", statement, [7876]);
}, 500);

setTimeout(() => oradb.close(), 30000);

async function doExecute(dbpool, statement, binds) {
try {
const result = await oradb.doExecute(dbpool, statement, binds);
console.log(result);
} catch (err) {
console.log(err);
}
}

async function doExecuteMany(dbpool, statement, binds) {
try {
const result = await oradb.doExecuteMany(dbpool, statement, binds);
console.log(result);
} catch (err) {
console.log(err);
}
}

async function employees() {
const dbpool = "xepdb1demo";
const statement = 'select * from emp';
try {
const result = await oradb.doExecute(dbpool, statement);
console.log(result);
} catch (err) {
console.log(err);
}
}

現在將它實際應用在伺服器上,讓我們創建一個非常基本的 HTTP 服務器來使用 Oracle Database。

routes.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const oradb = require('./database');

module.exports.listEmployees = (req, res) => {
const dbpool = "xepdb1demo";
const statement = 'select * from emp';

oradb.doExecute(dbpool, statement)
.then(result => {
res.writeHead(200, {'Content-Type': 'application/json; charset=utf8'});
res.end(JSON.stringify(result));
})
.catch(err => {
res.writeHead(500);
return res.end(`Error: ${err.message}`);
});
};
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
const http = require('http');
const routes = require('./routes');
const oradb = require('./database');

oradb.initialize();
oradb.on('initialized', () => console.log('Database initialized.'));

http.createServer((req, res) => {
if(req.method === 'GET' && req.url === '/employees') {
return routes.listEmployees(req, res);
}
res.writeHead(404);
res.end('Not found');
}).listen(8000, () => console.log('Started'));

That is it.

通常我們使用 Node.js 開發的專案最後都會將它佈署到 Linux 伺服器上,而 Linux 伺服器通常都不會安裝 GUI 操作介面,有時想要簡單的>更改檔案的內容,對於 vi 又不熟悉,實在是大費周章。這裡就示範如何使用 Windows 上的 VS Code 連線到 Linux 伺服器,直接修改 Linux 伺服器上的檔案。如有必要,也可直接下 Linux 指令。

VS Code Remote-SSH Extension

這裡要使用的是 VS Code 的 Remote-SSH Extension。

安裝了 Remote-SSH Extension 之後 VS Code 的左下角會出現一個符號。

按一下這個符號,VS Code 上方會出現所要選擇使用的功能。

這裡選擇 Remote-SSH: Connect to Host… 這會開啟另外一個 Window 視窗,然後要求你輸入使用者與伺服器名稱。

如果 VS Code 無法自動檢測到您要連接的服務器的類型,會要求您手動選擇類型。

輸入密碼。

如果連線成功,左下角的 Status 將會出現連線的伺服器。

現在你可以使用 VS Code 打開文件夾。

選擇文件夾,然後按 OK。

每次選擇一個新的文件夾都會要求你再度輸入密碼。

現在你可以像使用 Local 端的檔案夾一樣,使用 VS Code 編輯檔案。

你也可以開啟一個終端螢幕。

可以使用這個終端螢幕下 Linux 伺服器指令,就像使用 Putty 連到 Linux 伺服器一樣。

要離線請選擇 File > Close Remote Connection。

是的! 你沒有看錯標題。Oracle AQ 也跨出了資料庫。在 node-oracledb 4.0 中引入了用於 AQ 的 Node-oracledb API。

Oracle AQ 的運作已有一段時間了,基本運作方式大家都很熟了,這裡直接看範例,讓 AQ 走出 Oracle 資料庫。

Create and Start Queue

1
2
3
4
5
6
/* For the data we want to queue */
CREATE OR REPLACE TYPE demo_msg_type AS OBJECT (
subject VARCHAR2(30),
text VARCHAR2(100)
)
/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
-- Create and start a queue
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
QUEUE_TABLE => 'DEMO.DEMO_MSG_QUEUE_TAB',
QUEUE_PAYLOAD_TYPE => 'DEMO.DEMO_MSG_TYPE');

DBMS_AQADM.CREATE_QUEUE(
QUEUE_NAME => 'DEMO.DEMO_MSG_QUEUE',
QUEUE_TABLE => 'DEMO.DEMO_MSG_QUEUE_TAB');

DBMS_AQADM.START_QUEUE(
QUEUE_NAME => 'DEMO.DEMO_MSG_QUEUE',
ENQUEUE => TRUE);
END;
/

Enqueue

demo-enq.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
const oracledb = require('oracledb');
const dbConfig = {
user: "demo",
password: "secret",
connectString: "10.11.25.139:1522/db99.example.com"
}
const queueName = "DEMO_MSG_QUEUE";

enqueue();

async function enqueue() {
let connection;

try {
connection = await oracledb.getConnection(dbConfig);

const queue = await connection.getQueue(queueName, { payloadType: "DEMO.DEMO_MSG_TYPE" });

const message = new queue.payloadTypeClass(
{
SUBJECT: "Node.js AQ 範例",
TEXT: `Hello Tainan, 台南! ${Date()}`
}
);
console.log(`Enqueuing: ${message}`);

await queue.enqOne(message);
await connection.commit();
} catch (err) {
console.error(err);
} finally {
if (connection) {
try {
await connection.close();
} catch (err) {
console.error(err);
}
}
}
}

Dequeue

demo-deq.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
const oracledb = require('oracledb');
const dbConfig = {
user: "demo",
password: "secret",
connectString: "10.11.25.139:1522/db99.example.com"
}
const queueName = "DEMO_MSG_QUEUE";

dequeue();

async function dequeue() {
let connection;

try {
connection = await oracledb.getConnection(dbConfig);

const queue = await connection.getQueue(queueName, { payloadType: "DEMO.DEMO_MSG_TYPE" });

const msg = await queue.deqOne();
if (msg) {
// do something here.
console.log('Dequeued: ' + msg.payload);
}

await connection.commit();
} catch (err) {
console.error(err);
} finally {
if (connection) {
try {
await connection.close();
} catch (err) {
console.error(err);
}
}
}
}

現在可以傳送資料了。

$ node demo-enq
Enqueuing: [DEMO.DEMO_MSG_TYPE] {
SUBJECT: 'Node.js AQ 範例',
TEXT: 'Hello Tainan, 台南! Tue Dec 29 2020 09:52:19 GMT+0800 (GMT+08:00)'
}

接收資料:

$ node demo-deq
Dequeued: [DEMO.DEMO_MSG_TYPE] {
SUBJECT: 'Node.js AQ 範例',
TEXT: 'Hello Tainan, 台南! Tue Dec 29 2020 09:52:19 GMT+0800 (GMT+08:00)'
}

如果你先 run dequeue, 預設情況下,deqOne( ) 將等待直到接收到消息。

這裡的 dequeue 一次接收一筆資料,我們也可以註冊一個 notification。

Register notification

這裡的 Notification 就像 Apache Kafka 與 RabbitMQ 的 Consumer 一樣。註冊方式與使用 Oracle CQN 一樣,細節請參考 Oracle CQN Using Node.js。註冊時一樣要設定事件處理程式 (Notification handler) 一般都稱呼為 Callback 函數。這觀念與使用 PL/SQL 是一樣的。

demo-notify.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
const oracledb = require('oracledb');

const dbConfig = {
user: "demo",
password: "secret",
connectString: "10.11.25.139:1522/db99.example.com",
poolAlias: 'db99demo',
poolMin: 1,
poolMax: 2,
poolIncrement: 1,
queueMax: -1
}

startup();

async function startup() {
try {
await oracledb.createPool(dbConfig);

registerAndNotify();
}
catch (err) {
console.log(err);
}
}

async function registerAndNotify() {
const config = {
user: dbConfig.user,
password: dbConfig.password,
connectString: dbConfig.connectString,
events: true /* 與使用 CQN 一樣,必須為 true, 預設值 false */
}
const queueName = "DEMO_MSG_QUEUE";

const subscrOptions = {
namespace: oracledb.SUBSCR_NAMESPACE_AQ,
callback: notifyCallback, /* 事件處理程式 (Notification handler) 一般都稱呼為 Callback 函數 */
timeout: 600 /* 因是測試,設定 600 秒後會自動撤銷註冊 */
};

let connection;

try {
connection = await oracledb.getConnection(config);

await connection.subscribe(queueName, subscrOptions); /* 調用 subscribe() 註冊 */

console.log("AQ notification created...");
} catch (err) {
console.error(err);
} finally {
if (connection) {
try {
await connection.close(); /* 註完冊後,就可以關閉這個 connection */
} catch (err) {
console.error(err);
}
}
}
}

async function notifyCallback(event) {
console.log("event type:", event.type);

if (event.type == oracledb.SUBSCR_EVENT_TYPE_DEREG) {
console.log("Deregistration has taken place..."); /* Subscription closed or timeout */
return;
}

try {
connection = await oracledb.getConnection(dbConfig.poolAlias); /* 使用先前創建的 connection pool */

const queue = await connection.getQueue(event.queueName, { payloadType: "DEMO.DEMO_MSG_TYPE" });

const msg = await queue.deqOne();
if (msg) {
// do something here.
console.log('Dequeued: ' + msg.payload);
}
await connection.commit(); /* 成功處理後,記得要 commit,才會從 Queue 移除 */
}
catch (err) { console.log(err) }
finally {
if (connection) {
try {
await connection.close();
}
catch (err) { console.log(err); }
}
}
}

process
.on('SIGTERM', () => {
console.log("\nTerminating");
oracledb.getPool(dbConfig.poolAlias).close();
process.exit(0);
})
.on('SIGINT', () => {
console.log("\nTerminating");
oracledb.getPool(dbConfig.poolAlias).close();
process.exit(0);
})
.on('unhandledRejection', err => {
console.log("\nUnhandledRejection exception");
oracledb.getPool(dbConfig.poolAlias).close();
console.log(err);
process.exit(1);
})
.on('uncaughtException', err => {
console.log("\nUncaught exception");
oracledb.getPool(dbConfig.poolAlias).close();
console.log(err);
process.exit(1);
});

現在啟動 notification,它會一直等待接收消息。

$ node demo-notify
AQ notification created...

Enqueue 一筆資料:

$ node demo-enq
Enqueuing: [DEMO.DEMO_MSG_TYPE] {
SUBJECT: 'Node.js AQ 範例',
TEXT: 'Hello Tainan, 台南! Tue Dec 29 2020 10:33:36 GMT+0800 (GMT+08:00)'
}

事件處理程式馬上收到消息,然後繼續等待消息。

$ node demo-notify
AQ notification created...
Dequeued: [DEMO.DEMO_MSG_TYPE] {
SUBJECT: 'Node.js AQ 範例',
TEXT: 'Hello Tainan, 台南! Tue Dec 29 2020 10:33:36 GMT+0800 (GMT+08:00)'
}

現在從 Oracle 資料庫使用 PLSQL Enqueue 一筆資料。

demo_msg_enqueue.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CREATE OR REPLACE PROCEDURE demo_msg_enqueue( msg_in IN VARCHAR2 )
AS
enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle RAW(16);
message DEMO.DEMO_MSG_TYPE;
BEGIN
message := DEMO.DEMO_MSG_TYPE('Node.js AQ 範例 from PLSQL', msg_in || ' ' || TO_CHAR(sysdate, 'yyyy-mm-dd hh24:mi:ss'));

DBMS_AQ.ENQUEUE(
queue_name => 'DEMO.DEMO_MSG_QUEUE',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle
);
END;
/
demo_msg_enqueue.sql
1
2
3
4
5
6
7
SQL> exec demo_msg_enqueue('Hello Tainan, 台南!');

PL/SQL procedure successfully completed.

SQL> commit;

Commit complete.
$ node demo-notify
AQ notification created...
Dequeued: [DEMO.DEMO_MSG_TYPE] {
SUBJECT: 'Node.js AQ 範例',
TEXT: 'Hello Tainan, 台南! Tue Dec 29 2020 10:33:36 GMT+0800 (GMT+08:00)'
}
Dequeued: [DEMO.DEMO_MSG_TYPE] {
SUBJECT: 'Node.js AQ 範例 from PLSQL',
TEXT: 'Hello Tainan, 台南! 2020-12-29 10:46:52'
}

現在你可以即時的將資料庫外的資料用 Node.js enqueue,然後用 PLSQL dequeue 傳入 Oracle 資料庫。相反的,可以用 PLSQL enqueue,然後使用 Node.js dequeue 即時的讓資料庫內的資料傳出資料庫。

Oracle 努力的讓資料庫裡的物件 (Things) 能與外界的物件溝通,使用傳統的 UTL_HTTP 在組合 JSON 格式的資料,實在是一種夢靨。現在你有了另一種選擇了。

除了 Apache Kafka、RabbitMQ 之外,你又多了一些選擇可以讓 Oracle 資料庫中的物件 (Things) 即時 (Read-Time) 的走出 Oracle 資料庫。你一定得試試看!

WoT (Web of Things) 是 IoT (Internet of Things 物聯網) 的 Application 層,就是使用 Web 技術來打造 application。也就是說,IoT + Web-enabled technologies 就是 WoT。對 WoT 來說,最重要的觀念,就是以 URL 來表示 IoT 裝置;為 IoT 加入 URL 的觀念,就是 Google 提出的 Physical Web 計畫。

未來每個物件 (Objects) 全部都會和 Web 結合, 而首要條件就是所謂的 Things 必須是可以和 Web 溝通的裝置, 這些智慧裝置之間也能透過 Web 標準來互相溝通。WoT 重點在於使用 Web 的軟體標準和框架, 例如 REST, HTTP 和 URI 來建立應用程式以及服務, 並和其他裝置做溝通, 簡單來說就是日常生活所有的東西都可以存取 Web, 就是 WoT 的精隨。

除了能夠相互溝通,時間性當然也很重要,Real-time 數據越來越受歡迎。基本思路很簡單:
當物件發生變化時,任何查看物件數據的客戶端都應立即能看得到異動。最初,在 Web 應用程式中模擬 real-time 數據的唯一方法是從瀏覽器每隔一段固定的時間不斷循環的從 Web 服務器抓取資料 (Polling)。 這通常很複雜,難以擴大應用。WebSocket,改變了這種狀況,這是一種以高效率的方式在瀏覽器和 Web 服務器之間提供雙向通信的協議。

Oracle Database 的 UTL_HTTP 是傳統上與外界 Web 溝通的方式,後來加入了 ORDS,努力的讓資料庫裡的物件 (Things) 能與外界的物件溝通,但這仍無法主動的通知資料庫內資料的異動,你可以使用 Trigger 加上 AQ,但這些微的資料庫資料變化仍很難跨出資料庫,Continuous Query Notification (CQN) 則改變了這個狀況。

之前介紹了 Oracle Continuous Query Notification (CQN),使用 PL/SQL API 為事件處理程式 (Handler)。Oracle 在 Node.js 的程式庫 node-oracledb 則使其功能更強大,它讓 CQN 跨出了 Oracle 資料庫。

基本觀念及使用與 PL/SQL API 一樣。

/* Notification Handler 通常稱 為 callback */
function demoCallback(event) {
console.log(event);
}

/* 註冊 Query 與 callback 函數。註冊後,Node.js 程式會一直等待資料庫發出的事件通知 */
connection.subscribe('demosub', {
sql: "SELECT sal, comm FROM emp WHERE deptno = 10",
callback: demoCallback,
qos: oracledb.SUBSCR_QOS_QUERY | oracledb.SUBSCR_QOS_ROWIDS
});

相較於使用 Kafka 或 RabbitMQ,node-oracledb 的 CQN API,不需另外架構 Server,只需在一台不會關機的機器上啟動你的 Node.js Client 程式,就像 Kafka 或 RabbitMQ Consumer 一樣。

另外當然需要 JavaScript 技術,JavaScript 在這股 WoT 的潮流中佔有非常重要的地位,強烈建議,除了 PL/SQL,JavaScript 是你現在必須要熟悉的技術。

範例

程式碼看起來有點長,因為要示範的關係,將所有程式碼放在一個資料檔上,你可以將它模組化。

首先你需要建立一個 Node.js 專案資料夾,然後安裝 oracle node-oracledb

demo-cqn.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
const oracledb = require('oracledb');

const dbConfig = {
user: "demo",
password: "secret",
connectString: "10.11.25.139:1522/db99.example.com"
}

const oradb = database(); /* 這可以模組化,用來創建個 Oracle connection pool, 與執行 statement 的 API */

startup(); /* 啟動後 Node.js process 就會一直等待事件的通知,直到 Timeout (如果有設定) */

async function startup() {
await oradb.initialize(); /* Create Oracle Connection Pool */
await registerQuery(); /* 註冊 CQN Query */
}

async function registerQuery() {
const config = {
...dbConfig,
events: true /* 使用 CQN subscribe events 模式必須為 true, 預設值 false */
}

const options = {
sql: 'SELECT sal, comm FROM emp where deptno = 10', /* 要註冊的 Query */
callback: demoCallback, /* 事件處理程式 (Notification handler) 一般都稱呼為 Callback 函數 */
timeout: 600, /* 因是測試,設定 600 秒後會自動撤銷註冊 */
qos: oracledb.SUBSCR_QOS_QUERY | oracledb.SUBSCR_QOS_ROWIDS /* 註冊為 QRCN,加上 QOS_ROWIDS,事件物件才會包含 rowid */
};

let connection;

try {
connection = await oracledb.getConnection(config); /* 註冊時的 Connection 設定不一樣,使用一般的 Connection */

await connection.subscribe('demosub', options); /* 調用 subscribe() 註冊 */

console.log("Subscription created...");
} catch (err) {
console.error(err);
} finally {
if (connection) {
try {
await connection.close(); /* 註完冊後,就可以關閉這個 connection,資料庫會創建一個 OCI API 通道用來回呼你的 Callback 函數 */
} catch (err) {
console.error(err);
}
}
}
}

// 事件處理程式 (Notification handler) 一般都稱呼為 Callback 函數
async function demoCallback(event) {
const statement = 'select * from emp where rowid = :rid'; /* 事件物件(Event Object) 只會回傳受影響資料行的 rowid,這裡將使用此 rowid 實際到資料庫讀取資料 */

console.log("event type:", event.type);

if (event.type == oracledb.SUBSCR_EVENT_TYPE_DEREG) {
console.log("Deregistration has taken place..."); /* Subscription closed or timeout */
return;
}

// 解析事件物件,讀取所需的資料。
console.log("event database name:", event.dbName);
console.log("event transaction id:", event.txId);

const queries = event.queries || [{ tables: event.tables }]; /* OCN 與 QRCN 回傳的事件物件屬性不太一樣 */

for (const query of queries) {
const tables = query.tables;

for (const table of tables) {
console.log(`--> Table Name: ${table.name}`);
console.log(`--> Table Operation: ${operationName(table.operation)} Rows=${table.rows.length}`);

if (table.rows) {
for (const row of table.rows) {
console.log(`--> --> Row Rowid: ${row.rowid}`); /* 受影響的單筆資料 */
console.log(`--> --> Row Operation: ${operationName(row.operation)}`);

// 用 rowid 到資料庫讀取詳細的資料,這理應是你的應用程式感興趣的地方。
// 這裡的 connection 使用的是先前創建的 connection pool。這裡千萬要避免使用一般的 connection,
// 如果同時收到大量的資料,產生大量的 connections,資料庫有可能就掛掉了。
// 使用 connection pool 效能較佳,又可限制 connection 數量。
try {
const { rows } = await oradb.doExecute(statement, [row.rowid]);
console.log(rows);
} catch (err) {
console.log(err);
}
console.log(Array(61).join("-"));
}
}
console.log(Array(61).join("="));
}
}
}

// 非必要,只為好看些
function operationName(operation) {
let operationName;

switch (operation) {
case oracledb.CQN_OPCODE_INSERT:
operatonName = 'Records Inserted';
break;
case oracledb.CQN_OPCODE_UPDATE:
operatonName = 'Records Updated';
break;
case oracledb.CQN_OPCODE_DELETE:
operatonName = 'Records Deteled';
break;
case oracledb.CQN_OPCODE_ALTER:
operatonName = 'Table Altered';
break;
case oracledb.CQN_OPCODE_DROP:
operatonName = 'Table Dropped';
break;
default:
operatonName = 'Unknown Operation';
}
return operatonName;
}

// 抽象化資料庫層
function database() {
const config = {
...dbConfig,
poolAlias: 'db99demo',
poolMin: 1,
poolMax: 2,
poolIncrement: 1,
queueMax: -1
};

async function initialize() {
try {
await oracledb.createPool(config); /* 創建 Connection Pool */
}
catch (err) {
console.log(err);
}
}

async function close() {
try {
oracledb.getPool(config.poolAlias).close();
}
catch (err) {
console.log(err);
}
}

function doExecute(statement, binds = [], opts = {}) {
return new Promise(
async (resolve, reject) => {
let connection;
opts.outFormat = oracledb.OBJECT;
opts.autoCommit = true;

try {
connection = await oracledb.getConnection(config.poolAlias); /* 使用 Connection Pool */
const result = await connection.execute(statement, binds, opts);
resolve(result);
}
catch (err) { reject({ error: err }); }
finally {
if (connection) {
try {
await connection.close();
}
catch (err) { console.log(err); }
}
}
}
);
}

return {
initialize,
close,
doExecute
};
}

process
.on('SIGTERM', () => {
console.log("\nTerminating");
oradb.close();
process.exit(0);
})
.on('SIGINT', () => {
console.log("\nTerminating");
oradb.close();
process.exit(0);
})
.on('uncaughtException', err => {
console.log("\nUncaught exception");
oradb.close();
console.log(err);
process.exit(1);
});

啟動

$ node demo-cqn
Subscription created...

啟動並註冊後,Node.js process 會等待通知事件,並呼叫 Callback 函數。

SQL> select regid, callback, table_name, timeout from user_change_notification_regs;

REGID CALLBACK TABLE_NAME TIMEOUT
------- ------------------------------------------------- ---------- ----------
1536 net8://(ADDRESS=(PROTOCOL=tcp)(HOST=10.11.100.30) DEMO.EMP 200
(PORT=49919))?PR=0

資料庫會用 OCI API net8://(ADDRESS=(PROTOCOL=tcp)(HOST=10.11.100.30)(PORT=49919))?PR=0 回呼你的 Node.js callback 函數。所以你 Client 端程式的機器及 port 要保持暢通。TIMEOUT 顯示這個註冊的查詢還剩多少時間就會 timeout。如果註冊時沒有設定 timeout,這裡的值會是非常大的數字 4294967295。那你可能必須手動使用 DBMS_CQ_NOTIFICATION.DEREGISTER 註銷註冊的 Query。

SQL> select * from user_cq_notification_queries;

QUERYID QUERYTEXT REGID
-------- ------------------------------------------------------------ -------
71 SELECT DEMO.EMP.SAL , DEMO.EMP.COMM FROM DEMO.EMP WHERE D 1536
EMO.EMP.DEPTNO = 10
SQL> update emp set comm = comm + 1 where empno = 7934;

1 row updated.

SQL> commit;

Commit complete.
$ node demo-cqn
Subscription created...
event type: 7
event database name: lx26
event transaction id: <Buffer 06 00 0d 00 f4 a3 0a 00>
--> Table Name: DEMO.EMP
--> Table Operation: Records Updated Rows=1
--> --> Row Rowid: AACzQEAAEAAACAXAAN
--> --> Row Operation: Records Updated
[
{
EMPNO: 7934,
ENAME: '楊喆',
JOB: 'CLERK',
MGR: 7902,
HIREDATE: 1982-01-22T16:00:00.000Z,
SAL: 1500,
COMM: 13,
DEPTNO: 10
}
]
------------------------------------------------------------
============================================================

在 WoT 與 Real-time 的潮流中,Kafka 受到了非常多得關注,RabbitMQ 也有銀行界的加持,有幸為公司導入了這兩個架構,今又有 Oracle CQN 可以選擇,如何應用,應該看應用的範圍與規模,Kafka 與 RabbitMQ 可以是資訊策略的一環,CQN 則適合單打獨鬥,它不需要另外架構 Server,但他只適合用於較小量的資料異動,千萬不要用於大量資料的處理;Kafka 則彈性的多,它可處理一天多達 Petabyte 的資料。RabbitMQ 則有較佳的 GUI 管理介面。不管如何,都可以讓你有些新思維。

Go Real-time!

Continuous Query Notification (CQN) 允許應用程式向資料庫註冊一些查詢 (Queries),如果與註冊的查詢 (Query) 有相關的物件 (Object) 或會影響註冊的查詢的結果 (Result set),資料庫就會觸發事件,通知註冊的事件處理程式 (Notification handler)。

這有兩種模式:

  1. Object change notification (OCN)
    如果查詢(Query)註冊為物件更改通知 (OCN),則只要一段交易 (transaction) 更改了查詢引用 (references) 和提交(commit) 的物件,資料庫就會通知應用程式。

  2. Query result change notification (QRCN)
    如果查詢註冊為結果更改通知 (QRCN),則只要交易更改影響了查詢結果並提交,資料庫就會通知應用程式。

這些一個或多個查詢的列表與通知類型 (OCN 或 QRCN) 需向 CQN 註冊,並提供通知處理程式 (Notification handler)。您可以使用 PL/SQL 或調用 Oracle OCI。如果使用 PL/SQL,則通知處理程式就是資料庫中的 PL/SQL stored procedure; 如果您使用 OCI,則通知處理程式會是客戶端 C 回調程式 (callback procedure)。除了使用 C,Node.js 的 Oracle Library node-oracledb 也支援 CQN。也就是說,當資料庫觸發此事件,就可即時反應在資料庫外端的應用程式。

Object change notification (OCN)

如果應用程式註冊查詢 (Query) 為物件更改通知 (OCN),則只要交易更改與註冊的 Query 有關聯的物件並提交,資料庫就會向應用程式發送 OCN,這只關心物件是否被更改,而不管查詢的結果是否發生更改。

例如,如果應用程式在將下面的查詢示例註冊為 OCN,並且用戶提交更改 EMP,則資料庫會向應用程式發送 OCN,即使更改的資料沒有滿足查詢的條件(例如,如果 deptno = 30)。

1
2
3
4
select empno, sal
from emp
where deptno = 10
/

Query result change notification (QRCN)

如果應用程式註冊查詢為查詢結果更改通知 (QRCN),則只要交易更改而影響了註冊的查詢結果並提交,資料庫就會向應用程式發送 QRCN。

例如,如果應用程式在將下面的查詢示例註冊查詢為 QRCN,則只有在查詢結果集發生變化時,資料庫才會向應用程式發送 QRCN; 也就是說,如果其中一個 DML 語句提交:

  • INSERT 或 DELETE 滿足查詢條件 (deptno = 10)。
  • UPDATE 欄位 empno 或 sal 且滿足查詢條件 (deptno = 10) 的資料。
  • UPDATE 欄位 deptno,將其值從 10 以外的值更改為 10,造成將該行 (row) 添加到結果集中。
1
2
3
4
select empno, sal
from emp
where deptno = 10
/

預設的通知類型是 OCN。如果要注冊為 QRCN,可在 CQ_NOTIFICATION$_REG_INFO 的 QOSFLAGS 屬性中指定 QOS_QUERY。這我們會在後面的範例看到。

使用 QRCN,您可以選擇保證模式 (guaranteed mode 預設) 或盡力而為模式 (best-effort mode)。

  • Guaranteed Mode
    在保證模式下,只有在保證查詢結果集發生變化時,資料庫才會向應用程式發送 QRCN。例如,假設某個應用程式註冊了下面的查詢為 QRCN,員工 7839 歸屬在部門 10,並且執行了這些 UPDATE 語句。

    1
    2
    3
    4
    select empno, sal
    from emp
    where deptno = 10
    /
    1
    2
    3
    4
    5
    UPDATE emp SET sal = sal + 10 WHERE empno = 7839;

    UPDATE emp SET sal = sal – 10 WHERE empno = 7839;

    COMMIT;

    上面的交易中的每個 UPDATE 語句都會更改查詢結果集,但整個交易對查詢結果集沒有影響; 因此,資料庫不會向應用程序發送 QRCN。

    在保證模式下,某些查詢對於 QRCN 而言會過於復雜。

  • Best-Effort Mode
    某些查詢對於保證模式而言過於復雜,則可以在 QRCN 下註冊為盡力而為模式,CQN 會創建並註冊它們的簡單版本 (simpler versions)。

    例中的查詢對於保證模式下的 QRCN 而言過於復雜,因為它包含聚合函數 SUM。

    1
    SELECT sum(sal) FROM emp WHERE deptno = 20;

    在盡力而為模式下,CQN 在此示例中註冊了這個更簡單的查詢版本。

    1
    SELECT sal FROM emp WHERE deptno = 20;

    每當原始查詢的結果發生變化時,其更簡單版本的結果也會發生變化; 因此,簡化中不會丟失任何通知。但是,簡化可能會導致誤報,因為較簡單版本的結果可能會在原始查詢的結果不變 ( sum(sal) 的結果沒變 ) 時更改。

    如果是多表查詢 (join),則會更複雜,所以盡量簡單,不要註冊太複雜的查詢!

實作範例

需要的權限
SQL> grant change notification to demo;

SQL> grant execute on dbms_cq_notification to demo;

創建 notifications Table 用來儲存範例子中觸發的通知。

1
2
3
4
5
6
7
8
9
10
CREATE TABLE notifications (
id NUMBER,
message VARCHAR2(4000),
notification_date DATE
);

CREATE SEQUENCE notifications_seq;

ALTER TABLE notifications
ADD CONSTRAINT notifications_pk PRIMARY KEY(id);

事件處理程式 (Notification handler)

Notification Handler 就像 AQ 的 notify 程式,通常稱為事件處理器,當事件發生時的處理程式。在 JavaScript 通常則稱為 callback 函數。當事件發生時,CQN 會發出一個事件物件 (Event Object),內容包含事件的一些訊息,事件處理器必須有一參數承接此事件物件。

事件物件 (Event Object)

CQN 會發出 CQ_NOTIFICATION $ _DESCRIPTOR 類型的事件物件。 事件處理程式可以在 CQ_NOTIFICATION $ _DESCRIPTOR 物件的屬性中找到資料庫更改的詳細信息。

SQL> desc CQ_NOTIFICATION$_DESCRIPTOR
Name Null? Type
----------------------------------------------------- -------- ------------------------------------
REGISTRATION_ID NUMBER
TRANSACTION_ID RAW(8)
DBNAME VARCHAR2(30)
EVENT_TYPE NUMBER
NUMTABLES NUMBER
TABLE_DESC_ARRAY SYS.CHNF$_TDESC_ARRAY
QUERY_DESC_ARRAY

CQ_NOTIFICATION $ _DESCRIPTOR 類型包含另外兩個屬性:

  • TABLE_DESC_ARRAY,該屬性包含一個類型為 CQ_NOTIFICATION $ _TABLE 的表描述符。
  • QUERY_DESC_ARRAY,該屬性包含一個類型為 CQ_NOTIFICATION $ _QUERY 的結果集更改描述符。
SQL> desc CQ_NOTIFICATION$_TABLE
Name Null? Type
----------------------------------------------------- -------- ------------------------------------
OPFLAGS NUMBER
TABLE_NAME VARCHAR2(64)
NUMROWS NUMBER
ROW_DESC_ARRAY SYS.CHNF$_RDESC_ARRAY
SQL> desc CQ_NOTIFICATION$_QUERY
Name Null? Type
----------------------------------------------------- -------- ------------------------------------
QUERYID NUMBER
QUERYOP NUMBER
TABLE_DESC_ARRAY SYS.CHNF$_QDESC_ARRAY

如果在註冊過程中指定了 ROWID 選項,則 CQ_NOTIFICATION $ _TABLE 類型具有 ROW_DESC_ARRAY 屬性,其類型為 CQ_NOTIFICATION $ _ROW,其中包含已更改行的 ROWID。 如果在 CQ_NOTIFICATION $ _TABLE 物件的 OPFLAGS 字段中設置了 ALL_ROWS,則 ROWID 信息不可用。

SQL> desc CQ_NOTIFICATION$_ROW
Name Null? Type
----------------------------------------------------- -------- ------------------------------------
OPFLAGS NUMBER
ROW_ID VARCHAR2(2000)
處理程式

瞭解了事件物件,可以開始創建事件處理程式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
create or replace PROCEDURE notifications_handler (
ntfnds IN CQ_NOTIFICATION$_DESCRIPTOR /* 事件物件 (Event Object) 參數 */
)
IS
l_regid NUMBER;
l_table_name VARCHAR2(60);
l_event_type NUMBER;
l_numtables NUMBER;
l_operation_type NUMBER;
l_operation_name VARCHAR2(30);
l_numrows NUMBER;
l_row_id VARCHAR2(2000);
l_numqueries NUMBER;
l_qid NUMBER;
l_qop NUMBER;
l_message VARCHAR2(4000) := NULL;
BEGIN
l_regid := ntfnds.registration_id;
l_event_type := ntfnds.event_type;

l_numqueries := 0;

IF (l_event_type = DBMS_CQ_NOTIFICATION.EVENT_QUERYCHANGE) THEN /* 這程式將只處理 QRCN event type 7 */
l_numqueries := ntfnds.query_desc_array.count; /* 註冊的 Query 數 */

FOR i IN 1..l_numqueries LOOP /* loop over queries */
l_qid := ntfnds.query_desc_array(i).queryid;
l_qop := ntfnds.query_desc_array(i).queryop;

l_numtables := 0;
l_numtables := ntfnds.query_desc_array(i).table_desc_array.count; /* 每個 Query 所牽涉的 Table 數 */

FOR j IN 1..l_numtables LOOP /* loop over tables */
l_table_name :=
ntfnds.query_desc_array(i).table_desc_array(j).table_name;
l_operation_type :=
ntfnds.query_desc_array(i).table_desc_array(j).Opflags; /* Insert、Update or ... */

IF (bitand(l_operation_type, DBMS_CQ_NOTIFICATION.ALL_ROWS) = 0) THEN
l_numrows := ntfnds.query_desc_array(i).table_desc_array(j).numrows; /* 影響的筆數 */
ELSE
l_numrows := 0; /* ROWID info not available 注冊 Query 時如果沒有要求 ROWID,筆數將會是 0 */
END IF;

CASE /* 這只是讓資料好看些,非必要。*/
WHEN BITAND(l_operation_type, DBMS_CQ_NOTIFICATION.INSERTOP) != 0 THEN
l_operation_name := 'Records Inserted';
WHEN BITAND(l_operation_type, DBMS_CQ_NOTIFICATION.UPDATEOP) != 0 THEN
l_operation_name := 'Records Updated';
WHEN BITAND(l_operation_type, DBMS_CQ_NOTIFICATION.DELETEOP) != 0 THEN
l_operation_name := 'Records Deleted';
WHEN BITAND(l_operation_type, DBMS_CQ_NOTIFICATION.ALTEROP) != 0 THEN
l_operation_name := 'Table Altered';
WHEN BITAND(l_operation_type, DBMS_CQ_NOTIFICATION.DROPOP) != 0 THEN
l_operation_name := 'Table Dropped';
WHEN BITAND(l_operation_type, DBMS_CQ_NOTIFICATION.UNKNOWNOP) != 0 THEN
l_operation_name := 'Unknown Operation';
ELSE
l_operation_name := '?';
END CASE;

/* l_numrows Table 所影響的筆數 */
l_message := 'QueryID:' || l_qid ||' Table (' || l_table_name || ') - ' || l_operation_name || '. Rows=' || l_numrows;

INSERT INTO notifications (id, message, notification_date)
VALUES (notifications_seq.NEXTVAL, l_message, SYSDATE);

/* Body of loop does not run when l_numrows is zero. */
FOR k IN 1..l_numrows LOOP /* loop over rows */
/* 影響的每筆及它的 ROWID。可使用 ROWID 讀取該筆資料,這應是你需要處理的地方。*/
l_row_id := ntfnds.query_desc_array(i).table_desc_array(j).row_desc_array(k).row_id;

l_message := 'QueryID:' || l_qid ||' Table (' || l_table_name || ') - RowID:' || l_row_id;

INSERT INTO notifications (id, message, notification_date)
VALUES (notifications_seq.NEXTVAL, l_message, SYSDATE);

END LOOP; /* loop over rows */
END LOOP; /* loop over tables */
END LOOP; /* loop over queries */
END IF;

COMMIT;
END;
/

註冊查詢 (Query)

有了事件處理程式,就可以註冊所要偵測的 Queries。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
DECLARE
reginfo CQ_NOTIFICATION$_REG_INFO;
mgr_id NUMBER;
dept_id NUMBER;
v_cursor SYS_REFCURSOR;
regid NUMBER;
BEGIN
reginfo := cq_notification$_reg_info (
'notifications_handler', /* 事件處理程式 */
DBMS_CQ_NOTIFICATION.QOS_QUERY /* QOS_QUERY 表示註冊為 QRCN */
+ DBMS_CQ_NOTIFICATION.QOS_ROWIDS, /* QOS_ROWIDS 發出的事件物件才會包含每筆資料的 ROWID,處理程式才抓得到所影響的單筆詳細資料 */
0, /* registration persists until unregistered */
0, /* notify on all operations */
0 /* notify immediately */
);

/* Create registration. */
regid := DBMS_CQ_NOTIFICATION.new_reg_start(reginfo);

/* 註冊的 Query,當所有會影響此段 Query 結果的異動,都會觸發事件發出通知(Notification)。
這裡可以註冊多段的 Query。 */
OPEN v_cursor FOR
SELECT dbms_cq_notification.CQ_NOTIFICATION_QUERYID, sal, comm
FROM DEMO.EMP
WHERE deptno = 10;

CLOSE v_cursor;

DBMS_CQ_NOTIFICATION.reg_end;
END;
/

註冊的 Query 牽涉到 Emp Table 的 sal、comm 與 deptno 欄位。

  • 查詢已經註冊的事件處理程式:
1
2
3
select regid, table_name, callback
from user_change_notification_regs
/
     REGID TABLE_NAME           CALLBACK
---------- -------------------- ----------------------------------------
1231 DEMO.EMP plsql://notifications_handler?PR=0
  • 查詢已經註冊的 Queries:
1
2
3
select queryid, querytext, regid
from user_cq_notification_queries
/
QUERYID QUERYTEXT                                                         REGID
------- ------------------------------------------------------------ ----------
48 SELECT DEMO.EMP.SAL , DEMO.EMP.COMM FROM DEMO.EMP WHERE D 1231
EMO.EMP.DEPTNO = 10
  • 可以用此註銷註冊的 Query:
1
SQL> exec DBMS_CQ_NOTIFICATION.DEREGISTER(regid => 1231);

實際運作

SQL> select * from notifications;

no rows selected
SQL> select * from emp where empno in (7608, 7934);

EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO
------- ---------- ---------- ------- ------------ ------ ------ -------
7608 馬小九 ANALYST 7788 28-JUN-10 1000 100 40
7934 楊喆 CLERK 7902 23-JAN-82 1500 10
SQL>  update emp set comm = 99 where empno in (7608, 7934);

2 rows updated. /* 異動兩筆資料,但只有一筆會影響註冊 Query 的結果 */

SQL> commit;

Commit complete.
SQL> select * from notifications;

/* 註冊 Query 時如果沒有 QOS_ROWIDS, ID 261 Rows=0, ID 262 則不會存在 */

ID MESSAGE NOTIFICATION_DATE
----- ------------------------------------------------------------ -------------------
261 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 08:27:48
262 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 08:27:48
/* 只觸發一筆資料的 notification */

SQL> select * from emp where rowid = 'AACzQEAAEAAACAXAAN';

EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO
------- ---------- ---------- ------- ------------------- ------ ------ -------
7934 楊喆 CLERK 7902 1982-01-23 00:00:00 1500 99 10
SQL> select * from emp where empno = 7934;

EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO
------- ---------- ---------- ------- ------------------- ------ ------ -------
7934 楊喆 CLERK 7902 1982-01-23 00:00:00 1500 99 10

SQL> update emp set comm = 99 where empno = 7934;

1 row updated.

SQL> commit;

Commit complete. /* 這個異動沒有影響註冊 Query 的結果 */
/* 沒有觸發 notification 事件 */
SQL> select * from notifications;

ID MESSAGE NOTIFICATION_DATE
----- ------------------------------------------------------------ -------------------
261 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 08:27:48
262 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 08:27:48
SQL> update emp set deptno = 20 where empno = 7934;

1 row updated. /* 將部門由 10 改為 20 */
/* 觸發了 notification 事件 */
SQL> commit;

Commit complete.

SQL> select * from notifications;

ID MESSAGE NOTIFICATION_DATE
------- ------------------------------------------------------------ -------------------
261 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 08:27:48
262 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 08:27:48
=> 263 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 09:05:12
=> 264 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 09:05:12
SQL> insert into emp values(9099, '李小明', 'CLERK', 7698, sysdate, 23100, null, 10);

1 row created.

SQL> commit;

Commit complete.

SQL> select * from notifications;

ID MESSAGE NOTIFICATION_DATE
------- ------------------------------------------------------------ -------------------
261 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 08:27:48
262 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 08:27:48
263 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 09:05:12
264 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 09:05:12
=> 265 QueryID:48 Table (DEMO.EMP) - Records Inserted. Rows=1 2020-12-25 09:15:55
=> 266 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAVAAB 2020-12-25 09:15:55

SQL> select * from emp where rowid = 'AACzQEAAEAAACAVAAB';

EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO
------- ---------- ---------- ------- ------------------- ------ ------ -------
9099 李小明 CLERK 7698 2020-12-25 09:15:52 23100 10
SQL> delete emp where empno = 9099;

1 row deleted.

SQL> commit;

Commit complete.

SQL> select * from notifications;

ID MESSAGE NOTIFICATION_DATE
------- ------------------------------------------------------------ -------------------
261 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 08:27:48
262 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 08:27:48
263 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 09:05:12
264 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 09:05:12
265 QueryID:48 Table (DEMO.EMP) - Records Inserted. Rows=1 2020-12-25 09:15:55
266 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAVAAB 2020-12-25 09:15:55
=> 267 QueryID:48 Table (DEMO.EMP) - Records Deleted. Rows=1 2020-12-25 09:19:00
=> 268 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAVAAB 2020-12-25 09:19:00

8 rows selected.

SQL> select * from emp where rowid = 'AACzQEAAEAAACAVAAB';

no rows selected

現在來看複雜一點的:

SQL> truncate table notifications;

Table truncated.

SQL> select empno, ename, comm, deptno from emp where deptno = 10;

EMPNO ENAME COMM DEPTNO
------- ---------- ------ -------
7839 KING 10
7782 楊建華 10
7934 楊喆 99 10
8907 牸祢 10

SQL> update emp set comm = 99 where deptno = 10;

4 rows updated. /* update 4 筆資料 */

SQL> commit;

Commit complete.

SQL> select * from notifications;

/* 只有觸發 3 筆資料的 notification */

ID MESSAGE NOTIFICATION_DATE
----- ------------------------------------------------------------ -------------------
275 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=3 2020-12-25 09:32:15
276 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAA 2020-12-25 09:32:15
277 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAC 2020-12-25 09:32:15
278 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAS 2020-12-25 09:32:15
SQL> truncate table notifications;

Table truncated.

SQL> select * from emp where empno = 7934;

EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO
------- ---------- ---------- ------- ------------------- ------ ------ -------
7934 楊喆 CLERK 7902 1982-01-23 00:00:00 1500 99 10

SQL> update emp set comm = comm + 10 where empno = 7934;

1 row updated.

SQL> update emp set comm = comm - 10 where empno = 7934;

1 row updated.

SQL> commit; /* 同一個交易 (Transaction) 沒有改變註冊 Query 的結果 */

Commit complete.

SQL> select * from notifications;

no rows selected /* 沒有觸發 notification 事件 */

測試完,註銷:

SQL> exec DBMS_CQ_NOTIFICATION.DEREGISTER(regid => 1231);

PL/SQL procedure successfully completed.

SQL> select regid, table_name, callback
2 from user_change_notification_regs
3 /

no rows selected

SQL> select queryid, querytext, regid
2 from user_cq_notification_queries
3 /

no rows selected

Table function 可以像 SELECT 語句的 FROM 子句中的 Table 一樣使用的函數。 Table function 的一個常見用法是像資料流 (stream data) 一樣,直接從一個來源進程傳輸或轉換到下一個流程,而中間不需有暫存的過程。以這種方式使用的 table function 稱為 Streaming table function。這種技術最常用於數據倉儲,作為提取 (extract),轉換 (transform) 和加載 (load) ETL 操作的一部分。

之前面我們展示了 Table function 的概述。 現在則來看看創建 Streaming table function 所需的基本步驟。

在深入了解詳細細節之前,我們看一下 Streaming table function 功能示例:

INSERT INTO tickers
SELECT *
FROM TABLE (doubled (CURSOR (SELECT * FROM stocks)))
/

這裡發生了什麼? 讓我們從內到外逐步來看:

Code Description
SELECT * FROM stocks 獲取 stocks TABLE 中的所有的 rows
CURSOR ( ) 使用 CURSOR 表達式創建指向結果集 (result set) 的 cursor 變數
( ) 將 cursor 變數傳遞給 doubled table function
doubled ( ) 執行 doubled 函數的轉換並返回結果集
SELECT * FROM TABLE ( … ) SQL 引擎將從 doubled 函數返回的結果集轉換為關係的行和列集(relational set of rows and columns)
INSERT INTO tickers 將這些行插入到 tickers TABLE 中

有時(通常?),您需要在資料流式處理過程 (Streaming process) 中執行多個轉換。沒問題,您可以串聯多個 Table function 的調用:

INSERT INTO tickers
SELECT *
FROM TABLE (transform2 (
CURSOR (SELECT *
FROM TABLE (transform1 (
CURSOR (SELECT * FROM stocks
))))))
/

範例

要將數據從一個 Table 轉換到另一個 Table,我們需要 Table 和這些 Table 中的數據。在此例,從 stocks Table 開始,每筆資料 (row) 包含每個股票代碼的開盤價和收盤價。這裡有 Stocks 資料範本

1
2
3
4
5
6
7
CREATE TABLE stocks (
ticker VARCHAR2(20),
trade_date DATE,
opening_price NUMBER,
closing_price NUMBER
)
/

這裡的轉換很簡單: 將 stocks Table 中的每一筆資料 (row), 拆開成為 tickers Table 的兩筆資料 (將開盤價和收盤價拆為兩筆):

1
2
3
4
5
6
7
CREATE TABLE tickers (
ticker VARCHAR2(20),
pricedate DATE,
pricetype VARCHAR2(1),
price NUMBER
)
/

在繼續之前,必須要指出,對於這個特定的轉換 (從 stocks 中的一行到 tickers 的兩行) 並不需要一個 Table function 來完成工作。你可以使用 INSERT ALL 兩次插入代碼:

1
2
3
4
5
6
7
INSERT ALL
INTO tickers (ticker, pricedate, pricetype, price)
VALUES (ticker, trade_date, 'O', opening_price)
INTO tickers (ticker, pricedate, pricetype, price)
VALUES (ticker, trade_date, 'C', closing_price)
SELECT * FROM stocks
/

你也可以使用 UNPIVOT:

1
2
3
4
5
6
INSERT INTO tickers (ticker, pricedate, pricetype, price)
SELECT *
FROM stocks UNPIVOT ( price
FOR price_type
IN (opening_price AS 'O', closing_price AS 'C'))
/

SQL 是一種非常強大的語言。你的轉換很可能在純 SQL 中就可行,那就應該避免使用 Table function。但是,如果轉換需要使用處理邏輯 (因此需要 PL/SQL),或者如果 SQL 語法無法完成,則 Table function 提供了一種強大,直接的方式來完成工作。在此,假設轉換要復雜得多,並且需要使用 Table function。

之前關於 Table function 所示,當你需要一個 Table function 返回的集合中每個元素(行)含多個欄位數據時,而不僅僅是一個字符串或數字,那你就需要創建一個物件類型 (Object type) 以及這些物件類型的集合類型 (Nested table type)。

在這裡的示例中,要將 stocks 的數據移動到 stickers ,因此我需要一個 “看起來像” stickers table 的物件類型。理想情況下,應該創建一個像下面的集合類型:

CREATE TYPE tickers_nt AS TABLE OF tickers%ROWTYPE
/

但是 %ROWTYPE 是 PL/SQL 聲明屬性,SQL 引擎不認得該屬性,因此該語句會失敗:

PLS-00329: schema-level type has illegal reference to TICKERS

因此我們不能在 Schema level 下使用 tickers%ROWTYPE 必須另外創建一個物件類型。

替代的,需要創建了一個模仿 stickers Table 結構的物件類型,如下所示:

1
2
3
4
5
6
7
8
9
CREATE TYPE ticker_ot
AUTHID DEFINER IS OBJECT
(
ticker VARCHAR2(20),
pricedate DATE,
pricetype VARCHAR2(1),
price NUMBER
)
/

然後創建這些物件類型的集合類型 (Nested table type):

1
2
CREATE TYPE tickers_nt AS TABLE OF ticker_ot
/

現在這個 tickers_nt 集合類型裡的每個元素 (row) 長的就像 ticker_ot 物件類型。與 tickers Table 看起來一模一樣。

SQL> desc tickers_nt;
tickers_nt TABLE OF TICKER_OT
Name Null? Type
----------------------------------------- -------- ----------------------------
TICKER VARCHAR2(20)
PRICEDATE DATE
PRICETYPE VARCHAR2(1)
PRICE NUMBER

SQL> desc tickers;
Name Null? Type
----------------------------------------- -------- ----------------------------
TICKER VARCHAR2(20)
PRICEDATE DATE
PRICETYPE VARCHAR2(1)
PRICE NUMBER

這裡打算在 Table function 使用資料流式的處理(Streaming process)。 意思是將從 SQL 傳遞一組數據 (rows and columns)到 table function。為此,需要定義一個強型態 (strong) 的 REF CURSOR 類型,該類型將用作接受 SQL 語句內數據集的函數參數的數據類型。

在下面的 Package specification 中,創建了兩個強型態 REF CURSOR 類型,一個用於 stocks table 中的 row,另一個用於 tickers table。

1
2
3
4
5
6
7
8
9
10
CREATE OR REPLACE PACKAGE stock_mgr
AUTHID DEFINER
IS
TYPE stocks_rc IS REF CURSOR /* 這裡有 RETURN 特定的物件類型,是一個強型態 Strong REF CURSOR type */
RETURN stocks%ROWTYPE; /* 可以不定義 RETURN,則會是弱型態 Weak REF CURSOR type */

TYPE tickers_rc IS REF CURSOR
RETURN tickers%ROWTYPE;
END stock_mgr;
/
CURSOR 和 REF CURSOR 之間的區別

從最基本的技術層面上講,它們是相同的。“正常”PLSQL CURSOR 在定義中是靜態的 (static)。REF CURSOR 可以動態的打開或基於邏輯動態的打開。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
DECLARE
TYPE rc IS REF CURSOR; /* 弱型態 CURSOR */

CURSOR c IS SELECT * FROM dual; /* 靜態的 (static) */

l_cursor rc;
BEGIN
IF (TO_CHAR(SYSDATE, 'dd') = 30) THEN /* 弱型態 REF CURSUR 可以是 */
OPEN l_cursor FOR 'SELECT * FROM emp'; /* static SQL 或 dynamic SQL statement*/
ELSIF (TO_CHAR(SYSDATE, 'dd') = 29) THEN
OPEN l_cursor FOR SELECT * FROM dept;
ELSE
OPEN l_cursor FOR SELECT * FROM dual;
END IF;

OPEN C;
END;
/
  • 無論運行多少次,CURSOR c 始終是 “SELECT * FROM dual”。 REF CURSOR 則可以是任何東西,而且可以是 static SQL 或 dynamic SQL statement。
  • REF CURSOR 可以返回 (return) 給客戶端。 CURSOR 無法返回給客戶端。
  • CURSOR 可以是全域的 (global), 可以定義在 SQL 中;REF CURSOR 不能,它只能定義在子程式中 (你不能定義它們在 PROCEDURE / FUNCTION 的外面)。
  • REF CURSOR 可作為參數在子程式中傳遞,CURSOR 不能。
  • Static SQL (不使用 REF CURSOR) 效率較佳。
  • REF CURSOR 的使用應侷限於:
    • 將結果集返回給客戶端
    • 當沒有其他有效的手段來實現目標時也就是說,您首先應使用靜態 SQL (實際上使用 implicit cursors),只在必要時才使用 REF CURSOR。

使用 REF CURSOR 類型聲明的變數就是 CURSOR 變數 (cursor variable)。在 PL/SQL 中,可以這樣使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
DECLARE
l_cursor stock_mgr.stocks_rc; /* stock_mgr.stocks_rc 是一個強型態的 REF CURSOR 類型 */

l_stock stocks%ROWTYPE;
BEGIN
OPEN l_cursor FOR SELECT * FROM stocks; /* 強型態的 REF CURSOR 要使用 static SQL statement */
/* 不能使用 dynamic SQL statement */
LOOP
FETCH l_cursor INTO l_stock;

EXIT WHEN l_cursor%NOTFOUND;
END LOOP;

CLOSE l_cursor;
END;
/

您可以對 cursor 變數使用所有常用的 cursor attributes 和 operators: FETCH,%FOUND,CLOSE 等。

但後面會看到,在 SQL 的 streaming table function 使用此 REF CURSOR 類型的方式會有所不同。

Streaming table function 和“普通”Table function 之間的主要區別是 streaming table function 的參數至少一個是 CURSOR 類型的變數。 Table function 可以有多個 CURSOR 變數的輸入和其他類型的其他參數,如字符串或日期。 在此的 streaming table function,將只使用單個 CURSOR 變數參數。

通常,Streaming table function 的流程是:

  1. 從 CURSOR 變數中獲取一行。
  2. 處裡每一行的轉換應用。
  3. 將轉換後的數據放入集合中。
  4. 完成後返回集合。

現在讓我們看看這個模式是如何在 doubled function 中展開的,將一筆 stocks table 的 row 拆解成兩筆 tickers rows 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
CREATE OR REPLACE FUNCTION doubled (rows_in stock_mgr.stocks_rc)
RETURN tickers_nt
AUTHID DEFINER
IS
TYPE stocks_aat IS TABLE OF stocks%ROWTYPE INDEX BY PLS_INTEGER;

l_stocks stocks_aat;

l_doubled tickers_nt := tickers_nt();
BEGIN
LOOP
FETCH rows_in BULK COLLECT INTO l_stocks LIMIT 100;
EXIT WHEN l_stocks.COUNT = 0;

FOR l_row IN 1 .. l_stocks.COUNT
LOOP
l_doubled.EXTEND;
l_doubled(l_doubled.LAST) := ticker_ot( l_stocks(l_row).ticker,
l_stocks(l_row).trade_date,
'O',
l_stocks (l_row).opening_price
);

l_doubled.EXTEND;
l_doubled(l_doubled.LAST) := ticker_ot( l_stocks (l_row).ticker,
l_stocks (l_row).trade_date,
'C',
l_stocks (l_row).closing_price
);
END LOOP;
END LOOP;
CLOSE rows_in;

RETURN l_doubled;
END;
/
  • 第 1 行,使用 Package stock_mgr 中定義的 REF CURSOR 類型。因為要從 stocks Table 中取出資料,所以使用 stocks_rc 類型。
  • 第 2 行,返回一組集合,其中 tickers_nt 類型每個元素看起來就像 tickers Table 中的 row。
  • 第 7 行,聲明一個關聯陣列 (Associative array) 來保存從 rows_in 游標變數中獲取的 rows (因使用 BULK COLLECT 所以需要)。
  • 第 9 行,l_doubled 是一個 local 變數,儲存要返回 SELECT 語句的資料。
  • 第 12 ~ 13 行,啟動一個簡單的循環從 rows_in 游標變數中獲取 row。CURSOR 表達式會自動 Open 這個游標變數。使用 BULK COLLECT 功能每次提取 100 rows。避免沒有效率的逐行處理。當關聯陣列為空時退出循環。
  • 第 15 行,取出關聯陣列每個 row。
  • 第 17 ~ 22 行,使用 EXTEND 在 nested table 的末尾添加另一個元素,然後將 tickers 物件放入集合中新的最後一個索引值。
  • 第 24 ~ 29 行,添加另一個元素,因為我們將一筆拆成兩筆。
  • 第 32 行,現在已經獲取了所有的 row,關閉游標變數。 注意:此步驟是可選的。當使用 CURSOR 表達式傳入結果集時, cursor 將在函數終止時自動關閉。
  • 第 34 行返回結果集。

在這個例子中的 FETCH-BULK COLLECT-LIMIT,使用了值 100 作為 LIMIT 子句。這是一個不錯的預設值。但是,如果您正在處理極大量的 rows 並希望從函數中獲得更好的性能,則可以嘗試更大的 LIMIT 值。但請注意,這會消耗更多的 PGA 記憶體,並且在某些時候,由於記憶體消耗過多,程式碼將會變慢。您可將 LIMIT 值作為參數傳遞,以便能夠在不重新編譯函數的情況下改變執行的性能,如:

1
2
3
4
5
6
7
8
9
10
CREATE OR REPLACE FUNCTION doubled (
rows_in stock_mgr.stocks_rc, limit_in INTEGER DEFAULT 100)
...
BEGIN
LOOP
FETCH rows_in BULK COLLECT INTO l_stocks LIMIT limit_in;
EXIT WHEN l_stocks.COUNT = 0;
...
END;
/

現在可以來處裡一些 Streaming 的作業了。這裡有 Stocks 資料範本

SQL> select count(*) from stocks;

COUNT(*)
----------
1000

SQL> select count(*) from tickers;

COUNT(*)
----------
0
INSERT INTO tickers
SELECT * FROM TABLE(doubled (CURSOR (SELECT * from stocks)))
/
SQL> select count(*) from tickers;

COUNT(*)
----------
2000

這裡只展示了一個步驟的資料流轉換串接,你可以再建多個 steaming table function,做多步驟的資料流轉換串接。執行的時候就像:

SELECT *
FROM TABLE ( another (CURSOR ( SELECT *
FROM TABLE ( doubled (CURSOR ( SELECT * FROM stocks where ticker = '1500'))))))

Streaming table function 在數據倉儲 ETL 操作中扮演著至關重要的角色。Oracle Database 透過實現 PL/SQL CURSOR 變數和 CURSOR 表達式,使構建此類函數變得容易。

請記住,由 Streaming table function 返回的集合將消耗 PGA 記憶體,因此通過 CURSOR 變數傳遞到函數如果是非常大的數據集可能會導致記憶體耗用錯誤。那能做些什麼?

那就得將 streaming table function 轉為 pipelined streaming table function

現在讓我們創建 doubled 函數的 Pipelined table function 版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
CREATE OR REPLACE FUNCTION doubled_pl (rows_in stock_mgr.stocks_rc)
RETURN tickers_nt
PIPELINED
AUTHID DEFINER
IS
TYPE stocks_aat IS TABLE OF stocks%ROWTYPE INDEX BY PLS_INTEGER;

l_stocks stocks_aat;
BEGIN
LOOP
FETCH rows_in BULK COLLECT INTO l_stocks LIMIT 100;
EXIT WHEN l_stocks.COUNT = 0;

FOR l_row IN 1 .. l_stocks.COUNT
LOOP
PIPE ROW (ticker_ot( l_stocks(l_row).ticker,
l_stocks(l_row).trade_date,
'O',
l_stocks(l_row).opening_price)
);

PIPE ROW (ticker_ot( l_stocks(l_row).ticker,
l_stocks(l_row).trade_date,
'C',
l_stocks (l_row).closing_price)
);
END LOOP;
END LOOP;

RETURN;
END;
/
  • 第 3 行,加了 PIPELINED 關鍵字。
  • 第 16 行,使用 PIPE ROW 立即將該數據發送回調用查詢。所以不會有阻塞或記憶體消耗的問題,這對大量的 ETL 數據處裡很重要。
  • 第 30 行 RETURN 之前你不需要顯式關閉 CURSOR 變數(rows_in); 當 Table function 終止時,Oracle 資料庫將自動關閉使用 CURSOR 表達式創建的 CURSOR 變數。
  • 第 30 行,只返回控制,沒有其它數據。數據已經都用 PIPE ROW 送出了。

現在我們來看看有使用 pipelined 與沒有使用 pipelined 的區別。

SQL> select count(*) from stocks;

COUNT(*)
----------
256000

SQL> set timing on
SQL> SELECT * FROM TABLE (doubled (CURSOR (SELECT * FROM stocks)))
2 WHERE ROWNUM < 10;

TICKER PRICEDATE P PRICE
-------------------- ------------------ - ----------
1001 24-OCT-18 O 48
1001 24-OCT-18 C 43
...

9 rows selected.

Elapsed: 00:00:02.06
SQL> SELECT * FROM TABLE (doubled_pl (CURSOR (SELECT * FROM stocks)))
2 WHERE ROWNUM < 10;

TICKER PRICEDATE P PRICE
-------------------- ------------------ - ----------
1001 24-OCT-18 O 48
1001 24-OCT-18 C 43
...

9 rows selected.

Elapsed: 00:00:00.01
  • 使用非 pipelined 版本,我必須等待 256000 行轉換加倍到 512000 行 (同樣消耗大量的 PGA 記憶體)。然後將所有這些行傳遞回 SELECT 語句,此時 SQL 引擎說:“好吧,我只想要前 9 個” 並拋棄其餘的行。

  • 使用 pipelined 版本的回應時間明顯縮短,清楚地 SELECT 語句能夠跟踪函數立即返回的行。只要傳回 9 行,SQL 引擎就會終止 Pipelined table function 的執行 (WHEN NO_DATA_NEEDED)。這只會終止函數,但不會終止調用它的 SELECT 語句。

現在試試沒有 WHERE 條件句。

SQL> SELECT count(*) FROM TABLE (doubled (CURSOR (SELECT * FROM stocks)));

COUNT(*)
----------
512000

Elapsed: 00:00:02.06
SQL> SELECT count(*) FROM TABLE (doubled_pl (CURSOR (SELECT * FROM stocks)));

COUNT(*)
----------
512000

Elapsed: 00:00:00.51

即使沒有 WHERE 條件 pipelined 版本也明顯得快很多,這是因為減少耗用大量 PGA 記憶體的 I/O,如果 PGA 不夠用而需用到 Temporary Tablespace ,則影響更大。

從記憶體的角度來看,非 Pipelined table function 比 pipelined 版本消耗更多的 PGA 記憶體。 這結果應該是完全合理的,因為我們不需要聲明並使用函數所需暫存返回的資料集合。

在使用 APEX 時有些複雜的資料常會使用 APEX Collection,這不僅效能不好,也常是 Oracle 資料庫效能不佳的原因,建議您試試 Table Function!

Table function 返回的集合將消耗 PGA 記憶體,因此函數如果產生非常大的數據集可能會導致記憶體耗用錯誤。那能做些什麼?

那就得將 table function 轉為 pipelined table function

Pipelined table function 是一種特別的 table function,它透過管道 (pipe) 在函數完成所有處理之前,將數據以函數所生成的形式返回 (RETURN) 給調用的查詢語句。

在深入 Pipelined table function 的實現和應用之前,了解這個述語句的不一樣非常重要。 PL/SQL 不是多線程語言 (Multithread)。通常,當調用 PL/SQL 區塊(anonymous、nested、procedure、function 等)時,該 session 的進一步處理將 “等候(on hold)”(或暫停 - suspended),直到該 PL/SQL 區塊將控制權返回給調用區塊的宿主(host),該宿主有可能是另一個 PL/SQL 區塊,SQL 語句或 Java 等主機語言。

一般的(非 pipelined) table function 都以這種方式作業。每次在 FROM 子句中調用 table function 時,SQL 引擎必須等到函數執行 RETURN 語句才能將集合傳遞回 SELECT 語句以轉換為行和列。

此種阻塞行為可能會對 SELECT 語句的整體性能產生負面影響,尤其是在數據倉儲的 ETL 操作中。 此外,隨著每個元素添加到 table function 中的集合中,消耗了越來越多的 PGA 記憶體。 對於非常大的數據集,這可能導致進一步的性能下降,甚至錯誤。

Pipelined table function 解決了這兩個問題(性能與記憶體)。讓我們透過一個非常簡單的一般的 Table function 和一個非常簡單的 pipelined table function 來開始我們對它的了解。

首先,創建一個 Schema-level 包含單一字符串的 nested table 類型。

1
2
CREATE OR REPLACE TYPE strings_t IS TABLE OF VARCHAR2 (100)
/

接下來,編譯一個 table function,該函數返回一個 strings_t 類型的 nested table,其中只包含一筆資料內含一個字符串:

1
2
3
4
5
6
7
8
9
CREATE OR REPLACE FUNCTION strings
RETURN strings_t
AUTHID DEFINER
IS
l_strings strings_t := strings_t ('壹貳叁');
BEGIN
RETURN l_strings;
END;
/
1
2
SELECT COLUMN_VALUE my_string FROM TABLE (strings())
/
MY_STRING
----------------------------------------------------------------------------------
壹貳叁

現在創建同一個 table function 的 pipelined 版本。

1
2
3
4
5
6
7
8
9
10
CREATE OR REPLACE FUNCTION strings_pl
RETURN strings_t
PIPELINED
AUTHID DEFINER
IS
BEGIN
PIPE ROW('壹貳叁');
RETURN;
END;
/
1
2
SELECT COLUMN_VALUE my_string FROM TABLE (strings_pl())
/
MY_STRING
----------------------------------------------------------------------------------
壹貳叁

我們看到了相同的結果。

現在讓我們探討這個一般的 table function 和 pipelined table function 中的代碼之間的差異:

一般的 Table function 和 Pipelined table function 之間還有另一個很大的區別:一般的 Table function 可以在 PL/SQL 中調用,但 Pipelined table function 只能在 SELECT 語句中調用。

1
2
3
4
5
6
DECLARE
l_strings strings_t := strings_t ();
BEGIN
l_strings := strings_pl (); /* 這行不通的 */
END;
/
PLS-00653: aggregate/table functions are not allowed in PL/SQL scope

這應該是完全意料中的。因為 PL/SQL 不是多線程 (Multithread) 語言,所以它不能接受在函數終止執行之前 PIPE 回來的數據。也因此,在 Pipelined table function 功能中,它不會有阻塞或記憶體消耗的問題。它與一般的 Table function 的基本差異:

  1. 添加 PIPELINED 關鍵字到函數標頭。
  2. 使用 PIPE ROW 將數據發送回調用的 SELECT 語句,而不是將數據添加到 local 的集合。
  3. 函數只返回控制,沒有其它的數據。

現在看一個比較實際的應用,這裡要將 emp Table 的每一筆資料依薪資 (sal) 與獎金 (comm) 拆成兩筆:

1
2
3
4
5
6
7
8
9
10
11
CREATE OR REPLACE TYPE employee_ot
AUTHID DEFINER
IS OBJECT
(
empno NUMBER(4),
ename VARCHAR2(10),
income_type CHAR(1),
income_value NUMBER(7,2),
dname VARCHAR2(14)
)
/
1
2
CREATE OR REPLACE TYPE employee_nt IS TABLE OF employee_ot
/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
CREATE OR REPLACE FUNCTION employee_pl(deptno_in NUMBER DEFAULT NULL)
RETURN employee_nt
PIPELINED
AUTHID DEFINER
IS
CURSOR c1 IS
SELECT e.empno, e.ename, e.sal, e.comm, d.dname
FROM emp e, dept d
WHERE e.deptno = d.deptno
AND e.deptno = coalesce(deptno_in, e.deptno)
ORDER BY e.empno;
BEGIN
FOR rec IN c1
LOOP
PIPE ROW(employee_ot(rec.empno, rec.ename, 'S', rec.sal, rec.dname));
PIPE ROW(employee_ot(rec.empno, rec.ename, 'C', rec.comm, rec.dname));
END LOOP;

RETURN;
END;
/
1
select * from table(employee_pl(30));
     EMPNO ENAME      I INCOME_VALUE DNAME
---------- ---------- - ------------ --------------
7499 ALLEN S 1600 SALES
7499 ALLEN C 303 SALES
7654 葉習堃 S 1250 SALES
7654 葉習堃 C 1400 SALES
7698 BLAKE S 2850 SALES
7698 BLAKE C 101 SALES
7844 하찮고 S 1500 SALES
7844 하찮고 C SALES
7900 JAMES S 94998 SALES
7900 JAMES C SALES

10 rows selected.
NO_DATA_NEEDED

有時候需要在所有行被傳回之前終止 Pipelined table function,就像下面例子:

1
select * from table(employee_pl()) where rownum < 5;
     EMPNO ENAME      I INCOME_VALUE DNAME
---------- ---------- - ------------ --------------
7369 SMITH S 8001 RESEARCH
7369 SMITH C RESEARCH
7499 ALLEN S 1600 SALES
7499 ALLEN C 303 SALES

where rownum < 5 將會在接收到 4 筆資料後引發資料庫 NO_DATA_NEEDED 異常。這將終止函數,但不會終止調用它的 SELECT 語句。因此,在大多數情況下,你不必擔心此 NO_DATA_NEEDED 異常。

但如果滿足以下任一條件,則需要特別去處理此異常:

  • 在包含 PIPE ROW 語句的區塊中包含 OTHERS 異常處理程序。
  • 提供 PIPE ROW 語句的代碼必須有一些清理的程序。 通常,清理過程會釋放代碼不再需要的資源。

讓我們更詳細地探討這種行為。在上面示例中,我只 SELECT 了 4 行,因此 Oracle 資料庫引發了 NO_DATA_NEEDED 而終止了函數的執行,但沒有引發異常。

現在在原函數只添加一個 OTHERS 異常處理程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
CREATE OR REPLACE FUNCTION employee_pl(deptno_in NUMBER DEFAULT NULL)
RETURN employee_nt
PIPELINED
AUTHID DEFINER
IS
CURSOR c1 IS
SELECT e.empno, e.ename, e.sal, e.comm, d.dname
FROM emp e, dept d
WHERE e.deptno = d.deptno
AND e.deptno = coalesce(deptno_in, e.deptno)
ORDER BY e.empno;
BEGIN
FOR rec IN c1
LOOP
PIPE ROW(employee_ot(rec.empno, rec.ename, 'S', rec.sal, rec.dname));
PIPE ROW(employee_ot(rec.empno, rec.ename, 'C', rec.comm, rec.dname));
END LOOP;

RETURN;
EXCEPTION
WHEN OTHERS
THEN
DBMS_OUTPUT.put_line('Error: ' || SQLERRM);
RAISE;
END;
/

在包含 PIPE ROW 語句的區塊中包含 OTHERS 異常處理程序。

1
2
3
set serveroutput on

select * from table(employee_pl()) where rownum < 5;
      EMPNO ENAME      I INCOME_VALUE DNAME
---------- ---------- - ------------ --------------
7369 SMITH S 8001 RESEARCH
7369 SMITH C RESEARCH
7499 ALLEN S 1600 SALES
7499 ALLEN C 303 SALES

Error: ORA-06548: no more rows needed

如這裡顯示的,NO_DATA_NEEDED 錯誤被 OTHERS 處理程序捕獲,並且重新 RAISE, 但這不會在 SELECT 語句中顯示為錯誤。但是,採用這種方法的問題在於,您的 OTHERS 處理程序可能需要包含特定的清理代碼,這些代碼對於意外故障有意義,但不適用於數據管道的提前終止。 因此,建議您單獨為 NO_DATA_NEEDED 包含一個特定的處理程序。

在下面的代碼中,展示 NO_DATA_FOUND 和 NO_DATA_NEEDED 在 Pipelined table function 中預設是從從調用的查詢中被 “隱藏”,但其他異常 (如 PROGRAM_ERROR) 則會導致 SQL 語句的終止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
CREATE OR REPLACE FUNCTION strings (err_in IN VARCHAR2)
RETURN strings_t
PIPELINED
AUTHID DEFINER
IS
BEGIN
PIPE ROW (1);

CASE err_in
WHEN 'no_data_found'
THEN
RAISE NO_DATA_FOUND;
WHEN 'no_data_needed'
THEN
RAISE NO_DATA_NEEDED;
WHEN 'program_error'
THEN
RAISE PROGRAM_ERROR;
END CASE;

RETURN;
END;
/

這段程式碼中我們故意引發錯誤,觀察 SQL 的回應。

SQL> select COLUMN_VALUE my_string from table(strings('no_data_found'));

MY_STRING
--------------------------------------- /* NO_DATA_FOUND 錯誤被 SQL 隱藏了 */
1

SQL> select COLUMN_VALUE my_string from table(strings('no_data_needed'));

MY_STRING
--------------------------------------- /* NO_DATA_NEEDED 錯誤被 SQL 隱藏了 */
1

SQL> select COLUMN_VALUE my_string from table(strings('program_error'));
ERROR:
ORA-06501: PL/SQL: program error /* 其它的異常會導致 SQL 語句的終止 */
ORA-06512: at "DEMO.STRINGS", line 18

no rows selected

關於 NO_DATA_NEEDED 的基本要點是不要擔心,除非你在 Pipelined table function 中提供 WHEN OTHERS 處理程序。在這種情況下,請確保單獨為 NO_DATA_NEEDED 提供處理程序,只需使用 RAISE 語句重新引用該異常即可 (即 RAISE NO_DATA_NEEDED)。

Pipelined table function 在 PL/SQL 中是一個比較特殊的東西。它們在函數完成之前就將數據傳遞回調用的查詢。 RETURN 除了返回控制之外,它們不會傳回任何數據。而你不能在 PL/SQL 本身內調用 Pipelined table function ,只能從 SELECT 語句的 FROM 子句中調用。

但是那些奇怪的功能反映了這種特殊功能的強大:與普通 (非 Pipelined) Table function 相比,性能提高不少,更減少了記憶體的消耗。

甚麼是 Table Function? 就是 Function 用起來就像 Table。PL/SQL Function 有什麼作用?

Function

Function 可以返回一個值。 該值可以是純量值,例如字符串:

1
2
3
4
5
6
7
8
9
CREATE OR REPLACE FUNCTION longer_string (
string_in IN VARCHAR2, to_length_in IN INTEGER)
RETURN VARCHAR2
AUTHID DEFINER
IS
BEGIN
RETURN RPAD (string_in, to_length_in, 'x');
END;
/
SQL> select longer_string('Hello Tainan!', 20) as hello from dual;

HELLO
--------------------------------------------------------------------------------
Hello Tainan!xxxxxxx

Function 還可以返回更複雜的數據類型,例如,記錄(record)、甚至集合(collection)。為了展示這一點,首先聲明一個 Schema-level 的 Nested table 類型:

1
2
CREATE OR REPLACE TYPE strings_t IS TABLE OF VARCHAR2(100)
/

然後定義一個 function,該 function 返回該類型 (strings_t) 的隨機生成的字符串 (varchar2) 的集合 (collection):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CREATE OR REPLACE FUNCTION random_strings (
count_in IN INTEGER)
RETURN strings_t
AUTHID DEFINER
IS
l_strings strings_t := strings_t();
BEGIN
l_strings.EXTEND(count_in);

FOR indx IN 1 .. count_in
LOOP
l_strings(indx) := DBMS_RANDOM.string('u', 10);
END LOOP;

RETURN l_strings;
END;
/

這是一個返回集合的函數,我們可以將它當成 Table Function 使用,這意思是,您可以使用 SELECT 語句查詢函數返回值的內容,就像使用一般的 Table 一樣。

SQL> select * from table(random_strings(5));

COLUMN_VALUE
--------------------------------------------------------------------------------
HLSFIPLIWX
ITYWJXMWBD
OTPGAWXFGG
HMNQCQIPKL
EPNLHKYJEG

當然你也可以將它當成一般普通的函式這樣用:

SQL> select random_strings(5) from dual;

RANDOM_STRINGS(5)
--------------------------------------------------------------------------------
STRINGS_T('XJNABIIQLO', 'WTVKOLJHMT', 'JQMOPVDOQE', 'DUOUHJUVHO', 'FFJFVTBWTT')

但在 Oracle 資料庫中這樣的資料可能不是你要的。

Table Functions

Table function 可用在以下的情況中:

  • 合併 Tables 的數據資料
    您無法或很難單純的使用 SQL 的功能來獲得一些答案。則可使用 TABLE 子句和 Table Function 來實現。

  • 以程式的方式建構要傳遞到主機環境的數據集
    您的網頁需要顯示一些數據。 然而,這些數據需要一些複雜的整合。一般的情況,您需要執行一些過程代碼來構造數據集,然後將構造的數據存入一個暫存的 Table (Temporary Table) 中,然後執行 SELECT。但是使用 Table function,您可以立即將數據傳遞到網頁,而無需任何暫存的處理過程

  • 模擬可參數化的 View (parameterized view)
    Oracle database 不支援真正的參數化的 View,如

    CREATE OR REPLACE VIEW my_view (param1_in IN INTEGER) ...

    您可以使用 system contexts 實現類似的效果,在 WHERE 子句調用 SYS_CONTEXT 函數以從 session 中獲取參數值。

    但您也可以使用 Table function 接受參數來模擬可參數化的 View。

  • 限制開發人員對 Tables 的使用權限
    如果要嚴格遵循資料庫的資料安全準則,使用者和開發人員都無法直接訪問 Table,即使是 SELECT 權限! 對於非查詢 DML (插入,更新,刪除),一值都建議打包成 PL/SQL Package 來執行這些操作。對於查詢 (SELECT),則可以通過使用 Table function 來實現對 Table 查詢的完全控制。 您不必授予 Table 的 SELECT 權限,而是授予 Table function 的 EXECUTE 權限。

  • 執行 Data warehouse 的資料轉換
    數據倉儲環境中的一個常見要求是執行轉換 (Transformation),通常都是從一個 Table 轉換到另一個 Table,如圖:

    一種特殊類型的 Table function ,稱為 Streaming table function,可以簡潔優雅的支援這些轉換並兼具高性能。

現在我們看看如何使用 random_strings 函數,既可以作為 PL/SQL 中的 “普通” 函數,也可以作為 Table function。

在下面的 PL/SQL 程式碼中,調用 random_strings 函數,然後使用 DBMS_OUTPUT.PUT_LINE 顯示生成的字符串:

1
2
3
4
5
6
7
8
9
10
11
set serveroutput on

DECLARE
l_strings strings_t := random_strings(5);
BEGIN
FOR indx IN 1 .. l_strings.COUNT
LOOP
DBMS_OUTPUT.put_line(l_strings(indx));
END LOOP;
END;
/
RNOOEODDOH
LJNWHIUVJA
LFLHRCCURN
ROUGNTXSQU
RJKRTXKGCK

PL/SQL procedure successfully completed.

現在直接在 SELECT 語句中調用該函數:

1
2
3
SELECT COLUMN_VALUE my_string
FROM TABLE (random_strings(5))
/
MY_STRING
--------------------------------------------------------------------------------
BKYEWJPQJW
CVSINZCLPX
CLFUSFDWOU
MEEQSNYKXW
PANIDFINDJ

這就是 Table function !

Oracle 資料庫中的 SQL 引擎在 table function 方面為我們做了大部分繁重工作。在 SELECT 語句的 TABLE 子句中調用函數時,SQL 引擎會將函數返回的數據集轉換為關係結果集 (Relational result set)。然後就可以像從 Table 或 View 中的結果集那樣操縱該結果集。

當集合類型的每個元素都是單純的量值時(如上面的 strings_t 的情況),該結果集的單個 Column 的名稱將自動設置為 COLUMN_VALUE。 當然,您可以使用 column alias 更改它。

當我們從函數返回一個集合 (collection) 時,集合裡面的每個元素 (row) 不會像前面例子,只含單獨的一個欄位(column),我們需要像 Table 中的每筆資料一樣含有許多的 columns,要達到此目地我們就必須使用物件來達到此目的:

1
2
3
4
5
6
7
8
CREATE OR REPLACE TYPE two_strings_ot
AUTHID DEFINER
IS OBJECT
(
string1 VARCHAR2(10),
string2 VARCHAR2(10)
)
/
1
2
CREATE OR REPLACE TYPE two_strings_nt IS TABLE OF two_strings_ot
/

現在我們可以使用這兩個類型:

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE OR REPLACE FUNCTION three_pairs
RETURN two_strings_nt
AUTHID DEFINER
IS
l_strings two_strings_nt;
BEGIN
RETURN two_strings_nt(
two_strings_ot('a', 'b'),
two_strings_ot('c', 'd'),
two_strings_ot('e', 'f')
);
END;
/
1
2
SELECT * FROM TABLE (three_pairs())
/
STRING1    STRING2
---------- ----------
a b
c d
e f

一旦在 TABLE 子句中嵌入了函數調用,就可以像使用 Table 或 inline view 中的結果集一樣使用結果集。你可以 join 該集合,可以將 columns 使用於 WHERE 子句中,也可以使用集合運算符 (set operator)。

以下是使用 three_pairs 函數以及一般 Table 的例子。首先我們需要一個簡單的一般 Table:

1
2
3
4
5
CREATE TABLE string_pairs (
string1 VARCHAR2(10),
string2 VARCHAR2(10)
)
/
1
2
3
4
5
6
7
BEGIN
INSERT INTO string_pairs VALUES ('a', 'bb');
INSERT INTO string_pairs VALUES ('cc', 'dd');

COMMIT;
END;
/

現在使用 UNION ALL 集合運算符(set operator),組合 Table 和 Table function:

1
2
3
4
SELECT * FROM string_pairs
UNION ALL
SELECT * FROM TABLE (three_pairs())
/
STRING1    STRING2
---------- ----------
a bb
cc dd
a b
c d
e f

接下來,將 Table 和 Table function join 在一起:

1
2
3
4
5
6
SELECT t.string1 string1, t.string2 t_string2, tf.string2 tf_string2
FROM string_pairs t,
TABLE (three_pairs()) tf
WHERE t.string1 = tf.string1
ORDER BY string1
/
STRING1  T_STRING2  TF_STRING2
---------- ---------- ----------
a bb b

甚至可以隱藏使用 Table function,將它放在 View 中:

1
2
3
4
CREATE OR REPLACE VIEW three_pairs_v
AS
SELECT * FROM TABLE (three_pairs())
/
1
2
3
4
5
6
SELECT t.string1 string1, t.string2 t_string2, tf.string2 tf_string2
FROM string_pairs t,
three_pairs_v tf
WHERE t.string1 = tf.string1
ORDER BY string1
/
STRING1  T_STRING2  TF_STRING2
---------- ---------- ----------
a bb b

Table function 為資料庫開發人員提供了靈活性和功能性。 你可以使用 Table function 將 SQL 功能與 PL/SQL 的過程控制相結合,以應對各種挑戰。

之前,我們學習了 GraphQL 並創建了一個基本的 GraphQL API。 在此示例中,我們將擴展這些概念,使用 AWS AppSync 創建一個音樂節應用程序。

此應用程序將會有以下內容:

  • Amazon DynamoDB Tables,將用於存儲節目和表演廳。
  • GraphQL API 用於創建、讀取、更新、刪除、和列出節目和表演廳。
  • 只有管理員才能創建、更新、或刪除節目或表演廳。
  • 所有用戶都應該能夠查看節目和表演廳。
  • 節目與表演廳之間的關連。
  • 用戶應該能夠查看所有節目以及查看節目的詳細信息。

GraphQL、AppSync API 和 React Router

首先,我們將介紹如何為 GraphQL 類型 (Type) 之間的關係建模;如何在 GraphQL 類型和字段 (fields) 上實現授權規則;如何為 AppSync API 啟用多種授權模式,以及如何使用 React Router 啟用路由參數。

GraphQL 類型之間的關係

創建 GraphQL API 或任何 API 時,了解數據之間的建模關係非常重要,例如,我們正在構建的應用程序將具有以下兩種類型:

  • Stage
    這類型將保存各個表演廳的信息,包括表演廳名稱、和表演廳 ID。 每個表演廳也都會有許多相關的節目。
  • Performance
    這個類型將保存各個節目的信息,包括表演者、節目簡介、演出的表演廳、和表演時間。

對於這類型的 API,理想情況下,至少要具有以下的存取模式:

  • 查詢單個表演廳和該表演廳的節目
  • 查詢所有表演廳以及每個表演廳的節目。
  • 查詢個別的表演節目和相應的表演廳。
  • 查詢所有的表演節目和相應的表演廳。

現在的問題是,您如何啟用這些不同的關係和存取模式? 在我們的案例中,如何使用像 DynamoDB 這樣的 NoSQL 數據庫來做到這一點? 有兩種方法可以實現此目的:

  • 透過使用主鍵,排序鍵和本地次要索引(local secondary indexes)的組合,使 DynamoDB 中的數據模式化,從而可以使用單個表(single table)執行所有這些存取模式。 為了與 AppSync 配合使用,這必須手動並從頭開始編寫和維護所有解析器邏輯。
  • 直接在 GraphQL resolvers 解析器級別啟用這些關係。 因為我們使用的是 GraphQL,GraphQL 啟用了按字段解析器(per-field resolvers),所以可以做到這一點。 為了更好地理解這一點,讓我們看一下將要使用的 Stage 類型。
Stage type in GraphQL
1
2
3
4
5
type Stage {
id: ID!
name: String!
performances: [Performance]
}

當為此類型創建一個或多個解析器時,以下是一個示例操作鏈。假設在請求表演廳和相對應的節目時會發生這種情況:

  1. 主要的 Stage GraphQL 解析器將使用 Stage ID 從數據庫中的 Stage 表中檢索表演廳的信息。
  2. Stage 類型的 performances 字段將具有其自己的 GraphQL 解析器。該解析器使用 Stage ID 透過使用 GSI 查詢數據庫來檢索相關的節目,僅返回與該表演廳 ID 相關的節目。

Global secondary indexes(GSIs) 允許我們添加可用於查詢表並啟用其他數據存取模式的其他索引。DynamoDB 和 NoSQL 數據庫最強大的功能之一通常是建立其他索引,以實現多種存取模式 (DynamoDB 最多可擁有 20 個 GSIs)。這就像關聯式數據庫中的 indexes 與相關的 constraints (foreign key、unique index 等)。

GraphQL Transform: @connection

在之前,我們使用了 GraphQL Transform 程式庫的 @model 指令來搭建整個後端,包括解析器、數據庫和其他 GraphQL schema。 GraphQL Transform 是一個程式庫,這些指令不是 GraphQL SDL 的一部分,它使我們可以 “裝飾(decorate)” GraphQL schema,並添加其他功能。

這裡,我們將介紹幾個新的指令,包括 @connection,它使我們能夠僅用幾行代碼建立這些關係模式所需要的解析器。

多重身份驗證類型

在之前,我們使用 API key 作為身份驗證類型創建了 GraphQL API。 這在某些情況下是很好的,例如當你想讓應用程序的所有用戶都可以使用 GraphQL 查詢時。

除了 API key 外,AppSync 支援四種主要的身份驗證方法:

  • The API key
    使用 API key 時,要在發出 HTTP 請求時,在 header 以 x-api-key 的形式發送 API key。 如果使用 Amplify 客戶端,則會自動發送。
  • Amazon Cognito user pools
    在這個範例,我們要使用 Amazon Cognito 的身份驗證託管服務。 使用 Amazon Cognito,我們可以針對 API 本身以及 GraphQL 類型和字段的存取,做進階的權限和群組的控管。
  • OpenID Connect
    OpenID Connect 使你能夠使用自己的身份驗證提供者。因此,如果你喜歡其他身份驗證服務(如 Auth0),或者你的公司具有自己的身份驗證方式,則仍可以使用它來對 AppSync API 進行身份驗證。
  • IAM
    AWS IAM 類型會在 GraphQL API 上使用 AWS Signature Version 4 簽章過程。您可以使用 Cognito identity pools 中未經身份驗證的 IAM 角色給予 public 的存取權限,這會比使用 API key 啟用 public 的存取權限更安全。

這裡,我們將結合使用 API key 和 Amazon Cognito 為 API 提供的多種身份驗證類型,從而實現公共讀取和私有存取的權限控管。

授權

使用 GraphQL Transform 程式庫,我們還可以使用 @auth 指令為 API 定義不同的授權規則。

使用 @auth,我們可以定義不同類型的規則,例如:

  • 允許所有用戶創建和讀取,但僅允許創建項目的所有者可以更新和刪除。
  • 只允許特定群組的用戶能夠創建,更新或刪除。
  • 只允許所有用戶都能讀取,但不能執行任何其他操作。
  • 上述規則的組合。

在這理,我們將構建的應用程序將支援私有存取和公共的讀取。此外,我們還需要對這些規則進行更多的控管。我們需要支援以下內容:

  • 經過身份驗證而且是 Amazon Cagnito Admin 群組的用戶,將能夠執行所有操作:創建、讀取、更新、和刪除。
  • 未經身份驗證的用戶將有權訪問,但只能讀取。

使用 GSI 的自定義數據存取模式

DynamoDB 最強大的功能之一是,每個表允許 20 個額外的 GSI。 透過使用 GSI 或 GSI + sort key 的組合(也可以將其視為 filter key),你可以為數據創建極其靈活而強大的數據存取模式。 GraphQL Transform 程式庫具有 @key 指令,可輕鬆為 @model 類型配置自定義的索引結構。

我們將使用 @key 在 Performance 表上將表演廳 ID 設置為 GSI 以創建存取模式。這樣將使我們能夠在單個 GraphQL 查詢中讀取表演廳及其相關的節目表。

現在瞭解了技能概述;讓我們開始構建應用程序。

開始構建 APP

首先,我們將創建新的 React 專案,安裝依賴項,初始化新的 Amplify 應用程序以及透過 CLI 添加功能。

創建 React 專案:
yarn create react-app festivalapp

cd festivalapp

yarn add aws-amplify antd @aws-amplify/ui-react react-router-dom
初始化一個新的 Amplify 專案:
amplify init
# 請按照步驟為專案命名,環境名稱並設置預設的文本編輯器。
# 接受其他所有設置的預設值,然後選擇你的 AWS Profile。

建立後端

我們將添加的第一個功能是身份驗證。 此應用程序將需要具有基本的身份驗證,以及 Lambda 觸發器,使我們可以在用戶註冊確認後,動態的將一組預定義的用戶添加到 Admin 群組中。

Authentication

使用 auth 類別添加 Cognito 身份驗證:

amplify add auth
 Do you want to use the default authentication and security configuration? Default configuration
How do you want users to be able to sign in? Username
Do you want to configure advanced settings? Yes, I want to make some additional changes.
What attributes are required for signing up? Email
Do you want to enable any of the following capabilities? Add User to Group
? Enter the name of the group to which users will be added. Admin
? Do you want to edit your add-to-group function now? Yes

使用以下代碼更新函式,並設定 adminEmails 陣列,這個陣列就是預定義的 Admin 群組用戶:

amplify/backend/function/<function_name>/src/add-to-group.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
const aws = require('aws-sdk');

exports.handler = async (event, context, callback) => {
const cognitoProvider = new aws.CognitoIdentityServiceProvider({ apiVersion: '2016-04-18' });

let isAdmin = false;
const adminEmails = ['user1@somedomain.com', 'user2@somedomain.com'];

if (adminEmails.indexOf(event.request.userAttributes.email) !== -1) {
isAdmin = true;
}

const groupParams = {
UserPoolId: event.userPoolId,
};

const userParams = {
UserPoolId: event.userPoolId,
Username: event.userName,
};

if (isAdmin) {
groupParams.GroupName = 'Admin';
userParams.GroupName = 'Admin';

try {
await cognitoProvider.getGroup(groupParams).promise();
} catch (e) {
await cognitoProvider.createGroup(groupParams).promise();
}

try {
await cognitoProvider.adminAddUserToGroup(userParams).promise();
callback(null, event);
} catch (e) {
callback(e);
}
} else {
callback(null, event)
}
};

現在,身份驗證服務已設置完畢,我們可以繼續下一步:創建 AppSync API。

The AppSync API

接下來,我們將創建 AppSync GraphQL API。 請記住,對於此 API,我們需要為公共和受保護的存取啟用多種身份驗證類型,所有這些都可以由 CLI 啟用。

我們將使用 api 類別添加 AppSync API:

amplify add api
? Please select from one of the below mentioned services: GraphQL
? Provide API name: festivalapi
? Choose the default authorization type for the API Amazon Cognito User Pool
Use a Cognito user pool configured as a part of this project.
? Do you want to configure advanced settings for the GraphQL API Yes, I want to make some additional changes.
? Configure additional auth types? Yes
? Choose the additional authorization types you want to configure for the API API key
API key configuration
? Enter a description for the API key: public
? After how many days from now the API key should expire (1-365): 365
? Configure conflict detection? No
? Do you have an annotated GraphQL schema? No
? Choose a schema template: Single object with fields (e.g., “Todo” with ID, name, description)
? Do you want to edit the schema now? (y/N) Yes

我們將使用的 GraphQL schema 有兩個主要類型: Stage 和 Performance。

amplify/backend/api/festivalapi/schema.graphql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type Stage @model
@auth(rules: [
{ allow: public, operations: [read] },
{ allow: groups, groups: ["Admin"] }
])
{
id: ID!
name: String!
performances: [Performance] @connection(keyName: "byStageId", fields: ["id"])
}

type Performance @model
@key(name: "byStageId", fields: ["performanceStageId"])
@auth(rules: [
{ allow: public, operations: [read] },
{ allow: groups, groups: ["Admin"] }
])
{
id: ID!
performanceStageId: ID!
productId: ID
performer: String!
imageUrl: String
description: String!
time: String
stage: Stage @connection
}

讓我們看看我們使用的指令及其工作方式。

  • @auth
    首先,@auth 指令允許我們傳入一個陣列的身份驗證規則。 每個規則都有一個 allow 字段(必要)以及其他元數據(可選),包括諸如指定身份驗證提供者(如果它與預設授權類型不同)之類的事情。

    在 Stage 和 Performance 類型中,我們使用了兩種授權類型,一種用於群組存取 (groups),另一種用於公共存取 (public)。 對於 public,我們還設置 operations 陣列。 該陣列應包含我們要在 API 上啟用的操作的列表。 如果未列出任何操作,則預設情況下將啟用所有操作。

  • @key
    @key 指令使我們能夠添加 DynamoDB 表的 GSI 和對鍵進行排序。 在這個架構中,我們創建了一個名為 byStageId 的鍵,該鍵將允許我們使用 Performance 表上的 performanceStageId 字段按表演廳的 ID 查詢 Performance 表中的節目。 Stage 類型上的 performances 字段的解析器將使用該表演廳的 ID 來讀取該表演廳的所有節目。

  • @connection
    @connection 指令允許我們對類型之間的關係進行建模。 可以創建的關係類型可以是: 一對多、多對一、或多對多。 在此示例中,我們創建了兩個關係。

    • 表演廳與節目之間的關係(一個表演廳有很多表演節目)
    • 節目與表演廳之間的關係(表演節目僅屬於一個表演廳)
部署服務

配置了所有服務之後,我們就可以部署後端了:

amplify push

部署完成後,你可以隨時查詢 Amplify 專案狀態:

amplify status

Current Environment: pecftadev

| Category | Resource name | Operation | Provider plugin |
| -------- | ----------------------------------- | --------- | ----------------- |
| Function | festivalapp0ed53a88PostConfirmation | No Change | awscloudformation |
| Auth | festivalapp0ed53a88 | No Change | awscloudformation |
| Api | festivalapi | No Change | awscloudformation |

GraphQL endpoint: ...
GraphQL API KEY: ...

服務已部署,我們可以開始編寫客戶端代碼。

建立前端

我們要做的第一件事是在 src 目錄中創建此應用程序所需的程式檔:

Container.js、Footer.js、Nav.js、Admin.js、Router.js、Performance.js 、與 Home.js。

下一步是打開 src/index.js 添加 Amplify 配置,導入 Ant Design 樣式,並用即將創建的 Router 組件替換主要組件。

src/index.js
1
2
3
4
5
6
7
8
9
10
11
12
import React from 'react'
import ReactDOM from 'react-dom'
import Router from './Router'
import 'antd/dist/antd.css'
import Amplify from 'aws-amplify'
import config from './aws-exports'
Amplify.configure(config)

ReactDOM.render(
<Router />,
document.getElementById('root')
);
Container

現在,讓我們創建一個 Container 組件,該組件將用作可重用的組件,為我們的視圖添加間距與樣式:

src/Container.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import React from 'react'

const Container = ({ children }) => {
return (
<div style={container}>
{ children }
</div>
)
}

const container = {
padding: '30px 40px',
minHeight: 'calc(100vh - 120px)'
}

export default Container

這裡,我們將創建 Footer 組件,該組件也是一個可重用的組件,以添加基本的頁腳,並提供一個鏈接,使管理員可以註冊和登錄:

src/Footer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import React from 'react'
import { Link } from 'react-router-dom'

const Footer = () => {
return (
<div style={footerStyle}>
<Link to="/admin">
Admins
</Link>
</div>
);
}

const footerStyle = {
borderTop: '1px solid #ddd',
display: 'flex',
alignItems: 'center',
padding: 20
}

export default Footer

現在,打開 src/Nav.js 創建基本的應用程序導航。 只有一個鏈接:回到主視圖的鏈接,其中將列出所有表演廳和節目表:

src/Nav.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import React from 'react'
import { Link } from 'react-router-dom'
import { Menu } from 'antd'
import { HomeOutlined } from '@ant-design/icons'

const Nav = ({ current }) => {
return (
<div>
<Menu selectedKeys={[current]} mode="horizontal">
<Menu.Item key='home'>
<Link to={'/'}>
<HomeOutlined />Home
</Link>
</Menu.Item>
</Menu>
</div>
)
}

export default Nav
Admin

將創建的 Admin 組件僅做三件事:允許用戶註冊、登錄、和登出。 該組件是為管理員提供一種註冊方法,以便他們可以隨後以管理員身份創建和管理 API。

請記住,當某人註冊時,如果在 Lambda 觸發器中驗證了他們的電子郵件,則他們將在註冊後被置於 Admin 群組中。 然後,他們將能夠執行 GraphQL mutations 操作來創建、更新、和刪除表演廳和節目。

如果你需要更新諸如 GraphQL schema 或 Lambda 函式之類的後端代碼,可以先在本地端進行更改,然後執行 amplify push 將更改部署到後端。

src/Admin.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import React from 'react'
import { withAuthenticator, AmplifySignOut } from '@aws-amplify/ui-react'

const Admin = () => {
return (
<div>
<h1 style={titleStyle}>Admin</h1>
<AmplifySignOut />
</div>
)
}

const titleStyle = {
fontWeight: 'normal',
margin: '0px 0px 10px 0px'
}

export default withAuthenticator(Admin)
Router
src/Router.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import React, { useState, useEffect } from 'react'
import { HashRouter, Switch, Route } from 'react-router-dom'

import Home from './Home'
import Admin from './Admin'
import Nav from './Nav'
import Footer from './Footer'
import Container from './Container'
import Performance from './Performance'

const Router = () => {
const [current, setCurrent] = useState('home')

useEffect(() => {
setRoute()
window.addEventListener('hashchange', setRoute)
return () => window.removeEventListener('hashchange', setRoute)
}, [])

function setRoute() {
const location = window.location.href.split('/')
const pathname = location[location.length - 1]
setCurrent(pathname ? pathname : 'home')
}

return (
<HashRouter>
<Nav current={current} />
<Container>
<Switch>
<Route exact path="/" component={Home} />
<Route exact path="/performance/:id" component={Performance} />
<Route exact path="/admin" component={Admin} />
</Switch>
</Container>
<Footer />
</HashRouter>
)
}

export default Router

在此組件中,我們將路由器與持久性 UI 組件 Container 和 Footer 結合在一起。

這個應用程序具有三個路由:

  • Home
    這是呈現表演廳和節目的主要路由。
  • Performance
    這是呈現個別的節目和節目細節的路由。
  • Admin
    這是將為管理員提供 註冊/登錄 頁面的路由。

在 Performance 路由中,你將看到我們使用的路由如下:

/performance/:id

這是使用 URL 參數的方式,符合此路由將使我們能夠在組件本身中使用這個參數。 這很有用,透過路由參數使我們能夠使用節目的 id 來獲取節目的詳細信息。 路由參數還使我們能夠輕鬆構建支援深層鏈接的應用程序。

Performance
src/Performance.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import React, { useState, useEffect } from 'react'
import { useParams } from 'react-router-dom'
import { getPerformance } from './graphql/queries'
import { API } from 'aws-amplify'

const Performance = () => {
const [performance, setPerformance] = useState(null)
const [loading, setLoading] = useState(true)

let { id } = useParams()

useEffect(() => {
fetchPerformanceInfo()
}, [])

async function fetchPerformanceInfo() {
try {
const performanceInfo = await API.graphql({
query: getPerformance,
variables: { id },
authMode: 'API_KEY'
})
setPerformance(performanceInfo.data.getPerformance)
setLoading(false)
} catch (err) {
console.log('error fetching performance info...', err)
setLoading(false)
}
}

return (
<div>
<p>節目資訊</p>
{ loading && <h3>Loading...</h3> }
{
performance && (
<div>
<h1>{performance.performer}</h1>
<h3>{performance.time}</h3>
<p>{performance.description}</p>
</div>
)
}
</div>
)
}

export default Performance

這個組件的 render 方法非常基礎。 它只是呈現演出者、時間、和描述。 關於此組件的重點在於我們如何獲取該信息。

  1. 我們使用 useState 掛鉤創建兩個狀態:loading (預設為 true) 和 performance (預設為 null)。 我們還創建了一個名為 id 的變數,該變數使用 React Router 中的 useParams 獲取路由參數 id 的值。
  2. 加載組件時,我們使用 useEffect 掛鉤立即調用 fetchPerformanceInfo 函式。
  3. fetchPerformanceInfo 函式使用路由參數中的 id 來調用 AppSync API。 這裡的 API 調用使用 API.graphql,傳入 variables,query 和 authMode。 預設情況下,這裡的 API 使用 Cognito User Pools 作為身份驗證模式。 每當我們想覆蓋此預設 (例如在這裡的情況下進行 public API 調用) 時,我們需要在 API 調用本身中指定 authMode。
  4. 從 API 返回數據後,我們調用 setLoading 和 setPerformance 來更新 UI 以呈現從 API 返回的數據。
Home

現在,讓我們創建最後一個組件,Home 組件:

src/Home.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import React, { useState, useEffect } from 'react'
import { API } from 'aws-amplify'
import { listStages } from './graphql/queries'
import { Link } from 'react-router-dom'
import { List } from 'antd'

const Home = () => {
const [stages, setStages] = useState([])
const [loading, setLoading] = useState(true)

useEffect(() => {
getStages()
}, [])

async function getStages() {
try {
const apiData = await API.graphql({
query: listStages,
authMode: 'API_KEY'
})
const { data: { listStages: { items }}} = apiData
setStages(items)
setLoading(false)
} catch (err) {
console.log('error fetching stages list...', err)
setLoading(false)
}
}

return (
<div>
<h1 style={heading}>表演廳院</h1>
{ loading && <h2>Loading...</h2> }
{
stages.map(stage => (
<div key={stage.id} style={stageInfo}>
<p style={infoHeading}>{stage.name}</p>
<p style={infoTitle}>節目表</p>
<List
itemLayout="horizontal"
dataSource={stage.performances.items}
renderItem={performance => (
<List.Item>
<List.Item.Meta
title={<Link style={performerInfo}
to={`/performance/${performance.id}`}
>{ performance.performer }</Link>
}
description={performance.time}
/>
</List.Item>
)}
/>
</div>
))
}
</div>
)
}

const heading = { fontSize: 44, fontWeight: 300, marginBottom: 5 }
const stageInfo = { padding: '20px 0px 10px', borderBottom: '2px solid #ddd' }
const infoTitle = { fontWeight: 'bold', fontSize: 18 }
const infoHeading = { fontSize: 30, marginBottom: 5 }
const performerInfo = { fontSize: 24 }

export default Home

這個組件中的邏輯實際上與我們在 Performance 組件中所做的非常相似。

現在,應用程序已完成,但還有一件事。 因為我們為 performances 解析器創建了自定義存取模式,所以我們需要更新 listStages 查詢定義,以便也能返回與表演廳相關的節目表。 為此,使用以下命令更新 listStages 查詢:

src/graphql/queries.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
export const listStages = /* GraphQL */ `
query ListStages(
$filter: ModelStageFilterInput
$limit: Int
$nextToken: String
) {
listStages(filter: $filter, limit: $limit, nextToken: $nextToken) {
items {
id
name
createdAt
updatedAt
performances {
items {
id
time
performer
description
}
}
}
nextToken
}
}
`;

現在,我們可以加入一些數據。 啟動應用程序並註冊一個管理員用戶:

yarn start

單擊頁腳中的 Admins 鏈接進行註冊。

我們在這應用程序範例中沒有建立可以創建,更新或刪除表演廳與節目的 UI 介面,所以我們要直接使用 AWS AppSync 控制台來加入一些數據。註冊一個管理員用戶後,打開 AppSync 控制台:

amplify console api

選擇 GraphQL。

在控制台的 Queries 面板中,您需要單擊 Login with User Pools 然後用剛創建的用戶的 username 和 password 登錄。 當提示你輸入 ClientID 時,請使用專案的 src/aws-exports.js 文件中的 aws_user_pools_web_client_id。

接下來在 AppSync 控制台的 Queries 面板中,創建表演廳和節目:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
mutation createStage {
createStage(input: {
id: "stage-1"
name: "音樂廳"
}) {
id name
}
}

mutation createPerformance {
createPerformance(input: {
performanceStageId: "stage-1"
performer: "小提琴家胡乃元"
description: "與大師有約 聽見不凡聲響。活躍於國內外藝術季及演出的對位室內樂團,邀請比利時伊莉莎白女王國際音樂大賽首獎得主–小提琴家胡乃元,演奏經典的布魯赫《第一號小提琴協奏曲》"
time: "2020/11/29 (日) 15:00"
}) {
id performer description
}
}

現在,我們的數據庫具有一些數據,我們應該能夠在我們的應用程序中查看它,並在主畫面和節目詳細資訊畫面視圖之間切換導航。

在這個範例中學到了幾點:

  • GraphQL Transform 指令使你能夠向 GraphQL API 添加強大的功能,例如身份驗證規則,關係和用於其他數據存取模式的自定義索引。
  • @auth 指令允許你傳入規則陣列以定義類型和字段的授權規則。
  • @connection 指令使你可以對 GraphQL 類型之間的關係進行建模。
  • @key 指令使你能夠為自定義數據存取模式定義自定義索引並增強現有關係。
  • 當創建具有多種授權類型的 API 時,進行 API 調用時會使用預設的主要授權類型。 如果需要覆蓋主要授權類型時,可以使用 API 類 authMode 參數傳遞你要使用的授權類型。

我們完成了一個 Serverless 的應用程序,而且幾乎沒有寫到伺服端的程式碼。也不需要擔心認證、授權、與資料庫的維護、安全、與效能問題。

Offline first 是一種軟件開發方法,開發人員可以構建應用程序的核心功能,以在有或沒有 Internet 連接的情況下正常運行。借助 Offline first 方法,數據可以在本地端寫入終端用戶的設備上,並定期上傳和復製到雲中。

Offline first 策略的一個重要目標是在互聯網連接速度慢或不存在時為終端用戶提供一致的用戶體驗(UX)。該體系結構將數據和應用程序邏輯推送到網絡邊緣(network edge),並且大多數處理都在終端用戶的設備上進行。

將所有裝置產生的資料傳送至集中式資料中心,或傳送至雲端,會造成頻寬及延遲問題。邊緣運算(Edge computing)提供更有效率的替代方案:在更接近資料建立的位置處理及分析資料。因為不需將資料傳送到雲端或資料中心進行處理,因此大幅減少延遲現象。邊緣運算 - 以及 5G 網路上的行動式邊緣運算 - 能提供更快更全面的資料分析、產生更深入的洞察、具有更快的回應時間,以及改善客戶體驗。而終端用戶端的計算機也可視為網絡邊緣(network edge)

這種方法不僅確保了應用程序的核心功能在沒有可靠的網絡連接的情況下仍能正常工作,而且還為移動用戶提供了電池資源和頻寬的更有效利用。對於旅行和體驗互聯網覆蓋盲區的終端用戶而言,這一點尤其重要。

AWS Offline Apps

到目前為止,我們已經使用了 REST API 和 GraphQL API。 在使用 GraphQL API 時,我們在本地端使用 API class 直接調用 GraphQL API 的 mutationsqueries

Amplify 還支援另一種與 AppSync 互動的 API:Amplify DataStore。 與一般的 GraphQL API 比較,DataStore 具有一些不同的方法。

DataStore 引入了客戶端 SDK(client-side SDK),它允許你使用本地端存儲引擎(local storage engine)進行讀寫操作,並保留這些數據。(例如使用,Web 的 IndexDB 和用於 iOS 和 Android 的 SQLite)。然後,DataStore 會自動將本地端數據透過 GraphQL API 同步到遠端存儲。記得我們也曾經使用 PouchDB 與 CouchDB 做過本地端與遠端數據的同步。這裡則是 DataStore 本地端自動透過 GraphQL API 同步到雲端的存儲。

使用 DataStore SDK,你只需直接執行 save、update 和 delete 之類的操作,即可直接寫入 DataStore 本身。 DataStore 可以為你處理其他所有事情:當你有 Internet 網路連接時,它會將數據同步到雲中;如果你離線時,則將會保留在 queue 中,在下次連線時將其同步到雲中。

DataStore 有三種內置的衝突解決策略來為您處理衝突檢測和解決方式:

  • AutoMerge
    在運行時檢查 GraphQL 類型的物件信息,以執行合併操作 (建議的選項)。
  • Optimistic concurrency
    傳入的記錄將與資料庫中最後寫入的項目進行版本檢查。
  • Custom
    使用 Lambda 函式將所需的自定義合併或拒絕更新業務邏輯寫入流程。

Amplify DataStore

Amplify DataStore 是以下各項的組合:

  • AppSync GraphQL API
  • 本地端存儲庫和同步引擎,也可以離線保存數據
  • 用於與本地端存儲庫互動的客戶端 SDK。
  • 啟用特殊的同步 GraphQL 解析器(由 Amplify CLI 自動產生的),可在伺服器上實現複雜的衝突檢測和衝突解決。

Amplify DataStore 概觀

開始使用 DataStore 時,仍然可以像創建一般的 API 一樣。 主要區別是,在創建 API 時,在 Amplify CLI 的進階設置中啟用 conflict detection 衝突檢測。

創建 GraphQL API 後接著要在客戶端上啟用 DataStore,我們需要為 DataStore 創建資料模型以用於與存儲庫互動。這只需依據已經存在的 GraphQL schema 定義,並執行 amplify codegen models CLI 構建命令,即可輕鬆實現。

Amplify DataStore Operations

Operation Commands
Import the model and
DataStore API
import { DataStore } from ‘@aws-amplify/datastore’
import { Message } from ‘./models’
Saving data await DataStore.save(
    new Message({
        title: ‘Hello Tainan’,
        sender: ‘Emily’
    })
)
Reading data const messages = await DataStore.query(Message)
Deleting data const message = await DataStore.query(Message, ‘123’)
DataStore.delete(message)
Updating data const message = await DataStore.query(Message, ‘123)
await DataStore.save(
    Message.copyOf(message, updated => {
        updated.title = ‘My new title’
    })
)
Observing/subscribing to
changes in data for real-time
functionality
const subscription = DataStore
    .observe(Message)
    .subscribe(msg => {
        console.log(msg.model, msg.opType, msg.element)
  });

DataStore Predicates

你可以使用 GraphQL 類型上定義的字段(field)以及 DynamoDB 支援的條件謂詞對數據存儲應用過濾器:

Strings: eq | ne | le | lt | ge | gt | contains | notContains | beginsWith | between

Numbers: eq | ne | le | lt | ge | gt | between

Lists: contains | notContains

例如,如果要獲取 title 包含 “Hello” 的所有訊息的列表:

1
2
const messages = await DataStore
.query(Message, m => m.title('contains', 'Hello'))

也可以將多個條件謂詞鏈接到一個操作中:

1
2
const message = await DataStore
.query(Message, m => m.title('contains', 'Hello').sender('eq', 'Emily'))

這些條件謂詞使你可以透過多種方式從本地數據中檢索不同的選擇集。以這種方式,你無需將整個資料經過網路下載到客戶端然後再檢索整個集合和過濾,直接可以快速的從本地存儲中精確查詢所需的數據。

Building an Offline and Real-Time App with Amplify DataStore

了解了基本的運作方式,我們要來實作一個應用程序。這個應用程序的用戶可以創建一條新消息,所有其他用戶將即時收到該消息。如果用戶離線,他們將能夠繼續創建消息,一旦他們重新連上線,這些消息將自動同步到雲端,並且也將會同步取得其他用戶在這段時間所創建的所有消息。而其他用戶也會即時收到你同步到雲端的新消息。

我們的應用將執行三種類型的 DataStore API 操作:

  • save
    使用 DataStore 創建一個新項目,將項目保存在本地端,並在背後執行 GraphQL mutation。
  • query
    從本地數據存儲中讀取,返回單個項目或列表 (陣列),並在背後執行 GraphQL query。
  • observe
    監聽本地數據存儲中的更改(創建,更新,刪除),並在背後執行 GraphQL subscription。

1. Creating the Base Project

我們要做的第一件事是創建 React 專案:

yarn create react-app rtmessageboard

cd rtmessageboard

yarn add @aws-amplify/core @aws-amplify/datastore antd react-color

接下來,初始化一個新的 Amplify 專案:

amplify init
# 請按照步驟為專案命名,環境名稱並設置預設的文本編輯器。
# 接受其他所有設置的預設值,然後選擇你的 AWS Profile。

2. Creating the API

現在,我們將創建 AppSync GraphQL API:

amplify add api
? Please select from one of the below mentioned services: GraphQL
? Provide API name: rtmessageboard
? Choose the default authorization type for the API API key
? Enter a description for the API key: public
? After how many days from now the API key should expire (1-365): 365
? Do you want to configure advanced settings for the GraphQL API Yes, I want to make some additional changes.
? Configure additional auth types? No
? Configure conflict detection? Yes
? Select the default resolution strategy Auto Merge
? Do you have an annotated GraphQL schema? No
? Choose a schema template: Single object with fields (e.g., “Todo” with ID, name, description)
? Do you want to edit the schema now? Yes

更改 GraphQL schema:

amplify/backend/api/rtmessageboard/schema.graphql
1
2
3
4
5
6
7
type Message @model {
id: ID!
title: String!
color: String
image: String
createdAt: String
}

現在,我們已經創建了 GraphQL API,並且已經有了一個 GraphQL schema,我們就可以用這個 GraphQL schema 創建本地 DataStore API 所需的資料模型(models)。

amplify codegen models

這將在我們的專案中創建一個名為 models 的新文件夾。 使用此文件夾中的模型,我們可以開始與 DataStore API 進行互動。

部署 API:

amplify push --y

3. 客戶端代碼

首先,打開 src/index.js 在最後一行 import 下面添加以下代碼來配置 Amplify 應用:

src/index.js
1
2
3
4
import 'antd/dist/antd.css'
import Amplify from '@aws-amplify/core'
import config from './aws-exports'
Amplify.configure(config)

接下來,打開 src/App.js 並使用以下代碼進行更新:

src/App.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import React, { useState, useEffect } from 'react'
import { SketchPicker } from 'react-color'
import { Input, Button } from 'antd'
import { DataStore } from '@aws-amplify/datastore'
import { Message } from './models'

const initialState = {color: '#000000', title: '' }

function App() {
const [formState, updateFormState] = useState(initialState)
const [messages, updateMessages] = useState([])
const [showPicker, updateShowPicker] = useState(false)

useEffect(() => {
fetchMessages()

const subscription = DataStore
.observe(Message)
.subscribe(() => fetchMessages())

return () => subscription.unsubscribe()
}, [])

async function fetchMessages() {
const messages = await DataStore.query(Message)
updateMessages(messages)
}

function onChange(e) {
if (e.hex) {
updateFormState({ ...formState, color: e.hex })
} else {
updateFormState({ ...formState, [e.target.name]: e.target.value })
}
}

async function createMessage() {
if (!formState.title) return

await DataStore.save(new Message({ ...formState }))
updateFormState(initialState)
}

async function deleteMessage(id) {
const message = await DataStore.query(Message, id)
DataStore.delete(message)
}

return (
<div style={container}>
<h1 style={heading}>Real Time Message Board</h1>
<Input
onChange={onChange}
name="title"
placeholder="Message title"
value={formState.title}
style={input}
/>
<div>
<Button
onClick={() => updateShowPicker(!showPicker)}
style={button}
>切換顏色選擇器</Button>
<p>Color:
<span style={{fontWeight: 'bold', color: formState.color}}>
{formState.color}
</span>
</p>
</div>
{
showPicker && (
<SketchPicker
color={formState.color}
onChange={onChange}
/>
)
}
<Button type="primary" onClick={createMessage}>Create Message</Button>
{
messages.map(message => (
<div
key={message.id}
style={{...messageStyle, backgroundColor: message.color}}
onDoubleClick={() => deleteMessage(message.id)}
>
<div style={messageBg}>
<p style={messageTitle}>{message.title}</p>
</div>
</div>
))
}
</div>
);
}

const container = { width: '100%', padding: 40, maxWidth: 900 }
const input = { marginBottom: 10 }
const button = { marginBottom: 10 }
const heading = { fontWeight: 'normal', fontSize: 40 }
const messageBg = { backgroundColor: 'white' }
const messageStyle = { padding: '10px', marginTop: 7, borderRadius: 4 }
const messageTitle = { margin: 0, padding: 9, fontSize: 20 }

export default App;

讓我們看一下這個組件中重要的部分:

  1. 我們從 Amplify 導入 DataStore API 以及導入 Message 資料模型
  2. 我們使用 useState 掛鉤創建三個組件狀態(Component state):
    • formState
      這個物件管理表單的狀態,包括將用於顯示消息的 title 與背景顏色的 color。
    • messages
      這將管理從 DataStore 提取的消息陣列。
    • showPicker
      這將管理一個 Boolean 值,該 Boolean 值將切換顯示或隱藏顏色選擇器,以填充消息的 color 值。
  3. 當組件加載時在 useEffect 掛鉤中我們調用 fetchMessages 函式獲取所有消息,並調用 DataStore.observe 創建一個訂閱以偵聽消息更新。當訂閱被觸發時,我們再次調用 fetchMessages 函式以獲取最新的數據來更新應用程序。
  4. fetchMessages 函式調用 DataStore.query,然後使用返回的消息陣列更新組件狀態。
  5. onChange 處理程序處理表單的輸入以及顏色選擇器的更新。
  6. 在 createMessage 中,我們首先檢查 title 以確保有值。 如果是,使用 DataStore.save 存儲消息,然後重置表單狀態。
  7. 在 deleteMessage 中先用 DataStore.query 取得要刪除的消息,然後使用 DataStore.delete 刪除它。

這裡使用 DataStore SDK,你只需直接執行 save、update 和 delete 之類的操作,直接寫入 DataStore 本身。 DataStore 可以為你處理其他所有事情:當你有 Internet 網路連接時,它會將數據同步到雲中;如果你離線時,則將會保留在 queue 中,在下次連線時將其同步到雲中。

啟動應用程序,讓我們測試看看:

yarn start

4. 測試遠端同步與離線功能

首先在正常連線下創建新的消息,然後觀察本地端的存儲與雲端的存儲。

要觀察雲端的存儲,請在 AWS 控制台中打開 AppSync API,然後選擇 Data Sources -> MessageTable:

amplify console api

現在,嘗試離線,創建新的消息,觀察兩端的存儲,然後重新連線,再度觀察兩端的存儲。 您應該注意到,當重新連線時,該應用程序在離線時所創建的有消息都同步到雲端的資料庫中。

5. 測試 Real-Time 功能

要測試 Read-Time 即時功能,請打開另一個瀏覽器窗口,或另外找一台電腦開啟瀏覽器窗口。 然後在一個窗口中創建一個新項目,並在另一窗口中查看 UI 介面是否自動更新。也觀察一下離線和重新連線時,即時功能會如何運作。

在此示例中,請注意以下幾點:

  • Amplify 支持兩種不同的 API 與 AppSync 進行互動:API class 以及 DataStore。
  • 使用 DataStore 時,不再需要直接使用 HTTP API 發送請求。 相反的,直接寫入本地端存儲引擎,然後 DataStore 會負責與雲端之間的同步。
  • 預設情況下,Amplify DataStore 使用離線(offline)工作。
  • 這裡,你也學到了基本的 Offline first 與 Edge computing。