1.JobManager启动流程解析.md
本文档采取总分总的方式,先介绍JobManager的启动顺序,再从启动顺序中拆解每个组件最底层的实现。最后再总结一次JobManager的实现。本文主要介绍standalone模式下的JobManager实现。大部分组件的实现都类似,只是在高可用、executeGraph上会有实现的差异。可以把 JobManager 的启动理解为“先把地基服务准备好,再把三大组件创建并启动”。JobMaster。
JobManager启动流程解析
本文档采取总分总的方式,先介绍JobManager的启动顺序,再从启动顺序中拆解每个组件最底层的实现。最后再总结一次JobManager的实现。
本文主要介绍standalone模式下的JobManager实现。大部分组件的实现都类似,只是在高可用、executeGraph上会有实现的差异。
本文的主题是:JobManager 启动时,如何把集群的三大核心组件拉起来——Dispatcher、ResourceManager、WebRestEndpoint。可以把 JobManager 的启动理解为“先把地基服务准备好,再把三大组件创建并启动”。
- ResourceManager(资源管理):负责集群资源的生命周期管理与分配,例如 TaskManager 的注册与心跳、Slot 资源的管理与分配,以及与外部资源提供者(Standalone/YARN/K8s 等)交互以申请或释放资源。
- Dispatcher(作业调度入口):负责作业提交入口与调度编排,接收作业提交请求,创建/恢复
JobMaster(每个作业一个),并维护作业/集群相关的运行信息与对外服务的网关。 - WebRestEndpoint(对外服务入口):提供 Web UI 与 REST API 能力,对外暴露集群与作业的查询、运维与控制接口(例如作业提交、取消、查询状态、拉取 metrics 等)。
在启动三大组件之前,需要先初始化一组“基础服务组件”,为后续组件创建提供运行环境与依赖:
- BlobServer(作业/依赖分发服务):负责作业 Jar、UDF、依赖包等文件的上传、缓存与分发。
- RPC 通信服务(commonRpcService):三大组件之间以及与 TaskManager 等进程之间的核心通信基础设施。
- 指标注册/查询服务(MetricRegistry & MetricQueryService):指标采集、注册、上报与查询的统一入口,支撑 Web/REST 的 metrics 展示与查询。
- 以及 HA/心跳/工作目录/安全令牌 等支撑性能力(standalone 下多为默认实现,但生命周期与依赖关系一致)。
1. 启动入口与类图分析
首先找到 Standalone 启动类,路径位于:
[StandaloneSessionClusterEntrypoint.java](file:///c:/Users/Administrator/Desktop/flink-sourcecode/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java)
类继承关系(UML图):
- AutoCloseableAsync:实现了
AutoCloseable接口,用于异步关闭资源。 - FatalErrorHandler:用于处理致命错误,如内存溢出、线程死亡等。
- ClusterEntrypoint:集群启动基类。内部初始化了 JobManager 集群所需的核心组件。
- StandaloneSessionClusterEntrypoint:继承了
ClusterEntrypoint。其main方法是整个集群的启动入口。
2. 核心启动流程源码拆解
先用“流程视角”总结一下 JobManager 的核心启动顺序(standalone session 模式):
- 启动入口阶段(main):打印环境信息 → 注册信号接收器(用于优雅退出/打印结束日志)→ 解析启动参数并生成集群配置 → 初始化集群入口实例 → 启动集群入口。
- 基础环境准备(startCluster):注册退出机制(拦截用户代码的
System.exit)→ 初始化插件管理 → 初始化文件系统 → 安全上下文(Kerberos 等)→ 进入受控环境后启动集群主流程。 - 运行集群(runCluster):先初始化所有基础服务(
initializeServices)→ 再创建并启动三大核心组件(Dispatcher / ResourceManager / WebRestEndpoint)→ 注册 shutdown 回调,阻止主线程退出,直到应用最终状态完成。
JobManager 的启动流程主要围绕以下四个核心方法展开:
2.1 StandaloneSessionClusterEntrypoint.main
作为启动入口,主要负责解析配置、注册信号处理、初始化入口类实例,并调用基类的启动方法。
public static void main(String[] args) {
// 1. startup checks and logging (打印环境信息)
EnvironmentInformation.logEnvironmentInfo(
LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
// 2. 信号注册。JobManager接收信号
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
// 3. 创建集群入口配置。这里面只包含了Java启动的传参。
final EntrypointClusterConfiguration entrypointClusterConfiguration =
// 这个函数会解析参数,返回一个EntrypointClusterConfiguration(里面包含了flink运行需要的所有参数)
ClusterEntrypointUtils.parseParametersOrExit(
args,
new EntrypointClusterConfigurationParserFactory(),
StandaloneSessionClusterEntrypoint.class);
// 4. 进一步根据传参创建集群配置
Configuration configuration = loadConfiguration(entrypointClusterConfiguration);
// 5. 实例化 StandaloneSessionClusterEntrypoint
StandaloneSessionClusterEntrypoint entrypoint =
new StandaloneSessionClusterEntrypoint(configuration);
// 6. 调用基类的静态启动方法
ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}
2.2 ClusterEntrypoint.runClusterEntrypoint
这是一个静态帮助方法,用于包裹实际的启动逻辑,并处理启动失败和集群退出的异常。
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
try {
// 调用实例的 startCluster 方法启动集群
clusterEntrypoint.startCluster();
} catch (ClusterEntrypointException e) {
LOG.error(
String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),
e);
System.exit(STARTUP_FAILURE_RETURN_CODE);
}
int returnCode;
Throwable throwable = null;
try {
// 阻塞等待集群终止,获取退出码
returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
} catch (Throwable e) {
throwable = ExceptionUtils.stripExecutionException(e);
returnCode = RUNTIME_FAILURE_RETURN_CODE;
}
LOG.info(
"Terminating cluster entrypoint process {} with exit code {}.",
clusterEntrypointName,
returnCode,
throwable);
System.exit(returnCode);
}
2.3 ClusterEntrypoint.startCluster
该方法负责集群的基础环境准备,包括安全配置、插件管理、文件系统初始化,并最终在安全上下文中调用 runCluster。
public void startCluster() throws ClusterEntrypointException {
LOG.info("Starting {}.", getClass().getSimpleName());
try {
// 1. 注册退出机制策略,确保用户代码的 System.exit 被拦截
FlinkSecurityManager.setFromConfiguration(configuration);
// 2. 初始化插件管理器
PluginManager pluginManager =
PluginUtils.createPluginManagerFromRootFolder(configuration);
// 3. 对文件系统进行初始化。主要依靠 configuration 和 pluginManager
configureFileSystems(configuration, pluginManager);
// 4. 初始化 security context。Kerberos 是大数据安全认证协议。
SecurityContext securityContext = installSecurityContext(configuration);
// 5. 确保未捕获异常,只打印,不退出。
ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
// 6. 在安全上下文中运行集群
securityContext.runSecured(
(Callable<Void>)
() -> {
// 运行集群核心逻辑
runCluster(configuration, pluginManager);
return null;
});
} catch (Throwable t) {
final Throwable strippedThrowable =
ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
// 异常处理与资源清理逻辑...
throw new ClusterEntrypointException(
String.format(
"Failed to initialize the cluster entrypoint %s.",
getClass().getSimpleName()),
strippedThrowable);
}
}
2.4 ClusterEntrypoint.initializeServices
在 runCluster 方法中,最先调用的就是 initializeServices。这个方法负责初始化 JobManager 运行所需的各种基础服务组件,如 RPC 系统、高可用服务、心跳服务、BlobServer(文件分发)和 Metric 监控等。
protected void initializeServices(Configuration configuration, PluginManager pluginManager)
throws Exception {
LOG.info("Initializing cluster services.");
synchronized (lock) {
// 1. 获取jobmanager的resourceId
resourceId =
configuration
.getOptional(JobManagerOptions.JOB_MANAGER_RESOURCE_ID)
.map(
value ->
DeterminismEnvelope.deterministicValue(
new ResourceID(value)))
.orElseGet(
() ->
DeterminismEnvelope.nondeterministicValue(
ResourceID.generate()));
LOG.debug(
"Initialize cluster entrypoint {} with resource id {}.",
getClass().getSimpleName(),
resourceId);
// 2. 创建 JobManager 的工作目录
workingDirectory =
ClusterEntrypointUtils.createJobManagerWorkingDirectory(
configuration, resourceId);
LOG.info("Using working directory: {}.", workingDirectory);
// 3. rpcsystem 系统被初始化
rpcSystem = RpcSystem.load(configuration);
// 4. 最基础的rpc通信服务 (基于 Akka 或其他 RPC 实现)
commonRpcService =
RpcUtils.createRemoteRpcService(
rpcSystem,
configuration,
configuration.get(JobManagerOptions.ADDRESS),
getRPCPortRange(configuration),
configuration.get(JobManagerOptions.BIND_HOST),
configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));
// 5. JMX监控,主要是jvm信息
JMXService.startInstance(configuration.get(JMXServerOptions.JMX_SERVER_PORT));
// 更新配置中的地址和端口,供后续 HA 服务使用
configuration.set(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.set(JobManagerOptions.PORT, commonRpcService.getPort());
// 6. 初始化固定大小的IO线程池
ioExecutor =
Executors.newFixedThreadPool(
ClusterEntrypointUtils.getPoolSize(configuration),
new ExecutorThreadFactory("cluster-io"));
// 7. 初始化安全令牌管理器 (Delegation Token)
delegationTokenManager =
DefaultDelegationTokenManagerFactory.create(
configuration,
pluginManager,
commonRpcService.getScheduledExecutor(),
ioExecutor);
// 获取令牌并传递给本地 JVM,这是因为 BlobServer 等组件可能需要连接外部安全文件系统
delegationTokenManager.obtainDelegationTokens();
// 8. HA高可用服务初始化。这里默认是standalone模式,所以用的是StandaloneHaServices
haServices = createHaServices(configuration, ioExecutor, rpcSystem);
// 9. 作业,udf等文件分发服务。初始化 BlobServer 并启动
blobServer =
BlobUtils.createBlobServer(
configuration,
Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),
haServices.createBlobStore());
blobServer.start();
// 调用完后,将实际绑定的端口设置回配置中
configuration.set(BlobServerOptions.PORT, String.valueOf(blobServer.getPort()));
// 10. 心跳初始化服务
heartbeatServices = createHeartbeatServices(configuration);
// 11. Failure 增强接口,用于对外暴露和丰富失败信息
failureEnrichers = FailureEnricherUtils.getFailureEnrichers(configuration);
// 12. 监控 MetricRegistry 注册
metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);
// 创建并启动 metricQueryServiceRpcService
final RpcService metricQueryServiceRpcService =
MetricUtils.startRemoteMetricsRpcService(
configuration,
commonRpcService.getAddress(),
configuration.get(JobManagerOptions.BIND_HOST),
rpcSystem);
metricRegistry.startQueryService(metricQueryServiceRpcService, null);
final String hostname = RpcUtils.getHostname(commonRpcService);
// 实例化进程级别的 MetricGroup
processMetricGroup =
MetricUtils.instantiateProcessMetricGroup(
metricRegistry,
hostname,
ConfigurationUtils.getSystemResourceMetricsProbingInterval(
configuration));
// 13. 存储 ExecutionGraph 的组件(如 MemoryExecutionGraphInfoStore)
executionGraphInfoStore =
createSerializableExecutionGraphStore(
configuration, commonRpcService.getScheduledExecutor());
}
}
2.5 ClusterEntrypoint.runCluster
这里是真正的 JobManager 核心组件初始化的地方。在调用 initializeServices 初始化基础服务(如 HA、RPC通信)之后,会创建和启动三大核心组件:Dispatcher、ResourceManager 和 WebMonitorEndpoint。
private void runCluster(Configuration configuration, PluginManager pluginManager)
throws Exception {
synchronized (lock) {
// 1. 先初始化最基础的服务组件。比如高可用,心跳,blob服务,rpc通信等。
initializeServices(configuration, pluginManager);
// write host information into configuration
configuration.set(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.set(JobManagerOptions.PORT, commonRpcService.getPort());
// 2. 创建三大组件的工厂类
// - dispatcher组件
// - resourceManager组件
// - webRestEndpoint组件
final DispatcherResourceManagerComponentFactory
dispatcherResourceManagerComponentFactory =
createDispatcherResourceManagerComponentFactory(configuration);
// 3. 利用工厂类,传入基础服务,创建并启动三大核心组件
clusterComponent =
dispatcherResourceManagerComponentFactory.create(
configuration, // 集群配置
resourceId.unwrap(), // jobmanager id
ioExecutor, // 线程池
commonRpcService, // rpc服务
haServices, // 高可用服务
blobServer, // blob服务
heartbeatServices, // 心跳服务
delegationTokenManager, // token令牌,主要是用户认证
metricRegistry, // metric服务
executionGraphInfoStore, // 作业执行图
new RpcMetricQueryServiceRetriever( // 构造metric查询服务网关
metricRegistry.getMetricQueryServiceRpcService()),
failureEnrichers, // 失败增强
this); // 退出作业接口
// 4. 注册关闭回调:获取app的最终运行状态并关闭集群
// whenComplete只有异步任务完成后才会调用,确保了集群不会直接退出
clusterComponent
.getShutDownFuture()
.whenComplete(
(ApplicationStatus applicationStatus, Throwable throwable) -> {
if (throwable != null) {
shutDownAsync(
ApplicationStatus.UNKNOWN,
ShutdownBehaviour.GRACEFUL_SHUTDOWN,
ExceptionUtils.stringifyException(throwable),
false);
} else {
shutDownAsync(
applicationStatus,
ShutdownBehaviour.GRACEFUL_SHUTDOWN,
null,
true);
}
});
}
}
更多推荐
所有评论(0)