WoT (Web of Things) 是 IoT (Internet of Things 物聯網) 的 Application 層,就是使用 Web 技術來打造 application。也就是說,IoT + Web-enabled technologies 就是 WoT。對 WoT 來說,最重要的觀念,就是以 URL 來表示 IoT 裝置;為 IoT 加入 URL 的觀念,就是 Google 提出的 Physical Web 計畫。
未來每個物件 (Objects) 全部都會和 Web 結合, 而首要條件就是所謂的 Things 必須是可以和 Web 溝通的裝置, 這些智慧裝置之間也能透過 Web 標準來互相溝通。WoT 重點在於使用 Web 的軟體標準和框架, 例如 REST, HTTP 和 URI 來建立應用程式以及服務, 並和其他裝置做溝通, 簡單來說就是日常生活所有的東西都可以存取 Web, 就是 WoT 的精隨。
除了能夠相互溝通,時間性當然也很重要,Real-time 數據越來越受歡迎。基本思路很簡單: 當物件發生變化時,任何查看物件數據的客戶端都應立即能看得到異動。最初,在 Web 應用程式中模擬 real-time 數據的唯一方法是從瀏覽器每隔一段固定的時間不斷循環的從 Web 服務器抓取資料 (Polling)。 這通常很複雜,難以擴大應用。WebSocket,改變了這種狀況,這是一種以高效率的方式在瀏覽器和 Web 服務器之間提供雙向通信的協議。
Oracle Database 的 UTL_HTTP 是傳統上與外界 Web 溝通的方式,後來加入了 ORDS,努力的讓資料庫裡的物件 (Things) 能與外界的物件溝通,但這仍無法主動的通知資料庫內資料的異動,你可以使用 Trigger 加上 AQ,但這些微的資料庫資料變化仍很難跨出資料庫,Continuous Query Notification (CQN) 則改變了這個狀況。
之前介紹了 Oracle Continuous Query Notification (CQN) ,使用 PL/SQL API 為事件處理程式 (Handler)。Oracle 在 Node.js 的程式庫 node-oracledb 則使其功能更強大,它讓 CQN 跨出了 Oracle 資料庫。
基本觀念及使用與 PL/SQL API 一樣。
function demoCallback (event ) { console .log(event); } connection.subscribe('demosub' , { sql: "SELECT sal, comm FROM emp WHERE deptno = 10" , callback: demoCallback, qos: oracledb.SUBSCR_QOS_QUERY | oracledb.SUBSCR_QOS_ROWIDS });
相較於使用 Kafka 或 RabbitMQ,node-oracledb 的 CQN API,不需另外架構 Server,只需在一台不會關機的機器上啟動你的 Node.js Client 程式,就像 Kafka 或 RabbitMQ Consumer 一樣。
另外當然需要 JavaScript 技術,JavaScript 在這股 WoT 的潮流中佔有非常重要的地位,強烈建議,除了 PL/SQL,JavaScript 是你現在必須要熟悉的技術。
範例 程式碼看起來有點長,因為要示範的關係,將所有程式碼放在一個資料檔上,你可以將它模組化。
首先你需要建立一個 Node.js 專案資料夾,然後安裝 oracle node-oracledb 。
demo-cqn.js 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 const oracledb = require ('oracledb' );const dbConfig = { user: "demo" , password: "secret" , connectString: "10.11.25.139:1522/db99.example.com" } const oradb = database(); startup(); async function startup ( ) { await oradb.initialize(); await registerQuery(); } async function registerQuery ( ) { const config = { ...dbConfig, events: true } const options = { sql: 'SELECT sal, comm FROM emp where deptno = 10' , callback: demoCallback, timeout: 600 , qos: oracledb.SUBSCR_QOS_QUERY | oracledb.SUBSCR_QOS_ROWIDS }; let connection; try { connection = await oracledb.getConnection(config); await connection.subscribe('demosub' , options); console .log("Subscription created..." ); } catch (err) { console .error(err); } finally { if (connection) { try { await connection.close(); } catch (err) { console .error(err); } } } } async function demoCallback (event ) { const statement = 'select * from emp where rowid = :rid' ; console .log("event type:" , event.type); if (event.type == oracledb.SUBSCR_EVENT_TYPE_DEREG) { console .log("Deregistration has taken place..." ); return ; } console .log("event database name:" , event.dbName); console .log("event transaction id:" , event.txId); const queries = event.queries || [{ tables : event.tables }]; for (const query of queries) { const tables = query.tables; for (const table of tables) { console .log(`--> Table Name: ${table.name} ` ); console .log(`--> Table Operation: ${operationName(table.operation)} Rows=${table.rows.length} ` ); if (table.rows) { for (const row of table.rows) { console .log(`--> --> Row Rowid: ${row.rowid} ` ); console .log(`--> --> Row Operation: ${operationName(row.operation)} ` ); try { const { rows } = await oradb.doExecute(statement, [row.rowid]); console .log(rows); } catch (err) { console .log(err); } console .log(Array (61 ).join("-" )); } } console .log(Array (61 ).join("=" )); } } } function operationName (operation ) { let operationName; switch (operation) { case oracledb.CQN_OPCODE_INSERT: operatonName = 'Records Inserted' ; break ; case oracledb.CQN_OPCODE_UPDATE: operatonName = 'Records Updated' ; break ; case oracledb.CQN_OPCODE_DELETE: operatonName = 'Records Deteled' ; break ; case oracledb.CQN_OPCODE_ALTER: operatonName = 'Table Altered' ; break ; case oracledb.CQN_OPCODE_DROP: operatonName = 'Table Dropped' ; break ; default : operatonName = 'Unknown Operation' ; } return operatonName; } function database ( ) { const config = { ...dbConfig, poolAlias: 'db99demo' , poolMin: 1 , poolMax: 2 , poolIncrement: 1 , queueMax: -1 }; async function initialize ( ) { try { await oracledb.createPool(config); } catch (err) { console .log(err); } } async function close ( ) { try { oracledb.getPool(config.poolAlias).close(); } catch (err) { console .log(err); } } function doExecute (statement, binds = [], opts = {} ) { return new Promise ( async (resolve, reject) => { let connection; opts.outFormat = oracledb.OBJECT; opts.autoCommit = true ; try { connection = await oracledb.getConnection(config.poolAlias); const result = await connection.execute(statement, binds, opts); resolve(result); } catch (err) { reject({ error : err }); } finally { if (connection) { try { await connection.close(); } catch (err) { console .log(err); } } } } ); } return { initialize, close, doExecute }; } process .on('SIGTERM' , () => { console .log("\nTerminating" ); oradb.close(); process.exit(0 ); }) .on('SIGINT' , () => { console .log("\nTerminating" ); oradb.close(); process.exit(0 ); }) .on('uncaughtException' , err => { console .log("\nUncaught exception" ); oradb.close(); console .log(err); process.exit(1 ); });
啟動 $ node demo-cqn Subscription created...
啟動並註冊後,Node.js process 會等待通知事件,並呼叫 Callback 函數。
SQL> select regid, callback, table_name, timeout from user_change_notification_regs; REGID CALLBACK TABLE_NAME TIMEOUT 1536 net8://(ADDRESS=(PROTOCOL=tcp)(HOST=10.11.100.30) DEMO.EMP 200 (PORT=49919))?PR=0
資料庫會用 OCI API net8://(ADDRESS=(PROTOCOL=tcp)(HOST=10.11.100.30)(PORT=49919))?PR=0 回呼你的 Node.js callback 函數。所以你 Client 端程式的機器及 port 要保持暢通。TIMEOUT 顯示這個註冊的查詢還剩多少時間就會 timeout。如果註冊時沒有設定 timeout,這裡的值會是非常大的數字 4294967295。那你可能必須手動使用 DBMS_CQ_NOTIFICATION.DEREGISTER 註銷註冊的 Query。
SQL> select * from user_cq_notification_queries; QUERYID QUERYTEXT REGID 71 SELECT DEMO.EMP.SAL , DEMO.EMP.COMM FROM DEMO.EMP WHERE D 1536 EMO.EMP.DEPTNO = 10
SQL> update emp set comm = comm + 1 where empno = 7934; 1 row updated. SQL> commit; Commit complete.
$ node demo-cqn Subscription created... event type: 7 event database name: lx26 event transaction id: <Buffer 06 00 0d 00 f4 a3 0a 00> [ { EMPNO: 7934, ENAME: '楊喆', JOB: 'CLERK', MGR: 7902, HIREDATE: 1982-01-22T16:00:00.000Z, SAL: 1500, COMM: 13, DEPTNO: 10 } ] ============================================================
在 WoT 與 Real-time 的潮流中,Kafka 受到了非常多得關注,RabbitMQ 也有銀行界的加持,有幸為公司導入了這兩個架構,今又有 Oracle CQN 可以選擇,如何應用,應該看應用的範圍與規模,Kafka 與 RabbitMQ 可以是資訊策略的一環,CQN 則適合單打獨鬥,它不需要另外架構 Server,但他只適合用於較小量的資料異動,千萬不要用於大量資料的處理;Kafka 則彈性的多,它可處理一天多達 Petabyte 的資料。RabbitMQ 則有較佳的 GUI 管理介面。不管如何,都可以讓你有些新思維。
Go Real-time!