Spark Source Code - Shuffle Analysis - 3 - MapOutputTracker
1. Overview
This analysis is based on the Spark 2.x source (Scala 2.11 build).
In Spark, a job's execution is split by the DAG scheduler into multiple stages at wide (shuffle) dependencies, and each stage is divided into multiple tasks by partition. Executing a stage necessarily involves fetching data from the upstream stage, computing over that data, and writing the results out to disk.
Between stages, the addresses of this data are passed around through MapOutputTracker, and that address passing is what makes stage-to-stage data transfer work.
In this post we analyze how MapOutputTracker passes data addresses between stages.
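Before diving into the source, here is a minimal runnable sketch of the protocol. All names below (TrackerMaster, TrackerWorker, MapOutput) are simplified stand-ins invented for illustration, not Spark's actual API:

import scala.collection.concurrent.TrieMap

// What a map task reports after writing its shuffle output: which node
// holds the data, and how large each reduce partition's block is.
case class MapOutput(host: String, blockSizes: Array[Long])

// Driver side: one entry per shuffle, one MapOutput slot per map task.
class TrackerMaster {
  private val outputs = TrieMap.empty[Int, Array[MapOutput]] // shuffleId -> per-map-task outputs

  def register(shuffleId: Int, numMaps: Int): Unit =
    outputs.putIfAbsent(shuffleId, new Array[MapOutput](numMaps))

  def registerMapOutput(shuffleId: Int, mapId: Int, out: MapOutput): Unit =
    outputs(shuffleId)(mapId) = out

  def statuses(shuffleId: Int): Array[MapOutput] = outputs(shuffleId)
}

// Executor side: asks the master where a shuffle's map outputs live,
// then caches the answer locally for later tasks on the same executor.
class TrackerWorker(master: TrackerMaster) {
  private val cache = TrieMap.empty[Int, Array[MapOutput]]

  def getStatuses(shuffleId: Int): Array[MapOutput] =
    cache.getOrElseUpdate(shuffleId, master.statuses(shuffleId)) // an RPC in real Spark
}

object TrackerDemo extends App {
  val master = new TrackerMaster
  master.register(shuffleId = 0, numMaps = 2)
  master.registerMapOutput(0, 0, MapOutput("host-a", Array(10L, 20L))) // map task 0 done
  master.registerMapOutput(0, 1, MapOutput("host-b", Array(5L, 15L))) // map task 1 done

  val worker = new TrackerWorker(master)
  println(worker.getStatuses(0).map(_.host).mkString(", ")) // host-a, host-b
}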
2. Instantiation of MapOutputTracker
Notes:
- When SparkEnv is constructed, its fields are initialized, among them the mapOutputTracker field;
- If the SparkEnv is being built for the driver, a MapOutputTrackerMaster is constructed and assigned to mapOutputTracker;
- If the SparkEnv is being built for an executor, a MapOutputTrackerWorker is constructed and assigned to mapOutputTracker;
- After the tracker object is built, the MapOutputTrackerMasterEndpoint is registered (or looked up), and the endpoint reference is kept in the tracker;
- On the driver, a MapOutputTrackerMasterEndpoint is constructed, registered with the RpcEnv under the name "MapOutputTracker", and its reference is returned;
- On an executor, the endpoint reference is looked up from the RpcEnv by the name "MapOutputTracker" and returned;
- Whether SparkEnv is being built for the driver is decided by the executorId: when executorId == "driver", the SparkEnv is for the driver; otherwise it is for an executor.
class SparkEnv (
    val executorId: String,
    private[spark] val rpcEnv: RpcEnv,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val serializerManager: SerializerManager,
    val mapOutputTracker: MapOutputTracker,
    val shuffleManager: ShuffleManager,
    val broadcastManager: BroadcastManager,
    val blockManager: BlockManager,
    val securityManager: SecurityManager,
    val metricsSystem: MetricsSystem,
    val memoryManager: MemoryManager,
    val outputCommitCoordinator: OutputCommitCoordinator,
    val conf: SparkConf) extends Logging {
  private def create(
      conf: SparkConf,
      executorId: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Option[Int],
      isLocal: Boolean,
      numUsableCores: Int,
      ioEncryptionKey: Option[Array[Byte]],
      listenerBus: LiveListenerBus = null,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    // Determine whether we are building the driver-side environment
    val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
    // --------- other code ---------
    // Instantiate the mapOutputTracker
    val mapOutputTracker = if (isDriver) {
      // The driver builds a master
      new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
    } else {
      // Executors build a worker
      new MapOutputTrackerWorker(conf)
    }
    // Register the master endpoint and keep its reference in the tracker
    mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
      new MapOutputTrackerMasterEndpoint( // build a tracker master endpoint
        rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
    // --------- other code ---------
    // Instantiate the SparkEnv object
    val envInstance = new SparkEnv(
      executorId,
      rpcEnv,
      serializer,
      closureSerializer,
      serializerManager,
      mapOutputTracker, // initialize the mapOutputTracker field
      shuffleManager,
      broadcastManager,
      blockManager,
      securityManager,
      metricsSystem,
      memoryManager,
      outputCommitCoordinator,
      conf)
    // --------- other code ---------
    envInstance
  }
  // Register the tracker master endpoint (driver) or look up its reference (executor)
  def registerOrLookupEndpoint(
      name: String, endpointCreator: => RpcEndpoint):
      RpcEndpointRef = {
    if (isDriver) { // driver: register the endpoint with the RpcEnv and return its reference
      logInfo("Registering " + name)
      rpcEnv.setupEndpoint(name, endpointCreator)
    } else { // executor: look up the master endpoint's reference by name
      RpcUtils.makeDriverRef(name, conf, rpcEnv)
    }
  }
  // Create the driver-side SparkEnv
  private[spark] def createDriverEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus,
      numCores: Int,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    // --------- other code ---------
    create(
      conf,
      SparkContext.DRIVER_IDENTIFIER, // the executorId is fixed to "driver"
      bindAddress,
      advertiseAddress,
      Option(port),
      isLocal,
      numCores,
      ioEncryptionKey,
      listenerBus = listenerBus,
      mockOutputCommitCoordinator = mockOutputCommitCoordinator
    )
  }
  // Create the executor-side SparkEnv
  private[spark] def createExecutorEnv(
      conf: SparkConf,
      executorId: String,
      hostname: String,
      numCores: Int,
      ioEncryptionKey: Option[Array[Byte]],
      isLocal: Boolean): SparkEnv = {
    val env = create(
      conf,
      executorId,
      hostname,
      hostname,
      None,
      isLocal,
      numCores,
      ioEncryptionKey
    )
    SparkEnv.set(env)
    env
  }
}
object SparkContext extends Logging {
  private[spark] val DRIVER_IDENTIFIER = "driver"
}
2.1. Building the driver-side MapOutputTrackerMaster
Notes:
- When SparkContext is instantiated on the driver, the driver-side SparkEnv is built;
- While building the driver's SparkEnv, a MapOutputTrackerMaster is instantiated and stored in the driver's SparkEnv; a MapOutputTrackerMasterEndpoint is registered with the RpcEnv, and the returned endpoint reference is kept in the tracker master.
class SparkContext(config: SparkConf) extends Logging {
  private var _env: SparkEnv = _
  try {
    // Initialize SparkContext's SparkEnv field
    _env = createSparkEnv(_conf, isLocal, listenerBus)
  }
  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    // Create the driver-side SparkEnv; the driver's MapOutputTrackerMaster is built here
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
  }
}
2.2. Building the executor-side MapOutputTrackerWorker
Notes:
- When an executor starts up, the executor-side SparkEnv is built;
- While building the executor's SparkEnv, a MapOutputTrackerWorker is instantiated and stored in the executor's SparkEnv, and the tracker master's endpoint reference is looked up from the RpcEnv and kept in the tracker worker.
private[spark] object CoarseGrainedExecutorBackend extends Logging {
  private def run(
      driverUrl: String,
      executorId: String,
      hostname: String,
      cores: Int,
      appId: String,
      workerUrl: Option[String],
      userClassPath: Seq[URL]) {
    Utils.initDaemon(log)
    SparkHadoopUtil.get.runAsSparkUser { () =>
      // --------- other code ---------
      // Initialize the executor's SparkEnv; the executor-side MapOutputTrackerWorker is built here
      val env = SparkEnv.createExecutorEnv(
        driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
      // Register the executor endpoint with the RpcEnv
      env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
        env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
      // --------- other code ---------
    }
  }
}
3. How MapOutputTracker is used
3.1. Caching task output metadata in the tracker
Notes:
- After a ShuffleMapTask finishes executing, the metadata of its result is wrapped in a MapStatus;
- After a series of steps, that MapStatus ends up cached in the shuffleStatuses field of MapOutputTrackerMaster (a sketch of the final step follows in 3.1.1);
- shuffleStatuses is a ConcurrentHashMap whose entries have the shape (Int, ShuffleStatus): the key is a shuffleId and the value is a ShuffleStatus object, whose mapStatuses field is an array used to cache MapStatus objects;
- One shuffleId corresponds to one ShuffleStatus, and one ShuffleStatus caches 0 to n MapStatus objects;
- A shuffle has 0 to n tasks; each task corresponds to one partition and produces that partition's MapStatus;
- For how a ShuffleMapTask writes its output, see [Spark Source Code - Shuffle Analysis - 1 - ShuffleWriter].
(Figure: shuffle output statuses being cached in the tracker; the original image is unavailable.)
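To make the two-level layout concrete, here is a runnable toy model; MapStatusLite and ShuffleStatusLite are simplified stand-ins invented for illustration, not Spark's classes:

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Toy stand-ins for Spark's MapStatus / ShuffleStatus (illustrative only).
case class MapStatusLite(location: String, blockSizes: Array[Long])
class ShuffleStatusLite(numPartitions: Int) {
  // One slot per map task, indexed by map partition id
  val mapStatuses = new Array[MapStatusLite](numPartitions)
}

object ShuffleStatusDemo extends App {
  // shuffleId -> ShuffleStatusLite: exactly one entry per shuffle
  val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatusLite]().asScala

  // A shuffle stage with 3 map tasks registers its outputs one by one
  shuffleStatuses.put(0, new ShuffleStatusLite(3))
  shuffleStatuses(0).mapStatuses(0) = MapStatusLite("executor-1", Array(10L, 20L))
  shuffleStatuses(0).mapStatuses(1) = MapStatusLite("executor-2", Array(5L, 15L))
  // Map task 2 has not finished yet, so its slot stays null

  println(shuffleStatuses(0).mapStatuses.count(_ != null)) // prints 2
}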
3.1.1. Registering map output metadata with MapOutputTrackerMaster
Notes:
- MapOutputTrackerMaster records the output metadata of every shuffle over the whole application's execution;
- It caches this metadata in the shuffleStatuses field, a ConcurrentHashMap;
- There is exactly one entry per shuffle;
- The key is the shuffleId, the value is the shuffle's output metadata object, a ShuffleStatus;
- Since a shuffle stage has 0 to n partitions, and therefore 0 to n tasks, and every task produces one MapStatus, one shuffleId corresponds to 0 to n MapStatus objects.
private[spark] class MapOutputTrackerMaster(
    conf: SparkConf,
    broadcastManager: BroadcastManager,
    isLocal: Boolean)
  extends MapOutputTracker(conf) {
  // Caches each shuffle's output metadata
  val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala
  def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) {
    // Cache the map-side output metadata
    shuffleStatuses(shuffleId).addMapOutput(mapId, status)
  }
}
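The "series of steps" mentioned above ends in the DAG scheduler: when a ShuffleMapTask completes successfully, the scheduler registers its MapStatus with the master. The following is a heavily abridged, paraphrased sketch of DAGScheduler.handleTaskCompletion in Spark 2.x (bookkeeping and error handling omitted; details vary by version):

// Inside DAGScheduler.handleTaskCompletion (abridged, paraphrased sketch):
case smt: ShuffleMapTask =>
  val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
  val status = event.result.asInstanceOf[MapStatus] // produced by the ShuffleWriter
  // mapId is the map-side partition id; the MapStatus carries the block
  // location plus the size of the block for every reduce partition
  mapOutputTracker.registerMapOutput(
    shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)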
3.1.1.1. Caching a partition's map-side output metadata
Notes:
- ShuffleStatus is the shuffle output metadata class; it caches all output metadata of one shuffle;
- Its mapStatuses field, an Array, caches the output metadata of each partition (i.e. each task);
- The array is indexed by partition number, and each element is the output metadata object of that partition (task).
private class ShuffleStatus(numPartitions: Int) {
  // One output-metadata slot per map partition
  val mapStatuses = new Array[MapStatus](numPartitions)
  def addMapOutput(mapId: Int, status: MapStatus): Unit = synchronized {
    if (mapStatuses(mapId) == null) {
      _numAvailableOutputs += 1
      invalidateSerializedMapOutputStatusCache()
    }
    // Cache the map output metadata in the slot for its partition
    mapStatuses(mapId) = status
  }
}
3.2. Fetching the data a task needs from the tracker
Notes:
- When the first RDD of a stage pulls the data that the upstream stage wrote out on each node, it first calls MapOutputTrackerWorker.getMapSizesByExecutorId() to find where the shuffle's data for the requested partitions is distributed;
- For how a ShuffleMapTask reads its input, see [Spark Source Code - Shuffle Analysis - 2 - ShuffleReader].
private[spark] class BlockStoreShuffleReader[K, C](
    handle: BaseShuffleHandle[K, _, C],
    startPartition: Int,
    endPartition: Int,
    context: TaskContext,
    serializerManager: SerializerManager = SparkEnv.get.serializerManager,
    blockManager: BlockManager = SparkEnv.get.blockManager,
    mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker)
  extends ShuffleReader[K, C] with Logging {
  override def read(): Iterator[Product2[K, C]] = {
    val wrappedStreams = new ShuffleBlockFetcherIterator(
      context,
      blockManager.shuffleClient,
      blockManager,
      // Ask the tracker where each partition's blocks live
      mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
      serializerManager.wrapStream,
      SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
      SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
      SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
      SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
      SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
    // --------- other code ---------
  }
}
3.2.1. Fetching partition block locations
Notes:
- Look up all of the shuffle's output metadata objects by shuffleId;
- Convert those metadata objects into location information.
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
  val mapStatuses: Map[Int, Array[MapStatus]] =
    new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
  override def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
      : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
    logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
    // Fetch all of the shuffle's output metadata objects
    val statuses = getStatuses(shuffleId)
    try {
      // Convert to location info: a list of blocks per node
      MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
    } catch {
      case e: MetadataFetchFailedException =>
        // We experienced a fetch failure so our mapStatuses cache is outdated; clear it:
        mapStatuses.clear()
        throw e
    }
  }
}
3.2.1.1. Fetching the output metadata objects
Notes:
- First try the local cache by shuffleId; on a hit, return the result immediately;
- On a miss, pull the data from MapOutputTrackerMaster by shuffleId; on success, cache it locally and return it;
- The return value is an Array[MapStatus].
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
  val mapStatuses: Map[Int, Array[MapStatus]] =
    new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
  private def getStatuses(shuffleId: Int): Array[MapStatus] = {
    // Try the local cache first
    val statuses = mapStatuses.get(shuffleId).orNull
    // Cache miss: pull from the master
    if (statuses == null) {
      logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
      val startTime = System.currentTimeMillis
      var fetchedStatuses: Array[MapStatus] = null
      fetching.synchronized {
        // If another thread is already fetching this shuffle's statuses, wait for it
        while (fetching.contains(shuffleId)) {
          try {
            fetching.wait()
          } catch {
            case e: InterruptedException =>
          }
        }
        // Check the local cache again (the other thread may have filled it)
        fetchedStatuses = mapStatuses.get(shuffleId).orNull
        // Still missing: mark this shuffleId as being fetched by us
        if (fetchedStatuses == null) {
          // We have to do the fetch, get others to wait for us.
          fetching += shuffleId
        }
      }
      // Still not in the local cache: pull the marked shuffleId from the master
      if (fetchedStatuses == null) {
        // We won the race to fetch the statuses; do so
        logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
        // This try-finally prevents hangs due to timeouts:
        try {
          // Ask the MapOutputTrackerMaster for the data by shuffleId
          val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
          // Deserialize the response
          fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
          logInfo("Got the output locations")
          // Cache the data locally
          mapStatuses.put(shuffleId, fetchedStatuses)
        } finally {
          fetching.synchronized {
            fetching -= shuffleId
            fetching.notifyAll()
          }
        }
      }
      logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
        s"${System.currentTimeMillis - startTime} ms")
      // Return the fetched data on success; otherwise throw
      if (fetchedStatuses != null) {
        fetchedStatuses
      } else {
        logError("Missing all output locations for shuffle " + shuffleId)
        throw new MetadataFetchFailedException(
          shuffleId, -1, "Missing all output locations for shuffle " + shuffleId)
      }
    } else {
      // Cache hit: return immediately
      statuses
    }
  }
}
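On the other end of askTracker, the GetMapOutputStatuses request is handled by the MapOutputTrackerMasterEndpoint registered in section 2. The following is a paraphrased, abridged sketch of the Spark 2.x master side (details vary by version):

private[spark] class MapOutputTrackerMasterEndpoint(
    override val rpcEnv: RpcEnv, tracker: MapOutputTrackerMaster, conf: SparkConf)
  extends RpcEndpoint with Logging {
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case GetMapOutputStatuses(shuffleId: Int) =>
      // Queue the request on the master; a dispatcher thread inside
      // MapOutputTrackerMaster serializes the shuffle's MapStatus array
      // (caching the serialized bytes for reuse) and answers via context.reply
      tracker.post(new GetMapOutputMessage(shuffleId, context))
  }
}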
3.2.2. Converting output metadata into location lists
Notes:
- A ShuffleBlockId is assembled from the shuffleId, the mapId, and the partition id;
- The block size for each partition is read from the MapStatus.
private[spark] object MapOutputTracker extends Logging {
  def convertMapStatuses(
      shuffleId: Int,
      startPartition: Int,
      endPartition: Int,
      statuses: Array[MapStatus]): Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
    assert (statuses != null)
    // Group each node's block list in a HashMap
    val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long)]]
    // Iterate over all output metadata objects
    for ((status, mapId) <- statuses.iterator.zipWithIndex) {
      // A missing entry is an error
      if (status == null) {
        val errorMessage = s"Missing an output location for shuffle $shuffleId"
        logError(errorMessage)
        throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
      } else {
        // Iterate over the requested partitions
        for (part <- startPartition until endPartition) {
          // Look up the block size for each partition
          val size = status.getSizeForBlock(part)
          if (size != 0) {
            // Build the block id and append (blockId, size) to the node's list
            splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
              ((ShuffleBlockId(shuffleId, mapId, part), size))
          }
        }
      }
    }
    // Return an iterator of per-node location info
    splitsByAddress.iterator
  }
}
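To make the conversion concrete, here is a minimal standalone re-implementation with simplified types (Node, MapOut, and BlockRef are invented stand-ins for BlockManagerId, MapStatus, and ShuffleBlockId; this is illustrative, not Spark's code). It shows two map outputs on two nodes being grouped by address for reduce partitions [0, 2):

import scala.collection.mutable.{HashMap, ListBuffer}

case class Node(host: String)
case class MapOut(location: Node, sizes: Array[Long]) // sizes(i) = block size for reduce partition i
case class BlockRef(shuffleId: Int, mapId: Int, reduceId: Int)

def convert(shuffleId: Int, start: Int, end: Int,
    statuses: Array[MapOut]): Iterator[(Node, Seq[(BlockRef, Long)])] = {
  val byAddress = new HashMap[Node, ListBuffer[(BlockRef, Long)]]
  for ((status, mapId) <- statuses.iterator.zipWithIndex; part <- start until end) {
    val size = status.sizes(part)
    if (size != 0) // empty blocks are skipped, exactly as in Spark
      byAddress.getOrElseUpdate(status.location, ListBuffer()) +=
        ((BlockRef(shuffleId, mapId, part), size))
  }
  byAddress.iterator
}

// Two map tasks on two hosts, reading reduce partitions 0 and 1:
val locs = convert(shuffleId = 0, start = 0, end = 2, statuses = Array(
  MapOut(Node("host-a"), Array(10L, 0L)), // partition 1's block is empty -> skipped
  MapOut(Node("host-b"), Array(5L, 7L))))
locs.foreach(println) // order may vary:
// (Node(host-a),ListBuffer((BlockRef(0,0,0),10)))
// (Node(host-b),ListBuffer((BlockRef(0,1,0),5), (BlockRef(0,1,1),7)))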
4. Summary
Notes:
- MapOutputTracker has two subclasses, MapOutputTrackerMaster and MapOutputTrackerWorker;
- When the driver builds its SparkEnv, it constructs a MapOutputTrackerMaster and registers the master endpoint with the RpcEnv;
- When an executor builds its SparkEnv, it constructs a MapOutputTrackerWorker and obtains a reference to the master endpoint;
- MapOutputTrackerMaster caches the shuffle output metadata of every shuffle in the application in shuffleStatuses, a ConcurrentHashMap;
- One entry per shuffle: the key is the shuffleId, the value is a ShuffleStatus;
- Inside ShuffleStatus, the Array-typed mapStatuses field caches all of one shuffle's map-side output metadata; the array is indexed by partition number, with a MapStatus as each element;
- MapOutputTrackerWorker caches the output metadata of every shuffle seen on the current executor in mapStatuses, a ConcurrentHashMap;
- One entry per shuffle: the key is the shuffleId, the value is an array of MapStatus elements;
- In that array, the index is the partition number and the element is the corresponding MapStatus;
- A ShuffleMapTask's execution begins with fetching its input data and ends with writing its output;
- When fetching the upstream stage's data, the task obtains the relevant block information (locations and block sizes) from MapOutputTrackerWorker by shuffleId.