Flink 1.20 基于 Docker 单机部署实战指南
本文介绍如何使用Docker快速部署Flink 1.20.1集群,5分钟即可完成从零部署到任务运行。主要内容包括:创建目录结构、下载监控依赖(可选)、编写docker-compose配置文件(含JobManager和TaskManager)、启动服务验证以及提交任务的两种方式。配置支持Prometheus监控指标采集,提供生产级告警规则,并详细说明了各配置项作用。通过挂载本地目录实现数据持久化,适
本文手把手教你用 Docker 部署 Flink 1.20,从零到跑通任务只需 5 分钟。配套 Prometheus 监控告警规则,让你的 Flink 任务稳如老狗。
前言
想玩 Flink 却被复杂的集群部署劝退?想在本地快速验证一个 SQL 任务却懒得折腾环境?
Docker 单机部署了解一下——一条命令启动,开箱即用,测试环境首选。
本文基于 Flink 1.20.1 + Java 17 镜像,包含:
- 完整的 docker-compose 配置
- Prometheus 监控指标采集
- 生产级告警规则(拿来即用)
废话不多说,直接开干。
一、环境准备
1.1 目录结构
先把目录建好,别等会儿挂载的时候报错:
mkdir -p /data/flink/{lib,conf,sql,checkpoints,savepoints,jars}
| 目录 | 用途 |
|---|---|
lib |
额外的 jar 包(连接器、驱动等) |
conf |
自定义配置文件 |
sql |
Flink SQL 脚本 |
checkpoints |
Checkpoint 持久化目录 |
savepoints |
Savepoint 持久化目录 |
jars |
监控相关的依赖包 |
1.2 监控依赖(可选)
如果需要采集系统资源指标(CPU、内存、磁盘等),需要额外下载 OSHI 库和 JNA 依赖:
# OSHI:Java 获取系统信息的神器
wget -P /data/flink/jars https://repo1.maven.org/maven2/com/github/oshi/oshi-core/6.4.11/oshi-core-6.4.11.jar
# JNA:OSHI 的底层依赖,用于调用操作系统 API
wget -P /data/flink/jars https://repo1.maven.org/maven2/net/java/dev/jna/jna/5.13.0/jna-5.13.0.jar
wget -P /data/flink/jars https://repo1.maven.org/maven2/net/java/dev/jna/jna-platform/5.13.0/jna-platform-5.13.0.jar
不需要监控?这步可以跳过,后面的 volumes 挂载也记得注释掉对应行。
二、Docker Compose 配置
创建 docker-compose.yml,这是全文的核心:
version: '3.8'
services:
jobmanager:
image: flink:1.20.1-scala_2.12-java17
hostname: jobmanager
ports:
- "8081:8081" # Web UI,日常看任务状态
- "6123:6123" # RPC 端口,TaskManager 连接用
- "9020:9020" # Prometheus 指标端口
command: jobmanager
environment:
- TZ=Asia/Shanghai
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
state.backend: filesystem
state.checkpoints.dir: file:///opt/flink/checkpoints/
state.savepoints.dir: file:///opt/flink/savepoints/
execution.checkpointing.interval: 30000
metrics.reporters: prom # 基于 Prometheus 进行监控,注意注意注意,此行不能有注释
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9020
metrics.system-resource: true
volumes:
# Flink 的系统资源指标依赖 OSHI 库
- /data/flink/jars/oshi-core-6.4.11.jar:/opt/flink/lib/oshi-core-6.4.11.jar
# oshi-core(OSHI 库)依赖另一个库:JNA(Java Native Access) 来调用操作系统底层 API
- /data/flink/jars/jna-5.13.0.jar:/opt/flink/lib/jna-5.13.0.jar
- /data/flink/jars/jna-platform-5.13.0.jar:/opt/flink/lib/jna-platform-5.13.0.jar
# - /data/flink/conf:/opt/flink/conf # 如果把 conf目录 copy 出来,这行可以取消注释
# - /data/flink/sql:/opt/flink/sql # 如果需要sql 等,这行可以取消注释
- /tmp:/tmp # 如果不需要安装其他中间件(如 flink-cdc),可以注释掉此行
- /data/flink/checkpoints:/opt/flink/checkpoints
- /data/flink/savepoints:/opt/flink/savepoints
taskmanager:
image: flink:1.20.1-scala_2.12-java17
depends_on:
- jobmanager
command: taskmanager
ports:
- "9021:9021" # Prometheus 指标端口
environment:
- TZ=Asia/Shanghai
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
state.backend: filesystem
state.checkpoints.dir: file:///opt/flink/checkpoints/
state.savepoints.dir: file:///opt/flink/savepoints/
taskmanager.numberOfTaskSlots: 4
metrics.reporters: prom
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9021-9040
metrics.system-resource: true
volumes:
# Flink 的系统资源指标依赖 OSHI 库
- /data/flink/jars/oshi-core-6.4.11.jar:/opt/flink/lib/oshi-core-6.4.11.jar
# oshi-core(OSHI 库)依赖另一个库:JNA(Java Native Access) 来调用操作系统底层 API
- /data/flink/jars/jna-5.13.0.jar:/opt/flink/lib/jna-5.13.0.jar
- /data/flink/jars/jna-platform-5.13.0.jar:/opt/flink/lib/jna-platform-5.13.0.jar
- /data/flink/checkpoints:/opt/flink/checkpoints
- /data/flink/savepoints:/opt/flink/savepoints
配置说明
| 配置项 | 说明 |
|---|---|
state.backend: filesystem |
使用文件系统存储状态,生产环境建议用 RocksDB |
execution.checkpointing.interval: 30000 |
每 30 秒做一次 Checkpoint |
taskmanager.numberOfTaskSlots: 4 |
每个 TaskManager 提供 4 个 Slot |
metrics.reporter.prom.port: 9021-9040 |
TaskManager 的指标端口范围,支持扩容 |
踩坑提醒:
metrics.reporters: prom这行配置后面不能加注释,否则会解析失败。别问我怎么知道的。
三、启动与验证
3.1 启动服务
docker compose up -d
查看容器状态:
docker compose ps
3.2 访问 Web UI
浏览器打开:http://<你的IP>:8081
看到这个界面就说明部署成功了:

四、提交任务
两种方式,看你喜欢哪种:
方式一:进入容器执行
# 进入 JobManager 容器
docker exec -it flink-jobmanager-1 /bin/bash
# 执行 SQL 任务
cd /opt/flink && ./bin/sql-client.sh -f /opt/flink/sql/your-task.sql
方式二:一行命令搞定
docker exec -it flink-jobmanager-1 /opt/flink/bin/sql-client.sh -f /opt/flink/sql/your-task.sql
推荐方式二,简洁优雅,适合写到脚本里。
五、Prometheus 监控集成
没有监控的服务就像没有仪表盘的汽车——出了问题两眼一抹黑。
5.1 配置 Prometheus 采集
在 prometheus.yml 中添加:
scrape_configs:
# JobManager 指标
- job_name: 'flink-jm'
static_configs:
- targets: ['<JobManager-IP>:9020']
labels:
group: jm
# TaskManager 指标
- job_name: 'flink-tm'
static_configs:
- targets: ['<TaskManager-IP>:9021']
labels:
group: tm
5.2 告警规则
创建 flink-rules.yml,这套规则覆盖了 Flink 运维的核心场景:
groups:
# ==================== 任务级告警 ====================
- name: flink-job-alerts
rules:
# 任务停止运行 - 最高优先级
- alert: Flink任务停止运行
expr: flink_jobmanager_job_uptime == 0
for: 1m
labels:
severity: critical
group: flink
annotations:
summary: "Flink 任务停止运行"
description: "任务 {{ $labels.job_name }} 已停止运行超过 1 分钟"
# 任务重启 - 需要关注
- alert: Flink任务重启
expr: increase(flink_jobmanager_job_numRestarts[5m]) > 0
labels:
severity: warning
group: flink
annotations:
summary: "Flink 任务发生重启"
description: "任务 {{ $labels.job_name }} 在过去 5 分钟内重启了 {{ $value }} 次"
# 频繁重启 - 说明有大问题
- alert: Flink任务频繁重启
expr: increase(flink_jobmanager_job_numRestarts[30m]) > 3
labels:
severity: critical
group: flink
annotations:
summary: "Flink 任务频繁重启"
description: "任务 {{ $labels.job_name }} 在过去 30 分钟内重启了 {{ $value }} 次,请立即检查"
# Checkpoint 失败
- alert: Flink Checkpoint失败
expr: increase(flink_jobmanager_job_numberOfFailedCheckpoints[10m]) > 0
labels:
severity: warning
group: flink
annotations:
summary: "Flink Checkpoint 失败"
description: "任务 {{ $labels.job_name }} 在过去 10 分钟内有 {{ $value }} 次 Checkpoint 失败"
# Checkpoint 耗时过长
- alert: Flink Checkpoint耗时过长
expr: flink_jobmanager_job_lastCheckpointDuration > 120000
for: 5m
labels:
severity: warning
group: flink
annotations:
summary: "Flink Checkpoint 耗时过长"
description: "任务 {{ $labels.job_name }} Checkpoint 耗时超过 2 分钟(当前:{{ $value }}ms)"
# 长时间没有成功的 Checkpoint
- alert: Flink长时间无Checkpoint
expr: time() - flink_jobmanager_job_lastCheckpointRestoreTimestamp > 1800
for: 5m
labels:
severity: warning
group: flink
annotations:
summary: "Flink 长时间无 Checkpoint"
description: "任务 {{ $labels.job_name }} 超过 30 分钟没有成功的 Checkpoint"
# ==================== TaskManager 告警 ====================
- name: flink-taskmanager-alerts
rules:
# 堆内存使用过高
- alert: Flink TM堆内存过高
expr: flink_taskmanager_Status_JVM_Memory_Heap_Used / flink_taskmanager_Status_JVM_Memory_Heap_Max > 0.85
for: 5m
labels:
severity: warning
group: flink
annotations:
summary: "TaskManager 堆内存使用过高"
description: "TaskManager {{ $labels.tm_id }} 堆内存使用率达 {{ printf \"%.1f\" $value }}%"
# GC 时间过长
- alert: Flink TM GC时间过长
expr: increase(flink_taskmanager_Status_JVM_GarbageCollector_G1_Old_Generation_Time[5m]) > 30000
labels:
severity: warning
group: flink
annotations:
summary: "TaskManager GC 时间过长"
description: "TaskManager {{ $labels.tm_id }} 过去 5 分钟 Old GC 耗时 {{ $value }}ms"
# 输入数据停滞
- alert: Flink任务无输入数据
expr: rate(flink_taskmanager_job_task_numRecordsIn[5m]) == 0 and flink_taskmanager_job_task_numRecordsIn > 0
for: 10m
labels:
severity: warning
group: flink
annotations:
summary: "Flink 任务无输入数据"
description: "任务 {{ $labels.job_name }} 的 {{ $labels.task_name }} 超过 10 分钟无输入,检查数据源"
# 输出数据停滞
- alert: Flink任务无输出数据
expr: rate(flink_taskmanager_job_task_numRecordsOut[5m]) == 0 and flink_taskmanager_job_task_numRecordsOut > 0
for: 10m
labels:
severity: warning
group: flink
annotations:
summary: "Flink 任务无输出数据"
description: "任务 {{ $labels.job_name }} 的 {{ $labels.task_name }} 超过 10 分钟无输出,检查 Sink"
# 反压告警
- alert: Flink任务反压
expr: flink_taskmanager_job_task_backPressuredTimeMsPerSecond > 500
for: 5m
labels:
severity: warning
group: flink
annotations:
summary: "Flink 任务出现反压"
description: "任务 {{ $labels.job_name }} 的 {{ $labels.task_name }} 反压严重(每秒 {{ $value }}ms)"
# ==================== 集群级告警 ====================
- name: flink-cluster-alerts
rules:
# JobManager 挂了
- alert: Flink JobManager不可用
expr: up{job="flink-jm"} == 0
for: 1m
labels:
severity: critical
group: flink
annotations:
summary: "JobManager 不可用"
description: "Flink JobManager {{ $labels.instance }} 已停止响应"
# TaskManager 挂了
- alert: Flink TaskManager不可用
expr: up{job="flink-tm"} == 0
for: 1m
labels:
severity: critical
group: flink
annotations:
summary: "TaskManager 不可用"
description: "Flink TaskManager {{ $labels.instance }} 已停止响应"
# TaskManager 数量不足
- alert: Flink TaskManager数量减少
expr: flink_jobmanager_numRegisteredTaskManagers < 1
for: 2m
labels:
severity: critical
group: flink
annotations:
summary: "TaskManager 数量不足"
description: "当前注册的 TaskManager 数量为 {{ $value }},请检查集群状态"
# Slot 不足
- alert: Flink可用Slot不足
expr: flink_jobmanager_taskSlotsAvailable == 0
for: 5m
labels:
severity: warning
group: flink
annotations:
summary: "可用 Slot 不足"
description: "当前没有可用的 Task Slot,可能影响新任务提交"
六、常见问题
Q1: 容器启动后 TaskManager 连不上 JobManager?
检查 jobmanager.rpc.address 配置是否正确,确保两个容器在同一个 Docker 网络中。
Q2: Checkpoint 目录权限问题?
确保宿主机的 /data/flink/checkpoints 目录有读写权限:
chmod -R 777 /data/flink/checkpoints /data/flink/savepoints
Q3: 需要扩展更多 TaskManager?
修改 docker-compose.yml,复制 taskmanager 服务配置,改个名字即可。别忘了调整端口映射避免冲突。
总结
Docker 部署 Flink 的优势:
- 快速:一条命令启动,环境隔离干净
- 灵活:挂载目录,配置随改随生效
- 可观测:集成 Prometheus,告警规则拿来就用
适用场景:本地开发、功能验证、小规模数据同步任务。
生产环境还是老老实实上 K8s 或者 YARN 吧,毕竟高可用不是闹着玩的。
如果这篇文章对你有帮助,欢迎点赞收藏。有问题欢迎评论区交流。
更多推荐
所有评论(0)