0. 环境准备:用 SQL Client 直接跑起来

为了把注意力放在 SQL 本身,本文用 Kafka 做数据源:手动往 Topic 推送点击行为数据,用 print 在 TaskManager Stdout 里观察结果。

使用前请确认 Flink 已加载 Kafka SQL Connector(把 flink-sql-connector-kafka-*.jar 放到 $FLINK_HOME/lib 并重启集群/SQL Client)。

先把下面几个参数设好,后面跑窗口/TopN 时更容易看到输出:


-- 1) 避免 source 空闲导致 watermark 不推进,从而窗口一直不触发 SET 'table.exec.source.idle-timeout' = '5s'; -- 2) 让窗口/TopN 的结果更“及时”(更快看到输出) SET 'execution.checkpointing.interval' = '10s'; -- 3) 以流模式运行(源是无界),持续刷到 SQL Client SET 'execution.runtime-mode' = 'streaming'; -- 4) 开启 changelog 模式,使窗口/TopN 的结果更“及时”(更快看到输出) SET 'sql-client.execution.result-mode' = 'changelog';

接着创建一张点击行为表(事件时间 + Watermark):


CREATE TABLE dwd_click_log ( user_id STRING, item_id STRING, category_id STRING, ts BIGINT, event_time AS TO_TIMESTAMP_LTZ(ts, 3), WATERMARK FOR event_time AS event_time - INTERVAL '3' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'dwd_click_log', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'flink-sql-dwd-click-log', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json', 'json.ignore-parse-errors' = 'true' );

先准备 Kafka Topic:


$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic dwd_click_log --partitions 1 --replication-factor 1

再创建几个 print sink 用来观察输出:


CREATE TABLE ads_window_metrics_print ( window_start TIMESTAMP_LTZ(3), window_end TIMESTAMP_LTZ(3), pv BIGINT, uv BIGINT ) WITH ('connector' = 'print', 'print-identifier' = 'ads_window_metrics_print' ); CREATE TABLE ads_session_metrics_print ( window_start TIMESTAMP_LTZ(3), window_end TIMESTAMP_LTZ(3), user_id STRING, click_cnt BIGINT ) WITH ('connector' = 'print', 'print-identifier' = 'ads_session_metrics_print' ); CREATE TABLE ads_topn_print ( window_start TIMESTAMP_LTZ(3), window_end TIMESTAMP_LTZ(3), category_id STRING, item_id STRING, cnt BIGINT, rn BIGINT ) WITH ('connector' = 'print', 'print-identifier' = 'ads_topn_print' );

1. 窗口聚合基础:你到底在对“哪段时间”做统计

在 Flink SQL 里,窗口的本质是:把无界流切成一个个“有限集合”,再在集合上做 GROUP BY 聚合。

窗口统计能否输出,核心取决于两件事:

  • 你选的是 Processing Time 还是 Event Time
  • Event Time 场景下,Watermark 是否在推进(决定窗口是否“关窗”)

本文以 Event Time 为主,因为绝大多数实时数仓指标都需要“按业务发生时间统计”,而不是“按处理到达时间统计”。

2. Window TVF:Flink SQL 窗口的主流写法

Flink 早期有 GROUP BY TUMBLE(...) 这类 Group Window 语法,新版本更推荐 Window TVF(Table Valued Function),它的输出会直接带上 window_start/window_end/window_time 字段,更清晰,也更容易与 TopN/Join 组合。

2.1 滚动窗口(TUMBLE):每条数据只属于一个窗口

典型场景:按分钟/小时统计 PV、UV、GMV。


INSERT INTO ads_window_metrics_print SELECT window_start, window_end, COUNT(*) AS pv, COUNT(DISTINCT user_id) AS uv FROM TABLE( TUMBLE(TABLE dwd_click_log, DESCRIPTOR(event_time), INTERVAL '1' SECOND) ) GROUP BY window_start, window_end;

推送数据(最简单:控制台直接粘贴 JSON,一行一条)


$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic dwd_click_log

粘贴下面数据(ts 用毫秒时间戳;为了让事件时间窗口及时“关窗”,建议 ts 单调递增,或者最后补一条明显更大的 ts 用来推进 Watermark):


{"user_id":"u01","item_id":"i01","category_id":"c01","ts":1774454400000} {"user_id":"u02","item_id":"i02","category_id":"c01","ts":1774454402000} {"user_id":"u01","item_id":"i03","category_id":"c02","ts":1774454403000} {"user_id":"u03","item_id":"i04","category_id":"c02","ts":1774454404000} {"user_id":"u04","item_id":"i01","category_id":"c01","ts":1774454405000}

到 Flink Web UI → TaskManagers → Stdout 查看输出: 

滚动窗口示例

要点:

  • TUMBLE 适合“报表型”指标,窗口不重叠,状态相对可控
  • COUNT(DISTINCT ...) 会引入去重状态,用户数大时要关注状态体积(生产中可考虑用近似去重或分层聚合)

2.2 滑动窗口(HOP):一条数据会被“复制”到多个窗口

典型场景:最近 5 分钟滚动 UV、最近 1 小时成交额每 5 分钟刷新一次。

示例:窗口长度 30s,每 10s 滑动一次。


INSERT INTO ads_window_metrics_print SELECT window_start, window_end, COUNT(*) AS pv, COUNT(DISTINCT user_id) AS uv FROM TABLE( HOP(TABLE dwd_click_log, DESCRIPTOR(event_time), INTERVAL '10' SECOND, INTERVAL '30' SECOND) ) GROUP BY window_start, window_end;

到 Flink Web UI → TaskManagers → Stdout 查看输出: 

滑动窗口示例

要点:

  • HOP 的状态压力通常显著高于 TUMBLE,因为数据会进入多个窗口
  • 业务上能用 TUMBLE 不用 HOP;必须用 HOP 时,尽量降低窗口长度或放大 slide(减少并行窗口数)

2.3 会话窗口(SESSION):按“事件间隔”自动切窗

典型场景:统计用户一次访问会话内的点击数/停留时长、按会话做转化漏斗。

示例:同一用户 10s 内没有新事件就认为会话结束。


INSERT INTO ads_session_metrics_print SELECT window_start, window_end, user_id, COUNT(*) AS click_cnt FROM TABLE( SESSION(TABLE dwd_click_log, DESCRIPTOR(event_time), INTERVAL '10' SECOND) ) GROUP BY window_start, window_end, user_id;

推送数据(最简单:控制台直接粘贴 JSON,一行一条)


$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic dwd_click_log

往 topic dwd_click_log 推送数据(控制台直接粘贴 JSON,一行一条)


{"user_id":"u01","item_id":"i01","category_id":"c01","ts":1775143687285} {"user_id":"u02","item_id":"i02","category_id":"c01","ts":1775143688285} {"user_id":"u01","item_id":"i03","category_id":"c02","ts":1775143689285}

到 Flink Web UI → TaskManagers → Stdout 查看输出: 

会话窗口示例

要点:

  • SESSION 窗口边界不固定,会因为迟到数据发生“合并”,下游会看到更新/撤回更频繁
  • 如果你的下游只接受 Append(只插入不更新),SESSION 往往不合适,除非你引入可更新的 sink(Upsert)

4. TopN:把窗口聚合变成实时榜单

TopN 的正确打开方式是“两段式”:

  1. 先做窗口聚合得到每个候选项的指标(比如每个商品的点击数)
  2. 再在聚合结果上做排序,取前 N

4.1 窗口内 TopN:每个窗口的热榜 Top3

需求:每 10 秒统计一次“各品类内点击 Top3 商品”。


INSERT INTO ads_topn_print WITH item_cnt AS ( SELECT window_start, window_end, category_id, item_id, COUNT(*) AS cnt FROM TABLE( TUMBLE(TABLE dwd_click_log, DESCRIPTOR(event_time), INTERVAL '10' SECOND) ) GROUP BY window_start, window_end, category_id, item_id ), ranked AS ( SELECT window_start, window_end, category_id, item_id, cnt, ROW_NUMBER() OVER ( PARTITION BY window_start, window_end, category_id ORDER BY cnt DESC, item_id ) AS rn FROM item_cnt ) SELECT window_start, window_end, category_id, item_id, cnt, rn FROM ranked WHERE rn <= 3;

到 Flink Web UI → TaskManagers → Stdout 查看输出: 

窗口内 TopN 示例

几个关键点:

  • PARTITION BY window_start, window_end, category_id 表示“每个窗口、每个品类各自一张榜单”
  • ORDER BY cnt DESC 决定榜单规则;追加 item_id 是为了稳定排序,避免并列时结果抖动
  • TopN 本质是对一张动态表做排序截断,窗口聚合(尤其是 SESSION/HOP)会产生更新,因此 TopN 输出常常不是纯 Append

5. 生产落地:TopN 为什么“写不进”下游

TopN 落地最常见的问题是:下游只接受追加流(Append-only),但 TopN 的结果在运行过程中会不断更新。

你会在 print 里看到类似 +I/-U/+U 的变更日志输出(不同版本格式略有差异),这意味着:

  • 早期输出的第 3 名,后续可能被挤掉,需要撤回
  • 早期输出的第 1 名,后续计数增长,会以更新的形式重发

落地时通常有两种策略:

  • 写 Upsert Sink:Kafka Upsert、JDBC(主键表)、HBase、Redis 等,要求结果表定义 PRIMARY KEY (...) NOT ENFORCED
  • 把 TopN 变成“窗口结束一次性输出”:只在窗口最终关闭后输出最终榜单,减少更新(更偏离线思路)

对于实时大屏、实时榜单,通常选 Upsert Sink。

6. 性能与稳定性:窗口与 TopN 的几个关键参数

6.1 状态 TTL:给状态设“上限”

即使是窗口聚合,状态也不是“完全自动可控”的:滑动窗口、去重、TopN 的中间表都会占用状态。

生产建议给作业设置统一 TTL,例如保留 7 天(按业务调整):


SET 'table.exec.state.ttl' = '7 d';

6.2 MiniBatch:降低聚合与排序的更新频率

当源数据更新非常频繁(尤其是有去重、TopN)时,MiniBatch 能显著减少算子更新次数,提升吞吐:


SET 'table.exec.mini-batch.enabled' = 'true'; SET 'table.exec.mini-batch.allow-latency' = '2 s'; SET 'table.exec.mini-batch.size' = '2000';

6.3 Watermark 设计:迟到与实时性的平衡

WATERMARK FOR event_time AS event_time - INTERVAL '3' SECOND 的 3 秒不是越大越好:

  • 太小:乱序稍大就会被判定为迟到数据而丢弃
  • 太大:窗口输出延迟变大,榜单刷新变慢

通常做法是先用历史数据评估乱序分布(P95/P99),再给一个能接受的延迟阈值。

7. 小结

  • Window TVF(TUMBLE/HOP/SESSION)是 Flink SQL 窗口聚合的主流写法,关键在于 Event Time + Watermark 是否能把窗口按预期触发出来
  • TopN 建议两段式:先聚合再排名,并根据业务选择 ROW_NUMBER 或 RANK
  • TopN 多数情况下会产生更新/撤回,生产下游优先考虑 Upsert Sink(或把输出改为“窗口结束一次性输出”)
  • 性能与稳定性重点关注:State TTL、MiniBatch、Watermark 延迟与业务时效的权衡
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐