0%

Oracle Streaming Table Functions

Table function 可以像 SELECT 語句的 FROM 子句中的 Table 一樣使用的函數。 Table function 的一個常見用法是像資料流 (stream data) 一樣,直接從一個來源進程傳輸或轉換到下一個流程,而中間不需有暫存的過程。以這種方式使用的 table function 稱為 Streaming table function。這種技術最常用於數據倉儲,作為提取 (extract),轉換 (transform) 和加載 (load) ETL 操作的一部分。

之前面我們展示了 Table function 的概述。 現在則來看看創建 Streaming table function 所需的基本步驟。

在深入了解詳細細節之前,我們看一下 Streaming table function 功能示例:

INSERT INTO tickers
SELECT *
FROM TABLE (doubled (CURSOR (SELECT * FROM stocks)))
/

這裡發生了什麼? 讓我們從內到外逐步來看:

Code Description
SELECT * FROM stocks 獲取 stocks TABLE 中的所有的 rows
CURSOR ( ) 使用 CURSOR 表達式創建指向結果集 (result set) 的 cursor 變數
( ) 將 cursor 變數傳遞給 doubled table function
doubled ( ) 執行 doubled 函數的轉換並返回結果集
SELECT * FROM TABLE ( … ) SQL 引擎將從 doubled 函數返回的結果集轉換為關係的行和列集(relational set of rows and columns)
INSERT INTO tickers 將這些行插入到 tickers TABLE 中

有時(通常?),您需要在資料流式處理過程 (Streaming process) 中執行多個轉換。沒問題,您可以串聯多個 Table function 的調用:

INSERT INTO tickers
SELECT *
FROM TABLE (transform2 (
CURSOR (SELECT *
FROM TABLE (transform1 (
CURSOR (SELECT * FROM stocks
))))))
/

範例

要將數據從一個 Table 轉換到另一個 Table,我們需要 Table 和這些 Table 中的數據。在此例,從 stocks Table 開始,每筆資料 (row) 包含每個股票代碼的開盤價和收盤價。這裡有 Stocks 資料範本

1
2
3
4
5
6
7
CREATE TABLE stocks (
ticker VARCHAR2(20),
trade_date DATE,
opening_price NUMBER,
closing_price NUMBER
)
/

這裡的轉換很簡單: 將 stocks Table 中的每一筆資料 (row), 拆開成為 tickers Table 的兩筆資料 (將開盤價和收盤價拆為兩筆):

1
2
3
4
5
6
7
CREATE TABLE tickers (
ticker VARCHAR2(20),
pricedate DATE,
pricetype VARCHAR2(1),
price NUMBER
)
/

在繼續之前,必須要指出,對於這個特定的轉換 (從 stocks 中的一行到 tickers 的兩行) 並不需要一個 Table function 來完成工作。你可以使用 INSERT ALL 兩次插入代碼:

1
2
3
4
5
6
7
INSERT ALL
INTO tickers (ticker, pricedate, pricetype, price)
VALUES (ticker, trade_date, 'O', opening_price)
INTO tickers (ticker, pricedate, pricetype, price)
VALUES (ticker, trade_date, 'C', closing_price)
SELECT * FROM stocks
/

你也可以使用 UNPIVOT:

1
2
3
4
5
6
INSERT INTO tickers (ticker, pricedate, pricetype, price)
SELECT *
FROM stocks UNPIVOT ( price
FOR price_type
IN (opening_price AS 'O', closing_price AS 'C'))
/

SQL 是一種非常強大的語言。你的轉換很可能在純 SQL 中就可行,那就應該避免使用 Table function。但是,如果轉換需要使用處理邏輯 (因此需要 PL/SQL),或者如果 SQL 語法無法完成,則 Table function 提供了一種強大,直接的方式來完成工作。在此,假設轉換要復雜得多,並且需要使用 Table function。

之前關於 Table function 所示,當你需要一個 Table function 返回的集合中每個元素(行)含多個欄位數據時,而不僅僅是一個字符串或數字,那你就需要創建一個物件類型 (Object type) 以及這些物件類型的集合類型 (Nested table type)。

在這裡的示例中,要將 stocks 的數據移動到 stickers ,因此我需要一個 “看起來像” stickers table 的物件類型。理想情況下,應該創建一個像下面的集合類型:

CREATE TYPE tickers_nt AS TABLE OF tickers%ROWTYPE
/

但是 %ROWTYPE 是 PL/SQL 聲明屬性,SQL 引擎不認得該屬性,因此該語句會失敗:

PLS-00329: schema-level type has illegal reference to TICKERS

因此我們不能在 Schema level 下使用 tickers%ROWTYPE 必須另外創建一個物件類型。

替代的,需要創建了一個模仿 stickers Table 結構的物件類型,如下所示:

1
2
3
4
5
6
7
8
9
CREATE TYPE ticker_ot
AUTHID DEFINER IS OBJECT
(
ticker VARCHAR2(20),
pricedate DATE,
pricetype VARCHAR2(1),
price NUMBER
)
/

然後創建這些物件類型的集合類型 (Nested table type):

1
2
CREATE TYPE tickers_nt AS TABLE OF ticker_ot
/

現在這個 tickers_nt 集合類型裡的每個元素 (row) 長的就像 ticker_ot 物件類型。與 tickers Table 看起來一模一樣。

SQL> desc tickers_nt;
tickers_nt TABLE OF TICKER_OT
Name Null? Type
----------------------------------------- -------- ----------------------------
TICKER VARCHAR2(20)
PRICEDATE DATE
PRICETYPE VARCHAR2(1)
PRICE NUMBER

SQL> desc tickers;
Name Null? Type
----------------------------------------- -------- ----------------------------
TICKER VARCHAR2(20)
PRICEDATE DATE
PRICETYPE VARCHAR2(1)
PRICE NUMBER

這裡打算在 Table function 使用資料流式的處理(Streaming process)。 意思是將從 SQL 傳遞一組數據 (rows and columns)到 table function。為此,需要定義一個強型態 (strong) 的 REF CURSOR 類型,該類型將用作接受 SQL 語句內數據集的函數參數的數據類型。

在下面的 Package specification 中,創建了兩個強型態 REF CURSOR 類型,一個用於 stocks table 中的 row,另一個用於 tickers table。

1
2
3
4
5
6
7
8
9
10
CREATE OR REPLACE PACKAGE stock_mgr
AUTHID DEFINER
IS
TYPE stocks_rc IS REF CURSOR /* 這裡有 RETURN 特定的物件類型,是一個強型態 Strong REF CURSOR type */
RETURN stocks%ROWTYPE; /* 可以不定義 RETURN,則會是弱型態 Weak REF CURSOR type */

TYPE tickers_rc IS REF CURSOR
RETURN tickers%ROWTYPE;
END stock_mgr;
/
CURSOR 和 REF CURSOR 之間的區別

從最基本的技術層面上講,它們是相同的。“正常”PLSQL CURSOR 在定義中是靜態的 (static)。REF CURSOR 可以動態的打開或基於邏輯動態的打開。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
DECLARE
TYPE rc IS REF CURSOR; /* 弱型態 CURSOR */

CURSOR c IS SELECT * FROM dual; /* 靜態的 (static) */

l_cursor rc;
BEGIN
IF (TO_CHAR(SYSDATE, 'dd') = 30) THEN /* 弱型態 REF CURSUR 可以是 */
OPEN l_cursor FOR 'SELECT * FROM emp'; /* static SQL 或 dynamic SQL statement*/
ELSIF (TO_CHAR(SYSDATE, 'dd') = 29) THEN
OPEN l_cursor FOR SELECT * FROM dept;
ELSE
OPEN l_cursor FOR SELECT * FROM dual;
END IF;

OPEN C;
END;
/
  • 無論運行多少次,CURSOR c 始終是 “SELECT * FROM dual”。 REF CURSOR 則可以是任何東西,而且可以是 static SQL 或 dynamic SQL statement。
  • REF CURSOR 可以返回 (return) 給客戶端。 CURSOR 無法返回給客戶端。
  • CURSOR 可以是全域的 (global), 可以定義在 SQL 中;REF CURSOR 不能,它只能定義在子程式中 (你不能定義它們在 PROCEDURE / FUNCTION 的外面)。
  • REF CURSOR 可作為參數在子程式中傳遞,CURSOR 不能。
  • Static SQL (不使用 REF CURSOR) 效率較佳。
  • REF CURSOR 的使用應侷限於:
    • 將結果集返回給客戶端
    • 當沒有其他有效的手段來實現目標時也就是說,您首先應使用靜態 SQL (實際上使用 implicit cursors),只在必要時才使用 REF CURSOR。

使用 REF CURSOR 類型聲明的變數就是 CURSOR 變數 (cursor variable)。在 PL/SQL 中,可以這樣使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
DECLARE
l_cursor stock_mgr.stocks_rc; /* stock_mgr.stocks_rc 是一個強型態的 REF CURSOR 類型 */

l_stock stocks%ROWTYPE;
BEGIN
OPEN l_cursor FOR SELECT * FROM stocks; /* 強型態的 REF CURSOR 要使用 static SQL statement */
/* 不能使用 dynamic SQL statement */
LOOP
FETCH l_cursor INTO l_stock;

EXIT WHEN l_cursor%NOTFOUND;
END LOOP;

CLOSE l_cursor;
END;
/

您可以對 cursor 變數使用所有常用的 cursor attributes 和 operators: FETCH,%FOUND,CLOSE 等。

但後面會看到,在 SQL 的 streaming table function 使用此 REF CURSOR 類型的方式會有所不同。

Streaming table function 和“普通”Table function 之間的主要區別是 streaming table function 的參數至少一個是 CURSOR 類型的變數。 Table function 可以有多個 CURSOR 變數的輸入和其他類型的其他參數,如字符串或日期。 在此的 streaming table function,將只使用單個 CURSOR 變數參數。

通常,Streaming table function 的流程是:

  1. 從 CURSOR 變數中獲取一行。
  2. 處裡每一行的轉換應用。
  3. 將轉換後的數據放入集合中。
  4. 完成後返回集合。

現在讓我們看看這個模式是如何在 doubled function 中展開的,將一筆 stocks table 的 row 拆解成兩筆 tickers rows 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
CREATE OR REPLACE FUNCTION doubled (rows_in stock_mgr.stocks_rc)
RETURN tickers_nt
AUTHID DEFINER
IS
TYPE stocks_aat IS TABLE OF stocks%ROWTYPE INDEX BY PLS_INTEGER;

l_stocks stocks_aat;

l_doubled tickers_nt := tickers_nt();
BEGIN
LOOP
FETCH rows_in BULK COLLECT INTO l_stocks LIMIT 100;
EXIT WHEN l_stocks.COUNT = 0;

FOR l_row IN 1 .. l_stocks.COUNT
LOOP
l_doubled.EXTEND;
l_doubled(l_doubled.LAST) := ticker_ot( l_stocks(l_row).ticker,
l_stocks(l_row).trade_date,
'O',
l_stocks (l_row).opening_price
);

l_doubled.EXTEND;
l_doubled(l_doubled.LAST) := ticker_ot( l_stocks (l_row).ticker,
l_stocks (l_row).trade_date,
'C',
l_stocks (l_row).closing_price
);
END LOOP;
END LOOP;
CLOSE rows_in;

RETURN l_doubled;
END;
/
  • 第 1 行,使用 Package stock_mgr 中定義的 REF CURSOR 類型。因為要從 stocks Table 中取出資料,所以使用 stocks_rc 類型。
  • 第 2 行,返回一組集合,其中 tickers_nt 類型每個元素看起來就像 tickers Table 中的 row。
  • 第 7 行,聲明一個關聯陣列 (Associative array) 來保存從 rows_in 游標變數中獲取的 rows (因使用 BULK COLLECT 所以需要)。
  • 第 9 行,l_doubled 是一個 local 變數,儲存要返回 SELECT 語句的資料。
  • 第 12 ~ 13 行,啟動一個簡單的循環從 rows_in 游標變數中獲取 row。CURSOR 表達式會自動 Open 這個游標變數。使用 BULK COLLECT 功能每次提取 100 rows。避免沒有效率的逐行處理。當關聯陣列為空時退出循環。
  • 第 15 行,取出關聯陣列每個 row。
  • 第 17 ~ 22 行,使用 EXTEND 在 nested table 的末尾添加另一個元素,然後將 tickers 物件放入集合中新的最後一個索引值。
  • 第 24 ~ 29 行,添加另一個元素,因為我們將一筆拆成兩筆。
  • 第 32 行,現在已經獲取了所有的 row,關閉游標變數。 注意:此步驟是可選的。當使用 CURSOR 表達式傳入結果集時, cursor 將在函數終止時自動關閉。
  • 第 34 行返回結果集。

在這個例子中的 FETCH-BULK COLLECT-LIMIT,使用了值 100 作為 LIMIT 子句。這是一個不錯的預設值。但是,如果您正在處理極大量的 rows 並希望從函數中獲得更好的性能,則可以嘗試更大的 LIMIT 值。但請注意,這會消耗更多的 PGA 記憶體,並且在某些時候,由於記憶體消耗過多,程式碼將會變慢。您可將 LIMIT 值作為參數傳遞,以便能夠在不重新編譯函數的情況下改變執行的性能,如:

1
2
3
4
5
6
7
8
9
10
CREATE OR REPLACE FUNCTION doubled (
rows_in stock_mgr.stocks_rc, limit_in INTEGER DEFAULT 100)
...
BEGIN
LOOP
FETCH rows_in BULK COLLECT INTO l_stocks LIMIT limit_in;
EXIT WHEN l_stocks.COUNT = 0;
...
END;
/

現在可以來處裡一些 Streaming 的作業了。這裡有 Stocks 資料範本

SQL> select count(*) from stocks;

COUNT(*)
----------
1000

SQL> select count(*) from tickers;

COUNT(*)
----------
0
INSERT INTO tickers
SELECT * FROM TABLE(doubled (CURSOR (SELECT * from stocks)))
/
SQL> select count(*) from tickers;

COUNT(*)
----------
2000

這裡只展示了一個步驟的資料流轉換串接,你可以再建多個 steaming table function,做多步驟的資料流轉換串接。執行的時候就像:

SELECT *
FROM TABLE ( another (CURSOR ( SELECT *
FROM TABLE ( doubled (CURSOR ( SELECT * FROM stocks where ticker = '1500'))))))

Streaming table function 在數據倉儲 ETL 操作中扮演著至關重要的角色。Oracle Database 透過實現 PL/SQL CURSOR 變數和 CURSOR 表達式,使構建此類函數變得容易。

請記住,由 Streaming table function 返回的集合將消耗 PGA 記憶體,因此通過 CURSOR 變數傳遞到函數如果是非常大的數據集可能會導致記憶體耗用錯誤。那能做些什麼?

那就得將 streaming table function 轉為 pipelined streaming table function

現在讓我們創建 doubled 函數的 Pipelined table function 版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
CREATE OR REPLACE FUNCTION doubled_pl (rows_in stock_mgr.stocks_rc)
RETURN tickers_nt
PIPELINED
AUTHID DEFINER
IS
TYPE stocks_aat IS TABLE OF stocks%ROWTYPE INDEX BY PLS_INTEGER;

l_stocks stocks_aat;
BEGIN
LOOP
FETCH rows_in BULK COLLECT INTO l_stocks LIMIT 100;
EXIT WHEN l_stocks.COUNT = 0;

FOR l_row IN 1 .. l_stocks.COUNT
LOOP
PIPE ROW (ticker_ot( l_stocks(l_row).ticker,
l_stocks(l_row).trade_date,
'O',
l_stocks(l_row).opening_price)
);

PIPE ROW (ticker_ot( l_stocks(l_row).ticker,
l_stocks(l_row).trade_date,
'C',
l_stocks (l_row).closing_price)
);
END LOOP;
END LOOP;

RETURN;
END;
/
  • 第 3 行,加了 PIPELINED 關鍵字。
  • 第 16 行,使用 PIPE ROW 立即將該數據發送回調用查詢。所以不會有阻塞或記憶體消耗的問題,這對大量的 ETL 數據處裡很重要。
  • 第 30 行 RETURN 之前你不需要顯式關閉 CURSOR 變數(rows_in); 當 Table function 終止時,Oracle 資料庫將自動關閉使用 CURSOR 表達式創建的 CURSOR 變數。
  • 第 30 行,只返回控制,沒有其它數據。數據已經都用 PIPE ROW 送出了。

現在我們來看看有使用 pipelined 與沒有使用 pipelined 的區別。

SQL> select count(*) from stocks;

COUNT(*)
----------
256000

SQL> set timing on
SQL> SELECT * FROM TABLE (doubled (CURSOR (SELECT * FROM stocks)))
2 WHERE ROWNUM < 10;

TICKER PRICEDATE P PRICE
-------------------- ------------------ - ----------
1001 24-OCT-18 O 48
1001 24-OCT-18 C 43
...

9 rows selected.

Elapsed: 00:00:02.06
SQL> SELECT * FROM TABLE (doubled_pl (CURSOR (SELECT * FROM stocks)))
2 WHERE ROWNUM < 10;

TICKER PRICEDATE P PRICE
-------------------- ------------------ - ----------
1001 24-OCT-18 O 48
1001 24-OCT-18 C 43
...

9 rows selected.

Elapsed: 00:00:00.01
  • 使用非 pipelined 版本,我必須等待 256000 行轉換加倍到 512000 行 (同樣消耗大量的 PGA 記憶體)。然後將所有這些行傳遞回 SELECT 語句,此時 SQL 引擎說:“好吧,我只想要前 9 個” 並拋棄其餘的行。

  • 使用 pipelined 版本的回應時間明顯縮短,清楚地 SELECT 語句能夠跟踪函數立即返回的行。只要傳回 9 行,SQL 引擎就會終止 Pipelined table function 的執行 (WHEN NO_DATA_NEEDED)。這只會終止函數,但不會終止調用它的 SELECT 語句。

現在試試沒有 WHERE 條件句。

SQL> SELECT count(*) FROM TABLE (doubled (CURSOR (SELECT * FROM stocks)));

COUNT(*)
----------
512000

Elapsed: 00:00:02.06
SQL> SELECT count(*) FROM TABLE (doubled_pl (CURSOR (SELECT * FROM stocks)));

COUNT(*)
----------
512000

Elapsed: 00:00:00.51

即使沒有 WHERE 條件 pipelined 版本也明顯得快很多,這是因為減少耗用大量 PGA 記憶體的 I/O,如果 PGA 不夠用而需用到 Temporary Tablespace ,則影響更大。

從記憶體的角度來看,非 Pipelined table function 比 pipelined 版本消耗更多的 PGA 記憶體。 這結果應該是完全合理的,因為我們不需要聲明並使用函數所需暫存返回的資料集合。

在使用 APEX 時有些複雜的資料常會使用 APEX Collection,這不僅效能不好,也常是 Oracle 資料庫效能不佳的原因,建議您試試 Table Function!