SmallThinker-3B-Preview实战教程:构建可审计的AI决策日志追踪系统

1. 引言:为什么我们需要追踪AI的“思考”过程?

你有没有遇到过这种情况?你问AI一个问题,它给出了一个答案,但你完全不知道它是怎么得出这个结论的。就像考试时只看到了最终分数,却看不到解题步骤一样,心里总觉得不踏实。

在金融风控、医疗诊断、法律咨询这些关键领域,AI的“黑箱”决策尤其让人担忧。如果AI判断一笔交易有风险,我们需要知道它考虑了哪些因素;如果AI给出医疗建议,医生需要了解它的推理逻辑。

这就是我们今天要解决的问题——如何让AI的思考过程变得透明、可追溯。而SmallThinker-3B-Preview模型,正是解决这个问题的绝佳工具。

学习目标

  • 理解什么是AI决策日志追踪系统
  • 掌握SmallThinker-3B-Preview的核心特性
  • 学会构建一个完整的可审计日志系统
  • 了解如何在实际业务中应用这个系统

前置知识

  • 基本的Python编程经验
  • 对AI模型有初步了解(不需要深入)
  • 会用命令行执行简单命令

教程价值: 即使你是AI新手,也能跟着本教程一步步搭建起专业的日志追踪系统。我们会用最直白的方式讲解,避开复杂的技术术语,专注于“怎么做”而不是“为什么”。

2. SmallThinker-3B-Preview:专为透明推理而生

2.1 模型简介:小而精的推理专家

SmallThinker-3B-Preview是一个很有意思的模型。它是在Qwen2.5-3b-Instruct模型基础上微调而来的,但它的设计目标很特别——专注于生成详细的推理过程。

你可以把它想象成一个特别爱“写草稿”的学生。普通AI模型可能直接给你答案,但SmallThinker会把自己的思考步骤都写出来,让你看到它是怎么一步步得出结论的。

模型特点

  • 体积小巧:只有30亿参数,可以在普通电脑上运行
  • 推理详细:专门训练来生成长链的思考过程
  • 开源透明:训练数据和方法都公开,你可以完全信任它

2.2 核心能力:不只是答案,更是思考过程

这个模型最厉害的地方在于它的训练数据。开发者创建了一个专门的数据集,里面超过75%的样本输出都超过8000个token(相当于几千字)。这些样本都是详细的推理过程,模型就是通过学习这些“思考范例”来学会如何展示自己的推理。

它能做什么

  • 回答问题时展示完整的推理链条
  • 在复杂决策中提供多个思考角度
  • 记录每个判断的依据和逻辑
  • 生成结构化的决策日志

为什么选择它: 如果你需要AI的决策可以被审查、可以被解释、可以被追溯,那么SmallThinker是目前最好的选择之一。它的设计初衷就是为了解决AI透明性问题。

3. 环境准备:快速部署SmallThinker

3.1 系统要求

在开始之前,我们先看看需要准备什么:

硬件要求

  • CPU:4核以上(推荐8核)
  • 内存:16GB以上(推荐32GB)
  • 存储:至少10GB可用空间
  • GPU:可选,有GPU会更快

软件要求

  • 操作系统:Linux、macOS或Windows(WSL2)
  • Python 3.8或更高版本
  • pip包管理工具

3.2 一键安装部署

我们使用Ollama来管理模型,这是最简单的方式:

# 1. 安装Ollama(Linux/macOS)
curl -fsSL https://ollama.ai/install.sh | sh

# 2. 拉取SmallThinker模型
ollama pull smallthinker:3b

# 3. 运行模型
ollama run smallthinker:3b

如果你在Windows上,可以直接下载Ollama的安装程序,然后打开命令行执行第二步和第三步。

验证安装: 安装完成后,在命令行输入:

ollama list

你应该能看到smallthinker:3b在模型列表中。

3.3 通过Web界面使用

如果你更喜欢图形界面,Ollama也提供了Web界面:

  1. 确保Ollama服务正在运行
  2. 打开浏览器访问:http://localhost:11434
  3. 在模型选择下拉框中找到并选择“smallthinker:3b”
  4. 在下方输入框中提问即可

界面很简单直观,就像使用聊天软件一样。你可以在这里测试模型的基本功能。

4. 构建决策日志追踪系统

4.1 系统架构设计

我们的日志追踪系统需要记录以下几个关键信息:

  • 用户输入的问题
  • AI的完整思考过程
  • 最终答案
  • 思考过程中的关键节点
  • 时间戳和会话ID

系统组件

用户输入 → 日志记录器 → SmallThinker模型 → 结果解析器 → 数据库存储
         ↓
     实时监控面板

4.2 核心代码实现

让我们从最简单的版本开始,逐步完善功能。

基础版本:记录完整对话

import json
import time
from datetime import datetime
import requests

class AILogger:
    def __init__(self, model_name="smallthinker:3b"):
        self.model_name = model_name
        self.base_url = "http://localhost:11434/api/generate"
        self.session_id = self._generate_session_id()
        
    def _generate_session_id(self):
        """生成唯一的会话ID"""
        return f"session_{int(time.time())}_{hash(str(time.time()))}"
    
    def ask_with_logging(self, question):
        """提问并记录完整过程"""
        
        # 1. 准备请求数据
        payload = {
            "model": self.model_name,
            "prompt": question,
            "stream": False,
            "options": {
                "temperature": 0.7,
                "top_p": 0.9
            }
        }
        
        # 2. 记录开始时间
        start_time = time.time()
        
        # 3. 发送请求到模型
        try:
            response = requests.post(self.base_url, json=payload)
            response.raise_for_status()
            result = response.json()
        except Exception as e:
            return {"error": str(e)}
        
        # 4. 计算处理时间
        processing_time = time.time() - start_time
        
        # 5. 构建日志记录
        log_entry = {
            "session_id": self.session_id,
            "timestamp": datetime.now().isoformat(),
            "user_input": question,
            "model_response": result.get("response", ""),
            "thinking_process": self._extract_thinking(result.get("response", "")),
            "processing_time": round(processing_time, 2),
            "model_used": self.model_name,
            "tokens_used": result.get("total_duration", 0)
        }
        
        # 6. 保存日志
        self._save_log(log_entry)
        
        return {
            "answer": result.get("response", ""),
            "log_id": self.session_id
        }
    
    def _extract_thinking(self, response):
        """从响应中提取思考过程"""
        # SmallThinker的响应通常包含明确的推理标记
        thinking_lines = []
        lines = response.split('\n')
        
        for line in lines:
            line = line.strip()
            if line.startswith("思考:") or line.startswith("推理:") or line.startswith("步骤"):
                thinking_lines.append(line)
            elif "因为" in line or "所以" in line or "因此" in line:
                thinking_lines.append(line)
        
        return "\n".join(thinking_lines) if thinking_lines else "未提取到明确的思考过程"
    
    def _save_log(self, log_entry):
        """保存日志到文件"""
        filename = f"ai_logs_{datetime.now().strftime('%Y%m%d')}.json"
        
        try:
            # 读取现有日志
            try:
                with open(filename, 'r', encoding='utf-8') as f:
                    logs = json.load(f)
            except FileNotFoundError:
                logs = []
            
            # 添加新日志
            logs.append(log_entry)
            
            # 保存回文件
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(logs, f, ensure_ascii=False, indent=2)
                
            print(f"日志已保存到 {filename}")
            
        except Exception as e:
            print(f"保存日志时出错: {e}")
            # 如果文件保存失败,至少打印出来
            print("当前日志内容:", json.dumps(log_entry, ensure_ascii=False, indent=2))

# 使用示例
if __name__ == "__main__":
    logger = AILogger()
    
    # 示例问题
    question = "如果一家公司年收入1000万,成本800万,税率25%,请计算净利润并解释计算过程。"
    
    result = logger.ask_with_logging(question)
    print("答案:", result["answer"])
    print("日志ID:", result["log_id"])

这个基础版本已经可以工作了。它会记录每次对话,并把思考过程提取出来单独保存。

4.3 增强版本:结构化日志和实时监控

现在让我们添加更多功能,让系统更实用:

import sqlite3
from contextlib import contextmanager
import hashlib

class EnhancedAILogger(AILogger):
    def __init__(self, model_name="smallthinker:3b"):
        super().__init__(model_name)
        self.db_path = "ai_decision_logs.db"
        self._init_database()
    
    def _init_database(self):
        """初始化数据库"""
        with self._get_db_connection() as conn:
            cursor = conn.cursor()
            
            # 创建主日志表
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS decision_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    session_id TEXT NOT NULL,
                    timestamp DATETIME NOT NULL,
                    user_input TEXT NOT NULL,
                    final_answer TEXT,
                    processing_time REAL,
                    model_name TEXT,
                    token_count INTEGER,
                    log_hash TEXT UNIQUE
                )
            ''')
            
            # 创建思考过程表
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS thinking_steps (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    log_id INTEGER,
                    step_number INTEGER,
                    step_content TEXT,
                    step_type TEXT,
                    FOREIGN KEY (log_id) REFERENCES decision_logs (id)
                )
            ''')
            
            # 创建关键决策点表
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS decision_points (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    log_id INTEGER,
                    point_name TEXT,
                    alternatives TEXT,
                    chosen_option TEXT,
                    reasoning TEXT,
                    confidence_score REAL,
                    FOREIGN KEY (log_id) REFERENCES decision_logs (id)
                )
            ''')
            
            conn.commit()
    
    @contextmanager
    def _get_db_connection(self):
        """获取数据库连接"""
        conn = sqlite3.connect(self.db_path)
        try:
            yield conn
        finally:
            conn.close()
    
    def ask_with_enhanced_logging(self, question, context=None):
        """增强版的提问和日志记录"""
        
        # 如果有上下文,添加到问题中
        if context:
            full_prompt = f"上下文信息:{context}\n\n问题:{question}\n\n请展示完整的思考过程。"
        else:
            full_prompt = f"{question}\n\n请展示完整的思考过程。"
        
        # 调用父类方法获取响应
        result = super().ask_with_logging(full_prompt)
        
        if "error" in result:
            return result
        
        # 解析响应,提取结构化信息
        response_text = result["answer"]
        structured_info = self._parse_structured_response(response_text)
        
        # 生成日志哈希(用于去重)
        log_hash = self._generate_log_hash(question, response_text)
        
        # 保存到数据库
        with self._get_db_connection() as conn:
            cursor = conn.cursor()
            
            # 插入主日志
            cursor.execute('''
                INSERT INTO decision_logs 
                (session_id, timestamp, user_input, final_answer, processing_time, model_name, log_hash)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (
                self.session_id,
                datetime.now().isoformat(),
                question,
                structured_info.get("final_answer", ""),
                result.get("processing_time", 0),
                self.model_name,
                log_hash
            ))
            
            log_id = cursor.lastrowid
            
            # 插入思考步骤
            for i, step in enumerate(structured_info.get("thinking_steps", []), 1):
                cursor.execute('''
                    INSERT INTO thinking_steps (log_id, step_number, step_content, step_type)
                    VALUES (?, ?, ?, ?)
                ''', (log_id, i, step["content"], step.get("type", "general")))
            
            # 插入决策点
            for point in structured_info.get("decision_points", []):
                cursor.execute('''
                    INSERT INTO decision_points 
                    (log_id, point_name, alternatives, chosen_option, reasoning, confidence_score)
                    VALUES (?, ?, ?, ?, ?, ?)
                ''', (
                    log_id,
                    point.get("name", ""),
                    json.dumps(point.get("alternatives", [])),
                    point.get("chosen", ""),
                    point.get("reasoning", ""),
                    point.get("confidence", 0.0)
                ))
            
            conn.commit()
        
        return {
            "answer": structured_info.get("final_answer", ""),
            "log_id": log_id,
            "session_id": self.session_id,
            "thinking_steps": structured_info.get("thinking_steps", []),
            "decision_points": structured_info.get("decision_points", [])
        }
    
    def _parse_structured_response(self, response):
        """解析响应,提取结构化信息"""
        # 这是一个简化的解析器,实际使用时可以根据需要增强
        
        lines = response.split('\n')
        thinking_steps = []
        decision_points = []
        current_answer = ""
        
        for line in lines:
            line = line.strip()
            
            # 提取思考步骤
            if line.startswith("步骤") or line.startswith("1.") or line.startswith("2."):
                thinking_steps.append({
                    "content": line,
                    "type": "reasoning_step"
                })
            
            # 提取决策点(假设格式:决策点:...)
            elif "决策点:" in line or "选择:" in line:
                parts = line.split(":")
                if len(parts) > 1:
                    decision_points.append({
                        "name": parts[0].replace("决策点", "").strip(),
                        "chosen": parts[1].strip(),
                        "reasoning": "从上下文推断",
                        "confidence": 0.8
                    })
            
            # 识别最终答案
            elif line.startswith("答案:") or line.startswith("结论:"):
                current_answer = line.split(":", 1)[1] if ":" in line else line
        
        # 如果没有明确标记,取最后一段作为答案
        if not current_answer and lines:
            current_answer = lines[-1]
        
        return {
            "final_answer": current_answer,
            "thinking_steps": thinking_steps,
            "decision_points": decision_points
        }
    
    def _generate_log_hash(self, question, response):
        """生成日志哈希值"""
        content = f"{question}{response}"
        return hashlib.md5(content.encode()).hexdigest()
    
    def query_logs(self, session_id=None, start_date=None, end_date=None):
        """查询日志"""
        query = "SELECT * FROM decision_logs WHERE 1=1"
        params = []
        
        if session_id:
            query += " AND session_id = ?"
            params.append(session_id)
        
        if start_date:
            query += " AND timestamp >= ?"
            params.append(start_date)
        
        if end_date:
            query += " AND timestamp <= ?"
            params.append(end_date)
        
        query += " ORDER BY timestamp DESC"
        
        with self._get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            columns = [description[0] for description in cursor.description]
            results = [dict(zip(columns, row)) for row in cursor.fetchall()]
        
        return results
    
    def get_thinking_process(self, log_id):
        """获取特定日志的思考过程"""
        with self._get_db_connection() as conn:
            cursor = conn.cursor()
            
            # 获取思考步骤
            cursor.execute('''
                SELECT step_number, step_content, step_type 
                FROM thinking_steps 
                WHERE log_id = ? 
                ORDER BY step_number
            ''', (log_id,))
            
            thinking_steps = cursor.fetchall()
            
            # 获取决策点
            cursor.execute('''
                SELECT point_name, alternatives, chosen_option, reasoning, confidence_score
                FROM decision_points
                WHERE log_id = ?
            ''', (log_id,))
            
            decision_points = cursor.fetchall()
        
        return {
            "thinking_steps": thinking_steps,
            "decision_points": decision_points
        }

# 使用示例
if __name__ == "__main__":
    enhanced_logger = EnhancedAILogger()
    
    # 示例:金融风险评估
    context = "客户A,年龄35岁,年收入50万,有房贷100万,信用卡额度20万,近期有3次大额消费"
    question = "请评估该客户的信用风险等级,并说明评估依据"
    
    result = enhanced_logger.ask_with_enhanced_logging(question, context)
    
    print("=" * 50)
    print("最终答案:")
    print(result["answer"])
    print("\n思考步骤:")
    for step in result["thinking_steps"]:
        print(f"- {step['content']}")
    print("\n决策点:")
    for point in result["decision_points"]:
        print(f"{point['name']}: 选择 {point['chosen']},理由:{point['reasoning']}")
    print("=" * 50)
    
    # 查询日志
    logs = enhanced_logger.query_logs()
    print(f"\n共找到 {len(logs)} 条日志记录")

4.4 添加Web监控界面

让我们创建一个简单的Web界面来查看日志:

from flask import Flask, render_template, jsonify, request
import json

app = Flask(__name__)
logger = EnhancedAILogger()

@app.route('/')
def index():
    """主页面"""
    return '''
    <!DOCTYPE html>
    <html>
    <head>
        <title>AI决策日志追踪系统</title>
        <style>
            body { font-family: Arial, sans-serif; margin: 40px; }
            .container { max-width: 1200px; margin: 0 auto; }
            .section { margin-bottom: 30px; padding: 20px; border: 1px solid #ddd; border-radius: 5px; }
            .log-entry { margin: 10px 0; padding: 10px; background: #f5f5f5; border-radius: 3px; }
            .thinking-step { margin-left: 20px; color: #666; }
            .decision-point { margin-left: 20px; color: #008000; }
        </style>
    </head>
    <body>
        <div class="container">
            <h1>AI决策日志追踪系统</h1>
            
            <div class="section">
                <h2>提问区域</h2>
                <form id="askForm">
                    <textarea id="question" rows="4" cols="80" placeholder="输入你的问题..."></textarea><br><br>
                    <textarea id="context" rows="3" cols="80" placeholder="上下文信息(可选)..."></textarea><br><br>
                    <button type="submit">提交问题</button>
                </form>
                <div id="answerArea" style="margin-top: 20px;"></div>
            </div>
            
            <div class="section">
                <h2>日志查询</h2>
                <button onclick="loadLogs()">加载最新日志</button>
                <div id="logsArea"></div>
            </div>
        </div>
        
        <script>
            document.getElementById('askForm').onsubmit = async function(e) {
                e.preventDefault();
                
                const question = document.getElementById('question').value;
                const context = document.getElementById('context').value;
                
                const response = await fetch('/ask', {
                    method: 'POST',
                    headers: {'Content-Type': 'application/json'},
                    body: JSON.stringify({question, context})
                });
                
                const result = await response.json();
                
                let html = `<h3>回答:</h3><p>${result.answer.replace(/\\n/g, '<br>')}</p>`;
                
                if (result.thinking_steps && result.thinking_steps.length > 0) {
                    html += `<h3>思考过程:</h3>`;
                    result.thinking_steps.forEach(step => {
                        html += `<div class="thinking-step">${step.content}</div>`;
                    });
                }
                
                document.getElementById('answerArea').innerHTML = html;
            };
            
            async function loadLogs() {
                const response = await fetch('/logs');
                const logs = await response.json();
                
                let html = '<h3>最近10条日志:</h3>';
                logs.slice(0, 10).forEach(log => {
                    html += `
                        <div class="log-entry">
                            <strong>时间:</strong>${new Date(log.timestamp).toLocaleString()}<br>
                            <strong>问题:</strong>${log.user_input.substring(0, 100)}${log.user_input.length > 100 ? '...' : ''}<br>
                            <strong>回答:</strong>${log.final_answer ? log.final_answer.substring(0, 150) + (log.final_answer.length > 150 ? '...' : '') : '无'}
                            <button onclick="viewDetails(${log.id})">查看详情</button>
                        </div>
                    `;
                });
                
                document.getElementById('logsArea').innerHTML = html;
            }
            
            async function viewDetails(logId) {
                const response = await fetch(`/log/${logId}/details`);
                const details = await response.json();
                
                alert(JSON.stringify(details, null, 2));
            }
        </script>
    </body>
    </html>
    '''

@app.route('/ask', methods=['POST'])
def ask_question():
    """处理提问"""
    data = request.json
    question = data.get('question', '')
    context = data.get('context', '')
    
    result = logger.ask_with_enhanced_logging(question, context)
    
    return jsonify(result)

@app.route('/logs')
def get_logs():
    """获取日志列表"""
    logs = logger.query_logs()
    return jsonify(logs)

@app.route('/log/<int:log_id>/details')
def get_log_details(log_id):
    """获取日志详情"""
    details = logger.get_thinking_process(log_id)
    return jsonify(details)

if __name__ == '__main__':
    app.run(debug=True, port=5000)

运行这个Web应用后,访问 http://localhost:5000 就能看到一个完整的日志追踪系统界面。

5. 实际应用案例

5.1 案例一:金融风控决策审计

场景:银行使用AI系统审批贷款申请,需要记录每个决策的推理过程以备审计。

实现方案

# 金融风控专用日志器
class FinancialRiskLogger(EnhancedAILogger):
    def assess_loan_application(self, application_data):
        """评估贷款申请"""
        
        # 构建评估问题
        question = f"""
        请评估以下贷款申请的风险等级:
        申请人信息:{application_data['applicant_info']}
        贷款金额:{application_data['loan_amount']}
        贷款期限:{application_data['loan_term']}
        收入证明:{application_data['income_proof']}
        信用历史:{application_data['credit_history']}
        
        请按照以下步骤分析:
        1. 分析申请人的还款能力
        2. 评估信用风险
        3. 考虑抵押物价值(如有)
        4. 给出最终风险等级(低/中/高)
        5. 提供具体的风险评估理由
        """
        
        # 记录完整的决策过程
        result = self.ask_with_enhanced_logging(question)
        
        # 添加金融特定的元数据
        risk_log = {
            **result,
            "application_id": application_data.get("application_id"),
            "assessor": "AI_System_v1.0",
            "assessment_date": datetime.now().isoformat(),
            "compliance_check": self._run_compliance_check(result)
        }
        
        return risk_log
    
    def _run_compliance_check(self, assessment_result):
        """运行合规性检查"""
        # 检查决策过程是否包含必要的分析步骤
        required_keywords = ["收入", "负债", "信用", "风险"]
        thinking_text = " ".join([step["content"] for step in assessment_result.get("thinking_steps", [])])
        
        checks = {}
        for keyword in required_keywords:
            checks[keyword] = keyword in thinking_text
        
        return {
            "checks_passed": all(checks.values()),
            "details": checks,
            "recommendation": "通过" if all(checks.values()) else "需要人工复核"
        }

# 使用示例
risk_logger = FinancialRiskLogger()

application = {
    "application_id": "APP20240001",
    "applicant_info": "张三,35岁,工程师",
    "loan_amount": "500,000元",
    "loan_term": "5年",
    "income_proof": "月收入25,000元,工资流水",
    "credit_history": "信用良好,无逾期记录"
}

assessment = risk_logger.assess_loan_application(application)
print("风险评估结果:", assessment["answer"])
print("合规检查:", assessment["compliance_check"])

5.2 案例二:医疗诊断建议追踪

场景:AI辅助医疗诊断系统,需要记录每个诊断建议的推理过程,供医生复核。

class MedicalDiagnosisLogger(EnhancedAILogger):
    def provide_diagnosis_advice(self, patient_data, symptoms):
        """提供诊断建议"""
        
        question = f"""
        基于以下患者信息和症状,提供诊断建议:
        
        患者基本信息:
        - 年龄:{patient_data.get('age')}
        - 性别:{patient_data.get('gender')}
        - 既往病史:{patient_data.get('medical_history', '无')}
        
        当前症状:
        {symptoms}
        
        请按以下步骤分析:
        1. 症状分类和优先级排序
        2. 可能的疾病推断
        3. 需要进一步检查的项目建议
        4. 初步处理建议
        5. 紧急程度评估
        
        注意:所有建议必须标注置信度,并说明推理依据。
        """
        
        result = self.ask_with_enhanced_logging(question)
        
        # 添加医疗元数据
        medical_log = {
            **result,
            "patient_id": patient_data.get("patient_id"),
            "symptoms_recorded": symptoms,
            "timestamp": datetime.now().isoformat(),
            "reviewer_required": self._check_if_review_needed(result)
        }
        
        return medical_log
    
    def _check_if_review_needed(self, diagnosis_result):
        """检查是否需要人工复核"""
        answer = diagnosis_result.get("answer", "").lower()
        
        # 如果涉及严重疾病或高风险建议,需要人工复核
        red_flags = ["紧急", "立即就医", "手术", "癌症", "重症", "住院"]
        
        for flag in red_flags:
            if flag in answer:
                return True
        
        # 如果置信度相关表述不明确,需要复核
        confidence_indicators = ["可能", "疑似", "不确定", "需要进一步检查"]
        confidence_count = sum(1 for indicator in confidence_indicators if indicator in answer)
        
        if confidence_count > 2:
            return True
        
        return False

# 使用示例
medical_logger = MedicalDiagnosisLogger()

patient_info = {
    "patient_id": "PT2024001",
    "age": "45",
    "gender": "男",
    "medical_history": "高血压病史5年"
}

symptoms = """
主诉:持续性头痛3天,伴有恶心
伴随症状:视力模糊,对光敏感
体征:血压150/95mmHg,体温正常
"""

diagnosis = medical_logger.provide_diagnosis_advice(patient_info, symptoms)
print("诊断建议:", diagnosis["answer"])
print("是否需要人工复核:", diagnosis["reviewer_required"])

5.3 案例三:法律文件分析审计

场景:律所使用AI分析法律合同,需要记录每个条款的分析过程。

class LegalDocumentLogger(EnhancedAILogger):
    def analyze_contract_clause(self, contract_text, clause_number):
        """分析合同条款"""
        
        question = f"""
        请分析以下合同条款(第{clause_number}条):
        
        {contract_text}
        
        请按以下步骤分析:
        1. 条款类型识别(义务条款、权利条款、免责条款等)
        2. 关键义务提取
        3. 潜在风险点分析
        4. 建议修改意见
        5. 风险评估等级(低/中/高)
        
        对于每个分析步骤,请说明依据和理由。
        """
        
        result = self.ask_with_enhanced_logging(question)
        
        # 结构化分析结果
        structured_analysis = {
            "clause_number": clause_number,
            "original_text": contract_text,
            "analysis": result["answer"],
            "thinking_process": result["thinking_steps"],
            "risk_assessment": self._extract_risk_level(result["answer"]),
            "review_timestamp": datetime.now().isoformat(),
            "analyst": "AI_Legal_Analyst_v1.0"
        }
        
        return structured_analysis
    
    def _extract_risk_level(self, analysis_text):
        """从分析文本中提取风险等级"""
        text_lower = analysis_text.lower()
        
        if "高风险" in text_lower:
            return "high"
        elif "中风险" in text_lower or "中等风险" in text_lower:
            return "medium"
        elif "低风险" in text_lower:
            return "low"
        else:
            # 尝试从上下文中推断
            risk_keywords = {
                "high": ["重大风险", "严重问题", "必须修改", "不可接受"],
                "medium": ["需要注意", "建议修改", "潜在问题"],
                "low": ["基本合规", "问题不大", "可接受"]
            }
            
            for level, keywords in risk_keywords.items():
                for keyword in keywords:
                    if keyword in analysis_text:
                        return level
            
            return "unknown"

# 使用示例
legal_logger = LegalDocumentLogger()

contract_clause = """
第八条 违约责任
8.1 任何一方违反本协议约定,应承担违约责任,向守约方支付合同总金额20%的违约金。
8.2 如违约方延迟履行义务,每延迟一日,应按合同总金额的0.1%支付滞纳金。
8.3 因违约造成的损失超过违约金的,守约方有权要求赔偿实际损失。
"""

analysis = legal_logger.analyze_contract_clause(contract_clause, "第八条")
print("条款分析:", analysis["analysis"])
print("风险等级:", analysis["risk_assessment"])

6. 系统优化与最佳实践

6.1 性能优化建议

数据库优化

# 添加索引提高查询性能
def optimize_database(self):
    """优化数据库性能"""
    with self._get_db_connection() as conn:
        cursor = conn.cursor()
        
        # 为常用查询字段添加索引
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_timestamp ON decision_logs(timestamp)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_session_id ON decision_logs(session_id)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_log_id ON thinking_steps(log_id)')
        
        # 定期清理旧数据(保留最近90天)
        cursor.execute('''
            DELETE FROM decision_logs 
            WHERE timestamp < datetime('now', '-90 days')
        ''')
        
        conn.commit()

批量处理优化

# 批量处理多个请求
def batch_process_questions(self, questions_with_context):
    """批量处理问题"""
    results = []
    
    for question, context in questions_with_context:
        try:
            result = self.ask_with_enhanced_logging(question, context)
            results.append({
                "success": True,
                "data": result
            })
        except Exception as e:
            results.append({
                "success": False,
                "error": str(e),
                "question": question
            })
    
    return results

6.2 安全与合规考虑

数据加密

import hashlib
from cryptography.fernet import Fernet

class SecureAILogger(EnhancedAILogger):
    def __init__(self, model_name="smallthinker:3b", encryption_key=None):
        super().__init__(model_name)
        
        # 生成或使用提供的加密密钥
        if encryption_key:
            self.cipher = Fernet(encryption_key)
        else:
            # 在实际应用中,密钥应该从安全的地方加载
            key = Fernet.generate_key()
            self.cipher = Fernet(key)
    
    def _save_log(self, log_entry):
        """加密保存日志"""
        # 序列化日志数据
        log_json = json.dumps(log_entry, ensure_ascii=False)
        
        # 加密数据
        encrypted_data = self.cipher.encrypt(log_json.encode())
        
        # 保存到文件(实际应用中可能保存到安全存储)
        filename = f"encrypted_logs/{datetime.now().strftime('%Y%m%d_%H%M%S')}.enc"
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        
        with open(filename, 'wb') as f:
            f.write(encrypted_data)
    
    def _load_log(self, filename):
        """加载并解密日志"""
        with open(filename, 'rb') as f:
            encrypted_data = f.read()
        
        decrypted_data = self.cipher.decrypt(encrypted_data)
        return json.loads(decrypted_data.decode())

访问控制

# 简单的基于角色的访问控制
class RoleBasedLogger(EnhancedAILogger):
    def __init__(self, model_name="smallthinker:3b", user_role="viewer"):
        super().__init__(model_name)
        self.user_role = user_role
        self.role_permissions = {
            "admin": ["read", "write", "delete", "export"],
            "auditor": ["read", "export"],
            "viewer": ["read"]
        }
    
    def query_logs(self, **kwargs):
        """根据角色权限查询日志"""
        if "read" not in self.role_permissions.get(self.user_role, []):
            return {"error": "权限不足"}
        
        # 审计员只能查看特定时间范围的日志
        if self.user_role == "auditor":
            kwargs["start_date"] = kwargs.get("start_date") or "2024-01-01"
        
        return super().query_logs(**kwargs)
    
    def delete_log(self, log_id):
        """删除日志(需要管理员权限)"""
        if "delete" not in self.role_permissions.get(self.user_role, []):
            return {"error": "需要管理员权限"}
        
        with self._get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("DELETE FROM decision_logs WHERE id = ?", (log_id,))
            conn.commit()
        
        return {"success": True, "message": f"日志 {log_id} 已删除"}

6.3 监控与告警

class MonitoredAILogger(EnhancedAILogger):
    def __init__(self, model_name="smallthinker:3b", alert_thresholds=None):
        super().__init__(model_name)
        
        # 告警阈值配置
        self.alert_thresholds = alert_thresholds or {
            "processing_time": 10.0,  # 秒
            "error_rate": 0.1,  # 错误率
            "suspicious_patterns": ["绕过", "漏洞", "攻击", "非法"]
        }
        
        self.metrics = {
            "total_requests": 0,
            "failed_requests": 0,
            "total_processing_time": 0,
            "suspicious_activities": []
        }
    
    def ask_with_enhanced_logging(self, question, context=None):
        """带监控的提问"""
        self.metrics["total_requests"] += 1
        
        try:
            start_time = time.time()
            result = super().ask_with_enhanced_logging(question, context)
            processing_time = time.time() - start_time
            
            # 记录处理时间
            self.metrics["total_processing_time"] += processing_time
            
            # 检查处理时间是否超阈值
            if processing_time > self.alert_thresholds["processing_time"]:
                self._trigger_alert("processing_time", 
                                   f"处理时间过长: {processing_time:.2f}秒")
            
            # 检查是否有可疑模式
            for pattern in self.alert_thresholds["suspicious_patterns"]:
                if pattern in question or (context and pattern in context):
                    self.metrics["suspicious_activities"].append({
                        "timestamp": datetime.now().isoformat(),
                        "pattern": pattern,
                        "question": question[:100]  # 只记录前100字符
                    })
                    self._trigger_alert("suspicious_pattern", 
                                       f"检测到可疑模式: {pattern}")
            
            return result
            
        except Exception as e:
            self.metrics["failed_requests"] += 1
            
            # 计算当前错误率
            error_rate = self.metrics["failed_requests"] / self.metrics["total_requests"]
            if error_rate > self.alert_thresholds["error_rate"]:
                self._trigger_alert("error_rate", 
                                   f"错误率过高: {error_rate:.2%}")
            
            raise
    
    def _trigger_alert(self, alert_type, message):
        """触发告警"""
        alert = {
            "type": alert_type,
            "message": message,
            "timestamp": datetime.now().isoformat(),
            "metrics": self.get_current_metrics()
        }
        
        # 这里可以集成到实际的告警系统(邮件、短信、Slack等)
        print(f"[ALERT] {alert_type}: {message}")
        
        # 保存告警记录
        self._save_alert(alert)
    
    def get_current_metrics(self):
        """获取当前指标"""
        avg_processing_time = 0
        if self.metrics["total_requests"] > 0:
            avg_processing_time = (self.metrics["total_processing_time"] / 
                                  self.metrics["total_requests"])
        
        error_rate = 0
        if self.metrics["total_requests"] > 0:
            error_rate = self.metrics["failed_requests"] / self.metrics["total_requests"]
        
        return {
            "total_requests": self.metrics["total_requests"],
            "failed_requests": self.metrics["failed_requests"],
            "error_rate": error_rate,
            "avg_processing_time": avg_processing_time,
            "suspicious_activities_count": len(self.metrics["suspicious_activities"])
        }
    
    def _save_alert(self, alert):
        """保存告警记录"""
        filename = f"alerts/alerts_{datetime.now().strftime('%Y%m')}.json"
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        
        alerts = []
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                alerts = json.load(f)
        except FileNotFoundError:
            pass
        
        alerts.append(alert)
        
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(alerts, f, ensure_ascii=False, indent=2)

7. 总结

7.1 核心收获回顾

通过本教程,我们完成了一个完整的可审计AI决策日志追踪系统的构建。让我们回顾一下关键要点:

系统核心价值

  1. 透明化决策:让AI的思考过程不再是个"黑箱",每个决策都有完整的推理记录
  2. 合规审计:满足金融、医疗、法律等行业的合规要求,提供可追溯的决策依据
  3. 质量监控:通过日志分析,可以持续改进AI系统的决策质量
  4. 风险控制:及时发现和纠正AI决策中的偏差或错误

技术实现要点

  1. 使用SmallThinker-3B-Preview模型,它天生适合生成详细的推理过程
  2. 设计了三层日志结构:主日志、思考步骤、决策点
  3. 实现了数据库存储和Web监控界面
  4. 提供了金融、医疗、法律三个实际应用案例
  5. 增加了安全、性能、监控等生产级功能

7.2 下一步学习建议

如果你对这个系统感兴趣,可以继续深入:

功能扩展方向

  1. 集成更多AI模型:除了SmallThinker,可以集成其他擅长推理的模型
  2. 添加可视化分析:用图表展示决策模式、风险趋势等
  3. 实现实时告警:当AI决策出现异常模式时立即通知相关人员
  4. 构建审计报告:自动生成合规审计报告

技术深化方向

  1. 性能优化:学习如何优化数据库查询,处理大规模日志数据
  2. 安全加固:研究数据加密、访问控制、审计追踪等安全机制
  3. 分布式部署:学习如何将系统部署到多台服务器,提高可用性
  4. API设计:设计完善的REST API,方便其他系统集成

7.3 实际应用建议

在实际业务中应用这个系统时,建议:

  1. 从小规模开始:先在一个具体的业务场景试点,比如合同审查或风险评估
  2. 逐步完善:根据实际使用反馈,逐步添加需要的功能
  3. 重视培训:确保使用系统的业务人员理解日志的价值和查看方法
  4. 定期审计:建立定期的日志审计机制,确保持续合规

7.4 资源推荐

如果你想深入学习相关技术:

  1. Ollama官方文档:了解如何管理和部署更多AI模型
  2. SQLite数据库教程:深入学习数据库设计和优化
  3. Flask Web开发:学习如何构建更复杂的Web应用
  4. AI可解释性研究:关注AI透明度和可解释性的最新进展

这个日志追踪系统只是一个起点。随着AI在关键业务中的应用越来越广泛,构建透明、可信、可审计的AI系统将成为每个技术团队的必备能力。希望本教程能为你打下坚实的基础,帮助你在AI落地的道路上走得更稳、更远。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐