威胁建模与风险评估实战

目录

  1. 威胁建模基础理论
  2. STRIDE威胁建模方法
  3. DREAD风险评估方法
  4. PASTA威胁建模框架
  5. IoT设备威胁建模实战
  6. 智能家居安全风险评估案例
  7. Android App威胁分析
  8. 威胁建模自动化工具
  9. 持续威胁监控

1. 威胁建模基础理论

1.1 什么是威胁建模?

威胁建模(Threat Modeling) 是在设计阶段系统化识别、评估和缓解安全威胁的过程。

核心目标

  • 在开发早期发现安全问题(成本最低)
  • 系统化识别所有威胁(不遗漏)
  • 优先处理高风险威胁(有效分配资源)
  • 建立安全基线(持续改进)

四个关键问题

  1. 我们在构建什么?(系统建模)
  2. 什么可能出错?(威胁识别)
  3. 我们应该怎么办?(缓解措施)
  4. 我们做得对吗?(验证)

1.2 威胁建模流程

威胁建模标准流程:

1. 定义范围
   ├── 确定系统边界
   ├── 识别关键资产
   └── 定义信任边界

2. 创建架构图
   ├── 数据流图(DFD)
   ├── 组件交互图
   └── 信任边界图

3. 识别威胁
   ├── 使用STRIDE
   ├── 使用攻击树
   └── 使用Kill Chain

4. 评估风险
   ├── 使用DREAD
   ├── 计算风险值
   └── 排定优先级

5. 设计缓解措施
   ├── 消除威胁
   ├── 降低风险
   └── 接受风险

6. 验证有效性
   ├── 安全测试
   ├── 渗透测试
   └── 代码审查

2. STRIDE威胁建模方法

2.1 STRIDE简介

STRIDE 是微软开发的威胁分类方法,每个字母代表一种威胁类型:

缩写 威胁类型 安全属性 描述
S Spoofing(伪装) Authentication 攻击者伪装成其他实体
T Tampering(篡改) Integrity 攻击者修改数据或代码
R Repudiation(抵赖) Non-repudiation 攻击者否认执行了某操作
I Information Disclosure(信息泄露) Confidentiality 敏感信息被未授权访问
D Denial of Service(拒绝服务) Availability 系统不可用
E Elevation of Privilege(权限提升) Authorization 攻击者获得更高权限

2.2 STRIDE威胁识别实战

// STRIDE威胁建模工具
class STRIDEThreatModel {

    // 1. 系统建模
    data class SystemModel(
        val components: List<Component>,
        val dataFlows: List<DataFlow>,
        val trustBoundaries: List<TrustBoundary>
    )

    sealed class Component {
        data class Process(
            val id: String,
            val name: String,
            val runAsUser: UserContext,
            val exposedInterfaces: List<Interface>
        ) : Component()

        data class DataStore(
            val id: String,
            val name: String,
            val dataClassification: DataClassification,
            val encryptionAtRest: Boolean
        ) : Component()

        data class ExternalEntity(
            val id: String,
            val name: String,
            val trusted: Boolean
        ) : Component()
    }

    data class DataFlow(
        val from: String, // Component ID
        val to: String,   // Component ID
        val protocol: String,
        val encrypted: Boolean,
        val authenticated: Boolean,
        val data: List<String> // Data types
    )

    data class TrustBoundary(
        val name: String,
        val insideComponents: List<String>,
        val outsideComponents: List<String>
    )

    // 2. STRIDE威胁识别
    class STRIDEAnalyzer {
        // 针对数据流的STRIDE分析
        fun analyzeDataFlow(dataFlow: DataFlow): List<Threat> {
            val threats = mutableListOf<Threat>()

            // S - Spoofing(伪装)
            if (!dataFlow.authenticated) {
                threats.add(
                    Threat(
                        type = ThreatType.SPOOFING,
                        target = dataFlow,
                        description = "数据流未认证,攻击者可能伪装成发送方",
                        impact = Impact.HIGH,
                        mitigation = "实施双向认证(Mutual TLS或证书认证)"
                    )
                )
            }

            // T - Tampering(篡改)
            if (!dataFlow.encrypted) {
                threats.add(
                    Threat(
                        type = ThreatType.TAMPERING,
                        target = dataFlow,
                        description = "数据流未加密,攻击者可能中间人攻击篡改数据",
                        impact = Impact.HIGH,
                        mitigation = "使用TLS 1.3加密通信,实施消息完整性检查(HMAC)"
                    )
                )
            }

            // R - Repudiation(抵赖)
            // 检查是否有日志记录
            if (!hasAuditLogging(dataFlow)) {
                threats.add(
                    Threat(
                        type = ThreatType.REPUDIATION,
                        target = dataFlow,
                        description = "缺少审计日志,无法证明操作发生",
                        impact = Impact.MEDIUM,
                        mitigation = "实施详细审计日志,包括时间戳、源IP、操作类型"
                    )
                )
            }

            // I - Information Disclosure(信息泄露)
            if (containsSensitiveData(dataFlow) && !dataFlow.encrypted) {
                threats.add(
                    Threat(
                        type = ThreatType.INFORMATION_DISCLOSURE,
                        target = dataFlow,
                        description = "敏感数据明文传输,可能被窃听",
                        impact = Impact.CRITICAL,
                        mitigation = "启用端到端加密,实施数据脱敏"
                    )
                )
            }

            // D - Denial of Service(拒绝服务)
            if (!hasRateLimiting(dataFlow)) {
                threats.add(
                    Threat(
                        type = ThreatType.DENIAL_OF_SERVICE,
                        target = dataFlow,
                        description = "缺少速率限制,可能遭受DoS攻击",
                        impact = Impact.HIGH,
                        mitigation = "实施请求速率限制,添加防火墙规则"
                    )
                )
            }

            // E - Elevation of Privilege(权限提升)
            if (crossesTrustBoundary(dataFlow) && !hasInputValidation(dataFlow)) {
                threats.add(
                    Threat(
                        type = ThreatType.ELEVATION_OF_PRIVILEGE,
                        target = dataFlow,
                        description = "跨信任边界的输入未验证,可能导致注入攻击",
                        impact = Impact.CRITICAL,
                        mitigation = "严格验证所有输入,实施最小权限原则"
                    )
                )
            }

            return threats
        }

        // 针对进程的STRIDE分析
        fun analyzeProcess(process: Component.Process): List<Threat> {
            val threats = mutableListOf<Threat>()

            // S - Spoofing
            if (!hasStrongAuthentication(process)) {
                threats.add(
                    Threat(
                        type = ThreatType.SPOOFING,
                        target = process,
                        description = "进程认证机制弱,可能被伪装",
                        impact = Impact.HIGH,
                        mitigation = "实施进程间认证(如SELinux标签或Android UID检查)"
                    )
                )
            }

            // T - Tampering
            if (!hasCodeSigning(process)) {
                threats.add(
                    Threat(
                        type = ThreatType.TAMPERING,
                        target = process,
                        description = "进程代码未签名,可能被篡改",
                        impact = Impact.CRITICAL,
                        mitigation = "实施代码签名验证,使用Android APK签名"
                    )
                )
            }

            // I - Information Disclosure
            if (process.runAsUser == UserContext.ROOT) {
                threats.add(
                    Threat(
                        type = ThreatType.INFORMATION_DISCLOSURE,
                        target = process,
                        description = "进程以root权限运行,泄露风险高",
                        impact = Impact.HIGH,
                        mitigation = "降低进程权限,使用专用用户账户"
                    )
                )
            }

            // E - Elevation of Privilege
            if (hasExposedInterface(process) && !hasAccessControl(process)) {
                threats.add(
                    Threat(
                        type = ThreatType.ELEVATION_OF_PRIVILEGE,
                        target = process,
                        description = "进程暴露接口但无访问控制",
                        impact = Impact.CRITICAL,
                        mitigation = "实施基于角色的访问控制(RBAC)"
                    )
                )
            }

            return threats
        }

        // 针对数据存储的STRIDE分析
        fun analyzeDataStore(dataStore: Component.DataStore): List<Threat> {
            val threats = mutableListOf<Threat>()

            // T - Tampering
            if (!dataStore.encryptionAtRest) {
                threats.add(
                    Threat(
                        type = ThreatType.TAMPERING,
                        target = dataStore,
                        description = "数据未加密存储,可能被篡改",
                        impact = Impact.HIGH,
                        mitigation = "启用静态数据加密(Android EncryptedSharedPreferences)"
                    )
                )
            }

            // I - Information Disclosure
            if (dataStore.dataClassification >= DataClassification.CONFIDENTIAL &&
                !dataStore.encryptionAtRest) {
                threats.add(
                    Threat(
                        type = ThreatType.INFORMATION_DISCLOSURE,
                        target = dataStore,
                        description = "机密数据明文存储",
                        impact = Impact.CRITICAL,
                        mitigation = "使用Android Keystore加密敏感数据"
                    )
                )
            }

            // D - Denial of Service
            if (!hasBackup(dataStore)) {
                threats.add(
                    Threat(
                        type = ThreatType.DENIAL_OF_SERVICE,
                        target = dataStore,
                        description = "数据无备份,可能丢失导致服务不可用",
                        impact = Impact.HIGH,
                        mitigation = "实施定期自动备份和恢复测试"
                    )
                )
            }

            return threats
        }
    }

    // 3. 威胁数据模型
    data class Threat(
        val type: ThreatType,
        val target: Any,
        val description: String,
        val impact: Impact,
        val likelihood: Likelihood = Likelihood.MEDIUM, // 稍后用DREAD评估
        val mitigation: String,
        val status: ThreatStatus = ThreatStatus.IDENTIFIED
    )

    enum class ThreatType {
        SPOOFING,
        TAMPERING,
        REPUDIATION,
        INFORMATION_DISCLOSURE,
        DENIAL_OF_SERVICE,
        ELEVATION_OF_PRIVILEGE
    }

    enum class Impact {
        LOW, MEDIUM, HIGH, CRITICAL
    }

    enum class Likelihood {
        LOW, MEDIUM, HIGH
    }

    enum class ThreatStatus {
        IDENTIFIED,      // 已识别
        MITIGATED,       // 已缓解
        ACCEPTED,        // 已接受
        TRANSFERRED      // 已转移
    }
}

2.3 智能摄像头STRIDE分析案例

// 案例:智能摄像头威胁建模
class SmartCameraThreatModel {

    // 1. 系统建模
    fun buildSystemModel(): STRIDEThreatModel.SystemModel {
        val components = listOf(
            // 进程
            STRIDEThreatModel.Component.Process(
                id = "camera_app",
                name = "Camera Android App",
                runAsUser = UserContext.NORMAL_USER,
                exposedInterfaces = listOf(
                    Interface("HTTP API", 8080),
                    Interface("RTSP Stream", 554)
                )
            ),
            STRIDEThreatModel.Component.Process(
                id = "cloud_service",
                name = "Cloud Video Service",
                runAsUser = UserContext.SERVICE_ACCOUNT,
                exposedInterfaces = listOf(
                    Interface("HTTPS API", 443)
                )
            ),

            // 数据存储
            STRIDEThreatModel.Component.DataStore(
                id = "local_storage",
                name = "Local Video Storage",
                dataClassification = DataClassification.CONFIDENTIAL,
                encryptionAtRest = false // 威胁!
            ),
            STRIDEThreatModel.Component.DataStore(
                id = "cloud_storage",
                name = "Cloud Video Storage",
                dataClassification = DataClassification.CONFIDENTIAL,
                encryptionAtRest = true
            ),

            // 外部实体
            STRIDEThreatModel.Component.ExternalEntity(
                id = "user",
                name = "End User",
                trusted = true
            ),
            STRIDEThreatModel.Component.ExternalEntity(
                id = "attacker",
                name = "Network Attacker",
                trusted = false
            )
        )

        val dataFlows = listOf(
            // 用户通过App查看实时视频
            STRIDEThreatModel.DataFlow(
                from = "camera_app",
                to = "user",
                protocol = "RTSP",
                encrypted = false, // 威胁!
                authenticated = false, // 威胁!
                data = listOf("video_stream", "audio_stream")
            ),

            // App上传视频到云端
            STRIDEThreatModel.DataFlow(
                from = "camera_app",
                to = "cloud_service",
                protocol = "HTTPS",
                encrypted = true,
                authenticated = true,
                data = listOf("video_clips", "metadata")
            ),

            // 本地存储视频
            STRIDEThreatModel.DataFlow(
                from = "camera_app",
                to = "local_storage",
                protocol = "File System",
                encrypted = false, // 威胁!
                authenticated = true,
                data = listOf("video_recordings")
            )
        )

        val trustBoundaries = listOf(
            STRIDEThreatModel.TrustBoundary(
                name = "Device Boundary",
                insideComponents = listOf("camera_app", "local_storage"),
                outsideComponents = listOf("user", "cloud_service", "attacker")
            ),
            STRIDEThreatModel.TrustBoundary(
                name = "Cloud Boundary",
                insideComponents = listOf("cloud_service", "cloud_storage"),
                outsideComponents = listOf("camera_app")
            )
        )

        return STRIDEThreatModel.SystemModel(
            components = components,
            dataFlows = dataFlows,
            trustBoundaries = trustBoundaries
        )
    }

    // 2. 威胁识别
    fun identifyThreats(): List<STRIDEThreatModel.Threat> {
        val model = buildSystemModel()
        val analyzer = STRIDEThreatModel.STRIDEAnalyzer()

        val threats = mutableListOf<STRIDEThreatModel.Threat>()

        // 分析所有数据流
        model.dataFlows.forEach { dataFlow ->
            threats.addAll(analyzer.analyzeDataFlow(dataFlow))
        }

        // 分析所有进程
        model.components.filterIsInstance<STRIDEThreatModel.Component.Process>()
            .forEach { process ->
                threats.addAll(analyzer.analyzeProcess(process))
            }

        // 分析所有数据存储
        model.components.filterIsInstance<STRIDEThreatModel.Component.DataStore>()
            .forEach { dataStore ->
                threats.addAll(analyzer.analyzeDataStore(dataStore))
            }

        return threats
    }

    // 3. 生成威胁报告
    fun generateThreatReport(): ThreatReport {
        val threats = identifyThreats()

        // 按威胁类型分组
        val threatsByType = threats.groupBy { it.type }

        // 按影响级别排序
        val criticalThreats = threats.filter { it.impact == Impact.CRITICAL }
        val highThreats = threats.filter { it.impact == Impact.HIGH }

        return ThreatReport(
            totalThreats = threats.size,
            criticalThreats = criticalThreats.size,
            highThreats = highThreats.size,
            threatsByType = threatsByType.mapValues { it.value.size },
            topThreats = criticalThreats.take(5),
            recommendations = generateRecommendations(threats)
        )
    }

    data class ThreatReport(
        val totalThreats: Int,
        val criticalThreats: Int,
        val highThreats: Int,
        val threatsByType: Map<ThreatType, Int>,
        val topThreats: List<STRIDEThreatModel.Threat>,
        val recommendations: List<String>
    )

    private fun generateRecommendations(threats: List<STRIDEThreatModel.Threat>): List<String> {
        return threats
            .filter { it.impact >= Impact.HIGH }
            .map { it.mitigation }
            .distinct()
    }
}

3. DREAD风险评估方法

3.1 DREAD简介

DREAD 是用于量化风险的评分方法:

维度 含义 评分标准
D Damage Potential(潜在损害) 威胁造成的损害程度
R Reproducibility(可重现性) 攻击的易重现程度
E Exploitability(可利用性) 攻击的难易程度
A Affected Users(受影响用户) 受影响的用户数量
D Discoverability(可发现性) 威胁的易发现程度

每个维度评分0-10,总分0-50。

3.2 DREAD评分实现

// DREAD风险评估
class DREADRiskAssessment {

    // DREAD评分
    data class DREADScore(
        val damage: Int,           // 0-10
        val reproducibility: Int,  // 0-10
        val exploitability: Int,   // 0-10
        val affectedUsers: Int,    // 0-10
        val discoverability: Int   // 0-10
    ) {
        val total: Int
            get() = damage + reproducibility + exploitability + affectedUsers + discoverability

        val average: Double
            get() = total / 5.0

        val riskLevel: RiskLevel
            get() = when {
                average >= 8.0 -> RiskLevel.CRITICAL
                average >= 6.0 -> RiskLevel.HIGH
                average >= 4.0 -> RiskLevel.MEDIUM
                else -> RiskLevel.LOW
            }
    }

    enum class RiskLevel {
        LOW, MEDIUM, HIGH, CRITICAL
    }

    // 评估威胁
    fun assessThreat(threat: STRIDEThreatModel.Threat): DREADScore {
        return when (threat.type) {
            ThreatType.SPOOFING -> assessSpoofing(threat)
            ThreatType.TAMPERING -> assessTampering(threat)
            ThreatType.REPUDIATION -> assessRepudiation(threat)
            ThreatType.INFORMATION_DISCLOSURE -> assessInformationDisclosure(threat)
            ThreatType.DENIAL_OF_SERVICE -> assessDenialOfService(threat)
            ThreatType.ELEVATION_OF_PRIVILEGE -> assessElevationOfPrivilege(threat)
        }
    }

    // 评估伪装威胁
    private fun assessSpoofing(threat: STRIDEThreatModel.Threat): DREADScore {
        // 假设:智能摄像头视频流未认证
        return DREADScore(
            damage = 8,            // 高损害:攻击者可查看私密视频
            reproducibility = 10,  // 易重现:工具可自动化
            exploitability = 7,    // 中等难度:需要网络访问
            affectedUsers = 9,     // 所有用户
            discoverability = 6    // 中等可发现:需扫描网络
        )
        // 总分:40/50,平均:8.0,风险级别:CRITICAL
    }

    // 评估信息泄露威胁
    private fun assessInformationDisclosure(threat: STRIDEThreatModel.Threat): DREADScore {
        // 假设:本地视频文件未加密
        return DREADScore(
            damage = 9,            // 极高损害:隐私泄露
            reproducibility = 10,  // 易重现:任何人获得物理访问即可
            exploitability = 8,    // 较易利用:只需文件浏览器
            affectedUsers = 10,    // 所有用户
            discoverability = 7    // 较易发现:文件系统可见
        )
        // 总分:44/50,平均:8.8,风险级别:CRITICAL
    }

    // 自动评分(基于启发式规则)
    class AutoDREADScorer {
        fun autoScore(threat: STRIDEThreatModel.Threat): DREADScore {
            var damage = 5
            var reproducibility = 5
            var exploitability = 5
            var affectedUsers = 5
            var discoverability = 5

            // 根据威胁特征调整分数

            // Damage
            when (threat.impact) {
                Impact.CRITICAL -> damage = 10
                Impact.HIGH -> damage = 8
                Impact.MEDIUM -> damage = 5
                Impact.LOW -> damage = 3
            }

            // Reproducibility
            if (isAutomatable(threat)) {
                reproducibility = 10
            }

            // Exploitability
            if (requiresNoAuthentication(threat)) {
                exploitability += 3
            }
            if (requiresNoSpecialTools(threat)) {
                exploitability += 2
            }

            // Affected Users
            if (affectsAllUsers(threat)) {
                affectedUsers = 10
            } else if (affectsMostUsers(threat)) {
                affectedUsers = 7
            }

            // Discoverability
            if (isPubliclyVisible(threat)) {
                discoverability = 10
            } else if (requiresNetworkAccess(threat)) {
                discoverability = 7
            }

            return DREADScore(
                damage = damage.coerceIn(0, 10),
                reproducibility = reproducibility.coerceIn(0, 10),
                exploitability = exploitability.coerceIn(0, 10),
                affectedUsers = affectedUsers.coerceIn(0, 10),
                discoverability = discoverability.coerceIn(0, 10)
            )
        }

        private fun isAutomatable(threat: STRIDEThreatModel.Threat): Boolean {
            // 网络攻击通常可自动化
            return threat.description.contains("网络") ||
                   threat.description.contains("API")
        }

        private fun requiresNoAuthentication(threat: STRIDEThreatModel.Threat): Boolean {
            return threat.description.contains("未认证") ||
                   threat.description.contains("无认证")
        }

        private fun requiresNoSpecialTools(threat: STRIDEThreatModel.Threat): Boolean {
            // 如果只需要浏览器或文件浏览器
            return threat.description.contains("浏览器") ||
                   threat.description.contains("文件")
        }

        private fun affectsAllUsers(threat: STRIDEThreatModel.Threat): Boolean {
            return threat.description.contains("所有用户") ||
                   !threat.description.contains("特定")
        }

        private fun affectsMostUsers(threat: STRIDEThreatModel.Threat): Boolean {
            return threat.description.contains("大多数") ||
                   threat.description.contains("多数")
        }

        private fun isPubliclyVisible(threat: STRIDEThreatModel.Threat): Boolean {
            return threat.description.contains("公开") ||
                   threat.description.contains("可见")
        }

        private fun requiresNetworkAccess(threat: STRIDEThreatModel.Threat): Boolean {
            return threat.description.contains("网络") ||
                   threat.description.contains("远程")
        }
    }

    // 风险矩阵
    fun generateRiskMatrix(threats: List<Pair<STRIDEThreatModel.Threat, DREADScore>>): RiskMatrix {
        val matrix = Array(5) { Array<MutableList<String>>(5) { mutableListOf() } }

        threats.forEach { (threat, score) ->
            // X轴:可能性(平均Reproducibility和Exploitability)
            val likelihood = ((score.reproducibility + score.exploitability) / 2.0 / 2.5).toInt().coerceIn(0, 4)

            // Y轴:影响(平均Damage和AffectedUsers)
            val impact = ((score.damage + score.affectedUsers) / 2.0 / 2.5).toInt().coerceIn(0, 4)

            matrix[4 - impact][likelihood].add(threat.description.take(50))
        }

        return RiskMatrix(matrix)
    }

    data class RiskMatrix(val matrix: Array<Array<MutableList<String>>>) {
        fun print() {
            println("风险矩阵(Impact vs Likelihood):")
            println("     Low    Med    High   VHigh  Crit")
            val labels = arrayOf("Crit", "High", "Med ", "Low ", "VLow")
            matrix.forEachIndexed { index, row ->
                print("${labels[index]}  ")
                row.forEach { cell ->
                    val count = cell.size
                    print(String.format("%-6s ", if (count > 0) count.toString() else "-"))
                }
                println()
            }
        }
    }
}

4. PASTA威胁建模框架

4.1 PASTA简介

PASTA(Process for Attack Simulation and Threat Analysis) 是以攻击者视角进行的七阶段威胁建模框架。

七个阶段

PASTA威胁建模七阶段:

Stage 1: Define Objectives(定义目标)
├── 业务目标
├── 安全目标
└── 合规要求

Stage 2: Define Technical Scope(定义技术范围)
├── 系统架构
├── 技术栈
└── 依赖关系

Stage 3: Application Decomposition(应用分解)
├── 组件清单
├── 数据流
└── 信任边界

Stage 4: Threat Analysis(威胁分析)
├── 威胁情报
├── 攻击场景
└── 攻击树

Stage 5: Vulnerability Analysis(漏洞分析)
├── 已知漏洞(CVE)
├── 配置错误
└── 设计缺陷

Stage 6: Attack Modeling(攻击建模)
├── 攻击路径
├── Kill Chain
└── ATT&CK映射

Stage 7: Risk and Impact Analysis(风险和影响分析)
├── 业务影响
├── 风险评分
└── 缓解建议

4.2 PASTA实施案例:智能门锁

// PASTA威胁建模:智能门锁
class SmartLockPASTAModel {

    // Stage 1: Define Objectives
    data class SecurityObjectives(
        val businessObjectives: List<String> = listOf(
            "保护用户家庭安全",
            "防止未授权访问",
            "确保系统可用性(99.9%)"
        ),
        val securityRequirements: List<String> = listOf(
            "防止物理绕过",
            "防止网络攻击",
            "防止凭证泄露",
            "实现访问审计"
        ),
        val complianceRequirements: List<String> = listOf(
            "ETSI EN 303 645",
            "GDPR",
            "IoXt认证"
        )
    )

    // Stage 2: Define Technical Scope
    data class TechnicalScope(
        val components: List<String> = listOf(
            "智能门锁硬件",
            "Android配对App",
            "云端管理服务",
            "蓝牙通信模块",
            "本地加密存储"
        ),
        val protocols: List<String> = listOf(
            "Bluetooth Low Energy (BLE)",
            "HTTPS/TLS 1.3",
            "AES-256-GCM"
        ),
        val dependencies: List<String> = listOf(
            "Android Keystore",
            "Google Cloud Platform",
            "第三方蓝牙芯片"
        )
    )

    // Stage 3: Application Decomposition
    fun decomposeApplication(): ApplicationDecomposition {
        return ApplicationDecomposition(
            dataFlows = listOf(
                DataFlow("User", "Android App", "User Input", "Local"),
                DataFlow("Android App", "Smart Lock", "Unlock Command", "BLE Encrypted"),
                DataFlow("Android App", "Cloud Service", "Activity Log", "HTTPS"),
                DataFlow("Smart Lock", "Local Storage", "Access Log", "Encrypted File")
            ),
            assets = listOf(
                Asset("User Credentials", AssetValue.CRITICAL),
                Asset("Unlock Keys", AssetValue.CRITICAL),
                Asset("Access Logs", AssetValue.HIGH),
                Asset("User Personal Data", AssetValue.HIGH)
            ),
            trustBoundaries = listOf(
                "Physical Boundary (Door)",
                "Network Boundary (WiFi)",
                "Cloud Boundary"
            )
        )
    }

    data class ApplicationDecomposition(
        val dataFlows: List<DataFlow>,
        val assets: List<Asset>,
        val trustBoundaries: List<String>
    )

    data class Asset(
        val name: String,
        val value: AssetValue
    )

    enum class AssetValue {
        LOW, MEDIUM, HIGH, CRITICAL
    }

    // Stage 4: Threat Analysis
    fun analyzeThreat(): List<ThreatScenario> {
        return listOf(
            ThreatScenario(
                name = "BLE中间人攻击",
                description = "攻击者拦截BLE通信,重放解锁命令",
                attackVector = "Network",
                threatActor = "Opportunistic Attacker",
                motivation = "Entry to home",
                capability = "Medium"
            ),
            ThreatScenario(
                name = "暴力破解PIN码",
                description = "攻击者尝试所有PIN码组合",
                attackVector = "Physical",
                threatActor = "Determined Attacker",
                motivation = "Entry to home",
                capability = "Low"
            ),
            ThreatScenario(
                name = "固件逆向工程",
                description = "攻击者提取固件,分析加密算法",
                attackVector = "Physical",
                threatActor = "Advanced Attacker",
                motivation = "Mass exploitation",
                capability = "High"
            )
        )
    }

    data class ThreatScenario(
        val name: String,
        val description: String,
        val attackVector: String,
        val threatActor: String,
        val motivation: String,
        val capability: String
    )

    // Stage 5: Vulnerability Analysis
    fun analyzeVulnerabilities(): List<Vulnerability> {
        return listOf(
            Vulnerability(
                id = "VULN-001",
                name = "BLE配对无PIN码保护",
                cve = null,
                cvss = 7.5,
                description = "蓝牙配对过程不需要PIN码,任何人都可以配对",
                affectedComponent = "Smart Lock BLE Module",
                remediation = "实施Just Works配对改为Passkey Entry配对"
            ),
            Vulnerability(
                id = "VULN-002",
                name = "固件未加密",
                cve = null,
                cvss = 6.0,
                description = "固件以明文存储,可被提取分析",
                affectedComponent = "Smart Lock Firmware",
                remediation = "启用固件加密和防提取保护"
            ),
            Vulnerability(
                id = "VULN-003",
                name = "无速率限制",
                cve = null,
                cvss = 5.0,
                description = "解锁尝试无限制,可暴力破解",
                affectedComponent = "Smart Lock Authentication",
                remediation = "实施尝试次数限制(5次后锁定1小时)"
            )
        )
    }

    data class Vulnerability(
        val id: String,
        val name: String,
        val cve: String?,
        val cvss: Double,
        val description: String,
        val affectedComponent: String,
        val remediation: String
    )

    // Stage 6: Attack Modeling
    fun modelAttack(): AttackTree {
        return AttackTree(
            goal = "Unlock Smart Lock Without Authorization",
            children = listOf(
                AttackNode(
                    name = "Exploit Network Vulnerability",
                    children = listOf(
                        AttackNode("BLE MitM Attack", probability = 0.3, impact = "High"),
                        AttackNode("Replay Attack", probability = 0.4, impact = "High")
                    )
                ),
                AttackNode(
                    name = "Exploit Physical Vulnerability",
                    children = listOf(
                        AttackNode("Pick Lock Mechanism", probability = 0.1, impact = "High"),
                        AttackNode("Extract Firmware", probability = 0.05, impact = "Critical")
                    )
                ),
                AttackNode(
                    name = "Social Engineering",
                    children = listOf(
                        AttackNode("Phish User Credentials", probability = 0.2, impact = "Critical"),
                        AttackNode("Steal Physical Key", probability = 0.15, impact = "High")
                    )
                )
            )
        )
    }

    data class AttackTree(
        val goal: String,
        val children: List<AttackNode>
    )

    data class AttackNode(
        val name: String,
        val probability: Double = 0.0,
        val impact: String = "Unknown",
        val children: List<AttackNode> = emptyList()
    )

    // Stage 7: Risk and Impact Analysis
    fun assessRisk(): RiskAssessment {
        val vulnerabilities = analyzeVulnerabilities()
        val threats = analyzeThreats()

        val risks = vulnerabilities.map { vuln ->
            Risk(
                id = "RISK-${vuln.id}",
                vulnerability = vuln,
                threat = threats.firstOrNull { it.name.contains(vuln.name.substringBefore(" ")) },
                likelihood = calculateLikelihood(vuln),
                impact = calculateImpact(vuln),
                riskScore = vuln.cvss,
                mitigation = vuln.remediation
            )
        }

        return RiskAssessment(
            risks = risks.sortedByDescending { it.riskScore },
            acceptableRiskThreshold = 4.0,
            highRisks = risks.filter { it.riskScore >= 7.0 },
            mediumRisks = risks.filter { it.riskScore >= 4.0 && it.riskScore < 7.0 },
            lowRisks = risks.filter { it.riskScore < 4.0 }
        )
    }

    data class RiskAssessment(
        val risks: List<Risk>,
        val acceptableRiskThreshold: Double,
        val highRisks: List<Risk>,
        val mediumRisks: List<Risk>,
        val lowRisks: List<Risk>
    )

    data class Risk(
        val id: String,
        val vulnerability: Vulnerability,
        val threat: ThreatScenario?,
        val likelihood: String,
        val impact: String,
        val riskScore: Double,
        val mitigation: String
    )

    private fun calculateLikelihood(vuln: Vulnerability): String {
        return when {
            vuln.cvss >= 7.0 -> "High"
            vuln.cvss >= 4.0 -> "Medium"
            else -> "Low"
        }
    }

    private fun calculateImpact(vuln: Vulnerability): String {
        return when {
            vuln.affectedComponent.contains("Authentication") -> "Critical"
            vuln.affectedComponent.contains("Firmware") -> "High"
            else -> "Medium"
        }
    }
}

5. 威胁建模自动化工具

5.1 使用Microsoft Threat Modeling Tool

// 导出威胁模型到Microsoft TMT格式
class ThreatModelExporter {

    fun exportToTMT(model: STRIDEThreatModel.SystemModel, threats: List<STRIDEThreatModel.Threat>): String {
        // 生成TMT XML格式
        return buildString {
            appendLine("""<?xml version="1.0" encoding="utf-8"?>""")
            appendLine("""<ThreatModel>""")
            appendLine("""  <DrawingSurfaceList>""")

            // 导出组件
            model.components.forEach { component ->
                when (component) {
                    is STRIDEThreatModel.Component.Process -> {
                        appendLine("""    <DrawingSurfaceModel>""")
                        appendLine("""      <Id>${component.id}</Id>""")
                        appendLine("""      <Type>Process</Type>""")
                        appendLine("""      <Name>${component.name}</Name>""")
                        appendLine("""    </DrawingSurfaceModel>""")
                    }
                    is STRIDEThreatModel.Component.DataStore -> {
                        appendLine("""    <DrawingSurfaceModel>""")
                        appendLine("""      <Id>${component.id}</Id>""")
                        appendLine("""      <Type>DataStore</Type>""")
                        appendLine("""      <Name>${component.name}</Name>""")
                        appendLine("""    </DrawingSurfaceModel>""")
                    }
                    is STRIDEThreatModel.Component.ExternalEntity -> {
                        appendLine("""    <DrawingSurfaceModel>""")
                        appendLine("""      <Id>${component.id}</Id>""")
                        appendLine("""      <Type>ExternalEntity</Type>""")
                        appendLine("""      <Name>${component.name}</Name>""")
                        appendLine("""    </DrawingSurfaceModel>""")
                }
            }

            // 导出数据流
            model.dataFlows.forEach { dataFlow ->
                appendLine("""    <DrawingSurfaceModel>""")
                appendLine("""      <Type>DataFlow</Type>""")
                appendLine("""      <Source>${dataFlow.from}</Source>""")
                appendLine("""      <Target>${dataFlow.to}</Target>""")
                appendLine("""      <Protocol>${dataFlow.protocol}</Protocol>""")
                appendLine("""    </DrawingSurfaceModel>""")
            }

            appendLine("""  </DrawingSurfaceList>""")

            // 导出威胁
            appendLine("""  <ThreatList>""")
            threats.forEach { threat ->
                appendLine("""    <Threat>""")
                appendLine("""      <Type>${threat.type}</Type>""")
                appendLine("""      <Description>${threat.description}</Description>""")
                appendLine("""      <Priority>${threat.impact}</Priority>""")
                appendLine("""      <Mitigation>${threat.mitigation}</Mitigation>""")
                appendLine("""    </Threat>""")
            }
            appendLine("""  </ThreatList>""")

            appendLine("""</ThreatModel>""")
        }
    }
}

5.2 集成OWASP Dependency-Check

// 自动化漏洞扫描
class DependencyScan Scanner {

    // 扫描项目依赖的已知漏洞
    suspend fun scanDependencies(projectPath: String): ScanReport {
        // 1. 运行OWASP Dependency-Check
        val process = ProcessBuilder(
            "dependency-check",
            "--project", "MyIoTProject",
            "--scan", projectPath,
            "--format", "JSON",
            "--out", "/tmp/dependency-check-report"
        ).start()

        process.waitFor()

        // 2. 解析报告
        val reportFile = File("/tmp/dependency-check-report/dependency-check-report.json")
        val report = Json.decodeFromString<OWASPReport>(reportFile.readText())

        // 3. 提取高危漏洞
        val criticalVulnerabilities = report.dependencies
            .flatMap { it.vulnerabilities ?: emptyList() }
            .filter { it.cvssv3?.baseScore ?: 0.0 >= 7.0 }

        return ScanReport(
            totalDependencies = report.dependencies.size,
            vulnerableDependencies = report.dependencies.count { it.vulnerabilities != null },
            criticalVulnerabilities = criticalVulnerabilities.size,
            vulnerabilities = criticalVulnerabilities
        )
    }

    data class ScanReport(
        val totalDependencies: Int,
        val vulnerableDependencies: Int,
        val criticalVulnerabilities: Int,
        val vulnerabilities: List<CVE>
    )

    @Serializable
    data class OWASPReport(
        val dependencies: List<Dependency>
    )

    @Serializable
    data class Dependency(
        val fileName: String,
        val vulnerabilities: List<CVE>?
    )

    @Serializable
    data class CVE(
        val name: String,
        val description: String,
        val cvssv3: CVSSv3?
    )

    @Serializable
    data class CVSSv3(
        val baseScore: Double,
        val baseSeverity: String
    )
}

6. 持续威胁监控

// 生产环境持续威胁监控
class ContinuousThreatMonitoring {

    // 实时威胁检测
    suspend fun monitorThreats() {
        flow {
            while (true) {
                val events = collectSecurityEvents()
                emit(events)
                delay(5.seconds)
            }
        }.collect { events ->
            events.forEach { event ->
                val threat = detectThreat(event)
                if (threat != null) {
                    handleThreat(threat)
                }
            }
        }
    }

    // 威胁检测
    private fun detectThreat(event: SecurityEvent): DetectedThreat? {
        return when {
            // 检测暴力破解攻击
            event.type == EventType.AUTH_FAILURE &&
            getRecentFailures(event.source) > 10 -> {
                DetectedThreat(
                    type = ThreatType.BRUTE_FORCE,
                    severity = Severity.HIGH,
                    source = event.source,
                    description = "检测到暴力破解攻击尝试"
                )
            }

            // 检测异常访问模式
            isAnomalousAccess(event) -> {
                DetectedThreat(
                    type = ThreatType.ANOMALOUS_ACCESS,
                    severity = Severity.MEDIUM,
                    source = event.source,
                    description = "检测到异常访问模式"
                )
            }

            // 检测数据泄露尝试
            event.type == EventType.DATA_EXPORT &&
            event.volume > NORMAL_EXPORT_THRESHOLD -> {
                DetectedThreat(
                    type = ThreatType.DATA_EXFILTRATION,
                    severity = Severity.CRITICAL,
                    source = event.source,
                    description = "检测到异常数据导出"
                )
            }

            else -> null
        }
    }

    // 威胁响应
    private suspend fun handleThreat(threat: DetectedThreat) {
        when (threat.severity) {
            Severity.CRITICAL -> {
                // 立即阻断
                blockSource(threat.source)

                // 通知安全团队
                alertSecurityTeam(threat)

                // 记录日志
                auditLog.critical(threat.description)
            }

            Severity.HIGH -> {
                // 限制访问
                rateLimitSource(threat.source)

                // 通知管理员
                alertAdmin(threat)

                // 记录日志
                auditLog.high(threat.description)
            }

            Severity.MEDIUM -> {
                // 增强监控
                increaseMonitoring(threat.source)

                // 记录日志
                auditLog.medium(threat.description)
            }

            else -> {
                // 仅记录
                auditLog.info(threat.description)
            }
        }
    }

    data class DetectedThreat(
        val type: ThreatType,
        val severity: Severity,
        val source: String,
        val description: String,
        val timestamp: Instant = Clock.System.now()
    )
}

总结

威胁建模和风险评估是IoT安全的基石:

  1. STRIDE:系统化识别威胁类型
  2. DREAD:量化风险优先级
  3. PASTA:以攻击者视角全面分析
  4. 自动化工具:提高效率和覆盖率
  5. 持续监控:动态响应新威胁

参考资源


相关文章

  • 上一篇:《IoT安全认证体系对比与选择》
  • 下一篇:《IoT设备安全生命周期管理》
  • 相关:《IoT安全架构设计全面指南》
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐