简单说flink&kafka

Kafka 是高吞吐、高可靠的消息队列,负责承接上游所有动态数据(用户行为、业务日志、设备采集、数据库变更);
Flink 是流批一体的计算引擎,负责对 Kafka 里的 “流动数据” 做实时处理。
image.png
下面是企业真实开发中最常用的 3 类代码模板,覆盖 “消费→处理→输出” 全链路:
前置依赖(需补充 Kafka 连接器)

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.17.0</version>
</dependency>
  1. 基础:消费 Kafka 数据做实时统计(你理解的 “统计功能”)
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public class FlinkKafkaCountDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. 配置Kafka连接参数
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "flink-kafka-group");

        // 2. 从Kafka消费数据(topic:user_behavior)
        DataStream<String> kafkaStream = env.addSource(
                new FlinkKafkaConsumer<>("user_behavior", new SimpleStringSchema(), kafkaProps)
        );

        // 3. 实时统计:按用户行为类型(点击/加购/下单)计数
        kafkaStream
                .map(line -> line.split(",")[1]) // 假设数据格式:userID,behavior,time
                .keyBy(behavior -> behavior)
                .countWindow(60) // 60秒窗口
                .sum(0)
                .print("实时行为统计:");

        env.execute("Flink-Kafka Count Job");
    }
}
  1. 进阶:消费 Kafka 做实时 ETL(清洗后写入另一 Topic)

// 承接上面的kafkaStream
DataStream<String> cleanStream = kafkaStream
        // 过滤脏数据(非JSON/空值)
        .filter(line -> line != null && line.startsWith("{"))
        // 转换格式(JSON→CSV)
        .map(line -> {
            JSONObject json = JSON.parseObject(line);
            return json.getString("userID") + "," + json.getString("orderID") + "," + json.getString("amount");
        });

// 将清洗后的数据写入Kafka的clean_order_topic
cleanStream.addSink(
        new FlinkKafkaProducer<>(
                "localhost:9092",
                "clean_order_topic",
                new SimpleStringSchema()
        )
);
  1. 高阶:消费 Kafka 做 CEP 复杂事件检测

// 承接kafkaStream,解析为用户行为实体
DataStream<UserBehavior> behaviorStream = kafkaStream
        .map(line -> {
            String[] fields = line.split(",");
            return new UserBehavior(fields[0], fields[1], Long.parseLong(fields[2]));
        });

// 定义CEP模式:10分钟内连续3次失败登录
Pattern<UserBehavior, ?> failLoginPattern = Pattern
        .<UserBehavior>begin("first_fail")
        .where(behavior -> "fail_login".equals(behavior.getBehavior()))
        .next("second_fail")
        .where(behavior -> "fail_login".equals(behavior.getBehavior()))
        .next("third_fail")
        .where(behavior -> "fail_login".equals(behavior.getBehavior()))
        .within(Time.minutes(10));

// 检测匹配的事件,输出风控告警
PatternStream<UserBehavior> patternStream = CEP.pattern(behaviorStream.keyBy(UserBehavior::getUserID), failLoginPattern);
patternStream.select(pattern -> "用户" + pattern.get("first_fail").get(0).getUserID() + "10分钟内3次登录失败")
           .print("风控告警:");

总结

核心结论:Flink 接 Kafka 不只是 “统计”,而是覆盖实时统计、ETL、风控、特征工程、CEP 等全场景,“统计” 只是最基础的入门场景;
架构逻辑:Kafka 负责 “数据传输 / 暂存”,Flink 负责 “数据计算 / 处理”,是企业实时大数据的标准组合;
落地关键:无论什么场景,核心流程都是「Kafka 消费数据 → Flink 计算 / 处理 → 输出到存储 / 下游 Kafka / 大屏」。
如果需要某类场景(如金融风控 / 电商实时统计)的完整可运行代码,可以直接说,我会按企业真实开发规范提供。

#“为什么非要用 Flink?自己写接口接数据、做清洗、插数据库不行吗?”
核心差异其实体现在「数据量 / 并发 / 实时性 / 可靠性」这几个维度,尤其是数据量变大后,手写接口的方案会快速失效。

#手写方案的致命问题
并发瓶颈:单消费者线程处理每秒 1 万条会卡死,多消费者需自己做分区分配,极易出现重复消费;
可靠性差:如果插入数据库时程序崩溃,已经处理的日志可能丢了,未处理的可能重复;
扩展难:要加 “5 分钟内去重” 逻辑,需自己写缓存(Redis)+ 定时清理,代码量翻倍;
监控缺失:不知道每秒处理了多少条、失败了多少条,出问题只能查日志。

// Spring Boot + Kafka Consumer
@Service
public class LogConsumerService {
    @Autowired
    private ClickHouseMapper clickHouseMapper;
    
    // 手写Kafka消费者
    @KafkaListener(topics = "user_log", groupId = "log-group")
    public void consume(List<String> logs, Acknowledgment ack) {
        try {
            // 1. 数据清洗(简单过滤)
            List<String> cleanLogs = logs.stream()
                    .filter(log -> log != null && !log.contains("null"))
                    .collect(Collectors.toList());
            
            // 2. 批量插入ClickHouse(自己写批量逻辑)
            clickHouseMapper.batchInsert(cleanLogs);
            
            // 3. 手动提交偏移量(避免丢数据)
            ack.acknowledge();
        } catch (Exception e) {
            // 4. 异常处理(自己写重试,极易重复入库)
            retryInsert(logs);
        }
    }
}

#Flink 方案的优势
并发:只需设置env.setParallelism(10),就能用 10 个并行度处理,轻松扛每秒 1 万条;
可靠:Checkpoint 自动记录状态,程序崩溃重启后从 5 秒前的快照继续,数据不丢不重;
扩展:要加 CEP 检测异常行为,只需加几行 Pattern 代码,无需重构;
运维:启动 Flink Web UI(默认 8081 端口),能实时看到处理吞吐量、延迟、失败数,一键扩缩容。

public class FlinkLogCleanJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 开启Checkpoint(每5秒做一次快照,故障重启不丢数据)
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // 1. 消费Kafka数据(自动做负载均衡、分区分配)
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "flink-log-group");
        DataStream<String> logStream = env.addSource(new FlinkKafkaConsumer<>("user_log", new SimpleStringSchema(), kafkaProps));

        // 2. 数据清洗(内置算子,支持复杂逻辑)
        DataStream<String> cleanStream = logStream
                .filter(log -> log != null && !log.contains("null")) // 过滤脏数据
                .keyBy(log -> log.split(",")[0]) // 按用户ID分组
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5秒窗口去重
                .distinct(); // 内置去重算子

        // 3. 写入ClickHouse(内置连接器,支持Exactly-Once)
        cleanStream.addSink(ClickHouseSink.builder()
                .setUrl("jdbc:clickhouse://localhost:8123/db")
                .setSql("INSERT INTO user_log VALUES (?)")
                .build());

        env.execute("Flink Log Clean Job");
    }
}

Flink 是如何实现分布式

Flink 集群分为 3 类节点,分工明确:
image.png
image.png

<dependencies>
    <!-- Flink 核心依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.17.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.17.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.17.0</version>
    </dependency>
    <!-- Flink-Kafka 连接器 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>1.17.0</version>
    </dependency>
    <!-- Flink-JDBC(对接ClickHouse-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>1.17.0</version>
    </dependency>
    <!-- ClickHouse JDBC 驱动 -->
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.3.2</version>
    </dependency>
</dependencies>
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.sql.PreparedStatement;

/**
 * Flink 分布式数据清洗入库(完整可运行版)
 * 功能:消费Kafka日志 → 清洗 → 5秒窗口去重 → 写入ClickHouse
 */
public class FlinkDistributedLogCleanJob {

    // 定义ClickHouse表结构(提前创建):CREATE TABLE user_log (user_id String, log_content String, ts BIGINT) ENGINE = MergeTree() ORDER BY ts;
    public static class UserLog {
        private String userId;
        private String logContent;
        private long ts;

        public UserLog(String userId, String logContent, long ts) {
            this.userId = userId;
            this.logContent = logContent;
            this.ts = ts;
        }

        // getter
        public String getUserId() { return userId; }
        public String getLogContent() { return logContent; }
        public long getTs() { return ts; }
    }

    public static void main(String[] args) throws Exception {
        // 1. 创建Flink执行环境(分布式执行的入口)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度=3(分布式核心:任务会拆成3个子任务运行在不同TaskManager)
        env.setParallelism(3);

        // 2. 开启Checkpoint(分布式容错核心:每5秒快照,故障重启不丢数据)
        env.enableCheckpointing(5000); // 5秒一次Checkpoint
        env.getCheckpointConfig().setCheckpointingMode(org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE); // 精准一次语义
        env.getCheckpointConfig().setCheckpointTimeout(60000); // Checkpoint超时时间60秒

        // 3. 配置Kafka数据源(分布式消费:自动分配分区到不同子任务)
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092") // Kafka集群地址(分布式部署时填多个节点)
                .setTopics("user_log") // 消费的Topic
                .setGroupId("flink-log-group") // 消费组
                .setStartingOffsets(OffsetsInitializer.latest()) // 从最新偏移量开始消费
                .setValueOnlyDeserializer(new SimpleStringSchema()) // 反序列化
                .build();

        // 4. 读取Kafka数据(分布式消费)
        DataStream<String> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // 5. 分布式数据清洗:过滤脏数据 + 解析为实体类
        DataStream<UserLog> cleanLogStream = kafkaStream
                // 过滤空值/异常日志(分布式执行:每个子任务独立过滤自己的分片数据)
                .filter((FilterFunction<String>) log -> log != null && !log.contains("null") && log.split(",").length == 3)
                // 解析日志(格式:user_id,log_content,ts)
                .map((MapFunction<String, UserLog>) log -> {
                    String[] fields = log.split(",");
                    return new UserLog(fields[0], fields[1], Long.parseLong(fields[2]));
                });

        // 6. 分布式窗口去重(按用户ID分组,5秒窗口去重)
        DataStream<UserLog> distinctLogStream = cleanLogStream
                .keyBy(UserLog::getUserId) // 按用户ID分组(分布式:相同用户的日志会发到同一个子任务)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5秒滚动窗口
                .distinct(UserLog::getLogContent); // 窗口内去重(分布式执行)

        // 7. 分布式写入ClickHouse(每个子任务独立写入,Flink保证Exactly-Once)
        distinctLogStream.addSink(
                JdbcSink.sink(
                        "INSERT INTO user_log (user_id, log_content, ts) VALUES (?, ?, ?)", // ClickHouse插入SQL
                        (PreparedStatement stmt, UserLog log) -> { // 绑定参数
                            stmt.setString(1, log.getUserId());
                            stmt.setString(2, log.getLogContent());
                            stmt.setLong(3, log.getTs());
                        },
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:clickhouse://localhost:8123/default") // ClickHouse地址
                                .withDriverName("ru.yandex.clickhouse.ClickHouseDriver") // 驱动类
                                .withUsername("default") // 用户名
                                .withPassword("") // 密码(默认空)
                                .build()
                )
        ).name("ClickHouse Sink");

        // 8. 提交任务(分布式执行:JobManager接收任务后拆分并调度到TaskManager)
        env.execute("Flink Distributed Log Clean Job");
    }
}

启动 Flink 集群:

./bin/start-cluster.sh  #(会启动 1 个 JobManager + 1 个 TaskManager,默认有 4 个 Slot);
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐