Scapy 是 Python 中最强大的网络数据包操作库,可用于构造、发送、捕获、解析、修改几乎所有网络协议数据包,广泛用于网络测试、安全审计、协议开发与渗透测试。


一、安装与环境准备

1. 安装 Scapy
# 基础安装
pip install scapy

# 查看版本
python -c "import scapy; print(scapy.__version__)"
2. Windows 额外依赖

Windows 需安装 Npcap(兼容 WinPcap)才能正常抓包/发包:

  • 下载:https://npcap.com/
  • 安装时勾选 Install Npcap in WinPcap API-compatible Mode
3. Linux/macOS 权限

抓包/发包通常需要管理员权限:

# Linux/macOS
sudo python your_script.py
# 或直接进入 Scapy 交互
sudo scapy

二、核心概念:数据包分层与构造

Scapy 用 / 运算符 拼接协议层(从链路层到应用层),非常直观。

1. 常用协议层
  • 链路层:Ether()(以太网)、ARP()
  • 网络层:IP()IPv6()ICMP()
  • 传输层:TCP()UDP()
  • 应用层:DNS()HTTP()、Raw 数据
2. 构造示例
from scapy.all import *

# 1. 简单 ICMP (Ping)
pkt1 = IP(dst="8.8.8.8") / ICMP()

# 2. 以太网 + IP + TCP + 数据
pkt2 = Ether() / IP(dst="192.168.1.1") / TCP(dport=80, flags="S") / b"GET /"

# 3. ARP 请求
pkt3 = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst="192.168.1.0/24")
3. 查看/修改字段
# 查看协议字段(如 TCP)
ls(TCP)

# 修改字段
pkt = IP() / TCP()
pkt[IP].dst = "10.0.0.1"
pkt[TCP].dport = 443
pkt[TCP].flags = "SA"  # SYN-ACK

# 显示数据包详情
pkt.show()

三、发送与接收数据包

1. 仅发送(不等待响应)
# 三层发送(IP 层)
send(IP(dst="8.8.8.8")/ICMP())

# 二层发送(以太网层,需指定网卡)
sendp(Ether()/IP(dst="8.8.8.8")/ICMP(), iface="以太网")
2. 发送并等待响应(最常用)
# sr1: 发送并等待第一个响应(超时 2 秒)
resp = sr1(IP(dst="8.8.8.8")/ICMP(), timeout=2)
if resp:
    print(f"响应来自: {resp[IP].src}")
    resp.show()  # 打印完整响应包
else:
    print("无响应")

# sr: 发送并等待所有响应,返回 (应答包, 未应答包)
ans, unans = sr(IP(dst="192.168.1.1-10")/ICMP(), timeout=1)
for snd, rcv in ans:
    print(f"{snd[IP].dst} 存活")

四、数据包捕获(sniff)

# 1. 基础抓包(过滤 ICMP,抓 5 个包)
def packet_callback(pkt):
    if ICMP in pkt:
        print(f"捕获 ICMP: {pkt[IP].src} -> {pkt[IP].dst}")
        pkt.show()

# 启动抓包
sniff(
    filter="icmp",       # BPF 过滤语法
    prn=packet_callback, # 每个包的回调
    count=5,             # 抓 5 个包后停止
    iface="以太网"       # 指定网卡
)

# 2. 保存抓包到文件
pkts = sniff(filter="tcp port 80", count=10)
wrpcap("http_traffic.pcap", pkts)  # 可在 Wireshark 打开

# 3. 读取 pcap 文件
pkts = rdpcap("http_traffic.pcap")
for pkt in pkts:
    pkt.summary()

五、常用场景示例

1. ARP 扫描(发现局域网存活主机)
from scapy.all import *

def arp_scan(net):
    ans, _ = srp(
        Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=net),
        timeout=2,
        verbose=0
    )
    return [rcv[ARP].psrc for snd, rcv in ans]

# 扫描 192.168.1.0/24
hosts = arp_scan("192.168.1.0/24")
print("存活主机:")
for ip in hosts:
    print(ip)
2. TCP 端口扫描
from scapy.all import *

def port_scan(ip, ports):
    open_ports = []
    for port in ports:
        resp = sr1(
            IP(dst=ip)/TCP(dport=port, flags="S"),
            timeout=0.5,
            verbose=0
        )
        if resp and resp[TCP].flags == "SA":  # SYN-ACK 表示端口开放
            open_ports.append(port)
    return open_ports

# 扫描 80,443,8080 端口
ports = port_scan("192.168.1.1", [80, 443, 8080])
print(f"开放端口: {ports}")
3. 简单流量嗅探与解析
from scapy.all import *

def parse_packet(pkt):
    if IP in pkt:
        src = pkt[IP].src
        dst = pkt[IP].dst
        proto = pkt[IP].proto
        print(f"IP: {src} -> {dst} (协议: {proto})")
    if TCP in pkt:
        sport = pkt[TCP].sport
        dport = pkt[TCP].dport
        flags = pkt[TCP].flags
        print(f"TCP: {sport} -> {dport} 标志: {flags}")

sniff(prn=parse_packet, count=10)

六、关键函数速览

函数 作用
send() 三层发包(不等待响应)
sendp() 二层发包(不等待响应)
sr1() 发包并等待第一个响应
sr() 发包并等待所有响应,返回 (ans, unans)
srp() 二层发包并等待响应
sniff() 捕获数据包
wrpcap() 保存数据包到 pcap 文件
rdpcap() 读取 pcap 文件
ls() 查看协议字段
pkt.show() 打印数据包详情

七、注意事项

  1. 权限:抓包/发包通常需要管理员/root 权限。
  2. 性能:Python 性能有限,高流量场景建议用 C/C++ 工具(如 tcpdump、nmap)。
  3. 合法性:仅在授权网络中使用,遵守当地法律法规,禁止未经许可的扫描/嗅探。
  4. 过滤sniff()BPF 语法过滤,大幅提升效率。

Scapy 源码核心是Packet 基类 + 字段系统 + 分层协议栈 + 发送/捕获后端,以“协议即对象”为设计哲学,实现数据包的构造、解析、发送与捕获。下面从源码结构、核心类、工作流程、关键机制、扩展方式展开解析。


Scapy 源代码解析

一、源码目录结构(核心)

Scapy 源码(GitHub: secdev/scapy)主目录 scapy/ 结构清晰,按功能模块化组织:

scapy/
├── __init__.py          # 入口,版本管理、模块导入
├── arch/                # 系统适配(Linux/macOS/Windows),底层发包/抓包实现
├── layers/              # 协议层实现(Ether/IP/TCP/UDP/DNS/HTTP等)
├── packet/              # 数据包核心:基类、字段、操作符
│   ├── base_classes.py   # Packet/BasePacket 基类定义
│   ├── fields.py         # 各类字段(BitField/ByteField/IPField等)
│   └── operators.py      # / 运算符重载、层绑定逻辑
├── sendrecv.py          # send/sendp/sr/sniff 等发送/捕获函数
├── utils.py             # 工具函数(校验和、时间、字符串处理)
├── config.py            # 配置管理
└── ...

二、核心类与机制解析

1. Packet 基类(scapy/packet/base_classes.py)

Packet 是所有协议层的父类,是 Scapy 架构的基石。

核心属性
  • fields_desc协议字段定义列表,每个元素是 Field 子类实例,描述字段名、默认值、长度、编码规则。
    class IP(Packet):
        name = "IP"
        fields_desc = [
            BitField("version", 4, 4),    # 版本(4位,默认4)
            BitField("ihl", None, 4),     # 首部长度(4位,自动计算)
            ByteField("tos", 0),          # 服务类型
            ShortField("len", None),      # 总长度(自动计算)
            ShortField("id", 1),          # 标识
            FlagsField("flags", 0, 3, ["MF", "DF", "evil"]),  # 标志位
            BitField("frag", 0, 13),      # 片偏移
            ByteField("ttl", 64),         # 生存时间
            ByteField("proto", 6),        # 协议(TCP=6)
            XShortField("chksum", None),  # 校验和(自动计算)
            SourceIPField("src", "0.0.0.0"),  # 源IP
            DestIPField("dst", "0.0.0.0"),   # 目的IP
        ]
    
  • payload下一层数据包对象,形成链式结构(Ether/IP/TCP/…)。
  • underlayer上一层数据包对象,反向关联。
  • name:协议名称(如 “IP”)。
核心方法
  • __init__:初始化字段值,处理默认值与动态计算(如 ihl/len/chksum)。
  • build():递归构建字节流,调用各层 do_build(),自动计算校验和、长度等。
  • do_build():按 fields_desc 编码字段为二进制,处理字段依赖。
  • dissect():从字节流解析出 Packet 对象,按协议栈逐层解包。
  • __div__/ 运算符):重载 /,实现层组合(IP()/TCP()),绑定 payload 关系。

2. Field 字段系统(scapy/packet/fields.py)

Field 是数据包字段的抽象,负责值 ↔ 二进制的编解码,是 Scapy 灵活解析的核心。

常用 Field 类型
字段类 用途 示例
BitField 位字段 BitField(“version”, 4, 4)
ByteField 单字节 ByteField(“ttl”, 64)
ShortField 双字节(大端) ShortField(“id”, 1)
IntField 四字节 IntField(“seq”, 0)
IPField IP地址 IPField(“src”, “192.168.1.1”)
MACField MAC地址 MACField(“dst”, “ff:ff:ff:ff:ff:ff”)
StrField 字符串 StrField(“data”, “”)
FlagsField 标志位 FlagsField(“flags”, 0, 3, [“MF”,“DF”])
核心机制
  • 每个 Field 实现 addfield()(编码)与 getfield()(解码)。
  • 支持动态值计算:通过 lambda 或方法实现依赖字段(如 IP 校验和依赖整个 IP 头)。
  • 自动处理字节序(大端/小端)、填充、对齐。

3. 元类与协议注册(Packet_metaclass)

Packet 使用 Packet_metaclass 元类,实现协议自动注册层绑定

  • 当定义新协议(如 class TCP(Packet)),元类自动将其注册到全局协议表。
  • 维护 payload_guess 映射:根据上层协议类型(如 IP.proto=6 → TCP),自动识别下一层协议。
  • 支持自定义协议的无缝集成。

4. 发送与捕获(sendrecv.py)

发送流程
  • send():三层发送(IP 层),自动路由,调用底层 socket 或 libpcap。
  • sendp():二层发送(链路层),需指定网卡,直接发送以太网帧。
  • sr()/sr1():发送并等待响应,实现请求-应答模式(如 ping)。
  • 底层依赖:arch/ 下系统适配代码(Linux: PF_PACKET;macOS/Windows: libpcap)。
捕获流程
  • sniff():核心抓包函数,参数:
    • iface:网卡(如 “eth0”)。
    • filter:BPF 过滤规则(如 “tcp port 80”)。
    • prn:回调函数,处理每个捕获的包。
    • count:抓包数量。
  • 底层:通过 libpcap 或系统原生抓包接口,获取原始字节流 → 调用 Packet 解析 → 回调处理。

三、核心工作流程(源码视角)

1. 数据包构造(IP/TCP/“data”)

  1. 用户执行 pkt = IP(dst="8.8.8.8")/TCP(dport=80)/"data"
  2. IP() 初始化:__init__ 处理字段,ihl/len/chksum 设为 None(待计算)。
  3. / 运算符:IP.__div__(TCP()),将 TCP() 设为 IP.payload
  4. 同理,"data" 设为 TCP.payload(Raw 层)。
  5. 调用 bytes(pkt) → 触发 pkt.build()
    • 递归调用各层 do_build()
    • 自动计算 IP.ihl(首部长度)、IP.len(总长度)、IP.chksum(校验和)。
    • fields_desc 编码为二进制字节流。

2. 数据包解析(从 pcap 或抓包)

  1. 捕获原始字节流(如 raw_data = b'\x08\x00...')。
  2. 调用 pkt = Ether(raw_data)(或自动识别顶层协议)。
  3. Ether.dissect(raw_data)
    • Ether.fields_desc 解码 dst/src/type
    • 根据 type(如 0x0800 → IP),自动解析下一层 IP
    • 递归解析,直到无法识别(Raw 层)。
  4. 生成链式 Packet 对象:Ether → IP → TCP → Raw

四、关键源码细节

1. 校验和自动计算(utils.py: checksum())

  • IP/TCP/UDP 校验和通过 checksum() 函数计算,在 build() 阶段自动填充。
  • 源码示例:
    # IP 校验和计算(简化)
    def post_build(self, pkt, pay):
        if self.chksum is None:
            chksum = checksum(pkt)
            pkt = pkt[:10] + struct.pack("!H", chksum) + pkt[12:]
        return pkt + pay
    

2. BPF 过滤集成

  • sniff(filter="tcp") 底层调用 libpcap 的 pcap_compile()/pcap_setfilter(),仅捕获匹配的包,提升效率。

3. 层绑定与自动识别

  • 通过 bind_layers() 函数绑定协议:
    bind_layers(IP, TCP, proto=6)  # IP.proto=6 → 下一层是TCP
    
  • 解析时,根据上层字段值自动选择下一层协议类。

五、扩展自定义协议(源码级)

  1. 继承 Packet,定义 fields_desc
    from scapy.packet import Packet
    from scapy.fields import ByteField, ShortField, StrField
    
    class MyProto(Packet):
        name = "MyProto"
        fields_desc = [
            ByteField("magic", 0xAA),
            ShortField("cmd", 0x0001),
            StrField("data", ""),
        ]
    
  2. 绑定到已有协议(如 UDP 端口 9999):
    bind_layers(UDP, MyProto, dport=9999)
    
  3. 直接使用:pkt = IP()/UDP(dport=9999)/MyProto(cmd=0x0002, data="test")

六、总结

Scapy 源码设计的核心优势:

  1. 分层抽象:Packet + Field 完美映射网络协议栈,灵活可扩展。
  2. 元类驱动:自动注册协议、绑定层关系,降低使用门槛。
  3. 动态计算:自动处理校验和、长度等底层细节,专注业务逻辑。
  4. 跨平台arch/ 封装系统差异,一套代码适配多平台。
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐