你还在写定时脚本做 T+1 同步?都 2026 年了,该让数据自己"流"过去了。本文基于 Flink 1.20 + Flink CDC 3.5,手把手教你实现 MongoDB → ClickHouse 的实时同步,附完整可运行代码和踩坑指南。

前言

如果你正在做数据仓库,八成遇到过这个灵魂拷问:

“这个 MongoDB 的数据,能不能实时同步到 ClickHouse?”

传统方案无非两种:

  • 定时脚本轮询:写个 Python/Shell 定时跑,简单粗暴,但延迟高、漏数据是家常便饭
  • 消息队列中转:业务代码双写 Kafka,改动大,还得求着业务方配合(大概率被拒绝)

Flink CDC 的思路完全不同——直接监听 MongoDB 的 Change Stream,增量捕获、实时推送、断点续传,一套代码搞定:

  • 基于 Change Stream 增量捕获,对源库几乎零压力
  • 支持全量快照 + 增量无缝衔接,不丢数据
  • Checkpoint 机制保证 Exactly-Once 语义

本文基于 Flink 1.20 + Flink CDC 3.5,完整实现 MongoDB 到 ClickHouse 的实时同步。代码拿走即用,坑我已经帮你踩过了。

前置依赖


一、环境准备

1.1 MongoDB 配置

核心前提:MongoDB 必须开启副本集(最低版本要求 3.6+)。没有副本集就没有 opLog,没有 opLog 就没有 Change Stream,没有 Change Stream 就没有实时同步——这是一条不可逾越的因果链。

有现成的 MongoDB 副本集?直接跳到 1.2 节。没有?往下看,我们用 Docker 快速搭一个。

1)创建目录和配置文件

mkdir -p /data/mongodb/data /data/mongodb/conf

编辑 /data/mongodb/conf/mongod.conf

# 副本集配置(划重点:这是 CDC 的命根子)
replication:
  replSetName: rs0

storage:
  engine: wiredTiger
  wiredTiger:
    engineConfig:
      cacheSizeGB: 4
  dbPath: /data/db
  journal:
    enabled: true

systemLog:
  destination: file
  logAppend: true
  path: /data/db/mongod.log

net:
  compression:
    compressors: zlib
  bindIp: 0.0.0.0
  port: 27017

processManagement:
  fork: false

security:
  authorization: enabled
  keyFile: /data/db/mongo-keyfile

operationProfiling:
  slowOpThresholdMs: 100
  mode: slowOp

2)生成副本集认证密钥

# 副本集开启认证后需要 keyFile,生成一个即可
openssl rand -base64 756 > /data/mongodb/data/mongo-keyfile
chmod 400 /data/mongodb/data/mongo-keyfile
sudo chown 999:999 /data/mongodb/data/mongo-keyfile

3)启动 MongoDB

docker run -d \
  --restart=always \
  -e TIME_ZONE='Asia/Shanghai' \
  --name=mongodb \
  -v /data/mongodb/data:/data/db \
  -v /data/mongodb/conf/mongod.conf:/etc/mongod.conf \
  -p 27017:27017 \
  mongo:4.4 \
  mongod --config /etc/mongod.conf

4)创建用户并初始化副本集

# 创建管理员用户
docker exec -it mongodb mongo --eval '
db.getSiblingDB("admin").createUser({
 user: "mongoadmin",
 pwd: "YourSecurePassword",
 roles: [{ role: "root", db: "admin" }]
})
'

# 初始化副本集
docker exec -it mongodb mongo -u mongoadmin -p YourSecurePassword \
  --authenticationDatabase admin --eval "rs.initiate()"

5)配置外部访问地址

这一步很关键!Docker 容器内的 hostname 默认是容器 ID(类似 083d051b2011),外部客户端解析不了,必须重新配置。踩过这个坑的人都懂那种"明明连上了又连不上"的绝望。

docker exec -it mongodb mongo -u mongoadmin -p YourSecurePassword \
  --authenticationDatabase admin --eval '
var cfg = rs.conf();
cfg.members[0].host = "mongo.example.com:27017";
rs.reconfig(cfg);
'

1.2 MongoDB 源集合

准备一个测试集合(以访问日志为例):

db.createCollection("access_logs", {
 validator: {
   $jsonSchema: {
     bsonType: "object",
     required: ["_id", "c_d", "channel", "device_id", "url", "version"],
     properties: {
       _id: { bsonType: "objectId" },
       user_id: { bsonType: "string" },
       c_d: { bsonType: "date" },
       channel: { bsonType: "string" },
       device_id: { bsonType: "string" },
       ip: { bsonType: "string" },
       remark: { bsonType: "string" },
       trace_id: { bsonType: "string" },
       url: { bsonType: "string" },
       version: { bsonType: "string" }
     }
   }
 }
});

// 创建索引
db.access_logs.createIndex({ user_id: 1 }, { name: "idx_user_id" });
db.access_logs.createIndex({ c_d: 1 }, { name: "idx_c_d", expireAfterSeconds: 1209600 });

插入一条测试数据:

db.access_logs.insertOne({
 user_id: "U100001",
 device_id: "DEVICE_TEST_001",
 trace_id: "trace-001",
 channel: "test_channel",
 version: "1.0.0",
 ip: "127.0.0.1",
 url: "/api/test",
 remark: "测试数据",
 c_d: new Date()
});

1.3 ClickHouse 目标表

在 ClickHouse 创建 ODS 层目标表,使用 ReplacingMergeTree 引擎实现幂等写入:

CREATE TABLE IF NOT EXISTS dw.ods_mongo_access_logs
(
    `_id` String COMMENT 'MongoDB 文档 ID',
    `user_id` Nullable(String) COMMENT '用户 ID',
    `device_id` String COMMENT '设备 ID',
    `trace_id` Nullable(String) COMMENT '请求 Trace ID',
    `channel` String COMMENT '渠道',
    `version` String COMMENT '版本',
    `ip` Nullable(String) COMMENT '请求 IP',
    `url` String COMMENT '请求 URL',
    `remark` String COMMENT '备注',
    `create_date` Nullable(DateTime('Asia/Shanghai')) COMMENT '创建时间',

    `sync_time` DateTime DEFAULT now() COMMENT '数据同步时间'
)
ENGINE = ReplacingMergeTree(sync_time)
ORDER BY (device_id, create_date, _id)
PARTITION BY toYYYYMMDD(create_date)
SETTINGS allow_nullable_key = 1
COMMENT 'ODS层 - 访问日志(MongoDB 同步)';

为什么用 ReplacingMergeTree? CDC 同步会在全量快照和增量阶段对同一条数据多次 INSERT。ReplacingMergeTreeORDER BY 键去重,保留 sync_time 最新的那条,天然适配 CDC 场景。不用它的话,你的数据就会像复读机一样越来越多。


二、Flink 应用开发

2.1 Maven 依赖

创建 Maven 项目,核心依赖如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>flink-mongo-to-ck</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.20.1</flink.version>
    </properties>

    <dependencies>
        <!-- Flink 核心 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink CDC:MongoDB 数据捕获 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-mongodb-cdc</artifactId>
            <version>3.5.0</version>
        </dependency>

        <!-- ClickHouse JDBC 驱动 -->
        <dependency>
            <groupId>com.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.8.5</version>
            <classifier>all</classifier>
        </dependency>

        <!-- Flink JDBC Connector:写入 ClickHouse -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.3.0-1.20</version>
        </dependency>

        <!-- JSON 解析(CDC 传递依赖已包含,显式声明防止版本冲突) -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.15.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>17</source>
                    <target>17</target>
                </configuration>
            </plugin>
            <!-- Shade 插件:打 Fat Jar(千万别用 assembly,SPI 会丢) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.MongoToClickHouseSync</mainClass>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <profiles>
        <!-- 本地开发:包含 Flink 运行时 -->
        <profile>
            <id>dev</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java</artifactId>
                    <version>${flink.version}</version>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-clients</artifactId>
                    <version>${flink.version}</version>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-table-planner_2.12</artifactId>
                    <version>${flink.version}</version>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-table-runtime</artifactId>
                    <version>${flink.version}</version>
                </dependency>
                <!-- 日志 -->
                <dependency>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                    <version>2.17.2</version>
                </dependency>
                <dependency>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-core</artifactId>
                    <version>2.17.2</version>
                </dependency>
            </dependencies>
        </profile>

        <!-- 生产打包:Flink 运行时由集群提供 -->
        <profile>
            <id>prod</id>
            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-clients</artifactId>
                    <version>${flink.version}</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-table-planner_2.12</artifactId>
                    <version>${flink.version}</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-table-runtime</artifactId>
                    <version>${flink.version}</version>
                    <scope>provided</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>
</project>

依赖说明

依赖 版本 作用
flink-connector-mongodb-cdc 3.5.0 监听 MongoDB Change Stream
clickhouse-jdbc 0.8.5 ClickHouse JDBC 驱动
flink-connector-jdbc 3.3.0-1.20 Flink JDBC Sink
jackson-databind 2.15.3 JSON 解析(解析 Change Stream 事件)

Profile 说明dev 用于本地 IDE 调试(包含 Flink 运行时),prod 用于提交到集群(运行时由集群提供,设为 provided 减小包体积)。一个 jar 包从 200MB 瘦身到 50MB,部署时的幸福感你懂的。

2.2 同步代码

整体数据流如下:

Change Stream
全量 + 增量

JDBC Batch
2000条/批

MongoDB
源集合

Flink CDC
FlatMap 解析

ClickHouse
ODS 层

核心思路:MongoDB CDC Source → JSON 解析(FlatMap)→ JDBC Sink 批量写入 ClickHouse。

package com.example;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

/**
 * MongoDB 单集合实时同步到 ClickHouse
 * <p>
 * 提交命令:
 * flink run -c com.example.MongoToClickHouseSync flink-mongo-to-ck.jar
 */
public class MongoToClickHouseSync {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // CDC 源建议并行度为 1
        // Checkpoint 由集群统一配置(如 execution.checkpointing.interval: 30000),代码中不重复设置

        // ============================
        // 1. MongoDB CDC Source
        // ============================
        MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
                .hosts("mongo.example.com:27017")
                .username("flink_user")
                .password("YourSecurePassword")
                .connectionOptions("authSource=admin")
                .databaseList("app_data")
                .collectionList("app_data.access_logs")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        // ============================
        // 2. 定义 Row 类型(避免 Kryo 序列化在 JDK17 下翻车)
        // ============================
        RowTypeInfo rowType = new RowTypeInfo(
                Types.STRING, Types.STRING, Types.STRING, Types.STRING,  // _id, user_id, device_id, trace_id
                Types.STRING, Types.STRING, Types.STRING, Types.STRING,  // channel, version, ip, url
                Types.STRING,                                            // remark
                Types.SQL_TIMESTAMP                                      // create_date
        );

        // ============================
        // 3. Source → FlatMap 解析 → JDBC Sink
        // ============================
        env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDB CDC Source")
                .flatMap(new MongoDocParser()).returns(rowType)
                .addSink(org.apache.flink.connector.jdbc.JdbcSink.sink(
                        "INSERT INTO ods_mongo_access_logs " +
                                "(_id, user_id, device_id, trace_id, channel, version, ip, url, remark, " +
                                "create_date) " +
                                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                        (JdbcStatementBuilder<Row>) (ps, row) -> {
                            ps.setString(1, (String) row.getField(0));   // _id
                            setNullableString(ps, 2, row.getField(1));   // user_id
                            ps.setString(3, (String) row.getField(2));   // device_id
                            setNullableString(ps, 4, row.getField(3));   // trace_id
                            ps.setString(5, (String) row.getField(4));   // channel
                            ps.setString(6, (String) row.getField(5));   // version
                            setNullableString(ps, 7, row.getField(6));   // ip
                            ps.setString(8, (String) row.getField(7));   // url
                            ps.setString(9, (String) row.getField(8));      // remark
                            setNullableTimestamp(ps, 10, row.getField(9)); // create_date
                        },
                        JdbcExecutionOptions.builder()
                                .withBatchSize(2000)        // 每批 2000 条
                                .withBatchIntervalMs(5000)  // 最长 5 秒刷一次
                                .withMaxRetries(3)          // 失败重试 3 次
                                .build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:clickhouse://ck.example.com:8123/dw")
                                .withDriverName("com.clickhouse.jdbc.ClickHouseDriver")
                                .withUsername("ck_user")
                                .withPassword("CkPassword")
                                .build()
                )).name("ClickHouse JDBC Sink - ods_mongo_access_logs");

        env.execute("MongoDB to ClickHouse Sync - access_logs");
    }

    // ==================== 工具方法 ====================

    private static void setNullableString(java.sql.PreparedStatement ps, int index, Object value)
            throws java.sql.SQLException {
        if (value != null) {
            ps.setString(index, (String) value);
        } else {
            ps.setNull(index, java.sql.Types.VARCHAR);
        }
    }

    private static void setNullableTimestamp(java.sql.PreparedStatement ps, int index, Object value)
            throws java.sql.SQLException {
        if (value != null) {
            ps.setTimestamp(index, (Timestamp) value);
        } else {
            ps.setNull(index, java.sql.Types.TIMESTAMP);
        }
    }

    // ==================== MongoDB 文档解析器 ====================

    /**
     * 解析 MongoDB Change Stream JSON,提取字段输出 Row
     * <p>
     * Row 字段顺序:_id, user_id, device_id, trace_id, channel, version,
     * ip, url, remark, create_date
     */
    public static class MongoDocParser implements FlatMapFunction<String, Row> {
        private static final ZoneId ZONE_ID = ZoneId.of("Asia/Shanghai");
        private transient ObjectMapper objectMapper;

        @Override
        public void flatMap(String value, Collector<Row> out) throws Exception {
            if (objectMapper == null) {
                objectMapper = new ObjectMapper();
            }

            JsonNode root = objectMapper.readTree(value);
            String operationType = root.path("operationType").asText();

            // 只处理 insert / update / replace,delete 事件直接跳过
            if (!"insert".equals(operationType)
                    && !"update".equals(operationType)
                    && !"replace".equals(operationType)) {
                return;
            }

            JsonNode fullDocNode = root.path("fullDocument");
            if (fullDocNode.isMissingNode()) {
                return;
            }

            // fullDocument 可能是嵌套 JSON 字符串,也可能是直接的对象
            String docStr = fullDocNode.isTextual() ? fullDocNode.asText() : fullDocNode.toString();
            JsonNode doc = objectMapper.readTree(docStr);

            Row row = Row.of(
                    getOid(doc, "_id"),                            // 0: _id
                    getString(doc, "user_id"),                     // 1: user_id
                    getStringOrDefault(doc, "device_id", ""),     // 2: device_id
                    getString(doc, "trace_id"),                    // 3: trace_id
                    getStringOrDefault(doc, "channel", ""),       // 4: channel
                    getStringOrDefault(doc, "version", ""),       // 5: version
                    getString(doc, "ip"),                          // 6: ip
                    getStringOrDefault(doc, "url", ""),           // 7: url
                    getStringOrDefault(doc, "remark", ""),        // 8: remark
                    getDate(doc, "c_d")                           // 9: create_date
            );
            out.collect(row);
        }

        /** 提取 ObjectId:{"$oid": "xxx"} → "xxx" */
        private String getOid(JsonNode doc, String field) {
            JsonNode node = doc.path(field);
            if (node.isObject() && node.has("$oid")) {
                return node.get("$oid").asText();
            }
            return node.asText("");
        }

        /** 提取字符串,缺失返回 null */
        private String getString(JsonNode doc, String field) {
            JsonNode node = doc.path(field);
            return node.isMissingNode() || node.isNull() ? null : node.asText();
        }

        /** 提取字符串,缺失返回默认值 */
        private String getStringOrDefault(JsonNode doc, String field, String defaultValue) {
            JsonNode node = doc.path(field);
            return node.isMissingNode() || node.isNull() ? defaultValue : node.asText();
        }

        /**
         * 提取 MongoDB Date 类型
         * 支持三种格式:
         *   {"$date": 1234567890}          — 毫秒时间戳
         *   {"$date": {"$numberLong": "xxx"}} — Extended JSON
         *   {"$date": "2026-01-01T00:00:00Z"}  — ISO 字符串
         */
        private Timestamp getDate(JsonNode doc, String field) {
            JsonNode node = doc.path(field);
            if (node.isMissingNode() || node.isNull()) {
                return null;
            }
            if (node.isObject() && node.has("$date")) {
                JsonNode dateNode = node.get("$date");
                long millis;
                if (dateNode.isNumber()) {
                    millis = dateNode.asLong();
                } else if (dateNode.isObject() && dateNode.has("$numberLong")) {
                    millis = Long.parseLong(dateNode.get("$numberLong").asText());
                } else {
                    millis = Instant.parse(dateNode.asText()).toEpochMilli();
                }
                ZonedDateTime zdt = Instant.ofEpochMilli(millis).atZone(ZONE_ID);
                return Timestamp.valueOf(zdt.toLocalDateTime());
            }
            return null;
        }
    }
}

核心参数说明

参数 说明
hosts MongoDB 地址 副本集地址,支持多节点逗号分隔
connectionOptions authSource=admin 认证库,通常为 admin
deserializer JsonDebeziumDeserializationSchema 将 Change Stream 事件序列化为 JSON
withBatchSize 2000 批量写入大小,提升 CK 写入性能
withBatchIntervalMs 5000 批次间隔,平衡延迟与吞吐

关于 Nullable 字段处理:MongoDB 是 Schema-Free 的,字段可能随时缺失。代码中通过 setNullableString / setNullableTimestamp 工具方法统一处理,避免 NPE 把任务搞崩——毕竟半夜被告警叫醒修 NPE,是每个程序员的噩梦。


三、打包部署

3.1 打包

# 生产环境打包(排除 Flink 运行时,瘦身效果显著)
mvn clean package -P prod -DskipTests

# 生成的 jar 包在 target 目录下
ls target/*.jar

3.2 提交任务

通过 Flink Web UI 提交:

  1. 打开 http://<Flink-IP>:8081
  2. 点击 Submit New Job
  3. 上传打好的 jar 包
  4. 填写 Entry Class:com.example.MongoToClickHouseSync(pom 中配置了 mainClass 可省略)
  5. 点击 Submit

在这里插入图片描述

3.3 验证运行

进入 Jobs → Running Jobs,看到任务状态为 RUNNING 就说明起来了:

在这里插入图片描述

在 MongoDB 插入或更新一条数据,等几秒后在 ClickHouse 查询验证:

SELECT * FROM dw.ods_mongo_access_logs ORDER BY sync_time DESC LIMIT 10;

看到数据出现了?恭喜,你的实时管道已经通了 🎉


四、踩坑指南

这些坑我都亲自踩过,分享出来希望你能少加几次班。

4.1 The $changeStream stage is only supported on replica sets

报错日志

com.mongodb.MongoCommandException: Command failed with error 40573 (Location40573):
'The $changeStream stage is only supported on replica sets'
on server mongo.example.com:27017.

原因:MongoDB 没有开启副本集。Flink CDC 的 MongoDB Connector 底层依赖 Change Stream,而 Change Stream 只在副本集或分片集群上可用。

解决:回到 1.1 节,开启副本集并初始化。

4.2 MongoDB 启动失败

如果 Docker 容器启动后立刻退出,大概率是 keyFile 路径不对或权限不正确

排查方式:

docker logs mongodb

确认 keyFile 权限为 400,且 owner 为 999:999(MongoDB 容器内的用户 ID)。

4.3 开启副本集后,客户端连接不上

报错信息

getaddrinfo ENOTFOUND 083d051b2011

原因:副本集的 host 默认是容器内部 hostname(一串随机的容器 ID),外部客户端无法解析。

解决:重新配置副本集的外部访问地址(见 1.1 节第 5 步)。


五、总结

本文完整实现了 MongoDB → ClickHouse 的实时同步链路,核心技术栈:

组件 版本 作用
Flink 1.20.1 流处理引擎
Flink CDC 3.5.0 MongoDB Change Stream 捕获
ClickHouse 25.4 OLAP 分析存储
ReplacingMergeTree - 自动去重引擎

核心要点回顾

  1. ✅ MongoDB 必须开启副本集,否则 Change Stream 不可用
  2. ✅ 使用 JDBC Sink 批量写入(BatchSize=2000),大幅提升 CK 写入性能
  3. ReplacingMergeTree 配合 sync_time 字段实现幂等更新,不怕重复数据
  4. ✅ Checkpoint 保证 Exactly-Once 语义,任务重启也不丢数据

这套方案已经过生产验证,日同步千万级数据稳定运行。如果你也有 MongoDB 到 ClickHouse 的同步需求,直接 fork 代码改改配置就能用。


系列文章:如果你的数据源是 MySQL,可以看姊妹篇 👉 Flink 1.20 实战:MySQL 实时同步到 ClickHouse


如果这篇文章帮你少踩了一个坑,欢迎点赞 👍 收藏 ⭐,你的支持是我继续踩坑写文章的动力。有问题欢迎评论区交流,看到必回。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐