Continuous Query Notification (CQN) 允許應用程式向資料庫註冊一些查詢 (Queries),如果與註冊的查詢 (Query) 有相關的物件 (Object) 或會影響註冊的查詢的結果 (Result set),資料庫就會觸發事件,通知註冊的事件處理程式 (Notification handler)。
這有兩種模式:
Object change notification (OCN) 如果查詢(Query)註冊為物件更改 通知 (OCN),則只要一段交易 (transaction) 更改了查詢引用 (references) 和提交(commit) 的物件,資料庫就會通知應用程式。
Query result change notification (QRCN) 如果查詢註冊為結果更改 通知 (QRCN),則只要交易更改影響了查詢結果並提交,資料庫就會通知應用程式。
這些一個或多個查詢的列表與通知類型 (OCN 或 QRCN) 需向 CQN 註冊,並提供通知處理程式 (Notification handler)。您可以使用 PL/SQL 或調用 Oracle OCI。如果使用 PL/SQL,則通知處理程式就是資料庫中的 PL/SQL stored procedure; 如果您使用 OCI,則通知處理程式會是客戶端 C 回調程式 (callback procedure)。除了使用 C,Node.js 的 Oracle Library node-oracledb 也支援 CQN。也就是說,當資料庫觸發此事件,就可即時反應在資料庫外端的應用程式。
Object change notification (OCN) 如果應用程式註冊查詢 (Query) 為物件更改通知 (OCN),則只要交易更改與註冊的 Query 有關聯的物件並提交,資料庫就會向應用程式發送 OCN,這只關心物件是否被更改,而不管查詢的結果是否發生更改。
例如,如果應用程式在將下面的查詢示例註冊為 OCN,並且用戶提交更改 EMP,則資料庫會向應用程式發送 OCN,即使更改的資料沒有滿足查詢的條件(例如,如果 deptno = 30)。
1 2 3 4 select empno, sal from emp where deptno = 10 /
Query result change notification (QRCN) 如果應用程式註冊查詢為查詢結果更改通知 (QRCN),則只要交易更改而影響了註冊的查詢結果並提交,資料庫就會向應用程式發送 QRCN。
例如,如果應用程式在將下面的查詢示例註冊查詢為 QRCN,則只有在查詢結果集發生變化時,資料庫才會向應用程式發送 QRCN; 也就是說,如果其中一個 DML 語句提交:
INSERT 或 DELETE 滿足查詢條件 (deptno = 10)。
UPDATE 欄位 empno 或 sal 且滿足查詢條件 (deptno = 10) 的資料。
UPDATE 欄位 deptno,將其值從 10 以外的值更改為 10,造成將該行 (row) 添加到結果集中。
1 2 3 4 select empno, sal from emp where deptno = 10 /
預設的通知類型是 OCN。如果要注冊為 QRCN,可在 CQ_NOTIFICATION$_REG_INFO 的 QOSFLAGS 屬性中指定 QOS_QUERY。這我們會在後面的範例看到。
使用 QRCN,您可以選擇保證模式 (guaranteed mode 預設) 或盡力而為模式 (best-effort mode)。
Guaranteed Mode 在保證模式下,只有在保證查詢結果集發生變化時,資料庫才會向應用程式發送 QRCN。例如,假設某個應用程式註冊了下面的查詢為 QRCN,員工 7839 歸屬在部門 10,並且執行了這些 UPDATE 語句。
1 2 3 4 select empno, sal from emp where deptno = 10 /
1 2 3 4 5 UPDATE emp SET sal = sal + 10 WHERE empno = 7839 ;UPDATE emp SET sal = sal – 10 WHERE empno = 7839 ;COMMIT ;
上面的交易 中的每個 UPDATE 語句都會更改查詢結果集,但整個交易 對查詢結果集沒有影響; 因此,資料庫不會向應用程序發送 QRCN。
在保證模式下,某些查詢對於 QRCN 而言會過於復雜。
Best-Effort Mode 某些查詢對於保證模式而言過於復雜,則可以在 QRCN 下註冊為盡力而為模式,CQN 會創建並註冊它們的簡單版本 (simpler versions)。
例中的查詢對於保證模式下的 QRCN 而言過於復雜,因為它包含聚合函數 SUM。
1 SELECT sum (sal) FROM emp WHERE deptno = 20 ;
在盡力而為模式下,CQN 在此示例中註冊了這個更簡單的查詢版本。
1 SELECT sal FROM emp WHERE deptno = 20 ;
每當原始查詢的結果發生變化時,其更簡單版本的結果也會發生變化; 因此,簡化中不會丟失任何通知。但是,簡化可能會導致誤報,因為較簡單版本的結果可能會在原始查詢的結果不變 ( sum(sal) 的結果沒變 ) 時更改。
如果是多表查詢 (join),則會更複雜,所以盡量簡單,不要註冊太複雜的查詢!
實作範例 需要的權限 SQL> grant change notification to demo; SQL> grant execute on dbms_cq_notification to demo;
創建 notifications Table 用來儲存範例子中觸發的通知。
1 2 3 4 5 6 7 8 9 10 CREATE TABLE notifications ( id NUMBER , message VARCHAR2 (4000 ), notification_date DATE ); CREATE SEQUENCE notifications_seq;ALTER TABLE notifications ADD CONSTRAINT notifications_pk PRIMARY KEY (id );
事件處理程式 (Notification handler) Notification Handler 就像 AQ 的 notify 程式,通常稱為事件處理器,當事件發生時的處理程式。在 JavaScript 通常則稱為 callback 函數。當事件發生時,CQN 會發出一個事件物件 (Event Object),內容包含事件的一些訊息,事件處理器必須有一參數承接此事件物件。
事件物件 (Event Object) CQN 會發出 CQ_NOTIFICATION $ _DESCRIPTOR 類型的事件物件。 事件處理程式可以在 CQ_NOTIFICATION $ _DESCRIPTOR 物件的屬性中找到資料庫更改的詳細信息。
SQL> desc CQ_NOTIFICATION$_DESCRIPTOR Name Null? Type REGISTRATION_ID NUMBER TRANSACTION_ID RAW(8) DBNAME VARCHAR2(30) EVENT_TYPE NUMBER NUMTABLES NUMBER TABLE_DESC_ARRAY SYS.CHNF$_TDESC_ARRAY QUERY_DESC_ARRAY
CQ_NOTIFICATION $ _DESCRIPTOR 類型包含另外兩個屬性:
TABLE_DESC_ARRAY,該屬性包含一個類型為 CQ_NOTIFICATION $ _TABLE 的表描述符。
QUERY_DESC_ARRAY,該屬性包含一個類型為 CQ_NOTIFICATION $ _QUERY 的結果集更改描述符。
SQL> desc CQ_NOTIFICATION$_TABLE Name Null? Type OPFLAGS NUMBER TABLE_NAME VARCHAR2(64) NUMROWS NUMBER ROW_DESC_ARRAY SYS.CHNF$_RDESC_ARRAY
SQL> desc CQ_NOTIFICATION$_QUERY Name Null? Type QUERYID NUMBER QUERYOP NUMBER TABLE_DESC_ARRAY SYS.CHNF$_QDESC_ARRAY
如果在註冊過程中指定了 ROWID 選項,則 CQ_NOTIFICATION $ _TABLE 類型具有 ROW_DESC_ARRAY 屬性,其類型為 CQ_NOTIFICATION $ _ROW,其中包含已更改行的 ROWID。 如果在 CQ_NOTIFICATION $ _TABLE 物件的 OPFLAGS 字段中設置了 ALL_ROWS,則 ROWID 信息不可用。
SQL> desc CQ_NOTIFICATION$_ROW Name Null? Type OPFLAGS NUMBER ROW_ID VARCHAR2(2000)
處理程式 瞭解了事件物件,可以開始創建事件處理程式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 create or replace PROCEDURE notifications_handler ( ntfnds IN CQ_NOTIFICATION$_DESCRIPTOR ) IS l_regid NUMBER ; l_table_name VARCHAR2(60); l_event_type NUMBER; l_numtables NUMBER; l_operation_type NUMBER; l_operation_name VARCHAR2(30); l_numrows NUMBER; l_row_id VARCHAR2(2000); l_numqueries NUMBER; l_qid NUMBER; l_qop NUMBER; l_message VARCHAR2(4000) := NULL; BEGIN l_regid := ntfnds.registration_id; l_event_type := ntfnds.event_type; l_numqueries := 0; IF (l_event_type = DBMS_CQ_NOTIFICATION.EVENT_QUERYCHANGE) THEN l_numqueries := ntfnds.query_desc_array.count; FOR i IN 1..l_numqueries LOOP l_qid := ntfnds.query_desc_array(i).queryid; l_qop := ntfnds.query_desc_array(i).queryop; l_numtables := 0; l_numtables := ntfnds.query_desc_array(i).table_desc_array.count; FOR j IN 1..l_numtables LOOP l_table_name := ntfnds.query_desc_array(i).table_desc_array(j).table_name; l_operation_type := ntfnds.query_desc_array(i).table_desc_array(j).Opflags; IF (bitand(l_operation_type, DBMS_CQ_NOTIFICATION.ALL_ROWS) = 0) THEN l_numrows := ntfnds.query_desc_array(i).table_desc_array(j).numrows; ELSE l_numrows := 0; END IF ; CASE WHEN BITAND(l_operation_type, DBMS_CQ_NOTIFICATION.INSERTOP) != 0 THEN l_operation_name := 'Records Inserted'; WHEN BITAND(l_operation_type, DBMS_CQ_NOTIFICATION.UPDATEOP) != 0 THEN l_operation_name := 'Records Updated'; WHEN BITAND(l_operation_type, DBMS_CQ_NOTIFICATION.DELETEOP) != 0 THEN l_operation_name := 'Records Deleted'; WHEN BITAND(l_operation_type, DBMS_CQ_NOTIFICATION.ALTEROP) != 0 THEN l_operation_name := 'Table Altered'; WHEN BITAND(l_operation_type, DBMS_CQ_NOTIFICATION.DROPOP) != 0 THEN l_operation_name := 'Table Dropped'; WHEN BITAND(l_operation_type, DBMS_CQ_NOTIFICATION.UNKNOWNOP) != 0 THEN l_operation_name := 'Unknown Operation'; ELSE l_operation_name := '?'; END CASE ; l_message := 'QueryID:' || l_qid ||' Table (' || l_table_name || ') - ' || l_operation_name || '. Rows=' || l_numrows; INSERT INTO notifications (id , message, notification_date) VALUES (notifications_seq.NEXTVAL, l_message, SYSDATE ); FOR k IN 1..l_numrows LOOP l_row_id := ntfnds.query_desc_array(i).table_desc_array(j).row_desc_array(k).row_id; l_message := 'QueryID:' || l_qid ||' Table (' || l_table_name || ') - RowID:' || l_row_id; INSERT INTO notifications (id , message, notification_date) VALUES (notifications_seq.NEXTVAL, l_message, SYSDATE ); END LOOP ; END LOOP ; END LOOP ; END IF ; COMMIT ; END ;/
註冊查詢 (Query) 有了事件處理程式,就可以註冊所要偵測的 Queries。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 DECLARE reginfo CQ_NOTIFICATION$_REG_INFO; mgr_id NUMBER; dept_id NUMBER; v_cursor SYS_REFCURSOR; regid NUMBER; BEGIN reginfo := cq_notification$_reg_info ( 'notifications_handler' , DBMS_CQ_NOTIFICATION.QOS_QUERY + DBMS_CQ_NOTIFICATION.QOS_ROWIDS, 0 , 0 , 0 ); regid := DBMS_CQ_NOTIFICATION.new_reg_start(reginfo); OPEN v_cursor FOR SELECT dbms_cq_notification.CQ_NOTIFICATION_QUERYID, sal, comm FROM DEMO.EMP WHERE deptno = 10 ; CLOSE v_cursor; DBMS_CQ_NOTIFICATION.reg_end; END ;/
註冊的 Query 牽涉到 Emp Table 的 sal、comm 與 deptno 欄位。
1 2 3 select regid, table_name, callback from user_change_notification_regs /
REGID TABLE_NAME CALLBACK 1231 DEMO.EMP plsql://notifications_handler?PR=0
1 2 3 select queryid, querytext, regid from user_cq_notification_queries /
QUERYID QUERYTEXT REGID 48 SELECT DEMO.EMP.SAL , DEMO.EMP.COMM FROM DEMO.EMP WHERE D 1231 EMO.EMP.DEPTNO = 10
1 SQL> exec DBMS_CQ_NOTIFICATION.DEREGISTER(regid => 1231);
實際運作 SQL> select * from notifications; no rows selected
SQL> select * from emp where empno in (7608, 7934); EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO 7608 馬小九 ANALYST 7788 28-JUN-10 1000 100 40 7934 楊喆 CLERK 7902 23-JAN-82 1500 10
SQL> update emp set comm = 99 where empno in (7608, 7934); 2 rows updated. SQL> commit; Commit complete.
SQL> select * from notifications; ID MESSAGE NOTIFICATION_DATE 261 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 08:27:48 262 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 08:27:48 SQL> select * from emp where rowid = 'AACzQEAAEAAACAXAAN'; EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO 7934 楊喆 CLERK 7902 1982-01-23 00:00:00 1500 99 10
SQL> select * from emp where empno = 7934; EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO 7934 楊喆 CLERK 7902 1982-01-23 00:00:00 1500 99 10 SQL> update emp set comm = 99 where empno = 7934; 1 row updated. SQL> commit; Commit complete. SQL > select * from notifications; ID MESSAGE NOTIFICATION_DATE 261 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 08:27:48 262 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 08:27:48
SQL> update emp set deptno = 20 where empno = 7934; 1 row updated. SQL> commit; Commit complete.SQL > select * from notifications; ID MESSAGE NOTIFICATION_DATE 261 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 08:27:48 262 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 08:27:48 => 263 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 09:05:12 => 264 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 09:05:12
SQL> insert into emp values(9099, '李小明', 'CLERK', 7698, sysdate, 23100, null, 10); 1 row created. SQL> commit; Commit complete.SQL > select * from notifications; ID MESSAGE NOTIFICATION_DATE 261 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 08:27:48 262 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 08:27:48 263 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 09:05:12 264 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 09:05:12 => 265 QueryID:48 Table (DEMO.EMP) - Records Inserted. Rows=1 2020-12-25 09:15:55 => 266 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAVAAB 2020-12-25 09:15:55 SQL> select * from emp where rowid = 'AACzQEAAEAAACAVAAB'; EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO 9099 李小明 CLERK 7698 2020-12-25 09:15:52 23100 10
SQL> delete emp where empno = 9099; 1 row deleted. SQL> commit; Commit complete.SQL > select * from notifications; ID MESSAGE NOTIFICATION_DATE 261 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 08:27:48 262 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 08:27:48 263 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 09:05:12 264 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 09:05:12 265 QueryID:48 Table (DEMO.EMP) - Records Inserted. Rows=1 2020-12-25 09:15:55 266 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAVAAB 2020-12-25 09:15:55 => 267 QueryID:48 Table (DEMO.EMP) - Records Deleted. Rows=1 2020-12-25 09:19:00 => 268 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAVAAB 2020-12-25 09:19:00 8 rows selected. SQL> select * from emp where rowid = 'AACzQEAAEAAACAVAAB'; no rows selected
現在來看複雜一點的:
SQL> truncate table notifications; Table truncated. SQL> select empno, ename, comm, deptno from emp where deptno = 10; EMPNO ENAME COMM DEPTNO 7839 KING 10 7782 楊建華 10 7934 楊喆 99 10 8907 牸祢 10 SQL> update emp set comm = 99 where deptno = 10; 4 rows updated. SQL> commit; Commit complete.SQL > select * from notifications; ID MESSAGE NOTIFICATION_DATE 275 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=3 2020-12-25 09:32:15 276 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAA 2020-12-25 09:32:15 277 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAC 2020-12-25 09:32:15 278 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAS 2020-12-25 09:32:15
SQL> truncate table notifications; Table truncated. SQL> select * from emp where empno = 7934; EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO 7934 楊喆 CLERK 7902 1982-01-23 00:00:00 1500 99 10 SQL> update emp set comm = comm + 10 where empno = 7934; 1 row updated. SQL> update emp set comm = comm - 10 where empno = 7934; 1 row updated. SQL> commit; /* 同一個交易 (Transaction) 沒有改變註冊 Query 的結果 */ Commit complete.SQL > select * from notifications;no rows selected
測試完,註銷:
SQL> exec DBMS_CQ_NOTIFICATION.DEREGISTER(regid => 1231); PL/SQL procedure successfully completed. SQL> select regid, table_name, callback 2 from user_change_notification_regs 3 / no rows selected SQL> select queryid, querytext, regid 2 from user_cq_notification_queries 3 / no rows selected