spark update mysql_SPARK SQL - 使用DataFrames和JDBC更新MySql表
遗憾的是,在Spark中没有SaveMode.Upsert模式,因为这种情况很普遍,比如插入。zero322在一般情况下是正确的,但我认为应该有可能(在性能上有所妥协)提供这种替换功能。我也想为这种情况提供一些java代码。 当然,它并不像内置的火花一样具有高性能 - 但它应该是您的要求的良好基础。只需将其修改为您的需求:myDF.repartition(20); //one connection
遗憾的是,在Spark中没有SaveMode.Upsert模式,因为这种情况很普遍,比如插入。
zero322在一般情况下是正确的,但我认为应该有可能(在性能上有所妥协)提供这种替换功能。
我也想为这种情况提供一些java代码。 当然,它并不像内置的火花一样具有高性能 - 但它应该是您的要求的良好基础。只需将其修改为您的需求:
myDF.repartition(20); //one connection per partition, see below
myDF.foreachPartition((Iterator t) -> {
Connection conn = DriverManager.getConnection(
Constants.DB_JDBC_CONN,
Constants.DB_JDBC_USER,
Constants.DB_JDBC_PASS);
conn.setAutoCommit(true);
Statement statement = conn.createStatement();
final int batchSize = 100000;
int i = 0;
while (t.hasNext()) {
Row row = t.next();
try {
// better than REPLACE INTO, less cycles
statement.addBatch(("INSERT INTO mytable " + "VALUES ("
+ "'" + row.getAs("_id") + "',
+ "'" + row.getStruct(1).get(0) + "'
+ "') ON DUPLICATE KEY UPDATE _id='" + row.getAs("_id") + "';"));
//conn.commit();
if (++i % batchSize == 0) {
statement.executeBatch();
}
} catch (SQLIntegrityConstraintViolationException e) {
//should not occur, nevertheless
//conn.commit();
} catch (SQLException e) {
e.printStackTrace();
} finally {
//conn.commit();
statement.executeBatch();
}
}
int[] ret = statement.executeBatch();
System.out.println("Ret val: " + Arrays.toString(ret));
System.out.println("Update count: " + statement.getUpdateCount());
conn.commit();
statement.close();
conn.close();
更多推荐
所有评论(0)