大家好,我是jobleap.cn的小九。
如果你想了解 Python 的 Kubernetes 客户端组件能做什么、解决什么问题,并且需要一份包含核心 API 实战的完整教程,我会从功能定位、核心价值到具体实战,帮你全面掌握这个工具。

一、Python Kubernetes 客户端:能做什么?解决什么问题?

1. 核心定位与能力

Python Kubernetes 客户端(官方库为 kubernetes)是 Kubernetes API 的 Python 语言封装,让你可以通过 Python 代码直接与 Kubernetes 集群交互,核心能做这些事:

  • 集群资源管理:创建、查询、更新、删除 Pod、Deployment、Service、ConfigMap 等所有 Kubernetes 原生资源;
  • 集群状态监控:实时获取集群节点、Pod、容器的运行状态、资源使用率等数据;
  • 自动化运维:编写脚本替代手动 kubectl 命令,实现批量操作、定时任务、异常自动处理;
  • 自定义控制器:基于 Watch API 开发自定义的控制器(如业务专属的资源调度、故障自愈逻辑);
  • 集成与扩展:将 Kubernetes 集群管理能力嵌入 Python 应用(如运维平台、CI/CD 系统)。

2. 解决的核心问题

  • 替代手动执行 kubectl 命令,避免批量操作时的重复劳动和人为错误;
  • 突破 kubectl 命令行的局限性,实现复杂的逻辑判断、循环、条件分支等编程化操作;
  • 无缝集成到 Python 生态(如结合 Airflow 做定时运维、结合 FastAPI 开发运维接口、结合 Prometheus 做监控告警);
  • 实现 Kubernetes 集群操作的“代码化、可版本化、可自动化”。

二、环境准备

1. 安装客户端库

# 安装官方客户端(推荐稳定版)
pip install kubernetes==29.0.0

2. 集群认证配置

Python Kubernetes 客户端会自动读取以下位置的认证配置(优先级从高到低):

  1. 代码中显式指定的 kubeconfig 文件路径;
  2. 环境变量 KUBECONFIG 指向的文件;
  3. 默认路径 ~/.kube/config(本地开发常用);
  4. 集群内 Pod 的服务账户(/var/run/secrets/kubernetes.io/serviceaccount/,集群内部署应用常用)。

验证配置是否有效:

from kubernetes import config, client

# 加载配置(本地开发用这个)
config.load_kube_config()
# 如果是在集群内的 Pod 中运行,替换为:config.load_incluster_config()

# 验证连接
v1 = client.CoreV1Api()
print("集群节点列表:")
for node in v1.list_node().items:
    print(f"- {node.metadata.name}")

三、常用 API 实战:核心操作串联

1. 基础资源操作:Pod 生命周期管理

(1)创建 Pod
from kubernetes import client, config
from kubernetes.client.rest import ApiException

# 加载配置
config.load_kube_config()
core_api = client.CoreV1Api()

# 定义 Pod 规格
pod_manifest = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {
        "name": "python-k8s-demo",
        "namespace": "default"
    },
    "spec": {
        "containers": [
            {
                "name": "nginx",
                "image": "nginx:1.25",
                "ports": [{"containerPort": 80}]
            }
        ],
        "restartPolicy": "Always"
    }
}

# 创建 Pod
try:
    response = core_api.create_namespaced_pod(
        body=pod_manifest,
        namespace="default"
    )
    print(f"Pod {response.metadata.name} 创建成功")
except ApiException as e:
    print(f"创建 Pod 失败:{e.reason} ({e.status})")
(2)查询 Pod 状态
# 查询单个 Pod
try:
    pod = core_api.read_namespaced_pod(
        name="python-k8s-demo",
        namespace="default"
    )
    print(f"Pod 状态:{pod.status.phase}")
    print(f"Pod IP:{pod.status.pod_ip}")
    print(f"容器状态:{pod.status.container_statuses[0].state}")
except ApiException as e:
    print(f"查询 Pod 失败:{e.reason}")

# 批量查询 Pod(按标签过滤)
pod_list = core_api.list_namespaced_pod(
    namespace="default",
    label_selector="app=demo"  # 可替换为你的标签
)
print("\n符合条件的 Pod 列表:")
for p in pod_list.items:
    print(f"- {p.metadata.name} ({p.status.phase})")
(3)更新 Pod(修改标签)
# 准备更新的标签
patch_body = {
    "metadata": {
        "labels": {
            "app": "demo",
            "env": "test"
        }
    }
}

try:
    response = core_api.patch_namespaced_pod(
        name="python-k8s-demo",
        namespace="default",
        body=patch_body
    )
    print(f"Pod 标签更新成功:{response.metadata.labels}")
except ApiException as e:
    print(f"更新 Pod 失败:{e.reason}")
(4)删除 Pod
try:
    core_api.delete_namespaced_pod(
        name="python-k8s-demo",
        namespace="default",
        # 优雅删除超时时间(秒)
        grace_period_seconds=5
    )
    print("Pod 删除成功")
except ApiException as e:
    print(f"删除 Pod 失败:{e.reason}")

2. 高级资源操作:Deployment 管理(无状态服务)

Deployment 是管理 Pod 副本和更新的核心资源,比直接操作 Pod 更实用:

from kubernetes import client, config

config.load_kube_config()
apps_api = client.AppsV1Api()

# 1. 创建 Deployment
deployment_manifest = {
    "apiVersion": "apps/v1",
    "kind": "Deployment",
    "metadata": {
        "name": "nginx-deployment",
        "namespace": "default"
    },
    "spec": {
        "replicas": 3,  # 3个副本
        "selector": {
            "matchLabels": {
                "app": "nginx"
            }
        },
        "template": {
            "metadata": {
                "labels": {
                    "app": "nginx"
                }
            },
            "spec": {
                "containers": [
                    {
                        "name": "nginx",
                        "image": "nginx:1.25",
                        "ports": [{"containerPort": 80}]
                    }
                ]
            }
        }
    }
}

# 创建 Deployment
try:
    response = apps_api.create_namespaced_deployment(
        body=deployment_manifest,
        namespace="default"
    )
    print(f"Deployment {response.metadata.name} 创建成功")
except ApiException as e:
    print(f"创建 Deployment 失败:{e.reason}")

# 2. 扩缩容 Deployment(修改副本数)
try:
    scale_body = {
        "spec": {
            "replicas": 5
        }
    }
    response = apps_api.patch_namespaced_deployment_scale(
        name="nginx-deployment",
        namespace="default",
        body=scale_body
    )
    print(f"Deployment 扩缩容成功,当前副本数:{response.spec.replicas}")
except ApiException as e:
    print(f"扩缩容失败:{e.reason}")

# 3. 查询 Deployment 状态
try:
    deployment = apps_api.read_namespaced_deployment(
        name="nginx-deployment",
        namespace="default"
    )
    print(f"Deployment 可用副本数:{deployment.status.available_replicas}")
    print(f"Deployment 当前副本数:{deployment.status.replicas}")
except ApiException as e:
    print(f"查询 Deployment 失败:{e.reason}")

3. 监控与监听:Watch API 实时感知资源变化

Watch API 可以实时监听资源的创建、更新、删除事件,是开发自定义控制器的核心:

from kubernetes import client, config, watch

config.load_kube_config()
core_api = client.CoreV1Api()

# 监听 default 命名空间的 Pod 事件
w = watch.Watch()
print("开始监听 Pod 事件(按 Ctrl+C 停止):")
try:
    # stream 方法会阻塞,直到手动停止或超时
    for event in w.stream(
        core_api.list_namespaced_pod,
        namespace="default",
        timeout_seconds=60  # 监听60秒后自动停止
    ):
        pod = event["object"]
        print(f"事件类型:{event['type']} | Pod 名称:{pod.metadata.name} | 状态:{pod.status.phase}")
except KeyboardInterrupt:
    w.stop()
    print("\n监听已停止")

4. 配置管理:ConfigMap 读写

ConfigMap 用于存储非敏感配置,Python 客户端可轻松读写:

from kubernetes import client, config

config.load_kube_config()
core_api = client.CoreV1Api()

# 1. 创建 ConfigMap
cm_manifest = {
    "apiVersion": "v1",
    "kind": "ConfigMap",
    "metadata": {
        "name": "demo-config",
        "namespace": "default"
    },
    "data": {
        "app.conf": "env=test\nlog_level=info",
        "max_conn": "1000"
    }
}

try:
    core_api.create_namespaced_config_map(
        body=cm_manifest,
        namespace="default"
    )
    print("ConfigMap 创建成功")
except ApiException as e:
    print(f"创建 ConfigMap 失败:{e.reason}")

# 2. 读取 ConfigMap
try:
    cm = core_api.read_namespaced_config_map(
        name="demo-config",
        namespace="default"
    )
    print("ConfigMap 数据:")
    for key, value in cm.data.items():
        print(f"- {key}: {value}")
except ApiException as e:
    print(f"读取 ConfigMap 失败:{e.reason}")

四、典型实战场景:自动化运维脚本

以下是一个实用脚本:检查指定命名空间下所有 Pod 的状态,若有异常(Error/CrashLoopBackOff)则自动重启 Deployment:

from kubernetes import client, config, watch
from kubernetes.client.rest import ApiException
import time

def check_and_restart_pods(namespace="default"):
    # 加载配置
    config.load_kube_config()
    core_api = client.CoreV1Api()
    apps_api = client.AppsV1Api()

    # 1. 查询所有异常 Pod
    abnormal_pods = []
    pod_list = core_api.list_namespaced_pod(namespace=namespace)
    for pod in pod_list.items:
        # 判断 Pod 状态是否异常
        if pod.status.phase in ["Failed", "Unknown"]:
            abnormal_pods.append(pod)
        # 判断容器是否崩溃循环
        for container_status in pod.status.container_statuses or []:
            if container_status.state and container_status.state.waiting:
                if container_status.state.waiting.reason == "CrashLoopBackOff":
                    abnormal_pods.append(pod)
                    break

    if not abnormal_pods:
        print("无异常 Pod")
        return

    # 2. 找出异常 Pod 关联的 Deployment 并重启
    for pod in abnormal_pods:
        print(f"异常 Pod:{pod.metadata.name},状态:{pod.status.phase}")
        # 获取 Pod 关联的 Deployment(通过 ownerReference)
        for owner in pod.metadata.owner_references or []:
            if owner.kind == "ReplicaSet":
                # 通过 ReplicaSet 找到 Deployment
                try:
                    rs = apps_api.read_namespaced_replica_set(
                        name=owner.name,
                        namespace=namespace
                    )
                    for rs_owner in rs.metadata.owner_references or []:
                        if rs_owner.kind == "Deployment":
                            deployment_name = rs_owner.name
                            print(f"重启 Deployment:{deployment_name}")
                            # 重启 Deployment(通过滚动更新实现)
                            apps_api.patch_namespaced_deployment(
                                name=deployment_name,
                                namespace=namespace,
                                body={"spec": {"template": {"metadata": {"annotations": {"kubectl.kubernetes.io/restartedAt": str(time.time())}}}}}
                            )
                            break
                except ApiException as e:
                    print(f"处理 Deployment 失败:{e.reason}")
                break

if __name__ == "__main__":
    check_and_restart_pods(namespace="default")

总结

  1. 核心价值:Python Kubernetes 客户端将 Kubernetes 集群操作代码化,解决了手动 kubectl 命令无法自动化、无法集成到应用的问题,是 Python 生态运维 K8s 的核心工具;
  2. 核心 API:基础操作(CoreV1Api)覆盖 Pod/ConfigMap/Service,高级操作(AppsV1Api)覆盖 Deployment/StatefulSet,Watch API 实现实时监控;
  3. 实战要点:认证优先使用 load_kube_config(本地)/load_incluster_config(集群内),操作时需处理 ApiException 异常,复杂运维优先用 Deployment 而非直接操作 Pod。

通过以上教程,你可以基于 Python 快速实现 Kubernetes 集群的自动化管理、监控和故障自愈,将集群操作无缝融入你的 Python 应用或运维脚本中。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐