flink+kafka实例
核心结论:Flink 接 Kafka 不只是 “统计”,而是覆盖实时统计、ETL、风控、特征工程、CEP 等全场景,“统计” 只是最基础的入门场景;架构逻辑:Kafka 负责 “数据传输 / 暂存”,Flink 负责 “数据计算 / 处理”,是企业实时大数据的标准组合;落地关键:无论什么场景,核心流程都是「Kafka 消费数据 → Flink 计算 / 处理 → 输出到存储 / 下游 Kafka
简单说flink&kafka
Kafka 是高吞吐、高可靠的消息队列,负责承接上游所有动态数据(用户行为、业务日志、设备采集、数据库变更);
Flink 是流批一体的计算引擎,负责对 Kafka 里的 “流动数据” 做实时处理。
下面是企业真实开发中最常用的 3 类代码模板,覆盖 “消费→处理→输出” 全链路:
前置依赖(需补充 Kafka 连接器)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
- 基础:消费 Kafka 数据做实时统计(你理解的 “统计功能”)
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class FlinkKafkaCountDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1. 配置Kafka连接参数
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "flink-kafka-group");
// 2. 从Kafka消费数据(topic:user_behavior)
DataStream<String> kafkaStream = env.addSource(
new FlinkKafkaConsumer<>("user_behavior", new SimpleStringSchema(), kafkaProps)
);
// 3. 实时统计:按用户行为类型(点击/加购/下单)计数
kafkaStream
.map(line -> line.split(",")[1]) // 假设数据格式:userID,behavior,time
.keyBy(behavior -> behavior)
.countWindow(60) // 60秒窗口
.sum(0)
.print("实时行为统计:");
env.execute("Flink-Kafka Count Job");
}
}
- 进阶:消费 Kafka 做实时 ETL(清洗后写入另一 Topic)
// 承接上面的kafkaStream
DataStream<String> cleanStream = kafkaStream
// 过滤脏数据(非JSON/空值)
.filter(line -> line != null && line.startsWith("{"))
// 转换格式(JSON→CSV)
.map(line -> {
JSONObject json = JSON.parseObject(line);
return json.getString("userID") + "," + json.getString("orderID") + "," + json.getString("amount");
});
// 将清洗后的数据写入Kafka的clean_order_topic
cleanStream.addSink(
new FlinkKafkaProducer<>(
"localhost:9092",
"clean_order_topic",
new SimpleStringSchema()
)
);
- 高阶:消费 Kafka 做 CEP 复杂事件检测
// 承接kafkaStream,解析为用户行为实体
DataStream<UserBehavior> behaviorStream = kafkaStream
.map(line -> {
String[] fields = line.split(",");
return new UserBehavior(fields[0], fields[1], Long.parseLong(fields[2]));
});
// 定义CEP模式:10分钟内连续3次失败登录
Pattern<UserBehavior, ?> failLoginPattern = Pattern
.<UserBehavior>begin("first_fail")
.where(behavior -> "fail_login".equals(behavior.getBehavior()))
.next("second_fail")
.where(behavior -> "fail_login".equals(behavior.getBehavior()))
.next("third_fail")
.where(behavior -> "fail_login".equals(behavior.getBehavior()))
.within(Time.minutes(10));
// 检测匹配的事件,输出风控告警
PatternStream<UserBehavior> patternStream = CEP.pattern(behaviorStream.keyBy(UserBehavior::getUserID), failLoginPattern);
patternStream.select(pattern -> "用户" + pattern.get("first_fail").get(0).getUserID() + "10分钟内3次登录失败")
.print("风控告警:");
总结
核心结论:Flink 接 Kafka 不只是 “统计”,而是覆盖实时统计、ETL、风控、特征工程、CEP 等全场景,“统计” 只是最基础的入门场景;
架构逻辑:Kafka 负责 “数据传输 / 暂存”,Flink 负责 “数据计算 / 处理”,是企业实时大数据的标准组合;
落地关键:无论什么场景,核心流程都是「Kafka 消费数据 → Flink 计算 / 处理 → 输出到存储 / 下游 Kafka / 大屏」。
如果需要某类场景(如金融风控 / 电商实时统计)的完整可运行代码,可以直接说,我会按企业真实开发规范提供。
#“为什么非要用 Flink?自己写接口接数据、做清洗、插数据库不行吗?”
核心差异其实体现在「数据量 / 并发 / 实时性 / 可靠性」这几个维度,尤其是数据量变大后,手写接口的方案会快速失效。
#手写方案的致命问题
并发瓶颈:单消费者线程处理每秒 1 万条会卡死,多消费者需自己做分区分配,极易出现重复消费;
可靠性差:如果插入数据库时程序崩溃,已经处理的日志可能丢了,未处理的可能重复;
扩展难:要加 “5 分钟内去重” 逻辑,需自己写缓存(Redis)+ 定时清理,代码量翻倍;
监控缺失:不知道每秒处理了多少条、失败了多少条,出问题只能查日志。
// Spring Boot + Kafka Consumer
@Service
public class LogConsumerService {
@Autowired
private ClickHouseMapper clickHouseMapper;
// 手写Kafka消费者
@KafkaListener(topics = "user_log", groupId = "log-group")
public void consume(List<String> logs, Acknowledgment ack) {
try {
// 1. 数据清洗(简单过滤)
List<String> cleanLogs = logs.stream()
.filter(log -> log != null && !log.contains("null"))
.collect(Collectors.toList());
// 2. 批量插入ClickHouse(自己写批量逻辑)
clickHouseMapper.batchInsert(cleanLogs);
// 3. 手动提交偏移量(避免丢数据)
ack.acknowledge();
} catch (Exception e) {
// 4. 异常处理(自己写重试,极易重复入库)
retryInsert(logs);
}
}
}
#Flink 方案的优势
并发:只需设置env.setParallelism(10),就能用 10 个并行度处理,轻松扛每秒 1 万条;
可靠:Checkpoint 自动记录状态,程序崩溃重启后从 5 秒前的快照继续,数据不丢不重;
扩展:要加 CEP 检测异常行为,只需加几行 Pattern 代码,无需重构;
运维:启动 Flink Web UI(默认 8081 端口),能实时看到处理吞吐量、延迟、失败数,一键扩缩容。
public class FlinkLogCleanJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启Checkpoint(每5秒做一次快照,故障重启不丢数据)
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 1. 消费Kafka数据(自动做负载均衡、分区分配)
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "flink-log-group");
DataStream<String> logStream = env.addSource(new FlinkKafkaConsumer<>("user_log", new SimpleStringSchema(), kafkaProps));
// 2. 数据清洗(内置算子,支持复杂逻辑)
DataStream<String> cleanStream = logStream
.filter(log -> log != null && !log.contains("null")) // 过滤脏数据
.keyBy(log -> log.split(",")[0]) // 按用户ID分组
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5秒窗口去重
.distinct(); // 内置去重算子
// 3. 写入ClickHouse(内置连接器,支持Exactly-Once)
cleanStream.addSink(ClickHouseSink.builder()
.setUrl("jdbc:clickhouse://localhost:8123/db")
.setSql("INSERT INTO user_log VALUES (?)")
.build());
env.execute("Flink Log Clean Job");
}
}
Flink 是如何实现分布式
Flink 集群分为 3 类节点,分工明确:

<dependencies>
<!-- Flink 核心依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.17.0</version>
</dependency>
<!-- Flink-Kafka 连接器 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
<!-- Flink-JDBC(对接ClickHouse) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>1.17.0</version>
</dependency>
<!-- ClickHouse JDBC 驱动 -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.2</version>
</dependency>
</dependencies>
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.sql.PreparedStatement;
/**
* Flink 分布式数据清洗入库(完整可运行版)
* 功能:消费Kafka日志 → 清洗 → 5秒窗口去重 → 写入ClickHouse
*/
public class FlinkDistributedLogCleanJob {
// 定义ClickHouse表结构(提前创建):CREATE TABLE user_log (user_id String, log_content String, ts BIGINT) ENGINE = MergeTree() ORDER BY ts;
public static class UserLog {
private String userId;
private String logContent;
private long ts;
public UserLog(String userId, String logContent, long ts) {
this.userId = userId;
this.logContent = logContent;
this.ts = ts;
}
// getter
public String getUserId() { return userId; }
public String getLogContent() { return logContent; }
public long getTs() { return ts; }
}
public static void main(String[] args) throws Exception {
// 1. 创建Flink执行环境(分布式执行的入口)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度=3(分布式核心:任务会拆成3个子任务运行在不同TaskManager)
env.setParallelism(3);
// 2. 开启Checkpoint(分布式容错核心:每5秒快照,故障重启不丢数据)
env.enableCheckpointing(5000); // 5秒一次Checkpoint
env.getCheckpointConfig().setCheckpointingMode(org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE); // 精准一次语义
env.getCheckpointConfig().setCheckpointTimeout(60000); // Checkpoint超时时间60秒
// 3. 配置Kafka数据源(分布式消费:自动分配分区到不同子任务)
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092") // Kafka集群地址(分布式部署时填多个节点)
.setTopics("user_log") // 消费的Topic
.setGroupId("flink-log-group") // 消费组
.setStartingOffsets(OffsetsInitializer.latest()) // 从最新偏移量开始消费
.setValueOnlyDeserializer(new SimpleStringSchema()) // 反序列化
.build();
// 4. 读取Kafka数据(分布式消费)
DataStream<String> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 5. 分布式数据清洗:过滤脏数据 + 解析为实体类
DataStream<UserLog> cleanLogStream = kafkaStream
// 过滤空值/异常日志(分布式执行:每个子任务独立过滤自己的分片数据)
.filter((FilterFunction<String>) log -> log != null && !log.contains("null") && log.split(",").length == 3)
// 解析日志(格式:user_id,log_content,ts)
.map((MapFunction<String, UserLog>) log -> {
String[] fields = log.split(",");
return new UserLog(fields[0], fields[1], Long.parseLong(fields[2]));
});
// 6. 分布式窗口去重(按用户ID分组,5秒窗口去重)
DataStream<UserLog> distinctLogStream = cleanLogStream
.keyBy(UserLog::getUserId) // 按用户ID分组(分布式:相同用户的日志会发到同一个子任务)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5秒滚动窗口
.distinct(UserLog::getLogContent); // 窗口内去重(分布式执行)
// 7. 分布式写入ClickHouse(每个子任务独立写入,Flink保证Exactly-Once)
distinctLogStream.addSink(
JdbcSink.sink(
"INSERT INTO user_log (user_id, log_content, ts) VALUES (?, ?, ?)", // ClickHouse插入SQL
(PreparedStatement stmt, UserLog log) -> { // 绑定参数
stmt.setString(1, log.getUserId());
stmt.setString(2, log.getLogContent());
stmt.setLong(3, log.getTs());
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:clickhouse://localhost:8123/default") // ClickHouse地址
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver") // 驱动类
.withUsername("default") // 用户名
.withPassword("") // 密码(默认空)
.build()
)
).name("ClickHouse Sink");
// 8. 提交任务(分布式执行:JobManager接收任务后拆分并调度到TaskManager)
env.execute("Flink Distributed Log Clean Job");
}
}
启动 Flink 集群:
./bin/start-cluster.sh #(会启动 1 个 JobManager + 1 个 TaskManager,默认有 4 个 Slot);
更多推荐
所有评论(0)