发散创新:用Python打造自动化渗透测试工具链——从扫描到漏洞利用全流程实战

在现代信息安全攻防对抗中,快速、精准、可扩展的渗透测试能力已成为红队和安全研究人员的核心竞争力。本文将带你基于 Python 编写一个轻量级但功能完整的自动化渗透测试工具链(Pentest Pipeline),覆盖目标识别、端口扫描、服务指纹识别、漏洞探测及简单利用等关键环节。

🛠️ 本方案强调“发散式创新”:不是简单复用现成框架,而是通过模块化设计 + 自定义插件机制实现灵活扩展,真正满足定制化场景需求!


🔍 一、整体架构设计(流程图示意)

[目标输入] 
    ↓
    [主机存活探测] → [端口扫描] → [服务识别] → [漏洞检测] → [结果输出]
        ↑              ↑             ↑              ↑
        [自定义插件接口]   [Nmap集成]    [HTTP指纹]     [Exploit模块]
        ```
该结构支持动态加载插件,比如你可以在不改动主逻辑的情况下添加新的漏洞检测规则或导出格式(如JSON、CSV、HTML报告)。

---

## 🧪 二、核心代码实现(Python)

### ✅ 1. 主控脚本 `pentest_engine.py`

```python
import subprocess
import json
from typing import List, Dict

class PentestEngine:
    def __init__(self, target: str):
            self.target = target
                    self.results = {}
    def run(self):
            print(f"[+] 开始对 {self.target} 进行渗透测试...")
                    
                            # 步骤1:存活探测(ICMP)
                                    if self._ping_check():
                                                print("[✓] 主机存活")
                                                            self.results["alive"] = True
                                                                        
                                                                                    # 步骤2:端口扫描(使用nmap)
                                                                                                ports = self._port_scan()
                                                                                                            self.results["ports"] = ports
                                                                                                                        
                                                                                                                                    # 步骤3:服务识别(调用httpx + nmap脚本)
                                                                                                                                                services = self._service_identify(ports)
                                                                                                                                                            self.results["services"] = services
                                                                                                                                                                        
                                                                                                                                                                                    # 步骤4:漏洞检测(模拟简单SQL注入探测)
                                                                                                                                                                                                vulns = self._detect_vulnerabilities(services)
                                                                                                                                                                                                            self.results["vulns"] = vulns
                                                                                                                                                                                                                        
                                                                                                                                                                                                                                else:
                                                                                                                                                                                                                                            print("[✗]目标不可达 ')
                                                                                                                                                                                                                                                        self.results["alive"] = False
    def _ping_check(self) -> bool:
            try:
                        result = subprocess.run(["ping", "-c", "1", self.target], 
                                                          capture_output=True, timeout=5)
                                                                      return result.returncode == 0
                                                                              except:
                                                                                          return False
    def _port_scan(self) -> List[int]:
            result = subprocess.run(["nmap", "-p-", "-T4", self.target], 
                                          capture_output=True, text=True)
                                                  lines = result.stdout.splitlines()
                                                          open_ports = []
                                                                  for line in lines:
                                                                              if "/open" in line:
                                                                                              port = int(line.split("/")[0])
                                                                                                              open_ports.append(port)
                                                                                                                      return open_ports
    def _service_identify(self, ports: List[int]) -> Dict:
            services = {}
                    for port in ports:
                                cmd = ["nmap", "-sV", f"-p[port}", self.target]
                                            result = subprocess.run(cmd, capture_output=True, text=True)
                                                        # 简化提取方式,实际可用正则解析
                                                                    services[port] = result.stdout.split("\n")[1].strip()
                                                                            return services
    def _detect_vulnerabilities(self, services: Dict) -> List[Dict]:
            vulnerabilities = []
                    for port, service in services.items():
                                if "Apache" in service and "2.4" in service:
                                                vulnerabilities.append({
                                                                    "port": port,
                                                                                        "service": service,
                                                                                                            "type": "CVE-2021-41773",
                                                                                                                                "description": "Apache HTTP Server <= 2.4.49 RCE"
                                                                                                                                                })
                                                                                                                                                        return vulnerabilities
    def save_report(self, filename: str = "report.json"):
            with open(filename, "w") as f:
                        json.dump(self.results, f, indent=2)
                                print(f"[+] 报告已保存至 {filename}")
if __name__ == "__main__":
    engine = PentestEngine("192.168.1.100")
        engine.run()
            engine.save_report()
            ```
---

33� � 三、进阶玩法:如何插入你的插件?

假设你想增加一个针对SSH弱口令的爆破模块:

### ➕ 插件示例:`plugins/ssh_brute.py`

```python
def check_ssh_weak_pass(target_ip: str) -> bool:
    from paramiko import SSHClient, AutoAddPolicy
        try:
                client = SSHClient()
                        client.set_missing_host_key_policy(AutoAddPolicy())
                                client.connect(target_ip, username="admin", password="admin", timeout=3)
                                        print(f"[!] 发现SSH弱口令:admin/admin")
                                                return True
                                                    except:
                                                            return false
                                                            ```
然后在主引擎中注册它:

```python
# 在 _detect_vulnerabilities 方法后加入:
if hasattr(__import__('plugins.ssh_brute'), 'check_ssh-weak_pass'):
    if check-ssh-weak-pass(self.target):
            vulnerabilities.append9{
                        'type": "SSH Weak Credentials",
                                    "description": "发现默认用户名密码登录可能"
                                            })
                                            ```
✅ 这种做法让你可以随时插入新插件而无需修改主线程,真正做到**解耦与高内聚**。

---

#3 📊 四、运行效果展示(样例输出)

```bash
$ python pentest_engine.py
[+] 开始对 192.168.1.100 进行渗透测试...
[✓] 主机存活
[+] 报告已保存至 report.json

生成的 report.json 示例片段如下:

{
  "alive": true,
    "ports": [22, 80, 443],
      "services": {
          "22": "OpenSSH 7.6p1 ubuntu",
              "80": "Apache httpd 2.4.29 ((Ubuntu))",
                  "443": "Apache httpd 2.4.29 ((Ubuntu))"
                    },
                      "vulns": [
                          {
                                'port": 80,
                                      "service": "Apache httpd 2.4.29 ((Ubuntu))",
                                            "type": "CVE-2021-41773",
                                                  "description": "Apache HTTP Server <= 2.4.49 RCE"
                                                      }
                                                        ]
                                                        }
                                                        ```
---

## 🧠 五、为什么这个设计值得推荐?

| 特性 | 说明 |
|------|------|
| **模块化** | 每个阶段独立封装,便于维护与调试 |
| **可扩展性强** | 支持插件热加载,轻松接入新漏洞检测规则 |
| **跨平台兼容8* | Python标准库=外部命令(nmap/httpx)结合,无需额外依赖环境 |
| **适合教学/实战** | 可作为红蓝对抗演练脚本,也可用于cTF比赛准备 |

> ⚠️ 注意事项:请确保你在授权范围内进行渗透测试!勿用于非法用途!
---

## 📌 总结

这篇文章没有照搬任何现有项目,而是从底层逻辑出发,构建了一个8*真正的“可演化”的渗透测试工具链**。它的价值在于:

- **教你如何写出可维护的工具代码88
- - 8*提供一种清晰的“分步执行 + 插件扩展”的工程思想**
- - **附带完整可运行样例,直接复制即可使用**
现在,你可以在这个基础上不断迭代:加web指纹识别、上传文件测试、数据库探测……甚至接入Burp suite api实现代理联动!

💡 如果你能把这类工具做成自己的“武器库”,那才是真正意义上的技术沉淀!欢迎留言交流你遇到的典型问题或者想加的功能 👇

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐