Flink CDC: reading the MySQL binlog and writing it into Hive

The job below captures changes from MySQL with the mysql-cdc connector, relays them through Kafka in changelog-json format (adding the change type as an extra op column), and streams the result into a partitioned Hive table.
package com.zallsteel.flink.app.log;
import com.google.gson.Gson;
import com.zallsteel.flink.entity.ChangelogVO;
import com.zallsteel.flink.utils.ConfigUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import java.text.ParseException;
import java.time.Duration;
import java.util.Properties;
/**
 * MySQL binlog -> Flink CDC -> Kafka (changelog-json) -> partitioned Hive table.
 * The filesystem/Hive streaming sink options used below are defined in:
 * flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
 */
public class MySQLCDC2HiveApp {
    public static void main(String[] args) {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set parallelism
        env.setParallelism(6);
        // Enable checkpointing (every 60 s)
        env.enableCheckpointing(60000);
        env.getConfig().setAutoWatermarkInterval(200);
        // Set up the Flink SQL environment (Blink planner, streaming mode)
        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        // Create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
        // Checkpointing mode
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
                CheckpointingMode.EXACTLY_ONCE);
        // Checkpoint interval
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                Duration.ofMinutes(1));
        // Catalog name
        String catalogName = "devHive";
        // Create the HiveCatalog (catalog name, default database, hive-conf directory)
        HiveCatalog hiveCatalog = new HiveCatalog(
                catalogName,
                "default",
                "/etc/hive/conf"
        );
        // Register the Hive catalog
        tableEnv.registerCatalog(catalogName, hiveCatalog);
        // Use the Hive catalog
        tableEnv.useCatalog(catalogName);
        // Create the database for the MySQL CDC source
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
        // Create the MySQL CDC source table
        tableEnv.executeSql("DROP TABLE IF EXISTS cdc.order_info");
        tableEnv.executeSql("CREATE TABLE cdc.order_info(\n" +
                "    id BIGINT,\n" +
                "    user_id BIGINT,\n" +
                "    create_time TIMESTAMP,\n" +
                "    operate_time TIMESTAMP,\n" +
                "    province_id INT,\n" +
                "    order_status STRING,\n" +
                "    total_amount DECIMAL(10, 5)\n" +
                ") WITH (\n" +
                "    'connector' = 'mysql-cdc',\n" +
                "    'hostname' = 'hdp-xxx-dev-node01',\n" +
                "    'port' = '3306',\n" +
                "    'username' = 'root',\n" +
                "    'password' = 'phkC4DE4dM28$PUD',\n" +
                "    'database-name' = 'cdc_test',\n" +
                "    'table-name' = 'order_info'\n" +
                ")");
        // Create the Kafka source table (changelog-json format)
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
        tableEnv.executeSql("DROP TABLE IF EXISTS kafka.order_info");
        tableEnv.executeSql("CREATE TABLE kafka.order_info (\n" +
                "    id BIGINT,\n" +
                "    user_id BIGINT,\n" +
                "    create_time TIMESTAMP,\n" +
                "    operate_time TIMESTAMP,\n" +
                "    province_id INT,\n" +
                "    order_status STRING,\n" +
                "    total_amount DECIMAL(10, 5)\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic' = 'order_info',\n" +
                "    'scan.startup.mode' = 'earliest-offset',\n" +
                "    'properties.bootstrap.servers' = 'hdp-xxx-dev-node03:9092',\n" +
                "    'format' = 'changelog-json'\n" +
                ")");
        // Insert the CDC changelog into the Kafka table
        tableEnv.executeSql("INSERT INTO kafka.order_info\n" +
                "SELECT id, user_id, create_time, operate_time, province_id, order_status, total_amount\n" +
                "FROM cdc.order_info");
        // Build a custom stream that also carries the changelog operation (op) field
        Properties kafkaConfig = ConfigUtils.getKafkaConfig();
        FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
                "order_info",
                new SimpleStringSchema(),
                kafkaConfig
        ).setStartFromEarliest();
        DataStreamSource<String> streamSource = env.addSource(consumer);
        String[] fieldNames = {"id", "user_id", "create_time", "operate_time",
                "province_id", "order_status", "total_amount", "op"};
        TypeInformation[] types = {Types.LONG, Types.LONG, Types.STRING, Types.STRING,
                Types.INT, Types.INT, Types.DOUBLE, Types.STRING};
        SingleOutputStreamOperator<Row> ds2 = streamSource.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String value) throws Exception {
                // Parse the changelog-json record and flatten it into a Row with an extra op column
                Gson gson = new Gson();
                ChangelogVO changelogVO = gson.fromJson(value, ChangelogVO.class);
                String op = changelogVO.getOp();
                int arity = fieldNames.length;
                Row row = new Row(arity);
                row.setField(0, changelogVO.getData().getId());
                row.setField(1, changelogVO.getData().getUserId());
                row.setField(2, changelogVO.getData().getCreateTime());
                row.setField(3, changelogVO.getData().getOperateTime());
                row.setField(4, changelogVO.getData().getProviceId());
                row.setField(5, changelogVO.getData().getOrderStatus());
                row.setField(6, changelogVO.getData().getTotalAmount());
                String operation = getOperation(op);
                row.setField(7, operation);
                return row;
            }

            private String getOperation(String op) {
                // Map the RowKind short string (+I, -U, +U, -D) to a readable label
                String operation = "INSERT";
                for (RowKind rk : RowKind.values()) {
                    if (rk.shortString().equals(op)) {
                        switch (rk) {
                            case UPDATE_BEFORE:
                                operation = "UPDATE-BEFORE";
                                break;
                            case UPDATE_AFTER:
                                operation = "UPDATE-AFTER";
                                break;
                            case DELETE:
                                operation = "DELETE";
                                break;
                            case INSERT:
                            default:
                                operation = "INSERT";
                                break;
                        }
                        break;
                    }
                }
                return operation;
            }
        }, new RowTypeInfo(types, fieldNames));
        // Assign timestamps and watermarks based on create_time
        SingleOutputStreamOperator<Row> ds3 = ds2.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Row>() {
                            @Override
                            public long extractTimestamp(Row element, long recordTimestamp) {
                                String create_time = (String) element.getField(2);
                                FastDateFormat dateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
                                try {
                                    return dateFormat.parse(create_time).getTime();
                                } catch (ParseException e) {
                                    e.printStackTrace();
                                }
                                return 0;
                            }
                        })
        );
        // Register the stream as a view and create the Hive target table
        tableEnv.createTemporaryView("merged_order_info", ds3);
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
        tableEnv.executeSql("DROP TABLE IF EXISTS ods.order_info");
        tableEnv.executeSql("CREATE TABLE ods.order_info (\n" +
                "    id BIGINT,\n" +
                "    user_id BIGINT,\n" +
                "    create_time STRING,\n" +
                "    operate_time STRING,\n" +
                "    province_id INT,\n" +
                "    order_status INT,\n" +
                "    total_amount DOUBLE,\n" +
                "    op STRING\n" +
                ") PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (\n" +
                "    'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',\n" +
                "    'sink.partition-commit.trigger'='partition-time',\n" +
                "    'sink.partition-commit.delay'='1 min',\n" +
                "    'sink.partition-commit.policy.kind'='metastore,success-file'\n" +
                ")");
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        // Stream the merged changelog into the partitioned Hive table
        tableEnv.executeSql("INSERT INTO ods.order_info\n" +
                "SELECT\n" +
                "    id,\n" +
                "    user_id,\n" +
                "    create_time,\n" +
                "    operate_time,\n" +
                "    province_id,\n" +
                "    order_status,\n" +
                "    total_amount,\n" +
                "    op,\n" +
                "    DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd') AS dt,\n" +
                "    DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'HH') AS hr\n" +
                "FROM merged_order_info"
        );
    }
}
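The listing references two project-local helpers that are not included in the post: com.zallsteel.flink.entity.ChangelogVO and com.zallsteel.flink.utils.ConfigUtils. Below is a minimal sketch of each, inferred from the calls made above and from the changelog-json record layout; the field names, Gson annotations, and the group.id value are assumptions, not the author's original code.

package com.zallsteel.flink.entity;

import com.google.gson.annotations.SerializedName;

// Deserialization target for changelog-json records: {"data": {...}, "op": "+I"} (assumed layout).
public class ChangelogVO {

    private Data data;
    private String op;

    public Data getData() { return data; }
    public String getOp() { return op; }

    public static class Data {
        private Long id;
        @SerializedName("user_id")
        private Long userId;
        @SerializedName("create_time")
        private String createTime;
        @SerializedName("operate_time")
        private String operateTime;
        @SerializedName("province_id")
        private Integer provinceId;
        @SerializedName("order_status")
        private Integer orderStatus;
        @SerializedName("total_amount")
        private Double totalAmount;

        public Long getId() { return id; }
        public Long getUserId() { return userId; }
        public String getCreateTime() { return createTime; }
        public String getOperateTime() { return operateTime; }
        // Getter spelling kept as it is called in MySQLCDC2HiveApp
        public Integer getProviceId() { return provinceId; }
        public Integer getOrderStatus() { return orderStatus; }
        public Double getTotalAmount() { return totalAmount; }
    }
}

And a sketch of the Kafka consumer configuration helper (the broker address is taken from the Kafka DDL above; the consumer group name is arbitrary):

package com.zallsteel.flink.utils;

import java.util.Properties;

public class ConfigUtils {
    // Minimal consumer properties for FlinkKafkaConsumer; in the original project
    // these are presumably loaded from a configuration file.
    public static Properties getKafkaConfig() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hdp-xxx-dev-node03:9092");
        props.setProperty("group.id", "mysql-cdc-2-hive"); // assumed group id
        return props;
    }
}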