Spark在执行Action算子时,Spark会根据Action操作之前一系列Transform操作的关联关系生成一个DAG,在后续的操作中对DAG进行Stage划分,生成Task并最终运行。整个过程分为如下三步

  1. DAGScheduler对每个Application进行分析,根据各RDD之间的依赖关系划分stage
  2. DAGScheduler根据划分的每个Stage生成一组Task,将TaskSet提交到TaskScheduler
  3. TaskScheduler启动Executor进行计算

在这一过程最重要的三个类是

  1. org.apache.spark.scheduler.DAGScheduler
  2. org.apache.spark.scheduler.SchedulerBackend(为Task分配计算资源)
  3. org.apache.spark.scheduler.TaskScheduler

将在本篇以及接下来的几篇文章介绍这三个类的协作过程

1.DAGScheduler数据结构

这里写图片描述

2.Job提交

一个Job实际上是从RDD调用一个Action操作开始的,该Action操作最终会进入到org.apache.spark.SparkContext.runJob()方法中,在SparkContext中有多个重载的runJob方法,最终入口是下面这个:
这里写图片描述

可以看到在这个方法的末尾调用了DAGScheduler.runJob()方法

这里写图片描述
在该方法中利用submitJob得到一个JobWaiter实例监听Job的执行情况,如果job成功则输出成功日志否则抛出异常

这里写图片描述

DAGScheduler的submitJob方法首先检查rdd的分区信息,确保rdd分区信息的正确,利用eventProcessLoop对象发送信息,eventProcessLoop对象父类EventLoop的post方法将该提交的job加入到事件队列中,在EventLoop中利用如下数据结构

private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

存储事件,可以看到该队列没有设置队列大小,所以要确保队列及时消费避免OOM。在EventLoop类中同时会启动一个线程
来处理提交上来的Job事件,这是一个典型的生产者消费者模式
这里写图片描述

在该线程中是使用子类的onReceive方法处理事件,而在onReceive方法调用了doOnReceive方法处理事件

这里写图片描述

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐