1.flink docker部署

  使用 docker-compose 进行系统安装

  1.1 在任意目录下创建文件 docker-compose.yaml

        内容如下:

# Compose file with two Flink 1.17 services: a JobManager container that also
# starts the SQL Gateway, and a separate TaskManager container.
services:
  jobmanager:
    image: flink:1.17
    container_name: flink_jobmanager
    hostname: jobmanager
    ports:
      - "8081:8081"  # web ui
      - "6123:6123"  # RPC
      - "8083:8083"  # SQL Gateway
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    # Runs start-cluster.sh, then starts the SQL Gateway with its REST address
    # set to 0.0.0.0, then tails the Flink logs (presumably to keep a
    # foreground process alive in the container - confirm).
    command: >
      /bin/bash -c "
        echo '启动 Flink 集群...';
        /opt/flink/bin/start-cluster.sh;
        echo '启动 SQL Gateway...';
        /opt/flink/bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=0.0.0.0;
        tail -f /opt/flink/log/*.log
      "

  taskmanager:
    image: flink:1.17
    container_name: flink_taskmanager
    # Started after the jobmanager service.
    depends_on:
      - jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    command: taskmanager
    

1.2 在你创建文件的地方执行cmd命令

docker-compose up -d

1.3 等待镜像拉取(pull)完成

2. flink 配置完善

  2.1 下载jar包

所有jar包资源均可在:Central Repository: org/apache/flink 官方仓库获取

JDBC:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.2-1.17/flink-connector-jdbc-3.1.2-1.17.jar

CDC:https://repo1.maven.org/maven2/com/ververica/flink-connector-mysql-cdc/2.3.0/flink-connector-mysql-cdc-2.3.0.jar

Mysql连接:Central Repository: com/mysql/mysql-connector-j/8.0.33

  2.1 移动jar包到对应位置

docker 执行

docker cp D:\notebook\Flink\lib\flink-connector-jdbc-3.1.2-1.17.jar flink_taskmanager:/opt/flink/lib/
docker cp D:\notebook\Flink\lib\mysql-connector-j-8.0.33.jar flink_taskmanager:/opt/flink/lib/
docker cp D:\notebook\Flink\lib\flink-connector-mysql-cdc-2.3.0.jar flink_taskmanager:/opt/flink/lib/

docker cp D:\notebook\Flink\lib\flink-connector-jdbc-3.1.2-1.17.jar flink_jobmanager:/opt/flink/lib/
docker cp D:\notebook\Flink\lib\mysql-connector-j-8.0.33.jar flink_jobmanager:/opt/flink/lib/
docker cp D:\notebook\Flink\lib\flink-connector-mysql-cdc-2.3.0.jar flink_jobmanager:/opt/flink/lib/

然后 执行:

docker exec -it flink_jobmanager bash
cd lib
ls

就能看到移动的jar包:

  2.2 重启服务

(因为没做 挂载卷持久化操作,需要可自行探索 在docker-compose.yaml 文件里加入配置就行)

  不能使用以下命令(否则容器会被删除并重新创建,已复制进去的jar包会被清除):

        docker-compose down

        docker-compose up -d

 正确使用:

        docker restart flink_taskmanager

        docker restart flink_jobmanager

3. 测试 sql gateway

3.1 python测试代码

import requests
import time
import json
from urllib.parse import urljoin


BASE_URL = "http://localhost:8083/v1"

def open_session():
    """Open a new SQL Gateway session and return its handle.

    Sends a POST to ``{BASE_URL}/sessions`` and prints the handle that the
    gateway returned.

    Returns:
        The ``sessionHandle`` value from the JSON response body.

    Raises:
        requests.HTTPError: If the gateway answers with an error status.
        KeyError: If the response JSON has no ``sessionHandle`` key.
    """
    reply = requests.post(f"{BASE_URL}/sessions")
    reply.raise_for_status()
    session_handle = reply.json()["sessionHandle"]
    print(f"[+] 会话已打开: sessionHandle={session_handle}")
    return session_handle

def submit_query(session_handle, sql):
    """Submit one SQL statement to an open session.

    Args:
        session_handle: Handle returned by ``open_session``.
        sql: The SQL text, sent as the ``statement`` field of the JSON body.

    Returns:
        The ``operationHandle`` value from the JSON response body.

    Raises:
        requests.HTTPError: If the gateway answers with an error status.
        KeyError: If the response JSON has no ``operationHandle`` key.
    """
    reply = requests.post(
        f"{BASE_URL}/sessions/{session_handle}/statements/",
        headers={"Content-Type": "application/json"},
        json={"statement": sql},
    )
    reply.raise_for_status()
    operation_handle = reply.json()["operationHandle"]
    print(f"[+] SQL 已提交: operationHandle={operation_handle}")
    return operation_handle

def fetch_result(session_handle, operation_handle, poll_interval=1.0):
    """Poll an operation's result pages and collect every returned row.

    Starts at result token 0 and follows ``nextResultUri`` from page to page
    until the gateway reports ``EOS`` or a ``PAYLOAD`` page has no next link.

    Args:
        session_handle: Handle returned by ``open_session``.
        operation_handle: Handle returned by ``submit_query``.
        poll_interval: Seconds to wait before re-polling after a 404 or a
            ``NOT_READY`` reply.

    Returns:
        A list with the ``results.data`` entries of all ``PAYLOAD`` pages, or
        ``None`` if a request failed, the body was not valid JSON, or an
        unknown ``resultType`` was received.
    """
    url = f"{BASE_URL}/sessions/{session_handle}/operations/{operation_handle}/result/0"
    all_data = []
    while True:
        try:
            response = requests.get(url, timeout=5)
            if response.status_code == 404:
                # Treated as "not available yet" and retried.
                # NOTE(review): there is no upper bound on these retries.
                time.sleep(poll_interval)
                continue
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, json.JSONDecodeError) as e:
            print(f"错误: 请求或解析失败 - {e}")
            return None

        result_type = data.get("resultType", "")
        next_result_uri = data.get("nextResultUri")

        if result_type == "NOT_READY":
            if next_result_uri:
                url = urljoin(BASE_URL, next_result_uri)
            # Always pause before asking again; without this a NOT_READY reply
            # that carries a next link would be re-requested in a tight loop.
            time.sleep(poll_interval)
            continue

        if result_type == "EOS":
            break

        if result_type != "PAYLOAD":
            print(f"错误: 未知的resultType - {result_type}")
            return None

        all_data.extend(data.get("results", {}).get("data", []))

        if not next_result_uri:
            break
        url = urljoin(BASE_URL, next_result_uri)

    return all_data

def close_session(session_handle):
    """Close a SQL Gateway session.

    Sends a DELETE to ``{BASE_URL}/sessions/{session_handle}`` and prints a
    confirmation once the gateway has accepted it.

    Args:
        session_handle: Handle returned by ``open_session``.

    Raises:
        requests.HTTPError: If the gateway answers with an error status.
    """
    reply = requests.delete(f"{BASE_URL}/sessions/{session_handle}")
    reply.raise_for_status()
    print(f"[+] 会话已关闭: sessionHandle={session_handle}")


if __name__ == "__main__":
    # Smoke test: open a session, run "SELECT 1", print the rows, close.
    print("=== Flink SQL Gateway Python 调度脚本启动 ===")
    session = open_session()
    try:
        operation = submit_query(session, "SELECT 1")
        result = fetch_result(session, operation)
        if result is not None:
            print("查询结果:", result)
        else:
            print("查询失败。")
    finally:
        # Close the session even when submitting or fetching raised, so no
        # session is left open on the gateway.
        close_session(session)
    print("=== 脚本执行完成 ===")

执行结束输出:

3.2 实际代码运行效果

因为具体flink sql需要对应自己的数据库不再展示,

结构就是 表注册(包含数据源表 + 目标表)--->数据操作

import requests
import time
import json
from urllib.parse import urljoin

BASE_URL = "http://localhost:8083/v1"

def open_session():
    """Open a new SQL Gateway session and return its handle.

    Returns:
        The ``sessionHandle`` value from the JSON response of
        ``POST {BASE_URL}/sessions``.

    Raises:
        requests.HTTPError: If the gateway answers with an error status.
        KeyError: If the response JSON has no ``sessionHandle`` key.
    """
    reply = requests.post(f"{BASE_URL}/sessions")
    reply.raise_for_status()
    return reply.json()["sessionHandle"]

def submit_query(session_handle, sql):
    """Submit one SQL statement to an open session.

    Args:
        session_handle: Handle returned by ``open_session``.
        sql: The SQL text, sent as the ``statement`` field of the JSON body.

    Returns:
        The ``operationHandle`` value from the JSON response body.

    Raises:
        requests.HTTPError: If the gateway answers with an error status.
        KeyError: If the response JSON has no ``operationHandle`` key.
    """
    reply = requests.post(
        f"{BASE_URL}/sessions/{session_handle}/statements/",
        headers={"Content-Type": "application/json"},
        json={"statement": sql},
    )
    reply.raise_for_status()
    return reply.json()["operationHandle"]

def fetch_result(session_handle, operation_handle):
    """Poll an operation's result pages and print every returned row.

    Starts at result token 0 and follows ``nextResultUri`` from page to page
    until the gateway reports ``EOS`` or a ``PAYLOAD`` page has no next link.

    Args:
        session_handle: Handle returned by ``open_session``.
        operation_handle: Handle returned by ``submit_query``.

    Returns:
        ``True`` when the result stream was read to its end, ``False`` when an
        unknown ``resultType`` was received.

    Raises:
        requests.HTTPError: If the gateway answers with an error status other
            than 404.
    """
    url = f"{BASE_URL}/sessions/{session_handle}/operations/{operation_handle}/result/0"
    while True:
        resp = requests.get(url)
        if resp.status_code == 404:
            # Treated as "not available yet" and retried.
            # NOTE(review): there is no upper bound on these retries.
            time.sleep(1)
            continue
        resp.raise_for_status()
        data = resp.json()

        rt = data.get("resultType")
        next_uri = data.get("nextResultUri")

        if rt == "NOT_READY":
            time.sleep(1)
            # Follow the link only when one was supplied. Joining an empty
            # string would reset the URL to BASE_URL itself, and every later
            # poll would then hit the wrong endpoint.
            if next_uri:
                url = urljoin(BASE_URL, next_uri)
            continue
        if rt == "EOS":
            break
        if rt == "PAYLOAD":
            rows = data.get("results", {}).get("data", [])
            for r in rows:
                print(r)
            if next_uri:
                url = urljoin(BASE_URL, next_uri)
            else:
                break
        else:
            print(f"错误: 未知的resultType - {rt}")
            return False
    return True

def close_session(session_handle):
    """Ask the gateway to close a session.

    Sends a DELETE to ``{BASE_URL}/sessions/{session_handle}``. The response
    is not inspected, so an error status from the gateway is ignored.

    Args:
        session_handle: Handle returned by ``open_session``.
    """
    requests.delete(f"{BASE_URL}/sessions/{session_handle}")

def _split_sql_statements(script):
    """Split a SQL script into statements on top-level semicolons.

    Semicolons inside quoted text ('...', `...`, "...") and inside ``--`` or
    ``/* */`` comments do not end a statement. Fragments that contain nothing
    but whitespace and comments are dropped.
    """
    statements = []
    current = []
    has_code = False  # True once the fragment holds anything but comments/whitespace
    i, n = 0, len(script)
    while i < n:
        ch = script[i]
        pair = script[i:i + 2]
        if pair == "--":
            # Line comment: copy through to (not including) the newline.
            end = script.find("\n", i)
            end = n if end == -1 else end
            current.append(script[i:end])
            i = end
        elif pair == "/*":
            # Block comment: copy through the closing marker.
            end = script.find("*/", i + 2)
            end = n if end == -1 else end + 2
            current.append(script[i:end])
            i = end
        elif ch in "'`\"":
            # Quoted text: copy through the matching quote. A doubled quote
            # simply reads as two adjacent quoted runs, which splits the same.
            end = script.find(ch, i + 1)
            end = n if end == -1 else end + 1
            current.append(script[i:end])
            has_code = True
            i = end
        elif ch == ";":
            if has_code:
                statements.append("".join(current).strip())
            current = []
            has_code = False
            i += 1
        else:
            if not ch.isspace():
                has_code = True
            current.append(ch)
            i += 1
    if has_code:
        statements.append("".join(current).strip())
    return statements


def submit_sql_file(session_handle, sql_file_path):
    """Run every statement of a SQL file in order on one session.

    The file is read as UTF-8 and split into statements. Each statement is
    submitted and its result fetched before the next one is sent. Processing
    stops at the first statement whose fetch reports failure or whose request
    raises ``requests.HTTPError``; the error is printed, not raised.

    Args:
        session_handle: Handle returned by ``open_session``.
        sql_file_path: Path of the SQL script to execute.

    Returns:
        None.
    """
    with open(sql_file_path, 'r', encoding='utf-8') as f:
        content = f.read()
    # Split on statement-terminating semicolons only, so a ';' inside a
    # string literal or a comment does not cut a statement in half.
    for sql in _split_sql_statements(content):
        try:
            op_handle = submit_query(session_handle, sql)
            success = fetch_result(session_handle, op_handle)
            if not success:
                print("查询失败。")
                return
        except requests.HTTPError as e:
            print(f"错误: {e.response.text}")
            return

if __name__ == "__main__":
    # Run every statement of one SQL file on a fresh session.
    print("=== Flink SQL Gateway 调度脚本启动 ===")
    session = open_session()
    try:
        # Replace this with the path to your own SQL file.
        submit_sql_file(session, "D:/notebook/software/pycharm/pycharm_project/PycharmProjects/flink_jdbc/flinksql/dwd/1.授信申请信息表.sql")
    finally:
        # Close the session even when reading or submitting the file raised
        # (for example a missing file or a connection error).
        close_session(session)
    print("=== 脚本执行完成 ===")

运行结果展示:

控制台打印:

flink ui:127.0.0.1:8081

数据库:

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐