华为防火墙自动化管理工具套件
一个自动写入华为防火墙的脚本工具
·
项目概述
这是一套用于华为USG系列防火墙的自动化管理工具,包含两个核心功能模块,通过NETCONF协议实现防火墙对象的自动化创建和管理。
常见场景
1.企业内网管控过程中,仅允许员工办公访问外部网络或者其他网站,比如baidu等;
2.办公常见即时沟通工具如企业微信;
🛠️ 工具清单
1. 网站白名单解析工具 (whitelist_parser_gui.py)
功能特性:
- 🌐 自动分析指定网站的所有相关域名和IP地址
- 📝 IP地址以/32格式写入防火墙地址对象,可以根据自己的需求改成24级
- 🏷️ 域名写入防火墙域名组
- 📂 自动添加到"Permitted_list"地址组,可以根据自己的需要将名称改成其他,并提前配置好安全策略关联对象
- 🔗 可选择将域名组关联到安全策略
- 🕒 描述包含网站标题和创建时间戳
- 🖥️ 提供友好的图形化界面
使用场景:
- 企业内部网站访问白名单管理
- 快速添加业务系统相关域名和IP
- 第三方服务访问权限配置
2. 企业微信地址同步工具 (企业微信地址同步工具.py)
功能特性:
- 📡 自动从企业微信官方API下载最新IP和域名列表
- 🔄 智能分离处理普通域名和通配符域名
- 🎯 通配符域名层级优化(去除重复覆盖)
- 📊 详细的同步结果统计和失败分析
- 💾 自动保存配置到防火墙存储
- 🏷️ 带时间戳的对象描述便于审计
使用场景:
- 企业微信服务访问配置
- 定期同步企业微信官方地址变更
- 企业办公网络安全策略维护
🔧 华为防火墙前置配置
系统要求
- 华为USG6000V/USG9000V系列防火墙
- 固件版本:V500R005C10及以上
- 支持NETCONF协议的版本
必需的防火墙配置
核心:开启NETCOMF服务和访问的用户权限,防火墙型号不同可能会有不同的命令方法,具体方法请以华为官网为主
1. 启用NETCONF服务
# 进入系统视图
<Huawei> system-view
# 启用NETCONF服务
[Huawei] netconf
[Huawei-netconf] protocol inbound ssh port 830
[Huawei-netconf] user-interface maximum-users 10
# 配置NETCONF会话参数
[Huawei-netconf] session idle-timeout 30
[Huawei-netconf] session absolute-timeout 60
# 启用服务
[Huawei-netconf] service enable
# 返回系统视图
[Huawei-netconf] quit
2. 创建NETCONF管理用户
# 创建本地用户账号
[Huawei] aaa
[Huawei-aaa] local-user netconf-admin password irreversible-cipher Huawei@123
[Huawei-aaa] local-user netconf-admin privilege level 15
[Huawei-aaa] local-user netconf-admin service-type ssh
# 配置用户授权
[Huawei-aaa] authorization-scheme netconf-auth
[Huawei-aaa-authz-netconf-auth] authorization-attribute user-role network-admin
[Huawei-aaa-authz-netconf-auth] quit
# 应用认证方案
[Huawei-aaa] domain netconf-domain
[Huawei-aaa-domain-netconf-domain] authentication-scheme default
[Huawei-aaa-domain-netconf-domain] authorization-scheme netconf-auth
[Huawei-aaa-domain-netconf-domain] quit
# 绑定用户到域
[Huawei-aaa] local-user netconf-admin domain netconf-domain
[Huawei-aaa] quit
3. 配置SSH服务
# 启用SSH服务器
[Huawei] ssh server enable
# 配置SSH认证方式
[Huawei] ssh server authentication-type password
[Huawei] ssh server authentication-type all
# 配置SSH用户认证
[Huawei] ssh user netconf-admin
[Huawei] ssh user netconf-admin authentication-type password
[Huawei] ssh user netconf-admin service-type netconf
# 配置VTY接口
[Huawei] user-interface vty 0 4
[Huawei-ui-vty0-4] authentication-mode aaa
[Huawei-ui-vty0-4] protocol inbound ssh
[Huawei-ui-vty0-4] user-role network-admin
[Huawei-ui-vty0-4] quit
4. 网络接口和路由配置
# 配置管理接口(示例:GE1/0/0作为管理接口)
[Huawei] interface GigabitEthernet 1/0/0
[Huawei-GigabitEthernet1/0/0] ip address 192.168.0.1 255.255.255.0
[Huawei-GigabitEthernet1/0/0] service-manage ssh permit
[Huawei-GigabitEthernet1/0/0] service-manage netconf permit
[Huawei-GigabitEthernet1/0/0] quit
# 配置缺省路由(如果需要)
[Huawei] ip route-static 0.0.0.0 0.0.0.0 192.168.50.254
5. 安全策略配置
# 创建管理区域安全策略
[Huawei] security-policy
[Huawei-policy-security] rule name allow-netconf-manage
[Huawei-policy-security-rule-allow-netconf-manage] source-zone local
[Huawei-policy-security-rule-allow-netconf-manage] destination-zone local
[Huawei-policy-security-rule-allow-netconf-manage] source-address 192.168.50.0 mask 255.255.255.0
[Huawei-policy-security-rule-allow-netconf-manage] service ssh
[Huawei-policy-security-rule-allow-netconf-manage] service netconf
[Huawei-policy-security-rule-allow-netconf-manage] action permit
[Huawei-policy-security-rule-allow-netconf-manage] quit
[Huawei-policy-security] quit
6. 验证NETCONF配置
# 查看NETCONF服务状态
[Huawei] display netconf
# 查看SSH用户配置
[Huawei] display ssh user-information netconf-admin
# 查看当前NETCONF会话
[Huawei] display netconf session
# 测试SSH连接
[Huawei] ssh client 192.168.0.1 port 830
高级配置选项
1. NETCONF访问控制列表
# 创建ACL限制NETCONF访问源
[Huawei] acl number 3001
[Huawei-acl-adv-3001] rule 5 permit tcp source 192.168.50.0 0.0.0.255 destination any destination-port eq 830
[Huawei-acl-adv-3001] rule 10 deny tcp source any destination any destination-port eq 830
[Huawei-acl-adv-3001] quit
# 应用ACL到接口
[Huawei] interface GigabitEthernet 1/0/0
[Huawei-GigabitEthernet1/0/0] packet-filter 3001 inbound
[Huawei-GigabitEthernet1/0/0] quit
2. NETCONF日志记录
# 启用NETCONF操作日志
[Huawei] info-center source NETCONF log level warning
[Huawei] info-center source NETCONF channel loghost
# 配置日志主机(可选)
[Huawei] info-center loghost 192.168.0.100
3. 会话限制和安全加固
# 限制并发NETCONF会话数
[Huawei] netconf
[Huawei-netconf] user-interface maximum-users 5
# 配置会话超时
[Huawei-netconf] session idle-timeout 15
[Huawei-netconf] session absolute-timeout 30
# 启用操作审计
[Huawei-netconf] audit enable
[Huawei-netconf] quit
📋 工具部署和使用
环境要求
Python环境
# Python 3.7+
pip install paramiko requests beautifulsoup4 ipaddress lxml
# GUI工具额外依赖
pip install tkinter # 通常Python自带
网络连通性测试
# 测试防火墙连通性
ping 192.168.0.1
# 测试SSH端口
telnet 192.168.0.1 22
# 测试NETCONF端口
telnet 192.168.0.1 830
配置文件修改
在使用工具前,请根据实际环境修改配置参数:
# 华为防火墙连接配置
FIREWALL_CONFIG = {
'host': '192.168.0.1', # 修改为实际防火墙IP
'username': 'netconf-admin', # 修改为实际用户名
'password': 'Huawei@123', # 修改为实际密码
'port': 830, # NETCONF端口
'timeout': 30 # 连接超时时间
}
使用示例
网站白名单工具
# GUI模式
python whitelist_parser_gui.py
# 在GUI中输入:www.baidu.com
# 工具会自动分析并写入防火墙
企业微信同步工具
# 一键同步模式
python 企业微信地址同步工具.py --quick
# 自定义配置同步
python 企业微信地址同步工具.py --host 192.168.0.1 --username admin
🔍 故障排查
常见问题
-
连接失败
- 检查网络连通性
- 验证NETCONF服务是否启用
- 确认用户账号和密码
-
权限不足
- 检查用户权限级别(需要15级)
- 验证用户角色配置
- 确认SSH服务配置
-
对象创建失败
- 检查对象名称冲突
- 验证XML格式正确性
- 确认防火墙资源限制
调试模式
启用详细日志:
TOOL_CONFIG = {
'enable_debug': True, # 启用调试模式
'show_failure_details': True # 显示详细失败信息
}
📚 工具详细说明
网站白名单解析工具
工作流程
- 网站分析:输入网站URL,自动爬取页面内容
- 资源提取:提取所有相关域名(链接、脚本、样式表、图片等)
- IP解析:对所有域名进行DNS解析获取IP地址
- 对象创建:
- 创建IP地址对象(网站域名命名)
- 创建域名组(网站域名_domains命名)
- 添加到Permitted_list地址组
- 安全策略:可选择关联到指定安全策略
配置参数
# 防火墙对象配置
FIREWALL_OBJECTS = {
'address_group_name': 'Permitted_list', # 地址组名称
'vsys': 'public', # 虚拟系统名称
'address_group_description': '白名单地址组 - 自动创建', # 地址组描述
}
# 工具运行配置
TOOL_CONFIG = {
'enable_debug': False, # 是否启用调试日志
'request_timeout': 30, # HTTP请求超时时间(秒)
'verify_ssl': True, # 是否验证SSL证书
'max_domain_levels': 10, # 允许的最大域名层级数
'max_domain_length': 200, # 允许的最大域名长度
}
企业微信地址同步工具
工作流程
- 数据下载:从企业微信官方API下载最新地址列表https://work.weixin.qq.com/h5app/wework_domain_ip/export/latest?format=json
- 数据解析:解析JSON格式的IP地址和域名信息
- 智能处理:
- 分离普通域名和通配符域名
- 优化通配符域名层级
- 验证地址格式兼容性
- 分批创建:
- 先创建普通域名组
- 再添加通配符域名
- 创建IP地址对象
- 配置保存:自动保存到防火墙存储
高级特性
通配符域名优化
原始列表:
- *.qq.com
- *.weixin.qq.com
- *.meeting.work.weixin.qq.com
- *.work.weixin.qq.com
优化后:
- *.qq.com (覆盖所有子域名)
失败重试机制
- 普通域名创建失败时自动分批重试
- 通配符域名逐个添加重试
- 详细的失败原因分析
命令行参数
# 基本用法
python 企业微信地址同步工具.py --quick
# 高级参数
python 企业微信地址同步工具.py \
--host 192.168.0.1 \
--username netconf-admin \
--password Huawei@123 \
--ip-object WeCom_IP \
--domain-object WeCom_Domain \
--no-save # 不自动保存配置
--show-all-failures # 显示所有失败项目
🏗️ 技术架构
NETCONF协议实现
- 基于SSH子系统的NETCONF会话
- 支持华为防火墙特定的YANG模型
- 完整的RPC错误处理机制
核心类结构
NetconfSSHSession
- 底层NETCONF协议通信
- SSH连接管理
- 消息序列化/反序列化
WebsiteWhitelistManager
- 网站内容分析
- 域名和IP提取
- 防火墙对象管理
WeChatWorkSyncManager
- 企业微信API集成
- 智能域名处理
- 批量同步管理
安全特性
- XML特殊字符转义
- 输入数据验证
- 连接超时控制
- 操作审计日志
🔒 安全注意事项
账号安全
- 使用专用NETCONF账号,避免使用admin等高权限账号
- 定期更换密码,密码符合复杂度要求
- 限制登录来源,使用ACL控制访问源
网络安全
- 管理网络隔离,NETCONF管理流量走专用网络
- 防火墙策略,严格控制830端口访问
- 会话监控,监控异常的NETCONF会话
操作安全
- 测试环境验证,生产使用前充分测试
- 配置备份,操作前备份现有配置
- 操作日志,记录所有自动化操作
📈 性能优化
批量操作优化
- 支持批量创建地址对象
- 智能分批处理大量域名
- 并发DNS解析提升效率
网络优化
- 连接池复用NETCONF会话
- 请求超时合理设置
- 错误重试机制
内存优化
- 流式处理大型数据集
- 及时释放临时对象
- 垃圾回收优化
📝 更新日志
v1.0 (2025-06-18)
- ✅ 基础NETCONF协议实现
- ✅ 网站白名单解析功能
- ✅ 企业微信地址同步功能
- ✅ GUI界面支持
- ✅ 通配符域名优化
- ✅ 配置自动保存
- ✅ 详细错误诊断
📞 技术支持
- 作者: 大刘讲IT
- 版本: v1.0
- 协议: 基于NETCONF RFC 6241
- 兼容性: 华为USG6000V/USG9000V系列
⚠️ 重要提醒
- 生产环境使用前请充分测试
- 建议先在测试环境验证所有功能
- 定期备份防火墙配置
- 监控工具运行日志和错误信息
- 遵循企业安全管理制度
📄 许可证
本项目仅供学习和内部使用,请遵循相关法律法规和企业安全管理制度。
这套工具可以显著提高华为防火墙地址对象管理的效率,减少手动配置错误,实现自动化运维。
同步效果



企业微信同步写入
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
企业微信地址同步工具
功能:从企业微信API下载IP地址和域名列表,同步到华为防火墙
作者:大刘讲IT
版本:1.0
"""
# =============================================================================
# 🔧 配置区域 - 可根据实际环境修改以下参数
# =============================================================================
# 企业微信官方API地址
WECHAT_WORK_API_URL = "https://work.weixin.qq.com/h5app/wework_domain_ip/export/latest?format=json"
# 华为防火墙连接配置
FIREWALL_CONFIG = {
'host': '192.168.50.1', # 防火墙IP地址
'username': 'netconf-admin', # 用户名
'password': 'Huawei@123', # 密码
'port': 830, # NETCONF端口
'timeout': 30 # 连接超时时间(秒)
}
# 防火墙对象配置
FIREWALL_OBJECTS = {
'ip_object_name': 'WeCom_IP', # IP地址对象名称
'domain_object_name': 'WeCom_Domain', # 域名组名称
'vsys': 'public', # 虚拟系统名称
'ip_description': '企业微信IP地址列表 - 自动同步', # IP对象描述
'domain_description': '企业微信域名列表 - 自动同步' # 域名组描述
}
# 工具运行配置
TOOL_CONFIG = {
'enable_debug': False, # 是否启用调试日志
'auto_backup': True, # 是否自动备份现有配置
'auto_save_config': True, # 是否自动保存配置到防火墙存储
'export_results': True, # 是否导出解析结果
'verify_ssl': True, # 是否验证SSL证书
'request_timeout': 30, # HTTP请求超时时间(秒)
'strict_domain_filter': False, # 是否启用严格域名过滤(兼容华为防火墙)
'max_domain_levels': 10, # 允许的最大域名层级数
'max_domain_length': 200, # 允许的最大域名长度
'show_failure_details': True # 是否显示详细的失败清单
}
# =============================================================================
# 系统导入和初始化
# =============================================================================
import xml.etree.ElementTree as ET
import json
import sys
import logging
import time
import ipaddress
import requests
from datetime import datetime
import argparse
import re
from urllib.parse import urlparse
# 配置日志
log_level = logging.DEBUG if TOOL_CONFIG['enable_debug'] else logging.INFO
logging.basicConfig(level=log_level, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class NetconfSSHSession:
"""直接连接NETCONF端口830的会话类"""
def __init__(self, host, port, username, password):
self.host = host
self.port = port
self.username = username
self.password = password
def connect(self):
"""建立到NETCONF端口830的直接连接"""
try:
import paramiko
self.ssh = paramiko.SSHClient()
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.ssh.connect(
hostname=self.host,
port=self.port,
username=self.username,
password=self.password,
timeout=FIREWALL_CONFIG['timeout'],
look_for_keys=False,
allow_agent=False
)
self.transport = self.ssh.get_transport()
self.channel = self.transport.open_session()
self.channel.invoke_subsystem('netconf')
hello_msg = self._read_netconf_message(timeout=10)
if hello_msg and "hello" in hello_msg.lower():
logger.info(f"✅ 成功连接到华为防火墙 {self.host}:830")
client_hello = '''<?xml version="1.0" encoding="UTF-8"?>
<hello xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<capabilities>
<capability>urn:ietf:params:netconf:base:1.0</capability>
</capabilities>
</hello>]]>]]>'''
self.channel.send(client_hello.encode('utf-8'))
return True
else:
logger.error("❌ 未收到正确的NETCONF Hello消息")
return False
except Exception as e:
logger.error(f"❌ 连接失败: {e}")
return False
def _read_netconf_message(self, timeout=30):
"""读取NETCONF消息"""
start_time = time.time()
message = ""
while time.time() - start_time < timeout:
try:
if self.channel.recv_ready():
data = self.channel.recv(4096).decode('utf-8')
message += data
if "]]>]]>" in message:
return message.replace("]]>]]>", "").strip()
time.sleep(0.1)
except Exception as e:
logger.debug(f"读取数据时出错: {e}")
break
return message if message else None
def disconnect(self):
"""断开连接"""
try:
if hasattr(self, 'channel') and self.channel:
self.channel.close()
if hasattr(self, 'ssh') and self.ssh:
self.ssh.close()
logger.info("✅ 已断开防火墙连接")
except:
pass
def send_rpc(self, rpc_request):
"""发送RPC请求"""
try:
full_request = rpc_request + "]]>]]>"
self.channel.send(full_request.encode('utf-8'))
response = self._read_netconf_message(timeout=30)
return response
except Exception as e:
logger.error(f"❌ 发送RPC请求失败: {e}")
return None
class WeChatWorkSyncManager:
"""企业微信地址同步管理类"""
def __init__(self, host, username, password, port=830):
self.host = host
self.username = username
self.password = password
self.port = port
self.session = None
def connect(self):
"""建立连接"""
try:
self.session = NetconfSSHSession(self.host, self.port, self.username, self.password)
return self.session.connect()
except Exception as e:
logger.error(f"❌ 连接失败: {e}")
return False
def disconnect(self):
"""断开连接"""
if self.session:
self.session.disconnect()
def save_configuration(self):
"""保存配置到防火墙存储"""
logger.info("💾 正在保存配置到防火墙...")
# 华为防火墙保存配置的RPC请求
rpc_request = '''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="999" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<save xmlns="urn:huawei:params:xml:ns:yang:huawei-save">
<format>cfg</format>
</save>
</rpc>'''
try:
response = self.session.send_rpc(rpc_request)
if response and "ok" in response.lower():
logger.info("✅ 配置已成功保存到防火墙")
return True
else:
logger.warning(f"⚠️ 保存配置可能失败,响应: {response}")
# 尝试备用的保存命令
return self._save_configuration_backup()
except Exception as e:
logger.error(f"❌ 保存配置失败: {e}")
# 尝试备用的保存命令
return self._save_configuration_backup()
def _save_configuration_backup(self):
"""备用的保存配置方法"""
logger.info("🔄 尝试备用保存配置方法...")
# 备用保存配置RPC(不同厂商可能有不同格式)
backup_rpc = '''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="998" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<copy-config>
<source>
<running/>
</source>
<target>
<startup/>
</target>
</copy-config>
</rpc>'''
try:
response = self.session.send_rpc(backup_rpc)
if response and "ok" in response.lower():
logger.info("✅ 配置已通过备用方法保存")
return True
else:
logger.warning(f"⚠️ 备用保存方法响应: {response}")
logger.warning("⚠️ 配置可能未保存,请手动执行 save 命令")
return False
except Exception as e:
logger.error(f"❌ 备用保存方法失败: {e}")
logger.warning("⚠️ 配置可能未保存,请手动执行 save 命令")
return False
def download_wechat_work_data(self, url):
"""下载企业微信IP地址和域名数据"""
logger.info(f"📥 正在下载企业微信数据: {url}")
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get(
url,
headers=headers,
timeout=TOOL_CONFIG['request_timeout'],
verify=TOOL_CONFIG['verify_ssl']
)
response.raise_for_status()
# 尝试解析JSON
data = response.json()
logger.info("✅ 成功下载并解析企业微信数据")
return data
except requests.exceptions.RequestException as e:
logger.error(f"❌ 下载失败: {e}")
return None
except json.JSONDecodeError as e:
logger.error(f"❌ JSON解析失败: {e}")
return None
except Exception as e:
logger.error(f"❌ 处理数据时出错: {e}")
return None
def parse_wechat_work_data(self, data):
"""解析企业微信数据,提取IP地址和域名"""
logger.info("🔍 正在解析企业微信官方API数据...")
ip_addresses = set()
domains = set()
# 检查数据格式
if isinstance(data, list):
# 企业微信官方API格式 - 对象数组
logger.info("📋 检测到企业微信官方API格式")
self._parse_official_wechat_format(data, ip_addresses, domains)
else:
# 通用JSON格式 - 递归解析
logger.info("📋 使用通用JSON解析模式")
self._parse_generic_json_format(data, ip_addresses, domains)
# 处理IP地址格式
processed_ips = self._process_ip_addresses(ip_addresses)
processed_domains = self._process_domains(domains)
logger.info(f"✅ 解析完成: 找到 {len(processed_ips)} 个IP地址,{len(processed_domains)} 个域名")
return processed_ips, processed_domains
def _parse_official_wechat_format(self, data, ip_addresses, domains):
"""解析企业微信官方API格式"""
required_count = 0
optional_count = 0
for item in data:
if not isinstance(item, dict):
continue
domain = item.get('domain', '')
universal_domain = item.get('universal_domain', '')
remarks = item.get('remarks', '')
category = item.get('domain_category', '')
# 统计必需和可选项
if remarks == '必需':
required_count += 1
elif remarks == '按需':
optional_count += 1
# 处理domain字段
if domain:
if self._is_ip_address_or_cidr(domain):
ip_addresses.add(domain)
logger.debug(f"Found IP from domain field: {domain} ({category})")
elif self._is_domain_name(domain):
domains.add(domain.lower())
logger.debug(f"Found domain from domain field: {domain} ({category})")
# 处理universal_domain字段
if universal_domain and universal_domain != '-':
if self._is_domain_name(universal_domain):
domains.add(universal_domain.lower())
logger.debug(f"Found domain from universal_domain: {universal_domain} ({category})")
logger.info(f"📊 解析统计: 必需项 {required_count} 个,按需项 {optional_count} 个")
def _parse_generic_json_format(self, data, ip_addresses, domains):
"""解析通用JSON格式(递归)"""
def extract_from_obj(obj, path=""):
"""递归提取对象中的IP和域名"""
if isinstance(obj, dict):
for key, value in obj.items():
current_path = f"{path}.{key}" if path else key
extract_from_obj(value, current_path)
elif isinstance(obj, list):
for i, item in enumerate(obj):
current_path = f"{path}[{i}]"
extract_from_obj(item, current_path)
elif isinstance(obj, str):
self._extract_addresses_from_string(obj, ip_addresses, domains, path)
# 开始递归解析
extract_from_obj(data)
def _is_ip_address_or_cidr(self, text):
"""判断是否为IP地址或CIDR格式"""
# IP地址正则表达式(包括CIDR)
ip_pattern = r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(?:/(?:3[0-2]|[0-2]?[0-9]))?$'
return bool(re.match(ip_pattern, text.strip()))
def _is_domain_name(self, text):
"""判断是否为有效域名"""
if not text or text == '-':
return False
# 域名正则表达式(包括通配符)
domain_pattern = r'^(\*\.)?[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*\.[a-zA-Z]{2,}$'
return bool(re.match(domain_pattern, text.strip()))
def _extract_addresses_from_string(self, text, ip_set, domain_set, path=""):
"""从字符串中提取IP地址和域名"""
# IP地址正则表达式(包括CIDR)
ip_pattern = r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(?:/(?:3[0-2]|[0-2]?[0-9]))?\b'
# 域名正则表达式(包括通配符)
domain_pattern = r'\*?\.?[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*\.[a-zA-Z]{2,}'
# 提取IP地址
ip_matches = re.findall(ip_pattern, text)
for ip in ip_matches:
ip_set.add(ip)
logger.debug(f"Found IP in {path}: {ip}")
# 提取域名
domain_matches = re.findall(domain_pattern, text)
for domain in domain_matches:
# 过滤掉看起来像IP地址的字符串
if not re.match(ip_pattern, domain):
domain_set.add(domain.lower())
logger.debug(f"Found domain in {path}: {domain}")
def _process_ip_addresses(self, ip_set):
"""处理IP地址,规范化格式"""
processed = []
for ip_str in ip_set:
try:
if '/' in ip_str:
# CIDR格式
network = ipaddress.ip_network(ip_str, strict=False)
processed.append(str(network))
else:
# 单个IP,按照要求转换为/32
ip = ipaddress.ip_address(ip_str)
processed.append(f"{ip}/32")
except ValueError as e:
logger.warning(f"⚠️ 无效IP地址格式: {ip_str} - {e}")
continue
return sorted(processed)
def _process_domains(self, domain_set):
"""处理域名,去重和格式化"""
processed = []
filtered_domains = []
for domain in domain_set:
# 清理域名格式
domain = domain.strip().lower()
# 验证域名格式
if self._is_valid_domain(domain):
processed.append(domain)
else:
filtered_domains.append(domain)
logger.debug(f"⚠️ 过滤不兼容域名: {domain}")
if filtered_domains:
logger.info(f"📋 已过滤 {len(filtered_domains)} 个不兼容的域名格式")
logger.info(f"🔍 被过滤的域名列表:")
for i, domain in enumerate(filtered_domains, 1):
logger.info(f" {i:2d}. {domain}")
return sorted(processed)
def _is_valid_domain(self, domain):
"""验证域名格式是否有效(可配置的华为防火墙兼容性)"""
if not domain:
return False
# 过滤掉特殊字符和华为防火墙不支持的格式
if any(char in domain for char in ['^', '$', '|', '[', ']', '(', ')', '+', '?', '\\', '"']):
return False
# 过滤掉过长的域名
max_length = TOOL_CONFIG.get('max_domain_length', 150)
if len(domain) > max_length:
return False
# 允许通配符域名,但限制格式
original_domain = domain
if domain.startswith('*.'):
domain = domain[2:]
# 通配符域名不能有多个点在开头
if domain.startswith('.'):
return False
# 基本长度检查
if len(domain) < 4: # 最短如 a.cn
return False
# 检查每个标签
labels = domain.split('.')
if len(labels) < 2:
return False
# 可配置的域名层级限制
max_levels = TOOL_CONFIG.get('max_domain_levels', 10)
if len(labels) > max_levels:
return False
for label in labels:
if not label or len(label) > 63:
return False
# 标签不能以-开头或结尾
if label.startswith('-') or label.endswith('-'):
return False
# 只允许字母、数字和连字符
if not re.match(r'^[a-zA-Z0-9-]+$', label):
return False
# 顶级域必须是字母
if not labels[-1].isalpha():
return False
# 严格模式下的特殊过滤
if TOOL_CONFIG.get('strict_domain_filter', True):
# 华为防火墙对复杂通配符域名支持有限
if original_domain.startswith('*.') and original_domain.count('.') > 4:
return False
# 只过滤真正有问题的域名模式(经过实际测试确认不兼容的)
# 注意:腾讯云域名(cos.ap-guangzhou.myqcloud.com, file.myqcloud.com)
# 和企业微信会议域名(meeting.work.weixin.qq.com)实际上是可以在WebUI中手动添加的
# 所以这里不再过滤这些域名
problematic_patterns = [
# 可以在这里添加经过实际测试确认不兼容的域名模式
# 目前暂时为空,因为用户确认WebUI可以手动添加这些域名
]
for pattern in problematic_patterns:
if pattern in original_domain:
return False
return True
def _escape_xml_chars(self, text):
"""转义XML特殊字符"""
if not text:
return text
# XML字符转义
text = text.replace('&', '&')
text = text.replace('<', '<')
text = text.replace('>', '>')
text = text.replace('"', '"')
text = text.replace("'", ''')
return text
def _is_firewall_compatible_domain(self, domain):
"""检查域名是否与华为防火墙兼容(更严格的检查)"""
if not domain:
return False
# 基本长度检查
if len(domain) < 3 or len(domain) > 100:
return False
# 检查是否包含不支持的字符
invalid_chars = ['&', '<', '>', '"', "'", ' ', '\t', '\n', '\r']
for char in invalid_chars:
if char in domain:
return False
# 检查域名格式
if domain.startswith('.') or domain.endswith('.'):
return False
# 通配符域名检查
if '*' in domain:
if not domain.startswith('*.'):
return False
# 检查通配符后的部分
domain_part = domain[2:]
if not domain_part or '.' not in domain_part:
return False
# 检查每个标签
labels = domain.replace('*.', '').split('.')
if len(labels) < 2:
return False
for label in labels:
if not label:
return False
# 标签长度检查
if len(label) > 63:
return False
# 标签字符检查
if not re.match(r'^[a-zA-Z0-9-]+$', label):
return False
# 标签不能以-开头或结尾
if label.startswith('-') or label.endswith('-'):
return False
return True
def create_or_update_address_object(self, object_name, ip_addresses, vsys="public"):
"""创建或更新地址对象"""
logger.info(f"📝 正在创建/更新地址对象: {object_name}")
if not ip_addresses:
logger.warning("⚠️ 没有IP地址需要添加")
return True, []
# 先删除现有对象(覆盖方式)
self._delete_address_object(object_name, vsys)
# 验证IP地址格式并记录问题
valid_ips = []
invalid_ips = []
for ip_addr in ip_addresses:
try:
# 验证IP地址格式
if '/' in ip_addr:
ipaddress.ip_network(ip_addr, strict=False)
else:
ipaddress.ip_address(ip_addr)
valid_ips.append(ip_addr)
except ValueError as e:
invalid_ips.append({'ip': ip_addr, 'error': str(e)})
if invalid_ips:
logger.warning(f"⚠️ 发现 {len(invalid_ips)} 个无效IP地址格式")
if TOOL_CONFIG.get('show_failure_details', True):
logger.warning("📋 无效IP地址清单:")
for i, item in enumerate(invalid_ips, 1):
logger.warning(f" {i:2d}. {item['ip']} - {item['error']}")
if not valid_ips:
logger.error("❌ 没有有效的IP地址可以创建地址对象")
return False, invalid_ips
# 创建新的地址对象
elements_xml = ""
for i, ip_addr in enumerate(valid_ips):
elements_xml += f"""
<elements>
<elem-id>{i}</elem-id>
<address-ipv4>{ip_addr}</address-ipv4>
</elements>"""
# 生成带时间戳的描述
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
ip_description_with_time = f"{FIREWALL_OBJECTS['ip_description']} ({current_time})"
rpc_request = f'''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="201" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<edit-config>
<target>
<running/>
</target>
<config>
<address-set xmlns="urn:huawei:params:xml:ns:yang:huawei-address-set">
<addr-object>
<vsys>{vsys}</vsys>
<name>{object_name}</name>
<desc>{ip_description_with_time}</desc>{elements_xml}
</addr-object>
</address-set>
</config>
</edit-config>
</rpc>'''
try:
response = self.session.send_rpc(rpc_request)
if response and "ok" in response.lower():
logger.info(f"✅ 成功创建地址对象 {object_name},包含 {len(valid_ips)} 个IP地址")
if invalid_ips:
logger.info(f"⚠️ 跳过了 {len(invalid_ips)} 个无效IP地址")
return True, invalid_ips
else:
logger.error(f"❌ 创建地址对象失败,响应: {response}")
if TOOL_CONFIG.get('show_failure_details', True):
self._analyze_rpc_error(response, "IP地址对象", valid_ips)
return False, invalid_ips
except Exception as e:
logger.error(f"❌ 创建地址对象失败: {e}")
if TOOL_CONFIG.get('show_failure_details', True):
logger.error("📋 尝试写入的IP地址清单:")
for i, ip in enumerate(valid_ips, 1):
logger.error(f" {i:2d}. {ip}")
return False, invalid_ips
def create_or_update_domain_set(self, domain_set_name, domains, vsys="public"):
"""创建或更新域名组(智能分批处理)"""
logger.info(f"📝 正在创建/更新域名组: {domain_set_name}")
if not domains:
logger.warning("⚠️ 没有域名需要添加")
return True, []
# 先删除现有域名组(覆盖方式)
self._delete_domain_set(domain_set_name, vsys)
# 分离通配符域名和普通域名
wildcard_domains = []
normal_domains = []
invalid_domains = []
for domain in domains:
# XML字符转义
escaped_domain = self._escape_xml_chars(domain)
# 验证域名兼容性
if self._is_firewall_compatible_domain(escaped_domain):
if escaped_domain.startswith('*.'):
wildcard_domains.append(escaped_domain)
else:
normal_domains.append(escaped_domain)
else:
invalid_domains.append({
'domain': domain,
'error': '不兼容华为防火墙格式',
'details': self._get_domain_incompatibility_reason(domain)
})
# 优化通配符域名:去除被更高层级通配符域名覆盖的子域名
if wildcard_domains:
original_count = len(wildcard_domains)
wildcard_domains = self._optimize_wildcard_domains(wildcard_domains)
if len(wildcard_domains) < original_count:
logger.info(f"🎯 通配符域名优化完成:{original_count} → {len(wildcard_domains)} 个(去除了 {original_count - len(wildcard_domains)} 个被覆盖的子域名)")
if invalid_domains:
logger.warning(f"⚠️ 发现 {len(invalid_domains)} 个不兼容域名")
if TOOL_CONFIG.get('show_failure_details', True):
logger.warning("📋 不兼容域名清单:")
for i, item in enumerate(invalid_domains, 1):
logger.warning(f" {i:2d}. {item['domain']} - {item['details']}")
# 尝试智能创建域名组
success, final_invalid = self._create_domain_set_smart(
domain_set_name, normal_domains, wildcard_domains, vsys
)
# 合并所有失败的域名
all_invalid = invalid_domains + final_invalid
return success, all_invalid
def _create_domain_set_smart(self, domain_set_name, normal_domains, wildcard_domains, vsys):
"""智能创建域名组(分离处理策略)"""
if not normal_domains and not wildcard_domains:
logger.error("❌ 没有有效的域名可以创建域名组")
return False, []
logger.info(f"📊 准备创建域名组: 普通域名 {len(normal_domains)} 个,通配符域名 {len(wildcard_domains)} 个")
# 新策略:分离处理普通域名和通配符域名
total_failed = []
# 第一步:先创建普通域名组(如果有普通域名)
if normal_domains:
logger.info("🔹 第一步:创建普通域名组...")
success, failed_domains = self._try_create_domain_set(domain_set_name, normal_domains, vsys, "普通域名")
if not success:
logger.error("❌ 普通域名组创建失败")
# 如果普通域名都创建失败,尝试分批创建
if len(normal_domains) > 20:
logger.warning("⚠️ 尝试分批创建普通域名...")
success, failed_domains = self._create_domain_set_batch(domain_set_name, normal_domains, vsys)
if not success:
logger.error("❌ 普通域名组创建完全失败")
total_failed.extend([{'domain': d, 'error': '普通域名创建失败', 'details': '普通域名组创建失败'} for d in normal_domains])
# 如果普通域名都失败了,就不要尝试通配符域名了
if wildcard_domains:
total_failed.extend([{'domain': d, 'error': '跳过处理', 'details': '普通域名创建失败,跳过通配符域名'} for d in wildcard_domains])
return False, total_failed
else:
logger.info(f"✅ 成功创建普通域名组,包含 {len(normal_domains)} 个域名")
# 第二步:向现有域名组添加通配符域名(如果有通配符域名)
if wildcard_domains:
logger.info("🔹 第二步:向域名组添加通配符域名...")
# 如果没有普通域名,需要先创建空的域名组
if not normal_domains:
logger.info("📝 创建空域名组用于添加通配符域名...")
empty_success, _ = self._try_create_empty_domain_set(domain_set_name, vsys)
if not empty_success:
logger.error("❌ 无法创建空域名组")
total_failed.extend([{'domain': d, 'error': '域名组创建失败', 'details': '无法创建空域名组'} for d in wildcard_domains])
return False, total_failed
# 尝试添加通配符域名
wildcard_success, wildcard_failed = self._add_domains_to_existing_set(domain_set_name, wildcard_domains, vsys, "通配符域名")
if wildcard_success:
logger.info(f"✅ 成功添加 {len(wildcard_domains) - len(wildcard_failed)} 个通配符域名")
total_failed.extend(wildcard_failed)
else:
logger.warning("⚠️ 通配符域名添加失败,尝试逐个添加...")
# 逐个尝试添加通配符域名
individual_failed = self._add_wildcard_domains_individually(domain_set_name, wildcard_domains, vsys)
total_failed.extend(individual_failed)
# 判断总体结果
if normal_domains and not total_failed:
# 普通域名成功,没有失败项
return True, []
elif normal_domains and total_failed:
# 普通域名成功,但有通配符域名失败
return True, total_failed
else:
# 没有普通域名,只有通配符域名的情况
return len(total_failed) < len(wildcard_domains), total_failed
def _try_create_domain_set(self, domain_set_name, domains, vsys, strategy_name):
"""尝试创建域名组"""
if not domains:
return False, []
domains_xml = ""
for domain in domains:
domains_xml += f"\n <domain>{domain}</domain>"
# 生成带时间戳的描述
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
domain_description_with_time = f"{FIREWALL_OBJECTS['domain_description']} - {strategy_name} ({current_time})"
rpc_request = f'''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="301" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<edit-config>
<target>
<running/>
</target>
<config>
<domain-sets xmlns="urn:huawei:params:xml:ns:yang:huawei-domain-set">
<vsys>
<name>{vsys}</name>
<domain-set>
<name>{domain_set_name}</name>
<description>{domain_description_with_time}</description>{domains_xml}
</domain-set>
</vsys>
</domain-sets>
</config>
</edit-config>
</rpc>'''
try:
response = self.session.send_rpc(rpc_request)
if response and "ok" in response.lower():
logger.info(f"✅ {strategy_name}策略成功创建域名组 {domain_set_name},包含 {len(domains)} 个域名")
return True, []
else:
logger.warning(f"⚠️ {strategy_name}策略失败: {response}")
if TOOL_CONFIG.get('show_failure_details', True):
self._analyze_rpc_error(response, f"域名组({strategy_name})", domains)
return False, domains
except Exception as e:
logger.warning(f"⚠️ {strategy_name}策略异常: {e}")
return False, domains
def _create_domain_set_batch(self, domain_set_name, domains, vsys):
"""分批创建域名组"""
batch_size = 20
successful_domains = []
failed_domains = []
for i in range(0, len(domains), batch_size):
batch = domains[i:i + batch_size]
batch_name = f"{domain_set_name}_batch_{i//batch_size + 1}"
logger.info(f"📦 尝试创建批次 {i//batch_size + 1}: {len(batch)} 个域名")
success, batch_failed = self._try_create_domain_set(batch_name, batch, vsys, f"批次{i//batch_size + 1}")
if success:
successful_domains.extend(batch)
else:
failed_domains.extend([{'domain': d, 'error': '批次创建失败', 'details': f'批次{i//batch_size + 1}创建失败'} for d in batch])
if successful_domains:
logger.info(f"✅ 分批创建部分成功: {len(successful_domains)} 个域名成功")
logger.warning(f"⚠️ 分批创建部分失败: {len(failed_domains)} 个域名失败")
return True, failed_domains
else:
logger.error("❌ 分批创建完全失败")
return False, failed_domains
def _try_create_empty_domain_set(self, domain_set_name, vsys):
"""创建空的域名组用于后续添加域名"""
# 生成带时间戳的描述
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
empty_domain_description_with_time = f"{FIREWALL_OBJECTS['domain_description']} - 空域名组 ({current_time})"
rpc_request = f'''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="302" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<edit-config>
<target>
<running/>
</target>
<config>
<domain-sets xmlns="urn:huawei:params:xml:ns:yang:huawei-domain-set">
<vsys>
<name>{vsys}</name>
<domain-set>
<name>{domain_set_name}</name>
<description>{empty_domain_description_with_time}</description>
</domain-set>
</vsys>
</domain-sets>
</config>
</edit-config>
</rpc>'''
try:
response = self.session.send_rpc(rpc_request)
if response and "ok" in response.lower():
logger.info(f"✅ 成功创建空域名组 {domain_set_name}")
return True, []
else:
logger.warning(f"⚠️ 创建空域名组失败: {response}")
return False, []
except Exception as e:
logger.warning(f"⚠️ 创建空域名组异常: {e}")
return False, []
def _add_domains_to_existing_set(self, domain_set_name, domains, vsys, domain_type):
"""向现有域名组添加域名"""
if not domains:
return True, []
logger.info(f"📝 向域名组 {domain_set_name} 添加 {len(domains)} 个{domain_type}...")
# 华为防火墙添加域名到现有域名组的RPC请求
domains_xml = ""
for domain in domains:
domains_xml += f"\n <domain>{domain}</domain>"
rpc_request = f'''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="303" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<edit-config>
<target>
<running/>
</target>
<config>
<domain-sets xmlns="urn:huawei:params:xml:ns:yang:huawei-domain-set">
<vsys>
<name>{vsys}</name>
<domain-set>
<name>{domain_set_name}</name>{domains_xml}
</domain-set>
</vsys>
</domain-sets>
</config>
</edit-config>
</rpc>'''
try:
response = self.session.send_rpc(rpc_request)
if response and "ok" in response.lower():
logger.info(f"✅ 成功向域名组添加 {len(domains)} 个{domain_type}")
return True, []
else:
logger.warning(f"⚠️ 向域名组添加{domain_type}失败: {response}")
if TOOL_CONFIG.get('show_failure_details', True):
self._analyze_rpc_error(response, f"域名组添加({domain_type})", domains)
# 返回所有域名作为失败项
failed_list = [{'domain': d, 'error': f'{domain_type}添加失败', 'details': f'向域名组添加{domain_type}失败'} for d in domains]
return False, failed_list
except Exception as e:
logger.warning(f"⚠️ 向域名组添加{domain_type}异常: {e}")
failed_list = [{'domain': d, 'error': f'{domain_type}添加异常', 'details': str(e)} for d in domains]
return False, failed_list
def _add_wildcard_domains_individually(self, domain_set_name, wildcard_domains, vsys):
"""逐个添加通配符域名"""
logger.info("🔄 尝试逐个添加通配符域名...")
failed_domains = []
success_count = 0
for domain in wildcard_domains:
logger.debug(f"📝 尝试添加通配符域名: {domain}")
success, failed = self._add_domains_to_existing_set(domain_set_name, [domain], vsys, "单个通配符域名")
if success:
success_count += 1
logger.debug(f"✅ 成功添加: {domain}")
else:
failed_domains.extend(failed)
logger.debug(f"❌ 添加失败: {domain}")
if success_count > 0:
logger.info(f"✅ 逐个添加完成: 成功 {success_count} 个,失败 {len(failed_domains)} 个")
else:
logger.warning("⚠️ 所有通配符域名逐个添加都失败")
return failed_domains
def _optimize_wildcard_domains(self, wildcard_domains):
"""优化通配符域名列表:去除被更高层级通配符域名覆盖的子域名"""
if not wildcard_domains:
return wildcard_domains
logger.debug("🔍 开始优化通配符域名列表...")
# 去除通配符前缀,便于比较
domain_info = []
for wildcard in wildcard_domains:
if wildcard.startswith('*.'):
clean_domain = wildcard[2:] # 去除 "*."
domain_info.append({
'original': wildcard,
'clean': clean_domain,
'levels': len(clean_domain.split('.'))
})
# 按域名层级排序(层级少的在前,即更高层级的域名在前)
domain_info.sort(key=lambda x: x['levels'])
optimized = []
removed = []
for current in domain_info:
is_covered = False
# 检查当前域名是否被已经添加的更高层级域名覆盖
for existing in optimized:
if self._is_wildcard_covered_by(current['clean'], existing['clean']):
is_covered = True
removed.append({
'domain': current['original'],
'covered_by': existing['original']
})
logger.debug(f"🎯 {current['original']} 被 {existing['original']} 覆盖")
break
if not is_covered:
optimized.append(current)
# 记录优化详情
if removed:
logger.debug("📋 被覆盖的通配符域名详情:")
for item in removed:
logger.debug(f" ❌ {item['domain']} (被 {item['covered_by']} 覆盖)")
# 返回优化后的通配符域名列表
result = [item['original'] for item in optimized]
logger.debug(f"✅ 优化后保留的通配符域名: {result}")
return result
def _is_wildcard_covered_by(self, child_domain, parent_domain):
"""检查子域名是否被父域名的通配符覆盖
例如:
- meeting.work.weixin.qq.com 被 work.weixin.qq.com 覆盖 → True
- work.weixin.qq.com 被 weixin.qq.com 覆盖 → True
- weixin.qq.com 被 qq.com 覆盖 → True
- exmail.qq.com 被 weixin.qq.com 覆盖 → False (不同子域)
"""
if child_domain == parent_domain:
return False # 相同域名不算覆盖
# 检查子域名是否以父域名结尾
if child_domain.endswith('.' + parent_domain):
# 进一步检查:确保子域名比父域名多一个层级
child_parts = child_domain.split('.')
parent_parts = parent_domain.split('.')
# 子域名的层级应该比父域名多
if len(child_parts) > len(parent_parts):
# 检查后缀是否完全匹配
if child_parts[-len(parent_parts):] == parent_parts:
return True
return False
def _analyze_rpc_error(self, response, object_type, failed_items):
"""分析RPC错误响应并提供详细信息"""
logger.error(f"📋 {object_type}写入失败分析:")
if response:
# 尝试解析错误信息
if "error" in response.lower():
logger.error(f" 🔍 错误响应: {response}")
# 常见错误模式分析
if "invalid" in response.lower():
logger.error(" 💡 可能原因: 数据格式无效")
elif "duplicate" in response.lower():
logger.error(" 💡 可能原因: 对象名称重复")
elif "quota" in response.lower() or "limit" in response.lower():
logger.error(" 💡 可能原因: 超出系统限制")
elif "permission" in response.lower() or "auth" in response.lower():
logger.error(" 💡 可能原因: 权限不足")
elif "xml" in response.lower() or "parse" in response.lower():
logger.error(" 💡 可能原因: XML格式错误")
# 显示前10个失败项目
if failed_items:
logger.error(f" 📋 失败项目样本 (前10个):")
for i, item in enumerate(failed_items[:10], 1):
logger.error(f" {i:2d}. {item}")
if len(failed_items) > 10:
logger.error(f" ... 还有 {len(failed_items) - 10} 个项目")
def _get_domain_incompatibility_reason(self, domain):
"""获取域名不兼容的具体原因"""
if not domain:
return "空域名"
# 长度检查
max_length = TOOL_CONFIG.get('max_domain_length', 150)
if len(domain) > max_length:
return f"域名过长 ({len(domain)} > {max_length})"
if len(domain) < 4:
return f"域名过短 ({len(domain)} < 4)"
# 特殊字符检查
invalid_chars = ['&', '<', '>', '"', "'", ' ', '\t', '\n', '\r']
for char in invalid_chars:
if char in domain:
return f"包含无效字符: '{char}'"
# 格式检查
if domain.startswith('.') or domain.endswith('.'):
return "域名不能以点开头或结尾"
# 通配符检查
if '*' in domain and not domain.startswith('*.'):
return "通配符格式错误(只支持 *.example.com 格式)"
# 标签检查
labels = domain.replace('*.', '').split('.')
if len(labels) < 2:
return "域名标签数量不足"
max_levels = TOOL_CONFIG.get('max_domain_levels', 8)
if len(labels) > max_levels:
return f"域名层级过多 ({len(labels)} > {max_levels})"
for i, label in enumerate(labels):
if not label:
return f"第{i+1}个标签为空"
if len(label) > 63:
return f"第{i+1}个标签过长 ({len(label)} > 63)"
if label.startswith('-') or label.endswith('-'):
return f"第{i+1}个标签格式错误(不能以-开头或结尾)"
if not re.match(r'^[a-zA-Z0-9-]+$', label):
return f"第{i+1}个标签包含无效字符"
# 顶级域检查
if not labels[-1].isalpha():
return "顶级域必须是字母"
# 严格模式特殊检查
if TOOL_CONFIG.get('strict_domain_filter', True):
if domain.startswith('*.') and domain.count('.') > 4:
return "通配符域名层级过多(华为防火墙限制)"
# 特殊域名模式(仅包含经过实际测试确认不兼容的域名)
# 注意:用户确认腾讯云和企业微信会议域名可以在WebUI手动添加,所以不再过滤
problematic_patterns = [
# 可以在这里添加经过实际测试确认不兼容的域名模式
# 目前为空
]
for pattern, desc in problematic_patterns:
if pattern in domain:
return f"已知不兼容域名类型: {desc}"
return "未知兼容性问题"
def _display_sync_summary(self, sync_summary, ip_addresses, domains):
"""显示同步结果摘要"""
logger.info("📊 同步结果摘要:")
# IP地址同步结果
if ip_addresses:
if sync_summary['ip_success']:
successful_ips = len(ip_addresses) - len(sync_summary['ip_failed_items'])
logger.info(f" ✅ IP地址对象: 成功写入 {successful_ips} 个IP地址")
if sync_summary['ip_failed_items']:
logger.info(f" ⚠️ IP地址对象: 跳过 {len(sync_summary['ip_failed_items'])} 个无效IP")
else:
logger.error(f" ❌ IP地址对象: 写入失败")
if sync_summary['ip_failed_items']:
logger.error(f" 📋 失败项目数: {len(sync_summary['ip_failed_items'])}")
# 域名同步结果
if domains:
if sync_summary['domain_success']:
successful_domains = len(domains) - len(sync_summary['domain_failed_items'])
logger.info(f" ✅ 域名组: 成功写入 {successful_domains} 个域名")
if sync_summary['domain_failed_items']:
logger.info(f" ⚠️ 域名组: 跳过 {len(sync_summary['domain_failed_items'])} 个不兼容域名")
else:
logger.error(f" ❌ 域名组: 写入失败")
if sync_summary['domain_failed_items']:
logger.error(f" 📋 失败项目数: {len(sync_summary['domain_failed_items'])}")
# 详细失败清单(如果启用)
if TOOL_CONFIG.get('show_failure_details', True):
total_failed = len(sync_summary['ip_failed_items']) + len(sync_summary['domain_failed_items'])
if total_failed > 0:
logger.info(f"\n📋 详细失败清单 (共 {total_failed} 项):")
# 确定显示多少个失败项目
show_all = TOOL_CONFIG.get('show_all_failures', False)
max_items = None if show_all else 5
# IP失败项目
if sync_summary['ip_failed_items']:
logger.info(f" 🔸 IP地址失败项目 ({len(sync_summary['ip_failed_items'])} 个):")
items_to_show = sync_summary['ip_failed_items'] if show_all else sync_summary['ip_failed_items'][:5]
for i, item in enumerate(items_to_show, 1):
if isinstance(item, dict):
logger.info(f" {i}. {item.get('ip', 'N/A')} - {item.get('error', 'N/A')}")
else:
logger.info(f" {i}. {item}")
if not show_all and len(sync_summary['ip_failed_items']) > 5:
logger.info(f" ... 还有 {len(sync_summary['ip_failed_items']) - 5} 个IP地址失败")
logger.info(f" 💡 使用 --show-all-failures 选项查看所有失败项目")
# 域名失败项目
if sync_summary['domain_failed_items']:
logger.info(f" 🔸 域名失败项目 ({len(sync_summary['domain_failed_items'])} 个):")
items_to_show = sync_summary['domain_failed_items'] if show_all else sync_summary['domain_failed_items'][:5]
for i, item in enumerate(items_to_show, 1):
if isinstance(item, dict):
logger.info(f" {i}. {item.get('domain', 'N/A')} - {item.get('details', item.get('error', 'N/A'))}")
else:
logger.info(f" {i}. {item}")
if not show_all and len(sync_summary['domain_failed_items']) > 5:
logger.info(f" ... 还有 {len(sync_summary['domain_failed_items']) - 5} 个域名失败")
logger.info(f" 💡 使用 --show-all-failures 选项查看所有失败项目")
def _delete_address_object(self, object_name, vsys="public"):
"""删除地址对象"""
logger.info(f"🗑️ 删除现有地址对象: {object_name}")
rpc_request = f'''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="401" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<edit-config>
<target>
<running/>
</target>
<config>
<address-set xmlns="urn:huawei:params:xml:ns:yang:huawei-address-set">
<addr-object xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0" nc:operation="delete">
<vsys>{vsys}</vsys>
<name>{object_name}</name>
</addr-object>
</address-set>
</config>
</edit-config>
</rpc>'''
try:
response = self.session.send_rpc(rpc_request)
# 删除操作可能返回错误(如果对象不存在),这是正常的
if response and ("ok" in response.lower() or "data-missing" in response.lower()):
logger.debug(f"地址对象 {object_name} 删除完成")
else:
logger.debug(f"地址对象 {object_name} 可能不存在,继续创建")
except Exception as e:
logger.debug(f"删除地址对象时出错(可忽略): {e}")
def _delete_domain_set(self, domain_set_name, vsys="public"):
"""删除域名组"""
logger.info(f"🗑️ 删除现有域名组: {domain_set_name}")
rpc_request = f'''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="501" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<edit-config>
<target>
<running/>
</target>
<config>
<domain-sets xmlns="urn:huawei:params:xml:ns:yang:huawei-domain-set">
<vsys>
<name>{vsys}</name>
<domain-set xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0" nc:operation="delete">
<name>{domain_set_name}</name>
</domain-set>
</vsys>
</domain-sets>
</config>
</edit-config>
</rpc>'''
try:
response = self.session.send_rpc(rpc_request)
# 删除操作可能返回错误(如果对象不存在),这是正常的
if response and ("ok" in response.lower() or "data-missing" in response.lower()):
logger.debug(f"域名组 {domain_set_name} 删除完成")
else:
logger.debug(f"域名组 {domain_set_name} 可能不存在,继续创建")
except Exception as e:
logger.debug(f"删除域名组时出错(可忽略): {e}")
def sync_wechat_work_addresses(self, json_url, ip_object_name="WeCom_IP", domain_object_name="WeCom_Domain", vsys="public", auto_save=None):
"""同步企业微信地址信息到防火墙"""
logger.info("🚀 开始同步企业微信地址信息...")
# 1. 下载数据
data = self.download_wechat_work_data(json_url)
if not data:
return False
# 2. 解析数据
ip_addresses, domains = self.parse_wechat_work_data(data)
if not ip_addresses and not domains:
logger.warning("⚠️ 未找到任何IP地址或域名")
return False
logger.info(f"📊 解析结果:")
logger.info(f" IP地址: {len(ip_addresses)} 个")
logger.info(f" 域名: {len(domains)} 个")
# 3. 同步到防火墙
success_count = 0
total_failed_items = []
sync_summary = {
'ip_success': False,
'ip_failed_items': [],
'domain_success': False,
'domain_failed_items': []
}
# 同步IP地址
if ip_addresses:
ip_success, ip_failed = self.create_or_update_address_object(ip_object_name, ip_addresses, vsys)
sync_summary['ip_success'] = ip_success
sync_summary['ip_failed_items'] = ip_failed
if ip_success:
success_count += 1
total_failed_items.extend(ip_failed)
# 同步域名
if domains:
domain_success, domain_failed = self.create_or_update_domain_set(domain_object_name, domains, vsys)
sync_summary['domain_success'] = domain_success
sync_summary['domain_failed_items'] = domain_failed
if domain_success:
success_count += 1
total_failed_items.extend(domain_failed)
total_operations = (1 if ip_addresses else 0) + (1 if domains else 0)
# 显示同步结果摘要
self._display_sync_summary(sync_summary, ip_addresses, domains)
if success_count == total_operations:
logger.info("🎉 企业微信地址信息同步完成!")
# 保存配置
should_save = auto_save if auto_save is not None else TOOL_CONFIG.get('auto_save_config', True)
if should_save:
if self.save_configuration():
logger.info("💾 配置已保存到防火墙存储")
else:
logger.warning("⚠️ 配置保存失败,请手动执行保存命令")
else:
logger.info("⚠️ 已跳过配置保存,请手动执行 save 命令")
return True
else:
logger.error(f"❌ 同步部分失败:{success_count}/{total_operations} 个操作成功")
if total_failed_items and TOOL_CONFIG.get('show_failure_details', True):
logger.error(f"📋 总共 {len(total_failed_items)} 个项目写入失败")
return False
def export_parsed_data(self, ip_addresses, domains, filename=None):
"""导出解析后的数据到JSON文件"""
if filename is None:
filename = f"wechat_work_addresses_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
data = {
'timestamp': str(datetime.now()),
'source': '企业微信官方API',
'ip_addresses': ip_addresses,
'domains': domains,
'summary': {
'total_ips': len(ip_addresses),
'total_domains': len(domains)
}
}
try:
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.info(f"✅ 解析数据已导出到 {filename}")
return True
except Exception as e:
logger.error(f"❌ 导出数据失败: {e}")
return False
# =============================================================================
# 🚀 一键运行函数
# =============================================================================
def quick_sync():
"""使用预设配置一键同步"""
print("🚀 企业微信地址一键同步")
print("=" * 50)
print(f"📡 数据源: {WECHAT_WORK_API_URL}")
print(f"🔥 防火墙: {FIREWALL_CONFIG['host']}:{FIREWALL_CONFIG['port']}")
print(f"👤 用户: {FIREWALL_CONFIG['username']}")
print(f"🎯 IP对象: {FIREWALL_OBJECTS['ip_object_name']}")
print(f"🌐 域名组: {FIREWALL_OBJECTS['domain_object_name']}")
print("=" * 50)
# 创建同步管理器
sync_manager = WeChatWorkSyncManager(
FIREWALL_CONFIG['host'],
FIREWALL_CONFIG['username'],
FIREWALL_CONFIG['password'],
FIREWALL_CONFIG['port']
)
try:
# 建立连接
print(f"\n🔗 正在连接到防火墙...")
if not sync_manager.connect():
print("❌ 连接失败")
return False
print("✅ 连接成功!")
# 执行同步
success = sync_manager.sync_wechat_work_addresses(
WECHAT_WORK_API_URL,
FIREWALL_OBJECTS['ip_object_name'],
FIREWALL_OBJECTS['domain_object_name'],
FIREWALL_OBJECTS['vsys']
)
if success:
print("\n🎉 一键同步完成!")
return True
else:
print("\n❌ 同步失败")
return False
except KeyboardInterrupt:
logger.info("⚠️ 用户中断操作")
return False
except Exception as e:
logger.error(f"❌ 执行过程中发生错误: {e}")
return False
finally:
sync_manager.disconnect()
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='企业微信地址同步工具')
parser.add_argument('--url', default=WECHAT_WORK_API_URL, help=f'企业微信JSON数据下载地址 (默认: {WECHAT_WORK_API_URL[:50]}...)')
parser.add_argument('--ip-object', default=FIREWALL_OBJECTS['ip_object_name'], help=f'IP地址对象名称 (默认: {FIREWALL_OBJECTS["ip_object_name"]})')
parser.add_argument('--domain-object', default=FIREWALL_OBJECTS['domain_object_name'], help=f'域名组名称 (默认: {FIREWALL_OBJECTS["domain_object_name"]})')
parser.add_argument('--vsys', default=FIREWALL_OBJECTS['vsys'], help=f'虚拟系统名称 (默认: {FIREWALL_OBJECTS["vsys"]})')
parser.add_argument('--host', default=FIREWALL_CONFIG['host'], help=f'防火墙IP地址 (默认: {FIREWALL_CONFIG["host"]})')
parser.add_argument('--username', default=FIREWALL_CONFIG['username'], help=f'用户名 (默认: {FIREWALL_CONFIG["username"]})')
parser.add_argument('--password', default=FIREWALL_CONFIG['password'], help=f'密码 (默认: {FIREWALL_CONFIG["password"]})')
parser.add_argument('--port', type=int, default=FIREWALL_CONFIG['port'], help=f'NETCONF端口 (默认: {FIREWALL_CONFIG["port"]})')
parser.add_argument('--export-only', action='store_true', help='仅解析和导出数据,不写入防火墙')
parser.add_argument('--quick', action='store_true', help='使用预设配置一键同步')
parser.add_argument('--no-save', action='store_true', help='不自动保存配置到防火墙存储')
parser.add_argument('--save-only', action='store_true', help='仅保存当前配置,不进行同步')
parser.add_argument('--no-failure-details', action='store_true', help='不显示详细的失败清单')
parser.add_argument('--show-all-failures', action='store_true', help='显示所有失败项目(不限制数量)')
args = parser.parse_args()
# 根据命令行参数调整配置
if args.no_failure_details:
TOOL_CONFIG['show_failure_details'] = False
if args.show_all_failures:
TOOL_CONFIG['show_all_failures'] = True
# 如果指定了一键模式,直接运行
if args.quick:
success = quick_sync()
sys.exit(0 if success else 1)
# 如果指定了仅保存模式
if args.save_only:
print("💾 仅保存配置模式")
sync_manager = WeChatWorkSyncManager(args.host, args.username, args.password, args.port)
try:
if sync_manager.connect():
if sync_manager.save_configuration():
print("✅ 配置保存成功")
sys.exit(0)
else:
print("❌ 配置保存失败")
sys.exit(1)
else:
print("❌ 连接失败")
sys.exit(1)
finally:
sync_manager.disconnect()
print("🌐 企业微信地址同步工具")
print("📥 自动下载企业微信IP和域名列表")
print("🔄 同步到华为防火墙地址对象")
print("\n📋 当前配置:")
print(f" 📡 数据源: {args.url}")
print(f" 🔥 防火墙: {args.host}:{args.port} ({args.username})")
print(f" 🎯 IP对象: {args.ip_object}")
print(f" 🌐 域名组: {args.domain_object}")
print(f" 🏢 虚拟系统: {args.vsys}")
if TOOL_CONFIG['enable_debug']:
print(" 🔍 调试模式: 已启用")
if not TOOL_CONFIG['verify_ssl']:
print(" ⚠️ SSL验证: 已禁用")
# 创建同步管理器
sync_manager = WeChatWorkSyncManager(args.host, args.username, args.password, args.port)
try:
if not args.export_only:
# 建立连接
print(f"\n🔗 正在连接到 {args.host}:{args.port}...")
if not sync_manager.connect():
print("❌ 连接失败")
sys.exit(1)
print("✅ 连接成功!")
# 如果只是导出数据
if args.export_only:
print("\n📊 仅导出模式:解析数据但不写入防火墙")
data = sync_manager.download_wechat_work_data(args.url)
if data:
ip_addresses, domains = sync_manager.parse_wechat_work_data(data)
sync_manager.export_parsed_data(ip_addresses, domains)
else:
# 执行完整同步
auto_save = not args.no_save # 如果指定了--no-save则不保存
success = sync_manager.sync_wechat_work_addresses(
args.url,
args.ip_object,
args.domain_object,
args.vsys,
auto_save
)
if success:
print("\n🎉 同步完成!")
else:
print("\n❌ 同步失败")
sys.exit(1)
except KeyboardInterrupt:
logger.info("⚠️ 用户中断操作")
except Exception as e:
logger.error(f"❌ 执行过程中发生错误: {e}")
import traceback
traceback.print_exc()
finally:
# 断开连接
if not args.export_only:
sync_manager.disconnect()
if __name__ == "__main__":
main()
对应网站对象写入
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
网站白名单解析和防火墙写入工具 - GUI版本
功能:分析指定网站的所有相关域名和IP地址,自动写入华为防火墙
作者:大刘讲IT
版本:1.0
"""
import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext
import threading
import socket
import ipaddress
import re
from typing import List, Set, Tuple
import requests
from urllib.parse import urlparse, urljoin
import time
import logging
import json
from datetime import datetime
import os
import sys
from bs4 import BeautifulSoup
import xml.etree.ElementTree as ET
# =============================================================================
# 🔧 配置区域 - 可根据实际环境修改以下参数
# =============================================================================
# 华为防火墙连接配置
FIREWALL_CONFIG = {
'host': '192.168.50.1', # 防火墙IP地址
'username': 'netconf-admin', # 用户名
'password': 'Huawei@123', # 密码
'port': 830, # NETCONF端口
'timeout': 30 # 连接超时时间(秒)
}
# 防火墙对象配置
FIREWALL_OBJECTS = {
'address_group_name': 'Permitted_list', # 地址组名称
'vsys': 'public', # 虚拟系统名称
'address_group_description': '白名单地址组 - 自动创建', # 地址组描述
}
# 工具运行配置
TOOL_CONFIG = {
'enable_debug': False, # 是否启用调试日志
'request_timeout': 30, # HTTP请求超时时间(秒)
'verify_ssl': True, # 是否验证SSL证书
'max_domain_levels': 10, # 允许的最大域名层级数
'max_domain_length': 200, # 允许的最大域名长度
}
class NetconfSSHSession:
"""直接连接NETCONF端口830的会话类"""
def __init__(self, host, port, username, password):
self.host = host
self.port = port
self.username = username
self.password = password
def connect(self):
"""建立到NETCONF端口830的直接连接"""
try:
import paramiko
self.ssh = paramiko.SSHClient()
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.ssh.connect(
hostname=self.host,
port=self.port,
username=self.username,
password=self.password,
timeout=FIREWALL_CONFIG['timeout'],
look_for_keys=False,
allow_agent=False
)
self.transport = self.ssh.get_transport()
self.channel = self.transport.open_session()
self.channel.invoke_subsystem('netconf')
hello_msg = self._read_netconf_message(timeout=10)
if hello_msg and "hello" in hello_msg.lower():
return True
else:
return False
except Exception as e:
raise Exception(f"连接失败: {e}")
def _read_netconf_message(self, timeout=30):
"""读取NETCONF消息"""
start_time = time.time()
message = ""
while time.time() - start_time < timeout:
try:
if self.channel.recv_ready():
data = self.channel.recv(4096).decode('utf-8')
message += data
if "]]>]]>" in message:
return message.replace("]]>]]>", "").strip()
time.sleep(0.1)
except Exception as e:
break
return message if message else None
def disconnect(self):
"""断开连接"""
try:
if hasattr(self, 'channel') and self.channel:
self.channel.close()
if hasattr(self, 'ssh') and self.ssh:
self.ssh.close()
except:
pass
def send_rpc(self, rpc_request):
"""发送RPC请求"""
try:
full_request = rpc_request + "]]>]]>"
self.channel.send(full_request.encode('utf-8'))
response = self._read_netconf_message(timeout=30)
return response
except Exception as e:
raise Exception(f"发送RPC请求失败: {e}")
class WebsiteWhitelistManager:
"""网站白名单管理类"""
def __init__(self, host, username, password, port=830):
self.host = host
self.username = username
self.password = password
self.port = port
self.session = None
def connect(self):
"""建立连接"""
self.session = NetconfSSHSession(self.host, self.port, self.username, self.password)
return self.session.connect()
def disconnect(self):
"""断开连接"""
if self.session:
self.session.disconnect()
def get_website_title(self, url: str) -> str:
"""获取网站标题"""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=10, verify=TOOL_CONFIG['verify_ssl'])
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
title_tag = soup.find('title')
if title_tag:
title = title_tag.get_text().strip()
title = re.sub(r'\s+', ' ', title)
return title[:50]
else:
return "未知网站"
except Exception as e:
return "未知网站"
def extract_domain_from_url(self, url: str) -> str:
"""从URL中提取域名"""
try:
parsed = urlparse(url)
return parsed.netloc
except Exception as e:
return ""
def resolve_domain(self, domain: str) -> Set[str]:
"""解析域名获取IP地址"""
try:
ip = socket.gethostbyname(domain)
return {ip}
except socket.gaierror as e:
return set()
def get_all_domains_from_website(self, url: str) -> Set[str]:
"""获取网站所有相关域名"""
domains = set()
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
try:
response = requests.get(url, headers=headers, timeout=TOOL_CONFIG['request_timeout'], verify=TOOL_CONFIG['verify_ssl'])
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# 添加主域名
main_domain = self.extract_domain_from_url(url)
if main_domain:
domains.add(main_domain)
# 获取所有链接
links = soup.find_all('a', href=True)
for link in links:
href = link['href']
if href:
full_url = urljoin(url, href)
domain = self.extract_domain_from_url(full_url)
if domain and domain not in domains:
domains.add(domain)
# 获取所有脚本
scripts = soup.find_all('script', src=True)
for script in scripts:
src = script['src']
if src:
full_url = urljoin(url, src)
domain = self.extract_domain_from_url(full_url)
if domain and domain not in domains:
domains.add(domain)
# 获取所有样式表
styles = soup.find_all('link', href=True)
for style in styles:
href = style['href']
if href:
full_url = urljoin(url, href)
domain = self.extract_domain_from_url(full_url)
if domain and domain not in domains:
domains.add(domain)
# 获取所有图片
images = soup.find_all('img', src=True)
for img in images:
src = img['src']
if src:
full_url = urljoin(url, src)
domain = self.extract_domain_from_url(full_url)
if domain and domain not in domains:
domains.add(domain)
# 获取iframe
iframes = soup.find_all('iframe', src=True)
for iframe in iframes:
src = iframe['src']
if src:
full_url = urljoin(url, src)
domain = self.extract_domain_from_url(full_url)
if domain and domain not in domains:
domains.add(domain)
except requests.exceptions.RequestException as e:
raise Exception(f"请求错误: {str(e)}")
except Exception as e:
raise Exception(f"解析错误: {str(e)}")
return domains
def analyze_website(self, url: str) -> Tuple[Set[str], Set[str], str]:
"""分析单个网站"""
if not url.startswith(('http://', 'https://')):
url = 'https://' + url
title = self.get_website_title(url)
domains = self.get_all_domains_from_website(url)
all_ips = set()
for domain in domains:
resolved_ips = self.resolve_domain(domain)
all_ips.update(resolved_ips)
return domains, all_ips, title
def _escape_xml_chars(self, text):
"""转义XML特殊字符"""
if not text:
return text
text = text.replace('&', '&')
text = text.replace('<', '<')
text = text.replace('>', '>')
text = text.replace('"', '"')
text = text.replace("'", ''')
return text
def create_ip_address_object(self, object_name: str, ip_addresses: Set[str], description: str, vsys: str = "public"):
"""创建IP地址对象"""
if not ip_addresses:
raise Exception("没有IP地址需要添加")
# 先删除现有对象
self._delete_address_object(object_name, vsys)
# 验证IP地址格式并转换为/32格式
valid_ips = []
for ip_addr in ip_addresses:
try:
ip = ipaddress.ip_address(ip_addr)
valid_ips.append(f"{ip}/32")
except ValueError as e:
continue
if not valid_ips:
raise Exception("没有有效的IP地址可以创建地址对象")
# 生成带时间戳的描述
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
full_description = f"{description} ({current_time})"
escaped_description = self._escape_xml_chars(full_description)
# 创建XML元素
elements_xml = ""
for i, ip_addr in enumerate(valid_ips):
elements_xml += f"""
<elements>
<elem-id>{i}</elem-id>
<address-ipv4>{ip_addr}</address-ipv4>
</elements>"""
rpc_request = f'''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="201" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<edit-config>
<target>
<running/>
</target>
<config>
<address-set xmlns="urn:huawei:params:xml:ns:yang:huawei-address-set">
<addr-object>
<vsys>{vsys}</vsys>
<name>{object_name}</name>
<desc>{escaped_description}</desc>{elements_xml}
</addr-object>
</address-set>
</config>
</edit-config>
</rpc>'''
response = self.session.send_rpc(rpc_request)
if not response or "ok" not in response.lower():
raise Exception(f"创建IP地址对象失败: {response}")
return len(valid_ips)
def create_domain_set(self, domain_set_name: str, domains: Set[str], description: str, vsys: str = "public"):
"""创建域名组"""
if not domains:
raise Exception("没有域名需要添加")
# 先删除现有域名组
self._delete_domain_set(domain_set_name, vsys)
# 验证域名格式
valid_domains = []
for domain in domains:
escaped_domain = self._escape_xml_chars(domain)
if self._is_valid_domain(escaped_domain):
valid_domains.append(escaped_domain)
if not valid_domains:
raise Exception("没有有效的域名可以创建域名组")
# 生成带时间戳的描述
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
full_description = f"{description} ({current_time})"
escaped_description = self._escape_xml_chars(full_description)
# 创建域名XML
domains_xml = ""
for domain in valid_domains:
domains_xml += f"\n <domain>{domain}</domain>"
rpc_request = f'''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="301" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<edit-config>
<target>
<running/>
</target>
<config>
<domain-sets xmlns="urn:huawei:params:xml:ns:yang:huawei-domain-set">
<vsys>
<name>{vsys}</name>
<domain-set>
<name>{domain_set_name}</name>
<description>{escaped_description}</description>{domains_xml}
</domain-set>
</vsys>
</domain-sets>
</config>
</edit-config>
</rpc>'''
response = self.session.send_rpc(rpc_request)
if not response or "ok" not in response.lower():
raise Exception(f"创建域名组失败: {response}")
return len(valid_domains)
def create_or_update_address_group(self, group_name: str, member_objects: List[str], vsys: str = "public"):
"""智能创建或更新地址组 - 只添加不存在的成员"""
if not member_objects:
raise Exception("没有成员对象需要添加到地址组")
# 获取现有地址组的成员
existing_members = self._get_address_group_members(group_name, vsys)
# 过滤出需要添加的新成员
new_members = [member for member in member_objects if member not in existing_members]
if not new_members:
# 所有成员都已存在,无需添加
return {"status": "exists", "existing_count": len(existing_members), "new_count": 0}
# 如果地址组不存在,创建新的地址组
if not existing_members:
return self._create_new_address_group(group_name, member_objects, vsys)
else:
# 地址组存在,只添加新成员
return self._add_members_to_existing_group(group_name, new_members, existing_members, vsys)
def _get_address_group_members(self, group_name: str, vsys: str = "public"):
"""获取地址组的现有成员列表"""
rpc_request = f'''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="601" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<get-config>
<source>
<running/>
</source>
<filter>
<address-set xmlns="urn:huawei:params:xml:ns:yang:huawei-address-set">
<addr-group>
<vsys>{vsys}</vsys>
<name>{group_name}</name>
</addr-group>
</address-set>
</filter>
</get-config>
</rpc>'''
try:
response = self.session.send_rpc(rpc_request)
if response and "data" in response.lower():
# 从响应中提取成员名称
members = []
if "addrset-name" in response:
# 简单的文本解析来提取成员名称
import re
pattern = r'<addrset-name>([^<]+)</addrset-name>'
matches = re.findall(pattern, response)
members = matches
return members
else:
return []
except Exception as e:
return []
def _create_new_address_group(self, group_name: str, member_objects: List[str], vsys: str = "public"):
"""创建新的地址组"""
# 生成带时间戳的描述
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
group_description = f"{FIREWALL_OBJECTS['address_group_description']} ({current_time})"
escaped_description = self._escape_xml_chars(group_description)
# 创建成员XML
members_xml = ""
for i, member in enumerate(member_objects):
members_xml += f"""
<elements>
<elem-id>{i}</elem-id>
<addrset-name>{member}</addrset-name>
</elements>"""
rpc_request = f'''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="401" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<edit-config>
<target>
<running/>
</target>
<config>
<address-set xmlns="urn:huawei:params:xml:ns:yang:huawei-address-set">
<addr-group>
<vsys>{vsys}</vsys>
<name>{group_name}</name>
<desc>{escaped_description}</desc>{members_xml}
</addr-group>
</address-set>
</config>
</edit-config>
</rpc>'''
response = self.session.send_rpc(rpc_request)
if not response or "ok" not in response.lower():
raise Exception(f"创建地址组失败: {response}")
return {"status": "created", "existing_count": 0, "new_count": len(member_objects)}
def _add_members_to_existing_group(self, group_name: str, new_members: List[str], existing_members: List[str], vsys: str = "public"):
"""向现有地址组添加新成员并更新描述时间戳"""
# 计算下一个可用的elem-id
start_id = len(existing_members)
# 生成带时间戳的描述
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
group_description = f"{FIREWALL_OBJECTS['address_group_description']} ({current_time})"
escaped_description = self._escape_xml_chars(group_description)
# 创建新成员的XML
members_xml = ""
for i, member in enumerate(new_members):
members_xml += f"""
<elements>
<elem-id>{start_id + i}</elem-id>
<addrset-name>{member}</addrset-name>
</elements>"""
rpc_request = f'''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="402" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<edit-config>
<target>
<running/>
</target>
<default-operation>merge</default-operation>
<config>
<address-set xmlns="urn:huawei:params:xml:ns:yang:huawei-address-set">
<addr-group>
<vsys>{vsys}</vsys>
<name>{group_name}</name>
<desc>{escaped_description}</desc>{members_xml}
</addr-group>
</address-set>
</config>
</edit-config>
</rpc>'''
response = self.session.send_rpc(rpc_request)
if not response or "ok" not in response.lower():
raise Exception(f"添加成员到地址组失败: {response}")
return {"status": "updated", "existing_count": len(existing_members), "new_count": len(new_members)}
def _is_valid_domain(self, domain: str) -> bool:
"""验证域名格式是否有效"""
if not domain:
return False
if len(domain) < 3 or len(domain) > TOOL_CONFIG['max_domain_length']:
return False
invalid_chars = ['&', '<', '>', '"', "'", ' ', '\t', '\n', '\r']
for char in invalid_chars:
if char in domain:
return False
if domain.startswith('.') or domain.endswith('.'):
return False
labels = domain.split('.')
if len(labels) < 2:
return False
for label in labels:
if not label:
return False
if len(label) > 63:
return False
if not re.match(r'^[a-zA-Z0-9-]+$', label):
return False
if label.startswith('-') or label.endswith('-'):
return False
return True
def _delete_address_object(self, object_name: str, vsys: str = "public"):
"""删除地址对象"""
rpc_request = f'''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="501" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<edit-config>
<target>
<running/>
</target>
<config>
<address-set xmlns="urn:huawei:params:xml:ns:yang:huawei-address-set">
<addr-object xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0" nc:operation="delete">
<vsys>{vsys}</vsys>
<name>{object_name}</name>
</addr-object>
</address-set>
</config>
</edit-config>
</rpc>'''
try:
self.session.send_rpc(rpc_request)
except:
pass
def _delete_domain_set(self, domain_set_name: str, vsys: str = "public"):
"""删除域名组"""
rpc_request = f'''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="502" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<edit-config>
<target>
<running/>
</target>
<config>
<domain-sets xmlns="urn:huawei:params:xml:ns:yang:huawei-domain-set">
<vsys>
<name>{vsys}</name>
<domain-set xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0" nc:operation="delete">
<name>{domain_set_name}</name>
</domain-set>
</vsys>
</domain-sets>
</config>
</edit-config>
</rpc>'''
try:
self.session.send_rpc(rpc_request)
except:
pass
def _delete_address_group(self, group_name: str, vsys: str = "public"):
"""删除地址组"""
rpc_request = f'''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="503" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<edit-config>
<target>
<running/>
</target>
<config>
<address-set xmlns="urn:huawei:params:xml:ns:yang:huawei-address-set">
<addr-group xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0" nc:operation="delete">
<vsys>{vsys}</vsys>
<name>{group_name}</name>
</addr-group>
</address-set>
</config>
</edit-config>
</rpc>'''
try:
self.session.send_rpc(rpc_request)
except:
pass
def save_configuration(self):
"""保存配置到防火墙存储"""
rpc_request = '''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="999" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<save xmlns="urn:huawei:params:xml:ns:yang:huawei-save">
<format>cfg</format>
</save>
</rpc>'''
try:
response = self.session.send_rpc(rpc_request)
if response and "ok" in response.lower():
return True
else:
return self._save_configuration_backup()
except Exception as e:
return self._save_configuration_backup()
def _save_configuration_backup(self):
"""备用的保存配置方法"""
backup_rpc = '''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="998" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<copy-config>
<source>
<running/>
</source>
<target>
<startup/>
</target>
</copy-config>
</rpc>'''
try:
response = self.session.send_rpc(backup_rpc)
return response and "ok" in response.lower()
except Exception as e:
return False
def get_security_policy_info(self, policy_name: str, vsys: str = "public"):
"""获取安全策略信息"""
rpc_request = f'''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="701" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<get-config>
<source>
<running/>
</source>
<filter>
<sec-policy xmlns="urn:huawei:params:xml:ns:yang:huawei-security-policy">
<vsys>
<name>{vsys}</name>
<static-policy>
<rule>
<name>{policy_name}</name>
</rule>
</static-policy>
</vsys>
</sec-policy>
</filter>
</get-config>
</rpc>'''
try:
response = self.session.send_rpc(rpc_request)
if response and "data" in response.lower():
# 检查返回的数据中是否包含策略名称
if policy_name in response:
return True
else:
return False
else:
return False
except Exception as e:
return False
def check_domain_in_security_policy(self, policy_name: str, domain_set_name: str, vsys: str = "public"):
"""检查安全策略的目标地址中是否已经包含指定的域名组"""
rpc_request = f'''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="704" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<get-config>
<source>
<running/>
</source>
<filter>
<sec-policy xmlns="urn:huawei:params:xml:ns:yang:huawei-security-policy">
<vsys>
<name>{vsys}</name>
<static-policy>
<rule>
<name>{policy_name}</name>
<destination-ip>
<domain-set>{domain_set_name}</domain-set>
</destination-ip>
</rule>
</static-policy>
</vsys>
</sec-policy>
</filter>
</get-config>
</rpc>'''
try:
response = self.session.send_rpc(rpc_request)
if response and "data" in response.lower():
# 检查返回的数据中是否包含域名组名称
if domain_set_name in response and "domain-set" in response:
return True
else:
return False
else:
return False
except Exception as e:
return False
def add_domain_to_security_policy(self, policy_name: str, domain_set_name: str, vsys: str = "public"):
"""将域名组添加到安全策略的目标地址"""
# 首先检查安全策略是否存在
if not self.get_security_policy_info(policy_name, vsys):
raise Exception(f"安全策略 '{policy_name}' 不存在,请先创建该策略")
# 检查安全策略中是否已经包含该域名组
if self.check_domain_in_security_policy(policy_name, domain_set_name, vsys):
return {"status": "exists", "message": f"域名组 '{domain_set_name}' 已存在于安全策略 '{policy_name}' 的目标地址中"}
# 添加域名组到安全策略的目标地址 - 使用华为官方文档的正确格式
rpc_request = f'''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="702" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<edit-config>
<target>
<running/>
</target>
<error-option>rollback-on-error</error-option>
<config>
<sec-policy xmlns="urn:huawei:params:xml:ns:yang:huawei-security-policy" xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0" xmlns:yang="urn:ietf:params:xml:ns:yang:1">
<vsys>
<name>{vsys}</name>
<static-policy>
<rule nc:operation="merge">
<name>{policy_name}</name>
<destination-ip>
<domain-set>{domain_set_name}</domain-set>
</destination-ip>
</rule>
</static-policy>
</vsys>
</sec-policy>
</config>
</edit-config>
</rpc>'''
try:
response = self.session.send_rpc(rpc_request)
if response and "ok" in response.lower():
return {"status": "success", "message": f"成功将域名组 '{domain_set_name}' 添加到安全策略 '{policy_name}' 的目标地址"}
else:
# 尝试备用方法
return self._add_domain_to_policy_alternative(policy_name, domain_set_name, vsys)
except Exception as e:
# 尝试备用方法
return self._add_domain_to_policy_alternative(policy_name, domain_set_name, vsys)
def _add_domain_to_policy_alternative(self, policy_name: str, domain_set_name: str, vsys: str = "public"):
"""备用方法:将域名组添加到安全策略"""
# 备用XML格式 - 使用不同的操作方式
rpc_request = f'''<?xml version="1.0" encoding="UTF-8"?>
<rpc message-id="703" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<edit-config>
<target>
<running/>
</target>
<default-operation>merge</default-operation>
<config>
<sec-policy xmlns="urn:huawei:params:xml:ns:yang:huawei-security-policy" xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0">
<vsys>
<name>{vsys}</name>
<static-policy>
<rule>
<name>{policy_name}</name>
<destination-ip nc:operation="merge">
<domain-set>{domain_set_name}</domain-set>
</destination-ip>
</rule>
</static-policy>
</vsys>
</sec-policy>
</config>
</edit-config>
</rpc>'''
try:
response = self.session.send_rpc(rpc_request)
if response and "ok" in response.lower():
return {"status": "success", "message": f"成功将域名组 '{domain_set_name}' 添加到安全策略 '{policy_name}' 的目标地址"}
else:
raise Exception(f"添加域名组到安全策略失败: {response}")
except Exception as e:
raise Exception(f"添加域名组到安全策略失败: {str(e)}")
class WhitelistParserGUI:
"""网站白名单解析GUI主界面"""
def __init__(self):
self.root = tk.Tk()
self.root.title("网站白名单解析和防火墙写入工具 v1.0")
self.root.geometry("800x600")
self.root.resizable(True, True)
# 设置图标(如果需要)
try:
self.root.iconbitmap("icon.ico")
except:
pass
# 创建样式
self.style = ttk.Style()
self.style.theme_use('clam')
# 创建主框架
self.create_widgets()
# 创建管理器
self.manager = None
self.is_processing = False
def create_widgets(self):
"""创建GUI组件"""
# 主标题
title_frame = ttk.Frame(self.root)
title_frame.pack(fill=tk.X, padx=10, pady=5)
title_label = ttk.Label(
title_frame,
text="🌐 网站白名单解析和防火墙写入工具",
font=("Arial", 16, "bold")
)
title_label.pack()
subtitle_label = ttk.Label(
title_frame,
text="自动分析网站域名和IP地址,写入华为防火墙",
font=("Arial", 10)
)
subtitle_label.pack()
# 分隔线
ttk.Separator(self.root, orient='horizontal').pack(fill=tk.X, padx=10, pady=5)
# 配置信息框架
config_frame = ttk.LabelFrame(self.root, text="防火墙配置", padding=10)
config_frame.pack(fill=tk.X, padx=10, pady=5)
config_info = f"""防火墙地址: {FIREWALL_CONFIG['host']}:{FIREWALL_CONFIG['port']}
用户名: {FIREWALL_CONFIG['username']}
地址组名称: {FIREWALL_OBJECTS['address_group_name']}
虚拟系统: {FIREWALL_OBJECTS['vsys']}"""
ttk.Label(config_frame, text=config_info, font=("Consolas", 9)).pack(anchor=tk.W)
# 输入框架
input_frame = ttk.LabelFrame(self.root, text="网站输入", padding=10)
input_frame.pack(fill=tk.X, padx=10, pady=5)
# 网站输入
ttk.Label(input_frame, text="请输入网站地址:").pack(anchor=tk.W)
url_frame = ttk.Frame(input_frame)
url_frame.pack(fill=tk.X, pady=5)
self.url_var = tk.StringVar()
self.url_entry = ttk.Entry(
url_frame,
textvariable=self.url_var,
font=("Arial", 11),
width=50
)
self.url_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
# 处理按钮
self.process_btn = ttk.Button(
url_frame,
text="开始分析",
command=self.start_processing,
style="Accent.TButton"
)
self.process_btn.pack(side=tk.RIGHT, padx=(10, 0))
# 示例文本
example_label = ttk.Label(
input_frame,
text="示例: www.baidu.com 或 https://www.google.com",
font=("Arial", 9),
foreground="gray"
)
example_label.pack(anchor=tk.W)
# 安全策略选项框架
policy_frame = ttk.LabelFrame(self.root, text="安全策略配置(可选)", padding=10)
policy_frame.pack(fill=tk.X, padx=10, pady=5)
# 安全策略复选框
self.enable_policy_var = tk.BooleanVar()
self.policy_checkbox = ttk.Checkbutton(
policy_frame,
text="将域名组关联到安全策略",
variable=self.enable_policy_var,
command=self.toggle_policy_input
)
self.policy_checkbox.pack(anchor=tk.W)
# 安全策略名称输入
self.policy_input_frame = ttk.Frame(policy_frame)
self.policy_input_frame.pack(fill=tk.X, pady=(5, 0))
ttk.Label(self.policy_input_frame, text="安全策略名称:").pack(anchor=tk.W)
policy_entry_frame = ttk.Frame(self.policy_input_frame)
policy_entry_frame.pack(fill=tk.X, pady=2)
self.policy_name_var = tk.StringVar()
self.policy_name_entry = ttk.Entry(
policy_entry_frame,
textvariable=self.policy_name_var,
font=("Arial", 11),
state='disabled'
)
self.policy_name_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
# 策略说明
policy_desc_label = ttk.Label(
self.policy_input_frame,
text="注意:安全策略必须已存在,工具将在其目标地址中添加域名组",
font=("Arial", 9),
foreground="gray"
)
policy_desc_label.pack(anchor=tk.W)
# 进度框架
progress_frame = ttk.Frame(self.root)
progress_frame.pack(fill=tk.X, padx=10, pady=5)
self.progress_var = tk.StringVar(value="就绪")
self.progress_label = ttk.Label(progress_frame, textvariable=self.progress_var)
self.progress_label.pack(side=tk.LEFT)
self.progress_bar = ttk.Progressbar(
progress_frame,
mode='indeterminate'
)
self.progress_bar.pack(side=tk.RIGHT, fill=tk.X, expand=True, padx=(10, 0))
# 结果显示框架
result_frame = ttk.LabelFrame(self.root, text="分析结果", padding=10)
result_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
# 创建结果显示区域
self.result_text = scrolledtext.ScrolledText(
result_frame,
height=15,
font=("Consolas", 10),
wrap=tk.WORD
)
self.result_text.pack(fill=tk.BOTH, expand=True)
# 底部按钮框架
button_frame = ttk.Frame(self.root)
button_frame.pack(fill=tk.X, padx=10, pady=5)
# 清除按钮
ttk.Button(
button_frame,
text="清除日志",
command=self.clear_log
).pack(side=tk.LEFT)
# 关于按钮
ttk.Button(
button_frame,
text="关于",
command=self.show_about
).pack(side=tk.RIGHT)
# 绑定回车键
self.url_entry.bind('<Return>', lambda e: self.start_processing())
# 设置初始焦点
self.url_entry.focus()
def log_message(self, message, level="INFO"):
"""在结果显示区域记录消息"""
timestamp = datetime.now().strftime("%H:%M:%S")
# 根据级别设置前缀
if level == "ERROR":
prefix = "❌"
elif level == "SUCCESS":
prefix = "✅"
elif level == "WARNING":
prefix = "⚠️"
else:
prefix = "ℹ️"
formatted_message = f"[{timestamp}] {prefix} {message}\n"
self.result_text.insert(tk.END, formatted_message)
self.result_text.see(tk.END)
self.root.update()
def clear_log(self):
"""清除日志"""
self.result_text.delete(1.0, tk.END)
def toggle_policy_input(self):
"""切换安全策略输入框状态"""
if self.enable_policy_var.get():
self.policy_name_entry.configure(state='normal')
self.policy_name_entry.focus()
else:
self.policy_name_entry.configure(state='disabled')
self.policy_name_var.set("")
def show_about(self):
"""显示关于对话框"""
about_text = """网站白名单解析和防火墙写入工具 v1.0
功能特性:
• 自动分析网站所有相关域名和IP地址
• IP地址以/32格式写入防火墙地址对象
• 域名写入防火墙域名组
• 自动添加到Permitted_list地址组
• 可选择将域名组关联到安全策略
• 描述包含网站标题和创建时间
安全策略关联:
• 支持将域名组自动添加到指定安全策略的目标地址
• 安全策略必须预先存在于防火墙中
• 关联失败不影响其他对象的创建
技术支持:
• 基于NETCONF协议连接华为防火墙
• 智能域名验证和IP地址解析
• 完整的错误处理和日志记录
• 多线程处理,界面响应流畅
作者: 大刘讲IT
版本: 1.0"""
messagebox.showinfo("关于", about_text)
def start_processing(self):
"""开始处理网站"""
if self.is_processing:
return
url = self.url_var.get().strip()
if not url:
messagebox.showerror("错误", "请输入网站地址")
return
# 检查安全策略配置
if self.enable_policy_var.get():
policy_name = self.policy_name_var.get().strip()
if not policy_name:
messagebox.showerror("错误", "请输入安全策略名称")
self.policy_name_entry.focus()
return
# 在后台线程中执行处理
self.is_processing = True
self.process_btn.configure(state='disabled', text="处理中...")
self.progress_bar.start()
self.progress_var.set("正在处理...")
# 清除之前的日志
self.clear_log()
# 获取安全策略配置
enable_policy = self.enable_policy_var.get()
policy_name = self.policy_name_var.get().strip() if enable_policy else None
thread = threading.Thread(target=self.process_website, args=(url, enable_policy, policy_name))
thread.daemon = True
thread.start()
def process_website(self, url, enable_policy=False, policy_name=None):
"""在后台线程中处理网站"""
try:
self.log_message(f"开始处理网站: {url}")
if enable_policy and policy_name:
self.log_message(f"安全策略关联: 启用 -> {policy_name}")
# 创建管理器
self.manager = WebsiteWhitelistManager(
FIREWALL_CONFIG['host'],
FIREWALL_CONFIG['username'],
FIREWALL_CONFIG['password'],
FIREWALL_CONFIG['port']
)
# 连接防火墙
self.log_message("正在连接防火墙...")
self.manager.connect()
self.log_message("防火墙连接成功", "SUCCESS")
# 分析网站
self.log_message("正在分析网站...")
domains, ips, title = self.manager.analyze_website(url)
self.log_message(f"网站标题: {title}")
self.log_message(f"发现 {len(domains)} 个域名,{len(ips)} 个IP地址")
if not domains and not ips:
raise Exception("没有找到任何域名或IP地址")
# 提取主域名作为对象名称
main_domain = self.manager.extract_domain_from_url(url if url.startswith(('http://', 'https://')) else 'https://' + url)
if not main_domain:
raise Exception("无法提取主域名")
# 清理域名名称
object_name = re.sub(r'[^a-zA-Z0-9._-]', '_', main_domain)
success_count = 0
total_operations = 0
# 显示发现的域名和IP
self.log_message("\n发现的域名:")
for i, domain in enumerate(sorted(domains), 1):
self.log_message(f" {i:2d}. {domain}")
self.log_message("\n发现的IP地址:")
for i, ip in enumerate(sorted(ips), 1):
self.log_message(f" {i:2d}. {ip}")
# 创建IP地址对象
if ips:
total_operations += 1
self.log_message(f"\n正在创建IP地址对象: {object_name}")
ip_description = f"{title} - IP地址白名单"
ip_count = self.manager.create_ip_address_object(object_name, ips, ip_description, FIREWALL_OBJECTS['vsys'])
self.log_message(f"成功创建IP地址对象,包含 {ip_count} 个IP地址", "SUCCESS")
success_count += 1
# 创建域名组
domain_object_name = f"{object_name}_domains"
domain_created = False
if domains:
total_operations += 1
self.log_message(f"正在创建域名组: {domain_object_name}")
domain_description = f"{title} - 域名白名单"
domain_count = self.manager.create_domain_set(domain_object_name, domains, domain_description, FIREWALL_OBJECTS['vsys'])
self.log_message(f"成功创建域名组,包含 {domain_count} 个域名", "SUCCESS")
success_count += 1
domain_created = True
# 关联到安全策略(如果启用且域名组创建成功)
if enable_policy and policy_name and domain_created:
total_operations += 1
self.log_message(f"正在检查并关联域名组到安全策略: {policy_name}")
try:
result = self.manager.add_domain_to_security_policy(policy_name, domain_object_name, FIREWALL_OBJECTS['vsys'])
if isinstance(result, dict):
if result["status"] == "exists":
self.log_message(f"域名组已存在: {result['message']}", "INFO")
success_count += 1
elif result["status"] == "success":
self.log_message(f"关联成功: {result['message']}", "SUCCESS")
success_count += 1
else:
# 兼容旧的返回格式
if result:
self.log_message(f"成功将域名组关联到安全策略 '{policy_name}'", "SUCCESS")
success_count += 1
except Exception as e:
self.log_message(f"安全策略关联失败: {str(e)}", "WARNING")
self.log_message("提示:请确保安全策略已存在,或手动在防火墙界面中添加", "WARNING")
# 不阻止后续操作,继续处理
# 创建或更新地址组
if success_count > 0:
total_operations += 1
member_objects = []
if ips:
member_objects.append(object_name)
if member_objects:
self.log_message(f"正在更新地址组: {FIREWALL_OBJECTS['address_group_name']}")
member_count = self.manager.create_or_update_address_group(FIREWALL_OBJECTS['address_group_name'], member_objects, FIREWALL_OBJECTS['vsys'])
self.log_message(f"成功更新地址组,包含 {member_count} 个成员", "SUCCESS")
success_count += 1
# 保存配置
if success_count >= (total_operations - 1): # 允许安全策略关联失败
self.log_message("正在保存配置...")
if self.manager.save_configuration():
self.log_message("配置已保存到防火墙存储", "SUCCESS")
else:
self.log_message("配置保存失败,请手动执行保存命令", "WARNING")
self.log_message(f"\n🎉 处理完成!创建的对象:", "SUCCESS")
self.log_message(f"• IP地址对象: {object_name}")
self.log_message(f"• 域名组: {domain_object_name}")
self.log_message(f"• 地址组: {FIREWALL_OBJECTS['address_group_name']}")
if enable_policy and policy_name:
if success_count == total_operations:
self.log_message(f"• 安全策略: {policy_name} (域名组关联完成)", "SUCCESS")
else:
self.log_message(f"• 安全策略: {policy_name} (关联失败,请手动处理)", "WARNING")
# 显示成功消息
success_msg = "网站白名单处理完成!"
if enable_policy and policy_name and success_count < total_operations:
success_msg += "\n注意:安全策略关联失败,请手动处理。"
self.root.after(0, lambda: messagebox.showinfo("成功", success_msg))
else:
raise Exception(f"处理部分失败:{success_count}/{total_operations} 个操作成功")
except Exception as e:
error_msg = f"处理失败: {str(e)}"
self.log_message(error_msg, "ERROR")
self.root.after(0, lambda: messagebox.showerror("错误", error_msg))
finally:
# 断开连接
if self.manager:
self.manager.disconnect()
# 重置界面状态
self.root.after(0, self.reset_ui)
def reset_ui(self):
"""重置UI状态"""
self.is_processing = False
self.process_btn.configure(state='normal', text="开始分析")
self.progress_bar.stop()
self.progress_var.set("就绪")
def run(self):
"""运行GUI"""
self.root.mainloop()
def main():
"""主函数"""
try:
app = WhitelistParserGUI()
app.run()
except Exception as e:
messagebox.showerror("启动错误", f"程序启动失败: {str(e)}")
if __name__ == "__main__":
main()
更多推荐
所有评论(0)