0%

Node.js 非同步控制流程模式 - 並行限制與循序執行

使用 RabbitMQ 時有時會碰到一個問題,例如從 RabbitMQ 即時接收到 50,000 筆資料要寫到 Oracle 資料庫,RabbitMQ 的消費模式速度非常快,如果沒有限制 Oracle 的 Connection 數量,他會需要 50,000 個 Oracle Connection,通常的結果會是資料庫崩溃,這種大量資料,你不能一筆一筆寫到資料庫,而要用批次處裡的方式,一個 connection 一次寫入 50,000 筆資料。Node.js 的 oracledb 程式庫就有提供 execute 與 executeMany 的方法。

但是 RabbitMQ 的 Playload 有大小的限制,不適合用來一次接收大數據的資料,因此你無法使用 executeMany 方法。通常會是即時的小數據資料,一個繁忙的系統,短時間幾萬筆資料也不是不可能。因此我們必須控制,當收到資料時控制可同時寫到資料庫的平行處理數量,這在 Oracle 的 oracledb 程式庫可以用 Connection Pool 來達成,在 Pool 設定最多 10 Connection,同時間大量的寫入就只能輪流使用這 10 個 Connection,其他的就必須等待,Oracle Connection Pool 內部有自己的 queue 機制。但太多的 queue,等待太久仍然會有 Timeout 的問題。

除了資料庫有這種問題外,只要牽涉到 I/O 都有可能遇到這個問題,例如網路。REST API 就是我們常會碰到的。

這個解決方案的概念就是來自於李玉華所碰到的問題。她會即時從 Oracle 一次送出多筆需求到 RabbitMQ,RabbitMQ 消費後經過一番複雜的處理然後會調用外界的 REST API。單筆處裡都不會有問題,但如果一次處理多筆,對方伺服器就會來不及反應,通常只會有一筆成功,其餘的都會失敗。所以我們要將從 RabbitMQ 接收到的資料,每筆送到 REST API 都可以有一些延遲的設定,避免對方伺服器來不及回應。

Task Queue

taskQueue.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class TaskQueue {
constructor(concurrency=1) {
this.concurrency = concurrency;
this.running = 0;
this.queue = [];
this.pushTask = this.pushTask.bind(this);
this.next = this.next.bind(this);
}

pushTask(task) {
this.queue.push(task);
this.next();
}

next() {
while(this.running < this.concurrency && this.queue.length) {
const task = this.queue.shift();
task(() => {
this.running--;
this.next();
});
this.running++;
}
}
}

module.exports = TaskQueue;

這讓人很驚訝,短短的 27 行的程式碼,功能卻很強大。不限於用在這個例子,你可用在你需要的地方,包含 APEX 上。

這裡用了幾個 JavaScript 幾個重要的基本概念:

  • 函式是第一等公民(First class)
  • 延續傳遞風格(Continuation-passing style),簡稱 CPS
  • 閉包(Closure)

使用的方式很簡單,就是把你要處裡的工作依一定的簽章推入 Queue 中,Queue 會依照先進先出的方式逐一執行。

其中的 concurrency 則可以控制並行的數量,預設值是 1,就是一次處理一個工作,會依照先進先出的方式逐一執行,所以如果 concurrency 設為 1,則不管是同步還是非同步的工作,完成的時間也會依序。

concurrency 設為 2,則可以一次同時平行處理 2 筆。雖然也是依照先進先出的方式逐一執行,但如果是非同步的工作,完成的時間就比較難掌控了,因為它一次可以同時處理兩個工作,也許一個平行作業處理比較久,但另一個平行作業已經處理完好幾個工作了。

應用測試

先用一個簡單的例子來看看如何應用:

run.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
const TaskQueue = require('./taskQueue');
const runQueue = new TaskQueue();
const series = [1000, 800, 600, 400, 200, 900, 700, 500, 300, 100];

const doSomething = (data, done) => {
setTimeout(() => {
if ( Math.random() < 0.2) {
console.log(`Data: ${data} ERROR!`);
} else {
console.log(`Data: ${data} successfully completed`);
}
done();
}, data)
}

series.forEach(value => {
runQueue.pushTask((done) => {
doSomething(value, done);
});
});

doSomething 是你要處裡的工作,使用 setTimeout 來模擬非同步的工作,函式參數除了你自己的資料參數外,另外需要有一個強迫性的參數,在這個例子中我們取名為 done,你可以取任和的名子。done 將會是個函式,當你的工作完成後需要調用它(第 12 行),Queue 才會往下執行下一個工作。

這裡我們用 series 陣列模擬從 RabbitMQ 消費的資料,然後經過 doSomething 處理。第 17 行我們將 doSomething 及資料包裹在閉包中,然後將閉包 push 到 Queue 中。所以現在存在 Task Queue 中的其實是一個閉包,所以 queue 中應該會像:

Task Queue
1
2
3
4
5
6
[
(done) => { doSomething(value, done) }, // value => 1000
(done) => { doSomething(value, done) }, // value => 800
(done) => { doSomething(value, done) }, // value => 600
...
]

執行了 runQueue.pushTask( ) 後,taskQueue 會自動執行 Queue 中我們所推入的工作(Task),taskQueue.js 中的第 17 行從 Queue 中取出先前 push 進去的 task,隨即執行 task,執行時會帶入一個引數(argument):

done
1
2
3
4
() => {
this.running--;
this.next();
};

這個引數是一個函式,也就是 done !!! 要記得,函式是第一等公民,與一般的資料型態沒有甚麼不同,可以是函式的引數,也可以從函式 return 出來。

Run
$ node run
Data: 1000 successfully completed
Data: 800 successfully completed
Data: 600 ERROR!
Data: 400 successfully completed
Data: 200 successfully completed
Data: 900 successfully completed
Data: 700 successfully completed
Data: 500 successfully completed
Data: 300 successfully completed
Data: 100 successfully completed

目前 concurrency 是預設的 1, 雖然推入的工作都是非同步的,但完成的時間也都按照推入的順序。

將 concurrency 改為 2 看看結果如何:

Task Queue
1
2
3
const TaskQueue = require('./taskQueue');
const runQueue = new TaskQueue(2);
.....
Task Queue
$ node run
Data: 800 successfully completed
Data: 1000 successfully completed
Data: 400 ERROR!
Data: 600 successfully completed
Data: 200 ERROR!
Data: 700 successfully completed
Data: 900 successfully completed
Data: 300 successfully completed
Data: 100 successfully completed
Data: 500 ERROR!

因為同時可以平行處理兩筆工作,雖然是先進先出取出處理,但因工作都是非同步的關係,完成的順序就比較不能掌控了。

到現在的測試都還未延遲兩個工作間的間距,我們想讓前一個工作完成後,延遲一些時間再執行下一個工作。這只需要修改 doSomething 函式中調用 done( ) 的時間。

run.js
1
2
3
4
5
6
7
8
9
10
11
12
const doSomething = (data, done) => {
const next = () => setTimeout(() => done(), 1000);

setTimeout(() => {
if ( Math.random() < 0.2) {
console.log(`Data: ${data} ERROR!`);
} else {
console.log(`Data: ${data} successfully completed`);
}
next();
}, data)
}

第 2 行使用 setTimeout 延遲調用 done( ) 的時間,延遲多久由你自己控制。就這樣解決了李玉華的問題。

RabbitMQ 與 Oracle Database 範例

了解了基本運作,現在將它實際應用到 RabbitMQ 與 Oracle 資料庫上。RabbitMQ 與 Oracle 的抽象層在文章後端的原始碼中。

從基本的開始,先不用 taskQueue。

receiveToOracle.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
const rabbit = require('./rabbitmq');
const oradb = require('./database');

rabbit.on('ready', async ({ receive }) => {
await oradb.initialize();
console.log('rabbit and db initilized.');

receive('demo-example', '', (err, data, ack) => {
if (err) {
return console.log(err);
}
doSomething(data, ack)
});
});

function doSomething(data, ack=f=>f) {
const empno = JSON.parse(data).empno;
const statement = "insert into demo_example select empno ||' '|| ename from emp where empno = :empno";

oradb.doExecute(statement, [empno]).then(
result => {
console.log(`Result: ${JSON.stringify(result)} at: ${new Date()}`);
ack();
},
error => {
console.log(`Result: ${error.message} at: ${new Date()}`);
}
);
}

從 RabbitMQ 消費取的資料,依據資料從 EMP 資料表取得資料,然後新增到 DEMO_EXAMPLE 資料表。

Receive to oracle
$ node receiveToOracle
rabbit and db initilized.

啟動以後就開始等待 RabbitMQ queue demo-example 的消費。現在送一些資料到 RabbitMQ 的 queue demo-example。

sendToQueue.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
const rabbit = require('./rabbitmq');

const data = [{ empno: 7369 }, { empno: 7566 }, { empno: 7900 }, { empno: 7654 }, { empno: 7698 }];

rabbit.on('ready', ({ sendToQueue }) => {
const send = () => {
for (let i = 0; i < 10; i++) {
data.forEach(value => {
sendToQueue('demo-example', JSON.stringify(value));
})
}
};

send();
setTimeout(() => send(), 2000);
setTimeout(() => send(), 3000);
});

這裡每隔一秒送出一次,連續送出 3 次,一次 50 筆資料。

Send to RabbitMQ queue
$ node sendToQueue

回到消費者的終端畫面,很快的在約 3 秒內就消費完畢,寫入 Oracle 資料庫中了。

Receive to oracle
$ node receiveToOracle
rabbit and db initilized.
Result: {"lastRowid":"AAC8uRACgAAAJTuAAu","rowsAffected":1} at: Tue Jul 14 2020 08:20:46 GMT+0800 (GMT+08:00)
Result: {"lastRowid":"AAC8uRACgAAAJTvAAd","rowsAffected":1} at: Tue Jul 14 2020 08:20:46 GMT+0800 (GMT+08:00)
Result: {"lastRowid":"AAC8uRACgAAAJTwAAA","rowsAffected":1} at: Tue Jul 14 2020 08:20:46 GMT+0800 (GMT+08:00)
Result: {"lastRowid":"AAC8uRACgAAAJTsAAe","rowsAffected":1} at: Tue Jul 14 2020 08:20:46 GMT+0800 (GMT+08:00)
.....

因為這裡 Oracle 的 Connection Pool 最大值設為 10, Oracle 火力全開,使用了全部的 10 個 Connection。記得一定要用 Oracle Connection Pool,如果沒有用 Oracle Connection Pool,而直接使用 Oracle Connection,因為 RabbitMQ 非常的快,有可能 Oracle 會嘗試一次開 150 connections,如果是 10,000筆資料,Oracle Database 就崩潰了。

如果你打算使用 Oracle ORDS REST API,千萬千萬不要一次送太多資料,它會每個 Request 嘗試打開一個 connection,資料庫會崩潰!!! 不要試,因為我做過!

Oracle Database connections
DEMO     40   10 XXX\7x0x0x4xP3  10857    INACTIVE    3583 node.exe             node.exe
48 11 XXX\7x0x0x4xP3 10869 INACTIVE 2521 node.exe node.exe
41 40 XXX\7x0x0x4xP3 10859 INACTIVE 29317 node.exe node.exe
42 68 XXX\7x0x0x4xP3 10861 INACTIVE 42469 node.exe node.exe
45 163 XXX\7x0x0x4xP3 10863 INACTIVE 49803 node.exe node.exe
37 166 XXX\7x0x0x4xP3 10851 INACTIVE 11951 node.exe node.exe
38 191 XXX\7x0x0x4xP3 10853 INACTIVE 53789 node.exe node.exe
46 194 XXX\7x0x0x4xP3 10865 INACTIVE 35991 node.exe node.exe
47 225 XXX\7x0x0x4xP3 10867 INACTIVE 3537 node.exe node.exe
39 230 XXX\7x0x0x4xP3 10855 INACTIVE 3133 node.exe node.exe

Oracle 會用輪流使用這 10 個 Connection Pool 中的 Connection,等待處裡的則會放入 Oracle Connection 的 queue 中。Connection queue 的預設等待時間 Timeout 是 60 秒,因此如果資料太多,有些就會出現 timeout 錯誤。

timeout error
error: Error: NJS-040: connection request timeout. Request exceeded queueTimeout of 60000
at Timeout._onTimeout (I:\RabbitMQ\gittemp\rabbit-consume-delay\node_modules\oracledb\lib\pool.js:127:18)

現在加入 taskQueue 來控制延遲。

receiveToOracle.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
const rabbit = require('./rabbitmq');
const oradb = require('./database');
const TaskQueue = require('./src/taskQueue');
const runQueue = new TaskQueue();

rabbit.on('ready', async ({ receive }) => {
await oradb.initialize();
console.log('rabbit and db initilized.');

receive('demo-example', '', (err, data, ack) => {
if (err) {
return console.log(err);
}
runQueue.pushTask((done) => {
doSomething(data, ack, done);
});
});
});

function doSomething(data, ack=f=>f, done=f=>f) {
const empno = JSON.parse(data).empno;
const statement = "insert into demo_example select empno ||' '|| ename from emp where empno = :empno";
const next = () => setTimeout(() => done(), 100);

oradb.doExecute(statement, [empno]).then(
result => {
console.log(`Result: ${JSON.stringify(result)} at: ${new Date()}`);
ack();
next();
},
error => {
console.log(`Result: ${error.message} at: ${new Date()}`);
console.log(error);
next();
}
);
}

第 14 行將工作推入 Task Queue 中,在第 29 與 34 行延遲調用 done( )。

Receive to oracle
$ node receiveToOracle
rabbit and db initilized.
Result: {"lastRowid":"AAC8vmACgAAAJTvAAA","rowsAffected":1} at: Tue Jul 14 2020 09:28:12 GMT+0800 (GMT+08:00)
Result: {"lastRowid":"AAC8vmACgAAAJTvAAB","rowsAffected":1} at: Tue Jul 14 2020 09:28:12 GMT+0800 (GMT+08:00)
Result: {"lastRowid":"AAC8vmACgAAAJTvAAC","rowsAffected":1} at: Tue Jul 14 2020 09:28:12 GMT+0800 (GMT+08:00)
.....
Result: {"lastRowid":"AAC8vmACgAAAJTvACT","rowsAffected":1} at: Tue Jul 14 2020 09:28:27 GMT+0800 (GMT+08:00)
Result: {"lastRowid":"AAC8vmACgAAAJTvACU","rowsAffected":1} at: Tue Jul 14 2020 09:28:27 GMT+0800 (GMT+08:00)
Result: {"lastRowid":"AAC8vmACgAAAJTvACV","rowsAffected":1} at: Tue Jul 14 2020 09:28:27 GMT+0800 (GMT+08:00)

在延遲 0.1 秒下,150 筆資料費了約 15 秒。顯然慢了許多。

現在測試一下大量資料,一次送入 50,000 筆,但是將 concurrency 將設為 5。

receiveToOracle.js
1
2
3
...
const runQueue = new TaskQueue(5);
...
Oracle Database connections
DEMO     34   67 XXX\7x0x0x4xP3  26301    INACTIVE   48553 node.exe             node.exe
36 131 XXX\7x0x0x4xP3 27394 INACTIVE 21051 node.exe node.exe
37 163 XXX\7x0x0x4xP3 27396 INACTIVE 57175 node.exe node.exe
39 225 XXX\7x0x0x4xP3 25052 INACTIVE 3575 node.exe node.exe

這次沒有 Timeout Error,觀察 Oracle Connection Pool 一直都只開 3 ~ 4 個 Connection,延遲 0.1 秒對 Oracle 資料庫是綽綽有餘。

微服務少不了消息代理伺服器, RabbitMQ 與 Apache Kafka 不只提供了資料的一致性整合外,也可以當成系統的緩存區。假如有一個系統同時湧入幾萬個使用者,資料庫反應不及,可以利用 RabbitMQ 或 Apache Kafka 延遲資料的寫入,或者可以透過多個消費者作分流,分散系統的負載。熟悉它,就會有無限的功能應用。

RabbitMQ 與 Oracle 資料庫抽象層

database.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
const oracledb = require('oracledb');

const dbconfig = {
user: "xxxx",
password: ""xxxxxxxx,
connectString: "10.11.xx.xxx:1521/xxxx.xxx.com.tw",
poolMin: 0,
poolMax: 10,
poolIncrement: 1,
queueMax: -1,
};

async function initialize() {
try {
const pool = await oracledb.createPool(dbconfig);

process.once('SIGINT', () => {
console.log('oracledb connection pool close.');
close();
});
}
catch (err) {
console.log(err);
}
}

async function close() {
try {
await oracledb.getPool().close();
}
catch (err) {
console.log(err);
}
}

function doExecute(statement, binds = [], opts = {}) {
return new Promise(
async (resolve, reject) => {
let conn;
opts.outFormat = oracledb.OBJECT;
opts.autoCommit = true;
try {
conn = await oracledb.getConnection();
const result = await conn.execute(statement, binds, opts);
resolve(result);
}
catch (err) { reject({ error: err }); }
finally {
if (conn) {
try {
await conn.close();
//console.log('ok');
}
catch (err) { console.log(err); }
}
}
}
);
}

function doExecuteMany(statement, binds = [], opts = {}) {
return new Promise(async (resolve, reject) => {
let conn;
opts.outFormat = oracledb.OBJECT;
opts.autoCommit = true;
opts.batchErrors = true;
try {
conn = await oracledb.getConnection();
const result = await conn.executeMany(statement, binds, opts);
resolve(result);
}
catch (err) { reject(err); }
finally {
if (conn) {
try { await conn.close(); }
catch (err) { console.log(err); }
}
}
});
}

module.exports.initialize = initialize;
module.exports.close = close;
module.exports.doExecute = doExecute;
module.exports.doExecuteMany = doExecuteMany;
rabbitmq.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
const EventEmitter = require('events').EventEmitter;
const amqp = require("amqplib");
const uuid = require('node-uuid');

class RabbitMQ extends EventEmitter {
constructor() {
super();
this.url = "xxxxx:xxxxx@10.11.xx.xxx";
this.channel = null;
this.exchange = "dummy.exchange";

this.sendToQueue = this.sendToQueue.bind(this);
this.publish = this.publish.bind(this);
this.receive = this.receive.bind(this);
this.initialize = this.initialize.bind(this);

this.initialize().then(() => this.emit('ready', {
publish: this.publish,
sendToQueue: this.sendToQueue,
receive: this.receive
}));
}

initialize() {
return amqp
.connect(`amqp://${this.url}`)
.then(conn => {
process.once('SIGINT', function () {
console.log('amqp connection close.');
conn.close();
});

return conn.createChannel()
})
.then(channel => {
this.channel = channel;
})
.catch(console.warn)
};

sendToQueue(queue, content) {
return this.channel.sendToQueue(
queue,
new Buffer.from(content),
{
persistent: true,
headers: {
messageId: uuid.v4(),
api: "Demo-sendToQueue-v1",
firstPublish: Date.now()
}
}
);
}

publish(exchange, routingKey, content, delay=0) {
return this.channel.publish(
exchange,
routingKey,
new Buffer.from(content),
{
persistent: true,
headers: {
messageId: uuid(),
api: "Demo-publish-v1",
firstPublish: Date.now()
}
}
);
}

receive(queue, routingKey, callback) {
return this.channel.assertQueue(queue)
.then(q => {
return this.channel.bindQueue(queue, this.exchange, routingKey)
})
.then(() => {
return this.channel.consume(queue, msg => {
let data;

if (msg) {
// console.log(msg.properties.headers);
try {
data = msg.content.toString();
callback(null, data, () => this.channel.ack(msg));
} catch (err) {
callback(err);
}
} else {
callback(new Error('amqp consume error.'));
}
}, { noAck: false });
})
.catch(console.warn);
}
}

module.exports = new RabbitMQ();