Quix Streams 窗口函数实战:滑动窗口与滚动窗口应用案例
Quix Streams 是一个开源 Python 库,专为使用 Apache Kafka 构建容器化流处理应用而设计,它提供了强大的窗口函数功能,可帮助开发者轻松实现滑动窗口与滚动窗口等流处理场景。[和滑动窗口(Sliding Window)。
滚动窗口(Tumbling Window)实战
滚动窗口特点
- 时间或计数间隔固定
- 窗口之间不重叠
- 每个事件只属于一个窗口
时间滚动窗口示例:传感器数据聚合
假设需要计算每小时的平均温度,代码实现如下:
from datetime import timedelta
from quixstreams import Application
from quixstreams.dataframe.windows import Mean
app = Application(...)
sdf = app.dataframe(...)
sdf = (
# 定义1小时的滚动窗口
.tumbling_window(duration_ms=timedelta(hours=1))
# 计算平均温度
.agg(avg_temperature=Mean("temperature"))
# 实时输出结果
.current()
)
输入数据:
{"temperature": 30, "timestamp": 100}
{"temperature": 29, "timestamp": 200}
{"temperature": 28, "timestamp": 300}
输出结果:
{"avg_temperature": 30, "start": 0, "end": 3600000}
{"avg_temperature": 29.5, "start": 0, "end": 3600000}
{"avg_temperature": 29, "start": 0, "end": 3600000}
计数滚动窗口示例:批量数据处理
当需要每3条消息批量处理一次时:
from quixstreams import Application
from quixstreams.dataframe.windows import Collect
app = Application(...)
sdf = app.dataframe(...)
sdf = (
# 定义3条消息的计数滚动窗口
.tumbling_count_window(count=3)
# 收集窗口内所有数据
.agg(data=Collect())
# 窗口关闭后输出
.final()
)
滑动窗口(Sliding Window)实战
滑动窗口特点
- 窗口之间重叠
- 随事件推进不断更新
- 提供更高时间精度的聚合结果
时间滑动窗口示例:实时温度监控
计算过去1小时的平均温度,每收到一条消息就更新一次结果:
from datetime import timedelta
from quixstreams import Application
from quixstreams.dataframe.windows import Mean
app = Application(...)
sdf = app.dataframe(...)
sdf = (
# 定义1小时的滑动窗口
.sliding_window(duration_ms=timedelta(hours=1))
# 计算平均温度
.agg(avg_temperature=Mean("temperature"))
# 每条消息处理后更新结果
.current()
)
输入数据:
{"temperature": 30, "timestamp": 3600000}
{"temperature": 29, "timestamp": 4800000}
{"temperature": 28, "timestamp": 4800001}
输出结果:
{"avg_temperature": 30, "start": 0, "end": 3600000}
{"avg_temperature": 29.5, "start": 1200000, "end": 4800000}
{"avg_temperature": 29, "start": 1200001, "end": 4800001}
计数滑动窗口示例:最近3次购买平均金额
from quixstreams import Application
from quixstreams.dataframe.windows import Mean
app = Application(...)
sdf = app.dataframe(...)
sdf = (
# 定义3条消息的计数滑动窗口
.sliding_count_window(count=3)
# 计算平均金额
.agg(average=Mean("amount"))
# 窗口关闭后输出
.final()
)
窗口结果输出策略
Quix Streams 提供两种结果输出模式:
实时更新模式(.current())
- 每条消息处理后立即更新结果
- 可能产生重复结果
- 适用于实时监控场景
最终结果模式(.final())
- 窗口关闭后输出一次最终结果
- 结果唯一且完整
- 适用于报表生成等场景
# 实时更新模式
sdf.tumbling_window(...).agg(...).current()
# 最终结果模式
sdf.tumbling_window(...).agg(...).final()
处理迟到数据
通过设置宽限期(grace period)处理迟到数据:
from datetime import timedelta
# 1小时窗口,10秒宽限期
sdf.tumbling_window(
duration_ms=timedelta(hours=1),
grace_ms=timedelta(seconds=10)
)
还可以通过 on_late 回调函数自定义迟到数据处理逻辑:
def on_late(value, key, timestamp_ms, late_by_ms, start, end, name, topic, partition, offset):
print(f"检测到迟到数据: {late_by_ms}毫秒")
return False # 抑制默认日志
sdf.tumbling_window(
duration_ms=timedelta(hours=1),
on_late=on_late
)
总结
Quix Streams 提供了强大而灵活的窗口函数实现,无论是时间还是计数为基础的窗口,都能轻松应对各种流处理场景。通过本文介绍的滚动窗口和滑动窗口实战案例,您可以快速上手并应用到实际项目中。
要了解更多窗口函数高级特性,请参考官方文档:docs/windowing.md
希望本文能帮助您更好地理解和应用 Quix Streams 的窗口函数功能!🚀
更多推荐

所有评论(0)