WoT (Web of Things) 是 IoT (Internet of Things 物聯網) 的 Application 層,就是使用 Web 技術來打造 application。也就是說,IoT + Web-enabled technologies 就是 WoT。對 WoT 來說,最重要的觀念,就是以 URL 來表示 IoT 裝置;為 IoT 加入 URL 的觀念,就是 Google 提出的 Physical Web 計畫。
未來每個物件 (Objects) 全部都會和 Web 結合, 而首要條件就是所謂的 Things 必須是可以和 Web 溝通的裝置, 這些智慧裝置之間也能透過 Web 標準來互相溝通。WoT 重點在於使用 Web 的軟體標準和框架, 例如 REST, HTTP 和 URI 來建立應用程式以及服務, 並和其他裝置做溝通, 簡單來說就是日常生活所有的東西都可以存取 Web, 就是 WoT 的精隨。
除了能夠相互溝通,時間性當然也很重要,Real-time 數據越來越受歡迎。基本思路很簡單: 當物件發生變化時,任何查看物件數據的客戶端都應立即能看得到異動。最初,在 Web 應用程式中模擬 real-time 數據的唯一方法是從瀏覽器每隔一段固定的時間不斷循環的從 Web 服務器抓取資料 (Polling)。 這通常很複雜,難以擴大應用。WebSocket,改變了這種狀況,這是一種以高效率的方式在瀏覽器和 Web 服務器之間提供雙向通信的協議。
Oracle Database 的 UTL_HTTP 是傳統上與外界 Web 溝通的方式,後來加入了 ORDS,努力的讓資料庫裡的物件 (Things) 能與外界的物件溝通,但這仍無法主動的通知資料庫內資料的異動,你可以使用 Trigger 加上 AQ,但這些微的資料庫資料變化仍很難跨出資料庫,Continuous Query Notification (CQN) 則改變了這個狀況。
之前介紹了 Oracle Continuous Query Notification (CQN) ,使用 PL/SQL API 為事件處理程式 (Handler)。Oracle 在 Node.js 的程式庫 node-oracledb 則使其功能更強大,它讓 CQN 跨出了 Oracle 資料庫。
基本觀念及使用與 PL/SQL API 一樣。
function demoCallback (event ) { console .log(event); } connection.subscribe('demosub' , { sql: "SELECT sal, comm FROM emp WHERE deptno = 10" , callback: demoCallback, qos: oracledb.SUBSCR_QOS_QUERY | oracledb.SUBSCR_QOS_ROWIDS });
相較於使用 Kafka 或 RabbitMQ,node-oracledb 的 CQN API,不需另外架構 Server,只需在一台不會關機的機器上啟動你的 Node.js Client 程式,就像 Kafka 或 RabbitMQ Consumer 一樣。
另外當然需要 JavaScript 技術,JavaScript 在這股 WoT 的潮流中佔有非常重要的地位,強烈建議,除了 PL/SQL,JavaScript 是你現在必須要熟悉的技術。
範例 程式碼看起來有點長,因為要示範的關係,將所有程式碼放在一個資料檔上,你可以將它模組化。
首先你需要建立一個 Node.js 專案資料夾,然後安裝 oracle node-oracledb 。
demo-cqn.jsconst oracledb = require ('oracledb' );const dbConfig = { user: "demo" , password: "secret" , connectString: "10.11.25.139:1522/db99.example.com" } const oradb = database(); startup(); async function startup ( ) { await oradb.initialize(); await registerQuery(); } async function registerQuery ( ) { const config = { ...dbConfig, events: true } const options = { sql: 'SELECT sal, comm FROM emp where deptno = 10' , callback: demoCallback, timeout: 600 , qos: oracledb.SUBSCR_QOS_QUERY | oracledb.SUBSCR_QOS_ROWIDS }; let connection; try { connection = await oracledb.getConnection(config); await connection.subscribe('demosub' , options); console .log("Subscription created..." ); } catch (err) { console .error(err); } finally { if (connection) { try { await connection.close(); } catch (err) { console .error(err); } } } } async function demoCallback (event ) { const statement = 'select * from emp where rowid = :rid' ; console .log("event type:" , event.type); if (event.type == oracledb.SUBSCR_EVENT_TYPE_DEREG) { console .log("Deregistration has taken place..." ); return ; } console .log("event database name:" , event.dbName); console .log("event transaction id:" , event.txId); const queries = event.queries || [{ tables : event.tables }]; for (const query of queries) { const tables = query.tables; for (const table of tables) { console .log(`--> Table Name: ${table.name} ` ); console .log(`--> Table Operation: ${operationName(table.operation)} Rows=${table.rows.length} ` ); if (table.rows) { for (const row of table.rows) { console .log(`--> --> Row Rowid: ${row.rowid} ` ); console .log(`--> --> Row Operation: ${operationName(row.operation)} ` ); try { const { rows } = await oradb.doExecute(statement, [row.rowid]); console .log(rows); } catch (err) { console .log(err); } console .log(Array (61 ).join("-" )); } } console .log(Array (61 ).join("=" )); } } } function operationName (operation ) { let operationName; switch (operation) { case oracledb.CQN_OPCODE_INSERT: operatonName = 'Records Inserted' ; break ; case oracledb.CQN_OPCODE_UPDATE: operatonName = 'Records Updated' ; break ; case oracledb.CQN_OPCODE_DELETE: operatonName = 'Records Deteled' ; break ; case oracledb.CQN_OPCODE_ALTER: operatonName = 'Table Altered' ; break ; case oracledb.CQN_OPCODE_DROP: operatonName = 'Table Dropped' ; break ; default : operatonName = 'Unknown Operation' ; } return operatonName; } function database ( ) { const config = { ...dbConfig, poolAlias: 'db99demo' , poolMin: 1 , poolMax: 2 , poolIncrement: 1 , queueMax: -1 }; async function initialize ( ) { try { await oracledb.createPool(config); } catch (err) { console .log(err); } } async function close ( ) { try { oracledb.getPool(config.poolAlias).close(); } catch (err) { console .log(err); } } function doExecute (statement, binds = [], opts = {} ) { return new Promise ( async (resolve, reject) => { let connection; opts.outFormat = oracledb.OBJECT; opts.autoCommit = true ; try { connection = await oracledb.getConnection(config.poolAlias); const result = await connection.execute(statement, binds, opts); resolve(result); } catch (err) { reject({ error : err }); } finally { if (connection) { try { await connection.close(); } catch (err) { console .log(err); } } } } ); } return { initialize, close, doExecute }; } process .on('SIGTERM' , () => { console .log("\nTerminating" ); oradb.close(); process.exit(0 ); }) .on('SIGINT' , () => { console .log("\nTerminating" ); oradb.close(); process.exit(0 ); }) .on('uncaughtException' , err => { console .log("\nUncaught exception" ); oradb.close(); console .log(err); process.exit(1 ); });
啟動 $ node demo-cqn Subscription created...
啟動並註冊後,Node.js process 會等待通知事件,並呼叫 Callback 函數。
SQL> select regid, callback, table_name, timeout from user_change_notification_regs; REGID CALLBACK TABLE_NAME TIMEOUT 1536 net8://(ADDRESS=(PROTOCOL=tcp)(HOST=10.11.100.30) DEMO.EMP 200 (PORT=49919))?PR=0
資料庫會用 OCI API net8://(ADDRESS=(PROTOCOL=tcp)(HOST=10.11.100.30)(PORT=49919))?PR=0 回呼你的 Node.js callback 函數。所以你 Client 端程式的機器及 port 要保持暢通。TIMEOUT 顯示這個註冊的查詢還剩多少時間就會 timeout。如果註冊時沒有設定 timeout,這裡的值會是非常大的數字 4294967295。那你可能必須手動使用 DBMS_CQ_NOTIFICATION.DEREGISTER 註銷註冊的 Query。
SQL> select * from user_cq_notification_queries; QUERYID QUERYTEXT REGID 71 SELECT DEMO.EMP.SAL , DEMO.EMP.COMM FROM DEMO.EMP WHERE D 1536 EMO.EMP.DEPTNO = 10
SQL> update emp set comm = comm + 1 where empno = 7934; 1 row updated. SQL> commit; Commit complete.
$ node demo-cqn Subscription created... event type: 7 event database name: lx26 event transaction id: <Buffer 06 00 0d 00 f4 a3 0a 00> [ { EMPNO: 7934, ENAME: '楊喆', JOB: 'CLERK', MGR: 7902, HIREDATE: 1982-01-22T16:00:00.000Z, SAL: 1500, COMM: 13, DEPTNO: 10 } ] ============================================================
在 WoT 與 Real-time 的潮流中,Kafka 受到了非常多得關注,RabbitMQ 也有銀行界的加持,有幸為公司導入了這兩個架構,今又有 Oracle CQN 可以選擇,如何應用,應該看應用的範圍與規模,Kafka 與 RabbitMQ 可以是資訊策略的一環,CQN 則適合單打獨鬥,它不需要另外架構 Server,但他只適合用於較小量的資料異動,千萬不要用於大量資料的處理;Kafka 則彈性的多,它可處理一天多達 Petabyte 的資料。RabbitMQ 則有較佳的 GUI 管理介面。不管如何,都可以讓你有些新思維。
Go Real-time!