Flink Application

对于per job模式,jar包的解析、生成JobGraph是在客户端上执行的,然后将生成的jobgraph提交到集群。如果任务特别多的话,那么这些生成JobGraph、提交到集群的操作都会在实时平台所在的机器上执行,那么将会给服务器造成很大的压力。

此外这种模式提交任务的时候会把本地flink的所有jar包先上传到hdfs上相应 的临时目录,这个也会带来大量的网络的开销,所以如果任务特别多的情况下,平台的吞吐量将会直线下降。

所以针对flink per job模式的一些问题,flink 引入了一个新的部署模式–Application模式。 目前 Application 模式支持 Yarn 和 K8s 的部署方式,Yarn Application 模式会在客户端将运行任务需要的依赖都上传到 Flink Master,然后在 Master 端进行任务的提交。

此外,还支持远程的用户jar包来提交任务,比如可以将jar放到hdfs上,进一步减少上传jar所需的时间,从而减少部署作业的时间。

提交方法如下:

jar包提交

那么可以将以上代码打成jar包,然后提交运行的方式进行处理。

bin/flink run -yd -m yarn-cluster -c com.starnet.server.bigdata.flink.WordCount /home/bd/FLINK/FlinkTest-1.0-SNAPSHOT-jar-with-dependencies.jar

/opt/module/flink-1.11.4/bin/flink run -yd -m yarn-cluster -c com.starnet.server.bigdata.flink.WordCount -yD yarn.provided.lib.dirs="hdfs://hadoop113:8020/jar/flink11/libs" /home/bd/FLINK/Flink1.11-1.0-SNAPSHOT-jar-with-dependencies.jar

通过jar提交成功之后,控制台输出如下:

2021-09-26 18:00:54,129 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2021-09-26 18:00:54,529 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface hadoop115:35553 of application 'application_1631184398602_0055'.

优雅退出和web访问均与以上相同。

java-api方式提交

提交相关代码如下:

public void crateStreamTaskByFlinkClient() {

    //flink的本地配置目录,为了得到flink的配置
    // 如果出现org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.错误
    // 则在flink-config.yaml加入
    // classloader.resolve-order: parent-first
    String configurationDirectory = "/opt/module/flink-1.11.4/conf";
    // String configurationDirectory = "/home/lxj/workspace/Olt-Test/bigdata/bigdataserver/src/main/resources/flink/conf";

    //存放flink集群相关的jar包目录
    String flinkLibs = "hdfs://hadoop113:8020/jar/flink11/libs";
    //用户jar
    String userJarPath = "hdfs://hadoop113:8020/jar/userTask/Flink1.11-1.0-SNAPSHOT-jar-with-dependencies.jar";
    String flinkDistJar = "hdfs://hadoop113:8020/jar/flink11/libs/flink-dist_2.12-1.11.4.jar";

    YarnClient yarnClient = YarnClient.createYarnClient();
    YarnConfiguration yarnConfiguration = new YarnConfiguration();
    yarnClient.init(yarnConfiguration);
    yarnClient.start();

    // 设置日志的,没有的话看不到日志
    YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever
        .create(yarnClient);

    //获取flink的配置
    Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
        configurationDirectory);

    flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);

    flinkConfiguration.set(
        PipelineOptions.JARS,
        Collections.singletonList(userJarPath));

    Path remoteLib = new Path(flinkLibs);
    flinkConfiguration.set(
        YarnConfigOptions.PROVIDED_LIB_DIRS,
        Collections.singletonList(remoteLib.toString()));

    flinkConfiguration.set(
        YarnConfigOptions.FLINK_DIST_JAR,
        flinkDistJar);

    // 设置为application模式
    flinkConfiguration.set(
        DeploymentOptions.TARGET,
        YarnDeploymentTarget.APPLICATION.getName());

    // yarn application name
    flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "flink-application");

    YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);

    ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
        .createClusterSpecification();

    // 设置用户jar的参数和主类
    ApplicationConfiguration appConfig = new ApplicationConfiguration(new String[] {"test"}, "com.starnet.server.bigdata.flink.WordCount");

    YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
        flinkConfiguration,
        yarnConfiguration,
        yarnClient,
        clusterInformationRetriever,
        true);

    try {
        ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
            clusterSpecification,
            appConfig);

        ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();

        ApplicationId applicationId = clusterClient.getClusterId();
        String webInterfaceURL = clusterClient.getWebInterfaceURL();
        log.error("applicationId is {}", applicationId);
        log.error("webInterfaceURL is {}", webInterfaceURL);
        
        // 退出
        // yarnClusterDescriptor.killCluster(applicationId);
    } catch (Exception e){
        log.error(e.getMessage(), e);
    }
}

此方法可以远程提交Flink任务到yarn上运行,并且可以通过javaApi获取到提交之后的applicationId和web地址,以及任务的退出,整体可以不依赖小程序,非常的方便。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐