FlinkCDC_达梦CDC_SQL方式
本文介绍了使用Flink CDC实现达梦数据库(DM)到MySQL数据同步的详细步骤。首先准备所需jar包并放入Flink目录,启动Flink集群后编写SQL脚本定义源表(达梦)和目标表(MySQL)的连接配置。通过SQL客户端提交任务后,可在Flink Web界面监控任务运行状态。最后测试验证了在达梦源表进行增删改操作时,MySQL目标表能实时同步数据变化。整个流程实现了从达梦到MySQL的实时
·
水善利万物而不争,处众人之所恶,故几于道💦
1. jar包准备
已上传附件资源。
将文件加中所有的jar包放到Flink-1.19.3的lib目录下,不要怀疑,jar包有点多,直接放,我实测过了
2. 启动Flink集群
进入Flink安装目录,执行下面的启动脚本启动Flink集群:
bin/start-cluster.sh
3. 编写SQL脚本
在Flink安装目录下新建job目录,在job目录下编写dm_to_mysql_new.sql脚本,内容如下:
CREATE TABLE b_source (
SKU_ID VARCHAR(255) primary key not enforced,
PRICE DECIMAL(10,2),
CATEGORY_ID VARCHAR(255),
FROM_DATE TIMESTAMP(3),
DDD VARCHAR(255),
STR VARCHAR(255)
) WITH (
'connector' = 'dm-cdc',
-- 'hostname' = '192.168.1.11',
'hostname' = '10.80.76.11',
'port' = '5236',
'username' = 'SYSDBA',
'password' = 'xxxxxx',
'database-name' = 'FLINKCDC_SINK',
'schema-name' = 'FLINKCDC_SINK',
'table-name' = 'A',
'scan.startup.mode' = 'initial'
-- 'debezium.database.tablename.case.insensitive' = 'false',
-- 'debezium.lob.enabled' = 'true'
);
CREATE TABLE a_sink (
sku_id VARCHAR(255) primary key not enforced,
price DECIMAL(10,2),
category_id VARCHAR(255),
from_date TIMESTAMP(3),
ddd VARCHAR(255),
str VARCHAR(255)
) WITH (
'connector' = 'jdbc',
-- 'url' = 'jdbc:mysql://192.168.1.11:3306/test',
'url' = 'jdbc:mysql://10.80.76.11:3306/test',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = 'xxxxxx',
'table-name' = 'a_flink_cdc'
);
SET 'pipeline.name' = 'dm-cdc_to_mysql-jdbc';
insert into a_sink select * from b_source;
4. 提交任务
bin/sql-client.sh -f job/dm_to_mysql_new.sql

5. Flink Web页面查看任务运行情况


6. 测试CDC是否正常
在达梦的源表中进行增删改操作,观察目标库表的数据是否会同步变化。
更多推荐
所有评论(0)