
Big Data Application Development: Real-Time Data Processing (Part 2)
Preface
Flink consumes the data from Kafka and distributes it to the dwd layer in Kafka. Hadoop, ZooKeeper, Kafka, Flink, and HBase must all be running.
Back up the data in HBase
Add the HBase dependency to the pom.xml file:
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.2.3</version>
</dependency>
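Since the job also reads from and writes to Kafka, the Flink Kafka connector dependency is needed as well. A sketch of the entry, assuming a Flink 1.14 / Scala 2.12 build (match the artifact suffix and version to your own cluster):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.14.0</version>
</dependency>
```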
Create a sink class that writes to HBase:
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseSink implements SinkFunction<String> {
    @Override
    public void invoke(String value, Context context) throws Exception {
        // Note: opening a connection per record is simple but slow; for production,
        // prefer a RichSinkFunction and open the connection once in open().
        Connection connection = null;
        Table table = null;
        try {
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "master,slave1,slave2");
            configuration.set("hbase.zookeeper.property.clientPort", "2181");
            connection = ConnectionFactory.createConnection(configuration);
            table = connection.getTable(TableName.valueOf("user"));
            // Sample records:
            // O:1,12564,O,45,2024-10-26,3-MEDIUM,Clerk#000000296,1,ggle. special, final requests are against the furiously specia
            // L:1,41,23,62,71,40,84,77,R,O,2024-10-26,2024-10-26,2024-10-26,DELIVER IN PERSONRAILquickly. bold deposits sleep slyly. packages use slyly
            String[] strings = value.split(",");
            if (strings[0].startsWith("O")) {
                // Order record: write fields 1-8 under qualifiers info1..info8,
                // with the first field (e.g. "O:1") as the row key
                for (int i = 1; i < 9; i++) {
                    Put put = new Put(Bytes.toBytes(strings[0]));
                    put.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("info" + i), Bytes.toBytes(strings[i]));
                    table.put(put);
                }
            } else {
                // Lineitem record: write fields 1-13 under qualifiers info1..info13
                // ("info" + i here, not a bare "info", which would overwrite the
                // same cell on every iteration)
                for (int i = 1; i < 14; i++) {
                    Put put = new Put(Bytes.toBytes(strings[0]));
                    put.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("info" + i), Bytes.toBytes(strings[i]));
                    table.put(put);
                }
            }
        } finally {
            if (table != null) { table.close(); }
            if (connection != null) { connection.close(); }
        }
    }
}
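The sink's branching keys off the first character of the record's first field, which also serves as the HBase row key. A minimal standalone sketch of just that parsing step, using a truncated version of the sample order record from the comments above (the class name `RecordSplitDemo` is illustrative):

```java
public class RecordSplitDemo {
    public static void main(String[] args) {
        // Truncated sample order record from the comments above
        String order = "O:1,12564,O,45,2024-10-26,3-MEDIUM,Clerk#000000296,1,ggle. special";
        String[] fields = order.split(",");

        // The first field doubles as the row key; its first character
        // selects the branch: "O" = order, anything else = lineitem.
        System.out.println("row key: " + fields[0]);                   // row key: O:1
        System.out.println("is order: " + fields[0].startsWith("O"));  // is order: true

        // Qualifiers generated for an order record: info1..info8
        for (int i = 1; i < 9; i++) {
            System.out.println("info" + i + " -> " + fields[i]);
        }
    }
}
```

Note that the free-text tail of a real record can itself contain commas, so only the leading fields consumed by the sink (1-8 for orders, 1-13 for lineitems) are reliable after the split.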
PS: if the user table does not exist in HBase, create it first (the code above expects table user with column family base_info):
hbase shell
create 'user','base_info'
In main, add the HbaseSink to the stream:
map.addSink(new HbaseSink());
// execute program
env.execute("Flink Streaming Java API Skeleton");
Package the code into a .jar and run it on the master node.