III. FlinkCDC-SQL
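This article describes how to configure Flink CDC to capture data changes from MySQL. It covers three parts: 1) enabling the MySQL binlog and configuring server-id, the binlog file name, and the binlog format; 2) adding the Flink dependencies, including flink-connector-mysql-cdc, to a Maven project; 3) a Java example that creates a CDC source table through StreamTableEnvironment and captures data changes. The configuration focuses on setting…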
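"The highest good is like water: it benefits all things without contending, and it settles in places people disdain; thus it comes close to the Tao."💦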
1. Enable the MySQL binlog
[qcln@hadoop101 ~]$ sudo vim /etc/my.cnf
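# Add the following settings under the [mysqld] section to enable the binlog for the `test` and `test_route` databases, then restart MySQL
# Server ID (must be unique within the replication setup)
server-id = 1
## Enable the binlog; the value is used as the base name of the binlog files
log-bin=mysql-bin
## Binlog format; Flink CDC (Debezium) requires ROW
binlog_format=row
## Databases whose changes are written to the binlog; adjust to your environment
binlog-do-db=test
binlog-do-db=test_route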
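After restarting MySQL, you can confirm that the settings took effect before you start the Flink job. The sketch below is a hypothetical helper (`BinlogCheck`, not part of Flink CDC). It reads the relevant server variables over plain JDBC with `SHOW VARIABLES` and reports any value that does not match. It also checks `binlog_row_image`, which Debezium expects to be `FULL` (the MySQL default). The connection URL and credentials in `main` are assumptions, and running `main` requires the `mysql-connector-java` driver from the dependency list.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: checks whether MySQL's binlog settings suit Flink CDC. */
final class BinlogCheck {

    /** Returns the detected problems; an empty list means the settings look usable. */
    static List<String> findProblems(Map<String, String> vars) {
        List<String> problems = new ArrayList<>();
        if (!"ON".equalsIgnoreCase(vars.getOrDefault("log_bin", ""))) {
            problems.add("log_bin is not ON");
        }
        if (!"ROW".equalsIgnoreCase(vars.getOrDefault("binlog_format", ""))) {
            problems.add("binlog_format is not ROW");
        }
        // Older servers may not report binlog_row_image; treat a missing value as FULL
        if (!"FULL".equalsIgnoreCase(vars.getOrDefault("binlog_row_image", "FULL"))) {
            problems.add("binlog_row_image is not FULL");
        }
        if ("0".equals(vars.getOrDefault("server_id", "0"))) {
            problems.add("server_id is 0 (unset)");
        }
        return problems;
    }

    /** Reads the variables of interest with SHOW VARIABLES over JDBC. */
    static Map<String, String> readVariables(Connection conn) throws SQLException {
        Map<String, String> vars = new HashMap<>();
        String sql = "SHOW VARIABLES WHERE Variable_name IN "
                + "('log_bin', 'binlog_format', 'binlog_row_image', 'server_id')";
        try (Statement st = conn.createStatement(); ResultSet rs = st.executeQuery(sql)) {
            while (rs.next()) {
                vars.put(rs.getString(1), rs.getString(2)); // Variable_name, Value
            }
        }
        return vars;
    }

    public static void main(String[] args) throws SQLException {
        // Assumed connection details; replace them with your own
        String url = "jdbc:mysql://localhost:3306/test";
        try (Connection conn = DriverManager.getConnection(url, "root", "xxxxxx")) {
            List<String> problems = findProblems(readVariables(conn));
            System.out.println(problems.isEmpty() ? "binlog settings look OK" : problems);
        }
    }
}
```

Keeping the checks in a pure function (`findProblems`) separate from the JDBC query makes the logic testable without a running database.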
2. Add the dependencies
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink-version>1.18.0</flink-version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.31</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-debezium</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
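A missing artifact usually surfaces only at startup, as a `ClassNotFoundException`. The hypothetical `DependencyCheck` class below is a minimal sketch that tries to load one representative class per artifact with `Class.forName` and prints which ones are missing. The class-to-artifact mapping is an assumption based on the package names used in this article. Run the sketch with the same classpath as the Flink job.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: reports which of the dependencies above are on the classpath. */
final class DependencyCheck {

    /** Returns true if the class can be loaded; it is not initialized. */
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, DependencyCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Representative class -> artifact assumed to provide it
        Map<String, String> expected = new LinkedHashMap<>();
        expected.put("org.apache.flink.table.api.bridge.java.StreamTableEnvironment",
                "flink-table-api-java-bridge");
        expected.put("com.ververica.cdc.connectors.mysql.source.MySqlSource",
                "flink-connector-mysql-cdc");
        expected.put("com.mysql.cj.jdbc.Driver", "mysql-connector-java");
        expected.forEach((cls, artifact) -> System.out.println(
                (isPresent(cls) ? "OK      " : "MISSING ") + artifact + " (" + cls + ")"));
    }
}
```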
3. Write the code
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Author: Pepsi
 * Date: 2026/2/8
 * Desc:
 */
public class FlinkCDC_SQL {
    public static void main(String[] args) throws Exception {
        // 1. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Enable checkpointing (interval in milliseconds; adjust as needed)
        env.enableCheckpointing(5000L);

        // 3. Create the source table with the Flink CDC SQL connector
        TableResult tableResult = tableEnv.executeSql("" +
                "create table mysql_source(" +
                "sku_id varchar(255) primary key not enforced," +
                "price decimal(10,2)," +
                "category_id varchar(255)," +
                "from_date timestamp(3)," +
                "ddd varchar(255)," +
                "str varchar(255)" +
                ") with(" +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'localhost', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = 'xxxxxx', " +
                " 'database-name' = 'test', " +
                " 'table-name' = 'a' " +
                ")" +
                "");

        // 4. Read the data
        Table table = tableEnv.sqlQuery("select * from mysql_source");

        // 5. Print the changelog (execute() submits the job; print() blocks while it runs)
        table.execute().print();
    }
}
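When you build the DDL by string concatenation, it is easy to drop a space or a comma. As an alternative, the hypothetical `CdcDdl` helper below renders the `WITH (...)` options from a map and quotes each key and value as a SQL string literal. You would pass the resulting string to `tableEnv.executeSql(...)` in the same way as the hand-written DDL above. Reading the password from a `MYSQL_PASSWORD` environment variable is an assumption made for illustration; the connector does not require it.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper: builds a Flink SQL CREATE TABLE statement from key/value options. */
final class CdcDdl {

    /** Quotes a value as a SQL string literal, doubling any embedded single quotes. */
    static String quote(String s) {
        return "'" + s.replace("'", "''") + "'";
    }

    /** Builds "CREATE TABLE name (columns) WITH ('k' = 'v', ...)". */
    static String createTable(String name, String columns, Map<String, String> options) {
        String with = options.entrySet().stream()
                .map(e -> quote(e.getKey()) + " = " + quote(e.getValue()))
                .collect(Collectors.joining(", "));
        return "CREATE TABLE " + name + " (" + columns + ") WITH (" + with + ")";
    }

    public static void main(String[] args) {
        Map<String, String> opts = new LinkedHashMap<>(); // keeps insertion order
        opts.put("connector", "mysql-cdc");
        opts.put("hostname", "localhost");
        opts.put("port", "3306");
        opts.put("username", "root");
        // Assumption: the password comes from a MYSQL_PASSWORD environment variable
        opts.put("password", System.getenv().getOrDefault("MYSQL_PASSWORD", "xxxxxx"));
        opts.put("database-name", "test");
        opts.put("table-name", "a");
        System.out.println(createTable("mysql_source",
                "sku_id VARCHAR(255) PRIMARY KEY NOT ENFORCED, price DECIMAL(10, 2)", opts));
    }
}
```

Using a `LinkedHashMap` keeps the options in insertion order, so the generated DDL lists them in the same order as the hand-written version.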
4. Run

Common errors
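1. Table without a primary key
Caused by: org.apache.flink.table.api.ValidationException: 'scan.incremental.snapshot.chunk.key-column' is required for table without primary key when 'scan.incremental.snapshot.enabled' enabled.

Fix: declare a primary key in the DDL, e.g. `sku_id varchar(255) primary key not enforced`. If the table genuinely has no primary key, the error message names the alternatives: set 'scan.incremental.snapshot.chunk.key-column' to the column to split the snapshot on, or disable 'scan.incremental.snapshot.enabled'.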
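The error comes from the incremental snapshot phase. When 'scan.incremental.snapshot.enabled' is on, the connector reads the table's existing rows in chunks split on a key column (their size is controlled by 'scan.incremental.snapshot.chunk.size'). It therefore needs either a primary key or an explicit chunk key column. The hypothetical `ChunkSplitter` below is a simplified illustration that splits an integer key range into fixed-size, inclusive chunks. It is not Flink CDC's actual splitting algorithm, which handles more cases than this.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified illustration (not Flink CDC's splitter): split a key range into chunks. */
final class ChunkSplitter {

    /** An inclusive key range [start, end]. */
    static final class Chunk {
        final long start;
        final long end;

        Chunk(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    /** Splits [min, max] into consecutive chunks of at most chunkSize keys. */
    static List<Chunk> split(long min, long max, long chunkSize) {
        if (chunkSize <= 0 || min > max) {
            throw new IllegalArgumentException("need chunkSize > 0 and min <= max");
        }
        List<Chunk> chunks = new ArrayList<>();
        long start = min;
        while (true) {
            long end = start + (chunkSize - 1);
            // Clamp the last chunk to max; end < start means the addition overflowed
            if (end < start || end >= max) {
                end = max;
            }
            chunks.add(new Chunk(start, end));
            if (end == max) {
                return chunks;
            }
            start = end + 1;
        }
    }

    public static void main(String[] args) {
        System.out.println(split(1, 10, 4)); // prints [[1, 4], [5, 8], [9, 10]]
    }
}
```

Each chunk can be read independently (and in parallel), so the connector needs a column that it can order and split on. That is why it rejects a table without a primary key unless you name a chunk key column.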
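2. Flink doesn't support ENFORCED mode for PRIMARY KEY constraint (Flink accepts a primary key only in NOT ENFORCED mode)
Exception in thread "main" org.apache.flink.sql.parser.error.SqlValidateException: Flink doesn't support ENFORCED mode for PRIMARY KEY constraint. ENFORCED/NOT ENFORCED controls if the constraint checks are performed on the incoming/outgoing data. Flink does not own the data therefore the only supported mode is the NOT ENFORCED mode
This error appears when the DDL declares `primary key` without `not enforced`, because ENFORCED is the default.
- ENFORCED: the database actively checks that key values are unique and rejects any row that would violate the primary key. This is the standard behavior of relational databases such as MySQL and PostgreSQL, because they own and manage the data.
- NOT ENFORCED: the primary key is only a hint, i.e. metadata, and the system performs no checks. Flink is a stream processing engine that does not own or persist the data, so it cannot and does not verify key uniqueness. Guaranteeing uniqueness is the responsibility of the user or of the underlying source/sink database.

In short, Flink uses a declared primary key mainly for optimization: the key gives the planner extra information about the query, and it lets connectors such as the JDBC connector implement upserts correctly. Because Flink never checks the key, your application logic or the source/sink must keep it unique; otherwise the data may become inconsistent.

Fix: declare the key in NOT ENFORCED mode, e.g. `sku_id varchar(255) primary key not enforced`.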
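Here is the upsert point in concrete form. `table.execute().print()` prints each change with an op marker: +I (insert), -U (update before), +U (update after) and -D (delete). A consumer that keeps only the latest row per primary key, i.e. an upsert sink, can apply these changes as sketched below. `UpsertByKey` and its types are hypothetical and only model the idea; they are not Flink's runtime. Note what happens when the key is not actually unique: two different rows with the same key silently overwrite each other, and nothing in NOT ENFORCED mode detects it.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified illustration: applying a changelog to a table keyed by primary key. */
final class UpsertByKey {

    /** Mirrors the +I / -U / +U / -D markers of a Flink changelog. */
    enum Op { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Change {
        final Op op;
        final String key;
        final String value;

        Change(Op op, String key, String value) {
            this.op = op;
            this.key = key;
            this.value = value;
        }
    }

    /** Replays the changes and returns the resulting key -> row view. */
    static Map<String, String> apply(List<Change> changes) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Change c : changes) {
            switch (c.op) {
                case INSERT:
                case UPDATE_AFTER:
                    table.put(c.key, c.value); // upsert: overwrite whatever has this key
                    break;
                case UPDATE_BEFORE:
                    break; // not needed when keyed: the following UPDATE_AFTER overwrites
                case DELETE:
                    table.remove(c.key);
                    break;
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> log = Arrays.asList(
                new Change(Op.INSERT, "sku_1", "price=10.00"),
                new Change(Op.INSERT, "sku_2", "price=20.00"),
                new Change(Op.UPDATE_BEFORE, "sku_1", "price=10.00"),
                new Change(Op.UPDATE_AFTER, "sku_1", "price=12.50"),
                new Change(Op.DELETE, "sku_2", "price=20.00"));
        System.out.println(apply(log)); // prints {sku_1=price=12.50}
    }
}
```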