Flink Docker 部署,以及使用 Flink SQL Gateway 远程调度 Flink 执行 Flink SQL 任务
使用 docker-compose 进行系统安装。
·
1.flink docker部署
使用 docker-compose 进行系统安装
1.1 在任意目录下创建文件 docker-compose.yaml(文件名必须是 docker-compose.yaml,否则 docker-compose 默认找不到该文件,需要用 -f 参数手动指定)
内容如下:
# docker-compose.yaml: a Flink 1.17 JobManager container (which also starts the
# SQL Gateway) plus a separate TaskManager container.
services:
  jobmanager:
    image: flink:1.17
    container_name: flink_jobmanager
    hostname: jobmanager
    ports:
      - "8081:8081" # Flink web UI
      - "6123:6123" # JobManager RPC
      - "8083:8083" # SQL Gateway REST endpoint (used by the Python scripts)
    environment:
      # NOTE(review): Flink's Docker docs configure this via FLINK_PROPERTIES
      # ("jobmanager.rpc.address: jobmanager"); confirm this image still
      # honors JOB_MANAGER_RPC_ADDRESS.
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    # Replaces the image's default command: start the cluster, then start the
    # SQL Gateway bound to 0.0.0.0 so the published port 8083 is reachable from
    # the host, then `tail -f` the logs as a foreground process (presumably so
    # the container does not exit once the start scripts return).
    # NOTE(review): start-cluster.sh likely also launches a TaskManager inside
    # this container, in addition to the taskmanager service below - confirm.
    command: >
      /bin/bash -c "
      echo '启动 Flink 集群...';
      /opt/flink/bin/start-cluster.sh;
      echo '启动 SQL Gateway...';
      /opt/flink/bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=0.0.0.0;
      tail -f /opt/flink/log/*.log
      "
  taskmanager:
    image: flink:1.17
    container_name: flink_taskmanager
    depends_on:
      - jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    # Runs the image in TaskManager mode; it reaches the JobManager through the
    # "jobmanager" service hostname configured above.
    command: taskmanager
1.2 在 docker-compose.yaml 所在目录下打开命令行,执行:
# Run in the directory containing docker-compose.yaml; -d starts the containers in the background.
docker-compose up -d

1.3 等待镜像拉取(pull)完成、容器启动

2. flink 配置完善
2.1 下载jar包
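Flink 官方连接器 jar 包(如 flink-connector-jdbc-3.1.0-1.17.jar)可在 Maven 中央仓库的 org/apache/flink 目录下获取。注意:MySQL CDC 连接器 2.x 版本的 groupId 是 com.ververica,不在 org/apache/flink 下;在 Flink SQL 场景下,其官方文档推荐使用包含全部依赖的 flink-sql-connector-mysql-cdc jar 包,并请确认所选版本支持 Flink 1.17。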
MySQL JDBC 驱动:Maven 中央仓库 com/mysql/mysql-connector-j/8.0.33
2.2 将 jar 包复制到容器内的 /opt/flink/lib 目录
在宿主机上执行以下 docker 命令(路径请替换为你自己存放 jar 包的位置):
# Copy the JDBC connector, the MySQL JDBC driver and the MySQL CDC connector into
# /opt/flink/lib of both the TaskManager and the JobManager containers.
# The D:\notebook\... paths are the author's local download directory - replace them.
docker cp D:\notebook\Flink\lib\flink-connector-jdbc-3.1.0-1.17.jar flink_taskmanager:/opt/flink/lib/
docker cp D:\notebook\Flink\lib\mysql-connector-j-8.0.33.jar flink_taskmanager:/opt/flink/lib/
docker cp D:\notebook\Flink\lib\flink-connector-mysql-cdc-2.3.0.jar flink_taskmanager:/opt/flink/lib/
docker cp D:\notebook\Flink\lib\flink-connector-jdbc-3.1.0-1.17.jar flink_jobmanager:/opt/flink/lib/
docker cp D:\notebook\Flink\lib\mysql-connector-j-8.0.33.jar flink_jobmanager:/opt/flink/lib/
docker cp D:\notebook\Flink\lib\flink-connector-mysql-cdc-2.3.0.jar flink_jobmanager:/opt/flink/lib/
然后执行:
# Open an interactive shell inside the JobManager container.
docker exec -it flink_jobmanager bash
# The next two commands run inside the container; `cd lib` is relative and
# presumably relies on the image's working directory being /opt/flink - confirm.
cd lib
ls
就能看到复制进去的 jar 包:

2.3 重启服务(使新加入的 jar 包生效)
(由于没有配置卷挂载做持久化,如有需要,可自行在 docker-compose.yaml 中添加 volumes 配置)
不能使用以下命令(down 会删除容器,再次 up 时会重新创建容器,复制进去的 jar 包会丢失):
# Do NOT run these: `down` removes the containers, so the jars added with
# `docker cp` are lost when `up` recreates them from the image.
docker-compose down
docker-compose up -d
正确做法:
# Restart the existing containers in place; unlike down/up, their filesystems
# (including the jars copied in with docker cp) are kept.
docker restart flink_taskmanager
docker restart flink_jobmanager
3. 测试 SQL Gateway
3.1 Python 测试代码
import requests
import time
import json
from urllib.parse import urljoin
# Root of the Flink SQL Gateway REST API (v1). Port 8083 is the port published
# for the SQL Gateway in docker-compose.yaml.
BASE_URL = "http://localhost:8083/v1"
def open_session():
    """Open a new SQL Gateway session and return its handle.

    Sends ``POST {BASE_URL}/sessions`` and prints the new handle.

    Returns:
        The ``sessionHandle`` string from the JSON response.

    Raises:
        requests.HTTPError: if the gateway answers with an error status.
    """
    resp = requests.post(f"{BASE_URL}/sessions")
    resp.raise_for_status()
    session_handle = resp.json()["sessionHandle"]
    print(f"[+] 会话已打开: sessionHandle={session_handle}")
    return session_handle
def submit_query(session_handle, sql):
    """Submit one SQL statement to an open session.

    Args:
        session_handle: Handle returned by open_session().
        sql: A single SQL statement (no trailing ';' required).

    Returns:
        The ``operationHandle`` string used to fetch the statement's results.

    Raises:
        requests.HTTPError: if the gateway rejects the request.
    """
    resp = requests.post(
        f"{BASE_URL}/sessions/{session_handle}/statements/",
        headers={"Content-Type": "application/json"},
        json={"statement": sql},
    )
    resp.raise_for_status()
    operation_handle = resp.json()["operationHandle"]
    print(f"[+] SQL 已提交: operationHandle={operation_handle}")
    return operation_handle
def fetch_result(session_handle, operation_handle, poll_interval=1.0, max_wait=None):
    """Poll the SQL Gateway until an operation's results are complete and return all rows.

    Starts at result token 0 and follows ``nextResultUri`` page by page.
    HTTP 404 and ``NOT_READY`` responses are retried after ``poll_interval``
    seconds. ``PAYLOAD`` rows are accumulated. ``EOS``, or a payload page
    without ``nextResultUri``, ends the loop.

    Args:
        session_handle: Handle returned by open_session().
        operation_handle: Handle returned by submit_query().
        poll_interval: Seconds to wait before re-polling (default 1.0, as before).
        max_wait: Optional overall limit in seconds. None (the default) polls
            indefinitely, which matches the original behavior.

    Returns:
        A list with the ``results.data`` rows of every page, or None on a
        request/JSON error, an unknown ``resultType``, or when ``max_wait`` elapses.
    """
    url = f"{BASE_URL}/sessions/{session_handle}/operations/{operation_handle}/result/0"
    deadline = None if max_wait is None else time.monotonic() + max_wait
    all_data = []
    while True:
        if deadline is not None and time.monotonic() > deadline:
            print(f"错误: 等待结果超时 ({max_wait}s)")
            return None
        try:
            response = requests.get(url, timeout=5)
            if response.status_code == 404:
                # Result not available (yet); retry the same URL.
                time.sleep(poll_interval)
                continue
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, json.JSONDecodeError) as e:
            print(f"错误: 请求或解析失败 - {e}")
            return None

        result_type = data.get("resultType", "")
        next_result_uri = data.get("nextResultUri")

        if result_type == "NOT_READY":
            if next_result_uri:
                url = urljoin(BASE_URL, next_result_uri)
            # Always back off before re-polling. Previously, a NOT_READY page that
            # carried a nextResultUri was re-requested immediately, in a busy loop.
            time.sleep(poll_interval)
            continue
        if result_type == "EOS":
            break
        if result_type != "PAYLOAD":
            print(f"错误: 未知的resultType - {result_type}")
            return None

        all_data.extend(data.get("results", {}).get("data", []))
        if not next_result_uri:
            break
        url = urljoin(BASE_URL, next_result_uri)
    return all_data
def close_session(session_handle):
    """Close a gateway session with ``DELETE {BASE_URL}/sessions/<handle>``.

    Raises:
        requests.HTTPError: if the gateway reports an error status.
    """
    requests.delete(f"{BASE_URL}/sessions/{session_handle}").raise_for_status()
    print(f"[+] 会话已关闭: sessionHandle={session_handle}")
if __name__ == "__main__":
    # Smoke test: open a session, run SELECT 1, print the rows, close the session.
    print("=== Flink SQL Gateway Python 调度脚本启动 ===")
    session = open_session()
    try:
        operation = submit_query(session, "SELECT 1")
        result = fetch_result(session, operation)
        if result is not None:
            print("查询结果:", result)
        else:
            print("查询失败。")
    finally:
        # Close the session even if submitting or fetching raised, so it is not
        # left open on the gateway.
        close_session(session)
    print("=== 脚本执行完成 ===")
执行结束后的输出:

3.2 实际代码运行效果
由于具体的 Flink SQL 需要对应各自的数据库,这里不再展示。
SQL 文件的结构为:表注册(包含数据源表 + 目标表)---> 数据操作(例如 INSERT INTO 目标表 SELECT ... FROM 数据源表)。
import requests
import time
import json
from urllib.parse import urljoin
# Root of the Flink SQL Gateway REST API (v1). Port 8083 is the port published
# for the SQL Gateway in docker-compose.yaml.
BASE_URL = "http://localhost:8083/v1"
def open_session():
    """Open a SQL Gateway session and return its ``sessionHandle``.

    Raises:
        requests.HTTPError: if ``POST {BASE_URL}/sessions`` fails.
    """
    response = requests.post(f"{BASE_URL}/sessions")
    response.raise_for_status()
    return response.json()["sessionHandle"]
def submit_query(session_handle, sql):
    """Submit a single SQL statement and return its ``operationHandle``.

    Raises:
        requests.HTTPError: if the gateway rejects the statement request.
    """
    response = requests.post(
        f"{BASE_URL}/sessions/{session_handle}/statements/",
        headers={"Content-Type": "application/json"},
        json={"statement": sql},
    )
    response.raise_for_status()
    return response.json()["operationHandle"]
def fetch_result(session_handle, operation_handle, poll_interval=1.0, max_wait=None):
    """Poll an operation's result pages and print every row as it arrives.

    Starts at result token 0 and follows ``nextResultUri``. HTTP 404 and
    ``NOT_READY`` responses are retried after ``poll_interval`` seconds.

    Args:
        session_handle: Handle returned by open_session().
        operation_handle: Handle returned by submit_query().
        poll_interval: Seconds to wait before re-polling (default 1.0, as before).
        max_wait: Optional overall limit in seconds. None (the default) polls
            indefinitely, which matches the original behavior.

    Returns:
        True once the results are exhausted (``EOS``, or a payload page without
        ``nextResultUri``). False for an unknown ``resultType`` or when
        ``max_wait`` elapses.

    Raises:
        requests.HTTPError: for non-404 error statuses (e.g. a failed statement).
    """
    url = f"{BASE_URL}/sessions/{session_handle}/operations/{operation_handle}/result/0"
    deadline = None if max_wait is None else time.monotonic() + max_wait
    while True:
        if deadline is not None and time.monotonic() > deadline:
            print(f"错误: 等待结果超时 ({max_wait}s)")
            return False
        resp = requests.get(url)
        if resp.status_code == 404:
            time.sleep(poll_interval)
            continue
        resp.raise_for_status()
        data = resp.json()
        rt = data.get("resultType")
        next_uri = data.get("nextResultUri")

        if rt == "NOT_READY":
            time.sleep(poll_interval)
            # Only follow nextResultUri when it is present. urljoin(BASE_URL, "")
            # returns BASE_URL itself, which would make the loop poll the wrong
            # endpoint (404) forever.
            if next_uri:
                url = urljoin(BASE_URL, next_uri)
            continue
        if rt == "EOS":
            return True
        if rt != "PAYLOAD":
            print(f"错误: 未知的resultType - {rt}")
            return False

        for row in data.get("results", {}).get("data", []):
            print(row)
        if not next_uri:
            return True
        url = urljoin(BASE_URL, next_uri)
def close_session(session_handle):
    """Send ``DELETE {BASE_URL}/sessions/<handle>`` without checking the response.

    This is best-effort: an error status from the gateway is ignored.
    """
    session_url = f"{BASE_URL}/sessions/{session_handle}"
    requests.delete(session_url)
def _split_sql_statements(script):
    """Split a SQL script into statements at top-level ';' characters.

    Unlike ``str.split(';')``, a ';' inside a quoted literal or identifier
    ('...', "...", `...`) or inside a ``--`` / ``/* */`` comment does not end a
    statement. A doubled quote inside a literal ('') is treated as an escaped
    quote. Chunks containing only whitespace and comments are dropped. Every
    returned statement is stripped of surrounding whitespace; comments inside
    it are kept.
    """
    statements = []
    current = []
    has_code = False  # current chunk holds something besides whitespace/comments
    quote = None  # quote character we are currently inside, if any
    i, n = 0, len(script)
    while i < n:
        ch = script[i]
        nxt = script[i + 1] if i + 1 < n else ""
        if quote is not None:
            current.append(ch)
            if ch == quote:
                if nxt == quote:  # doubled quote = escaped quote; stay inside
                    current.append(nxt)
                    i += 2
                    continue
                quote = None
            i += 1
            continue
        if ch == "-" and nxt == "-":  # line comment: copy up to (not incl.) newline
            end = script.find("\n", i)
            end = n if end == -1 else end
            current.append(script[i:end])
            i = end
            continue
        if ch == "/" and nxt == "*":  # block comment: copy through the closing */
            end = script.find("*/", i + 2)
            end = n if end == -1 else end + 2
            current.append(script[i:end])
            i = end
            continue
        if ch == ";":
            if has_code:
                statements.append("".join(current).strip())
            current, has_code = [], False
            i += 1
            continue
        if ch in "'\"`":
            quote = ch
        if not ch.isspace():
            has_code = True
        current.append(ch)
        i += 1
    if has_code:
        statements.append("".join(current).strip())
    return statements


def submit_sql_file(session_handle, sql_file_path):
    """Execute every statement of a SQL file, in order, on one gateway session.

    Each statement is submitted and its results fetched before the next one is
    sent, so later statements (e.g. INSERT INTO) can use tables registered by
    earlier ones (CREATE TABLE). Execution stops at the first failure.

    Args:
        session_handle: Handle returned by open_session().
        sql_file_path: Path to a UTF-8 SQL script (a leading BOM is accepted).

    Returns:
        True if every statement succeeded (or the file held no statements),
        False at the first failure. Callers that ignore the return value
        behave as before.
    """
    # utf-8-sig also strips a UTF-8 BOM (common for files saved by Windows
    # editors); otherwise the BOM would be sent as part of the first statement.
    with open(sql_file_path, "r", encoding="utf-8-sig") as f:
        content = f.read()
    for sql in _split_sql_statements(content):
        try:
            op_handle = submit_query(session_handle, sql)
            success = fetch_result(session_handle, op_handle)
        except requests.HTTPError as e:
            print(f"错误: {e.response.text}")
            return False
        if not success:
            print("查询失败。")
            return False
    return True
if __name__ == "__main__":
    # Run one SQL script (table registration + data operations) on a fresh session.
    print("=== Flink SQL Gateway 调度脚本启动 ===")
    session = open_session()
    try:
        submit_sql_file(session, "D:/notebook/software/pycharm/pycharm_project/PycharmProjects/flink_jdbc/flinksql/dwd/1.授信申请信息表.sql")  # replace with the path of your own SQL file
    finally:
        # Close the session even if reading the file or a request raised.
        close_session(session)
    print("=== 脚本执行完成 ===")
运行结果展示:
控制台打印:

Flink Web UI:http://127.0.0.1:8081

数据库:


更多推荐
所有评论(0)