0%

Oracle Continuous Query Notification (CQN)

Continuous Query Notification (CQN) 允許應用程式向資料庫註冊一些查詢 (Queries),如果與註冊的查詢 (Query) 有相關的物件 (Object) 或會影響註冊的查詢的結果 (Result set),資料庫就會觸發事件,通知註冊的事件處理程式 (Notification handler)。

這有兩種模式:

  1. Object change notification (OCN)
    如果查詢(Query)註冊為物件更改通知 (OCN),則只要一段交易 (transaction) 更改了查詢引用 (references) 和提交(commit) 的物件,資料庫就會通知應用程式。

  2. Query result change notification (QRCN)
    如果查詢註冊為結果更改通知 (QRCN),則只要交易更改影響了查詢結果並提交,資料庫就會通知應用程式。

這些一個或多個查詢的列表與通知類型 (OCN 或 QRCN) 需向 CQN 註冊,並提供通知處理程式 (Notification handler)。您可以使用 PL/SQL 或調用 Oracle OCI。如果使用 PL/SQL,則通知處理程式就是資料庫中的 PL/SQL stored procedure; 如果您使用 OCI,則通知處理程式會是客戶端 C 回調程式 (callback procedure)。除了使用 C,Node.js 的 Oracle Library node-oracledb 也支援 CQN。也就是說,當資料庫觸發此事件,就可即時反應在資料庫外端的應用程式。

Object change notification (OCN)

如果應用程式註冊查詢 (Query) 為物件更改通知 (OCN),則只要交易更改與註冊的 Query 有關聯的物件並提交,資料庫就會向應用程式發送 OCN,這只關心物件是否被更改,而不管查詢的結果是否發生更改。

例如,如果應用程式在將下面的查詢示例註冊為 OCN,並且用戶提交更改 EMP,則資料庫會向應用程式發送 OCN,即使更改的資料沒有滿足查詢的條件(例如,如果 deptno = 30)。

1
2
3
4
select empno, sal
from emp
where deptno = 10
/

Query result change notification (QRCN)

如果應用程式註冊查詢為查詢結果更改通知 (QRCN),則只要交易更改而影響了註冊的查詢結果並提交,資料庫就會向應用程式發送 QRCN。

例如,如果應用程式在將下面的查詢示例註冊查詢為 QRCN,則只有在查詢結果集發生變化時,資料庫才會向應用程式發送 QRCN; 也就是說,如果其中一個 DML 語句提交:

  • INSERT 或 DELETE 滿足查詢條件 (deptno = 10)。
  • UPDATE 欄位 empno 或 sal 且滿足查詢條件 (deptno = 10) 的資料。
  • UPDATE 欄位 deptno,將其值從 10 以外的值更改為 10,造成將該行 (row) 添加到結果集中。
1
2
3
4
select empno, sal
from emp
where deptno = 10
/

預設的通知類型是 OCN。如果要注冊為 QRCN,可在 CQ_NOTIFICATION$_REG_INFO 的 QOSFLAGS 屬性中指定 QOS_QUERY。這我們會在後面的範例看到。

使用 QRCN,您可以選擇保證模式 (guaranteed mode 預設) 或盡力而為模式 (best-effort mode)。

  • Guaranteed Mode
    在保證模式下,只有在保證查詢結果集發生變化時,資料庫才會向應用程式發送 QRCN。例如,假設某個應用程式註冊了下面的查詢為 QRCN,員工 7839 歸屬在部門 10,並且執行了這些 UPDATE 語句。

    1
    2
    3
    4
    select empno, sal
    from emp
    where deptno = 10
    /
    1
    2
    3
    4
    5
    UPDATE emp SET sal = sal + 10 WHERE empno = 7839;

    UPDATE emp SET sal = sal – 10 WHERE empno = 7839;

    COMMIT;

    上面的交易中的每個 UPDATE 語句都會更改查詢結果集,但整個交易對查詢結果集沒有影響; 因此,資料庫不會向應用程序發送 QRCN。

    在保證模式下,某些查詢對於 QRCN 而言會過於復雜。

  • Best-Effort Mode
    某些查詢對於保證模式而言過於復雜,則可以在 QRCN 下註冊為盡力而為模式,CQN 會創建並註冊它們的簡單版本 (simpler versions)。

    例中的查詢對於保證模式下的 QRCN 而言過於復雜,因為它包含聚合函數 SUM。

    1
    SELECT sum(sal) FROM emp WHERE deptno = 20;

    在盡力而為模式下,CQN 在此示例中註冊了這個更簡單的查詢版本。

    1
    SELECT sal FROM emp WHERE deptno = 20;

    每當原始查詢的結果發生變化時,其更簡單版本的結果也會發生變化; 因此,簡化中不會丟失任何通知。但是,簡化可能會導致誤報,因為較簡單版本的結果可能會在原始查詢的結果不變 ( sum(sal) 的結果沒變 ) 時更改。

    如果是多表查詢 (join),則會更複雜,所以盡量簡單,不要註冊太複雜的查詢!

實作範例

需要的權限
SQL> grant change notification to demo;

SQL> grant execute on dbms_cq_notification to demo;

創建 notifications Table 用來儲存範例子中觸發的通知。

1
2
3
4
5
6
7
8
9
10
CREATE TABLE notifications (
id NUMBER,
message VARCHAR2(4000),
notification_date DATE
);

CREATE SEQUENCE notifications_seq;

ALTER TABLE notifications
ADD CONSTRAINT notifications_pk PRIMARY KEY(id);

事件處理程式 (Notification handler)

Notification Handler 就像 AQ 的 notify 程式,通常稱為事件處理器,當事件發生時的處理程式。在 JavaScript 通常則稱為 callback 函數。當事件發生時,CQN 會發出一個事件物件 (Event Object),內容包含事件的一些訊息,事件處理器必須有一參數承接此事件物件。

事件物件 (Event Object)

CQN 會發出 CQ_NOTIFICATION $ _DESCRIPTOR 類型的事件物件。 事件處理程式可以在 CQ_NOTIFICATION $ _DESCRIPTOR 物件的屬性中找到資料庫更改的詳細信息。

SQL> desc CQ_NOTIFICATION$_DESCRIPTOR
Name Null? Type
----------------------------------------------------- -------- ------------------------------------
REGISTRATION_ID NUMBER
TRANSACTION_ID RAW(8)
DBNAME VARCHAR2(30)
EVENT_TYPE NUMBER
NUMTABLES NUMBER
TABLE_DESC_ARRAY SYS.CHNF$_TDESC_ARRAY
QUERY_DESC_ARRAY

CQ_NOTIFICATION $ _DESCRIPTOR 類型包含另外兩個屬性:

  • TABLE_DESC_ARRAY,該屬性包含一個類型為 CQ_NOTIFICATION $ _TABLE 的表描述符。
  • QUERY_DESC_ARRAY,該屬性包含一個類型為 CQ_NOTIFICATION $ _QUERY 的結果集更改描述符。
SQL> desc CQ_NOTIFICATION$_TABLE
Name Null? Type
----------------------------------------------------- -------- ------------------------------------
OPFLAGS NUMBER
TABLE_NAME VARCHAR2(64)
NUMROWS NUMBER
ROW_DESC_ARRAY SYS.CHNF$_RDESC_ARRAY
SQL> desc CQ_NOTIFICATION$_QUERY
Name Null? Type
----------------------------------------------------- -------- ------------------------------------
QUERYID NUMBER
QUERYOP NUMBER
TABLE_DESC_ARRAY SYS.CHNF$_QDESC_ARRAY

如果在註冊過程中指定了 ROWID 選項,則 CQ_NOTIFICATION $ _TABLE 類型具有 ROW_DESC_ARRAY 屬性,其類型為 CQ_NOTIFICATION $ _ROW,其中包含已更改行的 ROWID。 如果在 CQ_NOTIFICATION $ _TABLE 物件的 OPFLAGS 字段中設置了 ALL_ROWS,則 ROWID 信息不可用。

SQL> desc CQ_NOTIFICATION$_ROW
Name Null? Type
----------------------------------------------------- -------- ------------------------------------
OPFLAGS NUMBER
ROW_ID VARCHAR2(2000)
處理程式

瞭解了事件物件,可以開始創建事件處理程式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
create or replace PROCEDURE notifications_handler (
ntfnds IN CQ_NOTIFICATION$_DESCRIPTOR /* 事件物件 (Event Object) 參數 */
)
IS
l_regid NUMBER;
l_table_name VARCHAR2(60);
l_event_type NUMBER;
l_numtables NUMBER;
l_operation_type NUMBER;
l_operation_name VARCHAR2(30);
l_numrows NUMBER;
l_row_id VARCHAR2(2000);
l_numqueries NUMBER;
l_qid NUMBER;
l_qop NUMBER;
l_message VARCHAR2(4000) := NULL;
BEGIN
l_regid := ntfnds.registration_id;
l_event_type := ntfnds.event_type;

l_numqueries := 0;

IF (l_event_type = DBMS_CQ_NOTIFICATION.EVENT_QUERYCHANGE) THEN /* 這程式將只處理 QRCN event type 7 */
l_numqueries := ntfnds.query_desc_array.count; /* 註冊的 Query 數 */

FOR i IN 1..l_numqueries LOOP /* loop over queries */
l_qid := ntfnds.query_desc_array(i).queryid;
l_qop := ntfnds.query_desc_array(i).queryop;

l_numtables := 0;
l_numtables := ntfnds.query_desc_array(i).table_desc_array.count; /* 每個 Query 所牽涉的 Table 數 */

FOR j IN 1..l_numtables LOOP /* loop over tables */
l_table_name :=
ntfnds.query_desc_array(i).table_desc_array(j).table_name;
l_operation_type :=
ntfnds.query_desc_array(i).table_desc_array(j).Opflags; /* Insert、Update or ... */

IF (bitand(l_operation_type, DBMS_CQ_NOTIFICATION.ALL_ROWS) = 0) THEN
l_numrows := ntfnds.query_desc_array(i).table_desc_array(j).numrows; /* 影響的筆數 */
ELSE
l_numrows := 0; /* ROWID info not available 注冊 Query 時如果沒有要求 ROWID,筆數將會是 0 */
END IF;

CASE /* 這只是讓資料好看些,非必要。*/
WHEN BITAND(l_operation_type, DBMS_CQ_NOTIFICATION.INSERTOP) != 0 THEN
l_operation_name := 'Records Inserted';
WHEN BITAND(l_operation_type, DBMS_CQ_NOTIFICATION.UPDATEOP) != 0 THEN
l_operation_name := 'Records Updated';
WHEN BITAND(l_operation_type, DBMS_CQ_NOTIFICATION.DELETEOP) != 0 THEN
l_operation_name := 'Records Deleted';
WHEN BITAND(l_operation_type, DBMS_CQ_NOTIFICATION.ALTEROP) != 0 THEN
l_operation_name := 'Table Altered';
WHEN BITAND(l_operation_type, DBMS_CQ_NOTIFICATION.DROPOP) != 0 THEN
l_operation_name := 'Table Dropped';
WHEN BITAND(l_operation_type, DBMS_CQ_NOTIFICATION.UNKNOWNOP) != 0 THEN
l_operation_name := 'Unknown Operation';
ELSE
l_operation_name := '?';
END CASE;

/* l_numrows Table 所影響的筆數 */
l_message := 'QueryID:' || l_qid ||' Table (' || l_table_name || ') - ' || l_operation_name || '. Rows=' || l_numrows;

INSERT INTO notifications (id, message, notification_date)
VALUES (notifications_seq.NEXTVAL, l_message, SYSDATE);

/* Body of loop does not run when l_numrows is zero. */
FOR k IN 1..l_numrows LOOP /* loop over rows */
/* 影響的每筆及它的 ROWID。可使用 ROWID 讀取該筆資料,這應是你需要處理的地方。*/
l_row_id := ntfnds.query_desc_array(i).table_desc_array(j).row_desc_array(k).row_id;

l_message := 'QueryID:' || l_qid ||' Table (' || l_table_name || ') - RowID:' || l_row_id;

INSERT INTO notifications (id, message, notification_date)
VALUES (notifications_seq.NEXTVAL, l_message, SYSDATE);

END LOOP; /* loop over rows */
END LOOP; /* loop over tables */
END LOOP; /* loop over queries */
END IF;

COMMIT;
END;
/

註冊查詢 (Query)

有了事件處理程式,就可以註冊所要偵測的 Queries。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
DECLARE
reginfo CQ_NOTIFICATION$_REG_INFO;
mgr_id NUMBER;
dept_id NUMBER;
v_cursor SYS_REFCURSOR;
regid NUMBER;
BEGIN
reginfo := cq_notification$_reg_info (
'notifications_handler', /* 事件處理程式 */
DBMS_CQ_NOTIFICATION.QOS_QUERY /* QOS_QUERY 表示註冊為 QRCN */
+ DBMS_CQ_NOTIFICATION.QOS_ROWIDS, /* QOS_ROWIDS 發出的事件物件才會包含每筆資料的 ROWID,處理程式才抓得到所影響的單筆詳細資料 */
0, /* registration persists until unregistered */
0, /* notify on all operations */
0 /* notify immediately */
);

/* Create registration. */
regid := DBMS_CQ_NOTIFICATION.new_reg_start(reginfo);

/* 註冊的 Query,當所有會影響此段 Query 結果的異動,都會觸發事件發出通知(Notification)。
這裡可以註冊多段的 Query。 */
OPEN v_cursor FOR
SELECT dbms_cq_notification.CQ_NOTIFICATION_QUERYID, sal, comm
FROM DEMO.EMP
WHERE deptno = 10;

CLOSE v_cursor;

DBMS_CQ_NOTIFICATION.reg_end;
END;
/

註冊的 Query 牽涉到 Emp Table 的 sal、comm 與 deptno 欄位。

  • 查詢已經註冊的事件處理程式:
1
2
3
select regid, table_name, callback
from user_change_notification_regs
/
     REGID TABLE_NAME           CALLBACK
---------- -------------------- ----------------------------------------
1231 DEMO.EMP plsql://notifications_handler?PR=0
  • 查詢已經註冊的 Queries:
1
2
3
select queryid, querytext, regid
from user_cq_notification_queries
/
QUERYID QUERYTEXT                                                         REGID
------- ------------------------------------------------------------ ----------
48 SELECT DEMO.EMP.SAL , DEMO.EMP.COMM FROM DEMO.EMP WHERE D 1231
EMO.EMP.DEPTNO = 10
  • 可以用此註銷註冊的 Query:
1
SQL> exec DBMS_CQ_NOTIFICATION.DEREGISTER(regid => 1231);

實際運作

SQL> select * from notifications;

no rows selected
SQL> select * from emp where empno in (7608, 7934);

EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO
------- ---------- ---------- ------- ------------ ------ ------ -------
7608 馬小九 ANALYST 7788 28-JUN-10 1000 100 40
7934 楊喆 CLERK 7902 23-JAN-82 1500 10
SQL>  update emp set comm = 99 where empno in (7608, 7934);

2 rows updated. /* 異動兩筆資料,但只有一筆會影響註冊 Query 的結果 */

SQL> commit;

Commit complete.
SQL> select * from notifications;

/* 註冊 Query 時如果沒有 QOS_ROWIDS, ID 261 Rows=0, ID 262 則不會存在 */

ID MESSAGE NOTIFICATION_DATE
----- ------------------------------------------------------------ -------------------
261 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 08:27:48
262 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 08:27:48
/* 只觸發一筆資料的 notification */

SQL> select * from emp where rowid = 'AACzQEAAEAAACAXAAN';

EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO
------- ---------- ---------- ------- ------------------- ------ ------ -------
7934 楊喆 CLERK 7902 1982-01-23 00:00:00 1500 99 10
SQL> select * from emp where empno = 7934;

EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO
------- ---------- ---------- ------- ------------------- ------ ------ -------
7934 楊喆 CLERK 7902 1982-01-23 00:00:00 1500 99 10

SQL> update emp set comm = 99 where empno = 7934;

1 row updated.

SQL> commit;

Commit complete. /* 這個異動沒有影響註冊 Query 的結果 */
/* 沒有觸發 notification 事件 */
SQL> select * from notifications;

ID MESSAGE NOTIFICATION_DATE
----- ------------------------------------------------------------ -------------------
261 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 08:27:48
262 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 08:27:48
SQL> update emp set deptno = 20 where empno = 7934;

1 row updated. /* 將部門由 10 改為 20 */
/* 觸發了 notification 事件 */
SQL> commit;

Commit complete.

SQL> select * from notifications;

ID MESSAGE NOTIFICATION_DATE
------- ------------------------------------------------------------ -------------------
261 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 08:27:48
262 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 08:27:48
=> 263 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 09:05:12
=> 264 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 09:05:12
SQL> insert into emp values(9099, '李小明', 'CLERK', 7698, sysdate, 23100, null, 10);

1 row created.

SQL> commit;

Commit complete.

SQL> select * from notifications;

ID MESSAGE NOTIFICATION_DATE
------- ------------------------------------------------------------ -------------------
261 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 08:27:48
262 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 08:27:48
263 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 09:05:12
264 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 09:05:12
=> 265 QueryID:48 Table (DEMO.EMP) - Records Inserted. Rows=1 2020-12-25 09:15:55
=> 266 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAVAAB 2020-12-25 09:15:55

SQL> select * from emp where rowid = 'AACzQEAAEAAACAVAAB';

EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO
------- ---------- ---------- ------- ------------------- ------ ------ -------
9099 李小明 CLERK 7698 2020-12-25 09:15:52 23100 10
SQL> delete emp where empno = 9099;

1 row deleted.

SQL> commit;

Commit complete.

SQL> select * from notifications;

ID MESSAGE NOTIFICATION_DATE
------- ------------------------------------------------------------ -------------------
261 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 08:27:48
262 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 08:27:48
263 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=1 2020-12-25 09:05:12
264 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAN 2020-12-25 09:05:12
265 QueryID:48 Table (DEMO.EMP) - Records Inserted. Rows=1 2020-12-25 09:15:55
266 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAVAAB 2020-12-25 09:15:55
=> 267 QueryID:48 Table (DEMO.EMP) - Records Deleted. Rows=1 2020-12-25 09:19:00
=> 268 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAVAAB 2020-12-25 09:19:00

8 rows selected.

SQL> select * from emp where rowid = 'AACzQEAAEAAACAVAAB';

no rows selected

現在來看複雜一點的:

SQL> truncate table notifications;

Table truncated.

SQL> select empno, ename, comm, deptno from emp where deptno = 10;

EMPNO ENAME COMM DEPTNO
------- ---------- ------ -------
7839 KING 10
7782 楊建華 10
7934 楊喆 99 10
8907 牸祢 10

SQL> update emp set comm = 99 where deptno = 10;

4 rows updated. /* update 4 筆資料 */

SQL> commit;

Commit complete.

SQL> select * from notifications;

/* 只有觸發 3 筆資料的 notification */

ID MESSAGE NOTIFICATION_DATE
----- ------------------------------------------------------------ -------------------
275 QueryID:48 Table (DEMO.EMP) - Records Updated. Rows=3 2020-12-25 09:32:15
276 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAA 2020-12-25 09:32:15
277 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAC 2020-12-25 09:32:15
278 QueryID:48 Table (DEMO.EMP) - RowID:AACzQEAAEAAACAXAAS 2020-12-25 09:32:15
SQL> truncate table notifications;

Table truncated.

SQL> select * from emp where empno = 7934;

EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO
------- ---------- ---------- ------- ------------------- ------ ------ -------
7934 楊喆 CLERK 7902 1982-01-23 00:00:00 1500 99 10

SQL> update emp set comm = comm + 10 where empno = 7934;

1 row updated.

SQL> update emp set comm = comm - 10 where empno = 7934;

1 row updated.

SQL> commit; /* 同一個交易 (Transaction) 沒有改變註冊 Query 的結果 */

Commit complete.

SQL> select * from notifications;

no rows selected /* 沒有觸發 notification 事件 */

測試完,註銷:

SQL> exec DBMS_CQ_NOTIFICATION.DEREGISTER(regid => 1231);

PL/SQL procedure successfully completed.

SQL> select regid, table_name, callback
2 from user_change_notification_regs
3 /

no rows selected

SQL> select queryid, querytext, regid
2 from user_cq_notification_queries
3 /

no rows selected