Flink 1.20 + CDC 3.5 实战:MongoDB 实时同步到 ClickHouse,从踩坑到上线
摘要: 本文介绍基于 Flink CDC 3.5 和 Flink 1.20 实现 MongoDB 到 ClickHouse 的实时数据同步方案。传统定时脚本和消息队列中转方式存在延迟高、业务侵入性强等问题,而 Flink CDC 通过监听 MongoDB 的 Change Stream 实现增量捕获、断点续传和 Exactly-Once 语义。文章详细演示了环境搭建(包括 MongoDB 副本集配
你还在写定时脚本做 T+1 同步?都 2026 年了,该让数据自己"流"过去了。本文基于 Flink 1.20 + Flink CDC 3.5,手把手教你实现 MongoDB → ClickHouse 的实时同步,附完整可运行代码和踩坑指南。
前言
如果你正在做数据仓库,八成遇到过这个灵魂拷问:
“这个 MongoDB 的数据,能不能实时同步到 ClickHouse?”
传统方案无非两种:
- 定时脚本轮询:写个 Python/Shell 定时跑,简单粗暴,但延迟高、漏数据是家常便饭
- 消息队列中转:业务代码双写 Kafka,改动大,还得求着业务方配合(大概率被拒绝)
而 Flink CDC 的思路完全不同——直接监听 MongoDB 的 Change Stream,增量捕获、实时推送、断点续传,一套代码搞定:
- 基于 Change Stream 增量捕获,对源库几乎零压力
- 支持全量快照 + 增量无缝衔接,不丢数据
- Checkpoint 机制保证 Exactly-Once 语义
本文基于 Flink 1.20 + Flink CDC 3.5,完整实现 MongoDB 到 ClickHouse 的实时同步。代码拿走即用,坑我已经帮你踩过了。
前置依赖:
- Flink 环境:Flink 1.20 基于 Docker 单机部署实战指南
- ClickHouse 环境:ClickHouse 25.4 基于 Docker 单机部署实战指南
一、环境准备
1.1 MongoDB 配置
核心前提:MongoDB 必须开启副本集(最低版本要求 3.6+)。没有副本集就没有 opLog,没有 opLog 就没有 Change Stream,没有 Change Stream 就没有实时同步——这是一条不可逾越的因果链。
有现成的 MongoDB 副本集?直接跳到 1.2 节。没有?往下看,我们用 Docker 快速搭一个。
1)创建目录和配置文件
mkdir -p /data/mongodb/data /data/mongodb/conf
编辑 /data/mongodb/conf/mongod.conf:
# 副本集配置(划重点:这是 CDC 的命根子)
replication:
replSetName: rs0
storage:
engine: wiredTiger
wiredTiger:
engineConfig:
cacheSizeGB: 4
dbPath: /data/db
journal:
enabled: true
systemLog:
destination: file
logAppend: true
path: /data/db/mongod.log
net:
compression:
compressors: zlib
bindIp: 0.0.0.0
port: 27017
processManagement:
fork: false
security:
authorization: enabled
keyFile: /data/db/mongo-keyfile
operationProfiling:
slowOpThresholdMs: 100
mode: slowOp
2)生成副本集认证密钥
# 副本集开启认证后需要 keyFile,生成一个即可
openssl rand -base64 756 > /data/mongodb/data/mongo-keyfile
chmod 400 /data/mongodb/data/mongo-keyfile
sudo chown 999:999 /data/mongodb/data/mongo-keyfile
3)启动 MongoDB
docker run -d \
--restart=always \
-e TIME_ZONE='Asia/Shanghai' \
--name=mongodb \
-v /data/mongodb/data:/data/db \
-v /data/mongodb/conf/mongod.conf:/etc/mongod.conf \
-p 27017:27017 \
mongo:4.4 \
mongod --config /etc/mongod.conf
4)创建用户并初始化副本集
# 创建管理员用户
docker exec -it mongodb mongo --eval '
db.getSiblingDB("admin").createUser({
user: "mongoadmin",
pwd: "YourSecurePassword",
roles: [{ role: "root", db: "admin" }]
})
'
# 初始化副本集
docker exec -it mongodb mongo -u mongoadmin -p YourSecurePassword \
--authenticationDatabase admin --eval "rs.initiate()"
5)配置外部访问地址
这一步很关键!Docker 容器内的 hostname 默认是容器 ID(类似
083d051b2011),外部客户端解析不了,必须重新配置。踩过这个坑的人都懂那种"明明连上了又连不上"的绝望。
docker exec -it mongodb mongo -u mongoadmin -p YourSecurePassword \
--authenticationDatabase admin --eval '
var cfg = rs.conf();
cfg.members[0].host = "mongo.example.com:27017";
rs.reconfig(cfg);
'
1.2 MongoDB 源集合
准备一个测试集合(以访问日志为例):
db.createCollection("access_logs", {
validator: {
$jsonSchema: {
bsonType: "object",
required: ["_id", "c_d", "channel", "device_id", "url", "version"],
properties: {
_id: { bsonType: "objectId" },
user_id: { bsonType: "string" },
c_d: { bsonType: "date" },
channel: { bsonType: "string" },
device_id: { bsonType: "string" },
ip: { bsonType: "string" },
remark: { bsonType: "string" },
trace_id: { bsonType: "string" },
url: { bsonType: "string" },
version: { bsonType: "string" }
}
}
}
});
// 创建索引
db.access_logs.createIndex({ user_id: 1 }, { name: "idx_user_id" });
db.access_logs.createIndex({ c_d: 1 }, { name: "idx_c_d", expireAfterSeconds: 1209600 });
插入一条测试数据:
db.access_logs.insertOne({
user_id: "U100001",
device_id: "DEVICE_TEST_001",
trace_id: "trace-001",
channel: "test_channel",
version: "1.0.0",
ip: "127.0.0.1",
url: "/api/test",
remark: "测试数据",
c_d: new Date()
});
1.3 ClickHouse 目标表
在 ClickHouse 创建 ODS 层目标表,使用 ReplacingMergeTree 引擎实现幂等写入:
CREATE TABLE IF NOT EXISTS dw.ods_mongo_access_logs
(
`_id` String COMMENT 'MongoDB 文档 ID',
`user_id` Nullable(String) COMMENT '用户 ID',
`device_id` String COMMENT '设备 ID',
`trace_id` Nullable(String) COMMENT '请求 Trace ID',
`channel` String COMMENT '渠道',
`version` String COMMENT '版本',
`ip` Nullable(String) COMMENT '请求 IP',
`url` String COMMENT '请求 URL',
`remark` String COMMENT '备注',
`create_date` Nullable(DateTime('Asia/Shanghai')) COMMENT '创建时间',
`sync_time` DateTime DEFAULT now() COMMENT '数据同步时间'
)
ENGINE = ReplacingMergeTree(sync_time)
ORDER BY (device_id, create_date, _id)
PARTITION BY toYYYYMMDD(create_date)
SETTINGS allow_nullable_key = 1
COMMENT 'ODS层 - 访问日志(MongoDB 同步)';
为什么用 ReplacingMergeTree? CDC 同步会在全量快照和增量阶段对同一条数据多次 INSERT。
ReplacingMergeTree按ORDER BY键去重,保留sync_time最新的那条,天然适配 CDC 场景。不用它的话,你的数据就会像复读机一样越来越多。
二、Flink 应用开发
2.1 Maven 依赖
创建 Maven 项目,核心依赖如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>flink-mongo-to-ck</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.20.1</flink.version>
</properties>
<dependencies>
<!-- Flink 核心 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink CDC:MongoDB 数据捕获 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>3.5.0</version>
</dependency>
<!-- ClickHouse JDBC 驱动 -->
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.8.5</version>
<classifier>all</classifier>
</dependency>
<!-- Flink JDBC Connector:写入 ClickHouse -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.3.0-1.20</version>
</dependency>
<!-- JSON 解析(CDC 传递依赖已包含,显式声明防止版本冲突) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
<!-- Shade 插件:打 Fat Jar(千万别用 assembly,SPI 会丢) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.example.MongoToClickHouseSync</mainClass>
</transformer>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<!-- 本地开发:包含 Flink 运行时 -->
<profile>
<id>dev</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 日志 -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.2</version>
</dependency>
</dependencies>
</profile>
<!-- 生产打包:Flink 运行时由集群提供 -->
<profile>
<id>prod</id>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
依赖说明:
| 依赖 | 版本 | 作用 |
|---|---|---|
flink-connector-mongodb-cdc |
3.5.0 | 监听 MongoDB Change Stream |
clickhouse-jdbc |
0.8.5 | ClickHouse JDBC 驱动 |
flink-connector-jdbc |
3.3.0-1.20 | Flink JDBC Sink |
jackson-databind |
2.15.3 | JSON 解析(解析 Change Stream 事件) |
Profile 说明:
dev用于本地 IDE 调试(包含 Flink 运行时),prod用于提交到集群(运行时由集群提供,设为provided减小包体积)。一个 jar 包从 200MB 瘦身到 50MB,部署时的幸福感你懂的。
2.2 同步代码
整体数据流如下:
核心思路:MongoDB CDC Source → JSON 解析(FlatMap)→ JDBC Sink 批量写入 ClickHouse。
package com.example;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
/**
* MongoDB 单集合实时同步到 ClickHouse
* <p>
* 提交命令:
* flink run -c com.example.MongoToClickHouseSync flink-mongo-to-ck.jar
*/
public class MongoToClickHouseSync {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // CDC 源建议并行度为 1
// Checkpoint 由集群统一配置(如 execution.checkpointing.interval: 30000),代码中不重复设置
// ============================
// 1. MongoDB CDC Source
// ============================
MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
.hosts("mongo.example.com:27017")
.username("flink_user")
.password("YourSecurePassword")
.connectionOptions("authSource=admin")
.databaseList("app_data")
.collectionList("app_data.access_logs")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
// ============================
// 2. 定义 Row 类型(避免 Kryo 序列化在 JDK17 下翻车)
// ============================
RowTypeInfo rowType = new RowTypeInfo(
Types.STRING, Types.STRING, Types.STRING, Types.STRING, // _id, user_id, device_id, trace_id
Types.STRING, Types.STRING, Types.STRING, Types.STRING, // channel, version, ip, url
Types.STRING, // remark
Types.SQL_TIMESTAMP // create_date
);
// ============================
// 3. Source → FlatMap 解析 → JDBC Sink
// ============================
env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDB CDC Source")
.flatMap(new MongoDocParser()).returns(rowType)
.addSink(org.apache.flink.connector.jdbc.JdbcSink.sink(
"INSERT INTO ods_mongo_access_logs " +
"(_id, user_id, device_id, trace_id, channel, version, ip, url, remark, " +
"create_date) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(JdbcStatementBuilder<Row>) (ps, row) -> {
ps.setString(1, (String) row.getField(0)); // _id
setNullableString(ps, 2, row.getField(1)); // user_id
ps.setString(3, (String) row.getField(2)); // device_id
setNullableString(ps, 4, row.getField(3)); // trace_id
ps.setString(5, (String) row.getField(4)); // channel
ps.setString(6, (String) row.getField(5)); // version
setNullableString(ps, 7, row.getField(6)); // ip
ps.setString(8, (String) row.getField(7)); // url
ps.setString(9, (String) row.getField(8)); // remark
setNullableTimestamp(ps, 10, row.getField(9)); // create_date
},
JdbcExecutionOptions.builder()
.withBatchSize(2000) // 每批 2000 条
.withBatchIntervalMs(5000) // 最长 5 秒刷一次
.withMaxRetries(3) // 失败重试 3 次
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:clickhouse://ck.example.com:8123/dw")
.withDriverName("com.clickhouse.jdbc.ClickHouseDriver")
.withUsername("ck_user")
.withPassword("CkPassword")
.build()
)).name("ClickHouse JDBC Sink - ods_mongo_access_logs");
env.execute("MongoDB to ClickHouse Sync - access_logs");
}
// ==================== 工具方法 ====================
private static void setNullableString(java.sql.PreparedStatement ps, int index, Object value)
throws java.sql.SQLException {
if (value != null) {
ps.setString(index, (String) value);
} else {
ps.setNull(index, java.sql.Types.VARCHAR);
}
}
private static void setNullableTimestamp(java.sql.PreparedStatement ps, int index, Object value)
throws java.sql.SQLException {
if (value != null) {
ps.setTimestamp(index, (Timestamp) value);
} else {
ps.setNull(index, java.sql.Types.TIMESTAMP);
}
}
// ==================== MongoDB 文档解析器 ====================
/**
* 解析 MongoDB Change Stream JSON,提取字段输出 Row
* <p>
* Row 字段顺序:_id, user_id, device_id, trace_id, channel, version,
* ip, url, remark, create_date
*/
public static class MongoDocParser implements FlatMapFunction<String, Row> {
private static final ZoneId ZONE_ID = ZoneId.of("Asia/Shanghai");
private transient ObjectMapper objectMapper;
@Override
public void flatMap(String value, Collector<Row> out) throws Exception {
if (objectMapper == null) {
objectMapper = new ObjectMapper();
}
JsonNode root = objectMapper.readTree(value);
String operationType = root.path("operationType").asText();
// 只处理 insert / update / replace,delete 事件直接跳过
if (!"insert".equals(operationType)
&& !"update".equals(operationType)
&& !"replace".equals(operationType)) {
return;
}
JsonNode fullDocNode = root.path("fullDocument");
if (fullDocNode.isMissingNode()) {
return;
}
// fullDocument 可能是嵌套 JSON 字符串,也可能是直接的对象
String docStr = fullDocNode.isTextual() ? fullDocNode.asText() : fullDocNode.toString();
JsonNode doc = objectMapper.readTree(docStr);
Row row = Row.of(
getOid(doc, "_id"), // 0: _id
getString(doc, "user_id"), // 1: user_id
getStringOrDefault(doc, "device_id", ""), // 2: device_id
getString(doc, "trace_id"), // 3: trace_id
getStringOrDefault(doc, "channel", ""), // 4: channel
getStringOrDefault(doc, "version", ""), // 5: version
getString(doc, "ip"), // 6: ip
getStringOrDefault(doc, "url", ""), // 7: url
getStringOrDefault(doc, "remark", ""), // 8: remark
getDate(doc, "c_d") // 9: create_date
);
out.collect(row);
}
/** 提取 ObjectId:{"$oid": "xxx"} → "xxx" */
private String getOid(JsonNode doc, String field) {
JsonNode node = doc.path(field);
if (node.isObject() && node.has("$oid")) {
return node.get("$oid").asText();
}
return node.asText("");
}
/** 提取字符串,缺失返回 null */
private String getString(JsonNode doc, String field) {
JsonNode node = doc.path(field);
return node.isMissingNode() || node.isNull() ? null : node.asText();
}
/** 提取字符串,缺失返回默认值 */
private String getStringOrDefault(JsonNode doc, String field, String defaultValue) {
JsonNode node = doc.path(field);
return node.isMissingNode() || node.isNull() ? defaultValue : node.asText();
}
/**
* 提取 MongoDB Date 类型
* 支持三种格式:
* {"$date": 1234567890} — 毫秒时间戳
* {"$date": {"$numberLong": "xxx"}} — Extended JSON
* {"$date": "2026-01-01T00:00:00Z"} — ISO 字符串
*/
private Timestamp getDate(JsonNode doc, String field) {
JsonNode node = doc.path(field);
if (node.isMissingNode() || node.isNull()) {
return null;
}
if (node.isObject() && node.has("$date")) {
JsonNode dateNode = node.get("$date");
long millis;
if (dateNode.isNumber()) {
millis = dateNode.asLong();
} else if (dateNode.isObject() && dateNode.has("$numberLong")) {
millis = Long.parseLong(dateNode.get("$numberLong").asText());
} else {
millis = Instant.parse(dateNode.asText()).toEpochMilli();
}
ZonedDateTime zdt = Instant.ofEpochMilli(millis).atZone(ZONE_ID);
return Timestamp.valueOf(zdt.toLocalDateTime());
}
return null;
}
}
}
核心参数说明:
| 参数 | 值 | 说明 |
|---|---|---|
hosts |
MongoDB 地址 | 副本集地址,支持多节点逗号分隔 |
connectionOptions |
authSource=admin |
认证库,通常为 admin |
deserializer |
JsonDebeziumDeserializationSchema |
将 Change Stream 事件序列化为 JSON |
withBatchSize |
2000 | 批量写入大小,提升 CK 写入性能 |
withBatchIntervalMs |
5000 | 批次间隔,平衡延迟与吞吐 |
关于 Nullable 字段处理:MongoDB 是 Schema-Free 的,字段可能随时缺失。代码中通过
setNullableString/setNullableTimestamp工具方法统一处理,避免 NPE 把任务搞崩——毕竟半夜被告警叫醒修 NPE,是每个程序员的噩梦。
三、打包部署
3.1 打包
# 生产环境打包(排除 Flink 运行时,瘦身效果显著)
mvn clean package -P prod -DskipTests
# 生成的 jar 包在 target 目录下
ls target/*.jar
3.2 提交任务
通过 Flink Web UI 提交:
- 打开
http://<Flink-IP>:8081 - 点击 Submit New Job
- 上传打好的 jar 包
- 填写 Entry Class:
com.example.MongoToClickHouseSync(pom 中配置了 mainClass 可省略) - 点击 Submit

3.3 验证运行
进入 Jobs → Running Jobs,看到任务状态为 RUNNING 就说明起来了:

在 MongoDB 插入或更新一条数据,等几秒后在 ClickHouse 查询验证:
SELECT * FROM dw.ods_mongo_access_logs ORDER BY sync_time DESC LIMIT 10;
看到数据出现了?恭喜,你的实时管道已经通了 🎉
四、踩坑指南
这些坑我都亲自踩过,分享出来希望你能少加几次班。
4.1 The $changeStream stage is only supported on replica sets
报错日志:
com.mongodb.MongoCommandException: Command failed with error 40573 (Location40573):
'The $changeStream stage is only supported on replica sets'
on server mongo.example.com:27017.
原因:MongoDB 没有开启副本集。Flink CDC 的 MongoDB Connector 底层依赖 Change Stream,而 Change Stream 只在副本集或分片集群上可用。
解决:回到 1.1 节,开启副本集并初始化。
4.2 MongoDB 启动失败
如果 Docker 容器启动后立刻退出,大概率是 keyFile 路径不对或权限不正确。
排查方式:
docker logs mongodb
确认 keyFile 权限为 400,且 owner 为 999:999(MongoDB 容器内的用户 ID)。
4.3 开启副本集后,客户端连接不上
报错信息:
getaddrinfo ENOTFOUND 083d051b2011
原因:副本集的 host 默认是容器内部 hostname(一串随机的容器 ID),外部客户端无法解析。
解决:重新配置副本集的外部访问地址(见 1.1 节第 5 步)。
五、总结
本文完整实现了 MongoDB → ClickHouse 的实时同步链路,核心技术栈:
| 组件 | 版本 | 作用 |
|---|---|---|
| Flink | 1.20.1 | 流处理引擎 |
| Flink CDC | 3.5.0 | MongoDB Change Stream 捕获 |
| ClickHouse | 25.4 | OLAP 分析存储 |
| ReplacingMergeTree | - | 自动去重引擎 |
核心要点回顾:
- ✅ MongoDB 必须开启副本集,否则 Change Stream 不可用
- ✅ 使用 JDBC Sink 批量写入(BatchSize=2000),大幅提升 CK 写入性能
- ✅
ReplacingMergeTree配合sync_time字段实现幂等更新,不怕重复数据 - ✅ Checkpoint 保证 Exactly-Once 语义,任务重启也不丢数据
这套方案已经过生产验证,日同步千万级数据稳定运行。如果你也有 MongoDB 到 ClickHouse 的同步需求,直接 fork 代码改改配置就能用。
系列文章:如果你的数据源是 MySQL,可以看姊妹篇 👉 Flink 1.20 实战:MySQL 实时同步到 ClickHouse
如果这篇文章帮你少踩了一个坑,欢迎点赞 👍 收藏 ⭐,你的支持是我继续踩坑写文章的动力。有问题欢迎评论区交流,看到必回。
更多推荐
所有评论(0)