Python Kubernetes 客户端全面教程:常用 API 串联与实战指南
核心价值:Python Kubernetes 客户端将 Kubernetes 集群操作代码化,解决了手动kubectl命令无法自动化、无法集成到应用的问题,是 Python 生态运维 K8s 的核心工具;核心 API:基础操作(CoreV1Api)覆盖 Pod/ConfigMap/Service,高级操作(AppsV1Api)覆盖 Deployment/StatefulSet,Watch API
大家好,我是jobleap.cn的小九。
如果你想了解 Python 的 Kubernetes 客户端组件能做什么、解决什么问题,并且需要一份包含核心 API 实战的完整教程,我会从功能定位、核心价值到具体实战,帮你全面掌握这个工具。
一、Python Kubernetes 客户端:能做什么?解决什么问题?
1. 核心定位与能力
Python Kubernetes 客户端(官方库为 kubernetes)是 Kubernetes API 的 Python 语言封装,让你可以通过 Python 代码直接与 Kubernetes 集群交互,核心能做这些事:
- 集群资源管理:创建、查询、更新、删除 Pod、Deployment、Service、ConfigMap 等所有 Kubernetes 原生资源;
- 集群状态监控:实时获取集群节点、Pod、容器的运行状态、资源使用率等数据;
- 自动化运维:编写脚本替代手动
kubectl命令,实现批量操作、定时任务、异常自动处理; - 自定义控制器:基于 Watch API 开发自定义的控制器(如业务专属的资源调度、故障自愈逻辑);
- 集成与扩展:将 Kubernetes 集群管理能力嵌入 Python 应用(如运维平台、CI/CD 系统)。
2. 解决的核心问题
- 替代手动执行
kubectl命令,避免批量操作时的重复劳动和人为错误; - 突破
kubectl命令行的局限性,实现复杂的逻辑判断、循环、条件分支等编程化操作; - 无缝集成到 Python 生态(如结合 Airflow 做定时运维、结合 FastAPI 开发运维接口、结合 Prometheus 做监控告警);
- 实现 Kubernetes 集群操作的“代码化、可版本化、可自动化”。
二、环境准备
1. 安装客户端库
# 安装官方客户端(推荐稳定版)
pip install kubernetes==29.0.0
2. 集群认证配置
Python Kubernetes 客户端会自动读取以下位置的认证配置(优先级从高到低):
- 代码中显式指定的 kubeconfig 文件路径;
- 环境变量
KUBECONFIG指向的文件; - 默认路径
~/.kube/config(本地开发常用); - 集群内 Pod 的服务账户(
/var/run/secrets/kubernetes.io/serviceaccount/,集群内部署应用常用)。
验证配置是否有效:
from kubernetes import config, client
# 加载配置(本地开发用这个)
config.load_kube_config()
# 如果是在集群内的 Pod 中运行,替换为:config.load_incluster_config()
# 验证连接
v1 = client.CoreV1Api()
print("集群节点列表:")
for node in v1.list_node().items:
print(f"- {node.metadata.name}")
三、常用 API 实战:核心操作串联
1. 基础资源操作:Pod 生命周期管理
(1)创建 Pod
from kubernetes import client, config
from kubernetes.client.rest import ApiException
# 加载配置
config.load_kube_config()
core_api = client.CoreV1Api()
# 定义 Pod 规格
pod_manifest = {
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": "python-k8s-demo",
"namespace": "default"
},
"spec": {
"containers": [
{
"name": "nginx",
"image": "nginx:1.25",
"ports": [{"containerPort": 80}]
}
],
"restartPolicy": "Always"
}
}
# 创建 Pod
try:
response = core_api.create_namespaced_pod(
body=pod_manifest,
namespace="default"
)
print(f"Pod {response.metadata.name} 创建成功")
except ApiException as e:
print(f"创建 Pod 失败:{e.reason} ({e.status})")
(2)查询 Pod 状态
# 查询单个 Pod
try:
pod = core_api.read_namespaced_pod(
name="python-k8s-demo",
namespace="default"
)
print(f"Pod 状态:{pod.status.phase}")
print(f"Pod IP:{pod.status.pod_ip}")
print(f"容器状态:{pod.status.container_statuses[0].state}")
except ApiException as e:
print(f"查询 Pod 失败:{e.reason}")
# 批量查询 Pod(按标签过滤)
pod_list = core_api.list_namespaced_pod(
namespace="default",
label_selector="app=demo" # 可替换为你的标签
)
print("\n符合条件的 Pod 列表:")
for p in pod_list.items:
print(f"- {p.metadata.name} ({p.status.phase})")
(3)更新 Pod(修改标签)
# 准备更新的标签
patch_body = {
"metadata": {
"labels": {
"app": "demo",
"env": "test"
}
}
}
try:
response = core_api.patch_namespaced_pod(
name="python-k8s-demo",
namespace="default",
body=patch_body
)
print(f"Pod 标签更新成功:{response.metadata.labels}")
except ApiException as e:
print(f"更新 Pod 失败:{e.reason}")
(4)删除 Pod
try:
core_api.delete_namespaced_pod(
name="python-k8s-demo",
namespace="default",
# 优雅删除超时时间(秒)
grace_period_seconds=5
)
print("Pod 删除成功")
except ApiException as e:
print(f"删除 Pod 失败:{e.reason}")
2. 高级资源操作:Deployment 管理(无状态服务)
Deployment 是管理 Pod 副本和更新的核心资源,比直接操作 Pod 更实用:
from kubernetes import client, config
config.load_kube_config()
apps_api = client.AppsV1Api()
# 1. 创建 Deployment
deployment_manifest = {
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": {
"name": "nginx-deployment",
"namespace": "default"
},
"spec": {
"replicas": 3, # 3个副本
"selector": {
"matchLabels": {
"app": "nginx"
}
},
"template": {
"metadata": {
"labels": {
"app": "nginx"
}
},
"spec": {
"containers": [
{
"name": "nginx",
"image": "nginx:1.25",
"ports": [{"containerPort": 80}]
}
]
}
}
}
}
# 创建 Deployment
try:
response = apps_api.create_namespaced_deployment(
body=deployment_manifest,
namespace="default"
)
print(f"Deployment {response.metadata.name} 创建成功")
except ApiException as e:
print(f"创建 Deployment 失败:{e.reason}")
# 2. 扩缩容 Deployment(修改副本数)
try:
scale_body = {
"spec": {
"replicas": 5
}
}
response = apps_api.patch_namespaced_deployment_scale(
name="nginx-deployment",
namespace="default",
body=scale_body
)
print(f"Deployment 扩缩容成功,当前副本数:{response.spec.replicas}")
except ApiException as e:
print(f"扩缩容失败:{e.reason}")
# 3. 查询 Deployment 状态
try:
deployment = apps_api.read_namespaced_deployment(
name="nginx-deployment",
namespace="default"
)
print(f"Deployment 可用副本数:{deployment.status.available_replicas}")
print(f"Deployment 当前副本数:{deployment.status.replicas}")
except ApiException as e:
print(f"查询 Deployment 失败:{e.reason}")
3. 监控与监听:Watch API 实时感知资源变化
Watch API 可以实时监听资源的创建、更新、删除事件,是开发自定义控制器的核心:
from kubernetes import client, config, watch
config.load_kube_config()
core_api = client.CoreV1Api()
# 监听 default 命名空间的 Pod 事件
w = watch.Watch()
print("开始监听 Pod 事件(按 Ctrl+C 停止):")
try:
# stream 方法会阻塞,直到手动停止或超时
for event in w.stream(
core_api.list_namespaced_pod,
namespace="default",
timeout_seconds=60 # 监听60秒后自动停止
):
pod = event["object"]
print(f"事件类型:{event['type']} | Pod 名称:{pod.metadata.name} | 状态:{pod.status.phase}")
except KeyboardInterrupt:
w.stop()
print("\n监听已停止")
4. 配置管理:ConfigMap 读写
ConfigMap 用于存储非敏感配置,Python 客户端可轻松读写:
from kubernetes import client, config
config.load_kube_config()
core_api = client.CoreV1Api()
# 1. 创建 ConfigMap
cm_manifest = {
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"name": "demo-config",
"namespace": "default"
},
"data": {
"app.conf": "env=test\nlog_level=info",
"max_conn": "1000"
}
}
try:
core_api.create_namespaced_config_map(
body=cm_manifest,
namespace="default"
)
print("ConfigMap 创建成功")
except ApiException as e:
print(f"创建 ConfigMap 失败:{e.reason}")
# 2. 读取 ConfigMap
try:
cm = core_api.read_namespaced_config_map(
name="demo-config",
namespace="default"
)
print("ConfigMap 数据:")
for key, value in cm.data.items():
print(f"- {key}: {value}")
except ApiException as e:
print(f"读取 ConfigMap 失败:{e.reason}")
四、典型实战场景:自动化运维脚本
以下是一个实用脚本:检查指定命名空间下所有 Pod 的状态,若有异常(Error/CrashLoopBackOff)则自动重启 Deployment:
from kubernetes import client, config, watch
from kubernetes.client.rest import ApiException
import time
def check_and_restart_pods(namespace="default"):
# 加载配置
config.load_kube_config()
core_api = client.CoreV1Api()
apps_api = client.AppsV1Api()
# 1. 查询所有异常 Pod
abnormal_pods = []
pod_list = core_api.list_namespaced_pod(namespace=namespace)
for pod in pod_list.items:
# 判断 Pod 状态是否异常
if pod.status.phase in ["Failed", "Unknown"]:
abnormal_pods.append(pod)
# 判断容器是否崩溃循环
for container_status in pod.status.container_statuses or []:
if container_status.state and container_status.state.waiting:
if container_status.state.waiting.reason == "CrashLoopBackOff":
abnormal_pods.append(pod)
break
if not abnormal_pods:
print("无异常 Pod")
return
# 2. 找出异常 Pod 关联的 Deployment 并重启
for pod in abnormal_pods:
print(f"异常 Pod:{pod.metadata.name},状态:{pod.status.phase}")
# 获取 Pod 关联的 Deployment(通过 ownerReference)
for owner in pod.metadata.owner_references or []:
if owner.kind == "ReplicaSet":
# 通过 ReplicaSet 找到 Deployment
try:
rs = apps_api.read_namespaced_replica_set(
name=owner.name,
namespace=namespace
)
for rs_owner in rs.metadata.owner_references or []:
if rs_owner.kind == "Deployment":
deployment_name = rs_owner.name
print(f"重启 Deployment:{deployment_name}")
# 重启 Deployment(通过滚动更新实现)
apps_api.patch_namespaced_deployment(
name=deployment_name,
namespace=namespace,
body={"spec": {"template": {"metadata": {"annotations": {"kubectl.kubernetes.io/restartedAt": str(time.time())}}}}}
)
break
except ApiException as e:
print(f"处理 Deployment 失败:{e.reason}")
break
if __name__ == "__main__":
check_and_restart_pods(namespace="default")
总结
- 核心价值:Python Kubernetes 客户端将 Kubernetes 集群操作代码化,解决了手动
kubectl命令无法自动化、无法集成到应用的问题,是 Python 生态运维 K8s 的核心工具; - 核心 API:基础操作(CoreV1Api)覆盖 Pod/ConfigMap/Service,高级操作(AppsV1Api)覆盖 Deployment/StatefulSet,Watch API 实现实时监控;
- 实战要点:认证优先使用
load_kube_config(本地)/load_incluster_config(集群内),操作时需处理ApiException异常,复杂运维优先用 Deployment 而非直接操作 Pod。
通过以上教程,你可以基于 Python 快速实现 Kubernetes 集群的自动化管理、监控和故障自愈,将集群操作无缝融入你的 Python 应用或运维脚本中。
更多推荐
所有评论(0)