水善利万物而不争,处众人之所恶,故几于道💦

1. jar包准备

已上传附件资源。
将文件加中所有的jar包放到Flink-1.19.3的lib目录下,不要怀疑,jar包有点多,直接放,我实测过了

2. 启动Flink集群

进入Flink安装目录,执行下面的启动脚本启动Flink集群:

bin/start-cluster.sh

3. 编写SQL脚本

在Flink安装目录下新建job目录,在job目录下编写dm_to_mysql_new.sql脚本,内容如下:

CREATE TABLE b_source (
  SKU_ID VARCHAR(255) primary key not enforced,
  PRICE DECIMAL(10,2),
  CATEGORY_ID VARCHAR(255),
  FROM_DATE TIMESTAMP(3),
  DDD VARCHAR(255),
  STR VARCHAR(255)
) WITH (
        'connector' = 'dm-cdc',
--      'hostname' = '192.168.1.11',    
        'hostname' = '10.80.76.11',
        'port' = '5236',
        'username' = 'SYSDBA',
        'password' = 'xxxxxx',
        'database-name' = 'FLINKCDC_SINK',
        'schema-name' = 'FLINKCDC_SINK',
        'table-name' = 'A',
        'scan.startup.mode' = 'initial'
--      'debezium.database.tablename.case.insensitive' =  'false',  
--      'debezium.lob.enabled' = 'true'  
);

CREATE TABLE a_sink (
  sku_id VARCHAR(255) primary key not enforced,
  price DECIMAL(10,2),
  category_id VARCHAR(255),
  from_date TIMESTAMP(3),
  ddd VARCHAR(255),
  str VARCHAR(255)
) WITH (
  'connector' = 'jdbc',
--  'url' = 'jdbc:mysql://192.168.1.11:3306/test',
  'url' = 'jdbc:mysql://10.80.76.11:3306/test',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'xxxxxx',
  'table-name' = 'a_flink_cdc'
);

SET 'pipeline.name' = 'dm-cdc_to_mysql-jdbc';

insert into a_sink select * from b_source;

4. 提交任务

bin/sql-client.sh -f job/dm_to_mysql_new.sql

请添加图片描述

5. Flink Web页面查看任务运行情况

请添加图片描述
请添加图片描述

6. 测试CDC是否正常

在达梦的源表中进行增删改操作,观察目标库表的数据是否会同步变化。
请添加图片描述

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐