openGauss 之 smp实现代码走读
本文分析了openGauss实现SMP(单机并行)功能的代码实现。首先在查询优化阶段生成并行执行计划,根据代价模型选择是否使用并行扫描;然后通过Stream算子汇聚并行计算结果。在执行阶段启动多个Stream Worker线程,各线程根据smp_id分配不同的数据块范围进行并行扫描。生产者线程将数据推送到共享队列,消费者线程通过Stream算子读取数据并返回给上层。这种设计避免了分布式执行计划转换
一. 前言
openGauss smp是指在单机内并行执行SQL从而达到分布式并行计算的效果。如下执行计划所示的dop 1:4 代表着有四个seq同时执行,然后通过Stream算子汇聚,从而实现并行计划。

本文主要走读代码了解openGauss怎么实现smp功能的。
二. 生成smp执行计划:
实现smp功能的首先是要生成并行计算的执行计划。在openGauss中,在生成base 表的路径的时候,如果query_dop > 1,都会尝试生成smp并行执行计划,在根据执行计划代价选择走并行还是单机。生成并行计算计划的入口在set_plain_rel_pathlist函数中:
set_plain_rel_pathlist
add_path(root, rel, create_seqscan_path(root, rel, required_outer));
if (can_parallel) {
add_path(root, rel, create_seqscan_path(root, rel, required_outer, u_sess->opt_cxt.query_dop)); // 生成并行执行计划
pathnode->dop = dop;
cost_seqscan
run_cost += u_sess->opt_cxt.smp_thread_cost * (dop - 1); // 代价中加上stream线程的启动代价
if (u_sess->attr.attr_sql.enable_seqscan_dopcost)
run_cost += spc_seq_page_cost * baserel->pages / dop; // 但是seqscan的代价却减少dop倍,最终根据stream线程的启动时间和扫描数据的减少时间的总和决定是否选用并行执行计划
else
run_cost += spc_seq_page_cost * baserel->pages;
}
三. 生成汇聚stream算子
并行计划的结果需要汇总,这个是通过Stream算子实现的。在openGauss中,对于Join,Union,聚合等算子均需要数据聚合后再进行计算,此时便需要通过create_local_gather添加汇聚的stream算子实现聚合。如下为简单的聚合代码流程:
internal_grouping_planner
make_simple_RemoteQuery
if (lefttree->dop > 1) {
create_local_gather
stream_node = makeNode(Stream);
stream_plan->lefttree = plan;
}
四. 启动stream线程
smp的stream_worker线程是在执行sql的时候再启动的,启动的入口在InitPlan中,如下为代码的调用栈:
InitPlan
if (plannedstmt->num_streams > 0) {
StartUpStreamInParallel
foreach(lc, pairList) {
StreamProducer *producer = (StreamProducer *)linitial(pair->producerList); // 依次拿出所有的生产者启动
StartupStreamThread(stream_state);
foreach (cell, pair->producerList) {
StreamProducer* producer = (StreamProducer*)lfirst(cell);
producer->init
u_sess->stream_cxt.global_obj->initStreamThread(producer, smpId, pair); // 启动stream 线程并且绑定smpId
ThreadId producerThreadId = ApplyStreamThread(producer);
tid = initialize_util_thread(STREAM_WORKER, producer); // 启动STREAM_WORKER的线程
StreamMain
execute_stream_plan(u_sess->stream_cxt.producer_obj);
PlannedStmt* planstmt = producer->getPlan(); // 拿到执行计划
PortalDefineQuery(portal, NULL, "DUMMY", commandTag, lappend(NULL, planstmt), NULL);
PortalStart(portal, producer->getParams(), 0, producer->getSnapShot()); // 执行执行计划
}
}
}
五. 分配block
与大多数并行计算引擎不同,openGauss实现smp的时候并没有将执行计划转成分布式执行计划,所有的smp worker收到的执行计划是一样的。但是真正执行计划的时候,会根据smp_id来分配不同的block扫描范围,不同线程的smp扫描不同区段的block来实现并行扫描处理。入口在HeapamScanInitParallelSeqscan中:
ExecInitSeqScan
scan_handler_tbl_init_parallel_seqscan
tableam_scan_init_parallel_seqscan
HeapamScanInitParallelSeqscan
heap_init_parallel_seqscan
uint32 paral_blocks = u_sess->stream_cxt.smp_id * PARALLEL_SCAN_GAP; // 根据smp_id分配需要扫描的block
if (scan->rs_base.rs_rangeScanInRedis.isRangeScanInRedis) {
scan->rs_base.rs_startblock += paral_blocks; // 如果设置了扫描偏移,那么再加上线程id对应的block偏移
} else {
scan->rs_base.rs_startblock = paral_blocks; // 设置线程id对应的偏移为扫描的bloc kid
}
同样的,next_page也需要跳过不属于自己线程的页面:
next_page
if (scan->dop > 1) {
if (BackwardScanDirection == dir) {
....
} else {
page++;
if ((page - scan->rs_base.rs_startblock) % PARALLEL_SCAN_GAP == 0) {
page += (scan->dop - 1) * PARALLEL_SCAN_GAP; // 需要跳过不属于自己线程的page
}
}
}
六. 生产者
smp 中的所谓的生产者其实就是stream-worker 线程,用于读到数据后往生产者-消费者的共享队列中推送。
ExecutePlan
(*dest->receiveSlot)(slot, dest)
printLocalRoundRobinTuple
rec->arg->localRoundRobinStream(tuple)
StreamProducer::sendByMemory
gs_memory_send(tuple, batchSrc, m_sharedContext, nthChannel, nthRow)
ExecCopySlot(tupleVec->tupleVector[n], tuple); // 推送到消费者队列
tupleVec->tuplePointer++;
七. 消费者
smp中的消费者并没单独的线程,对应的角色其实是由stream算子填充,用户读取生产者推送的数据并且返回个上层。如下与Stream算子读取各个生产者的元组数据并且返回给上层为例进行代码走读。
ExecStream
node->StreamScan
ScanMemoryStream
gs_memory_recv
gs_find_memory_data(node, &waitnode_count);
do {
i++;
node->sharedContext->scanLoc[u_sess->stream_cxt.smp_id] = i; // 遍历所有的生产者进行消费
gs_consume_memory_data
TupleVector* tuplesrc = sharedContext->sharedTuples[u_sess->stream_cxt.smp_id][loc];
for (int i = 0; i < tuplesrc->tuplePointer; i++) {
(void)ExecCopySlot(tupledst->tupleVector[i], tuplesrc->tupleVector[i]); // 消费掉数据
}
}
更多推荐
所有评论(0)