// NOTE: a stray "|" left over from web-page extraction stood here; it is not part of the program.
package com.jojo
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, KeyValue, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Bulk-loads a tab-separated text file into an HBase table, writing several
 * columns of one column family per row.
 *
 * Pipeline: read the source file with Spark, turn every line into KeyValue
 * cells, sort the cells, write them out as HFiles, and finally hand the HFiles
 * over to HBase with LoadIncrementalHFiles.
 */
object HbaseBulkLoadApp {
  val zookeeperQuorum: String = "cdh01,cdh02,cdh03" // ZooKeeper quorum hosts
  val dataSourcePath: String = "hdfs://cdh03:8020/user/hive/warehouse/sample_07" // input file
  val hFilePath: String = "hdfs://cdh03:8020/tmp/result" // where the generated HFiles are staged
  val hdfsRootPath: String = "hdfs://cdh03:8020/" // HDFS root (fs.defaultFS)
  val tableName: String = "sample_07" // target HBase table
  val familyName: String = "basic" // column family
  // Column names by field position; index 0 names the field that is used as the row key.
  val arr: Array[String] = Array("code", "description", "total_emp", "salary")

  /**
   * Converts one tab-separated input line into the cells of a single HBase row.
   *
   * Field 0 becomes the row key; every following field i becomes a cell whose
   * qualifier is `arr(i)` inside the `familyName` column family.
   *
   * @param line one raw line of the source file
   * @return the row key paired with the cells built from the remaining fields
   * @throws IllegalArgumentException if the line has more fields than `arr` declares
   */
  private def toKeyValues(line: String): (ImmutableBytesWritable, Seq[KeyValue]) = {
    val fields = line.split("\t")
    // Fail with an explicit message instead of a bare ArrayIndexOutOfBoundsException.
    require(
      fields.length <= arr.length,
      s"Row has ${fields.length} fields but only ${arr.length} columns are declared: $line")
    // Bytes.toBytes always encodes as UTF-8, so family, qualifier and value bytes
    // do not depend on the JVM's default charset.
    val family = Bytes.toBytes(familyName)
    val rowKey = Bytes.toBytes(fields(0))
    val cells = (1 until fields.length).map { i =>
      new KeyValue(rowKey, family, Bytes.toBytes(arr(i)), Bytes.toBytes(fields(i)))
    }
    (new ImmutableBytesWritable(rowKey), cells)
  }

  /**
   * Entry point: prepares the table and the staging directory, generates the
   * HFiles and bulk-loads them.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName(s"${this.getClass.getSimpleName}")
      .setMaster("local")
      // Use Kryo instead of the default Java serialization.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Classes registered with Kryo up front.
      .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))
    val sc = new SparkContext(sparkConf)
    try {
      // Hadoop configuration used to reach the staging directory.
      val hadoopConf = new Configuration()
      hadoopConf.set("fs.defaultFS", hdfsRootPath)
      // FileSystem.get hands out a cached, shared instance, so it is intentionally
      // not closed here (closing it could break other users in this JVM).
      val fileSystem = FileSystem.get(hadoopConf)

      // HBase configuration: ZooKeeper quorum and client port, the maximum number of
      // HFiles per region and family for a bulk load, and the maximum region file size.
      val hconf = HBaseConfiguration.create()
      hconf.set("hbase.zookeeper.quorum", zookeeperQuorum)
      hconf.set("hbase.zookeeper.property.clientPort", "2181")
      hconf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", "3200")
      hconf.set("hbase.hregion.max.filesize", "10737418240")
      hconf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

      val target = TableName.valueOf(tableName)
      val stagingDir = new Path(hFilePath)

      val hbaseConn = ConnectionFactory.createConnection(hconf)
      try {
        val admin = hbaseConn.getAdmin
        try {
          // 0. Prepare the environment.
          // Create the table if it does not exist yet.
          if (!admin.tableExists(target)) {
            val desc = new HTableDescriptor(target)
            desc.addFamily(new HColumnDescriptor(familyName))
            admin.createTable(desc)
            print("创建了一个新表")
          }
          // Remove a staging directory left over from an earlier run.
          if (fileSystem.exists(stagingDir)) {
            fileSystem.delete(stagingDir, true)
            print("删除hdfs上存在的路径")
          }

          // 1. Turn the input lines into (rowKey, cell) pairs, one pair per cell.
          val data: RDD[(ImmutableBytesWritable, Seq[KeyValue])] =
            sc.textFile(dataSourcePath).map(line => toKeyValues(line))
          val hfileRDD: RDD[(ImmutableBytesWritable, KeyValue)] =
            data.flatMapValues(_.iterator)

          val table = hbaseConn.getTable(target)
          try {
            // 2. Write the HFiles to HDFS.
            val job = Job.getInstance(hconf)
            job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
            job.setMapOutputValueClass(classOf[KeyValue])
            HFileOutputFormat2.configureIncrementalLoadMap(job, table)

            // Why the sort (original author's notes):
            //  - HBase looks rows up by row key and keeps row keys ordered, which makes the
            //    row key act like an index; data must therefore land at its sorted position.
            //  - A Put goes through the WAL and HBase places the row where it belongs, so
            //    ordering is guaranteed automatically.
            //  - A bulk load has no WAL; it moves ready-made HFiles into HBase's directory,
            //    so the files themselves must already be sorted. Unsorted input fails with
            //    "java.io.IOException: Added a key not lexically larger than previous."
            //  - Ordering by row key alone used to be enough, but with HBase 2.0.2 the
            //    columns had to be ordered as well, hence the composite sort key.
            hfileRDD
              .sortBy(cell => (cell._1, cell._2.getKeyString), ascending = true)
              .saveAsNewAPIHadoopFile(
                hFilePath,
                classOf[ImmutableBytesWritable],
                classOf[KeyValue],
                classOf[HFileOutputFormat2],
                // Job.getInstance copies hconf, so the settings written by
                // configureIncrementalLoadMap exist only in the job's configuration.
                job.getConfiguration)
            print("成功生成HFILE")

            // 3. Hand the generated HFiles over to HBase.
            val regionLocator = hbaseConn.getRegionLocator(target)
            try {
              val bulkLoader = new LoadIncrementalHFiles(hconf)
              bulkLoader.doBulkLoad(stagingDir, admin, table, regionLocator)
            } finally {
              regionLocator.close()
            }
          } finally {
            table.close()
          }
        } finally {
          admin.close()
        }
      } finally {
        hbaseConn.close()
      }
    } finally {
      sc.stop()
    }
  }
}
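// NOTE: web-page residue stood here (a stray "|" and the page's comment counter, "All comments (0)"); it is not part of the program.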