python:Scapy 网络数据包操作库,源代码解析
Scapy 是 Python 中最强大的网络数据包操作库,可用于构造、发送、捕获、解析、修改几乎所有网络协议数据包,广泛用于网络测试、安全审计、协议开发与渗透测试。源代码解析
·
Scapy 是 Python 中最强大的网络数据包操作库,可用于构造、发送、捕获、解析、修改几乎所有网络协议数据包,广泛用于网络测试、安全审计、协议开发与渗透测试。
一、安装与环境准备
1. 安装 Scapy
# 基础安装
pip install scapy
# 查看版本
python -c "import scapy; print(scapy.__version__)"
2. Windows 额外依赖
Windows 需安装 Npcap(兼容 WinPcap)才能正常抓包/发包:
- 下载:https://npcap.com/
- 安装时勾选 Install Npcap in WinPcap API-compatible Mode
3. Linux/macOS 权限
抓包/发包通常需要管理员权限:
# Linux/macOS
sudo python your_script.py
# 或直接进入 Scapy 交互
sudo scapy
二、核心概念:数据包分层与构造
Scapy 用 / 运算符 拼接协议层(从链路层到应用层),非常直观。
1. 常用协议层
- 链路层:
Ether()(以太网)、ARP() - 网络层:
IP()、IPv6()、ICMP() - 传输层:
TCP()、UDP() - 应用层:
DNS()、HTTP()、Raw 数据
2. 构造示例
from scapy.all import *
# 1. 简单 ICMP (Ping)
pkt1 = IP(dst="8.8.8.8") / ICMP()
# 2. 以太网 + IP + TCP + 数据
pkt2 = Ether() / IP(dst="192.168.1.1") / TCP(dport=80, flags="S") / b"GET /"
# 3. ARP 请求
pkt3 = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst="192.168.1.0/24")
3. 查看/修改字段
# 查看协议字段(如 TCP)
ls(TCP)
# 修改字段
pkt = IP() / TCP()
pkt[IP].dst = "10.0.0.1"
pkt[TCP].dport = 443
pkt[TCP].flags = "SA" # SYN-ACK
# 显示数据包详情
pkt.show()
三、发送与接收数据包
1. 仅发送(不等待响应)
# 三层发送(IP 层)
send(IP(dst="8.8.8.8")/ICMP())
# 二层发送(以太网层,需指定网卡)
sendp(Ether()/IP(dst="8.8.8.8")/ICMP(), iface="以太网")
2. 发送并等待响应(最常用)
# sr1: 发送并等待第一个响应(超时 2 秒)
resp = sr1(IP(dst="8.8.8.8")/ICMP(), timeout=2)
if resp:
print(f"响应来自: {resp[IP].src}")
resp.show() # 打印完整响应包
else:
print("无响应")
# sr: 发送并等待所有响应,返回 (应答包, 未应答包)
ans, unans = sr(IP(dst="192.168.1.1-10")/ICMP(), timeout=1)
for snd, rcv in ans:
print(f"{snd[IP].dst} 存活")
四、数据包捕获(sniff)
# 1. 基础抓包(过滤 ICMP,抓 5 个包)
def packet_callback(pkt):
if ICMP in pkt:
print(f"捕获 ICMP: {pkt[IP].src} -> {pkt[IP].dst}")
pkt.show()
# 启动抓包
sniff(
filter="icmp", # BPF 过滤语法
prn=packet_callback, # 每个包的回调
count=5, # 抓 5 个包后停止
iface="以太网" # 指定网卡
)
# 2. 保存抓包到文件
pkts = sniff(filter="tcp port 80", count=10)
wrpcap("http_traffic.pcap", pkts) # 可在 Wireshark 打开
# 3. 读取 pcap 文件
pkts = rdpcap("http_traffic.pcap")
for pkt in pkts:
pkt.summary()
五、常用场景示例
1. ARP 扫描(发现局域网存活主机)
from scapy.all import *
def arp_scan(net):
ans, _ = srp(
Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=net),
timeout=2,
verbose=0
)
return [rcv[ARP].psrc for snd, rcv in ans]
# 扫描 192.168.1.0/24
hosts = arp_scan("192.168.1.0/24")
print("存活主机:")
for ip in hosts:
print(ip)
2. TCP 端口扫描
from scapy.all import *
def port_scan(ip, ports):
open_ports = []
for port in ports:
resp = sr1(
IP(dst=ip)/TCP(dport=port, flags="S"),
timeout=0.5,
verbose=0
)
if resp and resp[TCP].flags == "SA": # SYN-ACK 表示端口开放
open_ports.append(port)
return open_ports
# 扫描 80,443,8080 端口
ports = port_scan("192.168.1.1", [80, 443, 8080])
print(f"开放端口: {ports}")
3. 简单流量嗅探与解析
from scapy.all import *
def parse_packet(pkt):
if IP in pkt:
src = pkt[IP].src
dst = pkt[IP].dst
proto = pkt[IP].proto
print(f"IP: {src} -> {dst} (协议: {proto})")
if TCP in pkt:
sport = pkt[TCP].sport
dport = pkt[TCP].dport
flags = pkt[TCP].flags
print(f"TCP: {sport} -> {dport} 标志: {flags}")
sniff(prn=parse_packet, count=10)
六、关键函数速览
| 函数 | 作用 |
|---|---|
send() |
三层发包(不等待响应) |
sendp() |
二层发包(不等待响应) |
sr1() |
发包并等待第一个响应 |
sr() |
发包并等待所有响应,返回 (ans, unans) |
srp() |
二层发包并等待响应 |
sniff() |
捕获数据包 |
wrpcap() |
保存数据包到 pcap 文件 |
rdpcap() |
读取 pcap 文件 |
ls() |
查看协议字段 |
pkt.show() |
打印数据包详情 |
七、注意事项
- 权限:抓包/发包通常需要管理员/root 权限。
- 性能:Python 性能有限,高流量场景建议用 C/C++ 工具(如 tcpdump、nmap)。
- 合法性:仅在授权网络中使用,遵守当地法律法规,禁止未经许可的扫描/嗅探。
- 过滤:
sniff()用 BPF 语法过滤,大幅提升效率。
Scapy 源码核心是Packet 基类 + 字段系统 + 分层协议栈 + 发送/捕获后端,以“协议即对象”为设计哲学,实现数据包的构造、解析、发送与捕获。下面从源码结构、核心类、工作流程、关键机制、扩展方式展开解析。
Scapy 源代码解析
一、源码目录结构(核心)
Scapy 源码(GitHub: secdev/scapy)主目录 scapy/ 结构清晰,按功能模块化组织:
scapy/
├── __init__.py # 入口,版本管理、模块导入
├── arch/ # 系统适配(Linux/macOS/Windows),底层发包/抓包实现
├── layers/ # 协议层实现(Ether/IP/TCP/UDP/DNS/HTTP等)
├── packet/ # 数据包核心:基类、字段、操作符
│ ├── base_classes.py # Packet/BasePacket 基类定义
│ ├── fields.py # 各类字段(BitField/ByteField/IPField等)
│ └── operators.py # / 运算符重载、层绑定逻辑
├── sendrecv.py # send/sendp/sr/sniff 等发送/捕获函数
├── utils.py # 工具函数(校验和、时间、字符串处理)
├── config.py # 配置管理
└── ...
二、核心类与机制解析
1. Packet 基类(scapy/packet/base_classes.py)
Packet 是所有协议层的父类,是 Scapy 架构的基石。
核心属性
fields_desc:协议字段定义列表,每个元素是 Field 子类实例,描述字段名、默认值、长度、编码规则。class IP(Packet): name = "IP" fields_desc = [ BitField("version", 4, 4), # 版本(4位,默认4) BitField("ihl", None, 4), # 首部长度(4位,自动计算) ByteField("tos", 0), # 服务类型 ShortField("len", None), # 总长度(自动计算) ShortField("id", 1), # 标识 FlagsField("flags", 0, 3, ["MF", "DF", "evil"]), # 标志位 BitField("frag", 0, 13), # 片偏移 ByteField("ttl", 64), # 生存时间 ByteField("proto", 6), # 协议(TCP=6) XShortField("chksum", None), # 校验和(自动计算) SourceIPField("src", "0.0.0.0"), # 源IP DestIPField("dst", "0.0.0.0"), # 目的IP ]payload:下一层数据包对象,形成链式结构(Ether/IP/TCP/…)。underlayer:上一层数据包对象,反向关联。name:协议名称(如 “IP”)。
核心方法
__init__:初始化字段值,处理默认值与动态计算(如ihl/len/chksum)。build():递归构建字节流,调用各层do_build(),自动计算校验和、长度等。do_build():按fields_desc编码字段为二进制,处理字段依赖。dissect():从字节流解析出 Packet 对象,按协议栈逐层解包。__div__(/运算符):重载/,实现层组合(IP()/TCP()),绑定 payload 关系。
2. Field 字段系统(scapy/packet/fields.py)
Field 是数据包字段的抽象,负责值 ↔ 二进制的编解码,是 Scapy 灵活解析的核心。
常用 Field 类型
| 字段类 | 用途 | 示例 |
|---|---|---|
| BitField | 位字段 | BitField(“version”, 4, 4) |
| ByteField | 单字节 | ByteField(“ttl”, 64) |
| ShortField | 双字节(大端) | ShortField(“id”, 1) |
| IntField | 四字节 | IntField(“seq”, 0) |
| IPField | IP地址 | IPField(“src”, “192.168.1.1”) |
| MACField | MAC地址 | MACField(“dst”, “ff:ff:ff:ff:ff:ff”) |
| StrField | 字符串 | StrField(“data”, “”) |
| FlagsField | 标志位 | FlagsField(“flags”, 0, 3, [“MF”,“DF”]) |
核心机制
- 每个 Field 实现
addfield()(编码)与getfield()(解码)。 - 支持动态值计算:通过
lambda或方法实现依赖字段(如 IP 校验和依赖整个 IP 头)。 - 自动处理字节序(大端/小端)、填充、对齐。
3. 元类与协议注册(Packet_metaclass)
Packet 使用 Packet_metaclass 元类,实现协议自动注册与层绑定:
- 当定义新协议(如
class TCP(Packet)),元类自动将其注册到全局协议表。 - 维护
payload_guess映射:根据上层协议类型(如 IP.proto=6 → TCP),自动识别下一层协议。 - 支持自定义协议的无缝集成。
4. 发送与捕获(sendrecv.py)
发送流程
send():三层发送(IP 层),自动路由,调用底层 socket 或 libpcap。sendp():二层发送(链路层),需指定网卡,直接发送以太网帧。sr()/sr1():发送并等待响应,实现请求-应答模式(如 ping)。- 底层依赖:
arch/下系统适配代码(Linux: PF_PACKET;macOS/Windows: libpcap)。
捕获流程
sniff():核心抓包函数,参数:iface:网卡(如 “eth0”)。filter:BPF 过滤规则(如 “tcp port 80”)。prn:回调函数,处理每个捕获的包。count:抓包数量。
- 底层:通过 libpcap 或系统原生抓包接口,获取原始字节流 → 调用
Packet解析 → 回调处理。
三、核心工作流程(源码视角)
1. 数据包构造(IP/TCP/“data”)
- 用户执行
pkt = IP(dst="8.8.8.8")/TCP(dport=80)/"data"。 IP()初始化:__init__处理字段,ihl/len/chksum设为None(待计算)。/运算符:IP.__div__(TCP()),将TCP()设为IP.payload。- 同理,
"data"设为TCP.payload(Raw 层)。 - 调用
bytes(pkt)→ 触发pkt.build():- 递归调用各层
do_build()。 - 自动计算
IP.ihl(首部长度)、IP.len(总长度)、IP.chksum(校验和)。 - 按
fields_desc编码为二进制字节流。
- 递归调用各层
2. 数据包解析(从 pcap 或抓包)
- 捕获原始字节流(如
raw_data = b'\x08\x00...')。 - 调用
pkt = Ether(raw_data)(或自动识别顶层协议)。 Ether.dissect(raw_data):- 按
Ether.fields_desc解码dst/src/type。 - 根据
type(如 0x0800 → IP),自动解析下一层IP。 - 递归解析,直到无法识别(Raw 层)。
- 按
- 生成链式 Packet 对象:
Ether → IP → TCP → Raw。
四、关键源码细节
1. 校验和自动计算(utils.py: checksum())
- IP/TCP/UDP 校验和通过
checksum()函数计算,在build()阶段自动填充。 - 源码示例:
# IP 校验和计算(简化) def post_build(self, pkt, pay): if self.chksum is None: chksum = checksum(pkt) pkt = pkt[:10] + struct.pack("!H", chksum) + pkt[12:] return pkt + pay
2. BPF 过滤集成
sniff(filter="tcp")底层调用 libpcap 的pcap_compile()/pcap_setfilter(),仅捕获匹配的包,提升效率。
3. 层绑定与自动识别
- 通过
bind_layers()函数绑定协议:bind_layers(IP, TCP, proto=6) # IP.proto=6 → 下一层是TCP - 解析时,根据上层字段值自动选择下一层协议类。
五、扩展自定义协议(源码级)
- 继承
Packet,定义fields_desc:from scapy.packet import Packet from scapy.fields import ByteField, ShortField, StrField class MyProto(Packet): name = "MyProto" fields_desc = [ ByteField("magic", 0xAA), ShortField("cmd", 0x0001), StrField("data", ""), ] - 绑定到已有协议(如 UDP 端口 9999):
bind_layers(UDP, MyProto, dport=9999) - 直接使用:
pkt = IP()/UDP(dport=9999)/MyProto(cmd=0x0002, data="test")。
六、总结
Scapy 源码设计的核心优势:
- 分层抽象:Packet + Field 完美映射网络协议栈,灵活可扩展。
- 元类驱动:自动注册协议、绑定层关系,降低使用门槛。
- 动态计算:自动处理校验和、长度等底层细节,专注业务逻辑。
- 跨平台:
arch/封装系统差异,一套代码适配多平台。
更多推荐
所有评论(0)