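I. Development environment:

Operating system: Windows 10, 64-bit

IDE: IntelliJ IDEA

JDK: 1.8

Scala: 2.10.6

Cluster: a CDH cluster on Linux with Spark 1.5.2 and Hadoop 2.6.0. (I would have liked to use the latest Spark and Hadoop releases, but only Spark releases up to 1.6 ship a spark-assembly-1.x.x-hadoop2.x.x.jar.)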

II. Implementation steps:

1. Configure Maven's pom.xml:

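<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>spark</groupId>
  <artifactId>test</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.10.6</scala.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
    </dependency>
    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>commons-logging</groupId>
      <artifactId>commons-logging</artifactId>
      <version>1.1.1</version>
      <type>jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.1</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.9</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.10</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.10</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.10.6</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>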

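2. Write a simple program:

import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    // Connect to the standalone master of the cluster
    val conf = new SparkConf().setMaster("spark://xxxxx:7077").setAppName("test")
    val sc = new SparkContext(conf)
    // Ship the application jar (built in step 3) to the executors
    sc.addJar("E:\\sparkTest\\out\\artifacts\\sparkTest_jar\\sparkTest.jar")
    // Monte Carlo estimate: sample n points in the unit square and count those
    // inside the quarter circle. With only 4 samples the result is very coarse.
    val n = 4
    val count = sc.parallelize(1 to n).filter { _ =>
      val x = math.random
      val y = math.random
      x * x + y * y < 1
    }.count()
    println(s"Pi is roughly ${4.0 * count / n}")
    sc.stop()
  }
}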
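To see what the job computes without a cluster, the same estimator can be run on plain Scala collections. This is only a sketch: the object name `LocalPi`, the `estimate` helper, and the fixed seed are assumptions made for illustration and are not part of the original program.

```scala
import scala.util.Random

// Local, Spark-free sketch of the Monte Carlo estimate used in the job.
// LocalPi, estimate and the seed parameter are illustrative names.
object LocalPi {
  // Draw n points in the unit square and count those inside the quarter circle.
  def estimate(n: Int, seed: Long): Double = {
    val rng = new Random(seed)
    val inside = (1 to n).count { _ =>
      val x = rng.nextDouble()
      val y = rng.nextDouble()
      x * x + y * y < 1
    }
    4.0 * inside / n
  }

  def main(args: Array[String]): Unit = {
    println(s"Pi is roughly ${estimate(100000, 42L)}")
  }
}
```

The estimate only becomes stable with many samples, which is why a sample count of 4 gives little more than a smoke test of the cluster connection.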

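3. Build the jar: File -> Project Structure -> Artifacts, then Build -> Build Artifacts, and click Run. (I just tried it and found that the program also runs without the jar, although no result had appeared in the console yet?)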

4. The Spark version in pom.xml must match the Spark version on the cluster. With mismatched versions I got: java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem

5. Exception: Could not locate executable null\bin\winutils.exe in the Hadoop binaries

Fix:

1. Download a Hadoop package (I used hadoop-2.7.3), unpack it, and set HADOOP_HOME to the unpacked directory.

3. Restart IDEA; the exception disappears.
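As an alternative to the environment variable, Hadoop also reads the `hadoop.home.dir` JVM system property, which can be set in code before the SparkContext is created. The following is a hedged sketch: the object `HadoopHome`, its helpers, and the path `C:\hadoop-2.7.3` are assumptions, and the check only confirms that a `bin\winutils.exe` file is present under the chosen directory.

```scala
import java.io.File

// Sketch: point Hadoop at a local install before creating the SparkContext.
// HadoopHome, winutils and configure are illustrative names.
object HadoopHome {
  // Location where Hadoop expects winutils.exe under a given home directory.
  def winutils(home: String): File = new File(new File(home, "bin"), "winutils.exe")

  // Sets hadoop.home.dir only when winutils.exe is really there; returns whether it did.
  def configure(home: String): Boolean = {
    val found = winutils(home).isFile
    if (found) System.setProperty("hadoop.home.dir", home)
    found
  }

  def main(args: Array[String]): Unit = {
    val home = "C:\\hadoop-2.7.3" // assumed unpack location
    if (!configure(home)) println(s"winutils.exe not found at ${winutils(home)}")
  }
}
```

The `null` in the error message is the unset home directory, so either mechanism removes it as long as the directory really contains `bin\winutils.exe`.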

6. Exception while deleting Spark temp dir: C:\Users\tend\AppData\Local\Temp\spark-70484fc4-167d-48fa-a8f6-54db9752402e\userFiles-27a65cc7-817f-4476-a2a2-58967d7b6cc1

Fix: Spark currently has this problem on Windows. If you would rather not see it, set the log level in log4j.properties to FATAL (heh).

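7. com.google.protobuf.InvalidProtocolBufferException: Protocol message end-gro: the IP address or port in the HDFS URL is wrong, for example in hdfs://xxxx:8020//usr/xxx. The port must be the NameNode RPC port of your cluster, which is 8020 here; many other setups configure 9000.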
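A quick sanity check of the URI before submitting the job can catch a missing or mistyped port. The sketch below uses only `java.net.URI`; the object `HdfsUriCheck`, the helper `hostAndPort`, and the host name `namenode` are hypothetical. It cannot tell whether the port is the right one, only that a host and a port are present.

```scala
import java.net.URI

// Sketch: pull the NameNode host and port out of an hdfs:// URI for a quick check.
object HdfsUriCheck {
  // Returns (host, port), or None when the URI is not an hdfs:// address with an explicit port.
  def hostAndPort(uri: String): Option[(String, Int)] = {
    val u = new URI(uri)
    if (u.getScheme == "hdfs" && u.getHost != null && u.getPort != -1)
      Some((u.getHost, u.getPort))
    else None
  }

  def main(args: Array[String]): Unit = {
    println(hostAndPort("hdfs://namenode:8020/user/test")) // hypothetical host
  }
}
```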

8. Reading from and writing to Oracle:

package spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object readFromOracle {
  def main(args: Array[String]): Unit = {
    // Quiet the Spark and Jetty loggers
    Logger.getLogger("org.apache.spark").setLevel(Level.FATAL)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    val conf = new SparkConf().setMaster("spark://xxxxxx:7077").setAppName("read")
      .setJars(List("E:\\softs\\softDownload\\ojdbc14.jar")) // ship the ojdbc14 jar; without it an exception occurs
    val sc = new SparkContext(conf)
    val oracleDriverUrl = "jdbc:oracle:thin:@xxxxxxxx:1521:testdb11g"
    val jdbcMap = Map("url" -> oracleDriverUrl, "user" -> "xxxxx", "password" -> "xxxxx", "dbtable" -> "MYTABLE", "driver" -> "oracle.jdbc.driver.OracleDriver")
    val sqlContext = new HiveContext(sc)
    // Load MYTABLE through the jdbc data source and print the first 3 rows
    val jdbcDF = sqlContext.read.options(jdbcMap).format("jdbc").load()
    jdbcDF.show(3)
    sc.stop()
  }
}
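The connection settings are easier to reuse across the read and write programs when the URL and the options map are built in one place. This is a minimal sketch under assumptions: `OracleJdbcOptions`, `thinUrl`, `options`, and the host `dbhost` are names invented here, and the URL uses the host:port:SID form that appears in the read example.

```scala
// Sketch: assemble the Oracle thin URL and the jdbc options map from separate parts.
object OracleJdbcOptions {
  // Oracle thin URL in host:port:SID form.
  def thinUrl(host: String, port: Int, sid: String): String =
    s"jdbc:oracle:thin:@$host:$port:$sid"

  // Same keys as the jdbcMap in the post: url, user, password, dbtable, driver.
  def options(url: String, user: String, password: String, table: String): Map[String, String] =
    Map(
      "url" -> url,
      "user" -> user,
      "password" -> password,
      "dbtable" -> table,
      "driver" -> "oracle.jdbc.driver.OracleDriver"
    )

  def main(args: Array[String]): Unit = {
    val url = thinUrl("dbhost", 1521, "testdb11g") // dbhost is a placeholder
    println(options(url, "user", "secret", "MYTABLE"))
  }
}
```

The resulting map can be passed to `sqlContext.read.options(...)` in the same way as `jdbcMap`.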

package spark

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

/**
 * Created by Administrator on 2017/7/17.
 */
object writeToOracle {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.FATAL)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    /*
     * Remember to set the jars. Although ojdbc.jar was added at build time, the job still failed with
     * jdbc:oracle:thin:@xxxxxxxx:testdb11g
     * at java.sql.DriverManager.getConnection(DriverManager.java:689),
     * so adding it at build time is not enough.
     * It is better to upload the dependency jars to HDFS than to keep them on the local disk.
     */
    val conf = new SparkConf().setMaster("spark://xxxxxxx:7077").setAppName("write")
      .setJars(List("E:\\sparkTest\\out\\artifacts\\writeToOracle_jar\\sparkTest.jar", "E:\\softs\\softDownload\\ojdbc14.jar"))
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    val oracleDriverUrl = "jdbc:oracle:thin:@xxxxxxx:testdb11g"
    val jdbcMap = Map("url" -> oracleDriverUrl, "user" -> "xxxx", "password" -> "xxxxxx", "dbtable" -> "MYTABLE", "driver" -> "oracle.jdbc.driver.OracleDriver")
    val jdbcDF = sqlContext.read.options(jdbcMap).format("jdbc").load()
    jdbcDF.foreachPartition { rows =>
      // One connection and one batch per partition, opened on the executor
      Class.forName("oracle.jdbc.driver.OracleDriver")
      val connection: Connection = DriverManager.getConnection(oracleDriverUrl, "xxxx", "xxxxxxx")
      val prepareStatement: PreparedStatement = connection.prepareStatement("insert into MYTABLE2 values(?,?,?,?,?,?,?,?,?)")
      try {
        rows.foreach { row =>
          // Copy columns 0..8 into parameters 1..9 (assumes nine string columns)
          for (i <- 0 until 9) prepareStatement.setString(i + 1, row.getString(i))
          prepareStatement.addBatch()
        }
        prepareStatement.executeBatch()
      } finally {
        // Release the JDBC resources even if the batch fails
        prepareStatement.close()
        connection.close()
      }
    }
    sc.stop()
  }
}
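The write program sends each partition as a single JDBC batch. For large partitions it may be preferable to flush every few thousand rows instead. The sketch below shows the chunking on a plain iterator; `BatchedWrite`, `writeInBatches`, and the `flush` callback are hypothetical names, and `flush` merely stands in for the `addBatch` and `executeBatch` calls.

```scala
// Sketch: flush a partition's rows in fixed-size chunks instead of one big batch.
object BatchedWrite {
  // Calls flush once per chunk of at most batchSize rows; returns the number of flushes.
  def writeInBatches[A](rows: Iterator[A], batchSize: Int)(flush: Seq[A] => Unit): Int = {
    var flushes = 0
    rows.grouped(batchSize).foreach { chunk =>
      flush(chunk) // in the JDBC case: addBatch per row, then executeBatch
      flushes += 1
    }
    flushes
  }

  def main(args: Array[String]): Unit = {
    val n = writeInBatches((1 to 10).iterator, 4)(chunk => println(chunk.mkString(",")))
    println(s"flushed $n batches")
  }
}
```

Inside `foreachPartition`, the `rows` iterator would take the place of `(1 to 10).iterator`.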

Copying a table within the database:

package spark.sql

import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._

/**
 * Created by Administrator on 2017/7/21.
 */
object OperateOracle {
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

  val oracleDriverUrl = "jdbc:oracle:thin:@xxxxxxx:1521:testdb11g"
  val jdbcMap = Map("url" -> oracleDriverUrl, "user" -> "xxxxxx", "password" -> "xxxxxxx", "dbtable" -> "MYTABLE", "driver" -> "oracle.jdbc.driver.OracleDriver")

  def main(args: Array[String]): Unit = {
    // Create the SparkContext
    val sc = createSparkContext
    // Create the sqlContext used to connect to Oracle, Hive, etc.
    val sqlContext = new HiveContext(sc)
    // Load the Oracle table (lazily)
    val jdbcDF = sqlContext.read.options(jdbcMap).format("jdbc").load()
    jdbcDF.registerTempTable("MYTABLEDF")
    val df2Oracle = sqlContext.sql("select * from MYTABLEDF")
    // Register the OracleDialect
    JdbcDialects.registerDialect(OracleDialect)
    val connectProperties = new Properties()
    connectProperties.put("user", "xxxxxx")
    connectProperties.put("password", "xxxxxxx")
    Class.forName("oracle.jdbc.driver.OracleDriver").newInstance()
    // Write back to Oracle
    // Note: when writing the results back to Oracle, make sure the target table exists
    JdbcUtils.saveTable(df2Oracle, oracleDriverUrl, "MYTABLE2", connectProperties)
    sc.stop()
  }

  def createSparkContext: SparkContext = {
    val conf = new SparkConf().setAppName("Operate")
      .setMaster("spark://xxxxxx:7077")
      .setJars(List("hdfs://xxxxx:8020//user//ojdbc14.jar"))
    // SparkConf parameter settings
    // conf.set("spark.sql.autoBroadcastJoinThreshold", "50M")
    /* spark.sql.codegen: whether to precompile SQL into Java bytecode; helps long-running or frequently run queries */
    // conf.set("spark.sql.codegen", "true")
    /* spark.sql.inMemoryColumnarStorage.batchSize: number of rows processed per batch; watch out for OOM */
    // conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "1000")
    /* spark.sql.inMemoryColumnarStorage.compressed: whether the in-memory columnar storage is compressed */
    // conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")
    val sc = new SparkContext(conf)
    sc
  }

  // Override JdbcDialect to fit Oracle
  val OracleDialect = new JdbcDialect {
    override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle") || url.contains("oracle")

    // getJDBCType is used when writing to a JDBC table
    override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
      case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
      case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))
      case IntegerType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC))
      case LongType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC))
      case DoubleType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC))
      case FloatType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC))
      case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))
      case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))
      case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))
      case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
      case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
      // case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
      case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,4)", java.sql.Types.NUMERIC))
      case _ => None
    }
  }
}
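Because the target table has to exist before the write-back, it can help to generate its DDL from the same column types the dialect produces. The following is only a sketch: `OracleDdl`, `createTable`, and the column names `ID` and `NAME` are hypothetical, and the generated statement would still have to be run against Oracle by hand or over JDBC.

```scala
// Sketch: build a CREATE TABLE statement from (column name, Oracle type) pairs.
object OracleDdl {
  def createTable(table: String, columns: Seq[(String, String)]): String = {
    val cols = columns.map { case (name, tpe) => s"$name $tpe" }.mkString(", ")
    s"CREATE TABLE $table ($cols)"
  }

  def main(args: Array[String]): Unit = {
    // Column names are hypothetical; the types follow the dialect mapping in OperateOracle.
    println(createTable("MYTABLE2", Seq("ID" -> "NUMBER(16)", "NAME" -> "VARCHAR2(255)")))
  }
}
```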

The pom.xml at this point:

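<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>spark</groupId>
  <artifactId>test</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.10.6</scala.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
    </dependency>
    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>commons-logging</groupId>
      <artifactId>commons-logging</artifactId>
      <version>1.1.1</version>
      <type>jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.1</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.9</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.10</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.10</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>com.databricks</groupId>
      <artifactId>spark-csv_2.10</artifactId>
      <version>1.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.10.6</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>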
