Spark (Part 47): Spark UI Data Visualization
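Introduction:
The Spark UI can contain the following tabs: Jobs, Stages, Storage, Environment, Executors and SQL.
How is the Spark UI (HTTP server) started?
Next, let's go through the source code to see how the Spark UI (HTTP server) is started and where the data shown on its pages comes from.
The HTTP server used by the Spark UI is Jetty. Jetty is written in Java and is a solid servlet engine and HTTP server that can be embedded in and run inside a user program, rather than running in a separate JVM process the way Tomcat or JBoss typically do.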
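To make "embedded in the user program" concrete, here is a minimal sketch of an HTTP server started inside the current JVM process. It is only an analogy: to stay dependency-free it uses the JDK's built-in com.sun.net.httpserver.HttpServer instead of Jetty, and the EmbeddedServerDemo name and the /jobs path are hypothetical.

```scala
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import java.net.InetSocketAddress
import java.nio.charset.StandardCharsets

// Hypothetical demo: an HTTP server running in the same JVM as the calling program
// (JDK built-in server, used here as a stand-in for Jetty).
object EmbeddedServerDemo {
  // Starts a server on 127.0.0.1:port (port 0 = let the OS pick a free port)
  // with a single context that always answers with `body`.
  def start(port: Int, path: String, body: String): HttpServer = {
    val server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0)
    server.createContext(path, new HttpHandler {
      override def handle(exchange: HttpExchange): Unit = {
        val bytes = body.getBytes(StandardCharsets.UTF_8)
        exchange.sendResponseHeaders(200, bytes.length.toLong)
        val os = exchange.getResponseBody
        try os.write(bytes) finally os.close()
      }
    })
    server.start() // serves requests on a background thread of this JVM
    server
  }
}
```

The server lives and dies with the enclosing JVM, which is the property that lets the driver serve its own UI without a separate servlet container.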
1) SparkContext starts SparkUI during initialization
The Spark UI (HTTP server) is created when SparkContext is initialized:
...
private var _listenerBus: LiveListenerBus = _
private var _statusStore: AppStatusStore = _
...
private[spark] def ui: Option[SparkUI] = _ui
...
_listenerBus = new LiveListenerBus(_conf)

// Initialize the app status store and listener before SparkEnv is created so that it gets
// all events.
_statusStore = AppStatusStore.createLiveStore(conf)
listenerBus.addToStatusQueue(_statusStore.listener.get)
...
_ui =
  if (conf.getBoolean("spark.ui.enabled", true)) {
    Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
      startTime))
  } else {
    // For tests, do not enable the UI
    None
  }
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
_ui.foreach(_.bind())
...
_ui.foreach(_.setAppId(_applicationId))
...
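Here, _statusStore is the AppStatusStore instance. It wraps a KVStore and an AppStatusListener:
the KVStore stores the monitoring data;
the AppStatusListener is registered with the appStatus queue of the event bus (LiveListenerBus), so it receives application events and writes the resulting state into the KVStore.
_env.securityManager is the security manager initialized in SparkEnv.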
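The data flow described above (events posted to the listener bus, AppStatusListener turning them into writes on the KVStore, UI pages reading the store) can be sketched with plain Scala collections. ToyBus, ToyStatusListener and ToyStore are hypothetical, heavily simplified stand-ins, not Spark classes; in particular, the real LiveListenerBus delivers events asynchronously through its queues, while this toy calls its listeners synchronously.

```scala
import scala.collection.mutable

// Hypothetical event types (the real ones are SparkListenerEvent subclasses).
sealed trait Event
final case class ExecutorAdded(id: String, host: String) extends Event
final case class ExecutorRemoved(id: String) extends Event

// Plays the role of the KVStore: holds the monitoring data that UI pages read.
final class ToyStore {
  // executor id -> (host, isActive), kept in insertion order
  private val executors = mutable.LinkedHashMap[String, (String, Boolean)]()

  def upsert(id: String, host: String, active: Boolean): Unit = executors(id) = (host, active)

  def markDead(id: String): Unit =
    executors.get(id).foreach { case (host, _) => executors(id) = (host, false) }

  // activeOnly = false returns dead executors as well.
  def executorList(activeOnly: Boolean): List[String] =
    executors.collect { case (id, (_, active)) if !activeOnly || active => id }.toList
}

// Plays the role of AppStatusListener: turns events into store writes.
final class ToyStatusListener(store: ToyStore) {
  def onEvent(event: Event): Unit = event match {
    case ExecutorAdded(id, host) => store.upsert(id, host, active = true)
    case ExecutorRemoved(id)     => store.markDead(id)
  }
}

// Plays the role of LiveListenerBus, but delivers events synchronously.
final class ToyBus {
  private val listeners = mutable.ArrayBuffer[Event => Unit]()
  def addListener(listener: Event => Unit): Unit = listeners += listener
  def post(event: Event): Unit = listeners.foreach(listener => listener(event))
}
```

The UI never talks to the listener directly; it only queries the store, which is why the same pages can be rebuilt from any populated store.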
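SparkContext calls the create() method of the SparkUI companion object, which simply instantiates SparkUI with new, and then calls bind() to bind the SparkUI to a Jetty server.
2) Initializing the SparkUI object
create() constructs a SparkUI object; when that object is initialized, SparkUI's initialize() method is called: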
private[spark] class SparkUI private (
    val store: AppStatusStore,
    val sc: Option[SparkContext],
    val conf: SparkConf,
    securityManager: SecurityManager,
    var appName: String,
    val basePath: String,
    val startTime: Long,
    val appSparkVersion: String)
  extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf),
    conf, basePath, "SparkUI")
  with Logging
  with UIRoot {

  val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)

  var appId: String = _

  private var streamingJobProgressListener: Option[SparkListener] = None

  /** Initialize all components of the server. */
  def initialize(): Unit = {
    val jobsTab = new JobsTab(this, store)
    attachTab(jobsTab)
    val stagesTab = new StagesTab(this, store)
    attachTab(stagesTab)
    attachTab(new StorageTab(this, store))
    attachTab(new EnvironmentTab(this, store))
    attachTab(new ExecutorsTab(this))
    addStaticHandler(SparkUI.STATIC_RESOURCE_DIR)
    attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
    attachHandler(ApiRootResource.getServletHandler(this))

    // These should be POST only, but, the YARN AM proxy won't proxy POSTs
    attachHandler(createRedirectHandler(
      "/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST")))
    attachHandler(createRedirectHandler(
      "/stages/stage/kill", "/stages/", stagesTab.handleKillRequest,
      httpMethods = Set("GET", "POST")))
  }
  ...
}
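The SparkUI class has three member fields:
killEnabled is controlled by the spark.ui.killEnabled setting. If it is true, the Spark web UI shows links for forcibly killing running jobs and stages; as the code shows, it is always false when there is no live SparkContext (sc is None).
appId is the ID of the current application;
streamingJobProgressListener is the listener that tracks the progress of Spark Streaming jobs.
In initialize():
First, it creates the five tabs JobsTab, StagesTab, StorageTab, EnvironmentTab and ExecutorsTab and registers each of them with the web UI through attachTab(). A Tab here is a tab page of the Spark UI, and the names correspond one-to-one to the tabs listed at the beginning (the SQL tab is not created here; it is attached by the Spark SQL module).
Then, it calls addStaticHandler() to create a ServletContextHandler for static resources, and createRedirectHandler() to create several redirect ServletContextHandlers (it also adds the REST API handler from ApiRootResource).
Finally, each of these handlers is registered with the web UI through attachHandler().
Note: ServletContextHandler is a full-featured Jetty handler that receives the HTTP requests for a context path and dispatches them to servlets.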
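The redirect handlers registered in initialize() can be modeled as a small routing function. This is a hypothetical sketch, not JettyUtils.createRedirectHandler: a rule maps a source path to a destination under basePath, may run a side effect first (as the kill handlers do with handleKillRequest), and accepts only the listed HTTP methods. RedirectRule, Response and ToyRouter are invented names, and the 302/404/405 status codes are assumptions of the sketch.

```scala
// Hypothetical model of a redirect rule: source path, destination path,
// allowed HTTP methods, and an optional side effect run before redirecting.
final case class RedirectRule(
    srcPath: String,
    destPath: String,
    methods: Set[String] = Set("GET"),
    beforeRedirect: () => Unit = () => ())

final case class Response(status: Int, location: Option[String])

object ToyRouter {
  def handle(rules: Seq[RedirectRule], basePath: String, method: String, path: String): Response =
    rules.find(_.srcPath == path) match {
      case Some(rule) if rule.methods.contains(method) =>
        rule.beforeRedirect()                          // e.g. perform the kill request
        Response(302, Some(basePath + rule.destPath))  // send the browser to the target page
      case Some(_) => Response(405, None)              // path known, method not allowed
      case None    => Response(404, None)              // no rule for this path
    }
}
```

Allowing both GET and POST on the kill rules corresponds to the comment in initialize() about the YARN AM proxy not forwarding POST requests.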
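3) Calling bind() to start the Jetty server
When SparkContext is initialized (see above), it creates the SparkUI object and then calls bind() to bind the SparkUI to a Jetty server. bind() is defined in WebUI, the parent class of SparkUI.
WebUI member fields and getter methods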
protected val tabs = ArrayBuffer[WebUITab]()
protected val handlers = ArrayBuffer[ServletContextHandler]()
protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
protected var serverInfo: Option[ServerInfo] = None
protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
  conf.get(DRIVER_HOST_ADDRESS))
private val className = Utils.getFormattedClassName(this)

def getBasePath: String = basePath
def getTabs: Seq[WebUITab] = tabs
def getHandlers: Seq[ServletContextHandler] = handlers
def getSecurityManager: SecurityManager = securityManager
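WebUI has six member fields:
tabs: a buffer of WebUITab instances (the tab pages of the web UI);
handlers: a buffer of Jetty ServletContextHandler instances;
pageToHandlers: the mapping from each WebUIPage (the level below WebUITab) to its ServletContextHandlers;
serverInfo: information about the Jetty server behind this web UI (None until bind() is called);
publicHostName: the public host name of this web UI. It is taken from the SPARK_PUBLIC_DNS environment variable first, falling back to the spark.driver.host setting.
className: the name of the current class, formatted with Utils.getFormattedClassName().
There are four getters:
getTabs() and getHandlers() simply return the corresponding fields;
getBasePath() returns the base path of the web UI given as a constructor parameter;
getSecurityManager() returns the security manager passed to the constructor.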
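The publicHostName rule can be written as a pure function. In this sketch the environment and the configuration are passed as plain maps (an assumption made so the logic is testable), and webUrl assumes the address is built from the public host name and the port the server actually bound to, which is what the "started at $webUrl" log line in bind() reports. ToyHostNames is a hypothetical name.

```scala
// Hypothetical helpers mirroring WebUI.publicHostName and the assumed shape of webUrl.
object ToyHostNames {
  // SPARK_PUBLIC_DNS from the environment wins; otherwise fall back to spark.driver.host.
  def publicHostName(env: Map[String, String], conf: Map[String, String]): String =
    env.getOrElse("SPARK_PUBLIC_DNS", conf("spark.driver.host"))

  // Assumed URL shape: http://<public host>:<port the server actually bound to>
  def webUrl(publicHost: String, boundPort: Int): String = s"http://$publicHost:$boundPort"
}
```

Note the split between the address the server listens on (controlled by SPARK_LOCAL_IP, discussed later) and the host name that is advertised in links.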
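attach/detach methods provided by WebUI
These methods come in pairs; there are three pairs:
attachTab/detachTab: register and remove a WebUITab (together with all of its pages);
attachPage/detachPage: register and remove a WebUIPage;
attachHandler/detachHandler: register and remove a ServletContextHandler.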
/** Attaches a tab to this UI, along with all of its attached pages. */
def attachTab(tab: WebUITab): Unit = {
  tab.pages.foreach(attachPage)
  tabs += tab
}

/** Detaches a tab from this UI, along with all of its attached pages. */
def detachTab(tab: WebUITab): Unit = {
  tab.pages.foreach(detachPage)
  tabs -= tab
}

/** Detaches a page from this UI, along with all of its attached handlers. */
def detachPage(page: WebUIPage): Unit = {
  pageToHandlers.remove(page).foreach(_.foreach(detachHandler))
}

/** Attaches a page to this UI. */
def attachPage(page: WebUIPage): Unit = {
  val pagePath = "/" + page.prefix
  val renderHandler = createServletHandler(pagePath,
    (request: HttpServletRequest) => page.render(request), securityManager, conf, basePath)
  val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json",
    (request: HttpServletRequest) => page.renderJson(request), securityManager, conf, basePath)
  attachHandler(renderHandler)
  attachHandler(renderJsonHandler)
  val handlers = pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
  handlers += renderHandler
}

/** Attaches a handler to this UI. */
def attachHandler(handler: ServletContextHandler): Unit = {
  handlers += handler
  serverInfo.foreach(_.addHandler(handler))
}

/** Detaches a handler from this UI. */
def detachHandler(handler: ServletContextHandler): Unit = {
  handlers -= handler
  serverInfo.foreach(_.removeHandler(handler))
}

def detachHandler(path: String): Unit = {
  handlers.find(_.getContextPath() == path).foreach(detachHandler)
}

def addStaticHandler(resourceBase: String, path: String = "/static"): Unit = {
  attachHandler(JettyUtils.createStaticHandler(resourceBase, path))
}
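How attachPage() works:
1) It calls createServletHandler() from the Jetty utility class JettyUtils to create a ServletContextHandler for each of the WebUIPage's two rendering methods, render() and renderJson(). In other words, every WebUIPage is backed by two handlers: one at /<prefix> and one at /<prefix>/json.
2) It then calls the attachHandler() method above to register both handlers (if the server is not bound yet, a handler is only kept in handlers and is handed to Jetty later by bind()), and records the page-to-handler mapping in pageToHandlers; in the quoted code, only renderHandler is added to that mapping.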
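The bookkeeping above, including what happens before and after bind(), can be sketched as a toy model. ToyWebUI and ToyServer are hypothetical: a handler is represented only by its context path, and ToyServer stands in for the running Jetty server (serverInfo). The render/renderJson path rules and the fact that only the render handler is recorded in pageToHandlers mirror the quoted Spark code.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the running Jetty server: just the registered context paths.
final class ToyServer(initial: Seq[String]) {
  val contexts: mutable.ArrayBuffer[String] = mutable.ArrayBuffer[String]() ++= initial
}

// Hypothetical toy model of WebUI's handler bookkeeping.
final class ToyWebUI {
  val handlers = mutable.ArrayBuffer[String]()
  val pageToHandlers = mutable.HashMap[String, mutable.ArrayBuffer[String]]()
  var serverInfo: Option[ToyServer] = None

  def attachHandler(path: String): Unit = {
    handlers += path
    serverInfo.foreach(_.contexts += path) // reaches the server only once it is bound
  }

  def detachHandler(path: String): Unit = {
    handlers -= path
    serverInfo.foreach(_.contexts -= path)
  }

  // Same path rules as WebUI.attachPage: one handler for render, one for renderJson.
  def attachPage(pagePrefix: String): Unit = {
    val pagePath = "/" + pagePrefix
    val jsonPath = pagePath.stripSuffix("/") + "/json"
    attachHandler(pagePath)
    attachHandler(jsonPath)
    // Mirrors the quoted code: only the render handler is recorded for the page.
    pageToHandlers.getOrElseUpdate(pagePrefix, mutable.ArrayBuffer[String]()) += pagePath
  }

  def detachPage(pagePrefix: String): Unit =
    pageToHandlers.remove(pagePrefix).foreach(_.foreach(detachHandler))

  def bind(): Unit = {
    assert(serverInfo.isEmpty, "Attempted to bind more than once!")
    serverInfo = Some(new ToyServer(handlers.toSeq)) // handlers attached so far start with the server
  }
}
```

The last test assertion is a direct consequence of the quoted attachPage(): because only the render handler is recorded, detaching a page leaves its /json handler registered.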
Binding WebUI to the Jetty server
/** Binds to the HTTP server behind this web interface. */
def bind(): Unit = {
  assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!")
  try {
    val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
    serverInfo = Some(startJettyServer(host, port, sslOptions, handlers, conf, name))
    logInfo(s"Bound $className to $host, and started at $webUrl")
  } catch {
    case e: Exception =>
      logError(s"Failed to bind $className", e)
      System.exit(1)
  }
}
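A few things to note about bind():
a) It calls startJettyServer(...), a method in JettyUtils.scala, which confirms that SparkUI runs on top of Jetty.
b) startJettyServer(...) receives the host and port parameters, which are the IP address and port on which the Spark UI is served, so let's look at where these two values are configured.
√ How host is obtained: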
val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
spark-env.sh documents the related variables (SPARK_LOCAL_IP, SPARK_PUBLIC_DNS):
# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program

# Options read by executors and drivers running inside the cluster
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
# - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos
√ The port is obtained in the SparkUI companion object: getUIPort() reads spark.ui.port and defaults to 4040.
private[spark] object SparkUI {
  val DEFAULT_PORT = 4040
  val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
  val DEFAULT_POOL_NAME = "default"

  def getUIPort(conf: SparkConf): Int = {
    conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
  }

  /**
   * Create a new UI backed by an AppStatusStore.
   */
  def create(
      sc: Option[SparkContext],
      store: AppStatusStore,
      conf: SparkConf,
      securityManager: SecurityManager,
      appName: String,
      basePath: String,
      startTime: Long,
      appSparkVersion: String = org.apache.spark.SPARK_VERSION): SparkUI = {
    new SparkUI(store, sc, conf, securityManager, appName, basePath, startTime, appSparkVersion)
  }
}
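Taken together, the bind address and port resolve as sketched below. ToyBindAddress is hypothetical and reads plain maps instead of the process environment and SparkConf. The candidatePorts helper is a simplified model of a behavior not shown in the quoted code: when the requested port is taken, Spark retries on successive ports (bounded by spark.port.maxRetries), which is why a second application on the same host usually ends up on 4041.

```scala
// Hypothetical helpers mirroring how the UI's bind host and port are chosen.
object ToyBindAddress {
  val DefaultPort = 4040

  // SPARK_LOCAL_IP from the environment, otherwise listen on all interfaces.
  def bindHost(env: Map[String, String]): String = env.getOrElse("SPARK_LOCAL_IP", "0.0.0.0")

  // spark.ui.port from the configuration, otherwise 4040.
  def uiPort(conf: Map[String, String]): Int =
    conf.get("spark.ui.port").map(_.trim.toInt).getOrElse(DefaultPort)

  // Simplified retry sequence: the configured port, then port + 1, ... for maxRetries
  // extra attempts (wrap-around near 65535 is ignored in this sketch).
  def candidatePorts(start: Int, maxRetries: Int): Seq[Int] = (0 to maxRetries).map(start + _)
}
```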
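Spark Web UI rendering
The Spark web UI is essentially a three-level tree: the root is WebUI, the middle nodes are WebUITab instances, and the leaves are WebUIPage instances.
What the UI displays is produced mainly by WebUITab and WebUIPage. In the Spark UI, a Tab (WebUITab) can contain one or more Pages (WebUIPage), and the Tab level is optional: a WebUIPage can also be attached directly to a WebUI.
WebUITab definition: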
/**
 * A tab that represents a collection of pages.
 * The prefix is appended to the parent address to form a full path, and must not contain slashes.
 */
private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
  val pages = ArrayBuffer[WebUIPage]()
  val name = prefix.capitalize

  /** Attach a page to this tab. This prepends the page's prefix with the tab's own prefix. */
  def attachPage(page: WebUIPage) {
    page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
    pages += page
  }

  /** Get a list of header tabs from the parent UI. */
  def headerTabs: Seq[WebUITab] = parent.getTabs

  def basePath: String = parent.getBasePath
}
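Because a Tab (WebUITab) can contain several Pages (WebUIPage), the field val pages = ArrayBuffer[WebUIPage]() in WebUITab caches all the Pages under that Tab.
attachPage(...) joins the Tab's path prefix and the Page's path prefix and appends the Page to the pages buffer. It does not create any Jetty handlers itself; that happens later, when WebUI.attachTab() calls WebUI.attachPage() for each page in the buffer.
WebUIPage definition: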
/**
 * A page that represents the leaf node in the UI hierarchy.
 *
 * The direct parent of a WebUIPage is not specified as it can be either a WebUI or a WebUITab.
 * If the parent is a WebUI, the prefix is appended to the parent's address to form a full path.
 * Else, if the parent is a WebUITab, the prefix is appended to the super prefix of the parent
 * to form a relative path. The prefix must not contain slashes.
 */
private[spark] abstract class WebUIPage(var prefix: String) {
  def render(request: HttpServletRequest): Seq[Node]
  def renderJson(request: HttpServletRequest): JValue = JNothing
}
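render(...) renders the page as HTML (a Seq[Node]);
renderJson(...) produces the JSON representation of the page; the default implementation returns JNothing.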
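The path logic of the Tab/Page tree can be reproduced in a few lines. ToyTab, ToyPage, ListPage and DetailPage are hypothetical stand-ins for WebUITab and WebUIPage that render plain strings instead of XML; the page prefixes "" and "job" were chosen to match the /jobs/job/... path used by the kill handler in initialize(), not taken from JobsTab's source.

```scala
import scala.collection.mutable

// Hypothetical stand-in for WebUIPage: a leaf with a mutable prefix.
abstract class ToyPage(var prefix: String) {
  def render(params: Map[String, String]): String                    // HTML as a plain string
  def renderJson(params: Map[String, String]): Option[String] = None // like the JNothing default
}

// Hypothetical stand-in for WebUITab, reduced to its path logic.
class ToyTab(val prefix: String) {
  val pages: mutable.ArrayBuffer[ToyPage] = mutable.ArrayBuffer[ToyPage]()
  val name: String = prefix.capitalize

  // Same rule as WebUITab.attachPage: prepend the tab prefix, drop a trailing slash.
  def attachPage(page: ToyPage): Unit = {
    page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
    pages += page
  }
}

// Example leaf pages: a list page at the tab root and a detail page below it.
final class ListPage extends ToyPage("") {
  def render(params: Map[String, String]): String = "<h3>All jobs</h3>"
}

final class DetailPage extends ToyPage("job") {
  def render(params: Map[String, String]): String = {
    val id = params.getOrElse("id", "?")
    s"<h3>Job $id</h3>"
  }
  override def renderJson(params: Map[String, String]): Option[String] = {
    val id = params.getOrElse("id", "?")
    Some(s"""{"id": "$id"}""")
  }
}
```

A page attached with prefix "" ends up at the tab root ("jobs"), which is why the tab's landing page needs no extra path segment.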
Rendering SparkUI pages
Take the Executors tab as an example. It is representative because a single tab shows two Pages (ExecutorsPage and ExecutorThreadDumpPage).
The pages under this tab in the Spark UI are:
executors -> ExecutorsPage
http://ip:8088/proxy/application_1558494459870_0005/executors/
executors -> ExecutorThreadDumpPage
http://ip:8088/proxy/application_1558494459870_0005/executors/threadDump/?executorId=[executorId or driver]
First, the ExecutorsTab code:
private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {

  init()

  private def init(): Unit = {
    val threadDumpEnabled =
      parent.sc.isDefined && parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true)

    attachPage(new ExecutorsPage(this, threadDumpEnabled))
    if (threadDumpEnabled) {
      attachPage(new ExecutorThreadDumpPage(this, parent.sc))
    }
  }
}
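SparkUITab is a thin wrapper around WebUITab that adds the application name and the Spark version. ExecutorsTab defines an init() method and calls it from its constructor. init() uses the attachPage(...) method inherited from WebUITab to add ExecutorsPage and, when threadDumpEnabled is true (there is a live SparkContext and spark.ui.threadDumpsEnabled is true), also adds ExecutorThreadDumpPage.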
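Which pages ExecutorsTab ends up with can be modeled as a pure function of two inputs. This is a hypothetical sketch (ToyExecutorsTab is not a Spark class): hasLiveSparkContext stands for parent.sc.isDefined, which is false when a SparkUI is built without a live SparkContext, for example when the History Server replays event logs, and the conf map stands in for SparkConf.

```scala
// Hypothetical model of ExecutorsTab.init(): the full page prefixes under "executors".
object ToyExecutorsTab {
  def pagePrefixes(hasLiveSparkContext: Boolean, conf: Map[String, String]): Seq[String] = {
    val threadDumpEnabled =
      hasLiveSparkContext && conf.getOrElse("spark.ui.threadDumpsEnabled", "true").toBoolean
    val pages = Seq("")                                                      // ExecutorsPage
    val withDump = if (threadDumpEnabled) pages :+ "threadDump" else pages   // ExecutorThreadDumpPage
    withDump.map(p => ("executors/" + p).stripSuffix("/"))                   // tab prefix + page prefix
  }
}
```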
Next, the ExecutorsPage code:
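private[ui] class ExecutorsPage(
    parent: SparkUITab,
    threadDumpEnabled: Boolean)
  extends WebUIPage("") {

  def render(request: HttpServletRequest): Seq[Node] = {
    val content =
      ... ++  // XML markup lost in extraction
      ... ++
      ...
    ...  // content is then handed to UIUtils.headerSparkPage(), as described below
  }
}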
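render() renders the page content. The flow is:
1) build the content;
2) call UIUtils.headerSparkPage(), which wraps the content in the common Spark UI page layout and returns it to the browser;
3) while the page loads, the browser runs executorspage.js, which calls the REST API with the current applicationId to fetch allexecutors and related data, then renders that data into the executors page using the executorspage-template.html template.
The REST method that returns the allexecutors data: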
@GET
@Path("allexecutors")
def allExecutorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(false))
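The store here is exactly SparkUI's store, and executorList(false) means the list is not restricted to active executors, so dead executors are included as well. This method shows that the data served by Spark's REST API actually lives in the AppStatusStore (store) created by SparkContext.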
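For reference, these endpoints follow the /api/v1 layout documented in Spark's monitoring guide: /applications/[app-id]/allexecutors returns active and dead executors, while /applications/[app-id]/executors returns only active ones. The helper below only builds those URLs; ToyRestUrls and the sample application IDs in the test are hypothetical.

```scala
// Hypothetical helpers that build the monitoring REST URLs for a running application.
object ToyRestUrls {
  private def base(uiRoot: String, appId: String): String =
    s"${uiRoot.stripSuffix("/")}/api/v1/applications/$appId"

  // Active and dead executors: the data allExecutorList() returns.
  def allExecutors(uiRoot: String, appId: String): String = base(uiRoot, appId) + "/allexecutors"

  // Active executors only.
  def activeExecutors(uiRoot: String, appId: String): String = base(uiRoot, appId) + "/executors"
}
```

Against a running driver, the JSON could then be fetched with, for example, scala.io.Source.fromURL(ToyRestUrls.allExecutors("http://localhost:4040", appId)).mkString.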
Finally, the ExecutorThreadDumpPage code:
private[ui] class ExecutorThreadDumpPage(
    parent: SparkUITab,
    sc: Option[SparkContext])
  extends WebUIPage("threadDump") {

  // stripXSS is called first to remove suspicious characters used in XSS attacks
  def render(request: HttpServletRequest): Seq[Node] = {
    val executorId =
      Option(UIUtils.stripXSS(request.getParameter("executorId"))).map { executorId =>
        UIUtils.decodeURLParameter(executorId)
      }.getOrElse {
        throw new IllegalArgumentException(s"Missing executorId parameter")
      }
    val time = System.currentTimeMillis()
    val maybeThreadDump = sc.get.getExecutorThreadDump(executorId)

    val content = maybeThreadDump.map { threadDump =>
      val dumpRows = threadDump.map { thread =>
        val threadId = thread.threadId
        val blockedBy = thread.blockedByThreadId match {
          case Some(_) => ...  // XML markup lost in extraction
          case None => Text("")
        }
        val heldLocks = thread.holdingLocks.mkString(", ")

        // A table row (XML markup lost in extraction) with
        //   onmouseover={s"onMouseOverAndOut($threadId)"}
        //   onmouseout={s"onMouseOverAndOut($threadId)"}
        // and cells for {threadId}, {thread.threadName}, {thread.threadState},
        // {blockedBy}{heldLocks} and {thread.stackTrace.html}
        ...
      }

      // Page body (XML markup lost in extraction): "Updated at {UIUtils.formatDate(time)}",
      // a block wrapped in // scalastyle:off ... // scalastyle:on, and a table with the
      // columns Thread ID | Thread Name | Thread State | Thread Locks and the body {dumpRows}
      ...
    } ...
    UIUtils.headerSparkPage(request, s"Thread dump for executor $executorId", content, parent)
  }
}
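This page shows the state of the threads running in the selected executor (or the driver): thread ID, name, state, lock information and stack trace.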
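The rows on this page come from a JVM thread dump. As a self-contained illustration, the sketch below collects similar columns (thread ID, name, state, blocking thread, held locks) for the current JVM using java.lang.management.ThreadMXBean. ThreadRow and ToyThreadDump are hypothetical names, and this is not Spark's code path: for a remote executor, SparkContext.getExecutorThreadDump asks that executor for its dump rather than inspecting the driver's own threads.

```scala
import java.lang.management.{ManagementFactory, ThreadInfo}

// One row of the thread dump table (hypothetical type; columns follow the page).
final case class ThreadRow(
    id: Long,
    name: String,
    state: String,
    blockedBy: Option[Long],
    heldLocks: String)

object ToyThreadDump {
  // Snapshot of all live threads in *this* JVM, including monitor and synchronizer info.
  def rows(): Seq[ThreadRow] = {
    val infos: Array[ThreadInfo] = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true)
    infos.toSeq.filter(_ != null).sortBy(_.getThreadId).map { t =>
      // getLockOwnerId is -1 unless the thread waits on an object owned by another thread.
      val blocker = if (t.getLockOwnerId >= 0) Some(t.getLockOwnerId) else None
      val locks = (t.getLockedMonitors.map(_.toString) ++ t.getLockedSynchronizers.map(_.toString))
        .mkString(", ")
      ThreadRow(t.getThreadId, t.getThreadName, t.getThreadState.toString, blocker, locks)
    }
  }
}
```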