Postgres中UPDATE更新语句源码分析(update parametertype)墙裂推荐

随心笔谈2年前发布 编辑
194 0
🌐 经济型:买域名、轻量云服务器、用途:游戏 网站等 《腾讯云》特点:特价机便宜 适合初学者用 点我优惠购买
🚀 拓展型:买域名、轻量云服务器、用途:游戏 网站等 《阿里云》特点:中档服务器便宜 域名备案事多 点我优惠购买
🛡️ 稳定型:买域名、轻量云服务器、用途:游戏 网站等 《西部数码》 特点:比上两家略贵但是稳定性超好事也少 点我优惠购买



目录
- PG中UPDATE源码分析
- 整体流程分析
- 解析部分——生成语法解析树UpdateStmt
- 解析部分——生成查询树Query
- 优化器——生成执行计划
- 执行器
- 事务
- 总结

本文主要描述SQL中UPDATE语句的源码分析,代码为PG13.3版本。

以 update dtea set id = 0; 这条最简单的Update语句进行源码分析(dtea不是分区表,不考虑并行等,没有建立任何索引),帮助我们理解update的大致流程。

SQL流程如下:

parser(语法解析,生成语法解析树UpdateStmt,检查是否有语法层面的错误)

analyze(语义分析, UpdateStmt转为查询树Query, 会查系统表检查有无语义方面的错误)

rewrite(规则重写, 根据规则rules重写查询树Query, 根据事先存储在系统表中的规则进行重写,没有的话不进行重写,另外加一句,视图的实现是根据规则系统实现的,也是在这里需要进行处理)

optimizer(优化器:逻辑优化、物理优化、生成执行计划, 由Query生成对应的执行计划PlannedStmt, 基于代价的优化器,由最佳路径Path生成最佳执行计划Plan)

executor(执行器,会有各种算子,依据执行计划进行处理,火山模型,一次一元组)

storage(存储引擎)。中间还有事务处理。事务处理部分的代码这里不再进行分析,免得将问题复杂化。存储引擎那部分也不进行分析,重点关注解析、优化、执行这三部分。

对应的代码:

exec_simple_query(const?char?*query_string)
//?——-?解析器部分————–
–>?pg_parse_query(query_string);//生成语法解析树
–>?pg_analyze_and_rewrite(parsetree,?query_string,NULL,?0,?NULL);?//?生成查询树Query
–>?parse_analyze(parsetree,?query_string,?paramTypes,?numParams,queryEnv);?//?语义分析
–>?pg_rewrite_query(query);//?规则重写

//?——–优化器———-
–>?pg_plan_queries()

//——–?执行器———-
–>?PortalStart(portal,?NULL,?0,?InvalidSnapshot);
–>?PortalRun(portal,FETCH_ALL,true,true,receiver,receiver,&qc);//?执行器执行
–>?PortalDrop(portal,?false);

关键数据结构:UpdateStmt、RangeVar、ResTarget:

typedef?struct?UpdateStmt
{
?NodeTagtype;
?RangeVar?*relation;
?List*targetList;?//?对应语句中的set?id?=?0;信息在这里
?Node*whereClause;?
?List*fromClause;
?List*returningList;?
?WithClause?*withClause;
}?UpdateStmt;

//?dtea?表
typedef?struct?RangeVar
{
?NodeTagtype;
?char*catalogname;?
?char*schemaname;
?char*relname;
?boolinh;?
?charrelpersistence;?
?Alias*alias;?
?int?location;
}?RangeVar;

//?set?id?=?0;?经transformTargetList()?->?transformTargetEntry,会转为TargetEntry
typedef?struct?ResTarget
{
?NodeTagtype;
?char*name;//?id?column
?List*indirection;?
?Node*val;?//?=?1表达式节点存在这里
?int?location;
}?ResTarget;

用户输入的update语句会由字符串转为数据库可以理解的内部数据结构——语法解析树。执行逻辑在pg_parse_query()中,需要理解flex与bison。

gram.y中Update语法的定义:

//结合这条语句分析?update?dtea?set?id?=?0;
UpdateStmt:?opt_with_clause?UPDATE?relation_expr_opt_alias
?SET?set_clause_list?from_clause?where_or_current_clause?returning_clause
{
?UpdateStmt?*n?=?makeNode(UpdateStmt);
?n->relation?=?$3;
?n->targetList?=?$5;
?n->fromClause?=?$6;
?n->whereClause?=?$7;
?n->returningList?=?$8;
?n->withClause?=?$1;
?$$?=?(Node?*)n;
}
;

set_clause_list:
?set_clause?{?$$?=?$1;?}
?|?set_clause_list?’,’?set_clause?{?$$?=?list_concat($1,$3);?}
;
//?对应的是?set?id?=?0
set_clause:?//?id?=?0
?set_target?’=’?a_expr
{
?$1->val?=?(Node?*)?$3;
?$$?=?list_make1($1);
}
?|?'(‘?set_target_list?’)’?’=’?a_expr
{
?int?ncolumns?=?list_length($2);
?int?i?=?1;
?ListCell?*col_cell;

?foreach(col_cell,?$2)?
?{
ResTarget?*res_col?=?(ResTarget?*)?lfirst(col_cell);
MultiAssignRef?*r?=?makeNode(MultiAssignRef);

r->source?=?(Node?*)?$5;
r->colno?=?i;
r->ncolumns?=?ncolumns;
res_col->val?=?(Node?*)?r;
i++;
?}

?$$?=?$2;
}
;

set_target:
?ColId?opt_indirection
{
?$$?=?makeNode(ResTarget);
?$$->name?=?$1;
?$$->indirection?=?check_indirection($2,?yyscanner);
?$$->val?=?NULL;?
?$$->location?=?@1;
}
;

set_target_list:
?set_target{?$$?=?list_make1($1);?}
?|?set_target_list?’,’?set_target{?$$?=?lappend($1,$3);?}
;

生成了语法解析树UpdateStmt后,会经由语义分析,生成查询树Query,以供后续优化器生成执行计划。主要代码在analyze.c中:

analyze.c : transform the raw parse tree into a query tree

parse_analyze()
–>?transformTopLevelStmt(pstate,?parseTree);
–>?transformOptionalSelectInto(pstate,?parseTree->stmt);
–>?transformStmt(pstate,?parseTree);
//?transforms?an?update?statement
–>?transformUpdateStmt(pstate,?(UpdateStmt?*)?parseTree);//?实际由UpdateStmt转为Query的处理函数

具体的我们看一下函数实现:

static?Query?*transformUpdateStmt(ParseState?*pstate,?UpdateStmt?*stmt)?{
?Query*qry?=?makeNode(Query);
?ParseNamespaceItem?*nsitem;
?Node*qual;

?qry->commandType?=?CMD_UPDATE;
?pstate->p_is_insert?=?false;

?
?if?(stmt->withClause)?{
qry->hasRecursive?=?stmt->withClause->recursive;
qry->cteList?=?transformWithClause(pstate,?stmt->withClause);
qry->hasModifyingCTE?=?pstate->p_hasModifyingCTE;
?}

?qry->resultRelation?=?setTargetTable(pstate,?stmt->relation,?stmt->relation->inh,?true,?ACL_UPDATE);
?nsitem?=?pstate->p_target_nsitem;

?
?nsitem->p_lateral_only?=?true;
?nsitem->p_lateral_ok?=?false;

?
?transformFromClause(pstate,?stmt->fromClause);

?
?nsitem->p_lateral_only?=?false;
?nsitem->p_lateral_ok?=?true;

?qual?=?transformWhereClause(pstate,?stmt->whereClause,EXPR_KIND_WHERE,?”WHERE”);
?qry->returningList?=?transformReturningList(pstate,?stmt->returningList);

?
?qry->targetList?=?transformUpdateTargetList(pstate,?stmt->targetList);//?处理SQL语句中的?set?id?=1?

?qry->rtable?=?pstate->p_rtable;
?qry->jointree?=?makeFromExpr(pstate->p_joinlist,?qual);
?qry->hasTargetSRFs?=?pstate->p_hasTargetSRFs;
?qry->hasSubLinks?=?pstate->p_hasSubLinks;

?assign_query_collations(pstate,?qry);

?return?qry;
}

这里面要重点关注一下transformUpdateTargetList(),它会将抽象语法树中的ResTarget转为查询树中的TargetEntry。

typedef?struct?TargetEntry
{
?Exprxpr;
?Expr*expr;?
?AttrNumber?resno;?
?char*resname;
?Indexressortgroupref;?
?Oid?resorigtbl;
?AttrNumber?resorigcol;
?boolresjunk;
}?TargetEntry;

对于其内部处理可参考源码中的相关处理,这里不再细述。需要重点阅读一下README,PG源码中所有的README都是非常好的资料,一定要认真读。

优化器这块的内容很多,主要的逻辑是先进行逻辑优化,比如子查询、子链接、常量表达式、选择下推等等的处理,因为我们要分析的这条语句十分简单,所以逻辑优化的这部分都没有涉及到。然后是物理优化,涉及到选择率、代价估计,用索引扫描还是顺序扫描,选择哪种连接方式,应用动态规划还是遗传算法(GEQO),选择nestloop-join、merge-join还是hash-join等。因为我们这个表没有建索引,更新单表也不涉及到多表连接,所以物理优化这块涉及的也不多。最后是路径生成:先生成最佳路径,再由最佳路径生成执行计划。

在路径生成这块,最基础的是对表的扫描方式,比如顺序扫描、索引扫描;再往上是连接方式,即采用哪种连接方式;再往上是排序、Limit等路径……,自底向上生成路径。我们要分析的语句很简单,没有其他处理,顺序扫描之后再更新就可以了。

这里先不考虑并行执行计划。我们先看一下其执行计划结果:

postgres@postgres=#?explain?update?dtea?set?id?=?0;
QUERY?PLAN
————————————————————–
?Update?on?dtea(cost=0.00..19.00?rows=900?width=68)
?->Seq?Scan?on?dtea(cost=0.00..19.00?rows=900?width=68)
(2?rows)

下面我们分析一下其执行计划的生成流程:

//?由查询树Query–>?Path?–>?Plan?(PlannedStmt)
pg_plan_queries()
–>?pg_plan_query()
–>?planner()
–>?standard_planner(Query?*parse,?const?char?*query_string,?int?cursorOptions,ParamListInfo?boundParams)
//?由Query—>?PlannerInfo
–>?subquery_planner(glob,?parse,?NULL,false,?tuple_fraction);//?涉及到很多逻辑优化的内容,很多不列出
–>?pull_up_sublinks(root);
–>?pull_up_subqueries(root);?//?这里只列出几个重要的逻辑优化内容,其他的不再列出……
//?如果是update/delete分区表继承表则走inheritance_planner(),其他情况走grouping_planner()
–>?inheritance_planner()?//?update/delete分区表继承表的情况
–>?grouping_planner()
–>?grouping_planner()?//?非分区表、继承表的情况
–>?preprocess_targetlist(root);?//?update虽然只更新一列,但是插入一条新元组的时候,需要知道其他列信息.
–>?rewriteTargetListUD(parse,?target_rte,?target_relation);
–>?expand_targetlist()
–>?query_planner(root,?standard_qp_callback,?&qp_extra);?//?重要
–>?add_base_rels_to_query()
–>?deconstruct_jointree(root);
–>?add_other_rels_to_query(root);?//?展开分区表到PlannerInfo中的相关字段中?
–>?expand_inherited_rtentry()
–>?expand_planner_arrays(root,?num_live_parts);
–>?make_one_rel(root,?joinlist);?
–>?set_base_rel_sizes(root);?
–>?set_rel_size();
?–>?set_append_rel_size(root,?rel,?rti,?rte);?//?如果是分区表或者继承走这里,否则走下面
–>?set_rel_size(root,?childrel,?childRTindex,?childRTE);?//?处理子分区表
?–>?set_plain_rel_size(root,?rel,?rte);
–>?set_plain_rel_size()?//?如果不是分区表或者继承
–>?set_baserel_size_estimates()
–>?set_base_rel_pathlists(root);
–>?set_rel_pathlist(root,?rel,?rti,?root->simple_rte_array[rti]);
?–>?set_append_rel_pathlist(root,?rel,?rti,?rte);?//?生成各分区表的访问路径
–>?make_rel_from_joinlist(root,?joinlist);//?动态规划还是基因规划
–>?standard_join_search()?//?动态规划
–>?geqo()?//?基因规划与动态规划二选一
–>?apply_scanjoin_target_to_paths()
–>?create_modifytable_path()
//?由PlannerInfo—>?RelOptInfo?
–>?fetch_upper_rel(root,?UPPERREL_FINAL,?NULL);
//?由RelOptInfo—>?Path
–>?get_cheapest_fractional_path(final_rel,?tuple_fraction);
//?由?PlannerInfo+Path—>?Plan
–>?create_plan(root,?best_path);
//?后续处理,由Plan?—>?PlannedStmt

核心数据结构:PlannedStmt、PlannerInfo、RelOptInfo(存储访问路径及其代价)、Path

Path:所有的路径都继承自Path,所以这个比较重要。

typedef?struct?Path
{
?NodeTagtype;
?NodeTagpathtype;

?RelOptInfo?*parent;?
?PathTarget?*pathtarget;

?ParamPathInfo?*param_info;?

?boolparallel_aware;?
?boolparallel_safe;?
?int?parallel_workers;?

?
?doublerows;?
?Coststartup_cost;?
?Costtotal_cost;

?List*pathkeys;
?
}?Path;

typedef?struct?ModifyTablePath
{
?Pathpath;?//?可以看到ModifyTablePath继承自Path
?CmdTypeoperation;
?boolcanSetTag;
?IndexnominalRelation;?
?IndexrootRelation;?
?boolpartColsUpdated;?
?List*resultRelations;?
?List*subpaths;
?List*subroots;
?List*withCheckOptionLists;?
?List*returningLists;?
?List*rowMarks;
?OnConflictExpr?*onconflict;?
?int?epqParam;
}?ModifyTablePath;

生成update执行路径,最终都是要生成ModifyTablePath,本例中路径生成过程:Path–>ProjectionPath–>ModifyTablePath,也就是先顺序扫描表,再修改表。后面由路径生成执行计划。

ModifyTablePath?*create_modifytable_path(PlannerInfo?*root,?RelOptInfo?*rel,
CmdType?operation,?bool?canSetTag,
Index?nominalRelation,?Index?rootRelation,
bool?partColsUpdated,
List?*resultRelations,?List?*subpaths,
List?*subroots,
List?*withCheckOptionLists,?List?*returningLists,
List?*rowMarks,?OnConflictExpr?*onconflict,
int?epqParam)
{
?ModifyTablePath?*pathnode?=?makeNode(ModifyTablePath);
?doubletotal_size;
?ListCell?*lc;

?Assert(list_length(resultRelations)?==?list_length(subpaths));
?Assert(list_length(resultRelations)?==?list_length(subroots));
?Assert(withCheckOptionLists?==?NIL?||?list_length(resultRelations)?==?list_length(withCheckOptionLists));
?Assert(returningLists?==?NIL?||?list_length(resultRelations)?==?list_length(returningLists));

?pathnode->path.pathtype?=?T_ModifyTable;
?pathnode->path.parent?=?rel;

?pathnode->path.pathtarget?=?rel->reltarget;?
?
?pathnode->path.param_info?=?NULL;
?pathnode->path.parallel_aware?=?false;
?pathnode->path.parallel_safe?=?false;
?pathnode->path.parallel_workers?=?0;
?pathnode->path.pathkeys?=?NIL;

?
?pathnode->path.startup_cost?=?0;
?pathnode->path.total_cost?=?0;
?pathnode->path.rows?=?0;
?total_size?=?0;
?foreach(lc,?subpaths)
?{
Path*subpath?=?(Path?*)?lfirst(lc);

if?(lc?==?list_head(subpaths))?
?pathnode->path.startup_cost?=?subpath->startup_cost;
pathnode->path.total_cost?+=?subpath->total_cost;
pathnode->path.rows?+=?subpath->rows;
total_size?+=?subpath->pathtarget->width?*?subpath->rows;
?}

?
?if?(pathnode->path.rows?>?0)
total_size?/=?pathnode->path.rows;
?pathnode->path.pathtarget->width?=?rint(total_size);

?pathnode->operation?=?operation;
?pathnode->canSetTag?=?canSetTag;
?pathnode->nominalRelation?=?nominalRelation;
?pathnode->rootRelation?=?rootRelation;
?pathnode->partColsUpdated?=?partColsUpdated;
?pathnode->resultRelations?=?resultRelations;
?pathnode->subpaths?=?subpaths;
?pathnode->subroots?=?subroots;
?pathnode->withCheckOptionLists?=?withCheckOptionLists;
?pathnode->returningLists?=?returningLists;
?pathnode->rowMarks?=?rowMarks;
?pathnode->onconflict?=?onconflict;
?pathnode->epqParam?=?epqParam;

?return?pathnode;
}

现在我们生成了最优的update路径,需要由路径生成执行计划:

Plan?*create_plan(PlannerInfo?*root,?Path?*best_path)
{
?Plan*plan;
?Assert(root->plan_params?==?NIL);?

?
?root->curOuterRels?=?NULL;
?root->curOuterParams?=?NIL;

?
?plan?=?create_plan_recurse(root,?best_path,?CP_EXACT_TLIST);?//?实际实现是在这里

?
?if?(!IsA(plan,?ModifyTable))
apply_tlist_labeling(plan->targetlist,?root->processed_tlist);

?
?SS_attach_initplans(root,?plan);

?
?if?(root->curOuterParams?!=?NIL)
elog(ERROR,?”failed?to?assign?all?NestLoopParams?to?plan?nodes”);

?
?root->plan_params?=?NIL;

?return?plan;
}

//?由最佳路径生成最佳执行计划
static?ModifyTable?*create_modifytable_plan(PlannerInfo?*root,?ModifyTablePath?*best_path)
{
?ModifyTable?*plan;
?List*subplans?=?NIL;
?ListCell?*subpaths,
*subroots;

?
?forboth(subpaths,?best_path->subpaths,?subroots,?best_path->subroots)
?{
Path*subpath?=?(Path?*)?lfirst(subpaths);
PlannerInfo?*subroot?=?(PlannerInfo?*)?lfirst(subroots);
Plan*subplan;

subplan?=?create_plan_recurse(subroot,?subpath,?CP_EXACT_TLIST);

apply_tlist_labeling(subplan->targetlist,?subroot->processed_tlist);

subplans?=?lappend(subplans,?subplan);
?}

?plan?=?make_modifytable(root,best_path->operation,best_path->canSetTag,
best_path->nominalRelation,best_path->rootRelation,
best_path->partColsUpdated,best_path->resultRelations,
subplans,best_path->subroots,best_path->withCheckOptionLists,
best_path->returningLists,best_path->rowMarks,
best_path->onconflict,best_path->epqParam);

?copy_generic_path_info(&plan->plan,?&best_path->path);

?return?plan;
}

最终的执行计划是ModifyTable:

typedef?struct?ModifyTable
{
?Planplan;
?CmdTypeoperation;
?boolcanSetTag;
?IndexnominalRelation;?
?IndexrootRelation;?
?boolpartColsUpdated;?
?List*resultRelations;?
?int?resultRelIndex;?
?int?rootResultRelIndex;?
?List*plans;?
?List*withCheckOptionLists;?
?List*returningLists;?
?List*fdwPrivLists;?
?Bitmapset*fdwDirectModifyPlans;?
?List*rowMarks;
?int?epqParam;
?OnConflictAction?onConflictAction;?
?List*arbiterIndexes;?
?List*onConflictSet;?
?Node*onConflictWhere;?
?IndexexclRelRTI;
?List*exclRelTlist;?
}?ModifyTable;

根据上面的执行计划,去执行。主要是各种算子的实现,其中要理解执行器的运行原理,主要是火山模型,一次一元组。我们看一下其调用过程。

CreatePortal(“”,?true,?true);
PortalDefineQuery(portal,NULL,query_string,commandTag,plantree_list,NULL);
PortalStart(portal,?NULL,?0,?InvalidSnapshot);
PortalRun(portal,FETCH_ALL,true,true,receiver,receiver,&qc);
–>?PortalRunMulti()
?–>?ProcessQuery()
–>?ExecutorStart(queryDesc,?0);
?–>?standard_ExecutorStart()
–>?estate?=?CreateExecutorState();?//?创建EState
–>?estate->es_output_cid?=?GetCurrentCommandId(true);?//?获得cid,后面更新的时候要用
–>?InitPlan(queryDesc,?eflags);
?–>?ExecInitNode(plan,?estate,?eflags);
–>?ExecInitModifyTable()?//?初始化ModifyTableState
–>?ExecutorRun(queryDesc,?ForwardScanDirection,?0L,?true);
?–>?standard_ExecutorRun()
–>?ExecutePlan()
?–>?ExecProcNode(planstate);?//?一次一元组?火山模型
–>?node->ExecProcNode(node);
?–>?ExecProcNodeFirst(PlanState?*node)
–>?node->ExecProcNode(node);
?–>?ExecModifyTable(PlanState?*pstate)
–>?ExecUpdate()
?–>?table_tuple_update(Relation?rel,?……)
–>?rel->rd_tableam->tuple_update()
?–>?heapam_tuple_update(Relation?relation,?……)
–>?heap_update(relation,?otid,?tuple,?cid,?……)

–>?ExecutorFinish(queryDesc);
–>?ExecutorEnd(queryDesc);
PortalDrop(portal,?false);

关键数据结构:

//?ModifyTableState?information
typedef?struct?ModifyTableState
{
?PlanState?ps;
?CmdTypeoperation;
?boolcanSetTag;
?boolmt_done;
?PlanState?**mt_plans;
?int?mt_nplans;
?int?mt_whichplan;?
?TupleTableSlot?**mt_scans;?
?ResultRelInfo?*resultRelInfo;?
?ResultRelInfo?*rootResultRelInfo;?
?List?**mt_arowmarks;?
?EPQState?mt_epqstate;?
?boolfireBSTriggers;?

?
?TupleTableSlot?*mt_root_tuple_slot;

?struct?PartitionTupleRouting?*mt_partition_tuple_routing;?

?struct?TransitionCaptureState?*mt_transition_capture;?

?
?struct?TransitionCaptureState?*mt_oc_transition_capture;

?
?TupleConversionMap?**mt_per_subplan_tupconv_maps;
}?ModifyTableState;

核心执行算子实现:

static?TupleTableSlot?*ExecModifyTable(PlanState?*pstate)
{
?ModifyTableState?*node?=?castNode(ModifyTableState,?pstate);
?PartitionTupleRouting?*proute?=?node->mt_partition_tuple_routing;
?EState*estate?=?node->ps.state;
?CmdTypeoperation?=?node->operation;
?ResultRelInfo?*saved_resultRelInfo;
?ResultRelInfo?*resultRelInfo;
?PlanState*subplanstate;
?JunkFilter?*junkfilter;
?TupleTableSlot?*slot;
?TupleTableSlot?*planSlot;
?ItemPointer?tupleid;
?ItemPointerData?tuple_ctid;
?HeapTupleData?oldtupdata;
?HeapTuple?oldtuple;

?CHECK_FOR_INTERRUPTS();

?
?if?(estate->es_epq_active?!=?NULL)
elog(ERROR,?”ModifyTable?should?not?be?called?during?EvalPlanQual”);

?
?if?(node->mt_done)
return?NULL;

?
?if?(node->fireBSTriggers)
?{
fireBSTriggers(node);
node->fireBSTriggers?=?false;
?}

?
?resultRelInfo?=?node->resultRelInfo?+?node->mt_whichplan;
?subplanstate?=?node->mt_plans[node->mt_whichplan];
?junkfilter?=?resultRelInfo->ri_junkFilter;

?
?saved_resultRelInfo?=?estate->es_result_relation_info;
?estate->es_result_relation_info?=?resultRelInfo;

?
?for?(;;)
?{

ResetPerTupleExprContext(estate);

if?(pstate->ps_ExprContext)
?ResetExprContext(pstate->ps_ExprContext);

planSlot?=?ExecProcNode(subplanstate);
if?(TupIsNull(planSlot))
{
?
?node->mt_whichplan++;?//?分区表的update,每个分区分布对应一个subplan,当执行完一个分区再执行下一个分区
?if?(node->mt_whichplan?<?node->mt_nplans)
?{
resultRelInfo++;
subplanstate?=?node->mt_plans[node->mt_whichplan];
junkfilter?=?resultRelInfo->ri_junkFilter;
estate->es_result_relation_info?=?resultRelInfo;
EvalPlanQualSetPlan(&node->mt_epqstate,?subplanstate->plan,?node->mt_arowmarks[node->mt_whichplan]);

if?(node->mt_transition_capture?!=?NULL)?{
?node->mt_transition_capture->tcs_map?=?tupconv_map_for_subplan(node,?node->mt_whichplan);
}
if?(node->mt_oc_transition_capture?!=?NULL)?{
?node->mt_oc_transition_capture->tcs_map?=?tupconv_map_for_subplan(node,?node->mt_whichplan);
}
continue;
?}
?else
break;
}

if?(node->mt_scans[node->mt_whichplan]->tts_ops?!=?planSlot->tts_ops)?{
?ExecCopySlot(node->mt_scans[node->mt_whichplan],?planSlot);
?planSlot?=?node->mt_scans[node->mt_whichplan];
}

if?(resultRelInfo->ri_usesFdwDirectModify)
{
?Assert(resultRelInfo->ri_projectReturning);
?slot?=?ExecProcessReturning(resultRelInfo->ri_projectReturning,?RelationGetRelid(resultRelInfo->ri_RelationDesc),?NULL,?planSlot);

?estate->es_result_relation_info?=?saved_resultRelInfo;
?return?slot;
}

EvalPlanQualSetSlot(&node->mt_epqstate,?planSlot);
slot?=?planSlot;

tupleid?=?NULL;
oldtuple?=?NULL;
if?(junkfilter?!=?NULL)
{
?
?if?(operation?==?CMD_UPDATE?||?operation?==?CMD_DELETE)
?{
charrelkind;
Datumdatum;
boolisNull;

relkind?=?resultRelInfo->ri_RelationDesc->rd_rel->relkind;
if?(relkind?==?RELKIND_RELATION?||?relkind?==?RELKIND_MATVIEW)
{
?datum?=?ExecGetJunkAttribute(slot,junkfilter->jf_junkAttNo,&isNull);
?
?if?(isNull)
elog(ERROR,?”ctid?is?NULL”);

?tupleid?=?(ItemPointer)?DatumGetPointer(datum);
?tuple_ctid?=?*tupleid;?
?tupleid?=?&tuple_ctid;
}

else?if?(AttributeNumberIsValid(junkfilter->jf_junkAttNo))
{
?datum?=?ExecGetJunkAttribute(slot,junkfilter->jf_junkAttNo,&isNull);
?
?if?(isNull)
elog(ERROR,?”wholerow?is?NULL”);

?oldtupdata.t_data?=?DatumGetHeapTupleHeader(datum);
?oldtupdata.t_len?=?HeapTupleHeaderGetDatumLength(oldtupdata.t_data);
?ItemPointerSetInvalid(&(oldtupdata.t_self));
?
?oldtupdata.t_tableOid?=?(relkind?==?RELKIND_VIEW)?InvalidOid?:?RelationGetRelid(resultRelInfo->ri_RelationDesc);
?oldtuple?=?&oldtupdata;
}
else
?Assert(relkind?==?RELKIND_FOREIGN_TABLE);
?}

?
?if?(operation?!=?CMD_DELETE)
slot?=?ExecFilterJunk(junkfilter,?slot);
}

switch?(operation)
{
?case?CMD_INSERT:
if?(proute)
?slot?=?ExecPrepareTupleRouting(node,?estate,?proute,?resultRelInfo,?slot);
slot?=?ExecInsert(node,?slot,?planSlot,?NULL,?estate->es_result_relation_info,?estate,?node->canSetTag);
if?(proute)
?estate->es_result_relation_info?=?resultRelInfo;
break;
?case?CMD_UPDATE:
slot?=?ExecUpdate(node,?tupleid,?oldtuple,?slot,?planSlot,
&node->mt_epqstate,?estate,?node->canSetTag);
break;
?case?CMD_DELETE:
slot?=?ExecDelete(node,?tupleid,?oldtuple,?planSlot,
&node->mt_epqstate,?estate,
true,?node->canSetTag,?false,?NULL,?NULL);
break;
?default:
elog(ERROR,?”unknown?operation”);
break;
}

if?(slot)?{
?estate->es_result_relation_info?=?saved_resultRelInfo;
?return?slot;
}
?}

?estate->es_result_relation_info?=?saved_resultRelInfo;?
?fireASTriggers(node);?

?node->mt_done?=?true;

?return?NULL;
}

我们看一下具体执行Update的实现(ExecUpdate):

static?TupleTableSlot?*
ExecUpdate(ModifyTableState?*mtstate,
?ItemPointer?tupleid,
?HeapTuple?oldtuple,
?TupleTableSlot?*slot,
?TupleTableSlot?*planSlot,
?EPQState?*epqstate,
?EState?*estate,
?bool?canSetTag)
{
?ResultRelInfo?*resultRelInfo;
?Relation?resultRelationDesc;
?TM_Result?result;
?TM_FailureData?tmfd;
?List*recheckIndexes?=?NIL;
?TupleConversionMap?*saved_tcs_map?=?NULL;

?
?if?(IsBootstrapProcessingMode())
elog(ERROR,?”cannot?UPDATE?during?bootstrap”);

?ExecMaterializeSlot(slot);

?
?resultRelInfo?=?estate->es_result_relation_info;
?resultRelationDesc?=?resultRelInfo->ri_RelationDesc;

?
?if?(resultRelInfo->ri_TrigDesc?&&?resultRelInfo->ri_TrigDesc->trig_update_before_row)
?{
if?(!ExecBRUpdateTriggers(estate,?epqstate,?resultRelInfo,?tupleid,?oldtuple,?slot))
?return?NULL;
?}

?
?if?(resultRelInfo->ri_TrigDesc?&&?resultRelInfo->ri_TrigDesc->trig_update_instead_row)
?{
if?(!ExecIRUpdateTriggers(estate,?resultRelInfo,?oldtuple,?slot))
?return?NULL;
?}
?else?if?(resultRelInfo->ri_FdwRoutine)
?{

if?(resultRelationDesc->rd_att->constr?&&?resultRelationDesc->rd_att->constr->has_generated_stored)
?ExecComputeStoredGenerated(estate,?slot,?CMD_UPDATE);

slot?=?resultRelInfo->ri_FdwRoutine->ExecForeignUpdate(estate,?resultRelInfo,?slot,?planSlot);

if?(slot?==?NULL)
?return?NULL;

slot->tts_tableOid?=?RelationGetRelid(resultRelationDesc);
?}
?else
?{
LockTupleMode?lockmode;
boolpartition_constraint_failed;
boolupdate_indexes;

slot->tts_tableOid?=?RelationGetRelid(resultRelationDesc);

if?(resultRelationDesc->rd_att->constr?&&?resultRelationDesc->rd_att->constr->has_generated_stored)
?ExecComputeStoredGenerated(estate,?slot,?CMD_UPDATE);

lreplace:;

ExecMaterializeSlot(slot);

partition_constraint_failed?=?resultRelInfo->ri_PartitionCheck?&&?!ExecPartitionCheck(resultRelInfo,?slot,?estate,?false);

if?(!partition_constraint_failed?&&?resultRelInfo->ri_WithCheckOptions?!=?NIL)
{
?
?ExecWithCheckOptions(WCO_RLS_UPDATE_CHECK,?resultRelInfo,?slot,?estate);
}

if?(partition_constraint_failed)
{
?booltuple_deleted;
?TupleTableSlot?*ret_slot;
?TupleTableSlot?*orig_slot?=?slot;
?TupleTableSlot?*epqslot?=?NULL;
?PartitionTupleRouting?*proute?=?mtstate->mt_partition_tuple_routing;
?int?map_index;
?TupleConversionMap?*tupconv_map;

?
?if?(((ModifyTable?*)?mtstate->ps.plan)->onConflictAction?==?ONCONFLICT_UPDATE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
?errmsg(“invalid?ON?UPDATE?specification”),
?errdetail(“The?result?tuple?would?appear?in?a?different?partition?than?the?original?tuple.”)));

?
?if?(proute?==?NULL)
ExecPartitionCheckEmitError(resultRelInfo,?slot,?estate);

?
?ExecDelete(mtstate,?tupleid,?oldtuple,?planSlot,?epqstate,?estate,?false,?false,?true,?&tuple_deleted,?&epqslot);

?
?if?(!tuple_deleted)
?{

if?(TupIsNull(epqslot))
?return?NULL;
else
{
?slot?=?ExecFilterJunk(resultRelInfo->ri_junkFilter,?epqslot);
?goto?lreplace;
}
?}

?
?if?(mtstate->mt_transition_capture)
saved_tcs_map?=?mtstate->mt_transition_capture->tcs_map;

?
?map_index?=?resultRelInfo?-?mtstate->resultRelInfo;
?Assert(map_index?>=?0?&&?map_index?<?mtstate->mt_nplans);
?tupconv_map?=?tupconv_map_for_subplan(mtstate,?map_index);
?if?(tupconv_map?!=?NULL)
slot?=?execute_attr_map_slot(tupconv_map->attrMap,?slot,?mtstate->mt_root_tuple_slot);

?
?Assert(mtstate->rootResultRelInfo?!=?NULL);
?slot?=?ExecPrepareTupleRouting(mtstate,?estate,?proute,?mtstate->rootResultRelInfo,?slot);

?ret_slot?=?ExecInsert(mtstate,?slot,?planSlot,
orig_slot,?resultRelInfo,
estate,?canSetTag);

?
?estate->es_result_relation_info?=?resultRelInfo;
?if?(mtstate->mt_transition_capture)
?{
mtstate->mt_transition_capture->tcs_original_insert_tuple?=?NULL;
mtstate->mt_transition_capture->tcs_map?=?saved_tcs_map;
?}

?return?ret_slot;
}

if?(resultRelationDesc->rd_att->constr)
?ExecConstraints(resultRelInfo,?slot,?estate);

result?=?table_tuple_update(resultRelationDesc,?tupleid,?slot,?estate->es_output_cid,
?estate->es_snapshot,?estate->es_crosscheck_snapshot,?true,&tmfd,?&lockmode,?&update_indexes);

switch?(result)
{
?case?TM_SelfModified:

if?(tmfd.cmax?!=?estate->es_output_cid)
?ereport(ERROR,(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
errmsg(“tuple?to?be?updated?was?already?modified?by?an?operation?triggered?by?the?current?command”),
errhint(“Consider?using?an?AFTER?trigger?instead?of?a?BEFORE?trigger?to?propagate?changes?to?other?rows.”)));

return?NULL;

?case?TM_Ok:
break;

?case?TM_Updated:
{
?TupleTableSlot?*inputslot;
?TupleTableSlot?*epqslot;

?if?(IsolationUsesXactSnapshot())
ereport(ERROR,(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),errmsg(“could?not?serialize?access?due?to?concurrent?update”)));

?
?inputslot?=?EvalPlanQualSlot(epqstate,?resultRelationDesc,resultRelInfo->ri_RangeTableIndex);

?result?=?table_tuple_lock(resultRelationDesc,?tupleid,?estate->es_snapshot,inputslot,?estate->es_output_cid,?lockmode,?LockWaitBlock,?TUPLE_LOCK_FLAG_FIND_LAST_VERSION,&tmfd);

?switch?(result)
?{
case?TM_Ok:
?Assert(tmfd.traversed);
?epqslot?=?EvalPlanQual(epqstate,?resultRelationDesc,?resultRelInfo->ri_RangeTableIndex,?inputslot);
?if?(TupIsNull(epqslot))

return?NULL;

?slot?=?ExecFilterJunk(resultRelInfo->ri_junkFilter,?epqslot);
?goto?lreplace;

case?TM_Deleted:
?
?return?NULL;

case?TM_SelfModified:

?
?if?(tmfd.cmax?!=?estate->es_output_cid)
ereport(ERROR,(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
?errmsg(“tuple?to?be?updated?was?already?modified?by?an?operation?triggered?by?the?current?command”),errhint(“Consider?using?an?AFTER?trigger?instead?of?a?BEFORE?trigger?to?propagate?changes?to?other?rows.”)));
?return?NULL;

default:
?
?elog(ERROR,?”unexpected?table_tuple_lock?status:?%u”,?result);
?return?NULL;
?}
}

break;

?case?TM_Deleted:
if?(IsolationUsesXactSnapshot())
?ereport(ERROR,(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),errmsg(“could?not?serialize?access?due?to?concurrent?delete”)));

return?NULL;

?default:
elog(ERROR,?”unrecognized?table_tuple_update?status:?%u”,
result);
return?NULL;
}

if?(resultRelInfo->ri_NumIndices?>?0?&&?update_indexes)
?recheckIndexes?=?ExecInsertIndexTuples(slot,?estate,?false,?NULL,?NIL);
?}

?if?(canSetTag)
(estate->es_processed)++;

?
 ExecARUpdateTriggers(estate, resultRelInfo, tupleid, oldtuple, slot, recheckIndexes, mtstate->operation == CMD_INSERT ? mtstate->mt_oc_transition_capture : mtstate->mt_transition_capture);

?list_free(recheckIndexes);

?
?if?(resultRelInfo->ri_WithCheckOptions?!=?NIL)
ExecWithCheckOptions(WCO_VIEW_CHECK,?resultRelInfo,?slot,?estate);

?if?(resultRelInfo->ri_projectReturning)?
return?ExecProcessReturning(resultRelInfo->ri_projectReturning,RelationGetRelid(resultRelationDesc),slot,?planSlot);

?return?NULL;
}

再往下就是涉及到存储引擎的部分了,我们重点看一下其对外的接口输入参数。重点是这4个参数:

relation – table to be modified (caller must hold suitable lock) (要更新的那个表)

otid – TID of old tuple to be replaced (要更新的元组ID,对应的是老的元组,更新后相当于是插入一条新元组,老元组的tid值要更新为新的tid值)

slot – newly constructed tuple data to store (新元组的值)

cid – update command ID (used for visibility test, and stored into cmax/cmin if successful) (cid值,事务相关)

执行器层面的更新算子是建立在存储引擎提供的底层table_tuple_update接口之上的,这个接口是我们编写ExecUpdate以及ExecModifyTable的基础。

static?inline?TM_Result
table_tuple_update(Relation?rel,?ItemPointer?otid,?TupleTableSlot?*slot,?CommandId?cid,?
?Snapshot?snapshot,?Snapshot?crosscheck,?bool?wait,?TM_FailureData?*tmfd,?LockTupleMode?*lockmode,?bool?*update_indexes)
{
?return?rel->rd_tableam->tuple_update(rel,?otid,?slot,?cid,?
?snapshot,?crosscheck,?wait,?tmfd,?lockmode,?update_indexes);
}

这一块主要是要理解PG中update语句并不是原地更新元组,而是插入一条新元组。因为PG实现MVCC的方式与MySQL、Oracle有所不同,并不是通过undo日志实现的,而是相当于把undo日志记录到了原有的表中,并不是单独存放在一个地方。具体的不再细述,内容太多了,以后再分析事务部分。

好了,内容很多,分析源码的时候,涉及到的知识点以及逻辑是非常多的,我们最好每次分析只抓一个主干,不然每个都分析,最后就会比较乱。就先分析到这里吧。

到此,这篇关于Postgres中UPDATE更新语句源码分析的文章就介绍到这了。更多相关Postgres中UPDATE源码的内容,请搜索脚本之家以前的文章或继续浏览下面的相关文章,希望大家以后多多支持脚本之家!

您可能感兴趣的文章:解决postgresql 数据库 update更新慢的原因基于postgresql行级锁for update测试实操MySQL+PostgreSQL批量插入更新insertOrUpdatePostgreSQL批量update与oracle差异详解

© 版权声明

相关文章