SmallThinker-3B-Preview实战教程:构建可审计的AI决策日志追踪系统
本文介绍了如何在星图GPU平台上自动化部署SmallThinker-3B-Preview镜像,以构建可审计的AI决策日志追踪系统。该镜像专为生成透明、详细的推理过程而设计,可应用于金融风控、医疗诊断等关键领域,通过记录AI的完整思考链条,满足决策过程可追溯、可审查的业务需求。
SmallThinker-3B-Preview实战教程:构建可审计的AI决策日志追踪系统
1. 引言:为什么我们需要追踪AI的“思考”过程?
你有没有遇到过这种情况?你问AI一个问题,它给出了一个答案,但你完全不知道它是怎么得出这个结论的。就像考试时只看到了最终分数,却看不到解题步骤一样,心里总觉得不踏实。
在金融风控、医疗诊断、法律咨询这些关键领域,AI的“黑箱”决策尤其让人担忧。如果AI判断一笔交易有风险,我们需要知道它考虑了哪些因素;如果AI给出医疗建议,医生需要了解它的推理逻辑。
这就是我们今天要解决的问题——如何让AI的思考过程变得透明、可追溯。而SmallThinker-3B-Preview模型,正是解决这个问题的绝佳工具。
学习目标:
- 理解什么是AI决策日志追踪系统
- 掌握SmallThinker-3B-Preview的核心特性
- 学会构建一个完整的可审计日志系统
- 了解如何在实际业务中应用这个系统
前置知识:
- 基本的Python编程经验
- 对AI模型有初步了解(不需要深入)
- 会用命令行执行简单命令
教程价值: 即使你是AI新手,也能跟着本教程一步步搭建起专业的日志追踪系统。我们会用最直白的方式讲解,避开复杂的技术术语,专注于“怎么做”而不是“为什么”。
2. SmallThinker-3B-Preview:专为透明推理而生
2.1 模型简介:小而精的推理专家
SmallThinker-3B-Preview是一个很有意思的模型。它是在Qwen2.5-3b-Instruct模型基础上微调而来的,但它的设计目标很特别——专注于生成详细的推理过程。
你可以把它想象成一个特别爱“写草稿”的学生。普通AI模型可能直接给你答案,但SmallThinker会把自己的思考步骤都写出来,让你看到它是怎么一步步得出结论的。
模型特点:
- 体积小巧:只有30亿参数,可以在普通电脑上运行
- 推理详细:专门训练来生成长链的思考过程
- 开源透明:训练数据和方法都公开,你可以完全信任它
2.2 核心能力:不只是答案,更是思考过程
这个模型最厉害的地方在于它的训练数据。开发者创建了一个专门的数据集,里面超过75%的样本输出都超过8000个token(相当于几千字)。这些样本都是详细的推理过程,模型就是通过学习这些“思考范例”来学会如何展示自己的推理。
它能做什么:
- 回答问题时展示完整的推理链条
- 在复杂决策中提供多个思考角度
- 记录每个判断的依据和逻辑
- 生成结构化的决策日志
为什么选择它: 如果你需要AI的决策可以被审查、可以被解释、可以被追溯,那么SmallThinker是目前最好的选择之一。它的设计初衷就是为了解决AI透明性问题。
3. 环境准备:快速部署SmallThinker
3.1 系统要求
在开始之前,我们先看看需要准备什么:
硬件要求:
- CPU:4核以上(推荐8核)
- 内存:16GB以上(推荐32GB)
- 存储:至少10GB可用空间
- GPU:可选,有GPU会更快
软件要求:
- 操作系统:Linux、macOS或Windows(WSL2)
- Python 3.8或更高版本
- pip包管理工具
3.2 一键安装部署
我们使用Ollama来管理模型,这是最简单的方式:
# 1. 安装Ollama(Linux/macOS)
curl -fsSL https://ollama.ai/install.sh | sh
# 2. 拉取SmallThinker模型
ollama pull smallthinker:3b
# 3. 运行模型
ollama run smallthinker:3b
如果你在Windows上,可以直接下载Ollama的安装程序,然后打开命令行执行第二步和第三步。
验证安装: 安装完成后,在命令行输入:
ollama list
你应该能看到smallthinker:3b在模型列表中。
3.3 通过Web界面使用
如果你更喜欢图形界面,Ollama也提供了Web界面:
- 确保Ollama服务正在运行
- 打开浏览器访问:http://localhost:11434
- 在模型选择下拉框中找到并选择“smallthinker:3b”
- 在下方输入框中提问即可
界面很简单直观,就像使用聊天软件一样。你可以在这里测试模型的基本功能。
4. 构建决策日志追踪系统
4.1 系统架构设计
我们的日志追踪系统需要记录以下几个关键信息:
- 用户输入的问题
- AI的完整思考过程
- 最终答案
- 思考过程中的关键节点
- 时间戳和会话ID
系统组件:
用户输入 → 日志记录器 → SmallThinker模型 → 结果解析器 → 数据库存储
↓
实时监控面板
4.2 核心代码实现
让我们从最简单的版本开始,逐步完善功能。
基础版本:记录完整对话
import json
import time
from datetime import datetime
import requests
class AILogger:
def __init__(self, model_name="smallthinker:3b"):
self.model_name = model_name
self.base_url = "http://localhost:11434/api/generate"
self.session_id = self._generate_session_id()
def _generate_session_id(self):
"""生成唯一的会话ID"""
return f"session_{int(time.time())}_{hash(str(time.time()))}"
def ask_with_logging(self, question):
"""提问并记录完整过程"""
# 1. 准备请求数据
payload = {
"model": self.model_name,
"prompt": question,
"stream": False,
"options": {
"temperature": 0.7,
"top_p": 0.9
}
}
# 2. 记录开始时间
start_time = time.time()
# 3. 发送请求到模型
try:
response = requests.post(self.base_url, json=payload)
response.raise_for_status()
result = response.json()
except Exception as e:
return {"error": str(e)}
# 4. 计算处理时间
processing_time = time.time() - start_time
# 5. 构建日志记录
log_entry = {
"session_id": self.session_id,
"timestamp": datetime.now().isoformat(),
"user_input": question,
"model_response": result.get("response", ""),
"thinking_process": self._extract_thinking(result.get("response", "")),
"processing_time": round(processing_time, 2),
"model_used": self.model_name,
"tokens_used": result.get("total_duration", 0)
}
# 6. 保存日志
self._save_log(log_entry)
return {
"answer": result.get("response", ""),
"log_id": self.session_id
}
def _extract_thinking(self, response):
"""从响应中提取思考过程"""
# SmallThinker的响应通常包含明确的推理标记
thinking_lines = []
lines = response.split('\n')
for line in lines:
line = line.strip()
if line.startswith("思考:") or line.startswith("推理:") or line.startswith("步骤"):
thinking_lines.append(line)
elif "因为" in line or "所以" in line or "因此" in line:
thinking_lines.append(line)
return "\n".join(thinking_lines) if thinking_lines else "未提取到明确的思考过程"
def _save_log(self, log_entry):
"""保存日志到文件"""
filename = f"ai_logs_{datetime.now().strftime('%Y%m%d')}.json"
try:
# 读取现有日志
try:
with open(filename, 'r', encoding='utf-8') as f:
logs = json.load(f)
except FileNotFoundError:
logs = []
# 添加新日志
logs.append(log_entry)
# 保存回文件
with open(filename, 'w', encoding='utf-8') as f:
json.dump(logs, f, ensure_ascii=False, indent=2)
print(f"日志已保存到 {filename}")
except Exception as e:
print(f"保存日志时出错: {e}")
# 如果文件保存失败,至少打印出来
print("当前日志内容:", json.dumps(log_entry, ensure_ascii=False, indent=2))
# 使用示例
if __name__ == "__main__":
logger = AILogger()
# 示例问题
question = "如果一家公司年收入1000万,成本800万,税率25%,请计算净利润并解释计算过程。"
result = logger.ask_with_logging(question)
print("答案:", result["answer"])
print("日志ID:", result["log_id"])
这个基础版本已经可以工作了。它会记录每次对话,并把思考过程提取出来单独保存。
4.3 增强版本:结构化日志和实时监控
现在让我们添加更多功能,让系统更实用:
import sqlite3
from contextlib import contextmanager
import hashlib
class EnhancedAILogger(AILogger):
def __init__(self, model_name="smallthinker:3b"):
super().__init__(model_name)
self.db_path = "ai_decision_logs.db"
self._init_database()
def _init_database(self):
"""初始化数据库"""
with self._get_db_connection() as conn:
cursor = conn.cursor()
# 创建主日志表
cursor.execute('''
CREATE TABLE IF NOT EXISTS decision_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
timestamp DATETIME NOT NULL,
user_input TEXT NOT NULL,
final_answer TEXT,
processing_time REAL,
model_name TEXT,
token_count INTEGER,
log_hash TEXT UNIQUE
)
''')
# 创建思考过程表
cursor.execute('''
CREATE TABLE IF NOT EXISTS thinking_steps (
id INTEGER PRIMARY KEY AUTOINCREMENT,
log_id INTEGER,
step_number INTEGER,
step_content TEXT,
step_type TEXT,
FOREIGN KEY (log_id) REFERENCES decision_logs (id)
)
''')
# 创建关键决策点表
cursor.execute('''
CREATE TABLE IF NOT EXISTS decision_points (
id INTEGER PRIMARY KEY AUTOINCREMENT,
log_id INTEGER,
point_name TEXT,
alternatives TEXT,
chosen_option TEXT,
reasoning TEXT,
confidence_score REAL,
FOREIGN KEY (log_id) REFERENCES decision_logs (id)
)
''')
conn.commit()
@contextmanager
def _get_db_connection(self):
"""获取数据库连接"""
conn = sqlite3.connect(self.db_path)
try:
yield conn
finally:
conn.close()
def ask_with_enhanced_logging(self, question, context=None):
"""增强版的提问和日志记录"""
# 如果有上下文,添加到问题中
if context:
full_prompt = f"上下文信息:{context}\n\n问题:{question}\n\n请展示完整的思考过程。"
else:
full_prompt = f"{question}\n\n请展示完整的思考过程。"
# 调用父类方法获取响应
result = super().ask_with_logging(full_prompt)
if "error" in result:
return result
# 解析响应,提取结构化信息
response_text = result["answer"]
structured_info = self._parse_structured_response(response_text)
# 生成日志哈希(用于去重)
log_hash = self._generate_log_hash(question, response_text)
# 保存到数据库
with self._get_db_connection() as conn:
cursor = conn.cursor()
# 插入主日志
cursor.execute('''
INSERT INTO decision_logs
(session_id, timestamp, user_input, final_answer, processing_time, model_name, log_hash)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
self.session_id,
datetime.now().isoformat(),
question,
structured_info.get("final_answer", ""),
result.get("processing_time", 0),
self.model_name,
log_hash
))
log_id = cursor.lastrowid
# 插入思考步骤
for i, step in enumerate(structured_info.get("thinking_steps", []), 1):
cursor.execute('''
INSERT INTO thinking_steps (log_id, step_number, step_content, step_type)
VALUES (?, ?, ?, ?)
''', (log_id, i, step["content"], step.get("type", "general")))
# 插入决策点
for point in structured_info.get("decision_points", []):
cursor.execute('''
INSERT INTO decision_points
(log_id, point_name, alternatives, chosen_option, reasoning, confidence_score)
VALUES (?, ?, ?, ?, ?, ?)
''', (
log_id,
point.get("name", ""),
json.dumps(point.get("alternatives", [])),
point.get("chosen", ""),
point.get("reasoning", ""),
point.get("confidence", 0.0)
))
conn.commit()
return {
"answer": structured_info.get("final_answer", ""),
"log_id": log_id,
"session_id": self.session_id,
"thinking_steps": structured_info.get("thinking_steps", []),
"decision_points": structured_info.get("decision_points", [])
}
def _parse_structured_response(self, response):
"""解析响应,提取结构化信息"""
# 这是一个简化的解析器,实际使用时可以根据需要增强
lines = response.split('\n')
thinking_steps = []
decision_points = []
current_answer = ""
for line in lines:
line = line.strip()
# 提取思考步骤
if line.startswith("步骤") or line.startswith("1.") or line.startswith("2."):
thinking_steps.append({
"content": line,
"type": "reasoning_step"
})
# 提取决策点(假设格式:决策点:...)
elif "决策点:" in line or "选择:" in line:
parts = line.split(":")
if len(parts) > 1:
decision_points.append({
"name": parts[0].replace("决策点", "").strip(),
"chosen": parts[1].strip(),
"reasoning": "从上下文推断",
"confidence": 0.8
})
# 识别最终答案
elif line.startswith("答案:") or line.startswith("结论:"):
current_answer = line.split(":", 1)[1] if ":" in line else line
# 如果没有明确标记,取最后一段作为答案
if not current_answer and lines:
current_answer = lines[-1]
return {
"final_answer": current_answer,
"thinking_steps": thinking_steps,
"decision_points": decision_points
}
def _generate_log_hash(self, question, response):
"""生成日志哈希值"""
content = f"{question}{response}"
return hashlib.md5(content.encode()).hexdigest()
def query_logs(self, session_id=None, start_date=None, end_date=None):
"""查询日志"""
query = "SELECT * FROM decision_logs WHERE 1=1"
params = []
if session_id:
query += " AND session_id = ?"
params.append(session_id)
if start_date:
query += " AND timestamp >= ?"
params.append(start_date)
if end_date:
query += " AND timestamp <= ?"
params.append(end_date)
query += " ORDER BY timestamp DESC"
with self._get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute(query, params)
columns = [description[0] for description in cursor.description]
results = [dict(zip(columns, row)) for row in cursor.fetchall()]
return results
def get_thinking_process(self, log_id):
"""获取特定日志的思考过程"""
with self._get_db_connection() as conn:
cursor = conn.cursor()
# 获取思考步骤
cursor.execute('''
SELECT step_number, step_content, step_type
FROM thinking_steps
WHERE log_id = ?
ORDER BY step_number
''', (log_id,))
thinking_steps = cursor.fetchall()
# 获取决策点
cursor.execute('''
SELECT point_name, alternatives, chosen_option, reasoning, confidence_score
FROM decision_points
WHERE log_id = ?
''', (log_id,))
decision_points = cursor.fetchall()
return {
"thinking_steps": thinking_steps,
"decision_points": decision_points
}
# 使用示例
if __name__ == "__main__":
enhanced_logger = EnhancedAILogger()
# 示例:金融风险评估
context = "客户A,年龄35岁,年收入50万,有房贷100万,信用卡额度20万,近期有3次大额消费"
question = "请评估该客户的信用风险等级,并说明评估依据"
result = enhanced_logger.ask_with_enhanced_logging(question, context)
print("=" * 50)
print("最终答案:")
print(result["answer"])
print("\n思考步骤:")
for step in result["thinking_steps"]:
print(f"- {step['content']}")
print("\n决策点:")
for point in result["decision_points"]:
print(f"{point['name']}: 选择 {point['chosen']},理由:{point['reasoning']}")
print("=" * 50)
# 查询日志
logs = enhanced_logger.query_logs()
print(f"\n共找到 {len(logs)} 条日志记录")
4.4 添加Web监控界面
让我们创建一个简单的Web界面来查看日志:
from flask import Flask, render_template, jsonify, request
import json
app = Flask(__name__)
logger = EnhancedAILogger()
@app.route('/')
def index():
"""主页面"""
return '''
<!DOCTYPE html>
<html>
<head>
<title>AI决策日志追踪系统</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
.container { max-width: 1200px; margin: 0 auto; }
.section { margin-bottom: 30px; padding: 20px; border: 1px solid #ddd; border-radius: 5px; }
.log-entry { margin: 10px 0; padding: 10px; background: #f5f5f5; border-radius: 3px; }
.thinking-step { margin-left: 20px; color: #666; }
.decision-point { margin-left: 20px; color: #008000; }
</style>
</head>
<body>
<div class="container">
<h1>AI决策日志追踪系统</h1>
<div class="section">
<h2>提问区域</h2>
<form id="askForm">
<textarea id="question" rows="4" cols="80" placeholder="输入你的问题..."></textarea><br><br>
<textarea id="context" rows="3" cols="80" placeholder="上下文信息(可选)..."></textarea><br><br>
<button type="submit">提交问题</button>
</form>
<div id="answerArea" style="margin-top: 20px;"></div>
</div>
<div class="section">
<h2>日志查询</h2>
<button onclick="loadLogs()">加载最新日志</button>
<div id="logsArea"></div>
</div>
</div>
<script>
document.getElementById('askForm').onsubmit = async function(e) {
e.preventDefault();
const question = document.getElementById('question').value;
const context = document.getElementById('context').value;
const response = await fetch('/ask', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({question, context})
});
const result = await response.json();
let html = `<h3>回答:</h3><p>${result.answer.replace(/\\n/g, '<br>')}</p>`;
if (result.thinking_steps && result.thinking_steps.length > 0) {
html += `<h3>思考过程:</h3>`;
result.thinking_steps.forEach(step => {
html += `<div class="thinking-step">${step.content}</div>`;
});
}
document.getElementById('answerArea').innerHTML = html;
};
async function loadLogs() {
const response = await fetch('/logs');
const logs = await response.json();
let html = '<h3>最近10条日志:</h3>';
logs.slice(0, 10).forEach(log => {
html += `
<div class="log-entry">
<strong>时间:</strong>${new Date(log.timestamp).toLocaleString()}<br>
<strong>问题:</strong>${log.user_input.substring(0, 100)}${log.user_input.length > 100 ? '...' : ''}<br>
<strong>回答:</strong>${log.final_answer ? log.final_answer.substring(0, 150) + (log.final_answer.length > 150 ? '...' : '') : '无'}
<button onclick="viewDetails(${log.id})">查看详情</button>
</div>
`;
});
document.getElementById('logsArea').innerHTML = html;
}
async function viewDetails(logId) {
const response = await fetch(`/log/${logId}/details`);
const details = await response.json();
alert(JSON.stringify(details, null, 2));
}
</script>
</body>
</html>
'''
@app.route('/ask', methods=['POST'])
def ask_question():
"""处理提问"""
data = request.json
question = data.get('question', '')
context = data.get('context', '')
result = logger.ask_with_enhanced_logging(question, context)
return jsonify(result)
@app.route('/logs')
def get_logs():
"""获取日志列表"""
logs = logger.query_logs()
return jsonify(logs)
@app.route('/log/<int:log_id>/details')
def get_log_details(log_id):
"""获取日志详情"""
details = logger.get_thinking_process(log_id)
return jsonify(details)
if __name__ == '__main__':
app.run(debug=True, port=5000)
运行这个Web应用后,访问 http://localhost:5000 就能看到一个完整的日志追踪系统界面。
5. 实际应用案例
5.1 案例一:金融风控决策审计
场景:银行使用AI系统审批贷款申请,需要记录每个决策的推理过程以备审计。
实现方案:
# 金融风控专用日志器
class FinancialRiskLogger(EnhancedAILogger):
def assess_loan_application(self, application_data):
"""评估贷款申请"""
# 构建评估问题
question = f"""
请评估以下贷款申请的风险等级:
申请人信息:{application_data['applicant_info']}
贷款金额:{application_data['loan_amount']}
贷款期限:{application_data['loan_term']}
收入证明:{application_data['income_proof']}
信用历史:{application_data['credit_history']}
请按照以下步骤分析:
1. 分析申请人的还款能力
2. 评估信用风险
3. 考虑抵押物价值(如有)
4. 给出最终风险等级(低/中/高)
5. 提供具体的风险评估理由
"""
# 记录完整的决策过程
result = self.ask_with_enhanced_logging(question)
# 添加金融特定的元数据
risk_log = {
**result,
"application_id": application_data.get("application_id"),
"assessor": "AI_System_v1.0",
"assessment_date": datetime.now().isoformat(),
"compliance_check": self._run_compliance_check(result)
}
return risk_log
def _run_compliance_check(self, assessment_result):
"""运行合规性检查"""
# 检查决策过程是否包含必要的分析步骤
required_keywords = ["收入", "负债", "信用", "风险"]
thinking_text = " ".join([step["content"] for step in assessment_result.get("thinking_steps", [])])
checks = {}
for keyword in required_keywords:
checks[keyword] = keyword in thinking_text
return {
"checks_passed": all(checks.values()),
"details": checks,
"recommendation": "通过" if all(checks.values()) else "需要人工复核"
}
# 使用示例
risk_logger = FinancialRiskLogger()
application = {
"application_id": "APP20240001",
"applicant_info": "张三,35岁,工程师",
"loan_amount": "500,000元",
"loan_term": "5年",
"income_proof": "月收入25,000元,工资流水",
"credit_history": "信用良好,无逾期记录"
}
assessment = risk_logger.assess_loan_application(application)
print("风险评估结果:", assessment["answer"])
print("合规检查:", assessment["compliance_check"])
5.2 案例二:医疗诊断建议追踪
场景:AI辅助医疗诊断系统,需要记录每个诊断建议的推理过程,供医生复核。
class MedicalDiagnosisLogger(EnhancedAILogger):
def provide_diagnosis_advice(self, patient_data, symptoms):
"""提供诊断建议"""
question = f"""
基于以下患者信息和症状,提供诊断建议:
患者基本信息:
- 年龄:{patient_data.get('age')}
- 性别:{patient_data.get('gender')}
- 既往病史:{patient_data.get('medical_history', '无')}
当前症状:
{symptoms}
请按以下步骤分析:
1. 症状分类和优先级排序
2. 可能的疾病推断
3. 需要进一步检查的项目建议
4. 初步处理建议
5. 紧急程度评估
注意:所有建议必须标注置信度,并说明推理依据。
"""
result = self.ask_with_enhanced_logging(question)
# 添加医疗元数据
medical_log = {
**result,
"patient_id": patient_data.get("patient_id"),
"symptoms_recorded": symptoms,
"timestamp": datetime.now().isoformat(),
"reviewer_required": self._check_if_review_needed(result)
}
return medical_log
def _check_if_review_needed(self, diagnosis_result):
"""检查是否需要人工复核"""
answer = diagnosis_result.get("answer", "").lower()
# 如果涉及严重疾病或高风险建议,需要人工复核
red_flags = ["紧急", "立即就医", "手术", "癌症", "重症", "住院"]
for flag in red_flags:
if flag in answer:
return True
# 如果置信度相关表述不明确,需要复核
confidence_indicators = ["可能", "疑似", "不确定", "需要进一步检查"]
confidence_count = sum(1 for indicator in confidence_indicators if indicator in answer)
if confidence_count > 2:
return True
return False
# 使用示例
medical_logger = MedicalDiagnosisLogger()
patient_info = {
"patient_id": "PT2024001",
"age": "45",
"gender": "男",
"medical_history": "高血压病史5年"
}
symptoms = """
主诉:持续性头痛3天,伴有恶心
伴随症状:视力模糊,对光敏感
体征:血压150/95mmHg,体温正常
"""
diagnosis = medical_logger.provide_diagnosis_advice(patient_info, symptoms)
print("诊断建议:", diagnosis["answer"])
print("是否需要人工复核:", diagnosis["reviewer_required"])
5.3 案例三:法律文件分析审计
场景:律所使用AI分析法律合同,需要记录每个条款的分析过程。
class LegalDocumentLogger(EnhancedAILogger):
def analyze_contract_clause(self, contract_text, clause_number):
"""分析合同条款"""
question = f"""
请分析以下合同条款(第{clause_number}条):
{contract_text}
请按以下步骤分析:
1. 条款类型识别(义务条款、权利条款、免责条款等)
2. 关键义务提取
3. 潜在风险点分析
4. 建议修改意见
5. 风险评估等级(低/中/高)
对于每个分析步骤,请说明依据和理由。
"""
result = self.ask_with_enhanced_logging(question)
# 结构化分析结果
structured_analysis = {
"clause_number": clause_number,
"original_text": contract_text,
"analysis": result["answer"],
"thinking_process": result["thinking_steps"],
"risk_assessment": self._extract_risk_level(result["answer"]),
"review_timestamp": datetime.now().isoformat(),
"analyst": "AI_Legal_Analyst_v1.0"
}
return structured_analysis
def _extract_risk_level(self, analysis_text):
"""从分析文本中提取风险等级"""
text_lower = analysis_text.lower()
if "高风险" in text_lower:
return "high"
elif "中风险" in text_lower or "中等风险" in text_lower:
return "medium"
elif "低风险" in text_lower:
return "low"
else:
# 尝试从上下文中推断
risk_keywords = {
"high": ["重大风险", "严重问题", "必须修改", "不可接受"],
"medium": ["需要注意", "建议修改", "潜在问题"],
"low": ["基本合规", "问题不大", "可接受"]
}
for level, keywords in risk_keywords.items():
for keyword in keywords:
if keyword in analysis_text:
return level
return "unknown"
# 使用示例
legal_logger = LegalDocumentLogger()
contract_clause = """
第八条 违约责任
8.1 任何一方违反本协议约定,应承担违约责任,向守约方支付合同总金额20%的违约金。
8.2 如违约方延迟履行义务,每延迟一日,应按合同总金额的0.1%支付滞纳金。
8.3 因违约造成的损失超过违约金的,守约方有权要求赔偿实际损失。
"""
analysis = legal_logger.analyze_contract_clause(contract_clause, "第八条")
print("条款分析:", analysis["analysis"])
print("风险等级:", analysis["risk_assessment"])
6. 系统优化与最佳实践
6.1 性能优化建议
数据库优化:
# 添加索引提高查询性能
def optimize_database(self):
"""优化数据库性能"""
with self._get_db_connection() as conn:
cursor = conn.cursor()
# 为常用查询字段添加索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_timestamp ON decision_logs(timestamp)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_session_id ON decision_logs(session_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_log_id ON thinking_steps(log_id)')
# 定期清理旧数据(保留最近90天)
cursor.execute('''
DELETE FROM decision_logs
WHERE timestamp < datetime('now', '-90 days')
''')
conn.commit()
批量处理优化:
# 批量处理多个请求
def batch_process_questions(self, questions_with_context):
"""批量处理问题"""
results = []
for question, context in questions_with_context:
try:
result = self.ask_with_enhanced_logging(question, context)
results.append({
"success": True,
"data": result
})
except Exception as e:
results.append({
"success": False,
"error": str(e),
"question": question
})
return results
6.2 安全与合规考虑
数据加密:
import hashlib
from cryptography.fernet import Fernet
class SecureAILogger(EnhancedAILogger):
def __init__(self, model_name="smallthinker:3b", encryption_key=None):
super().__init__(model_name)
# 生成或使用提供的加密密钥
if encryption_key:
self.cipher = Fernet(encryption_key)
else:
# 在实际应用中,密钥应该从安全的地方加载
key = Fernet.generate_key()
self.cipher = Fernet(key)
def _save_log(self, log_entry):
"""加密保存日志"""
# 序列化日志数据
log_json = json.dumps(log_entry, ensure_ascii=False)
# 加密数据
encrypted_data = self.cipher.encrypt(log_json.encode())
# 保存到文件(实际应用中可能保存到安全存储)
filename = f"encrypted_logs/{datetime.now().strftime('%Y%m%d_%H%M%S')}.enc"
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, 'wb') as f:
f.write(encrypted_data)
def _load_log(self, filename):
"""加载并解密日志"""
with open(filename, 'rb') as f:
encrypted_data = f.read()
decrypted_data = self.cipher.decrypt(encrypted_data)
return json.loads(decrypted_data.decode())
访问控制:
# 简单的基于角色的访问控制
class RoleBasedLogger(EnhancedAILogger):
def __init__(self, model_name="smallthinker:3b", user_role="viewer"):
super().__init__(model_name)
self.user_role = user_role
self.role_permissions = {
"admin": ["read", "write", "delete", "export"],
"auditor": ["read", "export"],
"viewer": ["read"]
}
def query_logs(self, **kwargs):
"""根据角色权限查询日志"""
if "read" not in self.role_permissions.get(self.user_role, []):
return {"error": "权限不足"}
# 审计员只能查看特定时间范围的日志
if self.user_role == "auditor":
kwargs["start_date"] = kwargs.get("start_date") or "2024-01-01"
return super().query_logs(**kwargs)
def delete_log(self, log_id):
"""删除日志(需要管理员权限)"""
if "delete" not in self.role_permissions.get(self.user_role, []):
return {"error": "需要管理员权限"}
with self._get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM decision_logs WHERE id = ?", (log_id,))
conn.commit()
return {"success": True, "message": f"日志 {log_id} 已删除"}
6.3 监控与告警
class MonitoredAILogger(EnhancedAILogger):
def __init__(self, model_name="smallthinker:3b", alert_thresholds=None):
super().__init__(model_name)
# 告警阈值配置
self.alert_thresholds = alert_thresholds or {
"processing_time": 10.0, # 秒
"error_rate": 0.1, # 错误率
"suspicious_patterns": ["绕过", "漏洞", "攻击", "非法"]
}
self.metrics = {
"total_requests": 0,
"failed_requests": 0,
"total_processing_time": 0,
"suspicious_activities": []
}
def ask_with_enhanced_logging(self, question, context=None):
"""带监控的提问"""
self.metrics["total_requests"] += 1
try:
start_time = time.time()
result = super().ask_with_enhanced_logging(question, context)
processing_time = time.time() - start_time
# 记录处理时间
self.metrics["total_processing_time"] += processing_time
# 检查处理时间是否超阈值
if processing_time > self.alert_thresholds["processing_time"]:
self._trigger_alert("processing_time",
f"处理时间过长: {processing_time:.2f}秒")
# 检查是否有可疑模式
for pattern in self.alert_thresholds["suspicious_patterns"]:
if pattern in question or (context and pattern in context):
self.metrics["suspicious_activities"].append({
"timestamp": datetime.now().isoformat(),
"pattern": pattern,
"question": question[:100] # 只记录前100字符
})
self._trigger_alert("suspicious_pattern",
f"检测到可疑模式: {pattern}")
return result
except Exception as e:
self.metrics["failed_requests"] += 1
# 计算当前错误率
error_rate = self.metrics["failed_requests"] / self.metrics["total_requests"]
if error_rate > self.alert_thresholds["error_rate"]:
self._trigger_alert("error_rate",
f"错误率过高: {error_rate:.2%}")
raise
def _trigger_alert(self, alert_type, message):
"""触发告警"""
alert = {
"type": alert_type,
"message": message,
"timestamp": datetime.now().isoformat(),
"metrics": self.get_current_metrics()
}
# 这里可以集成到实际的告警系统(邮件、短信、Slack等)
print(f"[ALERT] {alert_type}: {message}")
# 保存告警记录
self._save_alert(alert)
def get_current_metrics(self):
"""获取当前指标"""
avg_processing_time = 0
if self.metrics["total_requests"] > 0:
avg_processing_time = (self.metrics["total_processing_time"] /
self.metrics["total_requests"])
error_rate = 0
if self.metrics["total_requests"] > 0:
error_rate = self.metrics["failed_requests"] / self.metrics["total_requests"]
return {
"total_requests": self.metrics["total_requests"],
"failed_requests": self.metrics["failed_requests"],
"error_rate": error_rate,
"avg_processing_time": avg_processing_time,
"suspicious_activities_count": len(self.metrics["suspicious_activities"])
}
def _save_alert(self, alert):
"""保存告警记录"""
filename = f"alerts/alerts_{datetime.now().strftime('%Y%m')}.json"
os.makedirs(os.path.dirname(filename), exist_ok=True)
alerts = []
try:
with open(filename, 'r', encoding='utf-8') as f:
alerts = json.load(f)
except FileNotFoundError:
pass
alerts.append(alert)
with open(filename, 'w', encoding='utf-8') as f:
json.dump(alerts, f, ensure_ascii=False, indent=2)
7. 总结
7.1 核心收获回顾
通过本教程,我们完成了一个完整的可审计AI决策日志追踪系统的构建。让我们回顾一下关键要点:
系统核心价值:
- 透明化决策:让AI的思考过程不再是个"黑箱",每个决策都有完整的推理记录
- 合规审计:满足金融、医疗、法律等行业的合规要求,提供可追溯的决策依据
- 质量监控:通过日志分析,可以持续改进AI系统的决策质量
- 风险控制:及时发现和纠正AI决策中的偏差或错误
技术实现要点:
- 使用SmallThinker-3B-Preview模型,它天生适合生成详细的推理过程
- 设计了三层日志结构:主日志、思考步骤、决策点
- 实现了数据库存储和Web监控界面
- 提供了金融、医疗、法律三个实际应用案例
- 增加了安全、性能、监控等生产级功能
7.2 下一步学习建议
如果你对这个系统感兴趣,可以继续深入:
功能扩展方向:
- 集成更多AI模型:除了SmallThinker,可以集成其他擅长推理的模型
- 添加可视化分析:用图表展示决策模式、风险趋势等
- 实现实时告警:当AI决策出现异常模式时立即通知相关人员
- 构建审计报告:自动生成合规审计报告
技术深化方向:
- 性能优化:学习如何优化数据库查询,处理大规模日志数据
- 安全加固:研究数据加密、访问控制、审计追踪等安全机制
- 分布式部署:学习如何将系统部署到多台服务器,提高可用性
- API设计:设计完善的REST API,方便其他系统集成
7.3 实际应用建议
在实际业务中应用这个系统时,建议:
- 从小规模开始:先在一个具体的业务场景试点,比如合同审查或风险评估
- 逐步完善:根据实际使用反馈,逐步添加需要的功能
- 重视培训:确保使用系统的业务人员理解日志的价值和查看方法
- 定期审计:建立定期的日志审计机制,确保持续合规
7.4 资源推荐
如果你想深入学习相关技术:
- Ollama官方文档:了解如何管理和部署更多AI模型
- SQLite数据库教程:深入学习数据库设计和优化
- Flask Web开发:学习如何构建更复杂的Web应用
- AI可解释性研究:关注AI透明度和可解释性的最新进展
这个日志追踪系统只是一个起点。随着AI在关键业务中的应用越来越广泛,构建透明、可信、可审计的AI系统将成为每个技术团队的必备能力。希望本教程能为你打下坚实的基础,帮助你在AI落地的道路上走得更稳、更远。
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。
更多推荐
所有评论(0)