Introduction:

The Spark UI can contain the following tabs: Jobs, Stages, Storage, Environment, Executors, SQL.

How is the Spark UI (HTTP server) started?

Let's go through the source code to see how the Spark UI (HTTP server) is started and where the data shown on its pages comes from.

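The HTTP server behind the Spark UI is Jetty. Jetty is written in Java and is a solid servlet engine and HTTP server. It can be embedded in and run inside a user program, whereas Tomcat or JBoss must run in a separate JVM process.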
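"Embedded" means the server runs as a library inside the application's own JVM. The sketch below is for illustration only. It embeds the JDK's built-in com.sun.net.httpserver server, not Jetty (Spark itself uses Jetty). The object name, the /jobs context, and the response text are hypothetical.

```scala
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import java.net.{InetSocketAddress, URI}
import java.nio.charset.StandardCharsets
import scala.io.Source

// Stand-in for Jetty: the JDK's built-in HTTP server, running inside this JVM process.
object EmbeddedServerSketch {
  // Hypothetical handler that answers every request under /jobs with a fixed body.
  private val jobsHandler: HttpHandler = new HttpHandler {
    override def handle(exchange: HttpExchange): Unit = {
      val body = "jobs page".getBytes(StandardCharsets.UTF_8)
      exchange.sendResponseHeaders(200, body.length.toLong)
      val out = exchange.getResponseBody
      try out.write(body) finally exchange.close()
    }
  }

  // Port 0 asks the OS for any free port; no separate server process is started.
  def start(): HttpServer = {
    val server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0)
    server.createContext("/jobs", jobsHandler)
    server.start()
    server
  }

  // Fetches a path from the embedded server and returns the response body.
  def fetch(port: Int, path: String): String = {
    val src = Source.fromURL(URI.create(s"http://127.0.0.1:$port$path").toURL, "UTF-8")
    try src.mkString finally src.close()
  }
}
```

Calling start() and then fetch(port, "/jobs") returns the page body from the same process. server.stop(0) shuts the server down.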

1) SparkUI is started during SparkContext initialization

The Spark UI (HTTP server) is created while SparkContext is being initialized:

...
private var _listenerBus: LiveListenerBus = _
private var _statusStore: AppStatusStore = _
...
private[spark] def ui: Option[SparkUI] = _ui

_listenerBus = new LiveListenerBus(_conf)

// Initialize the app status store and listener before SparkEnv is created so that it gets
// all events.
_statusStore = AppStatusStore.createLiveStore(conf)
listenerBus.addToStatusQueue(_statusStore.listener.get)
...
_ui =
  if (conf.getBoolean("spark.ui.enabled", true)) {
    Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
      startTime))
  } else {
    // For tests, do not enable the UI
    None
  }
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
_ui.foreach(_.bind())
...
_ui.foreach(_.setAppId(_applicationId))
...

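Here, _statusStore is the AppStatusStore instance. It wraps a KVStore and an AppStatusListener:

KVStore stores the monitoring data;

AppStatusListener is registered with the appStatus queue of the event bus (LiveListenerBus).

_env.securityManager is the security manager initialized in SparkEnv.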
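The wiring described above can be modeled in a few lines: a listener writes into a key-value store and is registered on a named queue of the event bus. This is a simplified, hypothetical sketch. SimpleKVStore, StatusListener, SimpleListenerBus, and the event classes are illustrative names, not Spark classes. This bus also delivers events synchronously, whereas Spark's LiveListenerBus dispatches them asynchronously.

```scala
import scala.collection.mutable

// Hypothetical events, loosely modeled on SparkListener events.
sealed trait Event
final case class ExecutorAdded(id: String) extends Event
final case class ExecutorRemoved(id: String) extends Event

// Stand-in for KVStore: holds the monitoring data the UI reads.
final class SimpleKVStore {
  private val data = mutable.LinkedHashMap.empty[String, String]
  def write(key: String, value: String): Unit = data.update(key, value)
  def read(key: String): Option[String] = data.get(key)
}

// Stand-in for AppStatusListener: turns events into store updates.
final class StatusListener(store: SimpleKVStore) {
  def onEvent(e: Event): Unit = e match {
    case ExecutorAdded(id)   => store.write(s"executor:$id", "active")
    case ExecutorRemoved(id) => store.write(s"executor:$id", "dead")
  }
}

// Stand-in for LiveListenerBus with named queues such as "appStatus".
// Delivery is synchronous here, unlike Spark's asynchronous queues.
final class SimpleListenerBus {
  private val queues = mutable.LinkedHashMap.empty[String, mutable.ArrayBuffer[Event => Unit]]
  def addToQueue(queue: String, listener: Event => Unit): Unit = {
    queues.getOrElseUpdate(queue, mutable.ArrayBuffer.empty[Event => Unit]) += listener
    ()
  }
  def post(e: Event): Unit = queues.values.foreach(_.foreach(listener => listener(e)))
}
```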

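SparkContext obtains a SparkUI instance by calling create() on the SparkUI companion object, which simply calls new SparkUI(...). It then calls bind() to bind the SparkUI to a Jetty server.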
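The pattern can be sketched on its own: a private constructor, a companion-object factory, and an Option that is None when the UI is disabled. MiniUI, its fields, and the uiEnabled flag are hypothetical stand-ins for SparkUI and spark.ui.enabled. Here bind() only records a flag instead of starting Jetty.

```scala
// Hypothetical stand-in for SparkUI: the constructor is private,
// so instances can only be obtained through the companion's create().
final class MiniUI private (val appName: String, val basePath: String) {
  private var bound = false
  def isBound: Boolean = bound

  // In Spark, bind() starts the Jetty server; here it only records that it happened.
  def bind(): Unit = {
    require(!bound, s"Attempted to bind $appName more than once!")
    bound = true
  }
}

object MiniUI {
  def create(appName: String, basePath: String): MiniUI = new MiniUI(appName, basePath)

  // Mirrors the shape of the SparkContext code: build the UI only when enabled,
  // then bind whatever was built (foreach on None does nothing).
  def createAndBind(uiEnabled: Boolean, appName: String): Option[MiniUI] = {
    val ui = if (uiEnabled) Some(create(appName, "")) else None
    ui.foreach(_.bind())
    ui
  }
}
```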

2) Initializing the SparkUI object

create() instantiates a SparkUI object. While that object is being constructed, it calls its own initialize() method:

private[spark] class SparkUI private (
    val store: AppStatusStore,
    val sc: Option[SparkContext],
    val conf: SparkConf,
    securityManager: SecurityManager,
    var appName: String,
    val basePath: String,
    val startTime: Long,
    val appSparkVersion: String)
  extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf),
    conf, basePath, "SparkUI")
  with Logging
  with UIRoot {

  val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)

  var appId: String = _

  private var streamingJobProgressListener: Option[SparkListener] = None

  /** Initialize all components of the server. */
  def initialize(): Unit = {
    val jobsTab = new JobsTab(this, store)
    attachTab(jobsTab)
    val stagesTab = new StagesTab(this, store)
    attachTab(stagesTab)
    attachTab(new StorageTab(this, store))
    attachTab(new EnvironmentTab(this, store))
    attachTab(new ExecutorsTab(this))
    addStaticHandler(SparkUI.STATIC_RESOURCE_DIR)
    attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
    attachHandler(ApiRootResource.getServletHandler(this))

    // These should be POST only, but, the YARN AM proxy won't proxy POSTs
    attachHandler(createRedirectHandler(
      "/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST")))
    attachHandler(createRedirectHandler(
      "/stages/stage/kill", "/stages/", stagesTab.handleKillRequest,
      httpMethods = Set("GET", "POST")))
  }

  // Called from the constructor body
  initialize()
  ...
}

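The SparkUI class has three member fields:

killEnabled is controlled by the spark.ui.killEnabled setting. When it is true, the Spark Web UI shows controls for forcibly killing the application's jobs and stages. When there is no SparkContext (sc is None), it is always false;

appId is the current application ID;

streamingJobProgressListener is the listener that tracks Spark Streaming job progress.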
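The killEnabled expression combines an Option with a configuration default, so a UI without a live SparkContext never shows the kill controls. In the sketch below, a plain Map stands in for SparkConf, and the object and function names are hypothetical.

```scala
object KillEnabledSketch {
  // Stand-in for: sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)
  // `scConf` is None when there is no live SparkContext.
  def killEnabled(scConf: Option[Map[String, String]]): Boolean =
    scConf
      .map(conf => conf.get("spark.ui.killEnabled").map(_.toBoolean).getOrElse(true))
      .getOrElse(false)
}
```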

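In the initialize() method:

First, it creates five tabs: JobsTab, StagesTab, StorageTab, EnvironmentTab and ExecutorsTab. It registers each one with the Web UI through attachTab(). A "tab" here is a tab page of the Spark UI, and the class names correspond one-to-one to the tab names listed at the beginning.

Next, it calls addStaticHandler() to create the ServletContextHandler for static resources. It also attaches the REST API handler returned by ApiRootResource.getServletHandler(). Finally, it calls createRedirectHandler() to create several redirecting ServletContextHandlers (for example, "/" redirects to "/jobs/").

Finally, each handler is registered with the Web UI through attachHandler().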

Note: ServletContextHandler is a full-featured Jetty handler that receives and processes HTTP requests and then dispatches them to servlets.
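To illustrate the idea of one handler per context path, here is a simplified, hypothetical routing model. Each handler owns a context path, and a request goes to the handler whose context path is the longest prefix of the request path. This roughly approximates how Jetty chooses among context handlers but is not Jetty code. The Handler type and the paths are illustrative.

```scala
object ContextRoutingSketch {
  // Hypothetical stand-in for a ServletContextHandler: a context path plus a servlet-like function.
  final case class Handler(contextPath: String, serve: String => String)

  // Picks the handler whose context path is the longest matching prefix of the request path.
  def route(handlers: Seq[Handler], requestPath: String): Option[Handler] =
    handlers
      .filter { h =>
        val cp = h.contextPath.stripSuffix("/")
        cp.isEmpty || requestPath == cp || requestPath.startsWith(cp + "/")
      }
      .sortBy(h => -h.contextPath.stripSuffix("/").length)
      .headOption

  // Dispatches the request, or returns a 404 text when nothing matches.
  def handle(handlers: Seq[Handler], requestPath: String): String =
    route(handlers, requestPath).map(_.serve(requestPath)).getOrElse("404 Not Found")
}
```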

3) Calling bind() to start the Jetty server

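As shown above, SparkContext creates the SparkUI object during initialization and then calls bind() to bind it to a Jetty server. bind() is not defined in SparkUI itself but in WebUI, the parent class of SparkUI.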

WebUI member fields and getter methods

protected val tabs = ArrayBuffer[WebUITab]()
protected val handlers = ArrayBuffer[ServletContextHandler]()
protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
protected var serverInfo: Option[ServerInfo] = None
protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
  conf.get(DRIVER_HOST_ADDRESS))
private val className = Utils.getFormattedClassName(this)

def getBasePath: String = basePath
def getTabs: Seq[WebUITab] = tabs
def getHandlers: Seq[ServletContextHandler] = handlers
def getSecurityManager: SecurityManager = securityManager

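WebUI has six member fields:

tabs: buffer of the WebUITab instances (the tabs of the Web UI);

handlers: buffer of the Jetty ServletContextHandler instances;

pageToHandlers: maps each WebUIPage (the level below WebUITab) to its ServletContextHandlers;

serverInfo: information about the Jetty server behind this Web UI (None until bind() succeeds);

publicHostName: the host name of the Jetty server for this Web UI. It is taken from the SPARK_PUBLIC_DNS environment variable if set, and otherwise from the spark.driver.host setting.

className: the name of the current class, formatted by Utils.getFormattedClassName().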
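The publicHostName rule is an Option fallback: the environment variable wins, otherwise the configuration is used. The sketch below uses Maps to stand in for the process environment and SparkConf, and the function name is hypothetical. It assumes spark.driver.host is present in the Map; the real code reads that key through the DRIVER_HOST_ADDRESS config entry.

```scala
object PublicHostNameSketch {
  // Stand-in for:
  //   Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(conf.get(DRIVER_HOST_ADDRESS))
  // env.get returns None when the variable is unset, like Option(null) in the original.
  // Assumes "spark.driver.host" is present in `conf` whenever the env variable is missing.
  def publicHostName(env: Map[String, String], conf: Map[String, String]): String =
    env.get("SPARK_PUBLIC_DNS").getOrElse(conf("spark.driver.host"))
}
```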

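There are four getter methods:

getTabs() and getHandlers() simply return the corresponding fields;

getBasePath() returns the Web UI base path passed to the constructor;

getSecurityManager() returns the security manager passed to the constructor.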

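attach/detach methods provided by WebUI

These methods come in pairs, and there are three pairs:

attachTab/detachTab: register and remove a WebUITab, together with all of its pages;

attachPage/detachPage: register and remove a WebUIPage;

attachHandler/detachHandler: register and remove a ServletContextHandler.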

/** Attaches a tab to this UI, along with all of its attached pages. */
def attachTab(tab: WebUITab): Unit = {
  tab.pages.foreach(attachPage)
  tabs += tab
}

/** Detaches a tab from this UI, along with all of its attached pages. */
def detachTab(tab: WebUITab): Unit = {
  tab.pages.foreach(detachPage)
  tabs -= tab
}

/** Detaches a page from this UI, along with all of its attached handlers. */
def detachPage(page: WebUIPage): Unit = {
  pageToHandlers.remove(page).foreach(_.foreach(detachHandler))
}

/** Attaches a page to this UI. */
def attachPage(page: WebUIPage): Unit = {
  val pagePath = "/" + page.prefix
  val renderHandler = createServletHandler(pagePath,
    (request: HttpServletRequest) => page.render(request), securityManager, conf, basePath)
  val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json",
    (request: HttpServletRequest) => page.renderJson(request), securityManager, conf, basePath)
  attachHandler(renderHandler)
  attachHandler(renderJsonHandler)
  val handlers = pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
  handlers += renderHandler
}

/** Attaches a handler to this UI. */
def attachHandler(handler: ServletContextHandler): Unit = {
  handlers += handler
  serverInfo.foreach(_.addHandler(handler))
}

/** Detaches a handler from this UI. */
def detachHandler(handler: ServletContextHandler): Unit = {
  handlers -= handler
  serverInfo.foreach(_.removeHandler(handler))
}

def detachHandler(path: String): Unit = {
  handlers.find(_.getContextPath() == path).foreach(detachHandler)
}

def addStaticHandler(resourceBase: String, path: String = "/static"): Unit = {
  attachHandler(JettyUtils.createStaticHandler(resourceBase, path))
}

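The attachPage() flow:

1) It calls createServletHandler() in the Jetty utility class JettyUtils to create one ServletContextHandler for each of the two rendering methods of WebUIPage, render() and renderJson(). In other words, each WebUIPage gets two handlers: one for the page path and one for the page path plus "/json".

2) It then calls the attachHandler() method above to register the handlers, and records the page-to-handler mapping in pageToHandlers. In the code shown, only renderHandler is recorded there. attachHandler() always adds the handler to the handlers buffer, but it passes the handler to the running Jetty server only if serverInfo is already defined, that is, after bind().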
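The attach flow can be modeled without Jetty to show two details of the code above. First, each page yields two handlers (pagePath and pagePath + "/json"). Second, handlers are only buffered until a server exists; after that, new handlers go straight to the running server. MiniWebUI and UiPage are hypothetical simplifications in which a "handler" is just its path. As in the quoted source, only the render handler is recorded in pageToHandlers.

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

// Hypothetical leaf page: only the prefix matters for routing.
final case class UiPage(prefix: String)

// Simplified model of WebUI's bookkeeping; a "handler" is represented by its path.
final class MiniWebUI {
  val handlers = ArrayBuffer[String]()
  val pageToHandlers = HashMap[UiPage, ArrayBuffer[String]]()
  // Stand-in for serverInfo: paths registered on the running server, None before bind().
  var server: Option[ArrayBuffer[String]] = None

  def attachHandler(path: String): Unit = {
    handlers += path
    server.foreach(_ += path) // reaches the server only after bind()
  }

  def attachPage(page: UiPage): Unit = {
    val pagePath = "/" + page.prefix
    val renderHandler = pagePath
    val renderJsonHandler = pagePath.stripSuffix("/") + "/json"
    attachHandler(renderHandler)
    attachHandler(renderJsonHandler)
    pageToHandlers.getOrElseUpdate(page, ArrayBuffer[String]()) += renderHandler
    ()
  }

  // Stand-in for bind(): the server starts with every handler buffered so far.
  def bind(): Unit = {
    assert(server.isEmpty, "Attempted to bind more than once!")
    server = Some(handlers.clone())
  }
}
```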

Binding WebUI to the Jetty server

/** Binds to the HTTP server behind this web interface. */
def bind(): Unit = {
  assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!")
  try {
    val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
    serverInfo = Some(startJettyServer(host, port, sslOptions, handlers, conf, name))
    logInfo(s"Bound $className to $host, and started at $webUrl")
  } catch {
    case e: Exception =>
      logError(s"Failed to bind $className", e)
      System.exit(1)
  }
}

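Key points of this bind() method:

a) It calls startJettyServer(...), a method in JettyUtils.scala. This confirms that the Spark UI runs on Jetty.

b) It passes host and port to startJettyServer(...). These are the IP address and port on which the Spark UI is served, so let's see where each of them is configured.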

√ Code that obtains host:

val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")

spark-env.sh documents the related variables (SPARK_LOCAL_IP, SPARK_PUBLIC_DNS):

# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program

# Options read by executors and drivers running inside the cluster
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
# - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos

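√ The port is determined in the SparkUI companion object. getUIPort() reads spark.ui.port (default 4040), and SparkUI passes its result to the WebUI constructor: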

private[spark] object SparkUI {
  val DEFAULT_PORT = 4040
  val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
  val DEFAULT_POOL_NAME = "default"

  def getUIPort(conf: SparkConf): Int = {
    conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
  }

  /**
   * Create a new UI backed by an AppStatusStore.
   */
  def create(
      sc: Option[SparkContext],
      store: AppStatusStore,
      conf: SparkConf,
      securityManager: SecurityManager,
      appName: String,
      basePath: String,
      startTime: Long,
      appSparkVersion: String = org.apache.spark.SPARK_VERSION): SparkUI = {
    new SparkUI(store, sc, conf, securityManager, appName, basePath, startTime, appSparkVersion)
  }
}
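Here are the two lookups side by side. The bind host comes from the SPARK_LOCAL_IP environment variable, with 0.0.0.0 (all interfaces) as the fallback. The port comes from spark.ui.port, with 4040 as the default. In the sketch below, Maps stand in for the environment and SparkConf, and the names are hypothetical.

```scala
object UiAddressSketch {
  val DefaultPort = 4040 // same value as SparkUI.DEFAULT_PORT

  // Stand-in for: Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
  def bindHost(env: Map[String, String]): String =
    env.getOrElse("SPARK_LOCAL_IP", "0.0.0.0")

  // Stand-in for: conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
  def uiPort(conf: Map[String, String]): Int =
    conf.get("spark.ui.port").map(_.toInt).getOrElse(DefaultPort)
}
```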

Spark Web UI rendering

The Spark Web UI is essentially a three-level tree: the root is WebUI, the middle level is WebUITab, and the leaves are WebUIPage.

The UI is rendered mainly through WebUITab and WebUIPage. A tab (WebUITab) can contain one or more pages (WebUIPage). The tab level is optional, because a page can also be attached directly to a WebUI.

WebUITab definition:

/**
 * A tab that represents a collection of pages.
 * The prefix is appended to the parent address to form a full path, and must not contain slashes.
 */
private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
  val pages = ArrayBuffer[WebUIPage]()
  val name = prefix.capitalize

  /** Attach a page to this tab. This prepends the page's prefix with the tab's own prefix. */
  def attachPage(page: WebUIPage): Unit = {
    page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
    pages += page
  }

  /** Get a list of header tabs from the parent UI. */
  def headerTabs: Seq[WebUITab] = parent.getTabs

  def basePath: String = parent.getBasePath
}

Because a tab (WebUITab) can contain several pages (WebUIPage), the field val pages = ArrayBuffer[WebUIPage]() in WebUITab holds all the pages under that tab.

The attachPage(...) method joins the tab's path prefix with the page's path prefix, stores the result as the page's new prefix, and appends the page to pages.
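The prefix rule can be checked in isolation. The sketch below copies the concatenation expression from WebUITab.attachPage into hypothetical TabSketch and PageSketch classes that have no Spark dependency.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical leaf page with a mutable prefix, like WebUIPage.
final class PageSketch(var prefix: String)

// Hypothetical tab: attaching a page prepends the tab's prefix to the page's prefix.
final class TabSketch(val prefix: String) {
  val pages = ArrayBuffer[PageSketch]()
  val name: String = prefix.capitalize

  def attachPage(page: PageSketch): Unit = {
    // "executors" + "/" + "" becomes "executors/", and stripSuffix removes the trailing slash
    page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
    pages += page
    ()
  }
}
```

Once WebUI.attachPage prepends "/", these prefixes become the /executors and /executors/threadDump paths.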

WebUIPage definition:

/**
 * A page that represents the leaf node in the UI hierarchy.
 *
 * The direct parent of a WebUIPage is not specified as it can be either a WebUI or a WebUITab.
 * If the parent is a WebUI, the prefix is appended to the parent's address to form a full path.
 * Else, if the parent is a WebUITab, the prefix is appended to the super prefix of the parent
 * to form a relative path. The prefix must not contain slashes.
 */
private[spark] abstract class WebUIPage(var prefix: String) {
  def render(request: HttpServletRequest): Seq[Node]
  def renderJson(request: HttpServletRequest): JValue = JNothing
}

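render(...) renders the page and returns its HTML nodes;

renderJson(...) produces the JSON for the page. By default it returns JNothing, so a subclass only needs to override it when the page has a JSON view.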
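Because renderJson has a default body, a concrete page only has to implement render. The dependency-free sketch below has that shape. String stands in for Seq[Node], Option[String] stands in for JValue (None playing the role of JNothing), and a Map stands in for the request. All class names are hypothetical.

```scala
// Hypothetical stand-in for WebUIPage: render is abstract, renderJson has a default.
abstract class PageBase(var prefix: String) {
  def render(params: Map[String, String]): String
  // Stand-in for `renderJson(...): JValue = JNothing`: no JSON unless overridden.
  def renderJson(params: Map[String, String]): Option[String] = None
}

// A page that only implements render().
final class HelloPage extends PageBase("hello") {
  def render(params: Map[String, String]): String =
    s"<h1>Hello, ${params.getOrElse("name", "Spark")}</h1>"
}

// A page that also provides a JSON view.
final class StatusPage extends PageBase("status") {
  def render(params: Map[String, String]): String = "<p>OK</p>"
  override def renderJson(params: Map[String, String]): Option[String] =
    Some("""{"status":"OK"}""")
}
```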

Rendering Spark UI pages

Take the Executors tab as an example. It is representative because a single tab shows two pages: ExecutorsPage and ExecutorThreadDumpPage.

In the Spark UI, the pages under this tab are as follows (here accessed through the YARN ResourceManager proxy):

executors -> ExecutorsPage

http://ip:8088/proxy/application_1558494459870_0005/executors/

executors -> ExecutorThreadDumpPage

http://ip:8088/proxy/application_1558494459870_0005/executors/threadDump/?executorId=[executorId or driver]

First, the ExecutorsTab code:

private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {

  init()

  private def init(): Unit = {
    val threadDumpEnabled =
      parent.sc.isDefined && parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true)

    attachPage(new ExecutorsPage(this, threadDumpEnabled))
    if (threadDumpEnabled) {
      attachPage(new ExecutorThreadDumpPage(this, parent.sc))
    }
  }
}

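SparkUITab is a thin wrapper around WebUITab that adds the application name and Spark version. ExecutorsTab defines an init() method, which is called from its constructor. init() uses the attachPage(...) method inherited from WebUITab to add ExecutorsPage. When threadDumpEnabled is true (there is a live SparkContext and spark.ui.threadDumpsEnabled is not set to false), it also adds ExecutorThreadDumpPage.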
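The init() logic reduces to a small decision about which page prefixes get attached. In the sketch below, a Boolean stands in for parent.sc.isDefined and a Map stands in for SparkConf; both, and the object name, are hypothetical.

```scala
object ExecutorsTabSketch {
  // Stand-in for: parent.sc.isDefined && parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true)
  def threadDumpEnabled(hasSparkContext: Boolean, conf: Map[String, String]): Boolean =
    hasSparkContext && conf.get("spark.ui.threadDumpsEnabled").map(_.toBoolean).getOrElse(true)

  // Returns the page prefixes the tab would end up with, in attach order.
  def attachedPages(hasSparkContext: Boolean, conf: Map[String, String]): Seq[String] =
    if (threadDumpEnabled(hasSparkContext, conf)) Seq("executors", "executors/threadDump")
    else Seq("executors")
}
```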

Next, the ExecutorsPage code:

private[ui] class ExecutorsPage(
    parent: SparkUITab,
    threadDumpEnabled: Boolean)
  extends WebUIPage("") {

  def render(request: HttpServletRequest): Seq[Node] = {
    val content =
      {
        ... ++
        ... ++
        ...
      }

    UIUtils.headerSparkPage(request, "Executors", content, parent, useDataTables = true)
  }
}

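render() renders the page content as follows:

1) Assemble the content;

2) Call UIUtils.headerSparkPage() to wrap the content in the standard Spark page layout and send it to the browser;

3) While the page loads, the browser runs executorspage.js. The script calls the REST API for the current applicationId to fetch allexecutors and related information, then renders it onto the executors page using the executorspage-template.html template.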

The method that returns the allexecutors information:

@GET
@Path("allexecutors")
def allExecutorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(false))

Here, store is exactly the SparkUI's store. This method shows that the data served by Spark's REST service actually lives in the AppStatusStore object (store) created by SparkContext.
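The REST layer computes nothing itself. It reads from the same status store that the listener fills. The hypothetical model below shows that relationship. ExecutorSummaryLite, StoreSketch, and RestSketch are illustrative names. The activeOnly flag mirrors the false passed in allExecutorList(), where false means dead executors are returned as well. An active-only variant is included for contrast.

```scala
import scala.collection.mutable

// Hypothetical, trimmed-down executor summary.
final case class ExecutorSummaryLite(id: String, isActive: Boolean)

// Stand-in for AppStatusStore: the listener writes here, the REST layer reads here.
final class StoreSketch {
  private val executors = mutable.LinkedHashMap.empty[String, ExecutorSummaryLite]
  def update(e: ExecutorSummaryLite): Unit = executors.update(e.id, e)
  def executorList(activeOnly: Boolean): Seq[ExecutorSummaryLite] =
    executors.values.filter(e => !activeOnly || e.isActive).toList
}

// Stand-in for the REST resource: each endpoint is a thin read of the store.
final class RestSketch(store: StoreSketch) {
  def activeExecutorList(): Seq[ExecutorSummaryLite] = store.executorList(true)
  def allExecutorList(): Seq[ExecutorSummaryLite] = store.executorList(false)
}
```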

Finally, the ExecutorThreadDumpPage code:

private[ui] class ExecutorThreadDumpPage(
    parent: SparkUITab,
    sc: Option[SparkContext])
  extends WebUIPage("threadDump") {

  // stripXSS is called first to remove suspicious characters used in XSS attacks
  def render(request: HttpServletRequest): Seq[Node] = {
    val executorId =
      Option(UIUtils.stripXSS(request.getParameter("executorId"))).map { executorId =>
        UIUtils.decodeURLParameter(executorId)
      }.getOrElse {
        throw new IllegalArgumentException(s"Missing executorId parameter")
      }
    val time = System.currentTimeMillis()
    val maybeThreadDump = sc.get.getExecutorThreadDump(executorId)

    val content = maybeThreadDump.map { threadDump =>
      val dumpRows = threadDump.map { thread =>
        val threadId = thread.threadId
        val blockedBy = thread.blockedByThreadId match {
          case Some(_) => ...
          case None => Text("")
        }
        val heldLocks = thread.holdingLocks.mkString(", ")

        ... onmouseover={s"onMouseOverAndOut($threadId)"}
            onmouseout={s"onMouseOverAndOut($threadId)"}>
          {threadId}{thread.threadName}{thread.threadState}{blockedBy}{heldLocks}{thread.stackTrace.html}
        ...
      }

      ...
        Updated at {UIUtils.formatDate(time)}
        {
          // scalastyle:off
          ... Expand All ...
          ... Search: ...
          // scalastyle:on
        }
        ... Thread ID ... Thread Name ... Thread State ... Thread Locks ...
        {dumpRows}
      ...
    }.getOrElse(Text("Error fetching thread dump"))

    UIUtils.headerSparkPage(request, s"Thread dump for executor $executorId", content, parent)
  }
}

This page shows the state of the threads running in the selected executor (or the driver).
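The columns on this page correspond to what a standard JVM thread dump provides: thread ID, name, state, blocking thread, held locks, and stack trace. As a local illustration only, the sketch below builds similar rows for the current JVM through java.lang.management. It does not fetch a remote executor's dump the way sc.getExecutorThreadDump does, and ThreadRow is a hypothetical type.

```scala
import java.lang.management.ManagementFactory

object LocalThreadDumpSketch {
  // Hypothetical row type mirroring the columns of the thread dump page.
  final case class ThreadRow(
      threadId: Long,
      threadName: String,
      threadState: String,
      blockedBy: Option[Long],
      heldLocks: String,
      stackTrace: String)

  def dump(): Seq[ThreadRow] = {
    val bean = ManagementFactory.getThreadMXBean
    // Only request lock details the JVM reports as supported.
    val infos = bean.dumpAllThreads(bean.isObjectMonitorUsageSupported, bean.isSynchronizerUsageSupported)
    infos.toSeq.map { info =>
      // getLockOwnerId is -1 when the thread is not blocked on a lock owned by another thread.
      val owner = info.getLockOwnerId
      val locks = info.getLockedMonitors.map(_.toString) ++ info.getLockedSynchronizers.map(_.toString)
      ThreadRow(
        info.getThreadId,
        info.getThreadName,
        info.getThreadState.toString,
        if (owner >= 0) Some(owner) else None,
        locks.mkString(", "),
        info.getStackTrace.mkString("\n"))
    }.sortBy(_.threadId)
  }
}
```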
