威胁建模与风险评估实战
威胁建模(Threat Modeling)是在设计阶段系统化识别、评估和缓解安全威胁的过程。核心目标在开发早期发现安全问题(成本最低)系统化识别所有威胁(不遗漏)优先处理高风险威胁(有效分配资源)建立安全基线(持续改进)四个关键问题我们在构建什么?(系统建模)什么可能出错?(威胁识别)我们应该怎么办?(缓解措施)我们做得对吗?(验证)STRIDE缩写威胁类型安全属性描述SSpoofing(伪装)攻
·
威胁建模与风险评估实战
目录
- 威胁建模基础理论
- STRIDE威胁建模方法
- DREAD风险评估方法
- PASTA威胁建模框架
- IoT设备威胁建模实战
- 智能家居安全风险评估案例
- Android App威胁分析
- 威胁建模自动化工具
- 持续威胁监控
1. 威胁建模基础理论
1.1 什么是威胁建模?
威胁建模(Threat Modeling) 是在设计阶段系统化识别、评估和缓解安全威胁的过程。
核心目标:
- 在开发早期发现安全问题(成本最低)
- 系统化识别所有威胁(不遗漏)
- 优先处理高风险威胁(有效分配资源)
- 建立安全基线(持续改进)
四个关键问题:
- 我们在构建什么?(系统建模)
- 什么可能出错?(威胁识别)
- 我们应该怎么办?(缓解措施)
- 我们做得对吗?(验证)
1.2 威胁建模流程
威胁建模标准流程:
1. 定义范围
├── 确定系统边界
├── 识别关键资产
└── 定义信任边界
2. 创建架构图
├── 数据流图(DFD)
├── 组件交互图
└── 信任边界图
3. 识别威胁
├── 使用STRIDE
├── 使用攻击树
└── 使用Kill Chain
4. 评估风险
├── 使用DREAD
├── 计算风险值
└── 排定优先级
5. 设计缓解措施
├── 消除威胁
├── 降低风险
└── 接受风险
6. 验证有效性
├── 安全测试
├── 渗透测试
└── 代码审查
2. STRIDE威胁建模方法
2.1 STRIDE简介
STRIDE 是微软开发的威胁分类方法,每个字母代表一种威胁类型:
| 缩写 | 威胁类型 | 安全属性 | 描述 |
|---|---|---|---|
| S | Spoofing(伪装) | Authentication | 攻击者伪装成其他实体 |
| T | Tampering(篡改) | Integrity | 攻击者修改数据或代码 |
| R | Repudiation(抵赖) | Non-repudiation | 攻击者否认执行了某操作 |
| I | Information Disclosure(信息泄露) | Confidentiality | 敏感信息被未授权访问 |
| D | Denial of Service(拒绝服务) | Availability | 系统不可用 |
| E | Elevation of Privilege(权限提升) | Authorization | 攻击者获得更高权限 |
2.2 STRIDE威胁识别实战
// STRIDE威胁建模工具
class STRIDEThreatModel {
// 1. 系统建模
data class SystemModel(
val components: List<Component>,
val dataFlows: List<DataFlow>,
val trustBoundaries: List<TrustBoundary>
)
sealed class Component {
data class Process(
val id: String,
val name: String,
val runAsUser: UserContext,
val exposedInterfaces: List<Interface>
) : Component()
data class DataStore(
val id: String,
val name: String,
val dataClassification: DataClassification,
val encryptionAtRest: Boolean
) : Component()
data class ExternalEntity(
val id: String,
val name: String,
val trusted: Boolean
) : Component()
}
data class DataFlow(
val from: String, // Component ID
val to: String, // Component ID
val protocol: String,
val encrypted: Boolean,
val authenticated: Boolean,
val data: List<String> // Data types
)
data class TrustBoundary(
val name: String,
val insideComponents: List<String>,
val outsideComponents: List<String>
)
// 2. STRIDE威胁识别
class STRIDEAnalyzer {
// 针对数据流的STRIDE分析
fun analyzeDataFlow(dataFlow: DataFlow): List<Threat> {
val threats = mutableListOf<Threat>()
// S - Spoofing(伪装)
if (!dataFlow.authenticated) {
threats.add(
Threat(
type = ThreatType.SPOOFING,
target = dataFlow,
description = "数据流未认证,攻击者可能伪装成发送方",
impact = Impact.HIGH,
mitigation = "实施双向认证(Mutual TLS或证书认证)"
)
)
}
// T - Tampering(篡改)
if (!dataFlow.encrypted) {
threats.add(
Threat(
type = ThreatType.TAMPERING,
target = dataFlow,
description = "数据流未加密,攻击者可能中间人攻击篡改数据",
impact = Impact.HIGH,
mitigation = "使用TLS 1.3加密通信,实施消息完整性检查(HMAC)"
)
)
}
// R - Repudiation(抵赖)
// 检查是否有日志记录
if (!hasAuditLogging(dataFlow)) {
threats.add(
Threat(
type = ThreatType.REPUDIATION,
target = dataFlow,
description = "缺少审计日志,无法证明操作发生",
impact = Impact.MEDIUM,
mitigation = "实施详细审计日志,包括时间戳、源IP、操作类型"
)
)
}
// I - Information Disclosure(信息泄露)
if (containsSensitiveData(dataFlow) && !dataFlow.encrypted) {
threats.add(
Threat(
type = ThreatType.INFORMATION_DISCLOSURE,
target = dataFlow,
description = "敏感数据明文传输,可能被窃听",
impact = Impact.CRITICAL,
mitigation = "启用端到端加密,实施数据脱敏"
)
)
}
// D - Denial of Service(拒绝服务)
if (!hasRateLimiting(dataFlow)) {
threats.add(
Threat(
type = ThreatType.DENIAL_OF_SERVICE,
target = dataFlow,
description = "缺少速率限制,可能遭受DoS攻击",
impact = Impact.HIGH,
mitigation = "实施请求速率限制,添加防火墙规则"
)
)
}
// E - Elevation of Privilege(权限提升)
if (crossesTrustBoundary(dataFlow) && !hasInputValidation(dataFlow)) {
threats.add(
Threat(
type = ThreatType.ELEVATION_OF_PRIVILEGE,
target = dataFlow,
description = "跨信任边界的输入未验证,可能导致注入攻击",
impact = Impact.CRITICAL,
mitigation = "严格验证所有输入,实施最小权限原则"
)
)
}
return threats
}
// 针对进程的STRIDE分析
fun analyzeProcess(process: Component.Process): List<Threat> {
val threats = mutableListOf<Threat>()
// S - Spoofing
if (!hasStrongAuthentication(process)) {
threats.add(
Threat(
type = ThreatType.SPOOFING,
target = process,
description = "进程认证机制弱,可能被伪装",
impact = Impact.HIGH,
mitigation = "实施进程间认证(如SELinux标签或Android UID检查)"
)
)
}
// T - Tampering
if (!hasCodeSigning(process)) {
threats.add(
Threat(
type = ThreatType.TAMPERING,
target = process,
description = "进程代码未签名,可能被篡改",
impact = Impact.CRITICAL,
mitigation = "实施代码签名验证,使用Android APK签名"
)
)
}
// I - Information Disclosure
if (process.runAsUser == UserContext.ROOT) {
threats.add(
Threat(
type = ThreatType.INFORMATION_DISCLOSURE,
target = process,
description = "进程以root权限运行,泄露风险高",
impact = Impact.HIGH,
mitigation = "降低进程权限,使用专用用户账户"
)
)
}
// E - Elevation of Privilege
if (hasExposedInterface(process) && !hasAccessControl(process)) {
threats.add(
Threat(
type = ThreatType.ELEVATION_OF_PRIVILEGE,
target = process,
description = "进程暴露接口但无访问控制",
impact = Impact.CRITICAL,
mitigation = "实施基于角色的访问控制(RBAC)"
)
)
}
return threats
}
// 针对数据存储的STRIDE分析
fun analyzeDataStore(dataStore: Component.DataStore): List<Threat> {
val threats = mutableListOf<Threat>()
// T - Tampering
if (!dataStore.encryptionAtRest) {
threats.add(
Threat(
type = ThreatType.TAMPERING,
target = dataStore,
description = "数据未加密存储,可能被篡改",
impact = Impact.HIGH,
mitigation = "启用静态数据加密(Android EncryptedSharedPreferences)"
)
)
}
// I - Information Disclosure
if (dataStore.dataClassification >= DataClassification.CONFIDENTIAL &&
!dataStore.encryptionAtRest) {
threats.add(
Threat(
type = ThreatType.INFORMATION_DISCLOSURE,
target = dataStore,
description = "机密数据明文存储",
impact = Impact.CRITICAL,
mitigation = "使用Android Keystore加密敏感数据"
)
)
}
// D - Denial of Service
if (!hasBackup(dataStore)) {
threats.add(
Threat(
type = ThreatType.DENIAL_OF_SERVICE,
target = dataStore,
description = "数据无备份,可能丢失导致服务不可用",
impact = Impact.HIGH,
mitigation = "实施定期自动备份和恢复测试"
)
)
}
return threats
}
}
// 3. 威胁数据模型
data class Threat(
val type: ThreatType,
val target: Any,
val description: String,
val impact: Impact,
val likelihood: Likelihood = Likelihood.MEDIUM, // 稍后用DREAD评估
val mitigation: String,
val status: ThreatStatus = ThreatStatus.IDENTIFIED
)
enum class ThreatType {
SPOOFING,
TAMPERING,
REPUDIATION,
INFORMATION_DISCLOSURE,
DENIAL_OF_SERVICE,
ELEVATION_OF_PRIVILEGE
}
enum class Impact {
LOW, MEDIUM, HIGH, CRITICAL
}
enum class Likelihood {
LOW, MEDIUM, HIGH
}
enum class ThreatStatus {
IDENTIFIED, // 已识别
MITIGATED, // 已缓解
ACCEPTED, // 已接受
TRANSFERRED // 已转移
}
}
2.3 智能摄像头STRIDE分析案例
// 案例:智能摄像头威胁建模
class SmartCameraThreatModel {
// 1. 系统建模
fun buildSystemModel(): STRIDEThreatModel.SystemModel {
val components = listOf(
// 进程
STRIDEThreatModel.Component.Process(
id = "camera_app",
name = "Camera Android App",
runAsUser = UserContext.NORMAL_USER,
exposedInterfaces = listOf(
Interface("HTTP API", 8080),
Interface("RTSP Stream", 554)
)
),
STRIDEThreatModel.Component.Process(
id = "cloud_service",
name = "Cloud Video Service",
runAsUser = UserContext.SERVICE_ACCOUNT,
exposedInterfaces = listOf(
Interface("HTTPS API", 443)
)
),
// 数据存储
STRIDEThreatModel.Component.DataStore(
id = "local_storage",
name = "Local Video Storage",
dataClassification = DataClassification.CONFIDENTIAL,
encryptionAtRest = false // 威胁!
),
STRIDEThreatModel.Component.DataStore(
id = "cloud_storage",
name = "Cloud Video Storage",
dataClassification = DataClassification.CONFIDENTIAL,
encryptionAtRest = true
),
// 外部实体
STRIDEThreatModel.Component.ExternalEntity(
id = "user",
name = "End User",
trusted = true
),
STRIDEThreatModel.Component.ExternalEntity(
id = "attacker",
name = "Network Attacker",
trusted = false
)
)
val dataFlows = listOf(
// 用户通过App查看实时视频
STRIDEThreatModel.DataFlow(
from = "camera_app",
to = "user",
protocol = "RTSP",
encrypted = false, // 威胁!
authenticated = false, // 威胁!
data = listOf("video_stream", "audio_stream")
),
// App上传视频到云端
STRIDEThreatModel.DataFlow(
from = "camera_app",
to = "cloud_service",
protocol = "HTTPS",
encrypted = true,
authenticated = true,
data = listOf("video_clips", "metadata")
),
// 本地存储视频
STRIDEThreatModel.DataFlow(
from = "camera_app",
to = "local_storage",
protocol = "File System",
encrypted = false, // 威胁!
authenticated = true,
data = listOf("video_recordings")
)
)
val trustBoundaries = listOf(
STRIDEThreatModel.TrustBoundary(
name = "Device Boundary",
insideComponents = listOf("camera_app", "local_storage"),
outsideComponents = listOf("user", "cloud_service", "attacker")
),
STRIDEThreatModel.TrustBoundary(
name = "Cloud Boundary",
insideComponents = listOf("cloud_service", "cloud_storage"),
outsideComponents = listOf("camera_app")
)
)
return STRIDEThreatModel.SystemModel(
components = components,
dataFlows = dataFlows,
trustBoundaries = trustBoundaries
)
}
// 2. 威胁识别
fun identifyThreats(): List<STRIDEThreatModel.Threat> {
val model = buildSystemModel()
val analyzer = STRIDEThreatModel.STRIDEAnalyzer()
val threats = mutableListOf<STRIDEThreatModel.Threat>()
// 分析所有数据流
model.dataFlows.forEach { dataFlow ->
threats.addAll(analyzer.analyzeDataFlow(dataFlow))
}
// 分析所有进程
model.components.filterIsInstance<STRIDEThreatModel.Component.Process>()
.forEach { process ->
threats.addAll(analyzer.analyzeProcess(process))
}
// 分析所有数据存储
model.components.filterIsInstance<STRIDEThreatModel.Component.DataStore>()
.forEach { dataStore ->
threats.addAll(analyzer.analyzeDataStore(dataStore))
}
return threats
}
// 3. 生成威胁报告
fun generateThreatReport(): ThreatReport {
val threats = identifyThreats()
// 按威胁类型分组
val threatsByType = threats.groupBy { it.type }
// 按影响级别排序
val criticalThreats = threats.filter { it.impact == Impact.CRITICAL }
val highThreats = threats.filter { it.impact == Impact.HIGH }
return ThreatReport(
totalThreats = threats.size,
criticalThreats = criticalThreats.size,
highThreats = highThreats.size,
threatsByType = threatsByType.mapValues { it.value.size },
topThreats = criticalThreats.take(5),
recommendations = generateRecommendations(threats)
)
}
data class ThreatReport(
val totalThreats: Int,
val criticalThreats: Int,
val highThreats: Int,
val threatsByType: Map<ThreatType, Int>,
val topThreats: List<STRIDEThreatModel.Threat>,
val recommendations: List<String>
)
private fun generateRecommendations(threats: List<STRIDEThreatModel.Threat>): List<String> {
return threats
.filter { it.impact >= Impact.HIGH }
.map { it.mitigation }
.distinct()
}
}
3. DREAD风险评估方法
3.1 DREAD简介
DREAD 是用于量化风险的评分方法:
| 维度 | 含义 | 评分标准 |
|---|---|---|
| D | Damage Potential(潜在损害) | 威胁造成的损害程度 |
| R | Reproducibility(可重现性) | 攻击的易重现程度 |
| E | Exploitability(可利用性) | 攻击的难易程度 |
| A | Affected Users(受影响用户) | 受影响的用户数量 |
| D | Discoverability(可发现性) | 威胁的易发现程度 |
每个维度评分0-10,总分0-50。
3.2 DREAD评分实现
// DREAD风险评估
class DREADRiskAssessment {
// DREAD评分
data class DREADScore(
val damage: Int, // 0-10
val reproducibility: Int, // 0-10
val exploitability: Int, // 0-10
val affectedUsers: Int, // 0-10
val discoverability: Int // 0-10
) {
val total: Int
get() = damage + reproducibility + exploitability + affectedUsers + discoverability
val average: Double
get() = total / 5.0
val riskLevel: RiskLevel
get() = when {
average >= 8.0 -> RiskLevel.CRITICAL
average >= 6.0 -> RiskLevel.HIGH
average >= 4.0 -> RiskLevel.MEDIUM
else -> RiskLevel.LOW
}
}
enum class RiskLevel {
LOW, MEDIUM, HIGH, CRITICAL
}
// 评估威胁
fun assessThreat(threat: STRIDEThreatModel.Threat): DREADScore {
return when (threat.type) {
ThreatType.SPOOFING -> assessSpoofing(threat)
ThreatType.TAMPERING -> assessTampering(threat)
ThreatType.REPUDIATION -> assessRepudiation(threat)
ThreatType.INFORMATION_DISCLOSURE -> assessInformationDisclosure(threat)
ThreatType.DENIAL_OF_SERVICE -> assessDenialOfService(threat)
ThreatType.ELEVATION_OF_PRIVILEGE -> assessElevationOfPrivilege(threat)
}
}
// 评估伪装威胁
private fun assessSpoofing(threat: STRIDEThreatModel.Threat): DREADScore {
// 假设:智能摄像头视频流未认证
return DREADScore(
damage = 8, // 高损害:攻击者可查看私密视频
reproducibility = 10, // 易重现:工具可自动化
exploitability = 7, // 中等难度:需要网络访问
affectedUsers = 9, // 所有用户
discoverability = 6 // 中等可发现:需扫描网络
)
// 总分:40/50,平均:8.0,风险级别:CRITICAL
}
// 评估信息泄露威胁
private fun assessInformationDisclosure(threat: STRIDEThreatModel.Threat): DREADScore {
// 假设:本地视频文件未加密
return DREADScore(
damage = 9, // 极高损害:隐私泄露
reproducibility = 10, // 易重现:任何人获得物理访问即可
exploitability = 8, // 较易利用:只需文件浏览器
affectedUsers = 10, // 所有用户
discoverability = 7 // 较易发现:文件系统可见
)
// 总分:44/50,平均:8.8,风险级别:CRITICAL
}
// 自动评分(基于启发式规则)
class AutoDREADScorer {
fun autoScore(threat: STRIDEThreatModel.Threat): DREADScore {
var damage = 5
var reproducibility = 5
var exploitability = 5
var affectedUsers = 5
var discoverability = 5
// 根据威胁特征调整分数
// Damage
when (threat.impact) {
Impact.CRITICAL -> damage = 10
Impact.HIGH -> damage = 8
Impact.MEDIUM -> damage = 5
Impact.LOW -> damage = 3
}
// Reproducibility
if (isAutomatable(threat)) {
reproducibility = 10
}
// Exploitability
if (requiresNoAuthentication(threat)) {
exploitability += 3
}
if (requiresNoSpecialTools(threat)) {
exploitability += 2
}
// Affected Users
if (affectsAllUsers(threat)) {
affectedUsers = 10
} else if (affectsMostUsers(threat)) {
affectedUsers = 7
}
// Discoverability
if (isPubliclyVisible(threat)) {
discoverability = 10
} else if (requiresNetworkAccess(threat)) {
discoverability = 7
}
return DREADScore(
damage = damage.coerceIn(0, 10),
reproducibility = reproducibility.coerceIn(0, 10),
exploitability = exploitability.coerceIn(0, 10),
affectedUsers = affectedUsers.coerceIn(0, 10),
discoverability = discoverability.coerceIn(0, 10)
)
}
private fun isAutomatable(threat: STRIDEThreatModel.Threat): Boolean {
// 网络攻击通常可自动化
return threat.description.contains("网络") ||
threat.description.contains("API")
}
private fun requiresNoAuthentication(threat: STRIDEThreatModel.Threat): Boolean {
return threat.description.contains("未认证") ||
threat.description.contains("无认证")
}
private fun requiresNoSpecialTools(threat: STRIDEThreatModel.Threat): Boolean {
// 如果只需要浏览器或文件浏览器
return threat.description.contains("浏览器") ||
threat.description.contains("文件")
}
private fun affectsAllUsers(threat: STRIDEThreatModel.Threat): Boolean {
return threat.description.contains("所有用户") ||
!threat.description.contains("特定")
}
private fun affectsMostUsers(threat: STRIDEThreatModel.Threat): Boolean {
return threat.description.contains("大多数") ||
threat.description.contains("多数")
}
private fun isPubliclyVisible(threat: STRIDEThreatModel.Threat): Boolean {
return threat.description.contains("公开") ||
threat.description.contains("可见")
}
private fun requiresNetworkAccess(threat: STRIDEThreatModel.Threat): Boolean {
return threat.description.contains("网络") ||
threat.description.contains("远程")
}
}
// 风险矩阵
fun generateRiskMatrix(threats: List<Pair<STRIDEThreatModel.Threat, DREADScore>>): RiskMatrix {
val matrix = Array(5) { Array<MutableList<String>>(5) { mutableListOf() } }
threats.forEach { (threat, score) ->
// X轴:可能性(平均Reproducibility和Exploitability)
val likelihood = ((score.reproducibility + score.exploitability) / 2.0 / 2.5).toInt().coerceIn(0, 4)
// Y轴:影响(平均Damage和AffectedUsers)
val impact = ((score.damage + score.affectedUsers) / 2.0 / 2.5).toInt().coerceIn(0, 4)
matrix[4 - impact][likelihood].add(threat.description.take(50))
}
return RiskMatrix(matrix)
}
data class RiskMatrix(val matrix: Array<Array<MutableList<String>>>) {
fun print() {
println("风险矩阵(Impact vs Likelihood):")
println(" Low Med High VHigh Crit")
val labels = arrayOf("Crit", "High", "Med ", "Low ", "VLow")
matrix.forEachIndexed { index, row ->
print("${labels[index]} ")
row.forEach { cell ->
val count = cell.size
print(String.format("%-6s ", if (count > 0) count.toString() else "-"))
}
println()
}
}
}
}
4. PASTA威胁建模框架
4.1 PASTA简介
PASTA(Process for Attack Simulation and Threat Analysis) 是以攻击者视角进行的七阶段威胁建模框架。
七个阶段:
PASTA威胁建模七阶段:
Stage 1: Define Objectives(定义目标)
├── 业务目标
├── 安全目标
└── 合规要求
Stage 2: Define Technical Scope(定义技术范围)
├── 系统架构
├── 技术栈
└── 依赖关系
Stage 3: Application Decomposition(应用分解)
├── 组件清单
├── 数据流
└── 信任边界
Stage 4: Threat Analysis(威胁分析)
├── 威胁情报
├── 攻击场景
└── 攻击树
Stage 5: Vulnerability Analysis(漏洞分析)
├── 已知漏洞(CVE)
├── 配置错误
└── 设计缺陷
Stage 6: Attack Modeling(攻击建模)
├── 攻击路径
├── Kill Chain
└── ATT&CK映射
Stage 7: Risk and Impact Analysis(风险和影响分析)
├── 业务影响
├── 风险评分
└── 缓解建议
4.2 PASTA实施案例:智能门锁
// PASTA威胁建模:智能门锁
class SmartLockPASTAModel {
// Stage 1: Define Objectives
data class SecurityObjectives(
val businessObjectives: List<String> = listOf(
"保护用户家庭安全",
"防止未授权访问",
"确保系统可用性(99.9%)"
),
val securityRequirements: List<String> = listOf(
"防止物理绕过",
"防止网络攻击",
"防止凭证泄露",
"实现访问审计"
),
val complianceRequirements: List<String> = listOf(
"ETSI EN 303 645",
"GDPR",
"IoXt认证"
)
)
// Stage 2: Define Technical Scope
data class TechnicalScope(
val components: List<String> = listOf(
"智能门锁硬件",
"Android配对App",
"云端管理服务",
"蓝牙通信模块",
"本地加密存储"
),
val protocols: List<String> = listOf(
"Bluetooth Low Energy (BLE)",
"HTTPS/TLS 1.3",
"AES-256-GCM"
),
val dependencies: List<String> = listOf(
"Android Keystore",
"Google Cloud Platform",
"第三方蓝牙芯片"
)
)
// Stage 3: Application Decomposition
fun decomposeApplication(): ApplicationDecomposition {
return ApplicationDecomposition(
dataFlows = listOf(
DataFlow("User", "Android App", "User Input", "Local"),
DataFlow("Android App", "Smart Lock", "Unlock Command", "BLE Encrypted"),
DataFlow("Android App", "Cloud Service", "Activity Log", "HTTPS"),
DataFlow("Smart Lock", "Local Storage", "Access Log", "Encrypted File")
),
assets = listOf(
Asset("User Credentials", AssetValue.CRITICAL),
Asset("Unlock Keys", AssetValue.CRITICAL),
Asset("Access Logs", AssetValue.HIGH),
Asset("User Personal Data", AssetValue.HIGH)
),
trustBoundaries = listOf(
"Physical Boundary (Door)",
"Network Boundary (WiFi)",
"Cloud Boundary"
)
)
}
data class ApplicationDecomposition(
val dataFlows: List<DataFlow>,
val assets: List<Asset>,
val trustBoundaries: List<String>
)
data class Asset(
val name: String,
val value: AssetValue
)
enum class AssetValue {
LOW, MEDIUM, HIGH, CRITICAL
}
// Stage 4: Threat Analysis
fun analyzeThreat(): List<ThreatScenario> {
return listOf(
ThreatScenario(
name = "BLE中间人攻击",
description = "攻击者拦截BLE通信,重放解锁命令",
attackVector = "Network",
threatActor = "Opportunistic Attacker",
motivation = "Entry to home",
capability = "Medium"
),
ThreatScenario(
name = "暴力破解PIN码",
description = "攻击者尝试所有PIN码组合",
attackVector = "Physical",
threatActor = "Determined Attacker",
motivation = "Entry to home",
capability = "Low"
),
ThreatScenario(
name = "固件逆向工程",
description = "攻击者提取固件,分析加密算法",
attackVector = "Physical",
threatActor = "Advanced Attacker",
motivation = "Mass exploitation",
capability = "High"
)
)
}
data class ThreatScenario(
val name: String,
val description: String,
val attackVector: String,
val threatActor: String,
val motivation: String,
val capability: String
)
// Stage 5: Vulnerability Analysis
fun analyzeVulnerabilities(): List<Vulnerability> {
return listOf(
Vulnerability(
id = "VULN-001",
name = "BLE配对无PIN码保护",
cve = null,
cvss = 7.5,
description = "蓝牙配对过程不需要PIN码,任何人都可以配对",
affectedComponent = "Smart Lock BLE Module",
remediation = "实施Just Works配对改为Passkey Entry配对"
),
Vulnerability(
id = "VULN-002",
name = "固件未加密",
cve = null,
cvss = 6.0,
description = "固件以明文存储,可被提取分析",
affectedComponent = "Smart Lock Firmware",
remediation = "启用固件加密和防提取保护"
),
Vulnerability(
id = "VULN-003",
name = "无速率限制",
cve = null,
cvss = 5.0,
description = "解锁尝试无限制,可暴力破解",
affectedComponent = "Smart Lock Authentication",
remediation = "实施尝试次数限制(5次后锁定1小时)"
)
)
}
data class Vulnerability(
val id: String,
val name: String,
val cve: String?,
val cvss: Double,
val description: String,
val affectedComponent: String,
val remediation: String
)
// Stage 6: Attack Modeling
fun modelAttack(): AttackTree {
return AttackTree(
goal = "Unlock Smart Lock Without Authorization",
children = listOf(
AttackNode(
name = "Exploit Network Vulnerability",
children = listOf(
AttackNode("BLE MitM Attack", probability = 0.3, impact = "High"),
AttackNode("Replay Attack", probability = 0.4, impact = "High")
)
),
AttackNode(
name = "Exploit Physical Vulnerability",
children = listOf(
AttackNode("Pick Lock Mechanism", probability = 0.1, impact = "High"),
AttackNode("Extract Firmware", probability = 0.05, impact = "Critical")
)
),
AttackNode(
name = "Social Engineering",
children = listOf(
AttackNode("Phish User Credentials", probability = 0.2, impact = "Critical"),
AttackNode("Steal Physical Key", probability = 0.15, impact = "High")
)
)
)
)
}
data class AttackTree(
val goal: String,
val children: List<AttackNode>
)
data class AttackNode(
val name: String,
val probability: Double = 0.0,
val impact: String = "Unknown",
val children: List<AttackNode> = emptyList()
)
// Stage 7: Risk and Impact Analysis
fun assessRisk(): RiskAssessment {
val vulnerabilities = analyzeVulnerabilities()
val threats = analyzeThreats()
val risks = vulnerabilities.map { vuln ->
Risk(
id = "RISK-${vuln.id}",
vulnerability = vuln,
threat = threats.firstOrNull { it.name.contains(vuln.name.substringBefore(" ")) },
likelihood = calculateLikelihood(vuln),
impact = calculateImpact(vuln),
riskScore = vuln.cvss,
mitigation = vuln.remediation
)
}
return RiskAssessment(
risks = risks.sortedByDescending { it.riskScore },
acceptableRiskThreshold = 4.0,
highRisks = risks.filter { it.riskScore >= 7.0 },
mediumRisks = risks.filter { it.riskScore >= 4.0 && it.riskScore < 7.0 },
lowRisks = risks.filter { it.riskScore < 4.0 }
)
}
data class RiskAssessment(
val risks: List<Risk>,
val acceptableRiskThreshold: Double,
val highRisks: List<Risk>,
val mediumRisks: List<Risk>,
val lowRisks: List<Risk>
)
data class Risk(
val id: String,
val vulnerability: Vulnerability,
val threat: ThreatScenario?,
val likelihood: String,
val impact: String,
val riskScore: Double,
val mitigation: String
)
private fun calculateLikelihood(vuln: Vulnerability): String {
return when {
vuln.cvss >= 7.0 -> "High"
vuln.cvss >= 4.0 -> "Medium"
else -> "Low"
}
}
private fun calculateImpact(vuln: Vulnerability): String {
return when {
vuln.affectedComponent.contains("Authentication") -> "Critical"
vuln.affectedComponent.contains("Firmware") -> "High"
else -> "Medium"
}
}
}
5. 威胁建模自动化工具
5.1 使用Microsoft Threat Modeling Tool
// 导出威胁模型到Microsoft TMT格式
class ThreatModelExporter {
fun exportToTMT(model: STRIDEThreatModel.SystemModel, threats: List<STRIDEThreatModel.Threat>): String {
// 生成TMT XML格式
return buildString {
appendLine("""<?xml version="1.0" encoding="utf-8"?>""")
appendLine("""<ThreatModel>""")
appendLine(""" <DrawingSurfaceList>""")
// 导出组件
model.components.forEach { component ->
when (component) {
is STRIDEThreatModel.Component.Process -> {
appendLine(""" <DrawingSurfaceModel>""")
appendLine(""" <Id>${component.id}</Id>""")
appendLine(""" <Type>Process</Type>""")
appendLine(""" <Name>${component.name}</Name>""")
appendLine(""" </DrawingSurfaceModel>""")
}
is STRIDEThreatModel.Component.DataStore -> {
appendLine(""" <DrawingSurfaceModel>""")
appendLine(""" <Id>${component.id}</Id>""")
appendLine(""" <Type>DataStore</Type>""")
appendLine(""" <Name>${component.name}</Name>""")
appendLine(""" </DrawingSurfaceModel>""")
}
is STRIDEThreatModel.Component.ExternalEntity -> {
appendLine(""" <DrawingSurfaceModel>""")
appendLine(""" <Id>${component.id}</Id>""")
appendLine(""" <Type>ExternalEntity</Type>""")
appendLine(""" <Name>${component.name}</Name>""")
appendLine(""" </DrawingSurfaceModel>""")
}
}
// 导出数据流
model.dataFlows.forEach { dataFlow ->
appendLine(""" <DrawingSurfaceModel>""")
appendLine(""" <Type>DataFlow</Type>""")
appendLine(""" <Source>${dataFlow.from}</Source>""")
appendLine(""" <Target>${dataFlow.to}</Target>""")
appendLine(""" <Protocol>${dataFlow.protocol}</Protocol>""")
appendLine(""" </DrawingSurfaceModel>""")
}
appendLine(""" </DrawingSurfaceList>""")
// 导出威胁
appendLine(""" <ThreatList>""")
threats.forEach { threat ->
appendLine(""" <Threat>""")
appendLine(""" <Type>${threat.type}</Type>""")
appendLine(""" <Description>${threat.description}</Description>""")
appendLine(""" <Priority>${threat.impact}</Priority>""")
appendLine(""" <Mitigation>${threat.mitigation}</Mitigation>""")
appendLine(""" </Threat>""")
}
appendLine(""" </ThreatList>""")
appendLine("""</ThreatModel>""")
}
}
}
5.2 集成OWASP Dependency-Check
// 自动化漏洞扫描
class DependencyScan Scanner {
// 扫描项目依赖的已知漏洞
suspend fun scanDependencies(projectPath: String): ScanReport {
// 1. 运行OWASP Dependency-Check
val process = ProcessBuilder(
"dependency-check",
"--project", "MyIoTProject",
"--scan", projectPath,
"--format", "JSON",
"--out", "/tmp/dependency-check-report"
).start()
process.waitFor()
// 2. 解析报告
val reportFile = File("/tmp/dependency-check-report/dependency-check-report.json")
val report = Json.decodeFromString<OWASPReport>(reportFile.readText())
// 3. 提取高危漏洞
val criticalVulnerabilities = report.dependencies
.flatMap { it.vulnerabilities ?: emptyList() }
.filter { it.cvssv3?.baseScore ?: 0.0 >= 7.0 }
return ScanReport(
totalDependencies = report.dependencies.size,
vulnerableDependencies = report.dependencies.count { it.vulnerabilities != null },
criticalVulnerabilities = criticalVulnerabilities.size,
vulnerabilities = criticalVulnerabilities
)
}
data class ScanReport(
val totalDependencies: Int,
val vulnerableDependencies: Int,
val criticalVulnerabilities: Int,
val vulnerabilities: List<CVE>
)
@Serializable
data class OWASPReport(
val dependencies: List<Dependency>
)
@Serializable
data class Dependency(
val fileName: String,
val vulnerabilities: List<CVE>?
)
@Serializable
data class CVE(
val name: String,
val description: String,
val cvssv3: CVSSv3?
)
@Serializable
data class CVSSv3(
val baseScore: Double,
val baseSeverity: String
)
}
6. 持续威胁监控
// 生产环境持续威胁监控
class ContinuousThreatMonitoring {
// 实时威胁检测
suspend fun monitorThreats() {
flow {
while (true) {
val events = collectSecurityEvents()
emit(events)
delay(5.seconds)
}
}.collect { events ->
events.forEach { event ->
val threat = detectThreat(event)
if (threat != null) {
handleThreat(threat)
}
}
}
}
// 威胁检测
private fun detectThreat(event: SecurityEvent): DetectedThreat? {
return when {
// 检测暴力破解攻击
event.type == EventType.AUTH_FAILURE &&
getRecentFailures(event.source) > 10 -> {
DetectedThreat(
type = ThreatType.BRUTE_FORCE,
severity = Severity.HIGH,
source = event.source,
description = "检测到暴力破解攻击尝试"
)
}
// 检测异常访问模式
isAnomalousAccess(event) -> {
DetectedThreat(
type = ThreatType.ANOMALOUS_ACCESS,
severity = Severity.MEDIUM,
source = event.source,
description = "检测到异常访问模式"
)
}
// 检测数据泄露尝试
event.type == EventType.DATA_EXPORT &&
event.volume > NORMAL_EXPORT_THRESHOLD -> {
DetectedThreat(
type = ThreatType.DATA_EXFILTRATION,
severity = Severity.CRITICAL,
source = event.source,
description = "检测到异常数据导出"
)
}
else -> null
}
}
// 威胁响应
private suspend fun handleThreat(threat: DetectedThreat) {
when (threat.severity) {
Severity.CRITICAL -> {
// 立即阻断
blockSource(threat.source)
// 通知安全团队
alertSecurityTeam(threat)
// 记录日志
auditLog.critical(threat.description)
}
Severity.HIGH -> {
// 限制访问
rateLimitSource(threat.source)
// 通知管理员
alertAdmin(threat)
// 记录日志
auditLog.high(threat.description)
}
Severity.MEDIUM -> {
// 增强监控
increaseMonitoring(threat.source)
// 记录日志
auditLog.medium(threat.description)
}
else -> {
// 仅记录
auditLog.info(threat.description)
}
}
}
data class DetectedThreat(
val type: ThreatType,
val severity: Severity,
val source: String,
val description: String,
val timestamp: Instant = Clock.System.now()
)
}
总结
威胁建模和风险评估是IoT安全的基石:
- STRIDE:系统化识别威胁类型
- DREAD:量化风险优先级
- PASTA:以攻击者视角全面分析
- 自动化工具:提高效率和覆盖率
- 持续监控:动态响应新威胁
参考资源
相关文章
- 上一篇:《IoT安全认证体系对比与选择》
- 下一篇:《IoT设备安全生命周期管理》
- 相关:《IoT安全架构设计全面指南》
更多推荐
所有评论(0)