Water benefits all things without contending; it dwells where most people disdain to be, and so it is close to the Dao 💦

1. Enable the MySQL binlog

[qcln@hadoop101 ~]$ sudo vim /etc/my.cnf

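# Add the following settings to enable the binlog for the `test` and `test_route` databases
# Server ID
server-id = 1
# Enable the binlog; this value is used as the base name of the binlog files
log-bin=mysql-bin
# Binlog format; row-level logging is required by CDC tools such as Maxwell and Flink CDC
binlog_format=row
# Databases for which the binlog is enabled; adjust to your environment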
binlog-do-db=test
binlog-do-db=test_route
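
Before restarting MySQL it can help to sanity-check the edited file. The following is a minimal sketch and not part of the original setup: `BinlogConfigCheck` and its methods are hypothetical names, and the class only inspects my.cnf-style text for the three settings listed above. It does not talk to a running server.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper (not from the original post): checks my.cnf-style text
// for the binlog settings listed above.
final class BinlogConfigCheck {

    // Parses "key=value" lines, skipping blanks, comments and [section] headers.
    // Repeated keys (for example binlog-do-db) keep all of their values.
    static Map<String, List<String>> parse(String cnf) {
        Map<String, List<String>> settings = new HashMap<>();
        for (String raw : cnf.split("\\R")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#") || line.startsWith(";") || line.startsWith("[")) {
                continue;
            }
            int eq = line.indexOf('=');
            if (eq < 0) {
                continue; // flag-style options without a value are ignored in this sketch
            }
            // MySQL accepts '-' and '_' interchangeably in option names, so normalize.
            String key = line.substring(0, eq).trim().toLowerCase(Locale.ROOT).replace('-', '_');
            String value = line.substring(eq + 1).trim();
            settings.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
        return settings;
    }

    // Returns human-readable problems; an empty list means all checks passed.
    static List<String> problems(String cnf) {
        Map<String, List<String>> s = parse(cnf);
        List<String> problems = new ArrayList<>();
        if (!s.containsKey("server_id")) {
            problems.add("server-id is not set");
        }
        if (!s.containsKey("log_bin")) {
            problems.add("log-bin is not set");
        }
        List<String> format = s.get("binlog_format");
        if (format == null || !format.get(format.size() - 1).equalsIgnoreCase("row")) {
            problems.add("binlog_format is not row");
        }
        return problems;
    }

    public static void main(String[] args) {
        String cnf = String.join("\n",
                "[mysqld]",
                "server-id = 1",
                "log-bin=mysql-bin",
                "binlog_format=row",
                "binlog-do-db=test",
                "binlog-do-db=test_route");
        System.out.println(problems(cnf));                   // empty list for this input
        System.out.println(parse(cnf).get("binlog_do_db"));  // the databases being logged
    }
}
```

In practice the text would come from /etc/my.cnf; the inline string in `main` just mirrors the settings shown above.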

2. Add the required dependencies

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink-version>1.18.0</flink-version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.31</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-debezium</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

3. Write the code

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Author: Pepsi
 * Date: 2026/2/8
 * Desc:
 */
public class FlinkCDC_SQL {

    public static void main(String[] args) throws Exception {
        // 1. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 2. Enable checkpointing (not configured in this example)

        // 3. Create the source table with the Flink CDC SQL connector
        TableResult tableResult = tableEnv.executeSql("" +
                "create table mysql_source(" +
                "sku_id varchar(255) primary key not enforced," +
                "price decimal(10,2)," +
                "category_id varchar(255)," +
                "from_date timestamp(3)," +
                "ddd varchar(255)," +
                "str varchar(255)" +
                ") with(" +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'localhost', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = 'xxxxxx', " +
                " 'database-name' = 'test', " +
                " 'table-name' = 'a' " +
                ")" +
                "");

        // 4. Read the data
        Table table = tableEnv.sqlQuery("select * from mysql_source");

        // 5. Print the results
        table.execute().print();

    }

}
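
The connection settings in the statement above are hard-coded into one long string concatenation. As a hedged sketch of one alternative, the hypothetical `CdcDdlBuilder` below assembles the same kind of statement from a column list and a map of connector options, so values such as the password can come from configuration. The class name, the method names and the `MYSQL_PASSWORD` environment variable are assumptions made for illustration; the resulting string would be passed to `tableEnv.executeSql(...)` exactly as before.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not from the original post): builds a CREATE TABLE
// statement from a column list and a map of connector options.
final class CdcDdlBuilder {

    // Doubles single quotes so option keys and values are valid SQL string literals.
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    static String createTable(String tableName, String columns, Map<String, String> options) {
        StringJoiner with = new StringJoiner(",\n  ", "  ", "");
        for (Map.Entry<String, String> e : options.entrySet()) {
            with.add(quote(e.getKey()) + " = " + quote(e.getValue()));
        }
        return "create table " + tableName + " (\n  " + columns + "\n) with (\n" + with + "\n)";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order for readable output.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "mysql-cdc");
        options.put("hostname", "localhost");
        options.put("port", "3306");
        options.put("username", "root");
        // Assumption: the password is supplied through a hypothetical environment variable.
        options.put("password", System.getenv().getOrDefault("MYSQL_PASSWORD", "xxxxxx"));
        options.put("database-name", "test");
        options.put("table-name", "a");

        String columns = "sku_id varchar(255) primary key not enforced,\n"
                + "  price decimal(10,2),\n"
                + "  category_id varchar(255)";
        System.out.println(createTable("mysql_source", columns, options));
    }
}
```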

4. Run the job


Common errors

1. Table without primary key

Caused by: org.apache.flink.table.api.ValidationException: 'scan.incremental.snapshot.chunk.key-column' is required for table without primary key when 'scan.incremental.snapshot.enabled' enabled.


Fix: declare a primary key on the source table, for example sku_id varchar(255) primary key not enforced.
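
The error message itself names two options, which suggests when the check fires. The sketch below is a simplified model of that rule written from the message text alone; it is not the connector's real validation code, `ChunkKeyRule` is a hypothetical name, and it assumes incremental snapshot is on unless explicitly disabled.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the rule stated in the error message above.
// This is NOT the connector's actual validation code.
final class ChunkKeyRule {
    static final String SNAPSHOT_ENABLED = "scan.incremental.snapshot.enabled";
    static final String CHUNK_KEY_COLUMN = "scan.incremental.snapshot.chunk.key-column";

    // Returns true when a table definition would pass the modeled check.
    static boolean passes(boolean hasPrimaryKey, Map<String, String> options) {
        // Assumption: incremental snapshot is enabled unless set to false.
        boolean incremental = Boolean.parseBoolean(options.getOrDefault(SNAPSHOT_ENABLED, "true"));
        if (!incremental || hasPrimaryKey) {
            return true;
        }
        // Without a primary key, a chunk key column has to be named explicitly.
        return options.containsKey(CHUNK_KEY_COLUMN);
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        System.out.println(passes(false, options)); // false: the failing case from the error
        System.out.println(passes(true, options));  // true: the fix used in this post
        options.put(CHUNK_KEY_COLUMN, "sku_id");
        System.out.println(passes(false, options)); // true: the alternative named in the message
    }
}
```

Declaring the primary key, as this post does, is the simpler route when the MySQL table actually has one.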

2. Flink doesn't support ENFORCED mode for PRIMARY KEY constraint (Flink cannot enforce primary key constraints)

Exception in thread "main" org.apache.flink.sql.parser.error.SqlValidateException: Flink doesn't support ENFORCED mode for PRIMARY KEY constraint. ENFORCED/NOT ENFORCED controls if the constraint checks are performed on the incoming/outgoing data. Flink does not own the data therefore the only supported mode is the NOT ENFORCED mode

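  • enforced: the database system actively checks and guarantees uniqueness, and rejects any record that violates the primary key constraint. This is the standard behavior of traditional relational databases such as MySQL and PostgreSQL, because they own and manage the data.
  • not enforced: the primary key constraint is treated only as a hint, or metadata, and the system itself performs no checks on the data. Flink is a stream processing engine that does not persistently store the data, so it cannot and does not verify primary key uniqueness. Responsibility for uniqueness falls on the user or on the underlying source or sink database.
  • In short, Flink declares primary keys mainly for optimization, for example to give certain queries extra information, or so that connectors (such as the JDBC connector) can implement upsert operations correctly. Flink does not check the key itself, so your application logic or the underlying source or sink must maintain primary key integrity; otherwise the data may become inconsistent.

Fix: declare the primary key in not enforced mode.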
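
To make the upsert point in the list above concrete, here is a small illustrative model in plain Java. It is not Flink code: `UpsertByKeyDemo`, `Change` and `Kind` are hypothetical names, and the map stands in for a sink that trusts the declared key. A second insert with an existing key is silently overwritten rather than rejected, which is the practical meaning of not enforced.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model (not Flink code): an upsert-style consumer uses the
// declared primary key to locate rows but never checks uniqueness itself.
final class UpsertByKeyDemo {

    enum Kind { INSERT, UPDATE, DELETE }

    static final class Change {
        final Kind kind;
        final String skuId;   // plays the role of the declared primary key
        final String price;

        Change(Kind kind, String skuId, String price) {
            this.kind = kind;
            this.skuId = skuId;
            this.price = price;
        }
    }

    // Applies changes in order and returns the final state keyed by sku_id.
    static Map<String, String> apply(Iterable<Change> changes) {
        Map<String, String> state = new LinkedHashMap<>();
        for (Change c : changes) {
            if (c.kind == Kind.DELETE) {
                state.remove(c.skuId);
            } else {
                state.put(c.skuId, c.price); // insert and update both overwrite by key
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<Change> changes = Arrays.asList(
                new Change(Kind.INSERT, "sku-1", "10.00"),
                new Change(Kind.INSERT, "sku-2", "20.00"),
                new Change(Kind.UPDATE, "sku-1", "12.50"),
                new Change(Kind.INSERT, "sku-2", "99.00"), // duplicate key: overwritten, not rejected
                new Change(Kind.DELETE, "sku-1", null));
        System.out.println(apply(changes)); // prints {sku-2=99.00}
    }
}
```

If the upstream data really contained two different rows with the same key, one of them would be lost here without any error, which is why key integrity has to be guaranteed by the source or sink.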