spark2原理分析-BlockManagerMaster实现原理
概述本文讲说明spark中BlockManager的基本原理。BlockManager的基本概念BlockManager运行在spark的每个节点上(包括driver和executors),它提供了一个保存和获取本地或远端数据块到内存、磁盘、或off-heap中的统一接口。BlockManage的实现分析数据块管理的总体架构spark数据块管理的总体架构如下图所示:从该架构图可见,...
概述
本文讲说明spark中BlockManager的基本原理。
BlockManager的基本概念
BlockManager运行在spark的每个节点上(包括driver和executors),它提供了一个保存和获取本地或远端数据块到内存、磁盘、或off-heap中的统一接口。
BlockManage的实现分析
数据块管理的总体架构
spark数据块管理的总体架构如下图所示:

从该架构图可见,在spark的每个任务执行器中都有一个blockmanager类实例,该实例会向driver端的BlockManagerMaster对象注册自己的信息。
BlockManagerMaster运行在driver端,负责对数据块的信息进行更新,对各个executor的blockmanager信息进行管理。
spark数据块管理实现类图
我们通过这篇文章知道,在创建sparkEnv对象时会创建一个BlockManagerMaster对象,同时会创建一个BlockManagerMasterEndpoint对象来接收来自各个BlockManager的注册,数据块更新等信息。
在SparkEnv类中的实现代码如下:
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
conf, isDriver)
所以,BlockManagerMaster利用了BlockManagerMasterEndpoint对象提供的各种方法,它进行了更上层的封装。
下面我们来看一下BlockManagerMasterEndpoint的具体实现。
BlockManagerMasterEndpoint类的实现
在该类的代码实现部分有一段注释,如下:
/** BlockManagerMasterEndpoint is an [[ThreadSafeRpcEndpoint]] on the master node to track statuses
* of all slaves' block managers.
*/
大意是说:BlockManagerMasterEndpoint是一个ThreadSafeRpcEndpoint类,它运行在数据块管理的master节点上,它会跟踪所有的slave节点(也就是BlockManager对象)。
该类主要实现了以下一些功能:
- 创建一个从BlockManagerId到BlockManagerInfo的HashMap,这样就可以通过数据块管理的Id,找到数据块的管理信息。代码如下:
private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
- 创建一个从执行器id(executor id)到BlockManagerId的HashMap,可以通过执行器id找到BlockManagerId的信息。实现代码如下:
private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
- 同一个数据块可能被多个BlockManager管理,所以还需要创建一个Map,通过一个BlockId找到一个BlockManagerId集。实现代码如下:
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
- 创建一个线程池,用来处理各个slave的BlockManger发送过来的信息,代码如下:
private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool")
private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)
- 获取配置参数spark.storage.replication.topologyMapper的值,并实例化:
private val topologyMapper = {
val topologyMapperClassName = conf.get(
"spark.storage.replication.topologyMapper", classOf[DefaultTopologyMapper].getName)
val clazz = Utils.classForName(topologyMapperClassName)
val mapper =
clazz.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[TopologyMapper]
logInfo(s"Using $topologyMapperClassName for getting topology information")
mapper
}
- 获取配置参数spark.storage.replication.proactive的值,默认是false,代码如下:
val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean
- 定义函数receiveAndReply,用来处理各个slave的BlockManager发送过来的消息,带函数的原型如下:
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
...
}
该函数主要用来处理以下一些消息:注册BlockManager,获取内存状态,更新块状态信息,获取块状态,获取匹配的块id,删除rdd,删除广播变量,删除块,删除执行器,停止blockMangerMaster,BlockManager心跳信息等。
各个消息的具体实现,在后面的文章中进行详细讲解。
BlockManagerMaster的实现
该类运行在driver端,主要对BlockManager进行管理,处理来自各个BlockManager的消息。该类的声明如下:
private[spark]
class BlockManagerMaster(
var driverEndpoint: RpcEndpointRef,
conf: SparkConf,
isDriver: Boolean)
extends Logging {
...
}
在使用该类时会先创建RpcEndpointRef对象,初始化代码如下:
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
conf, isDriver)
这里实例化了BlockManagerMasterEndpoint类。
BlockManagerMaster通过driverEndpoint来对功能进行封装,实现所有的功能。这些实现主要是通过封装的接口,向BlockManagerMasterEndpoint服务发送消息来实现的。
主要实现了以下一些函数接口:
- removeExecutor函数
实现代码:
def removeExecutor(execId: String) {
tell(RemoveExecutor(execId))
logInfo("Removed " + execId + " successfully in removeExecutor")
}
该函数从driver端的BlockManagerMaster删除死掉executor的blockmanager的信息。该函数仅仅在driver端被调用。注意,该函数是同步的,调用时可能会阻塞。
- removeExecutorAsync函数
def removeExecutorAsync(execId: String) {
driverEndpoint.ask[Boolean](RemoveExecutor(execId))
logInfo("Removal of executor " + execId + " requested")
}
和removeExecutor函数的功能相同,但该函数是异步的,调用时不会发生阻塞。可以看出,主要是通过向在driver端的BlockManagerMasterEndpointi对象发送RemoveExecutor消息。
-
registerBlockManager函数
在driver端注册BlockManager信息。输入的BlockManagerId不包含toplogy的信息。向master发送RegisterBlockManager消息。 -
通过数据块id获取数据块信息
这里提供了两个函数,一个是给出单个blockId获取单个块的信息;一个是给出块列表,获取一个列表的数据块信息。
分别是: -
更新数据块的信息
主要是向driverEndpoint发送UpdateBlockInfo消息来实现,代码如下:
def updateBlockInfo(
blockManagerId: BlockManagerId,
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long): Boolean = {
val res = driverEndpoint.askSync[Boolean](
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
logDebug(s"Updated info of block $blockId")
res
}
其实就是发送了一个UpdateBlockInfo消息。
-
从driver端获取数据块的位置信息
主要是通过getLocations,getLocationsAndStatus,getLocations,这些函数来实现的。这些函数分别发送:GetLocations,GetLocationsAndStatus,GetLocationsMultipleBlockIds消息。 -
删除给定rdd的所有数据块
该功能通过removeRdd函数来实现。该函数主要通过异步发送RemoveRdd消息来实现。实现代码如下:
def removeRdd(rddId: Int, blocking: Boolean) {
val future = driverEndpoint.askSync[Future[Seq[Int]]](RemoveRdd(rddId))
future.failed.foreach(e =>
logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
)(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
}
}
- 删除给定广播变量的所有数据块
实现该功能的函数是:removeBroadcast。主要通过发送RemoveBroadcast消息来实现。要使用该函数时,需要提供广播变量的id,可以指定是否阻塞,是否需要从master端删除。
def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {
val future = driverEndpoint.askSync[Future[Seq[Int]]](
RemoveBroadcast(broadcastId, removeFromMaster))
future.failed.foreach(e =>
logWarning(s"Failed to remove broadcast $broadcastId" +
s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
)(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
}
}
- 从执行器中删除给定id的数据块
发送RemoveBlock消息来实现。代码如下:
def removeBlock(blockId: BlockId) {
driverEndpoint.askSync[Boolean](RemoveBlock(blockId))
}
- 删除给定shuffle id的所以数据块
该函数的声明如下:
def removeShuffle(shuffleId: Int, blocking: Boolean) {
val future = driverEndpoint.askSync[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
future.failed.foreach(e =>
logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)
)(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
}
}
该函数主要发送RemoveShuffle(shuffId)消息。
-
获取内存状态信息
通过getMemoryStatus函数来实现,该函数会发送一个GetMemoryStatus消息。 -
获取存储状态信息
通过getStorageStatus函数来实现,该函数会发送一个GetStorageStatus消息。 -
获取块状态信息
通过getBlockStatus函数来实现,该函数会发送一个GetBlockStatus消息。 -
获取匹配的数据块的信息
该函数会调用参数:filter函数来过滤数据块信息,返回符合条件的数据块id列表。实现代码如下:
def getMatchingBlockIds(
filter: BlockId => Boolean,
askSlaves: Boolean): Seq[BlockId] = {
val msg = GetMatchingBlockIds(filter, askSlaves)
val future = driverEndpoint.askSync[Future[Seq[BlockId]]](msg)
timeout.awaitResult(future)
}
- 找出在executor端是否有cache的数据块
def hasCachedBlocks(executorId: String): Boolean = {
driverEndpoint.askSync[Boolean](HasCachedBlocks(executorId))
}
总结
本文分析了BlockManagerMaster的实现原理。
更多推荐
所有评论(0)