spark 与hadoop分开部署_Spark项目案例实战和分布式部署
前面讲到Hbase的时候可以通过Java API的方式操作Hbase数据库,由于Java和Scala可以互相调用,本节使用Scala语言通过Spark平台来实现分布式操作Hbase数据库,并且打包部署到Spark集群上面。这样我们对Spark+Scala项目开发有一个完整的认识和实际工作场景的一个体会。我们创建一个Spark的工程,然后创建一个HbaseJob的object类文件,项目的功能是从H
前面讲到Hbase的时候可以通过Java API的方式操作Hbase数据库,由于Java和Scala可以互相调用,本节使用Scala语言通过Spark平台来实现分布式操作Hbase数据库,并且打包部署到Spark集群上面。这样我们对Spark+Scala项目开发有一个完整的认识和实际工作场景的一个体会。
我们创建一个Spark的工程,然后创建一个HbaseJob的object类文件,项目的功能是从Hbase批量读取课程商品表数据然后存储到Hadoop的HDFS上的功能,如代码3.15所示:
【代码3.15】 HbaseJob.scala
package com.chongdianleme.mailimport org.apache.hadoop.hbase.HBaseConfigurationimport org.apache.hadoop.hbase.client.{Result, Get, HConnectionManager}import org.apache.hadoop.hbase.util.{ArrayUtils, Bytes}import org.apache.spark._import scopt.OptionParserimport scala.collection.mutable.ListBuffer/** * Created by 充电了么App - 陈敬雷 * Spark分布式操作Hbase实战 * 网站:http://www.chongdianleme.com * 充电了么App - 专业上班族职业技能提升的在线教育平台 */object HbaseJob {case class Params(//输入目录的数据就是课程ID,每行记录就一个课程ID,后面根据课程ID作为rowKey从Hbase里查询数据
inputPath: String = "file:///D:chongdianlemeHbase项目input",
outputPath: String = "file:///D:chongdianlemeHbase项目output",
table: String = "chongdianleme_kc",
minPartitions: Int = 1,
mode: String = "local"
)def main(args: Array[String]) {val defaultParams = Params()val parser = new OptionParser[Params]("HbaseJob") {
head("HbaseJob: 解析参数.")
opt[String]("inputPath")
.text(s"inputPath 输入目录, default: ${defaultParams.inputPath}}")
.action((x, c) => c.copy(inputPath = x))
opt[String]("outputPath")
.text(s"outputPath 输出目录, default: ${defaultParams.outputPath}")
.action((x, c) => c.copy(outputPath = x))
opt[Int]("minPartitions")
.text(s"minPartitions , default: ${defaultParams.minPartitions}")
.action((x, c) => c.copy(minPartitions = x))
opt[String]("table")
.text(s"table table, default: ${defaultParams.table}")
.action((x, c) => c.copy(table = x))
opt[String]("mode")
.text(s"mode 运行模式, default: ${defaultParams.mode}")
.action((x, c) => c.copy(mode = x))
note(""" |For example, the following command runs this app on a HbaseJob dataset: """.stripMargin)
}
parser.parse(args, defaultParams).map { params => {println("参数值:" + params)readFilePath(params.inputPath,params.outputPath,params.table, params.minPartitions, params.mode)
}
}getOrElse {
System.exit(1)
}println("充电了么App - Spark分布式批量操作Hbase实战 -- 计算完成!")
}def readFilePath(inputPath: String,outputPath:String,table:String,minPartitions:Int,mode:String) = {val sparkConf = new SparkConf().setAppName("HbaseJob")
sparkConf.setMaster(mode)val sc = new SparkContext(sparkConf)//加载数据文件val data = sc.textFile(inputPath,minPartitions)
data.mapPartitions(batch(_,table)).saveAsTextFile(outputPath)
sc.stop()
}def batch(keys: Iterator[String],hbaseTable:String) = {val lineList = ListBuffer[String]()import scala.collection.JavaConversions._val conf = HBaseConfiguration.create()//每批数据创建一个Hbase连接,多条数据操作共享这个连接val connection = HConnectionManager.createConnection(conf)//获取表val table = connection.getTable(hbaseTable)
keys.foreach(rowKey=>{try {//根据rowKey主键也就是课程ID查询数据val get = new Get(rowKey.getBytes())//指定需要获取的列蔟和列
get.addColumn("kcname".getBytes(), "name".getBytes())
get.addColumn("saleinfo".getBytes(), "price".getBytes())
get.addColumn("saleinfo".getBytes(), "issale".getBytes())val result = table.get(get)var nameRS= result.getValue("kcname".getBytes(),"name".getBytes())var kcName = "";if(nameRS != null&&nameRS.length > 0){
kcName = new String(nameRS);
}val priceRS = result.getValue("saleinfo".getBytes, "price".getBytes)var price = ""if (priceRS != null && priceRS.length > 0)
price = new String(priceRS)val issaleRS = result.getValue("saleinfo".getBytes, "issale".getBytes)var issale = ""if (issaleRS != null && issaleRS.length > 0)
issale = new String(issaleRS)
lineList += rowKey+"001"+ kcName + "001"+ price + "001"+issale
} catch {case e: Exception => e.printStackTrace()
}
})//每批数据操作完毕,别忘了关闭表和数据库连接
table.close()
connection.close()
lineList.toIterator
}
}
代码开发完成后,我们看看怎么部署到Spark集群上去运行,运行的方式和我们的Spark集群怎么部署的有关,Spark集群部署有三种方式:单独Standalone集群部署,Spark on yarn部署,local本地模式三种灵活部署方式,前两种都是分布式部署,后面的是单机方式。一般大数据部门都有Hadoop集群,所以推荐Spark on Yarn部署,这样更方便服务器资源的统一管理和分配。
Spark on Yarn部署非常简单,主要是把Spark的包解压就可以用了,每台服务器上放一份,并且放在相同的目录下。步骤如下:
1)配置scala环境变量
#解压Scala的包,然后vim /etc/profile
export SCALA_HOME=/home/hadoop/software/scala-2.11.8
2)解压tar xvzf spark-*-bin-hadoop*.tgz,每台hadoop服务器上放在同一个目录下
不用任何配置值即可,用spark-submit提交就行。
Spark环境部署好之后,把我们的操作Hbase项目编译打包,一个是项目本身的jar,另一个是项目依赖的jar集合,分别上传到任意一台服务器就行,不要每台服务器都传,在哪台服务器运行就在哪台服务器上上传就行,依赖的jar包放在这个目录/home/hadoop/chongdianleme/chongdianleme-spark-task-1.0.0/lib/下,项目本身的jar包放在这里目录下/home/hadoop/chongdianleme/,然后通过spark-submit提交如下脚本即可:
hadoop fs -rmr /ods/kc/dim/ods_kc_dim_hbase/;
/home/hadoop/software/spark21/bin/spark-submit --jars $(echo /home/hadoop/chongdianleme/chongdianleme-spark-task-1.0.0/lib/*.jar | tr ' ' ',') --master yarn --queue hadoop --num-executors 1 --driver-memory 1g --executor-memory 1g --executor-cores 1 --class com.chongdianleme.mail.HbaseJob /home/hadoop/chongdianleme/hbase-task.jar --inputPath /mid/kc/dim/mid_kc_dim_kcidlist/ --outputPath /ods/kc/dim/ods_kc_dim_hbase/ --table chongdianleme_kc --minPartitions 6 --mode yarn
其中hadoop fs -rmr /ods/kc/dim/ods_kc_dim_hbase/;是为了下次执行这个任务避免输出目录已经存在,我们提前先把输出先删掉,执行完之后输出目录会重新生成。
脚本参数说明:
--jars 是你程序依赖的所有jar存放的目录
--master 是指定在哪里跑,在Hadoop的Yarn上跑写Yarn,本地方式写Local。
-- queue 如果是Yarn方式,指定分配到哪个队列的资源上。
-- num-executors 指定跑几个Task。
--driver.maxResultSize driver的最大内存设置,默认1G比较小。超过了会OOM,可以根据情况设置大一些。
-- executor-memory 每个Task分配内存。
-- executor-cores每个Task分配几个虚拟CPU。
-- class 你的程序的入口类,后面跟jar包,在后面是Java或Scala的main函数的业务参数。
这就是我们从编程,编译打包、部署到服务器如何分布式运行的完整过程,后面章节讲的Spark分布式机器学习也是这么来打包和部署的。
知乎视频www.zhihu.com除了Spark项目案例实战和分布式部署,其它深度学习框架也有不错的开源实现,比如MXNet,后面请大家关注充电了么app,课程,微信群,更多内容请看新书《分布式机器学习实战(人工智能科学与技术丛书)》
《分布式机器学习实战》本书对应清华大学出版社京东自营链接地址:
https://item.jd.com/12743009.html
Python编程零基础小白快速入门必听课
https://ke.qq.com/course/package/29782?flowToken=1028733
更多推荐
所有评论(0)