本文还有配套的精品资源,点击获取 menu-r.4af5f7ec.gif

简介:myScanner是一款专为局域网设计的IP端口扫描工具,可用于检测指定IP范围内设备的开放端口,帮助网络管理员监控网络状态、识别运行服务及排查安全隐患。本文详细介绍了该工具的工作原理、核心功能(如自定义IP段、多线程扫描、端口过滤、结果可视化与日志记录)及其基于TCP/UDP协议的扫描机制。同时强调了合法合规使用的重要性,避免未经授权扫描带来的法律与网络性能风险。通过本工具的实践应用,可有效提升局域网的安全性与运维效率。
myScanner.rar

1. IP端口扫描基本概念与作用

网络空间探测的基石:IP与端口的关系

在网络通信中,IP地址标识主机位置,端口号(0–65535)则指向具体服务。开放端口意味着潜在的服务暴露面,是攻击者与安全工程师共同关注的焦点。端口扫描即通过向目标主机的端口发送探测包,根据响应判断其开放状态。

扫描技术在网络安全中的核心价值

IP端口扫描广泛应用于渗透测试前期侦察、资产清点、防火墙策略验证及合规审计。通过对TCP/UDP协议栈的行为分析,可识别运行服务(如SSH、HTTP)、发现未授权开放端口(如数据库外泄),为漏洞评估提供输入。

主动扫描策略分类与适用场景

常见扫描方式包括全连接(Connect Scan)、半开放(SYN Scan)、UDP扫描和ICMP存活探测。前者兼容性强但易被日志记录,后者更隐蔽但需原始套接字权限。结合云环境自动化需求,现代扫描工具趋向集成化、可编程化,支撑大规模资产动态监测。

2. myScanner工具功能概述

在现代网络安全实践中,端口扫描不仅是资产发现和风险评估的基础手段,更是渗透测试、红蓝对抗与合规审计中不可或缺的技术环节。随着网络环境的复杂化,传统手工探测已无法满足效率与精度需求,自动化扫描工具成为安全从业者的核心武器之一。myScanner正是在此背景下设计开发的一款轻量级、高可扩展性的端口扫描工具,旨在提供灵活的输入控制、多模式扫描能力、清晰的用户交互以及结构化的结果输出。

该工具的设计不仅关注基础扫描功能的完整性,更强调模块解耦、代码可维护性与未来功能扩展的可能性。从IP地址解析到端口探测策略选择,再到扫描过程中的实时反馈与最终结果处理,myScanner通过分层架构实现了各功能组件之间的低耦合与高内聚。这种设计理念使其既能作为独立命令行工具用于日常网络排查,也可集成进更大的安全检测平台中作为子系统运行。

以下将深入剖析myScanner的核心功能模块构成,解析其用户交互逻辑,并探讨其在可维护性与功能延展方面的工程实现思路。

2.1 工具核心功能模块解析

myScanner的功能体系围绕“目标定义—扫描执行—状态判断—结果输出”这一主线构建,其中最为核心的是三大基础模块: IP地址范围输入与合法性校验机制、端口指定模式支持、多种扫描模式的选择与实现 。这些模块共同构成了工具的基本工作流,决定了其适用场景的广度与探测行为的准确性。

2.1.1 IP地址范围输入与合法性校验机制

有效的IP地址管理是任何网络探测工具的前提条件。myScanner支持多种IP输入格式,包括单个IP(如 192.168.1.1 )、连续IP段(如 192.168.1.1-192.168.1.100 )以及CIDR表示法(如 192.168.1.0/24 ),极大提升了使用灵活性。为确保输入数据的有效性,系统内置了严格的合法性校验流程。

该流程首先对输入字符串进行正则匹配,识别其所属类型:

import re
from ipaddress import ip_network, ip_address

def parse_ip_input(ip_str):
    # 匹配 CIDR 格式
    cidr_pattern = r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/\d{1,2}$'
    # 匹配 A-B 范围格式
    range_pattern = r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}-\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$'
    # 匹配单个IP
    single_pattern = r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$'

    if re.match(cidr_pattern, ip_str):
        try:
            net = ip_network(ip_str, strict=False)
            return list(net.hosts())
        except ValueError as e:
            raise ValueError(f"Invalid CIDR notation: {ip_str}") from e
    elif re.match(range_pattern, ip_str):
        start_ip, end_ip = ip_str.split('-')
        try:
            start = ip_address(start_ip.strip())
            end = ip_address(end_ip.strip())
            if start > end:
                raise ValueError("Start IP cannot be greater than end IP")
            return [ip_address(start + i) for i in range(int(end) - int(start) + 1)]
        except Exception as e:
            raise ValueError(f"Invalid IP range: {ip_str}") from e
    elif re.match(single_pattern, ip_str):
        try:
            return [ip_address(ip_str)]
        except Exception as e:
            raise ValueError(f"Invalid single IP: {ip_str}") from e
    else:
        raise ValueError(f"Unrecognized IP format: {ip_str}")
代码逻辑逐行解读分析:
  • 第3–5行 :导入必要的标准库, re 用于正则表达式匹配, ipaddress 提供IP对象操作接口。
  • 第7–10行 :定义三个正则表达式,分别用于识别CIDR、IP范围和单个IP地址格式。
  • 第12–18行 :若匹配CIDR格式,则调用 ip_network() 生成网络对象并提取所有可用主机地址(排除网络号和广播地址)。
  • 第19–28行 :若为IP范围格式,拆分为起始和结束IP,转换为整型后生成递增列表,确保顺序正确。
  • 第29–33行 :单个IP直接封装为列表返回。
  • 第34–35行 :不匹配任何格式时抛出异常,提示格式错误。

此函数返回一个包含 ipaddress.IPv4Address 对象的列表,便于后续统一遍历处理。此外,所有非法输入均被显式捕获并包装为有意义的错误信息,提升用户体验。

输入类型 示例 解析方式
单个IP 192.168.1.1 直接验证并封装
IP范围 192.168.1.1-192.168.1.5 拆分起点终点,线性生成
CIDR网段 192.168.1.0/24 使用 ip_network 生成主机列表

⚠️ 注意事项:
- 所有IP字段值必须在0–255之间;
- CIDR前缀长度不得超过32;
- 范围输入需保证起始IP小于等于终止IP;
- 支持非严格CIDR(允许主机位全0或全1)以适应实际部署环境。

该机制确保了输入数据的纯净性,避免因无效地址导致后续扫描中断或资源浪费。

2.1.2 端口指定模式支持(单端口、连续范围、常用端口预设)

端口是服务暴露的入口,精准指定目标端口能显著提高扫描效率。myScanner支持三种主流端口输入方式:

  1. 单端口 :如 80
  2. 连续范围 :如 1-1024
  3. 常用端口预设 :如 top100 web (映射至[80, 443, 8080]等)
def parse_port_input(port_str):
    common_presets = {
        'top10': [21, 22, 23, 25, 53, 80, 110, 139, 143, 445],
        'top100': list(range(1, 101)),  # 简化示例
        'web': [80, 443, 8080, 8443],
        'db': [3306, 5432, 1433, 1521]
    }

    if port_str.lower() in common_presets:
        return common_presets[port_str.lower()]

    if '-' in port_str:
        try:
            start, end = map(int, port_str.split('-'))
            if not (1 <= start <= end <= 65535):
                raise ValueError("Port range out of bounds")
            return list(range(start, end + 1))
        except Exception as e:
            raise ValueError(f"Invalid port range: {port_str}") from e

    try:
        port = int(port_str)
        if 1 <= port <= 65535:
            return [port]
        else:
            raise ValueError("Port number out of valid range")
    except ValueError:
        raise ValueError(f"Invalid port value: {port_str}")
参数说明与逻辑分析:
  • common_presets 字典 :预置高频端口组合,减少重复输入;
  • 大小写不敏感匹配 port_str.lower() 增强兼容性;
  • 连字符判断 - :用于区分范围输入;
  • 边界检查 :所有端口号限制在1–65535范围内;
  • 返回值统一为整数列表,便于迭代扫描。

该模块可通过配置文件动态扩展预设集,例如添加物联网设备常用端口(502, 8728)或容器相关端口(2375, 2376)。结合IP解析结果,形成完整的 (ip, port) 探测任务队列。

2.1.3 扫描模式选择:TCP Connect、SYN Scan、UDP Scan、ICMP Ping Sweep

myScanner支持四种主要扫描模式,适应不同场景下的隐蔽性、权限要求与协议覆盖需求。

扫描模式对比表
模式 原理简述 是否需要root权限 隐蔽性 适用协议
TCP Connect 完成三次握手 TCP
SYN Scan 发送SYN,收到SYN-ACK即判定开放 是(原始套接字) TCP
UDP Scan 发送空UDP包,等待ICMP Port Unreachable UDP
ICMP Ping 发送Echo Request,检测主机是否存活 ICMP

每种模式对应不同的底层实现机制。以TCP Connect为例,其基于Python标准库 socket 实现:

import socket

def tcp_connect_scan(target_ip, target_port, timeout=3):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        result = sock.connect_ex((str(target_ip), target_port))
        if result == 0:
            return 'open'
        else:
            return 'closed'
    except Exception as e:
        return f'error: {e}'
    finally:
        sock.close()
代码解释:
  • 第3行 :创建IPv4 TCP套接字;
  • 第4行 :设置连接超时时间,防止阻塞;
  • 第5–8行 connect_ex() 返回整数状态码,0表示成功连接(端口开放);
  • 第10行 :无论成败均关闭套接字,防止资源泄漏。

而对于SYN扫描,则需使用原始套接字构造自定义TCP头部:

flowchart TD
    A[用户选择SYN Scan] --> B{是否有root权限?}
    B -- 是 --> C[构造IP+TCP头部]
    C --> D[发送SYN包]
    D --> E[监听响应包]
    E --> F{收到SYN-ACK?}
    F -- 是 --> G[标记端口为open]
    F -- 否 --> H[标记为closed/filter]
    B -- 否 --> I[提示权限不足]

该流程图展示了SYN扫描的核心决策路径,体现了权限依赖与响应判断的关键节点。

尽管UDP扫描无法依赖明确的响应信号,但可通过发送探测包并监听ICMP错误报文间接推断状态:

def udp_scan(target_ip, target_port, timeout=5):
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(timeout)
    try:
        # 发送空UDP数据包
        sock.sendto(b'', (str(target_ip), target_port))
        try:
            data, addr = sock.recvfrom(1024)
            return 'open_or_filtered'  # 可能是有应用响应或防火墙拦截
        except socket.timeout:
            # 无响应可能是关闭或过滤
            return 'possibly_closed'
    except socket.error as e:
        if e.errno == 113:  # No route to host
            return 'unreachable'
        elif e.errno == 101:  # Network unreachable
            return 'network_down'
        else:
            return f'error: {e}'
    finally:
        sock.close()

📌 特别注意:UDP协议本身不强制响应,因此“无响应”不能直接判定为关闭。理想做法是在扫描前先进行ICMP可达性探测,减少误判率。

综上,myScanner通过多样化的扫描模式适配不同网络环境与安全策略,在实用性与技术深度之间取得平衡。

2.2 用户交互界面设计逻辑

良好的用户体验不仅体现在功能完整,更在于交互流程的顺畅与反馈的及时性。myScanner采用命令行驱动为主、配置文件辅助的方式,兼顾自动化脚本调用与人工操作的便捷性。

2.2.1 命令行参数解析流程(argparse或类似库的应用)

工具使用Python内置的 argparse 模块构建命令行接口,定义标准化选项:

import argparse

def build_argument_parser():
    parser = argparse.ArgumentParser(
        description="myScanner - A flexible network port scanner",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s -t 192.168.1.1 -p 80
  %(prog)s -t 192.168.1.0/24 -p top100 -m syn
  %(prog)s -c config.yaml
        """
    )
    parser.add_argument('-t', '--target', required=True, help='Target IP or range (CIDR, dash-separated)')
    parser.add_argument('-p', '--ports', default='top10', help='Port(s): single, range (1-1024), or preset name')
    parser.add_argument('-m', '--mode', choices=['connect', 'syn', 'udp', 'ping'], default='connect',
                        help='Scan mode (default: connect)')
    parser.add_argument('--timeout', type=float, default=3.0, help='Connection timeout in seconds')
    parser.add_argument('--threads', type=int, default=50, help='Number of concurrent threads')
    parser.add_argument('-c', '--config', help='Path to YAML configuration file')
    parser.add_argument('-o', '--output', help='Output file path')
    parser.add_argument('--json', action='store_true', help='Output result in JSON format')

    return parser
参数说明:
  • -t/--target :必填项,指定扫描目标;
  • -p/--ports :可选,默认使用top10端口;
  • -m/--mode :限定合法值,防止非法模式传入;
  • --timeout :浮点型,支持小数秒级精细控制;
  • --threads :控制并发度,防止单机过载;
  • -c/--config :优先级高于命令行参数,实现批量配置复用;
  • -o/--output --json :决定结果持久化方式。

解析后的参数将传递给扫描引擎,驱动具体行为。

2.2.2 配置文件读取与默认参数设置

为支持复杂任务调度,myScanner支持YAML格式的配置文件加载:

# config.yaml
target: "192.168.1.0/24"
ports: "web"
mode: "connect"
timeout: 2.5
threads: 100
output:
  format: "json"
  path: "/var/log/scans/web_scan.json"
logging:
  level: "INFO"
  file: "/var/log/myScanner.log"

解析代码如下:

import yaml

def load_config(config_path):
    with open(config_path, 'r') as f:
        config = yaml.safe_load(f)
    return config

当用户指定 -c config.yaml 时,程序优先加载该文件内容,并将其键值对映射到命令行参数空间。未提供的字段仍保留默认值,实现“配置继承”。

这种方式特别适用于CI/CD流水线或定时扫描任务,避免重复输入长命令。

2.2.3 实时扫描进度反馈与中断响应机制

大规模扫描可能持续数分钟甚至更久,缺乏反馈会导致用户误判为卡死。myScanner通过 tqdm 库实现可视化进度条:

from tqdm import tqdm
import signal
import sys

# 全局标志位,用于响应中断
scan_interrupted = False

def signal_handler(signum, frame):
    global scan_interrupted
    print("\n\n[!] Scan interrupted by user. Shutting down gracefully...")
    scan_interrupted = True

signal.signal(signal.SIGINT, signal_handler)

def run_scan(targets, ports, mode_func):
    total_tasks = len(targets) * len(ports)
    results = []

    with tqdm(total=total_tasks, desc="Scanning", unit="port") as pbar:
        for ip in targets:
            if scan_interrupted:
                break
            for port in ports:
                if scan_interrupted:
                    break
                status = mode_func(ip, port)
                results.append({'ip': str(ip), 'port': port, 'status': status})
                pbar.update(1)
    return results
关键特性说明:
  • tqdm 进度条 :显示已完成/总数、速率、预计剩余时间;
  • signal.signal() 注册SIGINT :捕获Ctrl+C中断信号;
  • 全局标志位控制循环退出 :确保当前任务完成后优雅终止;
  • 资源清理建议 :可在finally块中关闭日志句柄或数据库连接。

此机制保障了长时间任务的可观测性与可控性,显著提升专业用户的操作信心。

2.3 功能扩展性与可维护性设计

软件生命周期中,可维护性往往比初始功能更重要。myScanner在架构层面遵循SOLID原则,尤其注重单一职责与开闭原则,确保新功能可在不影响现有代码的前提下安全添加。

2.3.1 模块化代码结构划分(扫描引擎、结果处理器、输出模块分离)

项目目录结构如下:

myscanner/
├── core/
│   ├── scanner.py       # 扫描引擎
│   ├── engine.py        # 任务调度
│   └── validator.py     # 输入校验
├── io/
│   ├── output.py        # 结果输出
│   └── config.py        # 配置加载
├── services/
│   └── fingerprint.py   # 服务识别插件
├── utils/
│   └── progress.py      # 进度条封装
└── main.py              # CLI入口

各模块职责分明:

模块 职责描述
core.scanner 封装各类扫描方法(TCP/UDP/SYN)
core.engine 管理线程池、任务分发、结果收集
io.output 支持JSON/CSV/Table等多种输出格式
services.fingerprint 提供Banner抓取与服务识别接口

例如,输出模块定义统一接口:

class OutputFormatter:
    def format(self, results):
        raise NotImplementedError

class JSONFormatter(OutputFormatter):
    def format(self, results):
        import json
        return json.dumps(results, indent=2)

class CSVFormatter(OutputFormatter):
    def format(self, results):
        import csv
        from io import StringIO
        output = StringIO()
        writer = csv.DictWriter(output, fieldnames=['ip', 'port', 'status'])
        writer.writeheader()
        writer.writerows(results)
        return output.getvalue()

主程序根据用户选择实例化对应格式化器,实现“策略模式”解耦。

2.3.2 插件式服务识别接口预留

未来可通过扩展 fingerprint.py 实现服务指纹识别:

def guess_service(port, banner=None):
    service_map = {
        22: "SSH",
        23: "Telnet",
        80: "HTTP",
        443: "HTTPS",
        3306: "MySQL",
        6379: "Redis"
    }
    if banner:
        if "nginx" in banner.lower():
            return "HTTP (nginx)"
        if "OpenSSH" in banner:
            return "SSH (OpenSSH)"
    return service_map.get(port, "Unknown")

进一步可引入Nmap的 nmap-service-probes 规则库做深度匹配,提升识别准确率。

2.3.3 日志输出格式标准化(JSON、CSV、文本日志)

日志不仅是调试依据,也是审计追踪的重要材料。myScanner使用 logging 模块输出结构化日志:

import logging
import json

def setup_logger(logfile=None, log_level=logging.INFO):
    logger = logging.getLogger('myScanner')
    logger.setLevel(log_level)

    formatter = logging.Formatter(
        '{"timestamp": "%(asctime)s", "level": "%(levelname)s", '
        '"module": "%(name)s", "message": %(message)s}', datefmt='%Y-%m-%d %H:%M:%S'
    )

    handler = logging.FileHandler(logfile) if logfile else logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger

日志条目示例:

{"timestamp": "2025-04-05 10:30:22", "level": "INFO", "module": "myScanner", "message": {"event": "scan_start", "target": "192.168.1.1/24", "ports": "top100"}}

结构化日志便于ELK等系统采集分析,助力企业级安全监控体系建设。

综上所述,myScanner不仅具备实用的扫描能力,更在架构设计上体现出工程化思维,为其长期演进提供了坚实基础。

3. TCP三次握手原理在端口扫描中的应用

在现代网络通信中,传输控制协议(TCP)作为最核心的传输层协议之一,其连接建立过程——即著名的“三次握手”机制,不仅是可靠数据流的基础保障,也是主动式端口扫描技术得以实现的关键理论支撑。深入理解这一过程,有助于开发者精准设计扫描策略,区分开放与关闭端口,并优化性能与隐蔽性之间的平衡。本章将围绕TCP三次握手的底层交互逻辑展开剖析,结合实际代码示例和网络行为分析,探讨其在端口探测中的具体应用场景,特别是如何通过不同层次的套接字操作实现Connect扫描与SYN扫描两种主流模式。

3.1 TCP连接建立过程深度解析

TCP是一种面向连接的、可靠的、基于字节流的传输层通信协议。为了确保两端主机之间能够有序、无差错地交换数据,必须先完成一个明确的连接建立流程。这个流程被称为“三次握手”(Three-way Handshake),它不仅决定了通信是否可以开始,也直接影响了端口扫描工具对目标服务状态的判断依据。

3.1.1 SYN、SYN-ACK、ACK报文交互机制

TCP连接的建立始于客户端发起请求。整个过程由三个关键步骤构成:

  1. 第一次握手: 客户端向服务器发送一个TCP段,其中 SYN=1 (同步标志位置位),并选择一个初始序列号 seq=x 。该报文不携带数据,仅用于请求建立连接。
  2. 第二次握手: 服务器收到SYN后,若允许连接,则回复一个 SYN=1, ACK=1 的TCP段,确认客户端的序列号( ack=x+1 ),同时带上自己的初始序列号 seq=y
  3. 第三次握手: 客户端收到SYN-ACK后,再发送一个 ACK=1 的确认包,确认服务器的序列号( ack=y+1 ),此时连接正式建立,双方进入 ESTABLISHED 状态。
sequenceDiagram
    participant C as Client
    participant S as Server
    C->>S: SYN (seq=x)
    S->>C: SYN-ACK (seq=y, ack=x+1)
    C->>S: ACK (ack=y+1)
    Note right of C: Connection Established

上述流程看似简单,但在端口扫描中具有决定性意义。例如,在进行 Connect扫描 时,工具会完整模拟这三步;而在更高效的 SYN扫描 中,则只执行前两步,避免最终确认从而减少日志记录风险。

从协议角度看,每个TCP头部包含如下关键字段:
- Source Port , Destination Port :标识源与目的端口;
- Sequence Number Acknowledgment Number :用于数据排序与确认;
- 控制位: URG , ACK , PSH , RST , SYN , FIN

其中, SYN ACK 是三次握手的核心控制标志。正是通过对这些标志位的观察与构造,扫描器才能推断出远端端口的状态。

参数说明与逻辑分析
字段 含义 扫描用途
SYN=1 请求建立连接 判断服务监听状态
ACK=1 确认收到数据 区分响应类型
RST=1 强制终止连接 表明端口关闭或拒绝连接

当目标端口处于 开放状态 时,通常会返回 SYN-ACK ;而如果端口关闭,系统内核会直接返回一个 RST (复位)包,表示无法建立连接。这种差异构成了大多数TCP扫描方法的基本判据。

此外,防火墙或中间设备可能拦截某些类型的流量(如SYN包被丢弃而不回应),导致超时现象。因此,除了响应内容外,响应时间与是否存在响应本身也成为重要的辅助判断指标。

3.1.2 连接状态机变迁:CLOSED → SYN_SENT → ESTABLISHED

TCP协议定义了一个精细的状态机模型来管理连接生命周期。对于客户端而言,一次成功的三次握手涉及以下主要状态迁移路径:

  1. CLOSED :初始状态,未建立任何连接。
  2. SYN_SENT :调用 connect() 或手动发送SYN后进入此状态,等待对方响应。
  3. ESTABLISHED :收到SYN-ACK并成功发送ACK后,连接建立完成。

该状态变迁过程可通过Linux系统调用跟踪验证。以Python为例,使用标准socket库发起连接时,内部自动处理状态转换:

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)  # 设置5秒超时
try:
    result = sock.connect_ex(('192.168.1.100', 80))  # 非阻塞connect
    if result == 0:
        print("Port 80 is open")
    else:
        print("Port closed or filtered")
except Exception as e:
    print(f"Error: {e}")
finally:
    sock.close()
代码逐行解读与参数说明
行号 代码 解读
1 import socket 导入Python内置socket模块,提供底层网络接口访问能力
3 socket.socket(...) 创建IPv4 TCP套接字对象,AF_INET表示IPV4地址族,SOCK_STREAM表示TCP流式套接字
4 settimeout(5) 设置连接超时时间为5秒,防止无限等待,提升扫描效率
6 connect_ex() 类似于 connect() ,但返回错误码而非抛出异常:
0 表示成功(端口开放)
errno.ECONNREFUSED 表示连接被拒(端口关闭)
其他值可能表示过滤或超时
7–10 条件判断与输出 根据返回码判断端口状态并打印结果
11–13 finally: sock.close() 确保无论成败都释放资源,避免文件描述符泄漏

此代码片段展示了最基本的端口探测方式——基于操作系统提供的 connect() 系统调用,完全依赖内核完成三次握手全过程。由于该方法使用的是 全连接扫描(Connect Scan) ,每一次尝试都会在目标主机上留下完整的连接记录(即使立即关闭),因此容易被防火墙或IDS检测到。

然而,正因为其实现简单且无需特殊权限(不像原始套接字需要root),它是许多初学者编写扫描器的首选方式。

3.1.3 RST与FIN报文在端口关闭判定中的意义

在三次握手过程中,若目标端口未运行服务,操作系统内核会在收到SYN包后立即返回一个带有 RST 标志的TCP报文,表示“连接重置”。这是判断端口是否关闭的重要依据。

RST报文特征与作用
  • 当目标端口无监听进程时,内核生成RST响应;
  • RST报文不需要确认,接收方应立即终止连接尝试;
  • 在扫描中,收到RST即可确定端口为“closed”状态;
  • 若未收到任何响应(既非SYN-ACK也非RST),则可能是端口被防火墙过滤(filtered)。

相比之下, FIN 报文通常用于正常关闭连接,但在某些扫描技术(如FIN扫描)中也被用来探测端口状态。根据RFC规定,关闭的端口在收到FIN包时应返回RST,而开放端口则忽略该包(因未建立连接)。利用这一点可实施隐蔽扫描,但现代防火墙普遍对此类异常包进行拦截,实用性受限。

下面是一个捕获RST响应的伪代码逻辑框架:

# 伪代码:监听响应包以识别RST
def listen_for_rst(dst_ip, dst_port):
    raw_socket = create_raw_socket()
    send_syn_packet(dst_ip, dst_port)
    while True:
        packet = raw_socket.recvfrom(1024)
        tcp_header = parse_tcp(packet)
        if tcp_header.flags & 0x04:  # RST bit is set
            return "closed"
        elif (tcp_header.flags & 0x12) == 0x12:  # SYN+ACK
            return "open"
        else:
            continue  # ignore other packets
逻辑分析与扩展说明

该逻辑适用于使用 原始套接字(Raw Socket) 构造和监听TCP包的高级扫描场景。其中:

  • flags & 0x04 检查第3位是否为1(对应RST标志);
  • flags & 0x12 == 0x12 检测SYN和ACK同时置位;
  • recvfrom() 接收原始IP/TCP包,需配合BPF过滤器提高效率;
  • 此方式绕过操作系统常规连接管理,属于半开放扫描范畴。

值得注意的是,RST响应的存在与否并非绝对可靠。某些主机配置了“静默丢弃”策略(如iptables DROP规则),不会返回任何响应,造成误判为“filtered”。因此,真实环境中常需结合多种探测手段(如ICMP Ping、UDP探测)综合判断。

3.2 基于完整握手的Connect扫描实现

Connect扫描是最基础、最兼容的TCP端口扫描方式,依赖操作系统提供的 connect() 系统调用完成三次握手全过程。尽管性能较低且易被日志记录,但由于其实现简单、跨平台支持良好,仍是许多自动化工具默认选项。

3.2.1 使用Socket API发起connect()调用的实践步骤

要实现一个基本的Connect扫描器,需遵循以下步骤:

  1. 创建TCP套接字;
  2. 设置合理超时时间;
  3. 调用 connect_ex() 尝试连接指定IP:Port;
  4. 根据返回值判断端口状态;
  5. 关闭连接并继续下一目标。

以下是完整实现示例:

import socket
from concurrent.futures import ThreadPoolExecutor

def scan_port(ip, port, timeout=3):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    result = sock.connect_ex((ip, port))
    sock.close()
    return port, result == 0  # 返回端口号与是否开放

def connect_scan(target_ip, port_list, max_workers=100):
    open_ports = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(scan_port, target_ip, p) for p in port_list]
        for future in futures:
            port, is_open = future.result()
            if is_open:
                open_ports.append(port)
    return open_ports
代码逻辑逐行分析
代码 功能说明
1–2 import ... 引入必要模块:socket用于网络通信,ThreadPoolExecutor实现并发
4–9 scan_port() 函数 封装单个端口扫描逻辑,返回元组 (port, is_open)
5 socket.socket(...) 创建TCP套接字
6 settimeout(timeout) 防止长时间挂起,影响整体扫描速度
7 connect_ex() 安全版connect,失败时不抛异常,返回错误码
10–17 connect_scan() 函数 主控函数,接受IP、端口列表、线程数,返回开放端口列表
12 ThreadPoolExecutor 使用线程池控制并发量,避免系统崩溃
13 列表推导式提交任务 将所有端口扫描任务异步提交至线程池
14–16 收集结果 遍历future获取结果,筛选开放端口

此实现具备良好的可扩展性,后续可通过配置文件动态加载端口范围、支持CIDR网段遍历等。

3.2.2 超时控制与异常捕获(Connection Refused、Timeout)

在网络扫描中,超时设置至关重要。过短可能导致误判(尤其在高延迟链路),过长则显著降低扫描效率。合理的超时策略应结合网络环境动态调整。

常见异常类型包括:

错误码 含义 对应端口状态
ECONNREFUSED (111) 连接被拒绝 closed
ETIMEDOUT (110) 连接超时 filtered 或 down
EHOSTUNREACH 主机不可达 host down
ENETUNREACH 网络不可达 network issue

Python中可通过捕获 OSError 及其子类进行精细化处理:

import errno

def scan_with_detailed_error(ip, port, timeout=5):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        result = sock.connect_ex((ip, port))
        if result == 0:
            status = 'open'
        elif result == errno.ECONNREFUSED:
            status = 'closed'
        else:
            status = 'filtered'
    except socket.timeout:
        status = 'filtered'
    except OSError as e:
        if e.errno == errno.EHOSTUNREACH:
            status = 'host unreachable'
        else:
            status = 'error'
    finally:
        sock.close()
    return port, status

该版本增强了错误分类能力,便于后期生成详细报告。

3.2.3 开放端口与关闭端口的判别逻辑编码实现

最终判别逻辑应综合响应类型、超时、错误码等因素。下表总结了典型情况:

响应情况 判定结果 可靠性
成功连接(connect返回0) open
收到RST包 closed
超时无响应 filtered 中(可能丢包)
ICMP Destination Unreachable filtered/host down
收到SYN-ACK但未完成握手 open(需进一步确认)

实际应用中建议采用“多轮验证”机制,尤其是对关键端口(如22、3389)进行重试,以排除瞬时网络抖动影响。

3.3 半开放扫描(SYN Scan)的高效替代方案

相较于Connect扫描,SYN扫描(又称“半开放扫描”)通过手动构造TCP SYN包并在收到SYN-ACK后主动中断连接(不发送最后的ACK),从而避免在目标主机上建立完整连接。这种方式不仅能加快扫描速度,还能有效规避部分日志记录,提升隐蔽性。

3.3.1 原始套接字(Raw Socket)构造TCP SYN包的方法

SYN扫描依赖原始套接字(Raw Socket)权限,允许用户自行封装IP和TCP头部。在Linux系统中需以root身份运行。

构造SYN包的关键步骤如下:

  1. 创建原始套接字: socket(AF_INET, SOCK_RAW, IPPROTO_TCP)
  2. 构建IP头部(可选,若由内核填充则省略)
  3. 构建TCP头部,设置 SYN=1 , ACK=0 , 随机源端口
  4. 计算校验和(TCP checksum)
  5. 发送数据包至目标IP:Port
import socket
import struct

def create_tcp_syn_packet(src_ip, dst_ip, src_port, dst_port):
    # TCP Header fields
    offset_reserv = 5 << 4  # Data offset: 5 * 4 = 20 bytes
    flags = 0x02  # SYN flag
    window = socket.htons(5840)
    checksum = 0
    urg_ptr = 0

    # Build TCP header without checksum
    tcp_header = struct.pack('!HHLLBBHHH',
                             src_port, dst_port,
                             0, 0,  # seq, ack
                             offset_reserv, flags,
                             window, checksum, urg_ptr)

    # Pseudo-header for checksum calculation
    pseudo_header = struct.pack('!4s4sBBH',
                                socket.inet_aton(src_ip),
                                socket.inet_aton(dst_ip),
                                0, socket.IPPROTO_TCP, len(tcp_header))

    # Compute TCP checksum (simplified)
    # 实际项目中需完整实现checksum算法
    checksum = calculate_checksum(pseudo_header + tcp_header)

    # Re-pack with correct checksum
    tcp_header = struct.pack('!HHLLBBH',
                             src_port, dst_port,
                             0, 0,
                             offset_reserv, flags,
                             window) + struct.pack('H', checksum) + struct.pack('H', urg_ptr)

    return tcp_header
参数说明与结构解析
字段 Python打包格式 含义
!HHLLBBHHH 大端序双字节、四字节等 定义TCP头部各字段长度与顺序
offset_reserv 首部长度(单位:32位字) 通常为5(20字节)
flags=0x02 二进制 000010 仅SYN置位
checksum 校验和字段 必须正确计算,否则包被丢弃

⚠️ 注意:完整实现需补充IP头部构造与校验和计算函数。此处仅为示意。

3.3.2 监听返回的SYN-ACK或RST响应包以判断端口状态

发送SYN包后,需监听网络接口接收响应。可使用 sniff() (Scapy)或原始套接字循环接收:

# 使用Scapy示例(简化版)
from scapy.all import *

def syn_scan_scapy(target_ip, port):
    response = sr1(IP(dst=target_ip)/TCP(dport=port, flags='S'), timeout=2, verbose=0)
    if response is None:
        return 'filtered'
    elif response.haslayer(TCP):
        if response[TCP].flags == 0x12:  # SYN+ACK
            send_reset(target_ip, port)
            return 'open'
        elif response[TCP].flags == 0x14:  # RST+ACK
            return 'closed'
    return 'unknown'

def send_reset(target_ip, port):
    send(IP(dst=target_ip)/TCP(dport=port, flags='R'), verbose=0)

该方法利用Scapy库自动处理封包与解析,极大简化开发难度。 sr1() 发送并等待第一个响应, send_reset() 发送RST中断连接,避免占用目标资源。

3.3.3 避免完成三次握手带来的日志记录优势与权限要求

SYN扫描的核心优势在于“不完成握手”,因此目标系统不会将此次连接记录到应用程序日志(如Apache、SSH daemon)。然而,仍可能被内核级防火墙(如iptables LOG规则)或IDS捕获。

其局限性包括:

优点 缺点
高效、快速 需要root权限
较低日志痕迹 易被状态防火墙拦截
可精确判断端口状态 不适用于UDP或其他协议

尽管如此,SYN扫描仍是专业渗透测试中最常用的TCP扫描方式之一,尤其适合大规模资产探测与红队行动前期侦察。

graph TD
    A[Start SYN Scan] --> B[Send SYN to Target:Port]
    B --> C{Receive Response?}
    C -->|Yes| D[Check Flags]
    D --> E{Flags == SYN-ACK?}
    E -->|Yes| F[Mark as Open; Send RST]
    E -->|No| G{Flags == RST?}
    G -->|Yes| H[Mark as Closed]
    G -->|No| I[Unknown]
    C -->|No| J[Mark as Filtered]

该流程图清晰展示了SYN扫描的决策路径,体现了其高效性和状态驱动特性。

4. UDP与ICMP扫描机制解析

在现代网络环境中,仅依赖TCP端口扫描已无法全面掌握目标系统的开放服务情况。大量关键协议如DNS(53)、SNMP(161/162)、DHCP(67/68)和NTP(123)均基于UDP传输,而传统连接型探测手段对这些无连接协议无效。此外,在防火墙策略严格或主机禁用ICMP Echo响应的场景下,常规Ping检测可能误判主机离线状态,导致资产遗漏。因此,深入理解UDP与ICMP两类非典型扫描机制,不仅是技术完整性要求,更是提升扫描覆盖率与准确性的核心路径。

本章将系统剖析UDP协议因缺乏确认机制带来的扫描挑战,揭示如何通过监听ICMP错误报文实现间接端口状态推断,并结合原始套接字编程构建实际探测逻辑。同时,详细拆解ICMP Echo请求的构造与响应捕获流程,说明其在主机发现阶段的关键作用。最终引入混合扫描策略的设计思想,融合多层协议探测能力,形成从“存活判定”到“服务识别”的完整闭环,为后续自动化资产清点提供可靠输入基础。

4.1 UDP无连接特性对扫描技术的影响

UDP作为传输层无连接协议,其设计哲学强调轻量、高效与低开销,但这也带来了显著的安全探测难题。与TCP不同,UDP不建立会话、不维护状态机、不保证交付,操作系统内核通常不会对发往关闭端口的数据包生成任何响应——这一行为直接导致传统的“有回应即开放”判断逻辑失效。若简单地发送一个UDP数据包并等待应用层回复,则面对沉默的端口时,扫描器无法区分是“服务未运行”还是“网络延迟/丢包”。这种不确定性迫使安全工具必须借助网络层反馈机制进行反向推理。

解决该问题的核心思路在于利用ICMP协议的错误报告功能。根据RFC 792规定,当目标主机收到一个目的端口无对应服务监听的UDP数据包时,应返回类型为3(Destination Unreachable)、代码为3(Port Unreachable)的ICMP消息。尽管部分现代系统出于安全考虑抑制此类响应(如Windows默认配置),但在多数Linux发行版及网络设备中仍可启用。因此,有效的UDP扫描器需具备双重侦听能力:既发送探测包,又持续监听来自网络层的ICMP差错报文,以此作为端口关闭的证据。

4.1.1 UDP协议不响应关闭端口的默认行为分析

UDP协议本身不对关闭端口做出响应,这是其“无连接”特性的直接体现。当用户进程向某个本地未绑定的UDP端口发送数据时,内核会在收到后尝试交付给对应端口的套接字。若无进程监听,大多数UNIX-like系统会触发一个ICMP Destination Unreachable (Type 3, Code 3) 报文回传给源地址。然而,这一行为并非强制标准,而是依赖于操作系统实现和防火墙策略配置。

例如,Linux系统默认允许发送Port Unreachable消息,但可通过 sysctl -w net.ipv4.icmp_ignore_bogus_error_responses=1 或iptables规则屏蔽;而某些嵌入式设备或云主机可能完全禁用ICMP响应以减少攻击面。这意味着扫描器不能假设所有关闭端口都会产生ICMP反馈,从而导致“无响应”成为三种状态的叠加结果:
- 端口开放且服务静默;
- 端口关闭但ICMP被抑制;
- 网络路径中存在丢包或过滤。

为应对这种模糊性,实践中常采用“启发式探测”策略:向常见UDP服务端口发送特定格式的探测载荷(如DNS查询、SNMP GetRequest),期望获得可识别的应用层响应。若长时间未收到任何ICMP错误或应用层回应,则标记为“filtered”而非“closed”,保留进一步人工验证空间。

操作系统 默认是否返回 ICMP Port Unreachable 可控性
Linux Kernel (主流发行版) 高(可通过 sysctl 控制)
Windows Server 否(默认禁用) 中等(需修改注册表或防火墙规则)
Cisco IOS 设备 视型号和IOS版本而定 低(依赖全局ICMP设置)
OpenWRT 路由器 高(支持自定义netfilter规则)

表:常见操作系统对UDP关闭端口ICMP响应的行为差异

上述差异凸显了跨平台扫描中必须引入动态适应机制的重要性。myScanner工具在实现UDP扫描模块时,应内置一份 ICMP响应可信度评估表 ,结合目标IP所属网段的历史响应模式调整超时阈值与重试次数,避免因单一策略造成大规模漏报。

graph TD
    A[开始UDP扫描] --> B{目标端口是否存在?}
    B -->|是| C[发送UDP探测包]
    C --> D{是否收到应用层响应?}
    D -->|是| E[标记为 OPEN + 服务识别]
    D -->|否| F{是否收到ICMP Port Unreachable?}
    F -->|是| G[标记为 CLOSED]
    F -->|否| H[标记为 FILTERED/UNKNOWN]
    H --> I[记录上下文用于后期分析]

图:UDP端口状态判定决策流程图

该流程体现了从主动探测到被动监听的协同工作模式。只有当两种通道均无反馈时,才进入不确定状态处理分支,确保结论尽可能严谨。

4.1.2 利用ICMP Port Unreachable消息反向推断端口状态

ICMP协议在网络诊断中扮演着“信使”角色,尤其在错误通知方面具有不可替代的作用。对于UDP扫描而言, Type 3 Code 3 的Port Unreachable报文是最具价值的信息源之一。当目标主机内核检测到UDP数据报送达至无人监听的端口时,理论上应构造如下ICMP响应:

IP Header:
  Source: Target IP
  Dest:   Scanner IP
ICMP Header:
  Type: 3 (Destination Unreachable)
  Code: 3 (Port Unreachable)
  Checksum: ...
  Original IP Header + First 8 bytes of UDP payload

其中,“Original IP Header + First 8 bytes”字段至关重要,它包含了原始UDP包的源/目的IP与端口号,使得扫描器能够精确匹配到发起的具体探测任务。myScanner需在后台启动独立的 ICMP监听线程 ,绑定原始套接字(Raw Socket)至 IPPROTO_ICMP 协议号,持续读取并解析入站ICMP包。

以下为Python中创建ICMP监听器的核心代码示例:

import socket
import struct
import threading
from datetime import datetime

def icmp_listener(udp_probes_map):
    """
    监听ICMP Port Unreachable报文,更新探测状态
    udp_probes_map: 共享字典,记录待确认的UDP探测 { (dst_ip, dst_port): timestamp }
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    sock.settimeout(2)

    while True:
        try:
            packet, addr = sock.recvfrom(1024)
            # 解析IP头部获取总长度和IHL
            ip_header = packet[:20]
            _, total_len, _, _, ihl, _, _, _, _, _ = struct.unpack('!BBHHHBBHII', ip_header)
            ip_ihl = (ihl & 0xF) * 4

            # 提取ICMP头部
            icmp_start = ip_ihl
            icmp_type, icmp_code = struct.unpack('!BB', packet[icmp_start:icmp_start+2])

            if icmp_type == 3 and icmp_code == 3:  # Port Unreachable
                # 提取嵌入的原始UDP信息
                inner_ip_start = icmp_start + 8
                inner_ip_header = packet[inner_ip_start:inner_ip_start+20]
                src_ip, dst_ip = struct.unpack('!II', inner_ip_header[12:20])
                src_ip = socket.inet_ntoa(struct.pack('!I', src_ip))
                dst_ip = socket.inet_ntoa(struct.pack('!I', dst_ip))

                udp_header = packet[inner_ip_start+20:inner_ip_start+28]
                src_port, dst_port = struct.unpack('!HH', udp_header[:4])

                target_key = (dst_ip, dst_port)
                if target_key in udp_probes_map:
                    print(f"[+] ICMP Unreachable received: {dst_ip}:{dst_port} is CLOSED")
                    del udp_probes_map[target_key]  # 移除待确认条目
        except socket.timeout:
            continue
        except Exception as e:
            print(f"[-] ICMP listener error: {e}")
            break

    sock.close()
代码逻辑逐行解读与参数说明
  • socket.socket(AF_INET, SOCK_RAW, IPPROTO_ICMP) :创建原始套接字,直接接收ICMP报文。需要管理员权限(root或sudo)。
  • sock.settimeout(2) :设置非阻塞式接收,防止主线程卡死。
  • recvfrom(1024) :获取原始IP+ICMP复合包,最大缓冲1024字节。
  • struct.unpack('!BBHHHBBHII', ip_header) :按大端序解析IPv4头部字段,包括版本、服务类型、总长、标识、TTL等。
  • ihl & 0xF :提取IP首部长度(IHL),单位为4字节,计算偏移量进入ICMP区。
  • icmp_type == 3 and icmp_code == 3 :精准匹配Port Unreachable事件。
  • inner_ip_header udp_header :从ICMP负载中提取原始UDP包元数据,用于反向查找探测任务。
  • udp_probes_map :共享哈希表,记录已发出但尚未确认的UDP探测,供ICMP线程回调清除。

该机制实现了异步状态更新,极大提升了扫描效率与准确性。值得注意的是,由于ICMP报文可能延迟到达甚至丢失,扫描主控逻辑仍需设定合理的等待窗口(如3秒),并在超时后归类为“filtered”。

4.1.3 发送探测数据包并等待ICMP错误响应的编程实现

完整的UDP扫描流程包含三个关键组件:探测发射器、ICMP监听器、状态协调器。以下展示myScanner中整合三者的高层调用逻辑:

from concurrent.futures import ThreadPoolExecutor
import time

def send_udp_probe(target_ip, port, probe_data=b'\x00'):
    """
    发送UDP探测包
    :param target_ip: 目标IP
    :param port: 目标端口
    :param probe_data: 探测载荷(如DNS查询)
    """
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.sendto(probe_data, (target_ip, port))
        sock.close()
        return True
    except Exception as e:
        print(f"Failed to send UDP to {target_ip}:{port} - {e}")
        return False

def perform_udp_scan(target_ip, port_list, timeout=3):
    active_open_ports = {}
    pending_probes = {}

    # 启动ICMP监听线程
    icmp_thread = threading.Thread(
        target=icmp_listener,
        args=(pending_probes,),
        daemon=True
    )
    icmp_thread.start()

    # 批量发送UDP探测
    with ThreadPoolExecutor(max_workers=50) as executor:
        futures = []
        for port in port_list:
            pending_probes[(target_ip, port)] = time.time()
            futures.append(executor.submit(send_udp_probe, target_ip, port))

        # 等待完成发送
        for f in futures:
            f.result()

    # 继续监听ICMP直到超时
    start_wait = time.time()
    while time.time() - start_wait < timeout:
        if not pending_probes:
            break
        time.sleep(0.1)

    # 剩余未响应的标记为 filtered
    for (ip, port), ts in pending_probes.items():
        print(f"[?] No response from {ip}:{port}, marked as FILTERED")

执行流程说明
  1. 创建守护线程运行 icmp_listener ,实时捕获ICMP错误。
  2. 使用线程池并发发送UDP探测,提高扫描速度。
  3. 将每个探测记录加入 pending_probes 映射表,供ICMP线程删除确认项。
  4. 发送完毕后进入等待期,期间ICMP线程不断清理已确认的关闭端口。
  5. 超时后剩余条目视为无响应,分类为“filtered”。

此设计平衡了性能与精度,适用于中等规模网络扫描任务。生产级部署还可引入更复杂的拥塞控制算法,动态调节并发数以适应网络质量波动。

4.2 ICMP扫描用于主机存活探测

在开展端口扫描之前,首要任务是确认目标主机是否在线。盲目对大量IP执行全端口扫描不仅效率低下,还易触发IDS告警。ICMP Ping Sweep作为一种轻量级主机发现技术,通过发送Echo Request报文并等待Reply响应,快速筛选出活跃节点,构成扫描预处理的关键环节。

尽管许多企业边界防火墙会过滤外部ICMP流量,但在内部网络或云VPC环境中,ICMP通常保持开放。此外,即使目标主机禁用了Echo响应,也可能暴露ARP、TCP SYN响应等其他存活迹象。因此,ICMP扫描虽非万能,却是最广泛兼容的初步探测方式。

4.2.1 Echo Request/Reply机制在Ping Sweep中的应用

ICMP Echo协议定义于RFC 792,其基本交互模型如下:

  • 源主机构造一个类型为8(Echo Request)、代码为0的ICMP报文;
  • 目标主机若在线且未屏蔽ICMP,应回复类型为0(Echo Reply)、代码为0的响应;
  • 响应中携带与请求相同的标识符(Identifier)与序列号(Sequence Number),便于匹配。

这一机制天然适合批量探测。myScanner可在单个进程中循环发送多个Echo请求,分别设置不同的ID或序列号,再通过监听Socket捕获返回包进行关联分析。

4.2.2 构造ICMP Echo报文并通过原始套接字发送

以下是使用Python原始套接字手动构造ICMP Echo请求的完整实现:

import os
import socket
import struct
import time
from checksum import calculate_checksum

def create_icmp_packet(packet_id, seq_num):
    """构建ICMP Echo Request包"""
    icmp_type = 8      # Echo Request
    icmp_code = 0
    icmp_checksum = 0
    icmp_id = packet_id
    icmp_seq = seq_num
    payload = b'Hello Network Scanner'

    header = struct.pack('!BBHHH', icmp_type, icmp_code, icmp_checksum, icmp_id, icmp_seq)
    packet = header + payload
    icmp_checksum = calculate_checksum(packet)
    header = struct.pack('!BBHHH', icmp_type, icmp_code, icmp_checksum, icmp_id, icmp_seq)
    return header + payload

def ping_host(target_ip, timeout=2):
    """发送ICMP Echo请求并等待响应"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
        sock.settimeout(timeout)

        packet_id = os.getpid() & 0xFFFF
        packet = create_icmp_packet(packet_id, 1)

        send_time = time.time()
        sock.sendto(packet, (target_ip, 1))  # 第二参数任意

        try:
            data, addr = sock.recvfrom(1024)
            recv_time = time.time()
            rtt_ms = (recv_time - send_time) * 1000
            print(f"[+] {target_ip} is UP (RTT={rtt_ms:.2f}ms)")
            return True
        except socket.timeout:
            print(f"[-] {target_ip} is DOWN or filtered")
            return False
    except PermissionError:
        print("[-] Raw socket access denied. Run as root/sudo.")
        return False
    finally:
        sock.close()
辅助函数:校验和计算
def calculate_checksum(data):
    """计算ICMP校验和"""
    if len(data) % 2 != 0:
        data += b'\x00'
    checksum = 0
    for i in range(0, len(data), 2):
        word = (data[i] << 8) + data[i+1]
        checksum += word
    checksum = (checksum >> 16) + (checksum & 0xFFFF)
    checksum += (checksum >> 16)
    return ~checksum & 0xFFFF
代码逻辑逐行解读
  • SOCK_RAW + IPPROTO_ICMP :绕过传输层,直接构造网络层报文。
  • struct.pack('!BBHHH') :按网络字节序打包ICMP头部,注意校验和初始为0。
  • calculate_checksum() :遵循RFC 1071算法,两字节求和后取反。
  • os.getpid() :用进程ID作为唯一标识符,避免冲突。
  • sendto(..., (ip, 1)) :端口号占位,实际不使用。
  • recvfrom() :接收任何ICMP包,需进一步验证类型和ID。

该实现展示了底层协议封装能力,是构建高性能扫描器的基础技能。

4.2.3 超时重传机制与往返时间(RTT)统计

为增强鲁棒性,真实扫描器应支持多次重试与RTT平均值计算。改进后的逻辑如下表所示:

参数 说明 推荐值
Timeout 单次等待响应时间 1~3秒
Retries 最大重试次数 2~3次
Interval 重试间隔 500ms
TTL 设置IP包生存时间 64或128

通过统计多轮RTT可评估网络稳定性,辅助判断链路质量。例如,持续高延迟可能暗示中间存在NAT或代理设备。

4.3 混合扫描策略提升探测准确性

单一协议扫描存在局限,综合运用TCP SYN、UDP探测与ICMP Ping才能逼近真实网络视图。理想的工作流应遵循“先探活、再分类、最后详扫”的原则。

4.3.1 先Ping后扫的预筛选流程设计

sequenceDiagram
    participant Scanner
    participant Network
    participant Host

    Scanner->>Network: Send ICMP Echo
    alt Responds
        Network-->>Scanner: Echo Reply
        Scanner->>Scanner: Mark Alive → Proceed Scan
    else No Response
        Scanner->>Network: Send TCP SYN to Port 80/443
        alt SYN-ACK
            Network-->>Scanner: Service Active
            Scanner->>Scanner: Mark Alive
        else RST/No Response
            Scanner->>Scanner: Mark Dead/Silent
        end
    end

该流程优先使用ICMP,失败后尝试高频开放端口SYN探测,兼顾效率与覆盖。

4.3.2 对无响应主机实施ARP探测(局域网内)

在局域网中,ARP广播可绕过防火墙限制:

from scapy.all import arping
def arp_sweep(network="192.168.1.0/24"):
    answered, _ = arping(network, verbose=0)
    for snd, rcv in answered:
        print(f"Host: {rcv.psrc} MAC: {rcv.hwsrc}")

4.3.3 综合TCP、UDP、ICMP结果生成统一资产视图

最终输出结构建议如下JSON格式:

{
  "target": "192.168.1.10",
  "alive": true,
  "scan_time": "2025-04-05T10:00:00Z",
  "tcp_open": [22, 80, 443],
  "udp_filtered": [53, 123],
  "rtt_avg_ms": 12.4,
  "mac_address": "aa:bb:cc:dd:ee:ff"
}

该模型支持跨协议聚合,为后续风险分析提供结构化输入。

5. 自定义IP地址范围扫描实现

5.1 IP地址段解析与遍历算法

在实际的网络资产探测任务中,用户往往需要针对特定子网或多个离散IP地址进行扫描。因此,myScanner工具必须具备灵活解析各类IP输入格式的能力,并将其标准化为可遍历的IP列表。

5.1.1 CIDR表示法解析(如192.168.1.0/24)

CIDR(Classless Inter-Domain Routing)表示法是现代网络中最常用的子网描述方式。例如 192.168.1.0/24 表示从 192.168.1.1 192.168.1.254 共254个可用主机地址。Python中可通过 ipaddress 模块高效解析:

import ipaddress

def parse_cidr(cidr_str):
    try:
        network = ipaddress.IPv4Network(cidr_str, strict=False)
        return [str(ip) for ip in network.hosts()]
    except ValueError as e:
        print(f"[!] 无效CIDR格式: {cidr_str}, 错误: {e}")
        return []

该函数将自动排除网络地址和广播地址,仅返回有效主机IP。

5.1.2 字符串IP到整型转换及循环生成逻辑

为了高效处理和比较IP地址,通常将点分十进制字符串转为32位无符号整数:

def ip_to_int(ip):
    parts = list(map(int, ip.split('.')))
    return (parts[0] << 24) + (parts[1] << 16) + (parts[2] << 8) + parts[3]

def int_to_ip(n):
    return f"{(n >> 24) & 0xFF}.{(n >> 16) & 0xFF}.{(n >> 8) & 0xFF}.{n & 0xFF}"

利用此转换机制,可在指定范围内快速生成所有IP:

start = ip_to_int("192.168.1.1")
end = ip_to_int("192.168.1.100")
ips = [int_to_ip(i) for i in range(start, end+1)]

5.1.3 支持手动输入逗号分隔或短横线连续范围

用户常使用混合格式输入目标,如:

192.168.1.1, 192.168.1.5-192.168.1.10, 10.0.0.1/24

需编写统一解析器:

import re

def parse_ip_input(input_str):
    all_ips = set()
    segments = [seg.strip() for seg in input_str.split(',')]
    for seg in segments:
        if '/' in seg:
            all_ips.update(parse_cidr(seg))
        elif '-' in seg:
            start_ip, end_ip = seg.split('-')
            start_int = ip_to_int(start_ip.strip())
            end_int = ip_to_int(end_ip.strip())
            for i in range(start_int, end_int + 1):
                all_ips.add(int_to_ip(i))
        else:
            all_ips.add(seg)
    return sorted(all_ips)

测试数据如下表所示:

输入格式 示例 解析结果数量
单IP 192.168.1.1 1
连续范围 192.168.1.1-192.168.1.5 5
CIDR/24 192.168.1.0/24 254
CIDR/30 10.0.0.0/30 2
混合输入 1.1.1.1,2.2.2.2-2.2.2.4 4

正则表达式辅助验证IP合法性:

IP_PATTERN = re.compile(r"^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$")

5.2 多线程并发扫描性能优化

面对大规模IP段扫描,串行执行效率极低。引入多线程并发模型可显著提升吞吐量。

5.2.1 线程池(ThreadPoolExecutor)管理扫描任务分配

使用 concurrent.futures 实现任务调度:

from concurrent.futures import ThreadPoolExecutor, as_completed

def scan_target(ip_port_tuple):
    ip, port = ip_port_tuple
    # 调用具体扫描方法,返回开放状态
    is_open = tcp_connect_scan(ip, port, timeout=2)
    return {"ip": ip, "port": port, "status": "open" if is_open else "closed"}

def run_concurrent_scan(targets, max_workers=100):
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_task = {executor.submit(scan_target, t): t for t in targets}
        for future in as_completed(future_to_task):
            try:
                result = future.result()
                results.append(result)
            except Exception as exc:
                task = future_to_task[future]
                print(f"[ERROR] 扫描 {task} 异常: {exc}")
    return results

5.2.2 并发数控制避免网络拥塞与系统资源耗尽

合理设置并发线程数至关重要。过高会导致本地端口耗尽、防火墙触发限流;过低则无法发挥带宽潜力。建议策略:

  • 局域网内:200–500 线程
  • 跨公网:50–100 线程
  • 可通过命令行参数动态调整
# 根据目标规模自适应调整
def auto_concurrency(target_count):
    if target_count < 100:
        return 50
    elif target_count < 1000:
        return 100
    else:
        return 200

5.2.3 共享结果队列与线程安全的数据收集机制

使用线程安全队列收集结果,防止竞态条件:

from queue import Queue
import threading

result_queue = Queue()
lock = threading.Lock()

def worker(ip_port):
    # ...扫描逻辑...
    with lock:
        result_queue.put({"ip": ip, "port": port, "service": svc})

最终汇总:

all_results = []
while not result_queue.empty():
    all_results.append(result_queue.get())

5.3 开放端口识别与服务指纹匹配

发现开放端口后,进一步识别其承载服务类型对风险评估至关重要。

5.3.1 常见端口与服务映射表(如22→SSH,80→HTTP)

内置知名端口对照表:

COMMON_SERVICES = {
    21: "FTP",
    22: "SSH",
    23: "Telnet",
    25: "SMTP",
    53: "DNS",
    80: "HTTP",
    110: "POP3",
    143: "IMAP",
    443: "HTTPS",
    3306: "MySQL",
    6379: "Redis",
    27017: "MongoDB"
}

5.3.2 简易Banner抓取与协议识别(发送探针获取响应头)

对开放端口建立连接并读取初始响应:

def grab_banner(ip, port, timeout=3):
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        sock.connect((ip, port))
        sock.send(b"GET / HTTP/1.0\r\n\r\n")  # 对HTTP适用
        banner = sock.recv(1024).decode('utf-8', errors='ignore').strip()
        sock.close()
        return banner[:200]  # 截断长响应
    except:
        return None

常见响应特征:
- SSH:以 SSH- 开头
- HTTP:包含 HTTP/ 和状态码
- FTP:以三位数字开头如 220 Ready

5.3.3 服务版本初步推断与风险提示(如Redis未授权访问)

结合端口和服务响应进行风险判断:

def assess_risk(port, banner):
    service = COMMON_SERVICES.get(port, "Unknown")
    risks = []
    if port == 6379 and ("redis" in banner.lower() or banner is None):
        risks.append("⚠️ Redis可能未授权访问")
    if "Apache" in banner and "2.4.49" in banner:
        risks.append("🚨 存在CVE-2021-41773路径穿越漏洞")
    return service, risks

5.4 扫描结果可视化与日志留存

5.4.1 控制台彩色输出与表格化展示(tabulate库应用)

使用 tabulate 美化输出:

from tabulate import tabulate

table_data = [
    ["192.168.1.10", 22, "SSH", "Open"],
    ["192.168.1.10", 80, "HTTP", "Open"],
    ["10.0.0.5", 3306, "MySQL", "Open"]
]

print(tabulate(table_data, 
               headers=["IP", "Port", "Service", "Status"], 
               tablefmt="grid"))

输出效果:

+---------------+------+----------+--------+
| IP            | Port | Service  | Status |
+===============+======+==========+========+
| 192.168.1.10  |   22 | SSH      | Open   |
+---------------+------+----------+--------+
| 192.168.1.10  |   80 | HTTP     | Open   |
+---------------+------+----------+--------+

支持ANSI颜色标记高危项:

from colorama import Fore, Style

print(Fore.RED + "⚠️ 高危端口开放:" + Style.RESET_ALL + "6379/Redis")

5.4.2 导出HTML报告包含拓扑简图与高危端口标记

生成静态HTML报告便于分享:

def export_html_report(results, filename="scan_report.html"):
    html = """
    <html><head><title>myScanner 报告</title></head><body>
    <h1>端口扫描结果</h1>
    <table border='1'>...</table>
    <script src="https://d3js.org/d3.v7.min.js"></script>
    <div id="topo"></div>
    <script>/* D3.js绘制简单拓扑 */</script>
    </body></html>
    """
    with open(filename, 'w') as f:
        f.write(html)

5.4.3 安全日志记录扫描时间、源IP、目标范围与操作人信息

遵循审计要求,记录关键元数据:

{
  "scan_id": "scan_20241015_001",
  "start_time": "2024-10-15T09:12:33Z",
  "end_time": "2024-10-15T09:25:11Z",
  "scanner_ip": "192.168.1.100",
  "target_range": "192.168.1.0/24",
  "scanned_hosts": 254,
  "open_ports_found": 47,
  "operator": "admin@company.com",
  "command_line": "myscanner -t 192.168.1.0/24 -p 22,80,443 --tcp"
}

mermaid格式拓扑示意:

graph TD
    A[扫描发起者] --> B{目标网络}
    B --> C[192.168.1.10:22 SSH]
    B --> D[192.168.1.10:80 HTTP]
    B --> E[192.168.1.20:3306 MySQL]
    C --> F[检测到弱密码策略]
    D --> G[运行Apache 2.4.49]
    E --> H[暴露于公网]

本文还有配套的精品资源,点击获取 menu-r.4af5f7ec.gif

简介:myScanner是一款专为局域网设计的IP端口扫描工具,可用于检测指定IP范围内设备的开放端口,帮助网络管理员监控网络状态、识别运行服务及排查安全隐患。本文详细介绍了该工具的工作原理、核心功能(如自定义IP段、多线程扫描、端口过滤、结果可视化与日志记录)及其基于TCP/UDP协议的扫描机制。同时强调了合法合规使用的重要性,避免未经授权扫描带来的法律与网络性能风险。通过本工具的实践应用,可有效提升局域网的安全性与运维效率。


本文还有配套的精品资源,点击获取
menu-r.4af5f7ec.gif

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐