使用spark进行hbase的bulkload

一、 背景

HBase 是一个面向列,schemaless,高吞吐,高可靠可水平扩展的 NoSQL 数据库,用户可以通过 HBase client 提供的 put get 等 api 实现在数据的实时读写。在过去的几年里,HBase 有了长足的发展,它在越来越多的公司里扮演者越来越重要的角色。
HBase 擅长于海量数据的实时读取,原生 HBase 没有二级索引,复杂查询场景支持的不好。同时因为 split,磁盘,网络抖动,Java GC 等多方面的因素会影响其 RT 表现,所以通常我们在使用HBase的同时也会使用其他的存储中间件,比如 ES,Reids,Mysql 等等。避免 HBase 成为信息孤岛,我们需要数据导入导出的工具在这些中间件之间做数据迁移,而最常用的莫过于阿里开源的 DataX。Datax从 其他数据源迁移数据到 HBase 实际上是走的 HBase 原生 api 接口,在少量数据的情况下没有问题,但当我们需要从 Hive 里,或者其他异构存储里批量导入几亿,几十亿的数据,那么用 DataX 这里就显得不那么适合,因为走原生接口为了避免影响生产集群的稳定性一定要做好限流,那么海量数据的迁移就很很慢,同时数据的持续写入会因为 flush,compaction 等机制占用较多的系统资源。为了解决批量导入的场景,Bulkload 应运而生。

二、HBase Bulkload
在大量数据需要写入HBase时,通常有 put方式和bulkLoad 两种方式。

1、put方式为单条插入,在put数据时会先将数据的更新操作信息和数据信息 写入WAL ,
在写入到WAL后, 数据就会被放到MemStore中 ,当MemStore满后数据就会被 flush到磁盘
(即形成HFile文件) ,在这种写操作过程会涉及到flush、split、compaction等操作,容易造
成节点不稳定,数据导入慢,耗费资源等问题,在海量数据的导入过程极大的消耗了系统
性能,避免这些问题最好的方法就是使用BulkLoad的方式来加载数据到HBase中。


2、BulkLoader利用HBase数据按照HFile格式存储在HDFS的原理,使用MapReduce直接批量
生成HFile格式文件后,RegionServers再将HFile文件移动到相应的Region目录下。

  • Extract,异构数据源数据导入到 HDFS 之上。
  • Transform,通过用户代码,可以是 MR 或者 Spark 任务将数据转化为 HFile。
  • Load,HFile 通过 loadIncrementalHFiles 调用将 HFile 放置到 Region 对应的 HDFS 目录上,该过程可能涉及到文件切分。

 三、实践

hive表


 

 hbase表

 依赖

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

<properties>

        <maven.compiler.source>1.8</maven.compiler.source>

        <maven.compiler.target>1.8</maven.compiler.target>

        <encoding>UTF-8</encoding>

        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>

        <log4j.version>1.7.30</log4j.version>

        <zk.version>3.4.5-cdh5.16.2</zk.version>

        <scala.version>2.12.10</scala.version>

        <scala.tools.version>2.12</scala.tools.version>

        <spark.version>3.2.0</spark.version>

        <hbase.version>1.2.0-cdh5.16.2</hbase.version>

        <config.version>1.4.0</config.version>

    </properties>

     

    <repositories>

        <repository>

            <id>nexus-aliyun</id>

            <url>http://maven.aliyun.com/nexus/content/groups/public</url>

        </repository>

        <repository>

            <id>cloudera</id>

            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>

        </repository>

    </repositories>

    <dependencies>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-client</artifactId>

            <version>${hadoop.version}</version>

        </dependency>

        <dependency>

            <groupId>org.slf4j</groupId>

            <artifactId>slf4j-log4j12</artifactId>

            <version>${log4j.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.zookeeper</groupId>

            <artifactId>zookeeper</artifactId>

            <version>${zk.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.spark</groupId>

            <artifactId>spark-core_${scala.tools.version}</artifactId>

            <version>${spark.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hbase</groupId>

            <artifactId>hbase-client</artifactId>

            <version>${hbase.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hbase</groupId>

            <artifactId>hbase-server</artifactId>

            <version>${hbase.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.spark</groupId>

            <artifactId>spark-sql_${scala.tools.version}</artifactId>

            <version>${spark.version}</version>

        </dependency>

         

    </dependencies>

spark 代码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

package com.jojo

import org.apache.hadoop.conf.Configuration

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, KeyValue, TableName}

import org.apache.hadoop.hbase.client.{ConnectionFactory, Result}

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.mapreduce.Job

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

/**

 * Description:Hbase批量加载   同一列族多列

 */

object HbaseBulkLoadApp {

  val zookeeperQuorum = "cdh01,cdh02,cdh03"//zookeeper信息

  val dataSourcePath = "hdfs://cdh03:8020/user/hive/warehouse/sample_07" //源文件

  val hFilePath = "hdfs://cdh03:8020/tmp/result"//hfile的存储路径

  val hdfsRootPath = "hdfs://cdh03:8020/"//根路径

  val tableName = "sample_07"//表名

  val familyName = "basic"//列族

  val arr = Array("code","description""total_emp","salary")//列的名字集合

  def main(args: Array[String]): Unit = {

    //获取content

    val sparkConf = new SparkConf()

      .setAppName(s"${this.getClass.getSimpleName}")

      .setMaster("local")

      //指定序列化格式,默认是java序列化

      .set("spark.serializer""org.apache.spark.serializer.KryoSerializer")

      //告知哪些类型需要序列化

      .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))

    val sc = new SparkContext(sparkConf)

    //hadoop配置

    val hadoopConf = new Configuration()

    hadoopConf.set("fs.defaultFS", hdfsRootPath)

    //获取输出路径

    val fileSystem = FileSystem.get(hadoopConf)

    //获取hbase配置

    val hconf = HBaseConfiguration.create()

    //设置zookeeper集群

    hconf.set("hbase.zookeeper.quorum", zookeeperQuorum)

    //设置端口

    hconf.set("hbase.zookeeper.property.clientPort""2181");

    //设置hfile最大个数

    hconf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily","3200")

    //设置hfile的大小

    hconf.set("hbase.hregion.max.filesize","10737418240")

    hconf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    //获取hbase连接

    val hbaseConn = ConnectionFactory.createConnection(hconf)

    val admin = hbaseConn.getAdmin

    /**

     * 保存生成的HFile文件

     * 注:bulk load  生成的HFile文件需要落地

     * 然后再通过LoadIncrementalHFiles类load进Hbase

     * 此处关于  sortBy 操作详解:

     * 0. Hbase查询是根据rowkey进行查询的,并且rowkey是有序,

     * 某种程度上来说rowkey就是一个索引,这是Hbase查询高效的一个原因,

     * 这就要求我们在插入数据的时候,要插在rowkey该在的位置。

     * 1. Put方式插入数据,会有WAL,同时在插入Hbase的时候会根据RowKey的值选择合适的位置,此方式本身就可以保证RowKey有序

     * 2. bulk load 方式没有WAL,它更像是hive通过load方式直接将底层文件HFile移动到制定的Hbase路径下,所以,在不东HFile的情况下,要保证本身有序才行

     * 之前写的时候只要rowkey有序即可,但是2.0.2版本的时候发现clounm也要有序,所以会有sortBy(x => (x._1, x._2.getKeyString), true)

     *

     * @param hfileRDD

     */

    // 0. 准备程序运行的环境

    // 如果 HBase 表不存在,就创建一个新表

    if (!admin.tableExists(TableName.valueOf(tableName))) {

      val desc = new HTableDescriptor(TableName.valueOf(tableName))

      val hcd = new HColumnDescriptor(familyName)

      desc.addFamily(hcd)

      admin.createTable(desc)

      print("创建了一个新表")

    }

    // 如果存放 HFile文件的路径已经存在,就删除掉

    if(fileSystem.exists(new Path(hFilePath))) {

      fileSystem.delete(new Path(hFilePath), true)

      print("删除hdfs上存在的路径")

    }

    // 1. 清洗需要存放到 HFile 中的数据,rowKey 一定要排序,否则会报错:

    // java.io.IOException: Added a key not lexically larger than previous.

    val data: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = sc.textFile(dataSourcePath)

      .map(row => {

        // 处理数据的逻辑

        val arrs = row.split("\t")

        var kvlist: Seq[KeyValue] = List()//存储多个列

        var rowkey: Array[Byte] = null

        var cn: Array[Byte] = null

        var v: Array[Byte] = null

        var kv: KeyValue = null

        val cf = familyName.getBytes //列族

        rowkey = Bytes.toBytes(arrs(0)) //key

        for (i <- 1 to (arrs.length - 1)) {

          cn = arr(i).getBytes() //列的名称

          v = Bytes.toBytes(arrs(i)) //列的值

          //将rdd转换成HFile需要的格式,上面定义了Hfile的key是ImmutableBytesWritable,那么我们定义的RDD也是要以ImmutableBytesWritable的实例为key

          kv = new KeyValue(rowkey, cf, cn, v) //封装一下 rowkey, cf, clounmVale, value

          kvlist = kvlist :+ kv //将新的kv加在kvlist后面(不能反 需要整体有序)

        }

        (new ImmutableBytesWritable(rowkey), kvlist)

      })

    val hfileRDD: RDD[(ImmutableBytesWritable, KeyValue)] = data

      .flatMapValues(_.iterator)

    // 2. Save Hfiles on HDFS

    val table = hbaseConn.getTable(TableName.valueOf(tableName))

    val job = Job.getInstance(hconf)

    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])

    job.setMapOutputValueClass(classOf[KeyValue])

    HFileOutputFormat2.configureIncrementalLoadMap(job, table)

    hfileRDD

      .sortBy(x => (x._1, x._2.getKeyString), true//要保持 整体有序

      .saveAsNewAPIHadoopFile(hFilePath,

        classOf[ImmutableBytesWritable],

        classOf[KeyValue],

        classOf[HFileOutputFormat2],

        hconf)

    print("成功生成HFILE")

    val bulkLoader = new LoadIncrementalHFiles(hconf)

    val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))

    bulkLoader.doBulkLoad(new Path(hFilePath), admin, table, regionLocator)

    hbaseConn.close()

    sc.stop()

  }

}

 其中可能遇到的问题:

1

EndOfStreamException: Unable to read additional data from server sessionid 0x17f44ca01833e45, likely server has closed socket

 解决:

  主要是zk的版本不匹配,在依赖选择匹配的zk版本。

输出结果

https://www.cnblogs.com/huangguoming/articles/12967868.html

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐