spark将hive上的数据同步到hbase

1 、前言

在数据处理过程中经常需要将各种数据导入各个数据库, 以及经常有需要将离线处理的结果数据,导入实时数据仓库,以便API查询。
本文档就是将hive数据转换为hfile, 通过bulkload方式 快速导入hbase 。
里面有很多坑.
比如 : 版本不一致.
还有就是本地版本和集群版本不一致导致class不存在.写hbase代码最好是使用java和scala。我这里使用的是spark2.4 + hbase 2.1 切记不同版本使用的方法不一样。尤其是hbase的2以前的版本。

2、环境

spark2.4
hbase2.1
scala2.11

3、功能

1、将HIVE离线的结果数据批量导入HABSE。
2、支持DEV与PRD的自由切换(通过传参)。
3、bulkload方式导入。

4、代码

package com.test.task

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel


object Hive2Hbase{

    /**
     * 处理null字段
     * @param str
     * @return
     */
    def nullHandle(str: String):String = {
        if(str == null || "".equals(str)){
            "NULL"
        }else{
            str
        }
    }

    private var cdhPath = ""
    private var zookeeperQuorum = ""
    private var hdfsRootPath = ""
    private var hFilePath = ""
    private var haNamenodes = ""
    private var nameNode61 = ""
    private var nameNode61Value = ""
    private var nameNode251 = ""
    private var nameNode251Value = ""
    private var hiveMetastore = ""
    private val tableName:String = "api:user_label"
    private val familyName = "baseInfo"

    def main(args: Array[String]): Unit = {

        //设置用户
        System.setProperty("HADOOP_USER_NAME", "say")

        if (args.length >= 1) {
            println("设置参数,运行环境:"+args(0))
            if ("online".equals(args(0))) {
                cdhPath = "hdfs://say-cdh-master02.say.com:8020"
            zookeeperQuorum = "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181"
            hdfsRootPath = cdhPath+"/user/say/"
            hFilePath = cdhPath+"/user/say/hbase/hfile"
            haNamenodes = "namenode61,namenode251"
            nameNode61 = "dfs.namenode.rpc-address.dc-hdfs-cluster.namenode61"
            nameNode61Value = "192.168.1.101:8020"
            nameNode251 = "dfs.namenode.rpc-address.say-hdfs-cluster.namenode251"
            nameNode251Value = "192.168.1.102:8020"
            hiveMetastore = "thrift://192.168.1.102:9083,thrift://192.168.1.103:9083"
            } else {
                cdhPath = "hdfs://say-cdh-master02.say.net:8020"
            zookeeperQuorum = "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181"
            hdfsRootPath = cdhPath+"/user/say/"
            hFilePath = cdhPath+"/user/say/hbase/hfile"
            haNamenodes = "namenode61,namenode251"
            nameNode61 = "dfs.namenode.rpc-address.dc-hdfs-cluster.namenode61"
            nameNode61Value = "192.168.1.101:8020"
            nameNode251 = "dfs.namenode.rpc-address.say-hdfs-cluster.namenode251"
            nameNode251Value = "192.168.1.102:8020"
            hiveMetastore = "thrift://192.168.1.102:9083"
            }
        } else {
            println("运行环境: test")
            cdhPath = "hdfs://say-cdh-master02.say.net:8020"
            zookeeperQuorum = "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181"
            hdfsRootPath = cdhPath+"/user/say/"
            hFilePath = cdhPath+"/user/say/hbase/hfile"
            haNamenodes = "namenode61,namenode251"
            nameNode61 = "dfs.namenode.rpc-address.dc-hdfs-cluster.namenode61"
            nameNode61Value = "192.168.1.101:8020"
            nameNode251 = "dfs.namenode.rpc-address.say-hdfs-cluster.namenode251"
            nameNode251Value = "192.168.1.102:8020"
            hiveMetastore = "thrift://192.168.1.102:9083"
        }

		//conf配置
        val sparkConf = new SparkConf()
          .setAppName("Hive2HBase")
          .set("dfs.nameservices","dc-hdfs-cluster")
          .set("dfs.ha.namenodes.dc-hdfs-cluster",haNamenodes)
          .set(nameNode61, nameNode61Value)
          .set(nameNode251, nameNode251Value)
          .set("dfs.client.failover.proxy.provider.dc-hdfs-cluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
          .set("hive.metastore.uris",hiveMetastore)
          .set("spark.rdd.compress","true")
          //指定序列化格式,默认是java序列化
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          //告知哪些类型需要序列化
          .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))

        val spark = SparkSession.builder().config(conf=sparkConf)
//          .master("local[*]")
          .enableHiveSupport()
          .getOrCreate()
        val hadoopConf = new Configuration()
        hadoopConf.set("fs.defaultFS", hdfsRootPath)
        hadoopConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")
        val fileSystem = FileSystem.get(new URI(cdhPath), hadoopConf)
        val hconf = HBaseConfiguration.create()
        hconf.set("hbase.mapreduce.hfileoutputformat.table.name", tableName)
        //这个设置很重要,默认没有这么多。防止导入失败
        hconf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 5000)
        hconf.set(HConstants.ZOOKEEPER_QUORUM, zookeeperQuorum)
        //创建连接
        val conn = ConnectionFactory.createConnection(hconf)
        val regionLocator = conn.getRegionLocator(TableName.valueOf(tableName))
        val table = conn.getTable(TableName.valueOf(tableName))

        val job = Job.getInstance(hconf)
        job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setMapOutputValueClass(classOf[Result])
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
        HFileOutputFormat2.configureIncrementalLoadMap(job, table.getDescriptor)

        // 如果存放 HFile文件的路径已经存在,就删除掉
        if (fileSystem.exists(new Path(hFilePath))) {
            fileSystem.delete(new Path(hFilePath), true)
        }

        //从hive中读取数据,数据是在hdfs上,hive是个外部表,你也可以用内部表,都有一样
        //由于标签字段是不知道名称以及不知道有哪些字段。所以只能*
        val data = spark.sql("select * from sdr_dm.dm_user_label_tag")

        //表结构字段
        var fields = data.columns

        //去掉rowKey字段
        fields = fields.dropWhile(_ == "user_id")
		//需要将宽表转换为窄表并获取字段名称。
        val dataRdd = data.rdd.map(row => {
            val rowKey = row.getAs("user_id").toString

            fields.map(feild => {
                var fieldValue = row.getAs[Any](feild)
                if(fieldValue==null){
                    fieldValue="0"
                }
                val value = "{\"labelId\":\""+feild+"\",\"value\":\""+fieldValue+"\",\"deadTime\":\"2100-01-01T08:00:00Z\"}"
                val tupValue = Tuple2(feild,value)
                (rowKey, tupValue)
            })
        }).flatMap(item=>item)
        //  .persist(StorageLevel.MEMORY_AND_DISK)
        //打印测试
//        dataRdd.take(10).foreach { x => {
//            println(x._1+"|"+x._2._1+"|"+x._2._2)
//        } }

        val jobConfiguration = job.getConfiguration
        jobConfiguration.set("mapreduce.output.fileoutputformat.outputdir", hFilePath)
        import spark.implicits._

		//一定需要全局排序,否则报错
        dataRdd
          .sortBy(x=> (x._1, x._2._1), ascending = true)
          .map(rdd => {
              val rowKey = Bytes.toBytes(rdd._1)
              val family = Bytes.toBytes(familyName)
              val colum = Bytes.toBytes(rdd._2._1)
              val value = Bytes.toBytes(rdd._2._2)
//              println(rdd(0)+"|"+familyName+"|"+rdd(1)._1+"|"+rdd(1)._2)
              (new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, family, colum, value))
          }).saveAsNewAPIHadoopDataset(jobConfiguration)
        println("生产hfile成功!")

		//bulkload导入
        val load = new LoadIncrementalHFiles(hconf)
        load.doBulkLoad(new Path(hFilePath), conn.getAdmin, table, regionLocator)

        table.close()
        conn.close()
        spark.close()



    }
}

5 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test.task</groupId>
    <artifactId>Hive2Hbase</artifactId>
    <version>1.0.0-release</version>


    <properties>
        <spark.version>2.4.0</spark.version>
        <scala.version>2.11.12</scala.version>
        <hbase.version>2.1.0</hbase.version>
        <hadoop.version>2.7.5</hadoop.version>
        <app.version>1.0.0</app.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jackson-databind</artifactId>
                    <groupId>com.fasterxml.jackson.core</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-hdfs</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-it</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.glassfish</groupId>
                    <artifactId>javax.el</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.glassfish</groupId>
            <artifactId>javax.el</artifactId>
            <version>3.0.1-b12</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.46</version>
        </dependency>

    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <args>
                        <arg>-target:jvm-1.8</arg>
                    </args>
                    <jvmArgs>
                        <jvmArg>-Xss2048K</jvmArg>
                    </jvmArgs>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.zoom.wqz.filter.Application</mainClass>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <profiles>
        <profile>
            <id>prd</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <properties>
                <env>prd</env>
                <ver>${app.version}-prd</ver>
            </properties>
            <build>
                <resources>
                    <resource>
                        <directory>src/main/resources/prd/</directory>
                    </resource>
                </resources>
            </build>
        </profile>
        <profile>
            <id>dev</id>
            <properties>
                <env>dev</env>
                <ver>${app.version}-dev</ver>
            </properties>
            <activation>
                <activeByDefault>false</activeByDefault>
            </activation>
            <build>
                <resources>
                    <resource>
                        <directory>src/main/resources/dev/</directory>
                    </resource>
                </resources>
            </build>
        </profile>
        <profile>
            <id>test</id>
            <properties>
                <env>test</env>
                <ver>${app.version}-test</ver>
            </properties>
            <activation>
                <activeByDefault>false</activeByDefault>
            </activation>
            <build>
                <resources>
                    <resource>
                        <directory>src/main/resources/test/</directory>
                    </resource>
                </resources>
            </build>
        </profile>
    </profiles>

</project>
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐