当前位置:首页 > 开发 > 数据库 > 正文

Overview of pipelined functions(原创)

发表于: 2013-06-29   作者:czmmiao   来源:转载   浏览次数:
摘要: pipelined functionsSince Oracle 8.0, it has been possible to select from a collection of data (usually loaded via a function) as a "virtual table". This technique became popular in the 8i ti

pipelined functions
Since Oracle 8.0, it has been possible to select from a collection of data (usually loaded via a function) as a "virtual table". This technique became popular in the 8i timeframe thanks to Tom Kyte and numerous web forums where "SELECT FROM TABLE(CAST(plsql_function AS collection_type))" became a common technique for binding user-generated lists of data. However, as a technique for processing large volumes of data, "table functions" of this sort are limited due to their potentially large memory footprints.
In 9i Release 1 (9.0), Oracle has introduced pipelined table functions (known simply as pipelined functions). These build on the table function principle but with some critical differences, three of which stand out in particular:
    first, data is piped (or streamed) to the caller in small arrays of prepared data, rather than fully materialised as with original table functions;
    second, pipelined functions can be parallelised by Oracle which means that PL/SQL can be executed by multiple slaves for the first time; and
    third, pipelined functions make it easy to convert PL/SQL procedures into rowsources for bulk SQL operations, combining complex transformation logic with the benefits of SQL.
This article introduces the fundamentals of pipelined functions, starting with a simple example.
a simple pipelined function
The best way to visualise a pipelined function is to see a simple example. For any pipelined function we require the following elements:
    a collection type; and
    the pipelined PL/SQL function (usually contained inside a package).
In the following example, we will create a simple row-generator that pipes out a given number of records. To begin, we require a collection type that is structured according to the data that we wish to pipe. In our row-generator function, we will pipe a single number attribute, so we can simply create a collection type of Oracle's NUMBER datatype.
SQL> CREATE OR REPLACE TYPE number_ntt AS TABLE OF NUMBER;
  2  /
Type created.
Oracle will use this collection type to "buffer" small numbers of records as it pipes data to the function caller. We can now create our first pipelined function. We will create a standalone function for simplicity (we will normally use packaged functions) and some syntax will be unfamiliar. Some comments follow the code.
SQL> CREATE FUNCTION row_generator (
  2                  rows_in IN PLS_INTEGER
  3                  ) RETURN number_ntt PIPELINED IS
  4  BEGIN
  5     FOR i IN 1 .. rows_in LOOP
  6        PIPE ROW (i);
  7     END LOOP;
  8     RETURN;
  9  END;
 10  /
Function created.
Note in particular the following:
    the PIPELINED keyword in the function definition. There are several extensions to this that we will see later in this article;
    the return type must be a collection type (in our example we are using the simple NUMBER_NTT collection type based on NUMBER);
    the new PIPE ROW syntax to add a single record into the return collection stream. The record we pipe must be based on the same underlying type as the collection type. In our row-generator, we pipe out records of a single number attribute;
    the "empty" RETURN statement. In a pipelined function, the PIPE ROW statement is responsible for "returning" data and not the RETURN statement.   --区别于其他函数,pipeline function并不通过return返回函数值,而是通过pipe row子句,故会有个空的return.
Now we have created a simple pipelined function, we can use it to generate 10 rows of data, as follows.
SQL> SELECT *
  2  FROM   TABLE( row_generator(10) );
COLUMN_VALUE
------------
           1
           2
           3
           4
           5
           6
           7
           8
           9
          10
10 rows selected.
Note how we wrap the function call in the TABLE operator. This "virtualises" the collection and allows us to select from it as a datasource such as a table or view. The TABLE operator has been available since Oracle 8.0 (when it was named "THE" - it was renamed to "TABLE" in 8i).
Note also the name of the column generated by our function. Remember we based our collection type on the built-in datatype NUMBER. In situations such as this (i.e. when the collection type is not based on any explicitly-named attributes), Oracle defaults to COLUMN_VALUE. In practice, we are more likely to base our collection types on more complex structures than a single scalar built-in, as we will see later in this article.
streaming vs materialising: a visual example
It is quite difficult to visualise how a pipelined function differs from a table function from a static example in an article. In the following example, we will attempt to demonstrate the "streaming" effect of pipelined functions and compare this to a standard table function. First we will create a simple table function that will return a small collection of our previous collection type.
SQL> CREATE FUNCTION table_function RETURN number_ntt AS
  2     nt number_ntt := number_ntt();
  3  BEGIN
  4     FOR i IN 1 .. 5 LOOP
  5        DBMS_LOCK.SLEEP(1);
  6        nt.EXTEND;
  7        nt(nt.LAST) := i;
  8     END LOOP;
  9     RETURN nt; --<-- return whole collection
 10  END table_function;
 11  /
Function created.
Note how the table function loads a collection variable in full before returning it to the "consumer". Next we create a pipelined function to generate the same number of rows as the table function.
SQL> CREATE FUNCTION pipelined_function RETURN number_ntt PIPELINED AS
  2  BEGIN
  3     FOR i IN 1 .. 5 LOOP
  4        DBMS_LOCK.SLEEP(1);
  5        PIPE ROW (i); --<-- send row to consumer
  6     END LOOP;
  7     RETURN;
  8  END pipelined_function;
  9  /
Function created.
We can see that both functions include a 1 second sleep inside their respective loops and that the overall duration will be approximately 5 seconds. We will use this to demonstrate the difference between the execution of the functions and the delivery of the first rows. To help us with this, we will create a small function to return a TIMESTAMP against each generated row (using the built-in LOCALTIMESTAMP function directly will not work as it returns a constant value for the entire recordset).
SQL> CREATE FUNCTION get_time RETURN TIMESTAMP IS
  2  BEGIN
  3     RETURN LOCALTIMESTAMP;
  4  END get_time;
  5  /
Function created.
Finally, we setup our session's timestamp format and set the sqlplus arraysize to 1 to ensure that the pipelined function streams a single record at a time.
SQL> ALTER SESSION SET NLS_TIMESTAMP_FORMAT = 'HH24:MI:SS.FF3';
Session altered.
SQL> set arraysize 1
We are now ready to demonstrate the effect of the pipelined function. We will begin by selecting from the simple table function.
SQL> SELECT get_time() AS ts FROM DUAL;
TS
----------------------------------------------------------
07:02:25.000
1 row selected.
SQL> SELECT column_value
  2  ,      get_time() AS ts
  3  FROM   TABLE(table_function);
COLUMN_VALUE TS
------------ ---------------------------------------------
           1 07:02:30.062  --<-- first row takes 5 seconds
           2 07:02:30.062
           3 07:02:30.062
           4 07:02:30.062
           5 07:02:30.062
5 rows selected.
We can see quite clearly that the table function did not deliver any data until the function completed. The elapsed time between the call to the function and the delivery of the data was a little over 5 seconds. We can compare this now to the pipelined function, as follows.
SQL> SELECT get_time() AS ts FROM dual;
TS
----------------------------------------------------------
07:02:35.359
1 row selected.
SQL> SELECT column_value
  2  ,      get_time() AS ts
  3  FROM   TABLE(pipelined_function);
COLUMN_VALUE TS
------------ ---------------------------------------------
           1 07:02:36.390 --<-- first row after 1 second
           2 07:02:37.390
           3 07:02:38.390
           4 07:02:39.390
           5 07:02:40.390 --<-- last row after 5 seconds
5 rows selected.
The pipelined function starts to return data as soon as it is prepared (in this case after a 1 second sleep). This means that the consumer is provided with data almost as soon as it is ready and not when the entire rowsource is materialised (in Oracle literature, pipelined functions often feed other pipelined functions). In actual fact, no rowsource is ever materialised with pipelined functions and this leads to memory savings and performance improvements. We will now see the impact on memory.

--这是pipeline table function优于普通table function的优势之一,不需要等待查询结果集全部获取再进行处理,而是马上获取马上处理。
streaming vs materialising: memory usage
To continue with our comparison of pipelined functions with table functions, we will now examine the memory usage and its effect on timings. In the following example, we will create two functions; one table and one pipelined. Both functions will generate 1,000,000 records of 10 bytes. We start by creating a general VARCHAR2 collection type and the two functions.
SQL> CREATE OR REPLACE TYPE varchar2_ntt AS TABLE OF VARCHAR2(4000);
  2  /
Type created.
SQL> CREATE OR REPLACE FUNCTION table_function RETURN varchar2_ntt AS
  2     nt varchar2_ntt := varchar2_ntt();
  3  BEGIN
  4     FOR i IN 1 .. 1000000 LOOP
  5        nt.EXTEND;
  6        nt(nt.LAST) := RPAD('x',10);
  7     END LOOP;
  8     RETURN nt; --<-- return whole collection
  9  END table_function;
 10  /
Function created.
SQL> CREATE OR REPLACE FUNCTION pipelined_function RETURN varchar2_ntt PIPELINED AS
  2  BEGIN
  3     FOR i IN 1 .. 1000000 LOOP
  4        PIPE ROW (RPAD('x',10)); --<-- send row to consumer
  5     END LOOP;
  6     RETURN;
  7  END pipelined_function;
  8  /
Function created.
Next we create a small function to give us the PGA memory usage in our session.
SQL> CREATE FUNCTION get_memory RETURN NUMBER IS
  2     n NUMBER;
  3  BEGIN
  4     SELECT value INTO n
  5     FROM   v$mystat
  6     WHERE  statistic# = 20; --<-- 'pga memory max'
  7     RETURN n;
  8  END get_memory;
  9  /
Function created.
We can now compare the memory "footprint" of the functions. For each function we will capture the memory usage, run a full select from the function using autotrace (to limit screen output) and output the memory delta. We will begin with the table function.
SQL> var v_memory NUMBER;
SQL> exec :v_memory := get_memory();
PL/SQL procedure successfully completed.
SQL> set timing on
SQL> set autotrace traceonly statistics
SQL> SELECT *
  2  FROM   TABLE(table_function);
1000000 rows selected.
Elapsed: 00:00:14.03
Statistics
----------------------------------------------------------
         21  recursive calls
          0  db block gets
         23  consistent gets
          0  physical reads
          0  redo size
   13067026  bytes sent via SQL*Net to client
     733825  bytes received via SQL*Net from client
      66668  SQL*Net roundtrips to/from client
          5  sorts (memory)
          0  sorts (disk)
    1000000  rows processed
SQL> BEGIN
  2     DBMS_OUTPUT.PUT_LINE(
  3        TO_CHAR(get_memory() - :v_memory) || ' bytes of PGA used.'
  4        );
  5  END;
  6  /
38736696 bytes of PGA used.
PL/SQL procedure successfully completed.
We can see that the table function used approximately 35Mb of PGA memory to materialise the entire collection. For this reason, table functions are neither scalable nor suitable for concurrency. This somewhat restricts their suitability to smaller array "helper" functions, such as in-list binding of user-inputs (this is a common technique that has become an FAQ on some online forums).
Pipelined functions, on the other hand, are targetted directly at high-volume processing of complex data transformations, which means they are designed to be scalable and efficient. To be scalable, they must be intelligent with memory. To test whether this is the case, we can repeat the simple memory test with our single-attribute pipelined function as follows.
SQL> exec :v_memory := get_memory();
PL/SQL procedure successfully completed.
SQL> set autotrace traceonly statistics
SQL> SELECT *
  2  FROM   TABLE(pipelined_function);
1000000 rows selected.
Elapsed: 00:00:12.01
Statistics
----------------------------------------------------------
         21  recursive calls
          0  db block gets
         26  consistent gets
          0  physical reads
          0  redo size
   13067026  bytes sent via SQL*Net to client
     733825  bytes received via SQL*Net from client
      66668  SQL*Net roundtrips to/from client
          5  sorts (memory)
          0  sorts (disk)
    1000000  rows processed
SQL> BEGIN
  2     DBMS_OUTPUT.PUT_LINE(
  3        TO_CHAR(get_memory() - :v_memory) || ' bytes of PGA used.'
  4        );
  5  END;
  6  /
196608 bytes of PGA used.
PL/SQL procedure successfully completed.
We can see a huge saving in the memory footprint. In fact, it doesn't really matter if we process 1 row or 1 million rows from the pipelined function. The memory usage will be low (assuming we don't pipe massive LOBs of course).

--之所以会使用更少的内存是因为不需要将所有的结果集物化到内存中。
complex pipelined functions
So far we have seen a simple, single-attribute pipelined function based on a built-in datatype (we have also seen a lot of detail on the mechanics of pipelined functions). Of course, in "real" systems, we are more likely to be processing more complex data structures (i.e. records with multiple attributes). To process complex records, a pipelined function requires three elements as follows:
    a defining record structure. This can be an object type or a PL/SQL record type defined in a package specification;
    a collection type of the underlying record structure. This can be a collection type created via the CREATE TYPE... command or a collection type declared in a PL/SQL package specification; and
    the pipelined function itself, which returns the collection type (as we saw with our simple row-generator example).
Therefore, we have two ways to define the underlying types; either by creating them or by declaring them in a package. We will examine both methods below.
pipelined functions with user-defined types
This method requires us to create an object type to define the record structure that is returned by our pipelined function. The object types we create can have methods if required, but in most cases we will use them purely as persistent record definitions.
In the following example, we will create a pipelined function based on the ubiquitous EMP table. We will define our record and collection structures using an object and collection type, respectively. The pipelined function will simply fetch and pipe EMP data. Obviously this is not the intended use for pipelined functions, but the focus here is to demonstrate the mechanics as clearly as possible.
We will begin by creating our EMP "record" as an object type.
SQL> CREATE TYPE emp_ot AS OBJECT
  2  ( empno    NUMBER(4)
  3  , ename    VARCHAR2(10)
  4  , job      VARCHAR2(9)
  5  , mgr      NUMBER(4)
  6  , hiredate DATE
  7  , sal      NUMBER(7,2)
  8  , comm     NUMBER(7,2)
  9  , deptno   NUMBER(2)
 10  );
 11  /
Type created.
Next we create a collection type based on this structure. This will be the return type of our pipelined function.
SQL> CREATE TYPE emp_ntt AS TABLE OF emp_ot;
  2  /
Type created.
Using these types, we will now create our pipelined function to pipe out EMP data as follows. Note the use of the object type constructor to prepare a single row in the correct format for piping.
SQL> CREATE FUNCTION pipelined_emp RETURN emp_ntt PIPELINED AS
  2  BEGIN
  3     FOR r_emp IN (SELECT * FROM emp) LOOP
  4        PIPE ROW ( emp_ot( r_emp.empno,
  5                           r_emp.ename,
  6                           r_emp.job,
  7                           r_emp.mgr,
  8                           r_emp.hiredate,
  9                           r_emp.sal,
 10                           r_emp.comm,
 11                           r_emp.deptno ) );
 12     END LOOP;
 13     RETURN;
 14  END pipelined_emp;
 15  /
Function created.
We can now select from this function as follows. The columns names are derived from the underlying object type that we used to define a single record (EMP_OT).
SQL> SELECT pe.empno
  2  ,      pe.ename
  3  ,      pe.job
  4  ,      pe.sal
  5  FROM   TABLE(pipelined_emp) pe;
     EMPNO ENAME      JOB              SAL
---------- ---------- --------- ----------
      7369 SMITH      CLERK            800
      7499 ALLEN      SALESMAN        1600
      7521 WARD       SALESMAN        1250
      7566 JONES      MANAGER         2975
      7654 MARTIN     SALESMAN        1250
      7698 BLAKE      MANAGER         2850
      7782 CLARK      MANAGER         2450
      7788 SCOTT      ANALYST         3000
      7839 KING       PRESIDENT       5000
      7844 TURNER     SALESMAN        1500
      7876 ADAMS      CLERK           1100
      7900 JAMES      CLERK            950
      7902 FORD       ANALYST         3000
      7934 MILLER     CLERK           1300
14 rows selected.
pipelined functions with pl/sql types
In addition to creating our own types to support a pipelined function, we can also let Oracle do it for us. This alternative method enables us to define our underlying record and collection types inside a PL/SQL package specification. Oracle use these types as the basis for creating a system-generated set of object/collection types to support the pipelined function. We can see a small example of this below.
The following package specification contains an EMP record type and associated collection type. We declare our pipelined function to use these types.
SQL> CREATE PACKAGE pipelined_pkg AS
  2
  3     TYPE emp_rt IS RECORD
  4     ( empno    NUMBER(4)
  5     , ename    VARCHAR2(10)
  6     , job      VARCHAR2(9)
  7     , mgr      NUMBER(4)
  8     , hiredate DATE
  9     , sal      NUMBER(7,2)
 10     , comm     NUMBER(7,2)
 11     , deptno   NUMBER(2)
 12     );
 13
 14     TYPE emp_ntt IS TABLE OF pipelined_pkg.emp_rt;
 15
 16     FUNCTION pipelined_emp
 17        RETURN pipelined_pkg.emp_ntt PIPELINED;
 18
 19  END pipelined_pkg;
 20  /
Package created.
Remember that Oracle uses these PL/SQL types as the basis for creating SQL types. We can see this in the dictionary as follows. The types are system-named according to the object ID of the package specification.
SQL> SELECT type_name
  2  ,      typecode
  3  ,      attributes
  4  FROM   user_types
  5  WHERE  INSTR( type_name, (SELECT object_id
  6                            FROM   user_objects
  7                            WHERE  object_name = 'PIPELINED_PKG'
  8                            AND    object_type = 'PACKAGE') ) > 0;
TYPE_NAME                    TYPECODE           ATTRIBUTES
---------------------------- ------------------ ----------
SYS_PLSQL_33433_71_1         COLLECTION                  0
SYS_PLSQL_33433_9_1          OBJECT                      8
SYS_PLSQL_33433_DUMMY_1      COLLECTION                  0
3 rows selected.
SQL> SELECT type_name
  2  ,      attr_no
  3  ,      attr_name
  4  ,      attr_type_name
  5  FROM   user_type_attrs
  6  WHERE  INSTR( type_name, (SELECT object_id
  7                            FROM   user_objects
  8                            WHERE  object_name = 'PIPELINED_PKG'
  9                            AND    object_type = 'PACKAGE') ) > 0
 10  ORDER  BY
 11         attr_no;
TYPE_NAME               ATTR_NO ATTR_NAME        ATTR_TYPE_NAME
---------------------- -------- ---------------- -------------------
SYS_PLSQL_33433_9_1           1 EMPNO            NUMBER
SYS_PLSQL_33433_9_1           2 ENAME            VARCHAR2
SYS_PLSQL_33433_9_1           3 JOB              VARCHAR2
SYS_PLSQL_33433_9_1           4 MGR              NUMBER
SYS_PLSQL_33433_9_1           5 HIREDATE         DATE
SYS_PLSQL_33433_9_1           6 SAL              NUMBER
SYS_PLSQL_33433_9_1           7 COMM             NUMBER
SYS_PLSQL_33433_9_1           8 DEPTNO           NUMBER
8 rows selected.
SQL> SELECT type_name
  2  ,      coll_type
  3  ,      elem_type_name
  4  FROM   user_coll_types
  5  WHERE  INSTR( type_name, (SELECT object_id
  6                            FROM   user_objects
  7                            WHERE  object_name = 'PIPELINED_PKG'
  8                            AND    object_type = 'PACKAGE') ) > 0;
TYPE_NAME                   COLL_TYPE           ELEM_TYPE_NAME
--------------------------- ------------------- ---------------------------
SYS_PLSQL_33433_71_1        TABLE               SYS_PLSQL_33433_9_1
SYS_PLSQL_33433_DUMMY_1     TABLE               NUMBER
2 rows selected.
Note that the record type in this example actually matches the EMP table, so we could instead do the following, replacing emp%ROWTYPE for emp_rt wherever needed and dispensing with the explicit record type for brevity.
SQL> CREATE OR REPLACE PACKAGE pipelined_pkg AS
  2
  3     TYPE emp_ntt IS TABLE OF emp%ROWTYPE;
  4
  5     FUNCTION pipelined_emp
  6        RETURN pipelined_pkg.emp_ntt PIPELINED;
  7
  8  END pipelined_pkg;
  9  /
Package created.
The main difference between this and an explicit record structure is that Oracle uses the EMP table as the basis for creating the underlying object type (i.e. we can search the dictionary for the system-generated types with the EMP table's OBJECT_ID in the name).
Either way, we now have a defining record type (emp%ROWTYPE or emp_rt) and a collection type (emp_ntt). We can now implement our pipelined function in the package body. Note that this package body matches our original specification that included the explicit emp_rt record type.
SQL> CREATE PACKAGE BODY pipelined_pkg AS
  2
  3     FUNCTION pipelined_emp RETURN pipelined_pkg.emp_ntt PIPELINED IS
  4
  5        r_emp pipelined_pkg.emp_rt;
  6
  7     BEGIN
  8
  9        FOR r_tmp IN (SELECT * FROM emp WHERE ROWNUM <= 5) LOOP
 10
 11           /*
 12           || Our implicit cursor-for-loop record matches the target
 13           || record type so we can actually pipe it out...
 14           */
 15           PIPE ROW (r_tmp);
 16
 17           /*
 18           || We are more likely to use an explicit record variable.
 19           || Let's pipe out another record...
 20           */
 21           r_emp := r_tmp;
 22           r_emp.ename := LOWER(r_emp.ename); --<-- dummy transformation
 23           r_emp.job := LOWER(r_emp.job);     --<-- dummy transformation
 24           PIPE ROW (r_emp);
 25
 26        END LOOP;
 27
 28        RETURN;
 29
 30     END pipelined_emp;
 31
 32  END pipelined_pkg;
 33  /
Package body created.
The key difference to note is that we do not need to "understand" the underlying object type. We work only with our PL/SQL record type, which is possibly more familiar to many developers. Oracle is responsible for casting the records to the underlying type that the SQL engine understands.
For demonstration purposes only, we are piping two records for every source record in this example. This enables us to see that as long as the record variable matches the type that underpins the returning collection, we can pipe it. In our package body we pipe out the implicit record from our cursor-for-loop in addition to the explicit record variable in our function declaration. These happen to be of the same structure. The following SQL statement shows clearly that we have piped two output records for every input record (remember we transformed some of the attributes of the explicit record variable).
SQL> SELECT pe.empno
  2  ,      pe.ename
  3  ,      pe.job
  4  ,      pe.sal
  5  FROM   TABLE(pipelined_pkg.pipelined_emp) pe;
     EMPNO ENAME      JOB              SAL
---------- ---------- --------- ----------
      7369 SMITH      CLERK            800
      7369 smith      clerk            800
      7499 ALLEN      SALESMAN        1600
      7499 allen      salesman        1600
      7521 WARD       SALESMAN        1250
      7521 ward       salesman        1250
      7566 JONES      MANAGER         2975
      7566 jones      manager         2975
      7654 MARTIN     SALESMAN        1250
      7654 martin     salesman        1250
10 rows selected.
recap: complex pipelined functions
To recap therefore, we can use one of two methods to define the underlying record structures for use in our pipelined functions. We can create object and collection types explicitly (CREATE TYPE...) or we can use standard PL/SQL declarations in a package specification (record and collection types). If we choose the former, we create additional dependencies and more source objects, but we have the full implementation of the application under our control. If we choose the latter, we can use familiar PL/SQL record syntax but need to be aware that Oracle will create additional database objects on our behalf.
The examples we have seen so far have been contrived and extremely basic. We have not seen anything in these functions that actually warrants their use! For the remainder of this article we will examine pipelined functions from a more real-life perspective, including options for performance and their practical application.
parallel pipelined functions
Pipelined functions have a unique capability over any other form of PL/SQL processing - they are able to be parallelised by Oracle. There are certain conditions required for this to happen, but it basically means that Oracle can divide a unit of serialised PL/SQL processing among a set of parallel slaves.
To create a parallel pipelined function we require two additional elements to our code:
    additional parallel-enabling syntax in our function definition; and
    source data supplied via a cursor variable (i.e. not statically compiled within the function itself as in our previous examples).
We will see a simple example of a parallel pipelined function as follows. We will use explicit types rather than PL/SQL types and, to keep the code simple, we will create a standalone function. We will use a larger dataset based on a multiple of ALL_SOURCE (we will generate approximately 1 million records) to compare the impact of parallel processing.
First, we create our types as follows.
SQL> CREATE TYPE all_source_ot AS OBJECT
  2  ( owner VARCHAR2(30)
  3  , name  VARCHAR2(30)
  4  , type  VARCHAR2(12)
  5  , line  NUMBER
  6  , text  VARCHAR2(4000)
  7  );
  8  /
Type created.
SQL> CREATE TYPE all_source_ntt
  2     AS TABLE OF all_source_ot;
  3  /
Type created.
We create our parallel-enabled pipelined function as follows. Note how we include the session SID in the output. This will enable us to see the effect of parallelism (i.e. each slave will have its own session).
SQL> CREATE FUNCTION parallel_pipelined_function(
  2                  cursor_in IN SYS_REFCURSOR
  3                  ) RETURN all_source_ntt
  4                    PIPELINED
  5                    PARALLEL_ENABLE (PARTITION cursor_in BY ANY) IS
  6
  7     TYPE incoming_data_ntt IS TABLE OF all_source%ROWTYPE;
  8     v_incoming incoming_data_ntt;
  9
 10     v_outgoing all_source_ot;
 11
 12     v_sid      INTEGER;
 13
 14  BEGIN
 15
 16     /*
 17     || This will help us to see parallelism in action...
 18     */
 19     SELECT sid INTO v_sid
 20     FROM   v$mystat
 21     WHERE  ROWNUM = 1;
 22
 23     /*
 24     || Process the incoming datasource...
 25     */
 26     LOOP
 27
 28        FETCH cursor_in BULK COLLECT INTO v_incoming LIMIT 500;
 29
 30        FOR i IN 1 .. v_incoming.COUNT LOOP
 31
 32           v_outgoing := all_source_ot( v_sid,
 33                                        v_incoming(i).name,
 34                                        v_incoming(i).type,
 35                                        v_incoming(i).line,
 36                                        v_incoming(i).text );
 37
 38           PIPE ROW (v_outgoing);
 39
 40        END LOOP;
 41
 42        EXIT WHEN cursor_in%NOTFOUND;
 43
 44     END LOOP;
 45     CLOSE cursor_in;
 46
 47     RETURN;
 48
 49  END parallel_pipelined_function;
 50  /
Function created.
We can see some new syntax in our parallel pipelined function. In particular, note the following:
    Line 2: the function accepts a cursor parameter. This is a prerequisite of parallel pipelined functions, i.e. we cannot parallelise cursors that are embedded within the function itself (such as those in our earlier examples). In our example we have used the built-in weak refcursor type SYS_REFCURSOR, but we can alternatively use our own weak or strong refcursor types (defined in a package specification);
    Line 5: we declare the function parallel-enabled (non-pipelined functions can also be declared with PARALLEL_ENABLE) and the syntax in parentheses is specific to pipelined functions. The PARTITION BY clause tells Oracle how to divide the source dataset (pointed to by the cursor parameter) among the parallel slaves. For weak refcursors such as SYS_REFCURSOR, we can only use the ANY keyword (i.e. Oracle will decide how to divide the source data). If we use a strong refcursor type, however, we can partition our source dataset by HASH or RANGE on a named cursor attribute or list of attributes (the attribute names are derived from the strong refcursor's return record type). This is useful when the incoming data needs to be processed in related groups and maybe in a certain order; using RANGE or HASH partitioning ensures that inter-dependent data is sent to the same parallel slave for processing. We will not see any examples of RANGE or HASH parallel pipelined functions in this article, but note that performance will degrade slightly with the PARTITION BY RANGE option as Oracle will need to sort the source data first. The PARTITION BY HASH option displays similar performance to PARTITION BY ANY;
    Lines 7-8 and 28: this is purely optional but seeing as we are coding for performance, we will fetch data from the source cursor in arrays using BULK COLLECT;
    Lines 19-21 and 32: we have included the session SID in the output. This will enable us to see the effect of parallelism when we use this function later (i.e. each slave will have its own session ID). This is purely for demonstration purposes and has nothing to do with parallel pipelined function syntax; and
    Line 45: the cursor parameter is already open when it is passed into the pipelined function. As our function is the only user of this cursor, we need to close it.
Moving on, we now have a parallel pipelined function ready for testing. To test it, we will create a large input dataset based on ALL_OBJECTS. We will create a single table of this dataset for simplicity as follows.
SQL> CREATE TABLE source_table
  2  NOLOGGING
  3  AS
  4     SELECT a.*
  5     FROM   all_source a
  6     ,      TABLE(row_generator(20));
Table created.
SQL> SELECT COUNT(*) FROM source_table;
  COUNT(*)
----------
    998960
1 row selected.
To test our parallel pipelined function, we will select from it twice; once in parallel and once in serial. We will time each query and verify that Oracle parallelised the query in two ways: first using the returned data itself (remember that we piped the session ID from our function) and second using V$PQ_SESSTAT. First we will run the parallel version.
SQL> SELECT nt.owner
  2  ,      COUNT(*)
  3  FROM   TABLE(
  4            parallel_pipelined_function(
  5               CURSOR(SELECT * FROM source_table))) nt
  6  GROUP  BY
  7         nt.owner;
OWNER                            COUNT(*)
------------------------------ ----------
10                                 496553
13                                 502407
2 rows selected.
Elapsed: 00:00:38.07
SQL> SELECT *
  2  FROM   v$pq_sesstat
  3  WHERE  statistic = 'Queries Parallelized';
STATISTIC                      LAST_QUERY SESSION_TOTAL
------------------------------ ---------- -------------
Queries Parallelized                    1             1
1 row selected.
We can see that Oracle used two parallel slaves and divided the work relatively evenly between them. Note how we passed in the cursor parameter using the CURSOR expression. This opens the cursor for the embedded SQL statement and passes it through to the pipelined function as a cursor variable.
Finally we can test the parallel pipelined function in serial as follows.
SQL> ALTER SESSION DISABLE PARALLEL QUERY;
Session altered.
SQL> set timing on
SQL> SELECT nt.owner
  2  ,      COUNT(*)
  3  FROM   TABLE(
  4            parallel_pipelined_function(
  5               CURSOR(SELECT * FROM source_table))) nt
  6  GROUP  BY
  7         nt.owner;
OWNER                            COUNT(*)
------------------------------ ----------
11                                 998960
1 row selected.
Elapsed: 00:00:49.01
SQL> SELECT *
  2  FROM   v$pq_sesstat
  3  WHERE  statistic = 'Queries Parallelized';
STATISTIC                      LAST_QUERY SESSION_TOTAL
------------------------------ ---------- -------------
Queries Parallelized                    0             2
1 row selected.
This time we can see that the function has executed in serial. It is interesting to note that we haven't saved much time with the parallel execution on the demonstration database. Note that on larger database servers, this approach will yield much better gains where the number of CPUs/slaves will be higher and the degree of parallelism can be controlled by appropriate hints.
pipelined functions and the optimizer
We have seen that pipelined functions are alternative rowsources to tables. Our examples in this article have been simple, but usually pipelined functions will be full of complex transformation logic that turns input row A into output row B (and maybe B2, B3 etc). It is usually the case that they will generate significant volumes of data (as they are primarily an ETL tool).
Because pipelined functions generate data, the CBO needs to know how much, especially if a function is one of several rowsources in a SQL statement (i.e. it is joined to tables/views/other pipelined functions). The following demonstrates the execution plan for a simple select from our parallel pipelined function using Autotrace.
SQL> set autotrace traceonly explain
SQL> SELECT *
  2  FROM   TABLE(
  3            parallel_pipelined_function(
  4               CURSOR(SELECT * FROM source_table))) nt;
Execution Plan
--------------------------------------------------------------------------------------------
   0      SELECT STATEMENT Optimizer=CHOOSE (Cost=11 Card=8168 Bytes=898480)
   1    0   VIEW* (Cost=11 Card=8168 Bytes=898480)                                              :Q84001
   2    1     COLLECTION ITERATOR* (PICKLER FETCH) OF 'PARALLEL_PIPELINED_FUNCTION'             :Q84001
   3    2       TABLE ACCESS* (FULL) OF 'SOURCE_TABLE' (Cost=360 Card=998960 Bytes=87908480)    :Q84000
   1 PARALLEL_TO_SERIAL            SELECT C0 FROM (SELECT VALUE(A2) C0 FROM
                                   TABLE("SCOTT"."PARALLEL_PIPELINED_FUNCT
   2 PARALLEL_COMBINED_WITH_PARENT
   3 PARALLEL_TO_PARALLEL          SELECT /*+ NO_EXPAND ROWID(A1) */ A1."OWNER"
                                   C0,A1."NAME" C1,A1."TYPE" C2,A1."LI
Two things stand out from this plan. First, the COLLECTION ITERATOR PICKLER FETCH. This is Oracle's mechanism for materialising the collection data from memory. Second, and more important, is the CBO's estimated rowcount. This defaults to 8,168 which is clearly incorrect. There is no way for Oracle to correctly identify the number of rows that will be generated from the execution of a pipelined function, even though it is clear that it is accessing all the source table's data (Step 3). The CBO cannot see inside the function's logic and even if it could, the function might generate multiple rows from one input or might discard most rows due to certain conditions; the possibilities are endless.
Note that Step 3 in the plan is related to parallel execution only. This is the step used by Oracle to assign the incoming data to parallel slaves for processing. It disappears from the plan with parallel query disabled (we will see this below).
There is an undocumented CARDINALITY hint that enables us to tell the CBO how many rows will be generated by a pipelined function. The problem with this hint though is that its behaviour is seemingly erratic (and is not explicitly supported by Oracle). In the following Autotrace explain plans, we can see the effect (or lack of effect) of this hint. First we can see the effect on our simple pipelined_emp function.
SQL> set autotrace traceonly explain
SQL> SELECT *
  2  FROM   TABLE(pipelined_emp);
Execution Plan
----------------------------------------------------------
   0      SELECT STATEMENT Optimizer=CHOOSE
   1    0   COLLECTION ITERATOR (PICKLER FETCH) OF 'PIPELINED_EMP'
SQL> SELECT --+ CARDINALITY(e,1000)
  2         *
  3  FROM   TABLE(pipelined_emp) e;
Execution Plan
----------------------------------------------------------
   0      SELECT STATEMENT Optimizer=CHOOSE (Cost=11 Card=1000 Bytes=2000)
   1    0   COLLECTION ITERATOR (PICKLER FETCH) OF 'PIPELINED_EMP'
In the hinted version, we have told the CBO that our function will generate 1,000 records and we can see this in the plan. In the unhinted version, Oracle does not even make an estimate (this appears to be the case with selects from pipelined functions without either a join or an input cursor). Next we can see the effect of this hint with a join.
SQL> SELECT *
  2  FROM   dept                 d
  3  ,      TABLE(pipelined_emp) e
  4  WHERE  d.deptno = e.deptno;
Execution Plan
----------------------------------------------------------
   0      SELECT STATEMENT Optimizer=CHOOSE (Cost=14 Card=8168 Bytes=163360)
   1    0   HASH JOIN (Cost=14 Card=8168 Bytes=163360)
   2    1     TABLE ACCESS (FULL) OF 'DEPT' (Cost=2 Card=4 Bytes=72)
   3    1     COLLECTION ITERATOR (PICKLER FETCH) OF 'PIPELINED_EMP'
SQL> SELECT --+ CARDINALITY(e,100000)
  2         *
  3  FROM   dept                 d
  4  ,      TABLE(pipelined_emp) e
  5  WHERE  d.deptno = e.deptno;
Execution Plan
----------------------------------------------------------
   0      SELECT STATEMENT Optimizer=CHOOSE (Cost=14 Card=100000 Bytes=2000000)
   1    0   HASH JOIN (Cost=14 Card=100000 Bytes=2000000)
   2    1     TABLE ACCESS (FULL) OF 'DEPT' (Cost=2 Card=4 Bytes=72)
   3    1     COLLECTION ITERATOR (PICKLER FETCH) OF 'PIPELINED_EMP'
We can see the 8,168 figure appearing again in our first example. It is interesting that this time the cardinality is applied to the join and not the pipelined function fetch. We can see this more clearly with the second example where we told the CBO that the function would generate 100,000 rows. Oracle has assumed a join for every row and applied this cardinality to Step 1 (hash join between the two rowsources). If we return to our parallel pipelined function, we can see that this hint has no such effect; in fact, it is not recognised at all (note that this is a serial plan, rather than parallel, which explains why the CURSOR SQL does not appear).
SQL> SELECT *
  2  FROM   TABLE(
  3            parallel_pipelined_function(
  4               CURSOR(SELECT * FROM source_table))) nt;
Execution Plan
----------------------------------------------------------
   0      SELECT STATEMENT Optimizer=CHOOSE (Cost=11 Card=8168 Bytes=898480)
   1    0   VIEW (Cost=11 Card=8168 Bytes=898480)
   2    1     COLLECTION ITERATOR (PICKLER FETCH) OF 'PARALLEL_PIPELINED_FUNCTION'
SQL> SELECT --+ CARDINALITY(nt, 100000)
  2         *
  3  FROM   TABLE(
  4            parallel_pipelined_function(
  5               CURSOR(SELECT * FROM source_table))) nt;
Execution Plan
----------------------------------------------------------
   0      SELECT STATEMENT Optimizer=CHOOSE (Cost=11 Card=8168 Bytes=898480)
   1    0   VIEW (Cost=11 Card=8168 Bytes=898480)
   2    1     COLLECTION ITERATOR (PICKLER FETCH) OF 'PARALLEL_PIPELINED_FUNCTION'
As a strategy, therefore, the undocumented CARDINALITY hint does not seem very suitable! Its behaviour is inconsistent and applies to very limited scenarios. For "sensitive" queries involving joins to pipelined functions, we would be far better using some other form of plan stability, whether it be by hints or stored outlines.
practical uses for pipelined functions
As has been stated, the examples in this article are deliberately simple for the purposes of demonstration. In "real" systems and applications, pipelined functions have far more flexibility and can solve a number of problems, including performance issues. To complete this article, I will briefly describe some examples of my pipelined function implementations.
usage 1: pl/sql-based etl
This is by far the most important and exciting application for pipelined functions and is how Oracle markets the technology. In many cases, PL/SQL ETL routines seem to follow a standard "cursor-for-loop" process that can be expressed by the following pseudo-code:
PROCEDURE load_business_data IS
   CURSOR cursor_name IS
      SELECT some_columns
      FROM   some_staging_tables
      WHERE  they_join_and_match_certain_criteria;  
BEGIN
   FOR record IN cursor LOOP
      --[do transformations, lookups, validations]--
      INSERT INTO parent_table VALUES (record.attributes, variables, etc...);
      INSERT INTO child_table VALUES (record.attributes, variables, etc...);
      --[and so on]--
   END LOOP;
   COMMIT;
END;
Sometimes, the "do transformations, lookups, validations" stage is just too complex to be expressed as a bulk SQL statement (i.e. the "best practice" for ETL in Oracle). Pipelined functions can therefore transform these ETL processes to provide a middle-ground between the slow PL/SQL-only implementation and the fast SQL-only implementation. The pseudo-code for a pipelined function-based ETL process is as follows:
PIPELINED FUNCTION
------------------
FUNCTION pipelined_transformation(
         cursor_in IN refcursor_type
         ) RETURN collection_type PIPELINED PARALLEL_ENABLE (PARTITION cursor_in BY ANY) IS
   fetch_array array_type;
   pipe_record record_type;
BEGIN
   LOOP
      FETCH cursor_in BULK COLLECT INTO fetch_array LIMIT arraysize;
      EXIT WHEN fetch_array.COUNT = 0;
      FOR i in 1 .. fetch_array.COUNT LOOP
         --[do transformations, lookups, validations]--
         --[prepare pipe_record]--
         PIPE ROW (pipe_record);
      END LOOP;
   END LOOP;
   CLOSE cursor_in;
   RETURN;
END;
LOAD PROCEDURE
--------------
PROCEDURE load_business_data IS
BEGIN
   INSERT ALL
      INTO parent_table VALUES (...)
      INTO child_table VALUES (...)
      --[and so on]--
   SELECT *
   FROM   TABLE(
             pipelined_transformation(
                CURSOR( SELECT some_columns
                        FROM   some_staging_tables
                        WHERE  they_join_and_match_certain_criteria )));
   COMMIT;
END;
I have used this technique several times and it is very easy to convert existing row-by-row PL/SQL routines to use this method. In general, the main body of the existing code (the looping PL/SQL parts) become the pipelined function logic. Insert statements are replaced with PIPE ROW statements and the embedded SQL cursor is removed and a cursor parameter added (if the aim is to use parallel SQL and DML). The existing load procedure itself becomes an INSERT..SELECT from the new pipelined function as seen in the pseduo-code above. The performance gains from this re-factoring are two-fold: first, the performance benefits of a bulk SQL INSERT statement; second the potentially massive performance gains of parallel SQL and DML. With the former in serial mode I have achieved up to 30% gain in performance, but with the latter I have achieved up to 80% reduction in load time.
Note that it is also possible to have the pipelined function return different record types for loads involving multiple tables. This will be the subject of a future article.
usage 2: wrappers over dictionary views with long columns
Several views in the data dictionary have LONG columns. These are problematic as we cannot search inside or copy these columns easily. The common views where this causes problems are XXX_VIEWS, XXX_TRIGGERS, XXX_TAB_COLUMNS, XXX_CONSTRAINTS, XXX_TAB_PARTITIONS and XXX_SUBPARTITIONS (where XXX = USER/ALL/DBA as appropriate). Using pipelined functions, we can create a simple wrapper over each view to overcome these problems, taking advantage of PL/SQL's ability to implicitly convert LONG columns to VARCHAR2 (up to 32,767 bytes).
For the underlying object type to each pipelined function, we take a copy of the respective dictionary view structure but with the LONG changed to a CLOB for the relevant attribute. During processing, any LONG values that exceed 32,767 bytes (some large view-texts for example) are handled as special cases using DBMS_SQL. As these are exceptional cases, they do not generally slow down the processing of the majority of records that have LONG values under 32,767 bytes in length. We can then add a view over the pipelined function and read and query the CLOB column as required. In 9i, CLOB columns can be interrogated using Oracle's standard string functions such as INSTR, SUBSTR and LIKE. For additional functionality, we can also add filters to the views using application contexts. This prevents us from returning every record from the underlying views with every query we issue.
usage 3: querying objects with stale/empty statistics
The DBMS_STATS.GATHER_SCHEMA_STATS procedure contains an option to list tables and indexes with stale or empty statistics. As this is a PL/SQL procedure that returns an index-by array of a PL/SQL record, its use is restricted to PL/SQL. However, we can easily add a pipelined function (or even a simple table function) wrapper to this procedure to be able to present a SQL view of the objects. We can also create a view to encapsulate the pipelined function call.
further reading

参考至:http://www.oracle-developer.net/display.php?id=207

               http://docs.oracle.com/cd/B28359_01/appdev.111/b28425/pipe_paral_tbl.htm#ADDCI2140

本文原创,转载请注明出处、作者

如意错误,欢迎指正

邮箱:czmcj@163.com

Overview of pipelined functions(原创)

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
//概况 //基本上,当你希望一个PLSQL(或者java或者c)程序作为数据源,而不是表, //你可能会用到管
WCF实际上是构建了一个框架,这个框架实现了在互联系统中各个Application之间如何通信。使得Develop
在过去的半年里,定期或者不定期地写点东西已经成为了我的一种习惯。可是最近两个月来一直忙于工作
Overview LSMW 是导入数据的一种工具,最大的好处就在于它的灵活性,举一个例子来说 ,different mat
  WCF实际上是构建了一个框架,这个框架实现了在互联系统中各个Application之间如何通信。使得Dev
整个Jar只有 4个class。 http://www.cs.umd.edu/projects/PL/multithreadedtc/overview.html http:/
ACE Overview ACE(ADAPTIVE COmmunication Environment) is a open-source, cross platform, object-
QTP QTP全称(QuickTestProfessional)是Mercury公司开发的一种自动测试工具,使用QTP的目的是想用
ACE Overview ACE(ADAPTIVE COmmunication Environment) is a open-source, cross platform, object-
ACE Overview ACE(ADAPTIVE COmmunication Environment) is a open-source, cross platform, object-
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号