企业微信Webhook开发
Prometheus Alertmanager:监控告警集成。CI/CD通知:Jenkins构建状态、Git提交通知。工作流触发:与n8n、Zapier等自动化平台集成。数据同步:数据库同步结果、ETL任务完成通知。Python SDK:封装完整的消息发送类。监控告警:服务器监控、业务异常报警。Jenkins插件:自动化构建通知。Java SDK:企业级集成方案。1、企业微信PC端查看人员ID。n
·
一、创建群聊机器人

- 1、只能是内部人员群才可以进行发送
- 2、获取Webhook地址:点击 “添加机器人”,创建成功后系统会立即显示一个Webhook地址,格式如下
https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=*******
1、做一个简单的测试
curl 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你的KEY' \
-H 'Content-Type: application/json' \
-d '{
"msgtype": "text",
"text": {
"content": "【服务器告警】\n时间:2025-02-27 14:30\n主机:192.168.1.100\n状态:CRITICAL\n详情:CPU使用率超过95%,请立即处理!",
"mentioned_list": ["@all"]
}
}'
1、根据不同的告警内容发送给不同的同事

-
1、企业微信PC端查看人员ID
-
2、根据不同的Id@不同的人
-
3、简单示例
- 测试同事示例
# 先获取当前时间,再发送消息
current_time=$(date "+%Y-%m-%d %H:%M:%S")
curl 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=******************' \
-H 'Content-Type: application/json' \
-d "{
\"msgtype\": \"text\",
\"text\": {
\"content\": \"【内存告警】\\n时间:${current_time}\\n主机:192.168.1.100\\n状态:WARNING\\n详情:内存使用率超过85%,请检查!\",
\"mentioned_list\": [\"测试同事userid\"]
}
}"
- 开发同事示例
# 1. 定义关键变量
WEBHOOK_KEY="*************" # 请替换为您的真实Key
DEV_USER_ID="***" # 请替换为开发同事的真实userid
CURRENT_TIME=$(date "+%Y-%m-%d %H:%M:%S")
HOST_IP="192.168.10.101"
CPU_USAGE="96"
THRESHOLD="80"
# 2. 发送告警
curl "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=${WEBHOOK_KEY}" \
-H 'Content-Type: application/json' \
-d "{
\"msgtype\": \"markdown\",
\"markdown\": {
\"content\": \"**🚨 服务器CPU紧急告警**\\n> **告警时间**:${CURRENT_TIME}\\n> **服务器IP**:${HOST_IP}\\n> **当前使用率**:<font color=\\\"warning\\\">${CPU_USAGE}%</font>\\n> **告警阈值**:${THRESHOLD}%\\n> **问题描述**:CPU使用率持续过高,可能影响服务响应。\\n\\n<@${DEV_USER_ID}> 请立即介入排查!\"
}
}"
二、 webhook支持的数据类型
| 消息类型 | 格式 | 主要用途 | 关键限制 |
|---|---|---|---|
| 文本消息 | text |
基础文本通知,支持@成员 | 最长 2048 字节 |
| Markdown 消息 | markdown / markdown_v2 |
格式化文本,适合技术报告、代码片段 | 最长 4096 字节 |
| 图片消息 | image |
发送图片、截图、图表 | Base64 编码,最大 2MB |
| 文件消息 | file |
发送文档、日志、压缩包 | 5B ~ 20MB |
| 图文消息 | news |
带标题、描述、图片和链接的富媒体 | 支持 1-8 条图文 |
| 语音消息 | voice |
发送音频文件 | - |
| 模板卡片 | template_card |
交互式卡片,支持按钮、输入框 | 增强用户交互体验 |
1、 简单示例
curl 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=5ae60f1d-297c-4268-b4b4-72991e4cd6ed' \
-H 'Content-Type: application/json' \
-d '{
"msgtype": "template_card",
"template_card": {
"card_type": "text_notice",
"source": {
"icon_url": "https://res.mail.qq.com/node/ww/wwopenmng/images/independent/doc/test_pic_msg1.png",
"desc": "系统监控",
"desc_color": 1
},
"main_title": {
"title": "多指标告警",
"desc": "CPU:95% | 内存:85% | 磁盘:90%"
},
"horizontal_content_list": [
{
"keyname": "告警时间",
"value": "2024-01-15 14:30:00"
},
{
"keyname": "告警级别",
"value": "严重"
}
],
"card_action": {
"type": 1,
"url": "https://www.work.weixin.qq.com"
}
}
}'
curl 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=5ae60f1d-297c-4268-b4b4-72991e4cd6ed' -H 'Content-Type: application/json' -d '{
"msgtype": "news",
"news": {
"articles": [
{
"title": "多指标告警",
"description": "CPU:95% | 内存:85% | 磁盘:90%\n告警时间:2024-01-15 14:30:00",
"url": "https://www.work.weixin.qq.com/",
"picurl": "https://res.mail.qq.com/node/ww/wwopenmng/images/independent/doc/test_pic_msg1.png"
}
]
}
}'
2、webhook其他的使用注意事项
- 1、图片建议尺寸:大图1068×455,小图150×150 图片格式:JPG、PNG
- 2、只能单向通讯,只能发送,不能接收,如需双向交互,需使用自建应用或智能机器人(API模式)
3、高级应用场景
CI/CD通知:Jenkins构建状态、Git提交通知
监控告警:服务器监控、业务异常报警
数据同步:数据库同步结果、ETL任务完成通知
工作流触发:与n8n、Zapier等自动化平台集成
4、扩展能力
1、与第三方工具集成
Jenkins插件:自动化构建通知
n8n工作流:可视化自动化流程
Prometheus Alertmanager:监控告警集成
Zabbix:IT监控告警通知
2、开发SDK支持
Python SDK:封装完整的消息发送类
Java SDK:企业级集成方案
3、命令行工具:
如x-cmd的qywx模块
三、python-falst 客户端和服务端采集告警
[内网数据源: 192.168.1.200]
--(1) 采集数据/告警-->
[客户端(Client)]
--(2) HTTP POST 上报-->
[网关/服务端: 192.168.1.100:5000]
--(3) 接收、处理请求-->
[服务端(Server)]
--(4) 调用公网API-->
[企业微信(Webhook)]
--(5) 推送消息-->
[企业微信群聊]
1、 测试webhook是否可用
# 在 192.168.1.100 上执行
curl 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=**************** \
-H 'Content-Type: application/json' \
-d '{
"msgtype": "text",
"text": {
"content": "直接测试消息:如果收到此消息,说明Webhook配置正确"
}
}'
# 在客户端容器中执行
python3 -c "import requests; r=requests.post('http://192.168.1.100:5000/report', json={'metric':'test','value':99,'threshold':80,'status':'firing','message':'测试告警'}); print(r.status_code, r.text)"
2、192.168.1.200
cat << EOF monitor_client.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
内网监控客户端
部署于:192.168.1.200 (及未来其他内网机器)
功能:采集本地数据,并上报到网关服务器(192.168.1.100)
"""
import requests
import json
import time
import logging
import socket
import psutil # 需要安装: pip install psutil
import os
# ========== 配置区域 ==========
# 网关服务器的地址
GATEWAY_SERVER = "http://192.168.1.100:5000"
# 宿主机IP(通过环境变量传入,运行容器时使用 -e HOST_IP=192.168.1.200)
HOST_IP = os.getenv('HOST_IP', '')
REPORT_ENDPOINT = f"{GATEWAY_SERVER}/report"
# =============================
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def get_host_ip():
"""
获取主机IP地址(优先使用环境变量传入的宿主机IP)
返回:(宿主机IP, 容器内IP)
"""
# 1. 首选从环境变量获取宿主机IP(由运行Docker时传入)
if HOST_IP:
logger.info(f"从环境变量获取到宿主机IP: {HOST_IP}")
else:
logger.warning("未设置 HOST_IP 环境变量,将尝试自动检测,但可能不准确。")
# 2. 获取容器内IP(用于参考和调试)
container_ip = "未知"
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8', 80)) # 不实际发送数据,仅用于获取本机IP
container_ip = s.getsockname()[0]
s.close()
except Exception:
pass
return HOST_IP, container_ip
def collect_system_metrics():
"""采集本机系统指标(示例:CPU和内存)"""
metrics = []
# 采集CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
metrics.append({
"metric": "cpu_usage_percent",
"value": cpu_percent,
"threshold": 80.0,
"status": "firing" if cpu_percent > 80.0 else "normal",
"message": f"CPU使用率: {cpu_percent:.1f}%"
})
# 采集内存使用率
mem = psutil.virtual_memory()
metrics.append({
"metric": "memory_usage_percent",
"value": mem.percent,
"threshold": 85.0,
"status": "firing" if mem.percent > 85.0 else "normal",
"message": f"内存使用率: {mem.percent:.1f}%,可用: {mem.available / (1024**3):.2f} GB"
})
return metrics
def report_to_gateway(alert_data, host_ip, container_ip):
"""将告警数据上报到网关服务器"""
# 在告警数据中增加IP信息
alert_data_with_ip = {
**alert_data,
"host_ip": host_ip, # 宿主机真实IP
"container_ip": container_ip # 容器内部IP(供调试参考)
}
try:
headers = {'Content-Type': 'application/json'}
response = requests.post(REPORT_ENDPOINT, json=alert_data_with_ip, headers=headers, timeout=10)
response.raise_for_status()
result = response.json()
if result.get('status') == 'success':
logger.info(f"✅ 告警上报成功: {alert_data.get('metric')} (宿主机: {host_ip})")
return True
else:
logger.warning(f"⚠️ 网关处理告警未返回成功状态: {result}")
return False
except requests.exceptions.ConnectionError:
logger.error(f"❌ 无法连接到网关服务器 {GATEWAY_SERVER},请检查网络或服务状态。")
return False
except requests.exceptions.Timeout:
logger.error("⏱️ 连接网关服务器超时。")
return False
except Exception as e:
logger.error(f"❌ 上报告警时发生错误: {e}")
return False
def main_loop(interval_seconds=60):
"""主循环,定期采集并上报数据"""
# 获取IP信息
host_ip, container_ip = get_host_ip()
local_ip = host_ip if host_ip else container_ip
logger.info(f"内网监控客户端启动。")
logger.info(f" 宿主机IP(上报用): {host_ip if host_ip else '(未设置,请检查环境变量HOST_IP)'}")
logger.info(f" 容器内部IP: {container_ip}")
logger.info(f" 数据将上报至网关: {GATEWAY_SERVER}")
logger.info(f" 采集间隔: {interval_seconds} 秒")
while True:
try:
logger.info("开始新一轮数据采集...")
metrics = collect_system_metrics()
for item in metrics:
# 只上报触发阈值的告警
if item['status'] == 'firing':
report_to_gateway(item, host_ip, container_ip)
# 如果想上报所有状态(包括normal),可以去掉if判断
# report_to_gateway(item, host_ip, container_ip)
except Exception as e:
logger.error(f"在主循环中发生未预期错误: {e}")
logger.info(f"本次采集上报完成,等待 {interval_seconds} 秒后继续...")
time.sleep(interval_seconds)
if __name__ == '__main__':
# 在运行主循环前,可以先测试一次连通性
try:
test_resp = requests.get(f"{GATEWAY_SERVER}/health", timeout=5)
if test_resp.status_code == 200:
logger.info(f"网关服务连接测试成功: {test_resp.json()}")
else:
logger.warning(f"网关服务健康检查返回非200状态: {test_resp.status_code}")
except Exception as e:
logger.error(f"无法连接到网关服务,请确认 {GATEWAY_SERVER} 可访问: {e}")
exit(1)
# 启动主循环
main_loop(interval_seconds=60) # 每60秒采集上报一次
EOF
3、192.168.1.100
cat << EOF gateway_server.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
告警网关服务端
部署于:192.168.1.100
功能:接收内网客户端上报的告警,并转发至企业微信
启动命令:python3 gateway_server.py
"""
from flask import Flask, request, jsonify
import requests
import logging
import json
from datetime import datetime
app = Flask(__name__)
# ========== 配置区域 ==========
# 企业微信机器人Webhook地址 (在 192.168.1.100 上配置)
# !!! 请替换为您自己的Webhook Key !!!
WECHAT_WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=5ae60f1d-297c-4268-b4b4-72991e4cd6ed"
# 服务端监听配置。由于需要被内网机器访问,host 应设为 '0.0.0.0'
SERVER_HOST = '0.0.0.0'
SERVER_PORT = 5000
# =============================
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def forward_to_wechat(alert_data):
"""将告警数据转发到企业微信"""
# 优先使用客户端上报的 host_ip(宿主机IP),如果没有则使用请求来源IP(容器IP)
# 这是实现“告警显示宿主机IP”的关键修改
display_ip = alert_data.get('host_ip', alert_data.get('source_ip', '未知'))
markdown_content = f"""**🚨 收到来自内网机器的告警**
**主机(物理机/宿主机)**: `{display_ip}`
**指标**: {alert_data.get('metric', '未知')}
**当前值**: {alert_data.get('value', '未知')}
**阈值**: {alert_data.get('threshold', 'N/A')}
**状态**: {alert_data.get('status', 'firing')}
**时间**: {alert_data.get('timestamp', datetime.now().strftime('%Y-%m-%d %H:%M:%S'))}
**详情**: {alert_data.get('message', '')}
"""
payload = {
"msgtype": "markdown",
"markdown": {
"content": markdown_content
}
}
try:
resp = requests.post(WECHAT_WEBHOOK_URL, json=payload, timeout=10)
resp.raise_for_status() # 如果状态码不是200,抛出异常
result = resp.json()
if result.get('errcode') == 0:
logger.info(f"消息成功转发至企业微信。来源IP(宿主机): {display_ip}")
return True
else:
logger.error(f"企业微信接口返回错误: {result}")
return False
except Exception as e:
logger.error(f"转发消息到企业微信时失败: {e}")
return False
@app.route('/health', methods=['GET'])
def health_check():
"""健康检查端点"""
return jsonify({"status": "ok", "service": "alert-gateway", "host": "192.168.1.100"})
@app.route('/report', methods=['POST'])
def report_alert():
"""
接收客户端上报告警的主要接口。
期望的JSON格式(客户端修改后会增加 host_ip 和 container_ip 字段):
{
"metric": "cpu_usage",
"value": 95.5,
"threshold": 80,
"status": "firing",
"message": "CPU使用率超过阈值",
"host_ip": "192.168.1.200", # <- 新增:宿主机IP
"container_ip": "172.17.0.2" # <- 新增:容器IP(供调试)
}
"""
client_ip = request.remote_addr # 这是客户端的容器IP
logger.info(f"收到来自 {client_ip} 的上报请求")
try:
data = request.get_json()
if not data:
return jsonify({"error": "请求体必须为JSON格式"}), 400
# 为告警数据补充来源IP(容器IP)和时间戳
alert_data = {
"source_ip": client_ip,
"timestamp": datetime.now().isoformat(),
**data # 合并上传的数据(其中应包含 host_ip 和 container_ip)
}
logger.info(f"处理告警数据: {json.dumps(alert_data, indent=2, ensure_ascii=False)}")
# 转发到企业微信
if forward_to_wechat(alert_data):
return jsonify({
"status": "success",
"message": "告警��接收并转发",
"received_from": client_ip
}), 200
else:
return jsonify({
"status": "error",
"message": "告警转发至企业微信失败"
}), 500
except Exception as e:
logger.exception(f"处理上报请求时发生异常: {e}")
return jsonify({"error": f"服务器内部错误: {str(e)}"}), 500
if __name__ == '__main__':
logger.info(f"告警网关服务端启动,监听 {SERVER_HOST}:{SERVER_PORT}")
logger.info(f"健康检查地址: http://192.168.1.100:{SERVER_PORT}/health")
logger.info(f"告警上报地址: http://192.168.1.100:{SERVER_PORT}/report")
app.run(host=SERVER_HOST, port=SERVER_PORT, debug=False)
EOF
4、 测试脚本
cat << EOF simulate_fault.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
模拟故障告警客户端
部署于:192.168.1.200 (或其他内网机器)
功能:手动发送模拟的故障告警,用于测试告警系统
使用方式:
1. python3 simulate_fault.py --type cpu --value 95
2. python3 simulate_fault.py --list
3. python3 simulate_fault.py --auto
"""
import requests
import json
import time
import logging
import argparse
import random
import os
import socket
from datetime import datetime
# ========== 配置区域 ==========
GATEWAY_SERVER = "http://192.168.1.100:5000"
REPORT_ENDPOINT = f"{GATEWAY_SERVER}/report"
# 从环境变量获取宿主机IP(关键修改)
HOST_IP = os.getenv('HOST_IP', '')
# =============================
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
def get_container_ip():
"""获取容器内部IP(用于调试和参考)"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception:
return "unknown"
def get_ip_info():
"""获取IP信息:宿主机IP和容器IP"""
host_ip = HOST_IP
container_ip = get_container_ip()
if host_ip:
logger.info(f"使用环境变量中的宿主机IP: {host_ip}")
else:
logger.warning("未设置 HOST_IP 环境变量,将使用容器IP作为主机IP")
host_ip = container_ip
logger.info(f"容器内部IP: {container_ip}")
return host_ip, container_ip
class FaultSimulator:
def __init__(self, gateway_url=GATEWAY_SERVER):
self.gateway_url = gateway_url
self.host_ip, self.container_ip = get_ip_info()
def send_alert(self, alert_data):
"""发送告警到网关(增加宿主机IP信息)"""
# 在告警数据中添加IP信息
alert_data_with_ip = {
**alert_data,
"host_ip": self.host_ip, # 宿主机真实IP
"container_ip": self.container_ip, # 容器内部IP
"timestamp": datetime.now().isoformat()
}
try:
response = requests.post(
f"{self.gateway_url}/report",
json=alert_data_with_ip,
timeout=10
)
if response.status_code == 200:
result = response.json()
if result.get('status') == 'success':
logger.info(f"✅ 告警发送成功: {alert_data.get('metric')} (来自主机: {self.host_ip})")
return True
else:
logger.warning(f"⚠️ 网关处理告警未返回成功: {result}")
return False
else:
logger.error(f"❌ 网关返回异常状态码: {response.status_code}")
logger.error(f"响应: {response.text}")
return False
except Exception as e:
logger.error(f"❌ 发送告警时发生错误: {e}")
return False
def simulate_cpu_fault(self, value=None, duration_minutes=5):
"""模拟CPU故障告警"""
if value is None:
value = random.randint(85, 100) # 随机生成85-100之间的值
alert_data = {
"metric": "cpu_usage_percent",
"value": float(value),
"threshold": 80.0,
"status": "firing",
"message": f"CPU使用率持续超过阈值,已持续{duration_minutes}分钟。可能原因:1. 应用进程异常 2. 系统负载过高 3. 配置不足"
}
logger.info(f"模拟CPU故障: 使用率{value}%,阈值80%,持续时间{duration_minutes}分钟")
return self.send_alert(alert_data)
def simulate_memory_fault(self, value=None, leak_rate="2GB/小时"):
"""模拟内存故障告警"""
if value is None:
value = random.randint(88, 99)
alert_data = {
"metric": "memory_usage_percent",
"value": float(value),
"threshold": 85.0,
"status": "firing",
"message": f"内存使用率过高,疑似内存泄漏,泄漏率约{leak_rate}。建议:1. 检查应用内存使用 2. 重启相关服务 3. 扩容内存"
}
logger.info(f"模拟内存故障: 使用率{value}%,阈值85%,泄漏率{leak_rate}")
return self.send_alert(alert_data)
def simulate_disk_fault(self, value=None, partition="/var"):
"""模拟磁盘故障告警"""
if value is None:
value = random.randint(88, 98)
alert_data = {
"metric": "disk_usage_percent",
"value": float(value),
"threshold": 85.0,
"status": "firing",
"message": f"{partition}分区空间不足,仅剩{100 - value}%可用空间。建议:1. 清理日志文件 2. 归档历史数据 3. 扩容磁盘"
}
logger.info(f"模拟磁盘故障: {partition}分区使用率{value}%,阈值85%")
return self.send_alert(alert_data)
def simulate_network_fault(self, latency_ms=500, packet_loss=15):
"""模拟网络故障告警"""
alert_data = {
"metric": "network_latency",
"value": float(latency_ms),
"threshold": 100.0,
"status": "firing",
"message": f"网络延迟过高: {latency_ms}ms,丢包率: {packet_loss}%。可能原因:1. 网络拥塞 2. 防火墙策略 3. 带宽不足"
}
logger.info(f"模拟网络故障: 延迟{latency_ms}ms,丢包率{packet_loss}%")
return self.send_alert(alert_data)
def simulate_service_down(self, service_name="nginx", downtime_minutes=10):
"""模拟服务宕机告警"""
alert_data = {
"metric": "service_status",
"value": 0.0, # 0表示服务停止
"threshold": 1.0, # 期望值1表示服务正常
"status": "firing",
"message": f"{service_name}服务已停止响应,宕机时间约{downtime_minutes}分钟。建议:1. 检查服务进程 2. 查看服务日志 3. 重启服务"
}
logger.info(f"模拟服务宕机: {service_name}服务停止{downtime_minutes}分钟")
return self.send_alert(alert_data)
def simulate_database_fault(self, connections=950, max_connections=1000):
"""模拟数据库故障告警"""
connection_percent = (connections / max_connections) * 100
alert_data = {
"metric": "database_connections",
"value": float(connection_percent),
"threshold": 90.0,
"status": "firing",
"message": f"数据库连接数接近上限: {connections}/{max_connections} ({connection_percent:.1f}%)。建议:1. 优化查询语句 2. 增加连接池大小 3. 扩容数据库"
}
logger.info(f"模拟数据库故障: 连接数{connections}/{max_connections} ({connection_percent:.1f}%)")
return self.send_alert(alert_data)
def simulate_high_traffic(self, requests_per_second=1500, normal_capacity=1000):
"""模拟高流量告警"""
traffic_percent = (requests_per_second / normal_capacity) * 100
alert_data = {
"metric": "request_rate",
"value": float(traffic_percent),
"threshold": 120.0,
"status": "firing",
"message": f"请求量异常增高: {requests_per_second}请求/秒,超出正常容量{normal_capacity}。建议:1. 自动扩容 2. 限流降级 3. 检查是否被攻击"
}
logger.info(f"模拟高流量: {requests_per_second}请求/秒,正常容量{normal_capacity}")
return self.send_alert(alert_data)
def simulate_alert_resolved(self, original_metric="cpu_usage_percent"):
"""模拟告警恢复"""
alert_data = {
"metric": original_metric,
"value": 45.0, # 恢复正常值
"threshold": 80.0,
"status": "resolved",
"message": f"{original_metric}告警已自动恢复,当前值恢复正常水平"
}
logger.info(f"模拟告警恢复: {original_metric}已恢复正常")
return self.send_alert(alert_data)
def random_fault_scenario(self):
"""随机选择一个故障场景进行模拟"""
scenarios = [
("cpu", self.simulate_cpu_fault),
("memory", self.simulate_memory_fault),
("disk", self.simulate_disk_fault),
("network", self.simulate_network_fault),
("service", self.simulate_service_down),
("database", self.simulate_database_fault),
("traffic", self.simulate_high_traffic)
]
scenario_name, scenario_func = random.choice(scenarios)
logger.info(f"随机选择故障场景: {scenario_name}")
return scenario_func()
def main():
parser = argparse.ArgumentParser(
description='模拟故障告警客户端',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''
使用示例:
# 模拟CPU故障
python3 simulate_fault.py --type cpu --value 95
# 模拟内存故障
python3 simulate_fault.py --type memory --value 92
# 模拟磁盘故障
python3 simulate_fault.py --type disk --value 95 --partition /var
# 模拟服务宕机
python3 simulate_fault.py --type service --service mysql --downtime 15
# 模拟告警恢复
python3 simulate_fault.py --type resolve --metric cpu_usage_percent
# 自动模式:每2分钟发送一个随机故障
python3 simulate_fault.py --auto --interval 120
# 批量发送多种故障
python3 simulate_fault.py --batch cpu memory disk
# 列出所有支持的故障类型
python3 simulate_fault.py --list
'''
)
parser.add_argument('--type', '-t',
choices=['cpu', 'memory', 'disk', 'network', 'service', 'database', 'traffic', 'resolve', 'random'],
help='故障类型')
parser.add_argument('--value', '-v', type=float, help='故障值(百分比)')
parser.add_argument('--partition', default='/var', help='磁盘分区(仅disk类型有效)')
parser.add_argument('--service', default='nginx', help='服务名称(仅service类型有效)')
parser.add_argument('--downtime', type=int, default=10, help='宕机时间(分钟,仅service类型有效)')
parser.add_argument('--metric', help='告警恢复的指标名称(仅resolve类型有效)')
parser.add_argument('--auto', action='store_true', help='自动模式,循环发送随机故障')
parser.add_argument('--interval', type=int, default=120, help='自动模式间隔(秒)')
parser.add_argument('--batch', nargs='+', help='批量发送多种故障')
parser.add_argument('--list', action='store_true', help='列出所有支持的故障类型')
parser.add_argument('--server', default=GATEWAY_SERVER, help='网关服务器地址')
parser.add_argument('--host-ip', help='宿主机IP地址(覆盖环境变量HOST_IP)')
args = parser.parse_args()
# 如果命令行指定了host-ip,则覆盖环境变量
if args.host_ip:
os.environ['HOST_IP'] = args.host_ip
logger.info(f"使用命令行参数中的宿主机IP: {args.host_ip}")
simulator = FaultSimulator(args.server)
if args.list:
print("支持的故障类型:")
print(" cpu - CPU使用率过高")
print(" memory - 内存使用率过高/内存泄漏")
print(" disk - 磁盘空间不足")
print(" network - 网络延迟/丢包")
print(" service - 服务宕机")
print(" database - 数据库连接数过高")
print(" traffic - 高流量冲击")
print(" resolve - 告警恢复")
print(" random - 随机故障")
return
if args.batch:
logger.info(f"批量发送故障: {args.batch}")
for fault_type in args.batch:
if fault_type == 'cpu':
simulator.simulate_cpu_fault()
elif fault_type == 'memory':
simulator.simulate_memory_fault()
elif fault_type == 'disk':
simulator.simulate_disk_fault()
elif fault_type == 'network':
simulator.simulate_network_fault()
elif fault_type == 'service':
simulator.simulate_service_down()
elif fault_type == 'database':
simulator.simulate_database_fault()
elif fault_type == 'traffic':
simulator.simulate_high_traffic()
time.sleep(2) # 每个故障间隔2秒
return
if args.auto:
logger.info(f"启动自动模式,每{args.interval}秒发送一个随机故障")
try:
while True:
simulator.random_fault_scenario()
logger.info(f"等待{args.interval}秒后发送下一个故障...")
time.sleep(args.interval)
except KeyboardInterrupt:
logger.info("自动模式已停止")
return
# 处理单个故障
if args.type == 'cpu':
simulator.simulate_cpu_fault(args.value)
elif args.type == 'memory':
simulator.simulate_memory_fault(args.value)
elif args.type == 'disk':
simulator.simulate_disk_fault(args.value, args.partition)
elif args.type == 'network':
simulator.simulate_network_fault()
elif args.type == 'service':
simulator.simulate_service_down(args.service, args.downtime)
elif args.type == 'database':
simulator.simulate_database_fault()
elif args.type == 'traffic':
simulator.simulate_high_traffic()
elif args.type == 'resolve':
simulator.simulate_alert_resolved(args.metric or 'cpu_usage_percent')
elif args.type == 'random':
simulator.random_fault_scenario()
else:
parser.print_help()
if __name__ == '__main__':
main()
EOF
5、测试方法
# 1. 列出所有支持的故障类型
python3 simulate_fault.py --list
# 2. 模拟CPU故障(使用率95%)
python3 simulate_fault.py --type cpu --value 95
# 3. 模拟内存故障(使用率92%)
python3 simulate_fault.py --type memory --value 92
# 4. 模拟磁盘故障(/var分区使用率95%)
python3 simulate_fault.py --type disk --value 95 --partition /var
# 5. 模拟MySQL服务宕机
python3 simulate_fault.py --type service --service mysql --downtime 15
# 6. 模拟告警恢复
python3 simulate_fault.py --type resolve --metric cpu_usage_percent
# 7. 批量发送多种故障
python3 simulate_fault.py --batch cpu memory disk
# 8. 自动模式:每2分钟发送一个随机故障
python3 simulate_fault.py --auto --interval 120
# 9. 指定网关地址
python3 simulate_fault.py --type cpu --value 90 --server http://192.168.1.100:5000
# 场景1:CPU使用率突增(常见于计算密集型任务)
python3 simulate_fault.py --type cpu --value 98
# 场景2:内存泄漏(内存使用率持续上升)
python3 simulate_fault.py --type memory --value 96
# 场景3:磁盘写满(日志未轮转)
python3 simulate_fault.py --type disk --value 99 --partition /var/log
# 场景4:数据库连接池耗尽
python3 simulate_fault.py --type database
# 场景5:服务雪崩(多个服务同时故障)
python3 simulate_fault.py --batch service service service --service nginx --downtime 5
python3 simulate_fault.py --batch service service --service mysql --downtime 10
python3 simulate_fault.py --batch service --service redis --downtime 8
# 场景6:告警恢复测试
python3 simulate_fault.py --type resolve --metric cpu_usage_percent
python3 simulate_fault.py --type resolve --metric memory_usage_percent
更多推荐
所有评论(0)