Flink CDC + Flink SQL: Building an End-to-End Real-Time Data Pipeline (MySQL → Flink → ClickHouse)
1. Architecture Overview
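- Data source: MySQL (changes captured via CDC)
- Processing engine: Flink (real-time processing with the SQL API)
- Data sink: ClickHouse (high-performance columnar storage)
- Core components:
  - Flink CDC Connector: captures MySQL INSERT/UPDATE/DELETE events in real time by reading the binlog
  - Flink SQL: performs data transformation, filtering, and aggregation
  - ClickHouse JDBC Sink: writes the processed data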
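To make the event model concrete: Flink represents each captured change as changelog rows tagged with a RowKind, where +I is an insert, -U and +U are the before and after images of an update, and -D is a delete. The sketch below shows that mapping for a single MySQL row change. The short strings are Flink's RowKind notation; the to_changelog function and the dict-based rows are hypothetical, for illustration only.

```python
from __future__ import annotations

from typing import Optional

# Flink RowKind short strings: insert, update-before, update-after, delete
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = "+I", "-U", "+U", "-D"


def to_changelog(op: str, before: Optional[dict], after: Optional[dict]) -> list[tuple[str, dict]]:
    """Hypothetical mapping of one MySQL row change to Flink changelog rows.

    An UPDATE carries both row images, so it becomes a retraction of the
    old row (-U) followed by the new row (+U).
    """
    if op == "INSERT":
        return [(INSERT, after)]
    if op == "UPDATE":
        return [(UPDATE_BEFORE, before), (UPDATE_AFTER, after)]
    if op == "DELETE":
        return [(DELETE, before)]
    raise ValueError(f"unsupported operation: {op}")


if __name__ == "__main__":
    old = {"id": 1, "name": "alice", "age": 17}
    new = {"id": 1, "name": "alice", "age": 18}
    for kind, row in to_changelog("UPDATE", old, new):
        print(kind, row)
```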
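2. Environment Setup
The MySQL CDC connector reads the MySQL binlog, so the server must have the binlog enabled in row format (binlog_format = ROW).
-- Register connector JARs in the Flink SQL Client (example paths)
ADD JAR '/path/to/flink-sql-connector-mysql-cdc-2.4.0.jar';
-- The sink in Step 3 uses 'connector' = 'jdbc': add the Flink JDBC connector and the ClickHouse JDBC driver
ADD JAR '/path/to/flink-connector-jdbc-<version>.jar';
ADD JAR '/path/to/clickhouse-jdbc-<version>.jar';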
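Because the CDC connector is built on Debezium's binlog reader, the MySQL server needs the binlog turned on, in ROW format, with full row images. A minimal sketch of a pre-flight check, assuming you have already fetched the server variables (for example with SHOW VARIABLES) into a dict; the check_binlog_settings helper is hypothetical.

```python
from __future__ import annotations

# Server settings the Debezium-based MySQL CDC reader depends on.
REQUIRED = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def check_binlog_settings(variables: dict[str, str]) -> list[str]:
    """Return one human-readable problem per required setting that is missing or wrong."""
    problems = []
    for name, expected in REQUIRED.items():
        actual = variables.get(name)
        if actual is None:
            problems.append(f"{name} is not set (expected {expected})")
        elif actual.upper() != expected:
            problems.append(f"{name} = {actual} (expected {expected})")
    return problems


print(check_binlog_settings({"log_bin": "ON", "binlog_format": "MIXED", "binlog_row_image": "FULL"}))
# ['binlog_format = MIXED (expected ROW)']
```

An empty list means the server is ready for CDC; anything else should be fixed in the MySQL configuration before starting the Flink job.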
3. Flink SQL Implementation Steps
Step 1: Create the MySQL CDC source table
CREATE TABLE mysql_users (
id BIGINT PRIMARY KEY NOT ENFORCED, -- Flink SQL only accepts NOT ENFORCED primary keys
name STRING,
age INT,
update_time TIMESTAMP(3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysql-host',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'test_db',
'table-name' = 'users',
'server-time-zone' = 'UTC'
);
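Declaring id as the primary key is what lets Flink read the CDC stream as changes to keyed rows. With the default startup mode ('initial'), the source first emits a snapshot of the existing rows as inserts and then switches to the binlog. The sketch below models that in plain Python by replaying a changelog into a dict keyed by id; it is a conceptual model of the semantics, not the connector's code, and the materialize function is hypothetical.

```python
from __future__ import annotations


def materialize(changelog: list[tuple[str, dict]], key: str = "id") -> dict:
    """Replay Flink-style changelog rows into the table state they describe."""
    table: dict = {}
    for kind, row in changelog:
        if kind in ("+I", "+U"):
            table[row[key]] = row          # insert or after-image: (over)write the key
        elif kind in ("-U", "-D"):
            table.pop(row[key], None)      # retraction or delete: drop the old row
        else:
            raise ValueError(f"unknown row kind: {kind}")
    return table


# Snapshot phase: existing rows arrive as inserts.
snapshot = [("+I", {"id": 1, "name": "alice", "age": 17}),
            ("+I", {"id": 2, "name": "bob", "age": 30})]
# Binlog phase: live changes follow.
binlog = [("-U", {"id": 1, "name": "alice", "age": 17}),
          ("+U", {"id": 1, "name": "alice", "age": 18}),
          ("-D", {"id": 2, "name": "bob", "age": 30})]

state = materialize(snapshot + binlog)
print(state)  # {1: {'id': 1, 'name': 'alice', 'age': 18}}
```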
Step 2: Data transformation (example: filtering + computed fields)
CREATE VIEW processed_users AS
SELECT
id,
UPPER(name) AS name_upper, -- convert the name to upper case
age,
CASE WHEN age >= 18 THEN 1 ELSE 0 END AS is_adult, -- adult flag
update_time
FROM mysql_users
WHERE age > 0; -- filter out invalid ages (rows with a NULL age are dropped as well)
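The view applies the same row-level logic to every changelog row. The following Python function mirrors it for a single row so the filter and the computed columns can be checked in isolation; it is an illustrative equivalent (the transform name is hypothetical), including SQL's NULL behavior, where a NULL age fails age > 0.

```python
from __future__ import annotations

from typing import Optional


def transform(row: dict) -> Optional[dict]:
    """Row-level equivalent of the processed_users view; None means 'filtered out'."""
    age = row.get("age")
    if age is None or age <= 0:          # WHERE age > 0 (NULL does not pass)
        return None
    name = row.get("name")
    return {
        "id": row["id"],
        "name_upper": name.upper() if name is not None else None,  # UPPER(NULL) is NULL
        "age": age,
        "is_adult": 1 if age >= 18 else 0,  # CASE WHEN age >= 18 THEN 1 ELSE 0 END
        "update_time": row.get("update_time"),
    }


print(transform({"id": 1, "name": "alice", "age": 20, "update_time": None}))
print(transform({"id": 2, "name": "bob", "age": 0}))  # None: filtered out
```

Note that the output column order (id, name_upper, age, is_adult, update_time) matches the sink table in Step 3, which is what makes the SELECT * in Step 4 line up.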
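Step 3: Create the ClickHouse sink table (the physical table users must already exist in ClickHouse; this Flink DDL only maps columns onto it)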
CREATE TABLE ck_users (
id BIGINT,
name_upper STRING,
age INT,
is_adult INT,
update_time TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:clickhouse://clickhouse-host:8123/default',
'table-name' = 'users',
'username' = 'default',
'password' = '',
'driver' = 'com.clickhouse.jdbc.ClickHouseDriver',
'sink.buffer-flush.interval' = '1s',
'sink.max-retries' = '3'
);
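The JDBC sink buffers rows and flushes them when the buffer reaches sink.buffer-flush.max-rows or when sink.buffer-flush.interval elapses, retrying a failed flush up to sink.max-retries times. The Python class below is a simplified model of that policy, useful for reasoning about batch size and latency. The class, its write_batch callback, and the manually passed clock are hypothetical; it does not reproduce Flink's JDBC sink internals (which, among other things, also flush on checkpoints).

```python
from __future__ import annotations

from typing import Callable


class BufferedSink:
    """Toy model of a batching sink: flush when full or when the interval elapses, retry on failure."""

    def __init__(self, write_batch: Callable[[list], None], max_rows: int = 1000,
                 flush_interval_s: float = 1.0, max_retries: int = 3) -> None:
        self.write_batch = write_batch            # hypothetical callback that writes one batch
        self.max_rows = max_rows                  # like 'sink.buffer-flush.max-rows'
        self.flush_interval_s = flush_interval_s  # like 'sink.buffer-flush.interval'
        self.max_retries = max_retries            # like 'sink.max-retries'
        self.buffer: list = []
        self.last_flush = 0.0

    def add(self, row, now: float) -> None:
        self.buffer.append(row)
        if len(self.buffer) >= self.max_rows:
            self.flush(now)

    def tick(self, now: float) -> None:
        """Timer callback: flush a non-empty buffer once the interval has elapsed."""
        if self.buffer and now - self.last_flush >= self.flush_interval_s:
            self.flush(now)

    def flush(self, now: float) -> None:
        batch, self.buffer = self.buffer, []
        last_error = None
        for _ in range(self.max_retries + 1):     # first attempt plus max_retries retries
            try:
                self.write_batch(batch)
                self.last_flush = now
                return
            except OSError as err:                # treat I/O errors as retryable
                last_error = err
        # Retries exhausted: in Flink this would typically fail the task (restart strategy applies).
        raise RuntimeError(f"flush failed after {self.max_retries + 1} attempts") from last_error


batches = []
sink = BufferedSink(batches.append, max_rows=2, flush_interval_s=1.0)
sink.add("a", now=0.1)
sink.add("b", now=0.2)   # buffer reaches max_rows: flush
sink.add("c", now=0.3)
sink.tick(now=0.5)       # only 0.3 s since the last flush: keep buffering
sink.tick(now=1.3)       # interval elapsed: flush the partial batch
print(batches)           # [['a', 'b'], ['c']]
```

Larger max-rows values mean fewer, bigger inserts (which ClickHouse prefers), while the interval caps how long a row can sit in the buffer.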
Step 4: Start the real-time sync job
INSERT INTO ck_users
SELECT * FROM processed_users;
4. Key Optimization Strategies
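- CDC configuration tuning:
  'scan.startup.mode' = 'initial' -- full snapshot on first start, then incremental binlog reading (this is the default)
  'scan.incremental.snapshot.chunk.size' = '8096' -- read the snapshot in chunks of this many rows (8096 is the default)
- Flink checkpoint configuration:
  SET 'execution.checkpointing.interval' = '30s';
- ClickHouse write tuning:
  - Enable batched writes ('sink.buffer-flush.max-rows' = '1000')
  - Use the ReplacingMergeTree engine to absorb updates: ENGINE = ReplacingMergeTree(update_time) ORDER BY id. Rows with the same id are only collapsed during background merges, so queries that need the latest version should read with FINAL (e.g. SELECT * FROM users FINAL).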
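ReplacingMergeTree does not deduplicate on insert: every upsert from Flink lands as a new row, and rows sharing the same ORDER BY key are collapsed later during merges, keeping the row with the highest version column (update_time here); if several rows share the highest version, the last inserted one wins. The Python sketch below models only that keep-the-max-version rule, using integers as stand-ins for timestamps; it ignores parts, partitions, and merge scheduling, and the replacing_merge name is hypothetical.

```python
from __future__ import annotations


def replacing_merge(rows: list[dict], key: str = "id", version: str = "update_time") -> list[dict]:
    """Collapse rows with the same key, keeping the max version (a later insert wins ties)."""
    latest: dict = {}
    for row in rows:                                   # rows in insertion order
        current = latest.get(row[key])
        if current is None or row[version] >= current[version]:
            latest[row[key]] = row                     # >= lets the later insert win a tie
    return sorted(latest.values(), key=lambda r: r[key])  # ORDER BY id


inserted = [
    {"id": 1, "age": 17, "update_time": 100},
    {"id": 2, "age": 30, "update_time": 100},
    {"id": 1, "age": 18, "update_time": 200},   # later update of id 1
]
print(len(inserted))              # 3 rows visible before a merge (a query without FINAL)
print(replacing_merge(inserted))  # what a merge or FINAL converges to
```

Because the version decides, not the arrival order, an older change replayed after a newer one (for example after a restart) does not overwrite the newer row.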
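5. Exception Handling
- Resuming CDC after a failure: Flink checkpoints store the binlog position, so a restarted job continues from the last completed checkpoint instead of re-reading the whole table.
- Dead-letter queue: write abnormal records to Kafka for later inspection:
  CREATE TABLE dead_letter_queue (...) WITH ('connector'='kafka'...);
  INSERT INTO dead_letter_queue SELECT * FROM ... WHERE age IS NULL;
  If the query reads the CDC changelog directly, the Kafka sink needs a changelog-capable format such as debezium-json, or the upsert-kafka connector.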
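Resuming works because the binlog offset is part of the checkpointed state: after a restart the source rolls back to the saved position, while anything already written to external systems stays written, so events after the checkpoint are delivered again. The sketch below models this with a (binlog file, position) offset and also routes NULL-age rows to a dead-letter list, matching the WHERE age IS NULL condition above. PipelineModel, its methods, and the in-memory dict and list standing in for ClickHouse and Kafka are hypothetical.

```python
from __future__ import annotations


class PipelineModel:
    """Toy model of checkpointed binlog reading with a dead-letter side output."""

    def __init__(self) -> None:
        self.offset = None               # last processed (binlog file, position)
        self.checkpointed_offset = None  # offset saved by the last completed checkpoint
        self.sink: dict = {}             # stands in for ClickHouse, upsert by id
        self.dead_letters: list = []     # stands in for the Kafka dead-letter topic

    def process(self, events) -> None:
        for offset, row in events:
            if self.offset is not None and offset <= self.offset:
                continue                 # already consumed up to this position
            age = row.get("age")
            if age is None:
                self.dead_letters.append(row)   # WHERE age IS NULL -> dead-letter queue
            elif age > 0:
                self.sink[row["id"]] = row      # idempotent upsert: replays are harmless
            self.offset = offset                # rows with age <= 0 are simply dropped

    def checkpoint(self) -> None:
        self.checkpointed_offset = self.offset

    def restore(self) -> None:
        """Failure and restart: only the source position rolls back (at-least-once)."""
        self.offset = self.checkpointed_offset


events = [
    (("mysql-bin.000003", 100), {"id": 1, "age": 20}),
    (("mysql-bin.000003", 200), {"id": 2, "age": None}),
    (("mysql-bin.000003", 300), {"id": 1, "age": 21}),
]
p = PipelineModel()
p.process(events[:1])
p.checkpoint()          # checkpoint covers offset 100
p.process(events[1:])   # progress beyond the checkpoint
p.restore()             # crash before the next checkpoint
p.process(events)       # replay: offset 100 is skipped, 200 and 300 are re-read
print(p.sink)               # {1: {'id': 1, 'age': 21}}
print(len(p.dead_letters))  # 2: the NULL-age row was delivered twice
```

The upserted table ends up correct despite the replay, while the append-only dead-letter list receives a duplicate, which is why consumers of such a topic should tolerate repeats.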
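6. Monitoring Metrics
- Flink Web UI (dashboard): track numRecordsIn / numRecordsOut per operator
- ClickHouse: watch the number of active data parts of the target table; a steadily growing count means inserts are too small or too frequent:
  SELECT count() FROM system.parts WHERE table = 'users' AND active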
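numRecordsIn and numRecordsOut are cumulative counters, so a throughput figure is the difference between two samples divided by the time between them. Flink also exposes ready-made per-second meters (numRecordsInPerSecond / numRecordsOutPerSecond); the sketch below just shows the arithmetic for when only raw counters are available, e.g. from a metrics scraper. The Sample class, the rate helper, and the sample values are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Sample:
    timestamp_s: float   # when the counter was read
    count: int           # cumulative value of numRecordsIn or numRecordsOut


def rate(earlier: Sample, later: Sample) -> float:
    """Records per second between two samples of a cumulative counter."""
    elapsed = later.timestamp_s - earlier.timestamp_s
    if elapsed <= 0:
        raise ValueError("samples must be in increasing time order")
    delta = later.count - earlier.count
    if delta < 0:
        raise ValueError("counter went backwards (job restarted?)")
    return delta / elapsed


source_out = rate(Sample(0, 10_000), Sample(10, 60_000))  # 5000.0 records/s
sink_in = rate(Sample(0, 9_000), Sample(10, 57_000))      # 4800.0 records/s
print(f"source out: {source_out:.0f}/s, sink in: {sink_in:.0f}/s")
```

Comparing the source's output rate with the sink's input rate shows how many rows the WHERE filter removes (plus whatever is still in flight).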
7. End-to-End Deployment Flow
graph LR
A[MySQL] -->|CDC| B(Flink SQL)
B -->|real-time processing| C[ClickHouse]
C --> D{Grafana monitoring}
8. Typical Use Cases
- Real-time user profile updates
- E-commerce order status monitoring
- Log auditing and analysis systems
Note: production deployments need a high-availability (HA) setup. Recommended:
- Flink on YARN/K8s
- ClickHouse multi-replica cluster
- MySQL primary-replica replication