水善利万物而不争,处众人之所恶,故几于道💦

本文是:【MySQL】同步到【达梦】的案例,采用SQL脚本的方式。

1. Jar包准备

已上传资源,这些包网络上都能找到
在这里插入图片描述

2. 编写SQL脚本

在flink安装目录下的job目录下新建mysql_to_dm.sql文件,内容如下:

-- 启用Checkpoint,设置间隔为60秒(单位毫秒)
SET 'execution.checkpointing.interval' = '60000';

-- 设置Checkpoint模式为精确一次(EXACTLY_ONCE),确保数据一致性
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';

-- 设置Checkpoint超时时间(默认10分钟,可调整)
SET 'execution.checkpointing.timeout' = '300000';

-- 设置两次Checkpoint之间的最小间隔(避免频繁Checkpoint)
SET 'execution.checkpointing.min-pause' = '5000';

-- 设置最大并发Checkpoint数量(1表示串行)
SET 'execution.checkpointing.max-concurrent' = '1';

-- 作业取消时保留Checkpoint,便于后续恢复
SET 'execution.checkpointing.externalized-checkpoint-cleanup' = 'RETAIN_ON_CANCELLATION';

-- 设置RocksDB状态后端,支持增量Checkpoint
SET 'state.backend' = 'rocksdb';

-- 开启增量Checkpoint(RocksDB专属)
SET 'state.backend.rocksdb.incremental' = 'true';

-- 设置Checkpoint存储路径(HDFS路径)
SET 'state.checkpoints.dir' = 'file:///opt/software/flink-1.19.3/ck';

CREATE TABLE a_source (
  sku_id VARCHAR(255) primary key not enforced,
  price DECIMAL(10,2),
  category_id VARCHAR(255),
  from_date TIMESTAMP(3),
  ddd VARCHAR(255),
  str VARCHAR(255)
  -- PRIMARY KEY(sku_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.16.125',
  'port' = '3306',
  'username' = 'root',
  'password' = 'xxxxxx',
  'database-name' = 'test',
  'table-name' = 'a',
  'scan.startup.mode' = 'initial'
);

CREATE TABLE b_sink (
  SKU_ID VARCHAR(255) primary key not enforced,
  PRICE DECIMAL(10,2),
  CATEGORY_ID VARCHAR(255),
  FROM_DATE TIMESTAMP(3),
  DDD VARCHAR(255),
  STR VARCHAR(255)
) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:dm://192.168.16.125:5236/FLINKCDC_SINK',
        'username' = 'SYSDBA',
        'password' = 'xxxxxx',
--      'schema-name' = 'FLINKCDC_SINK',    
        'table-name' = 'A',
        'driver' = 'dm.jdbc.driver.DmDriver'
--      'debezium.database.tablename.case.insensitive' =  'false',  
--      'debezium.lob.enabled' = 'true'  
);


insert into b_sink select * from a_source;

3. 提交Job

在Flink安装目录下执行下面的命令,提交Job:

 bin/sql-client.sh -f job/mysql_to_dm.sql

在这里插入图片描述

4. Web页面查看Job执行情况

在Job Manager页面查看任务执行情况
在这里插入图片描述
在这里插入图片描述

5. CDC测试

对源表,MySQL表进行增删改操作,同步观察达梦表数据变化,可以看到达梦同步成功
在这里插入图片描述

6. 自动恢复

因为我的sql脚本里面配了CheckPoint,所以能实现自动重启恢复。

由于只有一个任务,可以看到它跑在hadoop103上,所以我把hadoop103上的Task Manager干掉,观察Web页面任务执行情况
在这里插入图片描述
干掉hadoop103上的Task Manager,模拟Task Manager发生意外挂掉的情况:
在这里插入图片描述
观察到任务突然自动重启了:
在这里插入图片描述
重启好了:
在这里插入图片描述
测试任务正常,可以进行数据同步:
在这里插入图片描述

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐