前言

大数据应用开发——实时数据采集

大数据应用开发——实时数据处理

        Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中

        并在HBase中进行备份

大数据应用开发——数据可视化

hadoop,zookeeper,kafka,flink,hbase要开启

目录

        并在HBase中进行备份


并在HBase中进行备份

在pox.xml文件添加hbase的依赖

		<dependency>
			<groupId>org.apache.Hbase</groupId>
			<artifactId>hbase-client</artifactId>
			<version>2.2.3</version>
		</dependency>

 创建一个写入hbase的方法

public class HbaseSink implements SinkFunction<String> {
    @Override
    public void invoke(String value, Context context) throws Exception {
        Connection connection = null;
        Table table = null;

        try{
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "master,slave1,slave2");
            configuration.set("hbase.zookeeper.property.clientPort", "2181");
            connection = ConnectionFactory.createConnection(configuration);
            TableName tableName = TableName.valueOf("user");
            table = connection.getTable(tableName);
            //O:1,12564,O,45,2024-10-26,3-MEDIUM,Clerk#000000296,1,ggle. special, final requests are against the furiously specia
            //L:1,41,23,62,71,40,84,77,R,O,2024-10-26,2024-10-26,2024-10-26,DELIVER IN PERSONRAILquickly. bold deposits sleep slyly. packages use slyly
            String[] strings = value.split(",");
            if (strings[0].substring(0,1).equals("O")){
                for (int i = 1; i < 9; i++) {
                    Put put = new Put(Bytes.toBytes(strings[0]));
                    put.addColumn(Bytes.toBytes("base_info"),Bytes.toBytes("info"+i),Bytes.toBytes(strings[i]));
                    table.put(put);
                }
            }else {
                for (int i = 1; i < 14; i++) {
                    Put put = new Put(Bytes.toBytes(strings[0]));
                    put.addColumn(Bytes.toBytes("base_info"),Bytes.toBytes("info"),Bytes.toBytes(strings[i]));
                    table.put(put);
                }
            }
        }finally {
            if(table != null){table.close();}
            if(connection != null){connection.close();}
        }
    }
}

PS:hbase里如果没有user表,要创建

hbase shell

create ‘表名’,‘列簇’ 

在main中将 HbaseSink 添加

		map.addSink(new HbaseSink());
		// execute program
		env.execute("Flink Streaming Java API Skeleton");

将代码打包成.jar,放进主节点运行 

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐