一. 前言

​     openGauss smp是指在单机内并行执行SQL从而达到分布式并行计算的效果。如下执行计划所示的dop 1:4 代表着有四个seq同时执行,然后通过Stream算子汇聚,从而实现并行计划。

     本文主要走读代码了解openGauss怎么实现smp功能的。

二. 生成smp执行计划:

      实现smp功能的首先是要生成并行计算的执行计划。在openGauss中,在生成base 表的路径的时候,如果query_dop > 1,都会尝试生成smp并行执行计划,在根据执行计划代价选择走并行还是单机。生成并行计算计划的入口在set_plain_rel_pathlist函数中:

set_plain_rel_pathlist
     add_path(root, rel, create_seqscan_path(root, rel, required_outer));
     if (can_parallel) {
         add_path(root, rel, create_seqscan_path(root, rel, required_outer, u_sess->opt_cxt.query_dop));    // 生成并行执行计划
             pathnode->dop = dop;
             cost_seqscan
                 run_cost += u_sess->opt_cxt.smp_thread_cost * (dop - 1);   // 代价中加上stream线程的启动代价
                     if (u_sess->attr.attr_sql.enable_seqscan_dopcost)
                         run_cost += spc_seq_page_cost * baserel->pages / dop;  // 但是seqscan的代价却减少dop倍,最终根据stream线程的启动时间和扫描数据的减少时间的总和决定是否选用并行执行计划
                     else
                         run_cost += spc_seq_page_cost * baserel->pages;
             
     }

三.  生成汇聚stream算子

       并行计划的结果需要汇总,这个是通过Stream算子实现的。在openGauss中,对于Join,Union,聚合等算子均需要数据聚合后再进行计算,此时便需要通过create_local_gather添加汇聚的stream算子实现聚合。如下为简单的聚合代码流程:

internal_grouping_planner
    make_simple_RemoteQuery
        if (lefttree->dop > 1) {
            create_local_gather
                stream_node = makeNode(Stream);
                stream_plan->lefttree = plan;
    }

四. 启动stream线程

  smp的stream_worker线程是在执行sql的时候再启动的,启动的入口在InitPlan中,如下为代码的调用栈:

InitPlan
    if (plannedstmt->num_streams > 0) {
        StartUpStreamInParallel
            foreach(lc, pairList) {
                StreamProducer *producer = (StreamProducer *)linitial(pair->producerList);   // 依次拿出所有的生产者启动
                StartupStreamThread(stream_state);
                foreach (cell, pair->producerList) {
                    StreamProducer* producer = (StreamProducer*)lfirst(cell);
                    producer->init
                    u_sess->stream_cxt.global_obj->initStreamThread(producer, smpId, pair);  // 启动stream 线程并且绑定smpId
                        ThreadId producerThreadId = ApplyStreamThread(producer);
                            tid = initialize_util_thread(STREAM_WORKER, producer);  // 启动STREAM_WORKER的线程
                               StreamMain
                                   execute_stream_plan(u_sess->stream_cxt.producer_obj);
                                       PlannedStmt* planstmt = producer->getPlan();    // 拿到执行计划
                                       PortalDefineQuery(portal, NULL, "DUMMY", commandTag, lappend(NULL, planstmt), NULL);
                                       PortalStart(portal, producer->getParams(), 0, producer->getSnapShot());  // 执行执行计划
                }
            }
    }

五. 分配block

       与大多数并行计算引擎不同,openGauss实现smp的时候并没有将执行计划转成分布式执行计划,所有的smp worker收到的执行计划是一样的。但是真正执行计划的时候,会根据smp_id来分配不同的block扫描范围,不同线程的smp扫描不同区段的block来实现并行扫描处理。入口在HeapamScanInitParallelSeqscan中:

ExecInitSeqScan
    scan_handler_tbl_init_parallel_seqscan
        tableam_scan_init_parallel_seqscan
            HeapamScanInitParallelSeqscan
                heap_init_parallel_seqscan
                    uint32 paral_blocks = u_sess->stream_cxt.smp_id * PARALLEL_SCAN_GAP;   // 根据smp_id分配需要扫描的block     
                    if (scan->rs_base.rs_rangeScanInRedis.isRangeScanInRedis) {
                        scan->rs_base.rs_startblock += paral_blocks;   // 如果设置了扫描偏移,那么再加上线程id对应的block偏移
                    } else {
                        scan->rs_base.rs_startblock = paral_blocks;    // 设置线程id对应的偏移为扫描的bloc kid
                    }


同样的,next_page也需要跳过不属于自己线程的页面:
next_page
    if (scan->dop > 1) {
        if (BackwardScanDirection == dir) {
            ....
        } else {
            page++;
            if ((page - scan->rs_base.rs_startblock) % PARALLEL_SCAN_GAP == 0) {
                page += (scan->dop - 1) * PARALLEL_SCAN_GAP;      // 需要跳过不属于自己线程的page
            }
        }
    }

六. 生产者

     smp 中的所谓的生产者其实就是stream-worker 线程,用于读到数据后往生产者-消费者的共享队列中推送。

ExecutePlan
    (*dest->receiveSlot)(slot, dest)
        printLocalRoundRobinTuple
            rec->arg->localRoundRobinStream(tuple)
                StreamProducer::sendByMemory
                    gs_memory_send(tuple, batchSrc, m_sharedContext, nthChannel, nthRow)
                        ExecCopySlot(tupleVec->tupleVector[n], tuple);  // 推送到消费者队列
                        tupleVec->tuplePointer++;   

七. 消费者

     smp中的消费者并没单独的线程,对应的角色其实是由stream算子填充,用户读取生产者推送的数据并且返回个上层。如下与Stream算子读取各个生产者的元组数据并且返回给上层为例进行代码走读。

ExecStream
    node->StreamScan
        ScanMemoryStream
            gs_memory_recv
                gs_find_memory_data(node, &waitnode_count);
                    do {
                        i++;
                        node->sharedContext->scanLoc[u_sess->stream_cxt.smp_id] = i;   // 遍历所有的生产者进行消费
                        gs_consume_memory_data
                            TupleVector* tuplesrc = sharedContext->sharedTuples[u_sess->stream_cxt.smp_id][loc]; 
                            for (int i = 0; i < tuplesrc->tuplePointer; i++) {
                                (void)ExecCopySlot(tupledst->tupleVector[i], tuplesrc->tupleVector[i]);  // 消费掉数据
                            }                        
                    }

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐