智能防剁手程序 - 基于Python的智能消费决策系统

 

一、实际应用场景描述

 

小明是一名互联网公司的程序员,平时工作压力大,喜欢通过网购来缓解焦虑。每个月发工资后的第一周,他总是忍不住在各种购物APP上"剁手"。上周他一口气买了5件衣服、2双鞋子和一堆数码配件,结果收到货后发现:

 

- 30%的商品从未穿过/用过

- 40%的商品质量不如预期

- 剩下的30%虽然有用,但完全可以等促销再买

 

一个月后,看着信用卡账单和堆积的闲置物品,小明意识到自己陷入了冲动消费陷阱。他需要一套智能系统来帮助他在下单前冷静思考,做出理性决策。

 

二、引入痛点

 

现代消费者的三大痛点:

 

痛点 具体表现 后果

即时满足诱惑 刷短视频时看到种草视频,立刻想买 购买非必需品,浪费金钱

缺乏成本意识 只看标价,不考虑机会成本和使用频率 物品闲置率高,性价比低

情绪驱动消费 压力大、无聊、开心时容易乱买 财务压力增大,生活质量下降

 

传统解决方案的不足:

 

- ❌ 记账软件:事后记录,无法阻止冲动

- ❌ 购物清单:过于简单,缺乏智能分析

- ❌ 朋友劝阻:依赖他人,不够客观

 

三、核心逻辑讲解

 

智能决策框架(基于决策树+加权评分)

 

┌─────────────────────────────────────────────────────────────┐

│ 输入层 │

│ 购物欲望强度(0-100) + 商品信息 │

└─────────────────────────────────────────────────────────────┘

                              │

                              ▼

┌─────────────────────────────────────────────────────────────┐

│ 预处理层 │

│ 标准化数据 + 提取关键特征 │

└─────────────────────────────────────────────────────────────┘

                              │

                              ▼

┌─────────────────────────────────────────────────────────────┐

│ 决策引擎 │

│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │

│ │ 欲望冷却评估 │ │ 成本效益分析 │ │ 必要性判断 │ │

│ │ (权重:30%) │ │ (权重:50%) │ │ (权重:20%) │ │

│ └─────────────┘ └─────────────┘ └─────────────┘ │

└─────────────────────────────────────────────────────────────┘

                              │

                              ▼

┌─────────────────────────────────────────────────────────────┐

│ 输出层 │

│ 综合评分 + 建议(买/不买/等一等) + 冷却倒计时 │

└─────────────────────────────────────────────────────────────┘

 

核心算法:加权决策公式

 

Score = W_1 \times D_c + W_2 \times C_e + W_3 \times N_a

 

其中:

 

- D_c = 欲望冷却得分 (Desire Cooling)

- C_e = 成本效益得分 (Cost Effectiveness)

- N_a = 必要性得分 (Necessity Assessment)

- W_1, W_2, W_3 = 对应权重 (30%, 50%, 20%)

 

四、代码模块化实现

 

项目结构

 

smart_shopping_guard/

├── main.py # 主程序入口

├── config.py # 配置文件

├── decision_engine.py # 决策引擎核心模块

├── cooldown_manager.py # 冷却计时器模块

├── cost_calculator.py # 成本计算器模块

├── necessity_checker.py # 必要性检查模块

├── data_models.py # 数据模型定义

├── utils.py # 工具函数

└── README.md # 项目说明文档

 

1. config.py - 配置文件

 

"""

智能防剁手程序 - 配置文件

包含权重设置、阈值配置、冷却时间参数等

"""

 

from dataclasses import dataclass

from typing import Dict, Any

 

 

@dataclass

class DecisionWeights:

    """决策权重配置"""

    desire_cooling_weight: float = 0.30 # 欲望冷却权重

    cost_effectiveness_weight: float = 0.50 # 成本效益权重

    necessity_weight: float = 0.20 # 必要性权重

 

 

@dataclass 

class ScoreThresholds:

    """评分阈值配置"""

    strong_buy: float = 80.0 # 强烈推荐购买

    conditional_buy: float = 60.0 # 条件性购买

    wait_recommendation: float = 40.0 # 建议等待

    strong_block: float = 40.0 # 强烈阻止购买

 

 

@dataclass

class CooldownConfig:

    """冷却时间配置(分钟)"""

    low_desire: int = 5 # 低欲望冷却时间

    medium_desire: int = 15 # 中等欲望冷却时间

    high_desire: int = 30 # 高欲望冷却时间

    extreme_desire: int = 60 # 极度欲望冷却时间

 

 

@dataclass

class CostConfig:

    """成本计算配置"""

    daily_wage: float = 300.0 # 日均工资(用于机会成本计算)

    usage_threshold: int = 10 # 最低使用次数阈值

    lifespan_years: int = 3 # 物品预期寿命(年)

 

 

# 全局配置实例

DECISION_WEIGHTS = DecisionWeights()

SCORE_THRESHOLDS = ScoreThresholds()

COOLDOWN_CONFIG = CooldownConfig()

COST_CONFIG = CostConfig()

 

# 商品类别预设

PRODUCT_CATEGORIES = {

    "electronics": {"name": "数码电子", "necessity_base": 0.6},

    "clothing": {"name": "服装鞋帽", "necessity_base": 0.4},

    "food": {"name": "食品饮料", "necessity_base": 0.8},

    "home": {"name": "家居用品", "necessity_base": 0.7},

    "entertainment": {"name": "娱乐休闲", "necessity_base": 0.3},

    "luxury": {"name": "奢侈品", "necessity_base": 0.1},

    "daily": {"name": "日常用品", "necessity_base": 0.9}

}

 

# 欲望等级映射

DESIRE_LEVELS = {

    (0, 25): ("极低", COOLDOWN_CONFIG.low_desire),

    (26, 50): ("较低", COOLDOWN_CONFIG.low_desire),

    (51, 70): ("中等", COOLDOWN_CONFIG.medium_desire),

    (71, 85): ("较高", COOLDOWN_CONFIG.high_desire),

    (86, 100): ("极高", COOLDOWN_CONFIG.extreme_desire)

}

 

2. data_models.py - 数据模型

 

"""

数据模型定义模块

使用Python dataclass定义程序中的核心数据结构

"""

 

from dataclasses import dataclass, field

from datetime import datetime, timedelta

from typing import Optional, List, Dict, Any

import uuid

 

 

@dataclass

class ProductInfo:

    """

    商品信息数据类

    包含商品的所有关键属性,用于成本效益分析

    """

    product_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    name: str = "" # 商品名称

    price: float = 0.0 # 商品价格

    category: str = "daily" # 商品类别

    brand: str = "" # 品牌

    expected_lifespan_months: int = 12 # 预期使用寿命(月)

    planned_usage_frequency: int = 1 # 计划使用频率(次/周)

    is_on_sale: bool = False # 是否正在促销

    original_price: float = 0.0 # 原价(用于计算折扣)

    has_review_data: bool = False # 是否有评价数据

    positive_review_rate: float = 0.0 # 好评率

    return_policy_days: int = 7 # 退货政策天数

    

    def __post_init__(self):

        """初始化后处理,确保数据有效性"""

        if self.original_price == 0:

            self.original_price = self.price

        if self.expected_lifespan_months <= 0:

            self.expected_lifespan_months = 12

        if not 0 <= self.positive_review_rate <= 1:

            self.positive_review_rate = 0.5

 

 

@dataclass

class DesireInput:

    """

    购物欲望输入数据类

    量化用户的购买欲望,用于冷却评估

    """

    intensity: int = 0 # 欲望强度 0-100

    trigger_reason: str = "" # 触发原因

    emotional_state: str = "neutral" # 情绪状态

    time_since_last_purchase_hours: float = 0.0 # 距离上次购买时长

    is_stress_relief: bool = False # 是否为缓解压力

    is_boredom_driven: bool = False # 是否为缓解无聊

    is_trend_following: bool = False # 是否为跟风购买

    

    def validate(self) -> bool:

        """验证输入数据有效性"""

        return 0 <= self.intensity <= 100

 

 

@dataclass

class DecisionResult:

    """

    决策结果数据类

    包含完整的分析结果和建议

    """

    request_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    timestamp: datetime = field(default_factory=datetime.now)

    

    # 各项得分

    desire_cooling_score: float = 0.0

    cost_effectiveness_score: float = 0.0

    necessity_score: float = 0.0

    total_score: float = 0.0

    

    # 详细分析

    analysis_details: Dict[str, Any] = field(default_factory=dict)

    recommendations: List[str] = field(default_factory=list)

    

    # 最终建议

    final_decision: str = "" # "BUY" / "WAIT" / "BLOCK"

    cooling_time_minutes: int = 0

    reasoning: str = ""

    

    def to_dict(self) -> Dict[str, Any]:

        """转换为字典格式,便于JSON序列化"""

        return {

            "request_id": self.request_id,

            "timestamp": self.timestamp.isoformat(),

            "scores": {

                "desire_cooling": round(self.desire_cooling_score, 2),

                "cost_effectiveness": round(self.cost_effectiveness_score, 2),

                "necessity": round(self.necessity_score, 2),

                "total": round(self.total_score, 2)

            },

            "analysis": self.analysis_details,

            "recommendations": self.recommendations,

            "decision": self.final_decision,

            "cooling_time_minutes": self.cooling_time_minutes,

            "reasoning": self.reasoning

        }

 

 

@dataclass

class PurchaseHistory:

    """

    购买历史数据类

    用于跟踪用户的购买行为和后悔率

    """

    history_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    product_name: str = ""

    purchase_date: datetime = field(default_factory=datetime.now)

    price: float = 0.0

    actual_usage_count: int = 0 # 实际使用次数

    satisfaction_rating: int = 0 # 满意度评分 1-5

    is_regretted: bool = False # 是否后悔购买

    regret_reasons: List[str] = field(default_factory=list)

 

3. cooldown_manager.py - 冷却计时器模块

 

"""

欲望冷却管理器模块

负责评估和管理购物欲望的冷却过程

核心功能:根据欲望强度和触发原因计算冷却时间和得分

"""

 

from datetime import datetime, timedelta

from typing import Tuple, Dict, Any

from data_models import DesireInput, PRODUCT_CATEGORIES, DESIRE_LEVELS

from config import DECISION_WEIGHTS, SCORE_THRESHOLDS

 

 

class CooldownManager:

    """

    欲望冷却管理器

    

    核心逻辑:

    1. 根据欲望强度确定冷却时间

    2. 分析触发原因的影响因子

    3. 计算欲望冷却得分

    """

    

    def __init__(self):

        """初始化冷却管理器"""

        self.current_cooldowns: Dict[str, datetime] = {} # 存储当前冷却中的请求

        

    def calculate_cooldown(

        self, 

        desire_input: DesireInput,

        product_info: 'ProductInfo'

    ) -> Tuple[int, float, Dict[str, Any]]:

        """

        计算冷却时间和欲望冷却得分

        

        Args:

            desire_input: 购物欲望输入

            product_info: 商品信息

            

        Returns:

            Tuple[冷却分钟数, 欲望冷却得分, 详细分析]

        """

        # 1. 确定基础冷却时间(基于欲望强度)

        base_cooldown = self._get_base_cooldown(desire_input.intensity)

        

        # 2. 分析触发原因的影响

        trigger_impact = self._analyze_trigger_reasons(desire_input)

        

        # 3. 考虑商品类别的影响

        category_impact = self._analyze_category_impact(product_info.category)

        

        # 4. 计算调整后的冷却时间

        adjusted_cooldown = self._adjust_cooldown_time(

            base_cooldown, 

            trigger_impact, 

            category_impact

        )

        

        # 5. 计算欲望冷却得分

        cooling_score = self._calculate_cooling_score(

            desire_input, 

            trigger_impact, 

            category_impact

        )

        

        # 6. 生成详细分析

        analysis = {

            "base_cooldown_minutes": base_cooldown,

            "trigger_impact_factor": trigger_impact["factor"],

            "category_impact_factor": category_impact["factor"],

            "adjusted_cooldown_minutes": adjusted_cooldown,

            "desire_level": self._get_desire_level_name(desire_input.intensity),

            "risk_factors": self._identify_risk_factors(desire_input, product_info)

        }

        

        return adjusted_cooldown, cooling_score, analysis

    

    def _get_base_cooldown(self, desire_intensity: int) -> int:

        """

        根据欲望强度获取基础冷却时间

        

        Args:

            desire_intensity: 欲望强度 0-100

            

        Returns:

            基础冷却时间(分钟)

        """

        for (min_val, max_val), (level_name, cooldown) in DESIRE_LEVELS.items():

            if min_val <= desire_intensity <= max_val:

                return cooldown

        return 5 # 默认值

    

    def _get_desire_level_name(self, intensity: int) -> str:

        """获取欲望等级名称"""

        for (min_val, max_val), (level_name, _) in DESIRE_LEVELS.items():

            if min_val <= intensity <= max_val:

                return level_name

        return "未知"

    

    def _analyze_trigger_reasons(self, desire_input: DesireInput) -> Dict[str, Any]:

        """

        分析触发购买欲望的原因及其影响

        

        风险因子权重:

        - 缓解压力:高风险 (+30%冷却时间)

        - 缓解无聊:中风险 (+20%冷却时间)  

        - 跟风购买:高风险 (+40%冷却时间)

        - 情绪驱动:中高风险 (+25%冷却时间)

        """

        impact_factor = 1.0 # 基础影响因子

        risk_flags = []

        risk_score = 0

        

        # 分析各种触发原因

        if desire_input.is_stress_relief:

            impact_factor += 0.30

            risk_flags.append("压力驱动消费")

            risk_score += 30

        

        if desire_input.is_boredom_driven:

            impact_factor += 0.20

            risk_flags.append("无聊驱动消费")

            risk_score += 20

            

        if desire_input.is_trend_following:

            impact_factor += 0.40

            risk_flags.append("跟风购买")

            risk_score += 40

            

        # 分析情绪状态

        emotion_multipliers = {

            "anxious": 1.25, # 焦虑状态增加风险

            "excited": 1.15, # 兴奋状态略微增加风险

            "sad": 1.20, # 悲伤状态增加风险

            "angry": 1.35, # 愤怒状态显著增加风险

            "neutral": 1.0, # 中性状态无额外影响

            "happy": 0.95 # 平静快乐状态略微降低风险

        }

        emotion_factor = emotion_multipliers.get(desire_input.emotional_state, 1.0)

        impact_factor *= emotion_factor

        

        if desire_input.emotional_state in ["anxious", "angry"]:

            risk_flags.append(f"负面情绪驱动({desire_input.emotional_state})")

            risk_score += 15

        

        # 分析距离上次购买的时间

        hours_since_last = desire_input.time_since_last_purchase_hours

        if hours_since_last < 24:

            impact_factor += 0.15

            risk_flags.append("短期内重复购买倾向")

            risk_score += 15

        elif hours_since_last > 168: # 超过一周

            impact_factor -= 0.10 # 长时间未购买,略微降低风险

            

        return {

            "factor": min(impact_factor, 2.0), # 限制最大影响因子

            "risk_flags": risk_flags,

            "risk_score": min(risk_score, 100),

            "emotion_factor": emotion_factor

        }

    

    def _analyze_category_impact(self, category: str) -> Dict[str, Any]:

        """

        分析商品类别对冷却需求的影响

        

        不同类别的风险系数:

        - 奢侈品:高风险 (1.5x)

        - 娱乐休闲:中高风险 (1.3x)

        - 服装鞋帽:中等风险 (1.2x)

        - 数码电子:中等风险 (1.15x)

        - 家居用品:低风险 (1.05x)

        - 日常用品:最低风险 (0.95x)

        - 食品饮料:低风险 (0.90x)

        """

        category_risk_map = {

            "luxury": {"factor": 1.5, "risk_level": "high"},

            "entertainment": {"factor": 1.3, "risk_level": "medium-high"},

            "clothing": {"factor": 1.2, "risk_level": "medium"},

            "electronics": {"factor": 1.15, "risk_level": "medium"},

            "home": {"factor": 1.05, "risk_level": "low"},

            "daily": {"factor": 0.95, "risk_level": "very-low"},

            "food": {"factor": 0.90, "risk_level": "very-low"}

        }

        

        category_info = category_risk_map.get(category, {"factor": 1.0, "risk_level": "unknown"})

        return {

            "factor": category_info["factor"],

            "risk_level": category_info["risk_level"],

            "category_name": PRODUCT_CATEGORIES.get(category, {}).get("name", "未知")

        }

    

    def _adjust_cooldown_time(

        self, 

        base_cooldown: int, 

        trigger_impact: Dict[str, Any], 

        category_impact: Dict[str, Any]

    ) -> int:

        """

        调整冷却时间

        

        Args:

            base_cooldown: 基础冷却时间

            trigger_impact: 触发原因影响

            category_impact: 类别影响

            

        Returns:

            调整后的冷却时间(分钟)

        """

        adjusted = base_cooldown * trigger_impact["factor"] * category_impact["factor"]

        return max(int(adjusted), 1) # 最少1分钟冷却

    

    def _calculate_cooling_score(

        self, 

        desire_input: DesireInput, 

        trigger_impact: Dict[str, Any],

        category_impact: Dict[str, Any]

    ) -> float:

        """

        计算欲望冷却得分 (0-100)

        

        得分越高,表示越需要冷却,越不应该立即购买

        

        计算公式:

        Score = 欲望强度得分 × 触发原因风险系数 × 类别风险系数 × 调节因子

        """

        # 基础得分 = 欲望强度

        base_score = desire_input.intensity

        

        # 触发原因风险调整

        trigger_penalty = trigger_impact["risk_score"] * 0.3

        

        # 类别风险调整

        category_penalty = 0

        if category_impact["risk_level"] == "high":

            category_penalty = 20

        elif category_impact["risk_level"] == "medium-high":

            category_penalty = 15

        elif category_impact["risk_level"] == "medium":

            category_penalty = 10

        elif category_impact["risk_level"] == "low":

            category_penalty = 5

        

        # 计算最终得分

        final_score = min(base_score + trigger_penalty + category_penalty, 100)

        

        return final_score

    

    def _identify_risk_factors(

        self, 

        desire_input: DesireInput, 

        product_info: 'ProductInfo'

    ) -> List[str]:

        """识别所有风险因素"""

        risk_factors = []

        

        # 欲望强度风险

        if desire_input.intensity >= 80:

            risk_factors.append("欲望强度过高(≥80)")

        elif desire_input.intensity >= 60:

            risk_factors.append("欲望强度较高(60-79)")

            

        # 触发原因风险

        if desire_input.is_stress_relief:

            risk_factors.append("压力缓解型消费")

        if desire_input.is_boredom_driven:

            risk_factors.append("无聊消遣型消费")

        if desire_input.is_trend_following:

            risk_factors.append("潮流跟风型消费")

            

        # 商品类别风险

        category_info = PRODUCT_CATEGORIES.get(product_info.category, {})

        if category_info.get("necessity_base", 0.5) < 0.4:

            risk_factors.append(f"低必要性类别({category_info.get('name', '未知')})")

            

        # 价格风险

        if product_info.price >= 1000:

            risk_factors.append("高价商品风险")

            

        # 评价风险

        if product_info.has_review_data and product_info.positive_review_rate < 0.7:

            risk_factors.append(f"好评率偏低({product_info.positive_review_rate:.1%})")

            

        return risk_factors

    

    def start_cooldown(self, request_id: str, duration_minutes: int) -> datetime:

        """

        开始一个冷却计时

        

        Args:

            request_id: 请求唯一标识

            duration_minutes: 冷却时长(分钟)

            

        Returns:

            冷却结束时间

        """

        end_time = datetime.now() + timedelta(minutes=duration_minutes)

        self.current_cooldowns[request_id] = end_time

        return end_time

    

    def check_cooldown_status(self, request_id: str) -> Tuple[bool, str]:

        """

        检查冷却状态

        

        Args:

            request_id: 请求唯一标识

            

        Returns:

            Tuple[是否仍在冷却中, 剩余时间描述]

        """

        if request_id not in self.current_cooldowns:

            return False, "无活跃冷却"

            

        end_time = self.current_cooldowns[request_id]

        now = datetime.now()

        

        if now >= end_time:

            del self.current_cooldowns[request_id]

            return False, "冷却已完成"

        

        remaining = end_time - now

        remaining_minutes = int(remaining.total_seconds() / 60)

        remaining_seconds = int(remaining.total_seconds() % 60)

        

        return True, f"{remaining_minutes}分{remaining_seconds}秒"

 

4. cost_calculator.py - 成本计算器模块

 

"""

成本计算器模块

负责计算商品的真实成本和效益比

核心功能:TCO计算、机会成本分析、使用频率评估

"""

 

from dataclasses import dataclass

from typing import Tuple, Dict, Any

from datetime import datetime

from data_models import ProductInfo

from config import COST_CONFIG, SCORE_THRESHOLDS

 

 

@dataclass

class CostAnalysisResult:

    """成本分析结果"""

    total_cost_of_ownership: float = 0.0 # 总拥有成本

    opportunity_cost: float = 0.0 # 机会成本

    cost_per_use: float = 0.0 # 每次使用成本

    value_score: float = 0.0 # 价值得分

    cost_effectiveness_score: float = 0.0 # 成本效益得分

    analysis_details: Dict[str, Any] = None

 

 

class CostCalculator:

    """

    成本计算器

    

    核心概念:

    - TCO (Total Cost of Ownership): 总拥有成本

    - Opportunity Cost: 机会成本(购买此商品放弃的其他选择)

    - Cost Per Use: 每次使用成本

    """

    

    def __init__(self):

        """初始化成本计算器"""

        self.daily_wage = COST_CONFIG.daily_wage

        self.hourly_wage = self.daily_wage / 8 # 假设8小时工作制

        

    def analyze_cost_effectiveness(

        self, 

        product_info: ProductInfo

    ) -> Tuple[float, CostAnalysisResult]:

        """

        分析商品成本效益

        

        Args:

            product_info: 商品信息

            

        Returns:

            Tuple[成本效益得分, 详细分析结果]

        """

        # 1. 计算总拥有成本 (TCO)

        tco = self._calculate_tco(product_info)

        

        # 2. 计算机会成本

        opportunity_cost = self._calculate_opportunity_cost(tco, product_info)

        

        # 3. 计算每次使用成本

        cost_per_use = self._calculate_cost_per_use(product_info)

        

        # 4. 计算价值得分

        value_score = self._calculate_value_score(product_info, tco)

        

        # 5. 计算最终成本效益得分

        effectiveness_score = self._calculate_effectiveness_score(

            product_info, tco, opportunity_cost, cost_per_use, value_score

        )

        

        # 6. 生成详细分析

        analysis = CostAnalysisResult(

            total_cost_of_ownership=tco,

            opportunity_cost=opportunity_cost,

            cost_per_use=cost_per_use,

            value_score=value_score,

            cost_effectiveness_score=effectiveness_score,

            analysis_details=self._generate_analysis_details(

利用AI解决实际问题,如果你觉得这个工具好用,欢迎关注长安牧笛!

          

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐