本文手把手教你用 Docker 部署 Flink 1.20,从零到跑通任务只需 5 分钟。配套 Prometheus 监控告警规则,让你的 Flink 任务稳如老狗。

前言

想玩 Flink 却被复杂的集群部署劝退?想在本地快速验证一个 SQL 任务却懒得折腾环境?

Docker 单机部署了解一下——一条命令启动,开箱即用,测试环境首选。

本文基于 Flink 1.20.1 + Java 17 镜像,包含:

  • 完整的 docker-compose 配置
  • Prometheus 监控指标采集
  • 生产级告警规则(拿来即用)

废话不多说,直接开干。


一、环境准备

1.1 目录结构

先把目录建好,别等会儿挂载的时候报错:

mkdir -p /data/flink/{lib,conf,sql,checkpoints,savepoints,jars}
目录 用途
lib 额外的 jar 包(连接器、驱动等)
conf 自定义配置文件
sql Flink SQL 脚本
checkpoints Checkpoint 持久化目录
savepoints Savepoint 持久化目录
jars 监控相关的依赖包

1.2 监控依赖(可选)

如果需要采集系统资源指标(CPU、内存、磁盘等),需要额外下载 OSHI 库和 JNA 依赖:

# OSHI:Java 获取系统信息的神器
wget -P /data/flink/jars https://repo1.maven.org/maven2/com/github/oshi/oshi-core/6.4.11/oshi-core-6.4.11.jar

# JNA:OSHI 的底层依赖,用于调用操作系统 API
wget -P /data/flink/jars https://repo1.maven.org/maven2/net/java/dev/jna/jna/5.13.0/jna-5.13.0.jar
wget -P /data/flink/jars https://repo1.maven.org/maven2/net/java/dev/jna/jna-platform/5.13.0/jna-platform-5.13.0.jar

不需要监控?这步可以跳过,后面的 volumes 挂载也记得注释掉对应行。


二、Docker Compose 配置

创建 docker-compose.yml,这是全文的核心:

version: '3.8'

services:
  jobmanager:
    image: flink:1.20.1-scala_2.12-java17
    hostname: jobmanager
    ports:
      - "8081:8081"   # Web UI,日常看任务状态
      - "6123:6123"   # RPC 端口,TaskManager 连接用
      - "9020:9020"   # Prometheus 指标端口
    command: jobmanager
    environment:
      - TZ=Asia/Shanghai
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        state.backend: filesystem
        state.checkpoints.dir: file:///opt/flink/checkpoints/
        state.savepoints.dir: file:///opt/flink/savepoints/
        execution.checkpointing.interval: 30000
        metrics.reporters: prom                   # 基于 Prometheus 进行监控,注意注意注意,此行不能有注释
        metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
        metrics.reporter.prom.port: 9020
        metrics.system-resource: true
    volumes:
      # Flink 的系统资源指标依赖 OSHI 库
      - /data/flink/jars/oshi-core-6.4.11.jar:/opt/flink/lib/oshi-core-6.4.11.jar
      # oshi-core(OSHI 库)依赖另一个库:JNA(Java Native Access) 来调用操作系统底层 API
      - /data/flink/jars/jna-5.13.0.jar:/opt/flink/lib/jna-5.13.0.jar
      - /data/flink/jars/jna-platform-5.13.0.jar:/opt/flink/lib/jna-platform-5.13.0.jar
      # - /data/flink/conf:/opt/flink/conf          # 如果把 conf目录 copy 出来,这行可以取消注释
      # - /data/flink/sql:/opt/flink/sql            # 如果需要sql 等,这行可以取消注释
      - /tmp:/tmp                                 # 如果不需要安装其他中间件(如 flink-cdc),可以注释掉此行
      - /data/flink/checkpoints:/opt/flink/checkpoints
      - /data/flink/savepoints:/opt/flink/savepoints

  taskmanager:
    image: flink:1.20.1-scala_2.12-java17
    depends_on:
      - jobmanager
    command: taskmanager
    ports:
      - "9021:9021"   # Prometheus 指标端口
    environment:
      - TZ=Asia/Shanghai
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        state.backend: filesystem
        state.checkpoints.dir: file:///opt/flink/checkpoints/
        state.savepoints.dir: file:///opt/flink/savepoints/
        taskmanager.numberOfTaskSlots: 4
        metrics.reporters: prom
        metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
        metrics.reporter.prom.port: 9021-9040
        metrics.system-resource: true
    volumes:
      # Flink 的系统资源指标依赖 OSHI 库
      - /data/flink/jars/oshi-core-6.4.11.jar:/opt/flink/lib/oshi-core-6.4.11.jar
      # oshi-core(OSHI 库)依赖另一个库:JNA(Java Native Access) 来调用操作系统底层 API
      - /data/flink/jars/jna-5.13.0.jar:/opt/flink/lib/jna-5.13.0.jar
      - /data/flink/jars/jna-platform-5.13.0.jar:/opt/flink/lib/jna-platform-5.13.0.jar
      - /data/flink/checkpoints:/opt/flink/checkpoints
      - /data/flink/savepoints:/opt/flink/savepoints

配置说明

配置项 说明
state.backend: filesystem 使用文件系统存储状态,生产环境建议用 RocksDB
execution.checkpointing.interval: 30000 每 30 秒做一次 Checkpoint
taskmanager.numberOfTaskSlots: 4 每个 TaskManager 提供 4 个 Slot
metrics.reporter.prom.port: 9021-9040 TaskManager 的指标端口范围,支持扩容

踩坑提醒metrics.reporters: prom 这行配置后面不能加注释,否则会解析失败。别问我怎么知道的。


三、启动与验证

3.1 启动服务

docker compose up -d

查看容器状态:

docker compose ps

3.2 访问 Web UI

浏览器打开:http://<你的IP>:8081

看到这个界面就说明部署成功了:

在这里插入图片描述


四、提交任务

两种方式,看你喜欢哪种:

方式一:进入容器执行

# 进入 JobManager 容器
docker exec -it flink-jobmanager-1 /bin/bash

# 执行 SQL 任务
cd /opt/flink && ./bin/sql-client.sh -f /opt/flink/sql/your-task.sql

方式二:一行命令搞定

docker exec -it flink-jobmanager-1 /opt/flink/bin/sql-client.sh -f /opt/flink/sql/your-task.sql

推荐方式二,简洁优雅,适合写到脚本里。


五、Prometheus 监控集成

没有监控的服务就像没有仪表盘的汽车——出了问题两眼一抹黑。

5.1 配置 Prometheus 采集

prometheus.yml 中添加:

scrape_configs:
  # JobManager 指标
  - job_name: 'flink-jm'
    static_configs:
      - targets: ['<JobManager-IP>:9020']
        labels:
          group: jm

  # TaskManager 指标
  - job_name: 'flink-tm'
    static_configs:
      - targets: ['<TaskManager-IP>:9021']
        labels:
          group: tm

5.2 告警规则

创建 flink-rules.yml,这套规则覆盖了 Flink 运维的核心场景:

groups:
  # ==================== 任务级告警 ====================
  - name: flink-job-alerts
    rules:
      # 任务停止运行 - 最高优先级
      - alert: Flink任务停止运行
        expr: flink_jobmanager_job_uptime == 0
        for: 1m
        labels:
          severity: critical
          group: flink
        annotations:
          summary: "Flink 任务停止运行"
          description: "任务 {{ $labels.job_name }} 已停止运行超过 1 分钟"

      # 任务重启 - 需要关注
      - alert: Flink任务重启
        expr: increase(flink_jobmanager_job_numRestarts[5m]) > 0
        labels:
          severity: warning
          group: flink
        annotations:
          summary: "Flink 任务发生重启"
          description: "任务 {{ $labels.job_name }} 在过去 5 分钟内重启了 {{ $value }} 次"

      # 频繁重启 - 说明有大问题
      - alert: Flink任务频繁重启
        expr: increase(flink_jobmanager_job_numRestarts[30m]) > 3
        labels:
          severity: critical
          group: flink
        annotations:
          summary: "Flink 任务频繁重启"
          description: "任务 {{ $labels.job_name }} 在过去 30 分钟内重启了 {{ $value }} 次,请立即检查"

      # Checkpoint 失败
      - alert: Flink Checkpoint失败
        expr: increase(flink_jobmanager_job_numberOfFailedCheckpoints[10m]) > 0
        labels:
          severity: warning
          group: flink
        annotations:
          summary: "Flink Checkpoint 失败"
          description: "任务 {{ $labels.job_name }} 在过去 10 分钟内有 {{ $value }} 次 Checkpoint 失败"

      # Checkpoint 耗时过长
      - alert: Flink Checkpoint耗时过长
        expr: flink_jobmanager_job_lastCheckpointDuration > 120000
        for: 5m
        labels:
          severity: warning
          group: flink
        annotations:
          summary: "Flink Checkpoint 耗时过长"
          description: "任务 {{ $labels.job_name }} Checkpoint 耗时超过 2 分钟(当前:{{ $value }}ms)"

      # 长时间没有成功的 Checkpoint
      - alert: Flink长时间无Checkpoint
        expr: time() - flink_jobmanager_job_lastCheckpointRestoreTimestamp > 1800
        for: 5m
        labels:
          severity: warning
          group: flink
        annotations:
          summary: "Flink 长时间无 Checkpoint"
          description: "任务 {{ $labels.job_name }} 超过 30 分钟没有成功的 Checkpoint"

  # ==================== TaskManager 告警 ====================
  - name: flink-taskmanager-alerts
    rules:
      # 堆内存使用过高
      - alert: Flink TM堆内存过高
        expr: flink_taskmanager_Status_JVM_Memory_Heap_Used / flink_taskmanager_Status_JVM_Memory_Heap_Max > 0.85
        for: 5m
        labels:
          severity: warning
          group: flink
        annotations:
          summary: "TaskManager 堆内存使用过高"
          description: "TaskManager {{ $labels.tm_id }} 堆内存使用率达 {{ printf \"%.1f\" $value }}%"

      # GC 时间过长
      - alert: Flink TM GC时间过长
        expr: increase(flink_taskmanager_Status_JVM_GarbageCollector_G1_Old_Generation_Time[5m]) > 30000
        labels:
          severity: warning
          group: flink
        annotations:
          summary: "TaskManager GC 时间过长"
          description: "TaskManager {{ $labels.tm_id }} 过去 5 分钟 Old GC 耗时 {{ $value }}ms"

      # 输入数据停滞
      - alert: Flink任务无输入数据
        expr: rate(flink_taskmanager_job_task_numRecordsIn[5m]) == 0 and flink_taskmanager_job_task_numRecordsIn > 0
        for: 10m
        labels:
          severity: warning
          group: flink
        annotations:
          summary: "Flink 任务无输入数据"
          description: "任务 {{ $labels.job_name }} 的 {{ $labels.task_name }} 超过 10 分钟无输入,检查数据源"

      # 输出数据停滞
      - alert: Flink任务无输出数据
        expr: rate(flink_taskmanager_job_task_numRecordsOut[5m]) == 0 and flink_taskmanager_job_task_numRecordsOut > 0
        for: 10m
        labels:
          severity: warning
          group: flink
        annotations:
          summary: "Flink 任务无输出数据"
          description: "任务 {{ $labels.job_name }} 的 {{ $labels.task_name }} 超过 10 分钟无输出,检查 Sink"

      # 反压告警
      - alert: Flink任务反压
        expr: flink_taskmanager_job_task_backPressuredTimeMsPerSecond > 500
        for: 5m
        labels:
          severity: warning
          group: flink
        annotations:
          summary: "Flink 任务出现反压"
          description: "任务 {{ $labels.job_name }} 的 {{ $labels.task_name }} 反压严重(每秒 {{ $value }}ms)"

  # ==================== 集群级告警 ====================
  - name: flink-cluster-alerts
    rules:
      # JobManager 挂了
      - alert: Flink JobManager不可用
        expr: up{job="flink-jm"} == 0
        for: 1m
        labels:
          severity: critical
          group: flink
        annotations:
          summary: "JobManager 不可用"
          description: "Flink JobManager {{ $labels.instance }} 已停止响应"

      # TaskManager 挂了
      - alert: Flink TaskManager不可用
        expr: up{job="flink-tm"} == 0
        for: 1m
        labels:
          severity: critical
          group: flink
        annotations:
          summary: "TaskManager 不可用"
          description: "Flink TaskManager {{ $labels.instance }} 已停止响应"

      # TaskManager 数量不足
      - alert: Flink TaskManager数量减少
        expr: flink_jobmanager_numRegisteredTaskManagers < 1
        for: 2m
        labels:
          severity: critical
          group: flink
        annotations:
          summary: "TaskManager 数量不足"
          description: "当前注册的 TaskManager 数量为 {{ $value }},请检查集群状态"

      # Slot 不足
      - alert: Flink可用Slot不足
        expr: flink_jobmanager_taskSlotsAvailable == 0
        for: 5m
        labels:
          severity: warning
          group: flink
        annotations:
          summary: "可用 Slot 不足"
          description: "当前没有可用的 Task Slot,可能影响新任务提交"

六、常见问题

Q1: 容器启动后 TaskManager 连不上 JobManager?

检查 jobmanager.rpc.address 配置是否正确,确保两个容器在同一个 Docker 网络中。

Q2: Checkpoint 目录权限问题?

确保宿主机的 /data/flink/checkpoints 目录有读写权限:

chmod -R 777 /data/flink/checkpoints /data/flink/savepoints

Q3: 需要扩展更多 TaskManager?

修改 docker-compose.yml,复制 taskmanager 服务配置,改个名字即可。别忘了调整端口映射避免冲突。


总结

Docker 部署 Flink 的优势:

  • 快速:一条命令启动,环境隔离干净
  • 灵活:挂载目录,配置随改随生效
  • 可观测:集成 Prometheus,告警规则拿来就用

适用场景:本地开发、功能验证、小规模数据同步任务。

生产环境还是老老实实上 K8s 或者 YARN 吧,毕竟高可用不是闹着玩的。


如果这篇文章对你有帮助,欢迎点赞收藏。有问题欢迎评论区交流。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐