Flink-Application模式提交任务到Yarn集群,包括jar包提交方法和javaApi提交方法
Flink Application对于per job模式,jar包的解析、生成JobGraph是在客户端上执行的,然后将生成的jobgraph提交到集群。如果任务特别多的话,那么这些生成JobGraph、提交到集群的操作都会在实时平台所在的机器上执行,那么将会给服务器造成很大的压力。此外这种模式提交任务的时候会把本地flink的所有jar包先上传到hdfs上相应 的临时目录,这个也会带来大量的网络
Flink Application
对于per job模式,jar包的解析、生成JobGraph是在客户端上执行的,然后将生成的jobgraph提交到集群。如果任务特别多的话,那么这些生成JobGraph、提交到集群的操作都会在实时平台所在的机器上执行,那么将会给服务器造成很大的压力。
此外这种模式提交任务的时候会把本地flink的所有jar包先上传到hdfs上相应 的临时目录,这个也会带来大量的网络的开销,所以如果任务特别多的情况下,平台的吞吐量将会直线下降。
所以针对flink per job模式的一些问题,flink 引入了一个新的部署模式–Application模式。 目前 Application 模式支持 Yarn 和 K8s 的部署方式,Yarn Application 模式会在客户端将运行任务需要的依赖都上传到 Flink Master,然后在 Master 端进行任务的提交。
此外,还支持远程的用户jar包来提交任务,比如可以将jar放到hdfs上,进一步减少上传jar所需的时间,从而减少部署作业的时间。
提交方法如下:
jar包提交
那么可以将以上代码打成jar包,然后提交运行的方式进行处理。
bin/flink run -yd -m yarn-cluster -c com.starnet.server.bigdata.flink.WordCount /home/bd/FLINK/FlinkTest-1.0-SNAPSHOT-jar-with-dependencies.jar
/opt/module/flink-1.11.4/bin/flink run -yd -m yarn-cluster -c com.starnet.server.bigdata.flink.WordCount -yD yarn.provided.lib.dirs="hdfs://hadoop113:8020/jar/flink11/libs" /home/bd/FLINK/Flink1.11-1.0-SNAPSHOT-jar-with-dependencies.jar
通过jar提交成功之后,控制台输出如下:
2021-09-26 18:00:54,129 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - YARN application has been deployed successfully.
2021-09-26 18:00:54,529 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface hadoop115:35553 of application 'application_1631184398602_0055'.
优雅退出和web访问均与以上相同。
java-api方式提交
提交相关代码如下:
public void crateStreamTaskByFlinkClient() {
//flink的本地配置目录,为了得到flink的配置
// 如果出现org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.错误
// 则在flink-config.yaml加入
// classloader.resolve-order: parent-first
String configurationDirectory = "/opt/module/flink-1.11.4/conf";
// String configurationDirectory = "/home/lxj/workspace/Olt-Test/bigdata/bigdataserver/src/main/resources/flink/conf";
//存放flink集群相关的jar包目录
String flinkLibs = "hdfs://hadoop113:8020/jar/flink11/libs";
//用户jar
String userJarPath = "hdfs://hadoop113:8020/jar/userTask/Flink1.11-1.0-SNAPSHOT-jar-with-dependencies.jar";
String flinkDistJar = "hdfs://hadoop113:8020/jar/flink11/libs/flink-dist_2.12-1.11.4.jar";
YarnClient yarnClient = YarnClient.createYarnClient();
YarnConfiguration yarnConfiguration = new YarnConfiguration();
yarnClient.init(yarnConfiguration);
yarnClient.start();
// 设置日志的,没有的话看不到日志
YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever
.create(yarnClient);
//获取flink的配置
Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
configurationDirectory);
flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
flinkConfiguration.set(
PipelineOptions.JARS,
Collections.singletonList(userJarPath));
Path remoteLib = new Path(flinkLibs);
flinkConfiguration.set(
YarnConfigOptions.PROVIDED_LIB_DIRS,
Collections.singletonList(remoteLib.toString()));
flinkConfiguration.set(
YarnConfigOptions.FLINK_DIST_JAR,
flinkDistJar);
// 设置为application模式
flinkConfiguration.set(
DeploymentOptions.TARGET,
YarnDeploymentTarget.APPLICATION.getName());
// yarn application name
flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "flink-application");
YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.createClusterSpecification();
// 设置用户jar的参数和主类
ApplicationConfiguration appConfig = new ApplicationConfiguration(new String[] {"test"}, "com.starnet.server.bigdata.flink.WordCount");
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
flinkConfiguration,
yarnConfiguration,
yarnClient,
clusterInformationRetriever,
true);
try {
ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
clusterSpecification,
appConfig);
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
ApplicationId applicationId = clusterClient.getClusterId();
String webInterfaceURL = clusterClient.getWebInterfaceURL();
log.error("applicationId is {}", applicationId);
log.error("webInterfaceURL is {}", webInterfaceURL);
// 退出
// yarnClusterDescriptor.killCluster(applicationId);
} catch (Exception e){
log.error(e.getMessage(), e);
}
}
此方法可以远程提交Flink任务到yarn上运行,并且可以通过javaApi获取到提交之后的applicationId和web地址,以及任务的退出,整体可以不依赖小程序,非常的方便。
更多推荐
所有评论(0)