JDBC

public static void jdbc() {
		// test is the database name, spark is the table name
		final String url = "jdbc:mysql://localhost:3306/test";
		final String username = "root";
		final String password = "123456";
		final String sql = "INSERT INTO spark(head, tail, relationship, count, date) VALUES(?,?,?,?,?);";
                
		List<String[]> list = new ArrayList<>();
		for (int i = 0; i < 100000; i++) {
			String[] v = {"head_"+i, "tail_"+i, "test", String.valueOf(i), "2019-10-09"};
			list.add(v);
		}
		
		long s1 = System.currentTimeMillis();
		
		SparkConf conf = new SparkConf().setAppName("Spark Mysql").setMaster("local");
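		// note: setMaster("local") runs everything in a single worker thread; "local[*]" would use all cores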
		JavaSparkContext sc = new JavaSparkContext(conf);
		sc.setLogLevel("ERROR");
		
		JavaRDD<String[]> data = sc.parallelize(list);
		
		data.foreach(new VoidFunction<String[]>() {
			
			private static final long serialVersionUID = 1L;

			@Override
			public void call(String[] t) throws Exception {
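				// a brand-new connection is opened and closed for every single
				// record, which is what makes this approach so slow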
				Connection conn = DriverManager.getConnection(url, username, password);
				PreparedStatement stmt = conn.prepareStatement(sql);
				stmt.setString(1, t[0]);
				stmt.setString(2, t[1]);
				stmt.setString(3, t[2]);
				stmt.setLong(4, Long.parseLong(t[3]));
				stmt.setDate(5, Date.valueOf(t[4]));
				stmt.executeUpdate();
				
				stmt.close();
				conn.close();
			}
		});
		
		long s2 = System.currentTimeMillis();
		System.out.println(s2 - s1);
		
		sc.close();
	}

The dependency used is as follows:

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.17</version>
    </dependency>

Spark SQL

public static void sparkSql() {
                
		List<Row> list = new ArrayList<>();
		for (int i = 0; i < 100000; i++) {
			Row row = RowFactory.create("head_"+i, "tail_"+i, "test", 
					Long.valueOf(i), Date.valueOf("2019-10-09"));
			list.add(row);
		}
		
		long s1 = System.currentTimeMillis();
		
		
		SparkConf conf = new SparkConf().setAppName("Spark Mysql").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
		sc.setLogLevel("ERROR");
		
		JavaRDD<Row> data = sc.parallelize(list);
		
		// Build the schema for the DataFrame: column name, data type, nullable
		List<StructField> fields = new ArrayList<>();
		fields.add(DataTypes.createStructField("head", DataTypes.StringType, true));
		fields.add(DataTypes.createStructField("tail", DataTypes.StringType, true));
		fields.add(DataTypes.createStructField("relationship", DataTypes.StringType, true));
		fields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
		fields.add(DataTypes.createStructField("date", DataTypes.DateType, true));

		StructType schema = DataTypes.createStructType(fields);

		Dataset<Row> df = spark.createDataFrame(data, schema);
        
		Properties prop = new Properties();
		prop.put("user", "root");
		prop.put("password", "123456");
		// Connector/J 8.x renamed the driver class; the old com.mysql.jdbc.Driver
		// still loads but logs a deprecation warning
		prop.put("driver", "com.mysql.cj.jdbc.Driver");
		df.write().mode("append").jdbc("jdbc:mysql://localhost:3306/test",
				"test.spark", prop);
        
		long s2 = System.currentTimeMillis();
		System.out.println(s2 - s1);
		
		sc.close();
	}
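
The Spark SQL writer can also be tuned: the JDBC data source accepts a batchsize option (rows per JDBC batch, default 1000), and the number of partitions of the DataFrame determines how many connections write in parallel. A small sketch of both knobs applied to the df and prop above; the values 5000 and 4 are arbitrary examples, not recommendations:

		prop.put("batchsize", "5000");   // rows per JDBC batch (Spark's default is 1000)
		df.repartition(4)                // write with 4 parallel connections
				.write()
				.mode("append")
				.jdbc("jdbc:mysql://localhost:3306/test", "test.spark", prop);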

The dependency used is as follows:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>

Imported classes

import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

Performance comparison

In my own tests, Spark SQL wrote 10,000 rows to MySQL in about 10 seconds and 100,000 rows in about 30 seconds, which is reasonably fast.
Plain JDBC, by contrast, is nowhere near that: it is slower by a wide margin, largely because the foreach version above opens a new connection and issues a single-row INSERT for every record. If you do need raw JDBC, see the sketch below.
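
The usual way to close most of that gap with raw JDBC is foreachPartition: one connection per partition plus batched inserts. Here is a minimal sketch of that variant, reusing url, username, password, and sql from the jdbc() example (it additionally needs java.util.Iterator imported; the batch size of 1000 is an arbitrary choice, not a tuned value):

		data.foreachPartition(new VoidFunction<Iterator<String[]>>() {

			private static final long serialVersionUID = 1L;

			@Override
			public void call(Iterator<String[]> it) throws Exception {
				// one connection and one prepared statement per partition, not per record
				Connection conn = DriverManager.getConnection(url, username, password);
				conn.setAutoCommit(false);
				PreparedStatement stmt = conn.prepareStatement(sql);
				int pending = 0;
				while (it.hasNext()) {
					String[] t = it.next();
					stmt.setString(1, t[0]);
					stmt.setString(2, t[1]);
					stmt.setString(3, t[2]);
					stmt.setLong(4, Long.parseLong(t[3]));
					stmt.setDate(5, Date.valueOf(t[4]));
					stmt.addBatch();
					if (++pending == 1000) { // flush every 1000 rows (arbitrary)
						stmt.executeBatch();
						conn.commit();
						pending = 0;
					}
				}
				stmt.executeBatch(); // flush the remainder
				conn.commit();
				stmt.close();
				conn.close();
			}
		});

For MySQL specifically, appending rewriteBatchedStatements=true to the JDBC URL lets the driver turn each batch into a multi-row INSERT, which often matters as much as the batching itself.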

MySQL connection error

Caused by: com.mysql.cj.exceptions.InvalidConnectionAttributeException: The server time zone value '中国标准时间' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration property) to use a more specific time zone value if you want to utilize time zone support.

When this error appears, simply append ?serverTimezone=UTC to the URL.
In the example above, the original URL was jdbc:mysql://localhost:3306/test;
after the change it becomes jdbc:mysql://localhost:3306/test?serverTimezone=UTC, and the error goes away.
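
Applied to the jdbc() example above, only the url constant changes:

		final String url = "jdbc:mysql://localhost:3306/test?serverTimezone=UTC";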

Feel free to follow my WeChat official account of the same name: "我就算饿死也不做程序员".
Make a friend; let's communicate, learn, and improve together.
