Oracle 的管道函数(RETURN ... PIPELINED)可以把原本一次性返回的集合,变为通过 PIPE ROW(集合中的一行)逐行返回给 SQL 层,从而大大减少内存的使用。PostgreSQL 的 RETURNS SETOF 函数则起不到这种降低内存的效果:RETURN NEXT 返回的单条数据只是被缓存起来,并不会逐条交给 SQL 层处理。
exec_stmt_return_next 中的 tupdesc 来自执行计划节点,返回值必须满足该 desc 的要求,缓存的值也按该 desc 保存。RETURN NEXT 对 rec 类型和 row 类型处理的区别:rec 类型本质上就是 tuple,数据和 desc 都以扩展形式存放在 erh 中;如果需要转换为 tuple,有几个标准函数提供转换功能,且支持类型转换【转换后调用 tuplestore 的标准接口缓存 tuple】。row 类型本质上是一个虚拟行(由一组 datum 位置组成),row->varnos[i] 指向某一个 datum;如果想把 row 转换为 tuple,需要用 exec_eval_datum 算出 varnos 指向的 datum 的值,然后组装成 values 和 nulls 数组,再用 heap_form_tuple 构造。注意这种转换过程不会做类型转换:如果需要的 desc 和算出来的列类型对不上,就返回空;成功则【转换后调用 tuplestore 的标准接口缓存 tuple】。RETURN NEXT 对 var 类型的处理:把 var 看作单列 tuple,按执行计划给出的 desc 转换类型后构造 tuple【转换后调用 tuplestore 的标准接口缓存 tuple】。
常用函数汇总:
- 通用类型转换:exec_cast_value(传入的值不能直接是 eoh 的真实头指针,使用前需要先用 MakeExpandedObjectReadOnly 转成指向 eoh 的 1be 头,由 1be 指向真实头);
- 由 values 数组拼接 MinimalTuple:heap_form_minimal_tuple;
- 把一个 tuple 从一个 desc 转换为另一个 desc 的 tuple:convert_tuples_by_position、execute_attr_map_tuple;
- tuplestore 存 tuple:用 values 数组存(内部先用 heap_form_minimal_tuple 拼好 tuple,再交给 tuplestore_puttuple_common):tuplestore_putvalues;用 HeapTuple 存(直接传 tuple):tuplestore_puttuple;
- 根据类型 id 和 typmod 找出 desc:lookup_rowtype_tupdesc;
- 从 erh 扩展类型拿到紧凑的 tuple:expanded_record_get_tuple。
Oracle 支持 pipelined 函数:在函数定义时指定 RETURN 集合类型 PIPELINED,即表明该函数是管道函数。
管道函数最大的作用,就是可以把一次性返回的集合变为逐行返回,从而大大减少内存的使用。
例如:嵌套表类型 outrecset 是函数 f_trans 的返回值,普通函数只能先组装好整个嵌套表 outrecset(全部缓存在内存中),再一次性返回;如果嵌套表内容较多,可能会占用较大的内存空间。
如果使用管道函数,就可以用 PIPE ROW(嵌套表中的一行)代替一次性返回集合的 RETURN 语句,函数把嵌套表逐行返回给上层处理,无需整体缓存,从而降低内存使用。
Oracle 示例:
-- Package spec for the pipelined-function demo.
-- The CREATE OR REPLACE PACKAGE header is required: it opens the package
-- that the trailing "END refcur_pkg;" closes.
CREATE OR REPLACE PACKAGE refcur_pkg IS
  -- Strongly typed cursor over employees rows; f_trans reads its input from it.
  TYPE refcur_t IS REF CURSOR RETURN employees%ROWTYPE;

  -- Shape of one output row produced by f_trans.
  TYPE outrec_typ IS RECORD (
    var_num    NUMBER(6),
    var_char1  VARCHAR2(30),
    var_char2  VARCHAR2(30)
  );

  -- Collection type returned by f_trans. Because f_trans is PIPELINED, the
  -- collection is never built as a whole; rows are emitted one at a time.
  TYPE outrecset IS TABLE OF outrec_typ;

  FUNCTION f_trans (p refcur_t) RETURN outrecset PIPELINED;
END refcur_pkg;
/
CREATE OR REPLACE PACKAGE BODY refcur_pkg IS
  -- f_trans: turns each employee row read from cursor p into two output
  -- rows, and pipes each one to the caller as soon as it is built instead
  -- of accumulating an outrecset collection in memory.
  FUNCTION f_trans (p refcur_t) RETURN outrecset PIPELINED IS
    out_rec outrec_typ;
    in_rec  p%ROWTYPE;
  BEGIN
    LOOP
      FETCH p INTO in_rec;  -- input row
      EXIT WHEN p%NOTFOUND;
      out_rec.var_num   := in_rec.employee_id;
      out_rec.var_char1 := in_rec.first_name;
      out_rec.var_char2 := in_rec.last_name;
      PIPE ROW(out_rec);    -- first transformed output row
      out_rec.var_char1 := in_rec.email;
      out_rec.var_char2 := in_rec.phone_number;
      PIPE ROW(out_rec);    -- second transformed output row
    END LOOP;
    CLOSE p;
    -- A pipelined function finishes with a bare RETURN (no collection value).
    RETURN;
  END f_trans;
END refcur_pkg;
/
-- Feed department 60's employees into f_trans through a CURSOR expression;
-- TABLE() lets the pipelined result be queried like an ordinary table.
SELECT *
FROM TABLE(
       refcur_pkg.f_trans(
         CURSOR(
           SELECT *
           FROM employees
           WHERE department_id = 60
         )
       )
     );
在 PG 中,普通的 RETURN 语句同样需要一次性返回数据;PG 大概参考了 Oracle,实现了 RETURN NEXT 功能,看起来也是希望逐条返回数据(PG 没有 Oracle 那样的集合类型,这里以普通类型为例):
-- f1(i) yields two rows: i + 1 and then i + 2.
-- Because j is an OUT parameter, a bare RETURN NEXT appends j's current
-- value to the result set; the final RETURN ends the function.
CREATE OR REPLACE FUNCTION f1(IN i int, OUT j int)
RETURNS SETOF int
LANGUAGE plpgsql
AS $$
BEGIN
    j := i + 1;
    RETURN NEXT;

    j := i + 2;
    RETURN NEXT;

    RETURN;
END
$$;

SELECT * FROM f1(42);
j
----
43
44
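但在内核实现中,数据并不是逐条返回的:RETURN NEXT 其实只起到了缓存数据的作用,整个结果集仍是一次性交给 SQL 层的,和一次性返回没有本质区别(只有语法上的区别)。
所以 PG 的 RETURNS SETOF 函数并不能起到 Oracle 管道函数那样降低内存使用的效果。缓存的数据放在 tuplestore 中,超出 work_mem 后会写入临时文件,见下文 tuplestore_puttuple_common 中切换到 TSS_WRITEFILE 的逻辑。下面来分析具体过程。
RETURN NEXT 目前支持三类数据的返回:var、rec、row。RETURN NEXT 也可以不加参数,此时返回值按 OUT 参数列表拼接。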
/*
 * Excerpt: head of PL/pgSQL's exec_stmt_return_next, which executes one
 * RETURN NEXT statement. As the branches below show, each call appends one
 * tuple to estate->tuple_store; nothing is handed to the SQL layer here.
 */
exec_stmt_return_next(PLpgSQL_execstate *estate,
PLpgSQL_stmt_return_next *stmt)
{
TupleDesc tupdesc;   /* descriptor the returned row must match */
int natts;           /* number of columns in tupdesc */
HeapTuple tuple;     /* tuple built in the rec/row branches */
MemoryContext oldcontext;
初始化总结:
1. 初始化的过程就是构造 Tuplestorestate,主要动作:Tuplestorestate 使用的内存上下文为 ExecutorState;记录不允许随机访问:eflags=EXEC_FLAG_REWIND;记录三个操作函数:copytup_heap、writetup_heap、readtup_heap。
2. 给 estate->tuple_store_desc 赋值 desc。desc 的来源如下:
FunctionScan 执行节点(FunctionNext)拿到期望的 desc 后,传入 ExecMakeTableFunctionResult。ExecMakeTableFunctionResult 组装 ReturnSetInfo,挂到 fcinfo->resultinfo 上。plpgsql_exec_function 从 fcinfo 中取出 ReturnSetInfo。plpgsql_estate_setup 将其存入 estate->rsi(estate->rsi = rsi)。最后由 exec_init_tuple_store 从 rsi->expectedDesc 取出 desc。调用栈如下:
#1 0x00007fe0a3992064 in plpgsql_exec_function (func=0x2419028, fcinfo=0x24da5a8, simple_eval_estate=0x0, simple_eval_resowner=0x0, procedure_resowner=0x0, atomic=true) at pl_exec.c:485
#2 0x00007fe0a39ac8f9 in plpgsql_call_handler (fcinfo=0x24da5a8) at pl_handler.c:277
#3 0x0000000000738829 in ExecMakeTableFunctionResult (setexpr=0x24e0b40, econtext=0x24e0a10, argContext=0x24da490, expectedDesc=0x24e1110, randomAccess=false) at execSRF.c:235
#4 0x0000000000753eed in FunctionNext (node=0x24e0800) at nodeFunctionscan.c:95
#5 0x000000000073a081 in ExecScanFetch (node=0x24e0800, accessMtd=0x753e3b <FunctionNext>, recheckMtd=0x754242 <FunctionRecheck>) at execScan.c:133
#6 0x000000000073a0f6 in ExecScan (node=0x24e0800, accessMtd=0x753e3b <FunctionNext>, recheckMtd=0x754242 <FunctionRecheck>) at execScan.c:182
#7 0x000000000075428c in ExecFunctionScan (pstate=0x24e0800) at nodeFunctionscan.c:270
#8 0x000000000073614e in ExecProcNodeFirst (node=0x24e0800) at execProcnode.c:464
#9 0x000000000072a08a in ExecProcNode (node=0x24e0800) at /src/include/executor/executor.h:262
#10 0x000000000072cb80 in ExecutePlan (estate=0x24e05d8, planstate=0x24e0800, use_parallel_mode=false, operation=CMD_SELECT, sendTuples=true, numberTuples=0, direction=ForwardScanDirection, dest=0x24d5910, execute_once=true) at execMain.c:1632
#11 0x000000000072a6d1 in standard_ExecutorRun (queryDesc=0x23f1248, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:364
#12 0x000000000072a50b in ExecutorRun (queryDesc=0x23f1248, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:308
#13 0x0000000000997ba9 in PortalRunSelect (portal=0x2474a28, forward=true, count=0, dest=0x24d5910) at pquery.c:924
#14 0x0000000000997867 in PortalRun (portal=0x2474a28, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x24d5910, altdest=0x24d5910, qc=0x7ffd81e300b0) at pquery.c:768
#15 0x0000000000991408 in exec_simple_query (query_string=0x23c9518 "select * from f1(42);") at postgres.c:1238
#16 0x0000000000995a3e in PostgresMain (dbname=0x2400998 "postgres", username=0x23c5178 "mingjie") at postgres.c:4563
#17 0x00000000008d3cfe in BackendRun (port=0x23f7220) at postmaster.c:4396
#18 0x00000000008d3697 in BackendStartup (port=0x23f7220) at postmaster.c:4124
#19 0x00000000008d00b8 in ServerLoop () at postmaster.c:1791
#20 0x00000000008cf98a in PostmasterMain (argc=1, argv=0x23c3120) at postmaster.c:1463
#21 0x00000000007ada4b in main (argc=1, argv=0x23c3120) at main.c:200
分析:
/*
 * Create the result tuplestore only on the first RETURN NEXT. Calling
 * exec_init_tuple_store on every call would replace estate->tuple_store
 * with a fresh, empty store each time, discarding the rows already buffered.
 */
if (estate->tuple_store == NULL)
exec_init_tuple_store(estate);
/* tuple_store_desc is filled in by exec_init_tuple_store */
tupdesc=estate->tuple_store_desc;
natts=tupdesc->natts;
/* retvarno >= 0: the value comes from a PL/pgSQL datum (a named variable,
 * or the OUT-parameter row for a bare RETURN NEXT) */
if (stmt->retvarno >=0)
{
PLpgSQL_datum *retvar=estate->datums[stmt->retvarno];
/* Dispatch on the kind of datum being returned: var, rec or row */
switch (retvar->dtype)
{
初始化函数exec_init_tuple_store
/*
 * exec_init_tuple_store: create the tuplestore that buffers RETURN NEXT rows.
 * The store is created while estate->tuple_store_cxt is the current memory
 * context and estate->tuple_store_owner is the current resource owner; both
 * are restored afterwards. The row descriptor is taken from the caller's
 * ReturnSetInfo (rsi->expectedDesc).
 * NOTE(review): this excerpt dereferences rsi without any NULL/type check;
 * confirm that the full upstream function validates rsi before this point.
 */
exec_init_tuple_store(PLpgSQL_execstate *estate)
{
ReturnSetInfo *rsi=estate->rsi;
MemoryContext oldcxt;
ResourceOwner oldowner;
// Switch memory context (in this trace: from "SPI Proc" to "ExecutorState")
oldcxt=MemoryContextSwitchTo(estate->tuple_store_cxt);
// Switch resource owner (in this trace both old and new owner are "Portal")
oldowner=CurrentResourceOwner;
CurrentResourceOwner=estate->tuple_store_owner;
// Create the heap tuplestore: random access only if the caller allows
// SFRM_Materialize_Random; memory budget is work_mem (passed as maxKBytes)
estate->tuple_store=tuplestore_begin_heap(rsi->allowedModes & SFRM_Materialize_Random, false, work_mem);
CurrentResourceOwner=oldowner;
MemoryContextSwitchTo(oldcxt);
// Record the result descriptor. rsi->expectedDesc is supplied by the
// executor: ExecMakeTableFunctionResult receives it as expectedDesc.
estate->tuple_store_desc=rsi->expectedDesc;
}
进入tuplestore_begin_heap
/*
 * tuplestore_begin_heap: create a tuplestore that stores heap tuples.
 *   randomAccess - if true, eflags also include EXEC_FLAG_BACKWARD;
 *                  otherwise only EXEC_FLAG_REWIND is requested.
 *   interXact    - passed through to tuplestore_begin_common.
 *   maxKBytes    - memory budget in kilobytes (exec_init_tuple_store passes work_mem).
 * Installs the heap-tuple callbacks copytup_heap, writetup_heap, readtup_heap.
 */
tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
{
// In this trace: randomAccess=false (no random access), interXact=false, maxKBytes=8192
Tuplestorestate *state;
int eflags;
// With randomAccess=false this yields eflags=EXEC_FLAG_REWIND
eflags=randomAccess ?
(EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) :
(EXEC_FLAG_REWIND);
// Initialize the common Tuplestorestate; note that it takes the current
// memory context as its own context
state=tuplestore_begin_common(eflags, interXact, maxKBytes);
// Resulting Tuplestorestate in this trace:
// state={status=TSS_INMEM, eflags=2, backward=false, interXact=false,
// truncated=false, availMem=8372200, allowedMem=8388608, tuples=0,
// myfile=0x0, context="ExecutorState", resowner="Portal", copytup=0x0,
// writetup=0x0, readtup=0x0, memtuples=0x24f0d88, memtupdeleted=0,
// memtupcount=0, memtupsize=2048, growmemtuples=true, readptrs=0x24e7a70,
// activeptr=0, readptrcount=1, readptrsize=8, writepos_file=0,writepos_offset=0}
state->copytup=copytup_heap;
state->writetup=writetup_heap;
state->readtup=readtup_heap;
return state;
}
后面根据返回值类型的不同,进入不同的分支。
进入分支前,desc 已经获取到了。首先是 var 类型的分支:
{
/* PLPGSQL_DTYPE_VAR branch: a scalar variable is returned as a one-column row */
PLpgSQL_var *var=(PLpgSQL_var *) retvar;
Datum retval=var->value;
bool isNull=var->isnull;
Form_pg_attribute attr=TupleDescAttr(tupdesc, 0);
if (natts !=1)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("wrong result type supplied in RETURN NEXT")));
// retval may point at an expanded object header (eoh); later steps need a
// 1-byte-header (1be) datum whose data part points to the eoh
retval=MakeExpandedObjectReadOnly(retval, isNull, var->datatype->typlen);
// Cast to the result column's type and typmod
retval=exec_cast_value(estate,
retval,
&isNull,
var->datatype->typoid,
var->datatype->atttypmod,
attr->atttypid,
attr->atttypmod);
// Buffer the value as a one-column row
tuplestore_putvalues(estate->tuple_store, tupdesc, &retval, &isNull);
}
break;
执行tuplestore_putvalues保存元组
/*
 * tuplestore_putvalues: form a MinimalTuple from values/isnull according to
 * tdesc and append it to the store. The tuple is allocated in state->context,
 * its chunk size is recorded via USEMEM, and storing is delegated to
 * tuplestore_puttuple_common.
 */
tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc,
Datum *values, bool *isnull)
{
MinimalTuple tuple;
MemoryContext oldcxt=MemoryContextSwitchTo(state->context);
tuple=heap_form_minimal_tuple(tdesc, values, isnull);
// Account for the space used; USEMEM updates state->availMem
USEMEM(state, GetMemoryChunkSpace(tuple));
tuplestore_puttuple_common(state, (void *) tuple);
MemoryContextSwitchTo(oldcxt);
}
/*
 * tuplestore_puttuple_common: append an already-formed tuple to the store.
 * What happens depends on state->status; the in-memory (TSS_INMEM) case is
 * shown below.
 */
static void
tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
{
TSReadPointer *readptr;
int i;
ResourceOwner oldowner;
state->tuples++;   /* count of tuples added to the store */
switch (state->status)
{
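内存态(TSS_INMEM)直接用 memtuples 数组缓存 tuple,tuple 使用的内存是在外层函数(tuplestore_putvalues)切换到 state->context 后申请的。当数组槽位用完或 LACKMEM 报告内存不足时,会转为 TSS_WRITEFILE 状态,把数据写到临时文件: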
/* Read pointers (other than the active one) that had reached EOF are reset
 * and repositioned to the slot the new tuple is about to occupy. */
readptr=state->readptrs;
for (i=0; i < state->readptrcount; readptr++, i++)
{
if (readptr->eof_reached && i !=state->activeptr)
{
readptr->eof_reached=false;
readptr->current=state->memtupcount;
}
}
/* Try to enlarge memtuples once at most one free slot remains */
if (state->memtupcount >=state->memtupsize - 1)
{
(void) grow_memtuples(state);
}
state->memtuples[state->memtupcount++]=tuple;
/* Stay in memory while array slots remain and LACKMEM reports no shortage */
if (state->memtupcount < state->memtupsize && !LACKMEM(state))
return;
/* Otherwise switch to a temporary file (status TSS_WRITEFILE) and call
 * dumptuples; from here on the rows are no longer held only in memory. */
PrepareTempTablespaces();
oldowner=CurrentResourceOwner;
CurrentResourceOwner=state->resowner;
state->myfile=BufFileCreateTemp(state->interXact);
CurrentResourceOwner=oldowner;
state->backward=(state->eflags & EXEC_FLAG_BACKWARD) !=0;
state->status=TSS_WRITEFILE;
dumptuples(state);
break;
/* ... remaining switch cases elided in this excerpt ... */
{
/* PLPGSQL_DTYPE_REC branch: RETURN NEXT of a record variable */
PLpgSQL_rec *rec=(PLpgSQL_rec *) retvar;
TupleDesc rec_tupdesc;        /* descriptor of the record's stored value */
TupleConversionMap *tupmap;   /* rec_tupdesc -> tupdesc map, when one is needed */
此时拿到的 record:
{dtype=PLPGSQL_DTYPE_REC, dno=1, refname=0x24db608 "r", lineno=3, isconst=false, notnull=false, default_val=0x0, datatype={typname='foo'}, rectypeid=17117, firstfield=-1, erh=0x2509708}
数据和 desc 都在 erh 中,列名在 firstfield 指向的位置。数据类型在 datatype 中,为 foo;数据类型的 oid 在 rectypeid 中:17117 -> foo。
/* Only a record variable that has no value yet (rec->erh == NULL) needs an
 * empty expanded record instantiated. Doing it unconditionally would replace
 * the erh of a populated record (erh=0x2509708 in the trace above) and lose
 * its row data. */
if (rec->erh == NULL)
instantiate_empty_record_variable(estate, rec);
/* Presumably so that a record holding no field values can still be turned
 * into a tuple (a row of nulls) below -- verify against upstream */
if (ExpandedRecordIsEmpty(rec->erh))
deconstruct_expanded_record(rec->erh);
// Switch memory context for the conversion work (in this trace from
// "SPI Proc" to "ExprContext")
oldcontext=MemoryContextSwitchTo(get_eval_mcontext(estate));
// Equivalent to: return erh->er_tupdesc;
rec_tupdesc=expanded_record_get_tupdesc(rec->erh);
// Convert from the record's descriptor (rec_tupdesc) to the output
// descriptor (tupdesc), step 1: build the conversion map (NULL if none needed)
tupmap=convert_tuples_by_position(rec_tupdesc,
tupdesc,
gettext_noop("wrong record type supplied in RETURN NEXT"));
// Get the record's value as a flat tuple
tuple=expanded_record_get_tuple(rec->erh);
if (tupmap)
// Step 2: apply the map to produce a tuple matching tupdesc
tuple=execute_attr_map_tuple(tuple, tupmap);
// Buffer the tuple in the tuplestore
tuplestore_puttuple(estate->tuple_store, tuple);
MemoryContextSwitchTo(oldcontext);
}
break;
只有当函数有两个及以上 OUT 参数,并且直接执行不带参数的 RETURN NEXT 时,才会进入这段 row 类型的处理逻辑。
{
/* PLPGSQL_DTYPE_ROW branch: a virtual row made of datum slots
 * (e.g. several OUT parameters) */
PLpgSQL_row *row=(PLpgSQL_row *) retvar;
oldcontext=MemoryContextSwitchTo(get_eval_mcontext(estate));
// Column types must match tupdesc exactly (no casting is done);
// on a mismatch make_tuple_from_row returns NULL
tuple=make_tuple_from_row(estate, row, tupdesc);
if (tuple==NULL)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("wrong record type supplied in RETURN NEXT")));
tuplestore_puttuple(estate->tuple_store, tuple);
MemoryContextSwitchTo(oldcontext);
}
break;
default:
elog(ERROR, "unrecognized dtype: %d", retvar->dtype);
break;
}
}
-- Single-OUT-parameter demo: each RETURN NEXT buffers the current value
-- of j as one output row.
create or replace function f1(in i integer, out j integer)
    returns setof integer
    language plpgsql
as $body$
begin
    j := i + 1;  return next;   -- first row:  i + 1
    j := i + 2;  return next;   -- second row: i + 2
    return;                     -- done; the buffered rows form the result
end
$body$;

select * from f1(42);
----
-- Sample table for the RETURN NEXT <record> demo.
CREATE TABLE foo (fooid INT, foosubid INT, fooname TEXT);
INSERT INTO foo (fooid, foosubid, fooname) VALUES (1, 2, 'three');
INSERT INTO foo (fooid, foosubid, fooname) VALUES (4, 5, 'six');

-- get_all_foo(): returns every foo row with fooid > 0.
-- Each RETURN NEXT r appends the current row to the function's result buffer.
CREATE OR REPLACE FUNCTION get_all_foo() RETURNS SETOF foo AS
$BODY$
DECLARE
    r foo%rowtype;
BEGIN
    FOR r IN
        SELECT * FROM foo WHERE fooid > 0
    LOOP
        -- can do some processing here
        RETURN NEXT r; -- return current row of SELECT
    END LOOP;
    RETURN;
END;
$BODY$
LANGUAGE plpgsql;

SELECT * FROM get_all_foo();
--------
-- Replace f1 with a two-OUT-parameter version. IF EXISTS keeps this step
-- runnable even when no earlier f1(int) was created.
DROP FUNCTION IF EXISTS f1(int);

-- With two OUT parameters, a bare RETURN NEXT emits the current (j, k)
-- pair as one row.
CREATE FUNCTION f1(IN i int, OUT j int, OUT k text) RETURNS SETOF record AS $$
BEGIN
    j := i + 1;
    k := 'foo';
    RETURN NEXT;
    j := j + 1;
    k := 'foot';
    RETURN NEXT;
    RETURN;
END$$ LANGUAGE plpgsql;

SELECT * FROM f1(42);
以上就是 PostgreSQL 源码分析:RETURNS SETOF 函数与 Oracle 管道(PIPELINED)函数的详细内容,更多关于 PostgreSQL RETURNS SETOF 函数的资料请关注脚本之家其它相关文章!