scala版本,spark将hive的数据批量导入hbase
摘要:使用 Spark 将 Hive 上的数据转换为 HFile,再通过 bulkload 快速导入 HBase。这里面有很多坑,比如版本不一致,以及本地版本和集群版本不一致导致 class 不存在。写 HBase 代码最好使用 Java 或 Scala。本文使用的是 Spark 2.4 + HBase 2.1,切记不同版本使用的方法不一样。完整代码(package com.test.task)见下文第 4 节。
·
spark将hive上的数据同步到hbase
1 、前言
在数据处理过程中经常需要将各种数据导入各个数据库, 以及经常有需要将离线处理的结果数据,导入实时数据仓库,以便API查询。
本文档就是将hive数据转换为hfile, 通过bulkload方式 快速导入hbase 。
里面有很多坑.
比如 : 版本不一致.
还有就是本地版本和集群版本不一致,导致 class 不存在。写 HBase 代码最好使用 Java 或 Scala。我这里使用的是 Spark 2.4 + HBase 2.1,切记不同版本使用的方法不一样,尤其是 HBase 2 以前的版本。
2、环境
spark2.4
hbase2.1
scala2.11
3、功能
1、将 Hive 离线的结果数据批量导入 HBase。
2、支持DEV与PRD的自由切换(通过传参)。
3、bulkload方式导入。
4、代码
package com.test.task
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
/**
 * Spark driver that copies the Hive table `sdr_dm.dm_user_label_tag` into the HBase
 * table `api:user_label` by writing HFiles and then bulk-loading them.
 *
 * Every Hive column except `user_id` becomes one HBase cell: the row key is the
 * `user_id`, the qualifier is the column name and the value is a small JSON document.
 *
 * Usage: pass `online` as the first argument to target the online cluster; any other
 * value, or no argument at all, targets the test cluster.
 */
object Hive2Hbase {

  /**
   * Replaces a missing string with the literal `"NULL"`.
   *
   * @param str value to check; may be null
   * @return `"NULL"` when `str` is null or empty, otherwise `str` unchanged
   */
  def nullHandle(str: String): String =
    if (str == null || str.isEmpty) "NULL" else str

  /**
   * Connection settings of one target environment.
   *
   * @param cdhPath         HDFS URI of the cluster (scheme, host and port)
   * @param zookeeperQuorum ZooKeeper quorum handed to the HBase client
   * @param hiveMetastore   comma-separated Hive metastore thrift URIs
   */
  private final case class EnvConfig(
      cdhPath: String,
      zookeeperQuorum: String,
      hiveMetastore: String) {
    /** Value used for `fs.defaultFS`. */
    val hdfsRootPath: String = cdhPath + "/user/say/"
    /** Directory the HFiles are written to before the bulk load. */
    val hFilePath: String = cdhPath + "/user/say/hbase/hfile"
  }

  private val ZookeeperQuorum = "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181"

  private val OnlineEnv = EnvConfig(
    cdhPath = "hdfs://say-cdh-master02.say.com:8020",
    zookeeperQuorum = ZookeeperQuorum,
    hiveMetastore = "thrift://192.168.1.102:9083,thrift://192.168.1.103:9083")

  private val TestEnv = EnvConfig(
    cdhPath = "hdfs://say-cdh-master02.say.net:8020",
    zookeeperQuorum = ZookeeperQuorum,
    hiveMetastore = "thrift://192.168.1.102:9083")

  // HDFS HA settings. Every rpc-address key is derived from the single nameservice id
  // so that it always matches `dfs.nameservices` / `dfs.ha.namenodes.<nameservice>`.
  private val NameService = "dc-hdfs-cluster"
  private val NameNodeRpcAddresses = Seq(
    "namenode61" -> "192.168.1.101:8020",
    "namenode251" -> "192.168.1.102:8020")

  private val RowKeyColumn = "user_id"
  private val tableName: String = "api:user_label"
  private val familyName = "baseInfo"

  /**
   * Entry point.
   *
   * @param args optional; `args(0) == "online"` selects the online environment,
   *             anything else (or no argument) selects the test environment
   */
  def main(args: Array[String]): Unit = {
    // Run all Hadoop/HBase calls as this user.
    System.setProperty("HADOOP_USER_NAME", "say")

    val env = resolveEnv(args)
    val spark = SparkSession.builder()
      .config(buildSparkConf(env))
      // .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    // Close the session even when the export fails.
    try exportTable(spark, env) finally spark.close()
  }

  /** Chooses the environment from the first command-line argument and reports the choice. */
  private def resolveEnv(args: Array[String]): EnvConfig =
    args.headOption match {
      case Some(name) =>
        println("设置参数,运行环境:" + name)
        if ("online".equals(name)) OnlineEnv else TestEnv
      case None =>
        println("运行环境: test")
        TestEnv
    }

  /** Builds the Spark configuration (HDFS HA, Hive metastore, Kryo) for `env`. */
  private def buildSparkConf(env: EnvConfig): SparkConf = {
    val conf = new SparkConf()
      .setAppName("Hive2HBase")
      .set("dfs.nameservices", NameService)
      .set("dfs.ha.namenodes." + NameService, NameNodeRpcAddresses.map(_._1).mkString(","))
      .set(
        "dfs.client.failover.proxy.provider." + NameService,
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
      .set("hive.metastore.uris", env.hiveMetastore)
      .set("spark.rdd.compress", "true")
      // Use Kryo instead of the default Java serialization.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Classes registered with Kryo up front.
      .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))
    NameNodeRpcAddresses.foreach { case (nameNodeId, address) =>
      conf.set("dfs.namenode.rpc-address." + NameService + "." + nameNodeId, address)
    }
    conf
  }

  /** Loan pattern: runs `body` with `resource` and always closes the resource afterwards. */
  private def using[R <: AutoCloseable, A](resource: R)(body: R => A): A =
    try body(resource) finally resource.close()

  /** Returns the label columns, i.e. every column except the row-key column, wherever it sits. */
  private def valueColumns(columns: Array[String]): Array[String] =
    columns.filterNot(_ == RowKeyColumn)

  /** Escapes `raw` so it can be embedded between double quotes in a JSON document. */
  private def escapeJson(raw: String): String = {
    val out = new StringBuilder(raw.length + 8)
    raw.foreach {
      case '"' => out.append('\\').append('"')
      case '\\' => out.append('\\').append('\\')
      case '\n' => out.append('\\').append('n')
      case '\r' => out.append('\\').append('r')
      case '\t' => out.append('\\').append('t')
      case c if c < ' ' => out.append('\\').append('u').append("%04x".format(c.toInt))
      case c => out.append(c)
    }
    out.toString
  }

  /** Renders the JSON document stored as the HBase cell value of one label. */
  private def labelJson(labelId: String, value: String): String =
    "{\"labelId\":\"" + escapeJson(labelId) + "\",\"value\":\"" + escapeJson(value) +
      "\",\"deadTime\":\"2100-01-01T08:00:00Z\"}"

  /**
   * Reads the Hive table, writes it as HFiles under `env.hFilePath` and bulk-loads
   * those files into the HBase table.
   */
  private def exportTable(spark: SparkSession, env: EnvConfig): Unit = {
    val hadoopConf = new Configuration()
    hadoopConf.set("fs.defaultFS", env.hdfsRootPath)
    hadoopConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")
    val fileSystem = FileSystem.get(new URI(env.cdhPath), hadoopConf)

    val hconf = HBaseConfiguration.create()
    hconf.set("hbase.mapreduce.hfileoutputformat.table.name", tableName)
    // Raised well above the default so the bulk load is not rejected for having too
    // many HFiles per region and family.
    hconf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 5000)
    hconf.set(HConstants.ZOOKEEPER_QUORUM, env.zookeeperQuorum)

    val target = TableName.valueOf(tableName)
    val hfileDir = new Path(env.hFilePath)

    using(ConnectionFactory.createConnection(hconf)) { conn =>
      using(conn.getRegionLocator(target)) { regionLocator =>
        using(conn.getTable(target)) { table =>
          using(conn.getAdmin) { admin =>
            val job = Job.getInstance(hconf)
            job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
            job.setMapOutputValueClass(classOf[KeyValue])
            job.setOutputFormatClass(classOf[HFileOutputFormat2])
            HFileOutputFormat2.configureIncrementalLoadMap(job, table.getDescriptor)

            // Remove the HFile directory left over from a previous run.
            if (fileSystem.exists(hfileDir)) {
              fileSystem.delete(hfileDir, true)
            }

            // The label columns are not known in advance, hence `select *`.
            val data = spark.sql("select * from sdr_dm.dm_user_label_tag")
            val fields = valueColumns(data.columns)

            // Wide row -> one (rowKey, (qualifier, json)) tuple per label column.
            // Rows without a user_id are skipped because they have no usable row key.
            val cells = data.rdd.flatMap { row =>
              Option(row.getAs[Any](RowKeyColumn)).map(_.toString) match {
                case Some(rowKey) =>
                  fields.map { field =>
                    // A missing label value is stored as "0".
                    val fieldValue = Option(row.getAs[Any](field)).getOrElse("0")
                    (rowKey, (field, labelJson(field, fieldValue.toString)))
                  }.toSeq
                case None =>
                  Seq.empty[(String, (String, String))]
              }
            }
            // .persist(StorageLevel.MEMORY_AND_DISK)

            val jobConfiguration = job.getConfiguration
            jobConfiguration.set("mapreduce.output.fileoutputformat.outputdir", env.hFilePath)

            val family = Bytes.toBytes(familyName)
            // The HFile writer fails unless cells arrive globally sorted by
            // (row key, qualifier).
            cells
              .sortBy({ case (rowKey, (qualifier, _)) => (rowKey, qualifier) }, ascending = true)
              .map { case (rowKey, (qualifier, json)) =>
                val key = Bytes.toBytes(rowKey)
                val cell = new KeyValue(key, family, Bytes.toBytes(qualifier), Bytes.toBytes(json))
                (new ImmutableBytesWritable(key), cell)
              }
              .saveAsNewAPIHadoopDataset(jobConfiguration)
            println("生产hfile成功!")

            // Hand the generated HFiles over to the region servers.
            val load = new LoadIncrementalHFiles(hconf)
            load.doBulkLoad(hfileDir, admin, table, regionLocator)
          }
        }
      }
    }
  }
}
5 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test.task</groupId>
<artifactId>Hive2Hbase</artifactId>
<version>1.0.0-release</version>
<!-- Central version numbers, referenced below as ${...}. -->
<properties>
<!-- Version of the spark-core / spark-sql / spark-hive artifacts. -->
<spark.version>2.4.0</spark.version>
<!-- Version of scala-library and of the compiler used by the Scala plugin; its 2.11 line matches the _2.11 suffix of the Spark artifact ids. -->
<scala.version>2.11.12</scala.version>
<!-- Version of the HBase artifact. -->
<hbase.version>2.1.0</hbase.version>
<!-- Version of hadoop-common. -->
<hadoop.version>2.7.5</hadoop.version>
<!-- Base of the per-profile "ver" property defined in the profiles section. -->
<app.version>1.0.0</app.version>
</properties>
<!-- Compile-time dependencies of the Hive-to-HBase job. -->
<dependencies>
<!-- Scala standard library. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Hadoop common classes; jackson-databind is excluded from its transitive dependencies. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>jackson-databind</artifactId>
<groupId>com.fasterxml.jackson.core</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- Spark core; its own hadoop-client and hadoop-hdfs are excluded. -->
<!-- NOTE(review): presumably so the Hadoop version declared above wins; confirm. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Spark SQL (SparkSession). -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- HBase artifact; javax.el is excluded here and declared explicitly right below. -->
<!-- NOTE(review): hbase-it is presumably used only for the HBase client/mapreduce classes it pulls in transitively; confirm. -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-it</artifactId>
<version>${hbase.version}</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- javax.el pinned to an explicit version, replacing the one excluded from hbase-it. -->
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
<version>3.0.1-b12</version>
</dependency>
<!-- Spark Hive integration, needed for SparkSession.enableHiveSupport(). -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- MySQL JDBC driver. -->
<!-- NOTE(review): not referenced by the Scala source of this job; confirm it is still required. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.46</version>
</dependency>
</dependencies>
<!-- Build: compile the Scala sources and package a shaded (fat) jar. -->
<build>
  <sourceDirectory>src/main/scala</sourceDirectory>
  <testSourceDirectory>src/test/scala</testSourceDirectory>
  <plugins>
    <!-- Java compiler level for any Java sources. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.5.1</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <encoding>UTF-8</encoding>
      </configuration>
    </plugin>
    <!-- Scala compiler; the version is pinned so the build is reproducible. -->
    <plugin>
      <groupId>org.scala-tools</groupId>
      <artifactId>maven-scala-plugin</artifactId>
      <version>2.15.2</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
      <configuration>
        <scalaVersion>${scala.version}</scalaVersion>
        <args>
          <arg>-target:jvm-1.8</arg>
        </args>
        <jvmArgs>
          <jvmArg>-Xss2048K</jvmArg>
        </jvmArgs>
      </configuration>
    </plugin>
    <!-- Fat jar built in the package phase; the version is pinned so the build is reproducible. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.2.1</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <!-- Must name this project's entry point: package com.test.task, object Hive2Hbase. -->
                <mainClass>com.test.task.Hive2Hbase</mainClass>
              </transformer>
            </transformers>
            <filters>
              <!-- Drop signature files of signed dependencies; they would be invalid in the merged jar. -->
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
<!-- One profile per environment. Each sets the "env" and "ver" properties and -->
<!-- switches the resource directory to src/main/resources/<env>/. -->
<!-- NOTE(review): "env" and "ver" are not referenced anywhere else in this POM; confirm they are consumed elsewhere. -->
<profiles>
<!-- prd: active by default. -->
<profile>
<id>prd</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<env>prd</env>
<ver>${app.version}-prd</ver>
</properties>
<build>
<resources>
<resource>
<directory>src/main/resources/prd/</directory>
</resource>
</resources>
</build>
</profile>
<!-- dev: not active by default, has to be selected explicitly. -->
<profile>
<id>dev</id>
<properties>
<env>dev</env>
<ver>${app.version}-dev</ver>
</properties>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<build>
<resources>
<resource>
<directory>src/main/resources/dev/</directory>
</resource>
</resources>
</build>
</profile>
<!-- test: not active by default, has to be selected explicitly. -->
<profile>
<id>test</id>
<properties>
<env>test</env>
<ver>${app.version}-test</ver>
</properties>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<build>
<resources>
<resource>
<directory>src/main/resources/test/</directory>
</resource>
</resources>
</build>
</profile>
</profiles>
</project>
更多推荐
所有评论(0)