Writing data to MySQL: Spark SQL vs. plain JDBC
Contents: JDBC · Spark SQL · Imports · Performance comparison · MySQL connection error
JDBC
public static void jdbc() {
    // test is the database name, spark is the table name
    final String url = "jdbc:mysql://localhost:3306/test";
    final String username = "root";
    final String password = "123456";
    final String sql = "INSERT INTO spark(head, tail, relationship, count, date) VALUES(?,?,?,?,?);";
    // Build 100,000 sample rows
    List<String[]> list = new ArrayList<>();
    for (int i = 0; i < 100000; i++) {
        String[] v = {"head_" + i, "tail_" + i, "test", String.valueOf(i), "2019-10-09"};
        list.add(v);
    }
    long s1 = System.currentTimeMillis();
    SparkConf conf = new SparkConf().setAppName("Spark Mysql").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    sc.setLogLevel("ERROR");
    JavaRDD<String[]> data = sc.parallelize(list);
    data.foreach(new VoidFunction<String[]>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(String[] t) throws Exception {
            // Note: this opens a new connection and prepares a new statement for every record
            Connection conn = DriverManager.getConnection(url, username, password);
            PreparedStatement stmt = conn.prepareStatement(sql);
            stmt.setString(1, t[0]);
            stmt.setString(2, t[1]);
            stmt.setString(3, t[2]);
            stmt.setLong(4, Long.valueOf(t[3]));
            stmt.setDate(5, Date.valueOf(t[4]));
            stmt.executeUpdate();
            stmt.close();
            conn.close();
        }
    });
    long s2 = System.currentTimeMillis();
    System.out.println(s2 - s1);
    sc.close();
}
The dependency used is as follows:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.17</version>
</dependency>
Spark SQL
public static void sparkSql() {
    // Build 100,000 sample rows
    List<Row> list = new ArrayList<>();
    for (int i = 0; i < 100000; i++) {
        Row row = RowFactory.create("head_" + i, "tail_" + i, "test",
                Long.valueOf(i), Date.valueOf("2019-10-09"));
        list.add(row);
    }
    long s1 = System.currentTimeMillis();
    SparkConf conf = new SparkConf().setAppName("Spark Mysql").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
    sc.setLogLevel("ERROR");
    JavaRDD<Row> data = sc.parallelize(list);
    // Define the schema matching the spark table
    ArrayList<StructField> fields = new ArrayList<StructField>();
    StructField field = null;
    field = DataTypes.createStructField("head", DataTypes.StringType, true);
    fields.add(field);
    field = DataTypes.createStructField("tail", DataTypes.StringType, true);
    fields.add(field);
    field = DataTypes.createStructField("relationship", DataTypes.StringType, true);
    fields.add(field);
    field = DataTypes.createStructField("count", DataTypes.LongType, true);
    fields.add(field);
    field = DataTypes.createStructField("date", DataTypes.DateType, true);
    fields.add(field);
    StructType schema = DataTypes.createStructType(fields);
    Dataset<Row> df = spark.createDataFrame(data, schema);
    Properties prop = new Properties();
    prop.put("user", "root");
    prop.put("password", "123456");
    // mysql-connector-java 8.x renamed the driver class to com.mysql.cj.jdbc.Driver
    prop.put("driver", "com.mysql.cj.jdbc.Driver");
    df.write().mode("append").jdbc("jdbc:mysql://localhost:3306/test",
            "test.spark", prop);
    long s2 = System.currentTimeMillis();
    System.out.println(s2 - s1);
    sc.close();
}
The dependency used is as follows:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0</version>
</dependency>
Imported classes
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
Performance comparison
In my own tests, Spark SQL wrote 10,000 rows to MySQL in about 10 s and 100,000 rows in about 30 s, which is reasonably fast.
The plain JDBC version, on the other hand, is painfully slow and not in the same league as Spark SQL: it opens a new connection and prepares a new statement for every single record, while Spark's JDBC writer reuses one connection per partition and batches the inserts.
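Much of that gap comes from the per-record connection in the jdbc() example above. For comparison, here is a minimal sketch of a batched variant (not part of the original test): it replaces the data.foreach(...) call, reuses the url, username, password and sql variables already defined in jdbc(), needs an extra import java.util.Iterator, and assumes you optionally add rewriteBatchedStatements=true to the URL if you want Connector/J to rewrite the batch into multi-row inserts.

// Sketch: one connection per partition plus addBatch/executeBatch,
// instead of one connection per record as in the jdbc() example above.
data.foreachPartition(new VoidFunction<Iterator<String[]>>() {
    private static final long serialVersionUID = 1L;

    @Override
    public void call(Iterator<String[]> it) throws Exception {
        Connection conn = DriverManager.getConnection(url, username, password);
        conn.setAutoCommit(false);
        PreparedStatement stmt = conn.prepareStatement(sql);
        while (it.hasNext()) {
            String[] t = it.next();
            stmt.setString(1, t[0]);
            stmt.setString(2, t[1]);
            stmt.setString(3, t[2]);
            stmt.setLong(4, Long.valueOf(t[3]));
            stmt.setDate(5, Date.valueOf(t[4]));
            stmt.addBatch();
        }
        // Send the whole partition in one batch and commit once
        stmt.executeBatch();
        conn.commit();
        stmt.close();
        conn.close();
    }
});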
MySQL connection error
Caused by: com.mysql.cj.exceptions.InvalidConnectionAttributeException: The server time zone value ‘йʱ’ is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration property) to use a more specifc time zone value if you want to utilize time zone support.
When this error appears, simply append ?serverTimezone=UTC to the JDBC URL; this tells Connector/J explicitly which time zone to use instead of trying to interpret the value reported by the server.
In the example above, the original URL jdbc:mysql://localhost:3306/test becomes jdbc:mysql://localhost:3306/test?serverTimezone=UTC, which resolves the error.
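For reference, the same fix applied to the two examples above would look roughly like this (a sketch; UTC is only an assumption, use the zone that matches your data):

// jdbc() example: append the parameter to the connection URL
final String url = "jdbc:mysql://localhost:3306/test?serverTimezone=UTC";

// sparkSql() example: the URL passed to DataFrameWriter.jdbc() gets the same parameter
df.write().mode("append").jdbc("jdbc:mysql://localhost:3306/test?serverTimezone=UTC",
        "test.spark", prop);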
You are welcome to follow my WeChat official account of the same name: "我就算饿死也不做程序员".
Let's be friends, exchange ideas, and learn and improve together.