FlinkCDC_达梦JDBC_MySQL同步到达梦
本文介绍了使用Flink SQL实现MySQL到达梦数据库的实时同步方案。通过配置SQL脚本设置Checkpoint机制确保数据一致性,利用MySQL CDC连接器捕获源库变更,JDBC连接器写入目标库。测试验证了数据同步的准确性,并通过Task Manager故障模拟演示了Checkpoint机制下的自动恢复能力。方案采用RocksDB状态后端支持增量Checkpoint,有效降低系统负载。同步
·
水善利万物而不争,处众人之所恶,故几于道💦
本文是:【MySQL】同步到【达梦】的案例,采用SQL脚本的方式。
1. Jar包准备
已上传资源,这些包网络上都能找到
2. 编写SQL脚本
在flink安装目录下的job目录下新建mysql_to_dm.sql文件,内容如下:
-- 启用Checkpoint,设置间隔为60秒(单位毫秒)
SET 'execution.checkpointing.interval' = '60000';
-- 设置Checkpoint模式为精确一次(EXACTLY_ONCE),确保数据一致性
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
-- 设置Checkpoint超时时间(默认10分钟,可调整)
SET 'execution.checkpointing.timeout' = '300000';
-- 设置两次Checkpoint之间的最小间隔(避免频繁Checkpoint)
SET 'execution.checkpointing.min-pause' = '5000';
-- 设置最大并发Checkpoint数量(1表示串行)
SET 'execution.checkpointing.max-concurrent' = '1';
-- 作业取消时保留Checkpoint,便于后续恢复
SET 'execution.checkpointing.externalized-checkpoint-cleanup' = 'RETAIN_ON_CANCELLATION';
-- 设置RocksDB状态后端,支持增量Checkpoint
SET 'state.backend' = 'rocksdb';
-- 开启增量Checkpoint(RocksDB专属)
SET 'state.backend.rocksdb.incremental' = 'true';
-- 设置Checkpoint存储路径(HDFS路径)
SET 'state.checkpoints.dir' = 'file:///opt/software/flink-1.19.3/ck';
CREATE TABLE a_source (
sku_id VARCHAR(255) primary key not enforced,
price DECIMAL(10,2),
category_id VARCHAR(255),
from_date TIMESTAMP(3),
ddd VARCHAR(255),
str VARCHAR(255)
-- PRIMARY KEY(sku_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.16.125',
'port' = '3306',
'username' = 'root',
'password' = 'xxxxxx',
'database-name' = 'test',
'table-name' = 'a',
'scan.startup.mode' = 'initial'
);
CREATE TABLE b_sink (
SKU_ID VARCHAR(255) primary key not enforced,
PRICE DECIMAL(10,2),
CATEGORY_ID VARCHAR(255),
FROM_DATE TIMESTAMP(3),
DDD VARCHAR(255),
STR VARCHAR(255)
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:dm://192.168.16.125:5236/FLINKCDC_SINK',
'username' = 'SYSDBA',
'password' = 'xxxxxx',
-- 'schema-name' = 'FLINKCDC_SINK',
'table-name' = 'A',
'driver' = 'dm.jdbc.driver.DmDriver'
-- 'debezium.database.tablename.case.insensitive' = 'false',
-- 'debezium.lob.enabled' = 'true'
);
insert into b_sink select * from a_source;
3. 提交Job
在Flink安装目录下执行下面的命令,提交Job:
bin/sql-client.sh -f job/mysql_to_dm.sql

4. Web页面查看Job执行情况
在Job Manager页面查看任务执行情况

5. CDC测试
对源表,MySQL表进行增删改操作,同步观察达梦表数据变化,可以看到达梦同步成功
6. 自动恢复
因为我的sql脚本里面配了CheckPoint,所以能实现自动重启恢复。
由于只有一个任务,可以看到它跑在hadoop103上,所以我把hadoop103上的Task Manager干掉,观察Web页面任务执行情况
干掉hadoop103上的Task Manager,模拟Task Manager发生意外挂掉的情况:
观察到任务突然自动重启了:
重启好了:
测试任务正常,可以进行数据同步:
更多推荐
所有评论(0)