springboot集成spark,ETL demo
·
1.依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build descriptor for the Spring Boot + Spark demo application. -->
<modelVersion>4.0.0</modelVersion>
<!-- Inherit dependency and plugin management from the Spring Boot parent POM. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.springboot.spark</groupId>
<artifactId>spark</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spark</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<!-- Single Spark version shared by the spark-core, spark-sql and spark-streaming artifacts below. -->
<spark.version>2.4.5</spark.version>
</properties>
<dependencies>
<!-- Plain Spring Boot starter; no web starter is declared in this POM. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Lombok supplies the @Slf4j annotation used on the abstract job class. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Spark artifacts built for Scala 2.12. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
<exclusions>
<!-- NOTE(review): presumably excluded so Spark's SLF4J-to-Log4j binding does not clash with the
logging binding that comes with the Spring Boot starter; confirm with mvn dependency:tree. -->
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- MySQL JDBC driver; the ETL demo refers to it only by the class-name string
"com.mysql.jdbc.Driver", which is why runtime scope is sufficient. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.37</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin; the article later starts the build output with "java -jar". -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.application.yml
spring:
  application:
    name: springboot-spark
server:
  port: 8088
# Settings bound onto the SparkConf bean (configuration prefix "spark")
spark:
  appName: ${spring.application.name}
  # The standalone master spark://192.168.203.132:7077 could not be reached, so local mode is used
  master: local
  sparkHome: 1
3.spring bean配置
package com.springboot.spark.config;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that exposes the Spark entry points as beans.
 *
 * <p>The {@link SparkConf} bean is bound to the {@code spark} configuration prefix;
 * the {@link JavaSparkContext} and {@link SparkSession} beans are both built on top
 * of it so the whole application shares one Spark context.
 *
 * @author majun
 * @version 1.0
 * @date 2020-03-08 02:01
 */
@Configuration
public class SparkConfig {

    /**
     * Creates the Spark configuration with the demo's fixed memory settings.
     * Properties under the {@code spark} prefix are bound onto the returned object.
     *
     * <p>NOTE(review): Spark's configuration guide says {@code spark.driver.memory}
     * must not be set through {@code SparkConf} in client mode because the driver JVM
     * is already running; confirm whether this setting has any effect here.
     *
     * @return the Spark configuration bean
     */
    @Bean
    @ConfigurationProperties(prefix = "spark")
    public SparkConf getSparkConfig() {
        return new SparkConf()
                .set("spark.executor.memory", "512m")
                .set("spark.driver.memory", "512m");
    }

    /**
     * Creates the Java-friendly Spark context from {@link #getSparkConfig()}.
     *
     * @return the Spark context bean
     */
    @Bean
    public JavaSparkContext getSparkContext() {
        return new JavaSparkContext(getSparkConfig());
    }

    /**
     * Creates the Spark SQL session on top of the already created Spark context.
     *
     * @return the Spark session bean
     */
    @Bean
    public SparkSession sqlContext() {
        // Resolve the context bean first so that getOrCreate() below reuses it
        // instead of starting a second context.
        JavaSparkContext context = getSparkContext();
        // Use the public builder API rather than the SparkSession constructor, which is
        // internal to Spark (private[sql]) and does not register a default session.
        return SparkSession.builder()
                .config(context.getConf())
                .getOrCreate();
    }
}
4.抽象job
package com.springboot.spark.core;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * Base class for Spark jobs that are launched through {@link #startJob(String[])}.
 *
 * <p>Subclasses implement {@link #execute(JavaSparkContext, String[])}; the injected
 * Spark context is closed once the job has finished, whether or not it succeeded.
 */
@Slf4j
public abstract class AbstractSparkJob {

    @Autowired
    private JavaSparkContext sparkContext;

    /**
     * Runs the job logic.
     *
     * @param sparkContext the Spark context to run the job on
     * @param args         the arguments passed to {@link #startJob(String[])}
     */
    protected abstract void execute(JavaSparkContext sparkContext, String[] args);

    /**
     * Releases the Spark context after the job has run.
     *
     * @param javaSparkContext the context to close
     */
    protected void close(JavaSparkContext javaSparkContext) {
        javaSparkContext.close();
    }

    /**
     * Executes the job and then closes the Spark context.
     *
     * @param args arguments forwarded unchanged to {@code execute}
     */
    public void startJob(String[] args) {
        try {
            this.execute(sparkContext, args);
        } finally {
            // Close even when execute() throws so the context is not leaked.
            this.close(sparkContext);
        }
    }
}
5.WordCount demo & ETL demo
package com.springboot.spark.job;
import com.springboot.spark.core.AbstractSparkJob;
import org.apache.spark.api.java.JavaSparkContext;
import org.mortbay.util.ajax.JSON;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
/**
 * Word-count job: reads a text file, counts the words, prints the ten most frequent
 * ones and saves them as a text file.
 */
@Component
public class WordCountJob extends AbstractSparkJob {

    /** Number of most frequent words that are reported. */
    private static final int TOP_K = 10;

    /**
     * Counts the words of the input file and writes the top {@value #TOP_K}.
     *
     * @param sparkContext the Spark context to run on
     * @param args         {@code args[1]} is the input path and {@code args[2]} the
     *                     output path; {@code args[0]} is not used by this method
     * @throws IllegalArgumentException if fewer than three arguments are supplied
     */
    @Override
    public void execute(JavaSparkContext sparkContext, String[] args) {
        if (args == null || args.length < 3) {
            throw new IllegalArgumentException(
                    "Expected arguments: <job class name> <input path> <output path>");
        }
        String inputPath = args[1];
        String outputPath = args[2];

        List<Tuple2<Integer, String>> topK = sparkContext.textFile(inputPath)
                .flatMap(line -> Arrays.asList(line.split("\n| ")).iterator())
                // Drop empty tokens before the shuffle instead of after it.
                .filter(word -> word.length() > 0)
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey(Integer::sum)
                // Swap to (count, word) so the pairs can be sorted by count, descending.
                .mapToPair(Tuple2::swap)
                .sortByKey(false)
                .take(TOP_K);

        for (Tuple2<Integer, String> entry : topK) {
            System.out.println(JSON.toString(entry));
        }

        // Collapse to a single partition before saving the result.
        sparkContext.parallelize(topK).coalesce(1).saveAsTextFile(outputPath);
    }
}
package com.springboot.spark.core;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions;
import org.apache.spark.storage.StorageLevel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Properties;
/**
 * ETL demo: copies three columns of the {@code user} table from the source MySQL
 * database into the target MySQL database through Spark SQL.
 *
 * <p>The copy is triggered from {@link #etl()}, which is a {@code @PostConstruct}
 * callback of this bean.
 */
@Component
public class ETLServiceDemo {

    /** JDBC URL of the source database. */
    private static final String SOURCE_URL =
            "jdbc:mysql://192.168.203.132:3306/test1?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8";

    /** JDBC URL of the target database. */
    private static final String TARGET_URL =
            "jdbc:mysql://192.168.203.132:3306/test2?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8";

    /** Table that is read from the source and appended to in the target. */
    private static final String TABLE = "user";

    /** Value used for both the JDBC fetch-size and insert-batch-size options. */
    private static final long BATCH_SIZE = 1000L;

    @Autowired
    private SparkSession session;

    /**
     * Builds the connection properties for both databases and runs the copy.
     *
     * @throws Exception declared for compatibility with existing callers
     */
    @PostConstruct
    public void etl() throws Exception {
        // Source table connection settings
        Properties prod = jdbcProperties();
        // Target table connection settings
        Properties local = jdbcProperties();
        writeLive(prod, local, session);
    }

    /**
     * Reads the source table, caches it, and appends its rows to the target table.
     *
     * @param prod    JDBC connection properties for the source database
     * @param local   JDBC connection properties for the target database
     * @param session the Spark session used for reading and writing
     */
    public void writeLive(Properties prod, Properties local, SparkSession session) {
        long start = System.currentTimeMillis();
        Dataset<Row> source = session.read()
                .option(JDBCOptions.JDBC_BATCH_FETCH_SIZE(), BATCH_SIZE)
                .jdbc(SOURCE_URL, TABLE, prod)
                .selectExpr("id", "user_name", "password")
                .persist(StorageLevel.MEMORY_ONLY_SER());
        try {
            source.createOrReplaceTempView("userTemp");
            // Read the data back from the temporary view
            Dataset<Row> rows = session.sql("select * from userTemp");
            // Append the rows to the target database described by `local`
            rows.write()
                    .mode(SaveMode.Append)
                    .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE(), BATCH_SIZE)
                    .jdbc(TARGET_URL, TABLE, local);
            long end = System.currentTimeMillis();
            System.out.println("耗时--->>>>" + (end - start) / 1000L);
        } finally {
            // Release the cached data even when the write fails. Only `source` was
            // persisted, so it is the only dataset that needs unpersisting.
            source.unpersist(true);
        }
    }

    /**
     * Creates the JDBC connection properties used by the demo.
     *
     * <p>NOTE(review): the credentials are hard-coded; move them to external
     * configuration before using this outside a demo.
     */
    private static Properties jdbcProperties() {
        Properties properties = new Properties();
        properties.put("user", "root");
        properties.put("password", "123456");
        properties.put("driver", "com.mysql.jdbc.Driver");
        return properties;
    }
}
6.如何启动:启动类实现 CommandLineRunner,run 方法接收 main 方法的启动参数,
根据 args[0](job 的全限定类名)从 bean 容器中找到 job 对象并执行 startJob 方法。启动时会先初始化 Spring 容器,然后运行 CommandLineRunner.run(),运行完后退出 JVM。即 ETL demo -> WordCount demo -> 退出
package com.springboot.spark;
import com.springboot.spark.core.AbstractSparkJob;
import org.apache.spark.util.Utils;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.support.ApplicationObjectSupport;
@SpringBootApplication
public class SparkApplication extends ApplicationObjectSupport implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(SparkApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
String className = args[0];
Class clazz = Utils.classForName(className);
Object sparkJob = this.getApplicationContext().getBean(clazz);
if (sparkJob instanceof AbstractSparkJob ){
((AbstractSparkJob) sparkJob).startJob(args);
}else {
logger.error("你指定的启动job类"+className+"不存在");
}
}
}
idea启动指定main方法参数

linux启动:java -jar spark-0.0.1-SNAPSHOT.jar com.springboot.spark.job.WordCountJob /root/test.txt /root/result
7.存在的问题
Spring Boot 项目无法通过 spark-submit 启动。使用 java -Dspark.master=spark://192.168.203.132:7077 -jar spark-0.0.1-SNAPSHOT.jar com.springboot.spark.job.WordCountJob /root/test.txt /root/result 启动,指定 spark.master=spark://192.168.203.132:7077 是为了能在 dashboard 上监控 job 的执行情况;但是加入该参数后,日志显示连接成功,实际却并没有连接成功。通过 netstat -ntlp|grep 7077 可见端口正常监听,将 spark 进程的监听地址修改为 0.0.0.0 也无效。

更多推荐
所有评论(0)