是的! 你沒有看錯標題。Oracle AQ 也跨出了資料庫。在 node-oracledb 4.0 中引入了用於 AQ 的 Node-oracledb API。
Oracle AQ 的運作已有一段時間了,基本運作方式大家都很熟了,這裡直接看範例,讓 AQ 走出 Oracle 資料庫。
Create and Start Queue 1 2 3 4 5 6 CREATE OR REPLACE TYPE demo_msg_type AS OBJECT ( subject VARCHAR2 (30 ), text VARCHAR2 (100 ) ) /
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE( QUEUE_TABLE => 'DEMO.DEMO_MSG_QUEUE_TAB' , QUEUE_PAYLOAD_TYPE => 'DEMO.DEMO_MSG_TYPE' ); DBMS_AQADM.CREATE_QUEUE( QUEUE_NAME => 'DEMO.DEMO_MSG_QUEUE', QUEUE_TABLE => 'DEMO.DEMO_MSG_QUEUE_TAB'); DBMS_AQADM.START_QUEUE( QUEUE_NAME => 'DEMO.DEMO_MSG_QUEUE', ENQUEUE => TRUE); END ;/
Enqueue demo-enq.js 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 const oracledb = require ('oracledb' );const dbConfig = { user: "demo" , password: "secret" , connectString: "10.11.25.139:1522/db99.example.com" } const queueName = "DEMO_MSG_QUEUE" ;enqueue(); async function enqueue ( ) { let connection; try { connection = await oracledb.getConnection(dbConfig); const queue = await connection.getQueue(queueName, { payloadType : "DEMO.DEMO_MSG_TYPE" }); const message = new queue.payloadTypeClass( { SUBJECT: "Node.js AQ 範例" , TEXT: `Hello Tainan, 台南! ${Date ()} ` } ); console .log(`Enqueuing: ${message} ` ); await queue.enqOne(message); await connection.commit(); } catch (err) { console .error(err); } finally { if (connection) { try { await connection.close(); } catch (err) { console .error(err); } } } }
Dequeue demo-deq.js 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 const oracledb = require ('oracledb' );const dbConfig = { user: "demo" , password: "secret" , connectString: "10.11.25.139:1522/db99.example.com" } const queueName = "DEMO_MSG_QUEUE" ;dequeue(); async function dequeue ( ) { let connection; try { connection = await oracledb.getConnection(dbConfig); const queue = await connection.getQueue(queueName, { payloadType : "DEMO.DEMO_MSG_TYPE" }); const msg = await queue.deqOne(); if (msg) { console .log('Dequeued: ' + msg.payload); } await connection.commit(); } catch (err) { console .error(err); } finally { if (connection) { try { await connection.close(); } catch (err) { console .error(err); } } } }
現在可以傳送資料了。
$ node demo-enq Enqueuing: [DEMO.DEMO_MSG_TYPE] { SUBJECT: 'Node.js AQ 範例', TEXT: 'Hello Tainan, 台南! Tue Dec 29 2020 09:52:19 GMT+0800 (GMT+08:00)' }
接收資料:
$ node demo-deq Dequeued: [DEMO.DEMO_MSG_TYPE] { SUBJECT: 'Node.js AQ 範例', TEXT: 'Hello Tainan, 台南! Tue Dec 29 2020 09:52:19 GMT+0800 (GMT+08:00)' }
如果你先 run dequeue, 預設情況下,deqOne( ) 將等待直到接收到消息。
這裡的 dequeue 一次接收一筆資料,我們也可以註冊一個 notification。
Register notification 這裡的 Notification 就像 Apache Kafka 與 RabbitMQ 的 Consumer 一樣。註冊方式與使用 Oracle CQN 一樣,細節請參考 Oracle CQN Using Node.js 。註冊時一樣要設定事件處理程式 (Notification handler) 一般都稱呼為 Callback 函數。這觀念與使用 PL/SQL 是一樣的。
demo-notify.js 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 const oracledb = require ('oracledb' );const dbConfig = { user: "demo" , password: "secret" , connectString: "10.11.25.139:1522/db99.example.com" , poolAlias: 'db99demo' , poolMin: 1 , poolMax: 2 , poolIncrement: 1 , queueMax: -1 } startup(); async function startup ( ) { try { await oracledb.createPool(dbConfig); registerAndNotify(); } catch (err) { console .log(err); } } async function registerAndNotify ( ) { const config = { user: dbConfig.user, password: dbConfig.password, connectString: dbConfig.connectString, events: true } const queueName = "DEMO_MSG_QUEUE" ; const subscrOptions = { namespace: oracledb.SUBSCR_NAMESPACE_AQ, callback: notifyCallback, timeout: 600 }; let connection; try { connection = await oracledb.getConnection(config); await connection.subscribe(queueName, subscrOptions); console .log("AQ notification created..." ); } catch (err) { console .error(err); } finally { if (connection) { try { await connection.close(); } catch (err) { console .error(err); } } } } async function notifyCallback (event ) { console .log("event type:" , event.type); if (event.type == oracledb.SUBSCR_EVENT_TYPE_DEREG) { console .log("Deregistration has taken place..." ); return ; } try { connection = await oracledb.getConnection(dbConfig.poolAlias); const queue = await connection.getQueue(event.queueName, { payloadType : "DEMO.DEMO_MSG_TYPE" }); const msg = await queue.deqOne(); if (msg) { console .log('Dequeued: ' + msg.payload); } await connection.commit(); } catch (err) { console .log(err) } finally { if (connection) { try { await connection.close(); } catch (err) { console .log(err); } } } } process .on('SIGTERM' , () => { console .log("\nTerminating" ); oracledb.getPool(dbConfig.poolAlias).close(); process.exit(0 ); }) .on('SIGINT' , () => { console .log("\nTerminating" ); oracledb.getPool(dbConfig.poolAlias).close(); process.exit(0 ); }) .on('unhandledRejection' , err => { console .log("\nUnhandledRejection exception" ); oracledb.getPool(dbConfig.poolAlias).close(); console .log(err); process.exit(1 ); }) .on('uncaughtException' , err => { console .log("\nUncaught exception" ); oracledb.getPool(dbConfig.poolAlias).close(); console .log(err); process.exit(1 ); });
現在啟動 notification,它會一直等待接收消息。
$ node demo-notify AQ notification created...
Enqueue 一筆資料:
$ node demo-enq Enqueuing: [DEMO.DEMO_MSG_TYPE] { SUBJECT: 'Node.js AQ 範例', TEXT: 'Hello Tainan, 台南! Tue Dec 29 2020 10:33:36 GMT+0800 (GMT+08:00)' }
事件處理程式馬上收到消息,然後繼續等待消息。
$ node demo-notify AQ notification created... Dequeued: [DEMO.DEMO_MSG_TYPE] { SUBJECT: 'Node.js AQ 範例', TEXT: 'Hello Tainan, 台南! Tue Dec 29 2020 10:33:36 GMT+0800 (GMT+08:00)' }
現在從 Oracle 資料庫使用 PLSQL Enqueue 一筆資料。
demo_msg_enqueue.sql 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 CREATE OR REPLACE PROCEDURE demo_msg_enqueue( msg_in IN VARCHAR2 ) AS enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T; message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; message_handle RAW(16); message DEMO.DEMO_MSG_TYPE; BEGIN message := DEMO.DEMO_MSG_TYPE('Node.js AQ 範例 from PLSQL', msg_in || ' ' || TO_CHAR(sysdate, 'yyyy-mm-dd hh24:mi:ss')); DBMS_AQ.ENQUEUE( queue_name => 'DEMO.DEMO_MSG_QUEUE', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle ); END; /
demo_msg_enqueue.sql 1 2 3 4 5 6 7 SQL> exec demo_msg_enqueue('Hello Tainan, 台南!' ); PL/SQL procedure successfully completed. SQL> commit; Commit complete.
$ node demo-notify AQ notification created... Dequeued: [DEMO.DEMO_MSG_TYPE] { SUBJECT: 'Node.js AQ 範例', TEXT: 'Hello Tainan, 台南! Tue Dec 29 2020 10:33:36 GMT+0800 (GMT+08:00)' } Dequeued: [DEMO.DEMO_MSG_TYPE] { SUBJECT: 'Node.js AQ 範例 from PLSQL', TEXT: 'Hello Tainan, 台南! 2020-12-29 10:46:52' }
現在你可以即時的將資料庫外的資料用 Node.js enqueue,然後用 PLSQL dequeue 傳入 Oracle 資料庫。相反的,可以用 PLSQL enqueue,然後使用 Node.js dequeue 即時的讓資料庫內的資料傳出資料庫。
Oracle 努力的讓資料庫裡的物件 (Things) 能與外界的物件溝通,使用傳統的 UTL_HTTP 在組合 JSON 格式的資料,實在是一種夢靨。現在你有了另一種選擇了。
除了 Apache Kafka、RabbitMQ 之外,你又多了一些選擇可以讓 Oracle 資料庫中的物件 (Things) 即時 (Read-Time) 的走出 Oracle 資料庫。你一定得試試看!