0%

Asynchronously initialized module (非同步初始化模塊)

在 Node.js 模塊系統的基本屬性 require( )是同步工作的,且 module.exports 不能設置為非同步。在 Node.js 核心模塊和許多 npm 軟件包中有些存在有同步 API 的主要原因之一,是在提供一種方便的替代方法,而這個替代方法主要用於初始化任務,而不是要取代原有的非同步 API 。

不幸的是,這並不總是可能的。 同步 API 可能並不總是可取得,尤其是對於在使用網絡組件的初始化階段,例如,執行握手協議(handshake protoclos)或檢索配置參數(configuration parameters)。 許多中間件系統的資料庫驅動程序和客戶端(例如消息隊列 message queues)也都有初始化階段的情況。

經典解決方案

讓我們舉個例子:一個名為 db 的模塊,該模塊連接到遠程資料庫。 只有在完成與資料庫伺服器的連接和握手之後,db 模塊才能接受請求。 在這種情況下,我們通常有兩個選擇:

  • 在開始使用模塊之前,確保已對其進行了初始化,否則需等待其初始化。 每當我們要在非同步模塊上調用操作時,都必須完成此過程:
1
2
3
4
5
6
7
8
9
10
11
12
13
const db = require('aDb');   //非同步模塊

module.exports = function findAll(type, callback) {
if (db.connected) { //is it initialized?
runFind();
} else {
db.once('connected', runFind);
}

function runFind() {
db.findAll(type, callback);
}
}
  • 使用 Dependency Injection(DI)而不是直接使用非同步模塊。這樣做,我們可以延遲某些模塊的初始化,直到它們的非同步依賴關係被完全初始化為止。 這種技術將管理模塊初始化的複雜性轉移到了另一個組件(通常是父組件)上。 在以下示例中,此組件是 app.js:
app.js
1
2
3
4
5
6
const db = require('aDB');  //非同步模塊
const findAllFactory = require('./findAll');

db.on('connected', function() {
const findAll = findAllFactory(db);
});
findAll.js
1
2
3
4
5
6
module.exports = function(db) {
//db is guaranteed to be initialized
return function findAll(type, callback) {
db.findAll(type, callback)
};
}

第一種選擇將涉及大量的代碼數量,可能不會是好的選擇。另外,使用 DI 的第二個選項有時也不可取,在大型專案中,它可能很快變得過於復雜,尤其是如果手動完成並使用非同步初始化的模塊。

但是,我們將看到,還有第三種選擇,它使我們能夠輕鬆地將模塊與其依賴項的初始化狀態隔離開來。

預初始化隊列(Preinitialization queues)

使模塊與依賴項的初始化狀態脫鉤的簡單模式包括使用隊列(queues)和命令模式(Command pattern)。 這個想法是保存尚未初始化的模塊所接收的所有操作(operations),然後在完成所有初始化步驟後立即執行它們。

實現非同步初始化的模塊

為了展示這種簡單而有效的技術,讓我們構建一個小型測試應用程序; 沒什麼花哨的,只是可以驗證我們假設的東西,讓我們從創建一個非同步初始化的模塊 asyncModule.js 開始;稍後,我們將會將它實際用在 Oracle 資料庫上。

asyncModule.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
const asyncModule = module.exports;

asyncModule.initialized = false;

asyncModule.initialize = (callback) => {
setTimeout(function () {
asyncModule.initialized = true;
callback();
}, 10000);
};

asyncModule.tellMeSomething = (callback) => {
process.nextTick(() => {
if(!asyncModule.initialized) {
return callback(new Error("I don't have anythins to say right now"));
}
callback(null, `Current time is: ${new Date()}`);
});
};

在前面的代碼中,asynModule 嘗試展示非同步初始化的模塊的工作方式。它公開了 initialize( )方法,該方法在延遲 10 秒後將初始化變數設置為 true 並通知其 callback(對於實際情況來說 10 秒是很長的時間) 應用程序,但是對我們來說,可以突顯出任何的 race conditions)。另一個方法 tellMeSomething( ) 返回當前時間,但是如果模塊尚未初始化,則會產生錯誤。

下一步是根據我們剛剛創建的服務創建另一個模塊。 讓我們考慮一下在一個名為 routes.js 的文件中實現的簡單 HTTP 請求處理程序:

routes.js
1
2
3
4
5
6
7
8
9
10
11
12
const asyncModule = require('./asyncModule');

module.exports.say = (req, res) => {
asyncModule.tellMeSomething((err, something) => {
if (err) {
res.writeHead(500);
return res.end(`Error: ${err.message}`);
}
res.writeHead(200);
res.end(`I say: ${something}`);
});
};

處理程序調用 asyncModule的tellMeSomething( )方法,然後將結果寫入 HTTP 回應中。 如我們所看到的,我們沒有對 asyncModule 的初始化狀態執行任何檢查,可以想像,這可能將導致問題。

現在,讓我們創建一個非常基本的 HTTP 服務器,僅使用核心 http 模塊。

app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const http = require('http');
const routes = require('./routes');
const asyncModule = require('./asyncModule');

asyncModule.initialize(() => {
console.log('Async module initialized.');
});

http.createServer((req, res) => {
if(req.method === 'GET' && req.url === '/say') {
return routes.say(req, res);
}
res.writeHead(404);
res.end('Not found');
}).listen(8000, () => console.log('Started'));

前面的小模塊是我們應用程序的入口,它所做的只是觸發 asyncModule 的初始化,並創建一個 HTTP 服務器,該服務器利用我們先前創建的 routes.say( ) 的請求處理程序。

現在,我們可以像往常一樣通過執行 app.js 模塊來嘗試啟動服務器。

$ node app.js

服務器啟動後,我們可以嘗試使用瀏覽器訪問 URL http://localhost:8000/say ,並查看 asyncModule 的返回結果。

Error: I don’t have anythins to say right now

這意味著 asyncModule 尚未初始化,但我們仍然嘗試使用它。根據非同步初始化的 Module 的實現細節,我們可能會收到一個錯誤,丟失重要信息甚至崩潰整個應用程序。

通常,必須完全避免剛剛描述的情況。在大多數情況下,初始化可能非常快速,不必擔心幾個失敗的請求,甚至在實踐中它可能永遠不會發生。但是,對於旨在自動擴展的高負載應用程序和雲服務器,這兩種不太可能發生錯誤的假設都必須完全避免。

用預初始化隊列包裝模塊

為了增加伺務器的穩健性,我們現在將重構它。我們將對尚未初始化而在 asyncModule 上調用的所有操作進行隊列化(queue),直到準備好處理它們時再將隊列刷新。 這看起來像是 State 模式的絕佳應用! 我們將需要兩種狀態,

  • 一種狀態是在模塊尚未初始化時將所有操作送入隊列。
  • 另一種狀態是在初始化完成後將隊列中的每個操作直接執行原始 asyncModule 模塊中的操作。

通常,我們沒有機會修改非同步模塊的代碼。 因此,要添加我們的隊列層,我們將需要圍繞原始 asyncModule 模塊創建一個代理 Proxy。

讓我們創建一個名為 asyncModuleWrapper.js 的新檔案,然後開始逐段構建它。 我們需要做的第一件事是創建將操作委託給活動狀態的物件

1
2
3
4
5
6
7
8
9
10
11
12
const asyncModule = require('./asyncModule');

const asyncModuleWrapper = module.exports;

asyncModuleWrapper.initialized = false;
asyncModuleWrapper.initialize = function () {
activeState.initialize.apply(activeState, arguments);
};

asyncModuleWrapper.tellMeSomething = function () {
activeState.tellMeSomething.apply(activeState, arguments);
};

在前面的代碼中,asyncModuleWrapper 簡單地將其每個方法委託給當前活動狀態(currently active state)。 從 notInitializedState 開始,讓我們看看這兩種狀態的樣子:

13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
let pending = [];
let notInitializedState = {
initialize: function (callback) {
asyncModule.initialize(() => {
asyncModuleWrapper.initalized = true;
activeState = initializedState;

pending.forEach(req => {
asyncModule[req.method].apply(null, req.args);
});
pending = [];
callback();
});
},

tellMeSomething: function (callback) {
return pending.push({
method: 'tellMeSomething',
args: arguments
});
}
};

當調用 initialize( )方法時,我們觸發原始 asyncModule 模塊的初始化(第 16 行),並提供回調代理。 這使我們的包裝器知道何時初始化了原始模塊,並因此觸發了以下操作:

  1. 第 18 行使用流程中的下一個狀態物件 initializedState 更新 activeState 變數。
  2. 第 20 行執行之前存儲在 pending 隊列中的所有操作指令。
  3. 第 24 行調用原始回調(callback)。

開始時由於模塊尚未初始化,因此此狀態(notInitializedState)的 tellMeSomething( )方法簡單地創建一個新的 Command 物件將其添加到 pending 隊列的操作中。

此時,當原始 asyncModule 模塊尚未初始化時,模式應該已經很清楚了,我們的包裝器簡單地將所有接收到的請求送入 pending 隊列排隊。 然後,當通知我們初始化已完成時,我們將執行所有排隊的操作,然後將內部狀態切換為 initializedState。讓我們看看包裝的最後一部分是什麼樣子:

35
let initializedState = asyncModule;

毫不奇怪,initializedState 物件只是對原始 asyncModule 的引用! 實際上,初始化完成後,我們可以安全地將任何請求直接路由到原始模塊。 不需要什麼 Proxy 代理了。

最後,我們必須設置啟動這個模組的初始活動狀態,該狀態當然是 notInitializedState:

36
let activeState = notInitializedState;

現在,我們可以嘗試再次啟動測試服務器,但首先,請不要忘記用新的 asyncModuleWrapper 物件替換對原始 asyncModule 模塊的引用; 這必須修改 app.js 和 routes.js 模塊。

1
2
3
...
const asyncModule = require('./asyncModuleWrapper');
...

完成此操作後,如果嘗試再次向服務器發送請求,我們將看到在 asyncModule 模塊尚未初始化的時間內,請求不會失敗; 相反,它們將等待直到初始化完成,然後才實際執行。 我們可以肯定地這是一個更加穩健的伺服器。

現在,我們的服務器可以在啟動後立即開始接受請求,並保證這些請求不會因其模塊的初始化狀態而失敗。 我們能夠獲得此結果,而無需使用 DI 或進行冗長且易於出錯的檢查來驗證非同步混淆的狀態。

這種模式,如果模塊是非同步初始化的,將所有操作送入隊列,直到模塊完全初始化為止。許多資料庫驅動程序和 ORM 程式庫都使用該模式。不必等待資料庫的連接就可以發送查詢,因為每個操作都會先存到隊列中,然後在與資料庫的連接完全建立後再執行。

Oracle 應用實例

這裡使用 Oracle XE,可以設定多個 Oracle Connection Pools。建議盡量使用 Connection Pools,效能較好,同時可以控制連線的數量。直接使用 Connection 方式,當同時湧入大量的需求,資料庫有可能就會崩潰。

dbconfig.js
1
2
3
4
5
6
7
8
9
10
11
12
module.exports = {
xepdb1: {
user: "demo",
password: "demo6460",
connectString: "localhost:1521/xepdb1",
poolAlias: 'xepdb1demo',
poolMin: 2,
poolMax: 5,
poolIncrement: 1,
queueMax: -1
}
};

這裡將 Oracle 資料庫驅動程式 node-oracledb 包裝在我們的模塊中。這與之前的 asyncModuleWrapper 模塊有相同的概念。這個模塊也分為兩種狀態,一種狀態是在模塊尚未初始化時將所有操作送入隊列,另一種狀態是在初始化完成後將每個操作直接執行原始 oracledb 模塊中的操作。

database.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
const { EventEmitter } = require('events');
const oracledb = require('oracledb');
const dbconfig = require('./dbconfig');

const METHODS_REQUIRING_CONNECTION = ['doExecute', 'doExecuteMany'];
const deactivate = Symbol('deactivate');

class InitializedState {
constructor(db) {
this.dbsPool = db.dbsPool;
}

async doExecute(poolAlias, statement, binds = [], opts = {}) {
return new Promise(async (resolve, reject) => {
let conn;
opts.outFormat = oracledb.OBJECT;
opts.autoCommit = true;
try {
conn = await oracledb.getConnection(this.dbsPool[poolAlias]);
const result = await conn.execute(statement, binds, opts);
resolve(result);
}
catch (err) { reject({ error: err }); }
finally {
if (conn) {
try {
await conn.close();
} catch (err) {
console.log(err);
}
}
}
}
);
}

async doExecuteMany(poolAlias, statement, binds = [], opts = {}) {
return new Promise(async (resolve, reject) => {
let conn;
opts.outFormat = oracledb.OBJECT;
opts.autoCommit = true;
opts.batchErrors = true;
try {
conn = await oracledb.getConnection(this.dbsPool[poolAlias]);
const result = await conn.executeMany(statement, binds, opts);
resolve(result);
}
catch (err) { reject(err); }
finally {
if (conn) {
try {
await conn.close();
} catch (err) {
console.log(err);
}
}
}
});
}
}

class QueuingState {
constructor(db) {
this.db = db;
this.commandsQueue = [];

METHODS_REQUIRING_CONNECTION.forEach(methodName => {
this[methodName] = function (...args) {
console.log('Command queued:', methodName, args);
return new Promise((resolve, reject) => {
const command = () => {
db[methodName](...args)
.then(resolve, reject);
};
this.commandsQueue.push(command);
});
};
});
}

[deactivate]() {
this.commandsQueue.forEach(command => command());
this.commandsQueue = [];
}
}

class DB extends EventEmitter {
constructor() {
super();
this.dbsPool = {};
this.state = new QueuingState(this);
}

async initialize() {
try {
for (let db of Object.keys(dbconfig)) {
let config = dbconfig[db];
this.dbsPool[config.poolAlias] = await oracledb.createPool(config);
}

this.initialized = true;
this.emit('initialized');
const oldState = this.state;
this.state = new InitializedState(this);
oldState[deactivate] && oldState[deactivate]();
} catch (err) {
console.log(err);
}
}

async doExecute(poolAlias, statement, binds = [], opts = {}) {
return this.state.doExecute(poolAlias, statement, binds, opts);
}

async doExecuteMany(poolAlias, statement, binds = [], opts = {}) {
return this.state.doExecuteMany(poolAlias, statement, binds, opts);
}

async close() {
try {
for (let db of Object.keys(dbconfig)) {
await oracledb.getPool(dbconfig[db].poolAlias).close();
}
}
catch (err) {
console.log(err);
}
}
}

module.exports = new DB();

第 8 行的 InitializedState 類與第 62 行的 QueuingState 類分別代表初始化完成後與尚未初始化的兩種狀態。

第 87 行的 DB 類是維一對外揭露的類,在建構式中第 91 行,開始是處於尚未初始化的 QueuingState。在這個狀態下,把所有的需求操作都送入 commandsQueue 陣列中,尚未實際調用資料庫驅動程式對資料庫提出需求,因為此時尚未完成 Connection Pools 的創建。

第 94 行 initialize( ) 方法開始創建 Oracle Connection Pools,在創建 Connection Pools 完成後,在第 104 行將狀態切換到初始化完成後的 InitializedState。在這之後所有的需求操作都會直接調用資料庫驅動程式 oracledb 對資料庫提出需求,不再需要使用 QueuingState。

先來測試看看,第 3 行在開始初始化時,我們並沒有等到 Connection Pools 完成,主程式就繼續往下執行,馬上對資料庫提出需求操作。

simpleTest.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
const oradb = require('./database');

oradb.initialize();
oradb.on('initialized', () => console.log('Database initialized.'));

const statement = 'select * from emp where empno = :empno';

doExecute("xepdb1demo", statement, [7788]);
doExecute("xepdb1demo", statement, [7934]);
//employees();

setTimeout(() => {
doExecute("xepdb1demo", statement, [7782]);
doExecute("xepdb1demo", statement, [7876]);
}, 500);

setTimeout(() => oradb.close(), 30000);

async function doExecute(dbpool, statement, binds) {
try {
const result = await oradb.doExecute(dbpool, statement, binds);
console.log(result);
} catch (err) {
console.log(err);
}
}

async function doExecuteMany(dbpool, statement, binds) {
try {
const result = await oradb.doExecuteMany(dbpool, statement, binds);
console.log(result);
} catch (err) {
console.log(err);
}
}

async function employees() {
const dbpool = "xepdb1demo";
const statement = 'select * from emp';
try {
const result = await oradb.doExecute(dbpool, statement);
console.log(result);
} catch (err) {
console.log(err);
}
}

現在將它實際應用在伺服器上,讓我們創建一個非常基本的 HTTP 服務器來使用 Oracle Database。

routes.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const oradb = require('./database');

module.exports.listEmployees = (req, res) => {
const dbpool = "xepdb1demo";
const statement = 'select * from emp';

oradb.doExecute(dbpool, statement)
.then(result => {
res.writeHead(200, {'Content-Type': 'application/json; charset=utf8'});
res.end(JSON.stringify(result));
})
.catch(err => {
res.writeHead(500);
return res.end(`Error: ${err.message}`);
});
};
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
const http = require('http');
const routes = require('./routes');
const oradb = require('./database');

oradb.initialize();
oradb.on('initialized', () => console.log('Database initialized.'));

http.createServer((req, res) => {
if(req.method === 'GET' && req.url === '/employees') {
return routes.listEmployees(req, res);
}
res.writeHead(404);
res.end('Not found');
}).listen(8000, () => console.log('Started'));

That is it.