【Agents篇】16:Agent 训练——从 RLHF 到强化学习
Agent训练
🎯 系列导航:本文是 Agents 系列第16篇,深入探讨 Agent 训练的核心技术,从经典的 RLHF 到最新的强化学习方法。
🏷️ 关键词:RLHF、InstructGPT、AgentGym、R3、Agent微调、强化学习
📑 目录
- 1. 引言:为什么 Agent 需要训练?
- 2. 基础篇:从监督学习到强化学习
- 3. InstructGPT:RLHF 的里程碑
- 4. RLHF 进阶:技术演进与变体
- 5. AgentGym:通用 Agent 训练框架
- 6. R3:推理增强的强化学习
- 7. Agent 微调方法大全
- 8. 实战项目:构建可训练的 Agent
- 9. 前沿探索:未来发展方向
- 10. 总结与最佳实践
- 11. 参考文献
1. 引言:为什么 Agent 需要训练? 🎯

在 AI Agent 的发展历程中,训练一直是提升 Agent 能力的核心手段。虽然大语言模型(LLM)本身已经具备了强大的基础能力,但要让它成为一个高效的 Agent,还需要针对性的训练和优化。
💡 思考:为什么预训练的 LLM 不能直接成为优秀的 Agent?
🤔 解答:预训练的 LLM 主要通过大规模文本进行语言建模训练,其目标是预测下一个 token。这种训练方式使模型具备了语言理解和生成能力,但存在以下局限:
- 任务对齐不足:模型不知道用户的真实意图,可能生成正确但无用的回答
- 交互能力有限:缺乏与环境交互、使用工具的经验
- 推理链条不完整:难以进行多步骤的复杂推理
- 安全性问题:可能生成有害内容或执行危险操作
┌─────────────────────────────────────────────────────────────────────┐
│ Agent 能力提升的训练路径 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 预训练 LLM 监督微调 RLHF/RL Agent 专项训练 │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────────┐ │
│ │语言 │ → │指令 │ → │人类 │ → │环境交互 │ │
│ │理解 │ │跟随 │ │偏好 │ │工具使用 │ │
│ │生成 │ │能力 │ │对齐 │ │任务完成 │ │
│ └──────┘ └──────┘ └──────┘ └──────────┘ │
│ │
│ 基础能力 任务能力 价值对齐 Agent 能力 │
└─────────────────────────────────────────────────────────────────────┘
Agent 训练的核心目标是让模型:
- 理解意图:准确理解用户需求和任务目标
- 规划行动:制定合理的执行计划和步骤
- 交互执行:与环境和工具有效交互
- 学习改进:从反馈中持续优化行为
本文将深入探讨实现这些目标的各种训练方法。
2. 基础篇:从监督学习到强化学习 📚
2.1 监督微调 (SFT)
监督微调(Supervised Fine-Tuning,SFT)是最直接的训练方式,通过人工标注的数据让模型学习期望的行为。
"""
监督微调的基本流程
"""
from transformers import AutoModelForCausalLM, AutoTokenizer, Trainer, TrainingArguments
from datasets import load_dataset
# 1. 加载预训练模型
model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-7b-hf")
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-hf")
# 2. 准备指令微调数据
# 数据格式: {"instruction": "...", "input": "...", "output": "..."}
def format_instruction(example):
"""格式化为模型输入"""
prompt = f"""### Instruction:
{example['instruction']}
### Input:
{example['input']}
### Response:
{example['output']}"""
return {"text": prompt}
dataset = load_dataset("your_instruction_dataset")
formatted_dataset = dataset.map(format_instruction)
# 3. 配置训练参数
training_args = TrainingArguments(
output_dir="./sft_model",
num_train_epochs=3,
per_device_train_batch_size=4,
gradient_accumulation_steps=4,
learning_rate=2e-5,
warmup_ratio=0.03,
logging_steps=10,
save_strategy="epoch",
fp16=True,
)
# 4. 开始训练
trainer = Trainer(
model=model,
args=training_args,
train_dataset=formatted_dataset["train"],
tokenizer=tokenizer,
)
trainer.train()
💡 思考:SFT 为什么不能完全解决 Agent 训练问题?
🤔 解答:SFT 存在几个关键限制:
- 数据瓶颈:高质量的 Agent 交互数据难以获取和标注
- 静态学习:只能从固定数据学习,无法适应动态环境
- 错误累积:模型在推理时可能偏离训练分布,导致错误累积
- 奖励稀疏:无法从最终结果反向传播学习信号
┌────────────────────────────────────────────────────────────────┐
│ SFT 的局限性分析 │
├────────────────────────────────────────────────────────────────┤
│ │
│ 训练数据分布 推理时分布 │
│ ┌─────┐ ┌─────┐ │
│ │ │ │ │ ← 分布偏移 │
│ ╱│ A │╲ ╱│ A' │╲ (Distribution │
│ ╱ │ │ ╲ ╱ │ │ ╲ Shift) │
│ ╱ └─────┘ ╲ ╱ └─────┘ ╲ │
│ ╱ ╲ ╱ ↓ ╲ │
│ ╱ ╲ ╱ 错误累积 ╲ │
│ ──────────────────── ──────────────────── │
│ │
│ 静态数据 → 固定模式 动态环境 → 需要适应 │
└────────────────────────────────────────────────────────────────┘
2.2 强化学习基础
强化学习(Reinforcement Learning,RL)提供了一种让 Agent 在与环境交互中学习的范式。
核心概念:
| 概念 | 符号 | 说明 |
|---|---|---|
| 状态 (State) | s s s | Agent 观察到的环境信息 |
| 动作 (Action) | a a a | Agent 可执行的操作 |
| 奖励 (Reward) | r r r | 环境对动作的反馈信号 |
| 策略 (Policy) | π \pi π | 从状态到动作的映射 |
| 价值函数 (Value) | V ( s ) V(s) V(s) | 状态的长期期望回报 |
"""
强化学习的基本框架
"""
import numpy as np
from typing import Tuple, List
class RLAgent:
"""基本的 RL Agent 框架"""
def __init__(self, state_dim: int, action_dim: int):
self.state_dim = state_dim
self.action_dim = action_dim
self.policy = self._init_policy()
self.value_function = self._init_value()
def _init_policy(self):
"""初始化策略网络"""
# 通常是一个神经网络
pass
def _init_value(self):
"""初始化价值网络"""
pass
def select_action(self, state: np.ndarray) -> int:
"""根据当前状态选择动作"""
# π(a|s) - 策略函数
action_probs = self.policy(state)
return np.random.choice(self.action_dim, p=action_probs)
def update(self, trajectory: List[Tuple]):
"""根据轨迹更新策略
trajectory: [(s_0, a_0, r_0), (s_1, a_1, r_1), ...]
"""
# 计算回报 G_t = r_t + γ*r_{t+1} + γ²*r_{t+2} + ...
returns = self._compute_returns(trajectory)
# 策略梯度更新
# ∇J(θ) = E[∇log π(a|s) * A(s,a)]
self._policy_gradient_update(trajectory, returns)
class LLMAsRLAgent:
"""将 LLM 视为 RL Agent"""
def __init__(self, llm):
self.llm = llm # 策略网络 π_θ
def generate_action(self, state: str) -> str:
"""
状态: 对话历史/任务描述
动作: 生成的文本/API调用
"""
# LLM 生成 = 策略采样
# P(token_t | token_{<t}) = π(a_t | s_t)
return self.llm.generate(state)
def compute_reward(self, state: str, action: str,
ground_truth: str = None) -> float:
"""
奖励可以来自:
1. 人类反馈
2. 奖励模型
3. 环境反馈 (任务成功/失败)
"""
pass
2.3 为什么需要 RLHF
RLHF(Reinforcement Learning from Human Feedback)结合了监督学习和强化学习的优势,通过人类反馈来指导模型训练。
┌─────────────────────────────────────────────────────────────────────────┐
│ RLHF 的必要性 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 问题1: 奖励函数难以定义 │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ 什么是"好"的回答? │ │
│ │ - 准确性? → 有时需要承认不确定 │ │
│ │ - 详细性? → 有时简洁更好 │ │
│ │ - 创造性? → 有时需要严谨 │ │
│ │ │ │
│ │ 人类偏好复杂且情境相关,难以用简单规则表达 │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │
│ 问题2: 直接优化指标可能适得其反 (Goodhart's Law) │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ "When a measure becomes a target, it ceases to be a good │ │
│ │ measure." │ │
│ │ │ │
│ │ 例: 优化"回答长度" → 模型学会废话连篇 │ │
│ │ 例: 优化"包含关键词" → 模型生成不连贯的关键词堆砌 │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │
│ 解决方案: RLHF │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ 1. 收集人类偏好数据 (比较 A vs B) │ │
│ │ 2. 训练奖励模型学习人类偏好 │ │
│ │ 3. 用 RL 优化模型以最大化奖励 │ │
│ │ │ │
│ │ 优势: 奖励模型可以捕捉复杂的人类偏好 │ │
│ └───────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
3. InstructGPT:RLHF 的里程碑 🏆
InstructGPT 是 OpenAI 在 2022 年发布的重要工作,首次大规模验证了 RLHF 在 LLM 上的有效性。这项工作奠定了后续 ChatGPT、GPT-4 等模型的技术基础。
3.1 InstructGPT 的三阶段训练
┌─────────────────────────────────────────────────────────────────────────┐
│ InstructGPT 三阶段训练流程 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Stage 1 │ │ Stage 2 │ │ Stage 3 │ │
│ │ SFT │ → │ RM Training │ → │ RL (PPO) │ │
│ │ 监督微调 │ │ 奖励模型训练 │ │ 强化学习 │ │
│ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ 人工标注数据 │ │ 人类偏好对比 │ │ 奖励模型指导 │ │
│ │ (Prompt,Response)│ │ (A > B, B > C) │ │ Policy 优化 │ │
│ │ │ │ │ │ │ │
│ │ ~13K 样本 │ │ ~33K 对比对 │ │ ~31K Prompts │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │
│ 数据来源: OpenAI API 用户提交的真实 prompts + 标注员编写的 prompts │
└─────────────────────────────────────────────────────────────────────────┘
Stage 1: 监督微调 (SFT)
"""
Stage 1: 监督微调
目标: 让模型学会按照指令格式回答
"""
from dataclasses import dataclass
from typing import List, Dict
@dataclass
class SFTDataPoint:
"""SFT 训练数据格式"""
prompt: str # 用户指令
response: str # 标注员撰写的高质量回答
metadata: Dict # 元信息(来源、标注者等)
class SFTTrainer:
"""监督微调训练器"""
def __init__(self, base_model, tokenizer):
self.model = base_model
self.tokenizer = tokenizer
def prepare_data(self, data: List[SFTDataPoint]) -> List[Dict]:
"""准备训练数据"""
processed = []
for dp in data:
# 拼接 prompt 和 response
text = f"Human: {dp.prompt}\n\nAssistant: {dp.response}"
tokens = self.tokenizer(text, truncation=True, max_length=2048)
# 只在 response 部分计算 loss
labels = tokens['input_ids'].copy()
prompt_len = len(self.tokenizer(f"Human: {dp.prompt}\n\nAssistant: ")['input_ids'])
labels[:prompt_len] = [-100] * prompt_len # -100 表示忽略
processed.append({
'input_ids': tokens['input_ids'],
'attention_mask': tokens['attention_mask'],
'labels': labels
})
return processed
def train(self, dataset, epochs=3, lr=1e-5):
"""执行 SFT 训练"""
# 标准的语言模型训练
# Loss = -Σ log P(y_t | y_{<t}, x)
pass
Stage 2: 奖励模型训练 (RM)
"""
Stage 2: 奖励模型训练
目标: 学习人类偏好,给回答打分
"""
import torch
import torch.nn as nn
from transformers import AutoModel
class RewardModel(nn.Module):
"""奖励模型架构"""
def __init__(self, base_model_name: str):
super().__init__()
# 使用与 SFT 相同的基座模型
self.backbone = AutoModel.from_pretrained(base_model_name)
# 添加一个线性层输出标量奖励
self.reward_head = nn.Linear(self.backbone.config.hidden_size, 1)
def forward(self, input_ids, attention_mask):
"""前向传播,输出奖励值"""
outputs = self.backbone(input_ids, attention_mask=attention_mask)
# 取最后一个 token 的隐藏状态
last_hidden = outputs.last_hidden_state[:, -1, :]
reward = self.reward_head(last_hidden)
return reward.squeeze(-1)
@dataclass
class PreferencePair:
"""人类偏好对比数据"""
prompt: str
chosen: str # 被选中的(更好的)回答
rejected: str # 被拒绝的(较差的)回答
class RMTrainer:
"""奖励模型训练器"""
def __init__(self, reward_model: RewardModel, tokenizer):
self.model = reward_model
self.tokenizer = tokenizer
def compute_loss(self, batch: List[PreferencePair]) -> torch.Tensor:
"""
计算偏好学习损失 (Bradley-Terry 模型)
Loss = -log(σ(r_chosen - r_rejected))
即:让好回答的奖励尽可能高于差回答
"""
chosen_rewards = []
rejected_rewards = []
for pair in batch:
# 编码 chosen
chosen_input = f"Human: {pair.prompt}\n\nAssistant: {pair.chosen}"
chosen_tokens = self.tokenizer(chosen_input, return_tensors='pt')
r_chosen = self.model(**chosen_tokens)
chosen_rewards.append(r_chosen)
# 编码 rejected
rejected_input = f"Human: {pair.prompt}\n\nAssistant: {pair.rejected}"
rejected_tokens = self.tokenizer(rejected_input, return_tensors='pt')
r_rejected = self.model(**rejected_tokens)
rejected_rewards.append(r_rejected)
chosen_rewards = torch.stack(chosen_rewards)
rejected_rewards = torch.stack(rejected_rewards)
# Bradley-Terry loss
loss = -torch.log(torch.sigmoid(chosen_rewards - rejected_rewards)).mean()
return loss
3.2 奖励模型的设计
💡 思考:奖励模型需要具备哪些特性?
🤔 解答:
┌─────────────────────────────────────────────────────────────────────────┐
│ 奖励模型设计要点 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. 与策略模型同源 │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 奖励模型通常从 SFT 模型初始化 │ │
│ │ - 继承语言理解能力 │ │
│ │ - 减少训练数据需求 │ │
│ │ - 更好地理解生成质量 │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ 2. 输出标量奖励 │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 输入: (prompt, response) → 输出: scalar reward │ │
│ │ │ │
│ │ 技术细节: │ │
│ │ - 通常在 EOS token 位置取隐藏状态 │ │
│ │ - 通过线性层映射到标量 │ │
│ │ - 训练时使用相对比较,推理时使用绝对值 │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ 3. 校准与泛化 │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 挑战: │ │
│ │ - 训练数据有限,需要泛化到新 prompt │ │
│ │ - 奖励尺度需要校准(防止 reward hacking) │ │
│ │ - 需要定期用新数据更新 │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ 4. 多维度奖励(高级) │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 可以训练多个奖励头: │ │
│ │ - Helpfulness reward │ │
│ │ - Harmlessness reward │ │
│ │ - Honesty reward │ │
│ │ │ │
│ │ 最终奖励 = w1*r_help + w2*r_harmless + w3*r_honest │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
3.3 PPO 算法详解
PPO(Proximal Policy Optimization)是 InstructGPT 使用的核心 RL 算法。
┌─────────────────────────────────────────────────────────────────────────┐
│ PPO 算法核心思想 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 目标: 更新策略 π_θ 以最大化期望奖励,同时保持稳定 │
│ │
│ 核心公式: │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ L^{CLIP}(θ) = E[ min(r(θ)A, clip(r(θ), 1-ε, 1+ε)A) ] │ │
│ │ │ │
│ │ 其中: │ │
│ │ r(θ) = π_θ(a|s) / π_{θ_old}(a|s) ← 新旧策略比率 │ │
│ │ A = 优势函数 (Advantage) ← 动作好坏程度 │ │
│ │ ε = 0.2 (通常) ← 裁剪范围 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ 为什么需要 clip? │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ r(θ) │ │
│ │ ▲ │ │
│ │ │ ╱ 无限制优化 │ │
│ │ 1+ε├─────────┼───── clip 上界 │ │
│ │ │ ╱│ │ │
│ │ 1 ├───────•─┼────── 初始策略 │ │
│ │ │ ╱ │ │ │
│ │ 1-ε├─────┼───┴───── clip 下界 │ │
│ │ │ ╱ │ │
│ │ └────┴──────────→ 训练步数 │ │
│ │ │ │
│ │ clip 机制限制每次更新的幅度,防止策略变化过大 │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
PPO for LLM 的特殊考虑:
"""
PPO for LLM 实现
"""
import torch
import torch.nn.functional as F
from typing import Dict, List, Tuple
class PPOTrainer:
"""PPO 训练器(用于 LLM)"""
def __init__(
self,
policy_model, # 当前策略模型 π_θ
ref_model, # 参考模型(SFT 模型)π_ref
reward_model, # 奖励模型 r_φ
tokenizer,
config: Dict
):
self.policy = policy_model
self.ref = ref_model
self.reward_model = reward_model
self.tokenizer = tokenizer
# PPO 超参数
self.clip_range = config.get('clip_range', 0.2)
self.kl_coef = config.get('kl_coef', 0.1) # KL 惩罚系数
self.vf_coef = config.get('vf_coef', 0.1) # 价值函数系数
self.gamma = config.get('gamma', 1.0) # 折扣因子
self.lam = config.get('lam', 0.95) # GAE lambda
def generate_responses(self, prompts: List[str]) -> List[Dict]:
"""使用当前策略生成回答"""
responses = []
for prompt in prompts:
# 生成回答
input_ids = self.tokenizer.encode(prompt, return_tensors='pt')
output = self.policy.generate(
input_ids,
max_new_tokens=256,
do_sample=True,
temperature=0.7,
return_dict_in_generate=True,
output_scores=True
)
response_ids = output.sequences[0][len(input_ids[0]):]
response_text = self.tokenizer.decode(response_ids)
responses.append({
'prompt': prompt,
'response': response_text,
'response_ids': response_ids,
'logprobs': self._compute_logprobs(output)
})
return responses
def compute_rewards(self, samples: List[Dict]) -> torch.Tensor:
"""计算奖励(包含 KL 惩罚)"""
rewards = []
for sample in samples:
# 1. 获取奖励模型打分
full_text = f"{sample['prompt']}{sample['response']}"
tokens = self.tokenizer(full_text, return_tensors='pt')
rm_reward = self.reward_model(**tokens)
# 2. 计算 KL 散度惩罚
# KL(π_θ || π_ref) ≈ log π_θ(a|s) - log π_ref(a|s)
with torch.no_grad():
ref_logprobs = self._get_logprobs(
self.ref,
sample['prompt'],
sample['response']
)
policy_logprobs = sample['logprobs']
kl_penalty = (policy_logprobs - ref_logprobs).sum()
# 3. 最终奖励 = RM奖励 - KL惩罚
final_reward = rm_reward - self.kl_coef * kl_penalty
rewards.append(final_reward)
return torch.stack(rewards)
def compute_advantages(
self,
rewards: torch.Tensor,
values: torch.Tensor
) -> Tuple[torch.Tensor, torch.Tensor]:
"""
使用 GAE (Generalized Advantage Estimation) 计算优势函数
A_t = δ_t + (γλ)δ_{t+1} + (γλ)²δ_{t+2} + ...
δ_t = r_t + γV(s_{t+1}) - V(s_t)
"""
advantages = []
returns = []
gae = 0
# 从后往前计算 (token 级别)
for t in reversed(range(len(rewards))):
if t == len(rewards) - 1:
next_value = 0
else:
next_value = values[t + 1]
delta = rewards[t] + self.gamma * next_value - values[t]
gae = delta + self.gamma * self.lam * gae
advantages.insert(0, gae)
returns.insert(0, gae + values[t])
advantages = torch.tensor(advantages)
returns = torch.tensor(returns)
# 标准化优势
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
return advantages, returns
def ppo_step(self, samples: List[Dict]) -> Dict:
"""执行一步 PPO 更新"""
# 1. 计算奖励
rewards = self.compute_rewards(samples)
# 2. 计算价值估计
values = self._estimate_values(samples)
# 3. 计算优势
advantages, returns = self.compute_advantages(rewards, values)
# 4. PPO 更新(多个 epoch)
for _ in range(self.ppo_epochs):
# 获取当前策略的 log prob
curr_logprobs = self._get_batch_logprobs(samples)
old_logprobs = torch.tensor([s['logprobs'] for s in samples])
# 计算比率
ratio = torch.exp(curr_logprobs - old_logprobs)
# Clipped surrogate objective
surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 1 - self.clip_range, 1 + self.clip_range) * advantages
policy_loss = -torch.min(surr1, surr2).mean()
# Value function loss
curr_values = self._estimate_values(samples)
value_loss = F.mse_loss(curr_values, returns)
# Total loss
loss = policy_loss + self.vf_coef * value_loss
# 反向传播
loss.backward()
self.optimizer.step()
self.optimizer.zero_grad()
return {
'policy_loss': policy_loss.item(),
'value_loss': value_loss.item(),
'mean_reward': rewards.mean().item(),
'mean_kl': self._compute_mean_kl(samples).item()
}
def train(self, prompts: List[str], num_iterations: int):
"""完整训练循环"""
for i in range(num_iterations):
# 1. 采样 prompts
batch_prompts = self._sample_batch(prompts)
# 2. 生成回答
samples = self.generate_responses(batch_prompts)
# 3. PPO 更新
metrics = self.ppo_step(samples)
# 4. 日志记录
print(f"Iteration {i}: reward={metrics['mean_reward']:.3f}, "
f"kl={metrics['mean_kl']:.4f}")
# 5. 检查 KL 散度,必要时调整
if metrics['mean_kl'] > self.target_kl * 1.5:
self.kl_coef *= 1.5
elif metrics['mean_kl'] < self.target_kl / 1.5:
self.kl_coef /= 1.5
3.4 代码实现
下面是一个完整的 RLHF 训练框架实现:
"""
完整的 RLHF 训练框架
"""
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from dataclasses import dataclass
from typing import Optional, List
import json
@dataclass
class RLHFConfig:
"""RLHF 训练配置"""
# 模型配置
base_model: str = "meta-llama/Llama-2-7b-hf"
reward_model: Optional[str] = None
# SFT 配置
sft_epochs: int = 3
sft_lr: float = 2e-5
sft_batch_size: int = 4
# RM 配置
rm_epochs: int = 1
rm_lr: float = 1e-5
rm_batch_size: int = 8
# PPO 配置
ppo_epochs: int = 4
ppo_lr: float = 1e-6
ppo_batch_size: int = 16
clip_range: float = 0.2
kl_coef: float = 0.1
target_kl: float = 0.02
# 生成配置
max_new_tokens: int = 256
temperature: float = 0.7
class RLHFTrainer:
"""RLHF 完整训练流程"""
def __init__(self, config: RLHFConfig):
self.config = config
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 初始化 tokenizer
self.tokenizer = AutoTokenizer.from_pretrained(config.base_model)
self.tokenizer.pad_token = self.tokenizer.eos_token
def stage1_sft(self, sft_data: List[dict]) -> AutoModelForCausalLM:
"""
Stage 1: 监督微调
Args:
sft_data: [{"prompt": "...", "response": "..."}, ...]
Returns:
微调后的模型
"""
print("=" * 50)
print("Stage 1: Supervised Fine-Tuning")
print("=" * 50)
# 加载基座模型
model = AutoModelForCausalLM.from_pretrained(
self.config.base_model,
torch_dtype=torch.bfloat16,
device_map="auto"
)
# 准备数据
train_dataset = self._prepare_sft_dataset(sft_data)
# 训练
# ... (使用 HuggingFace Trainer 或自定义训练循环)
print(f"SFT completed. Trained on {len(sft_data)} samples.")
return model
def stage2_rm(
self,
sft_model: AutoModelForCausalLM,
preference_data: List[dict]
) -> RewardModel:
"""
Stage 2: 奖励模型训练
Args:
sft_model: SFT 模型(用于初始化)
preference_data: [{"prompt": "...", "chosen": "...", "rejected": "..."}, ...]
Returns:
训练好的奖励模型
"""
print("=" * 50)
print("Stage 2: Reward Model Training")
print("=" * 50)
# 从 SFT 模型初始化奖励模型
reward_model = RewardModel(sft_model)
# 准备偏好数据
train_dataset = self._prepare_preference_dataset(preference_data)
# 训练奖励模型
optimizer = torch.optim.AdamW(reward_model.parameters(), lr=self.config.rm_lr)
for epoch in range(self.config.rm_epochs):
total_loss = 0
correct = 0
total = 0
for batch in train_dataset:
loss, acc = self._rm_train_step(reward_model, batch, optimizer)
total_loss += loss
correct += acc * len(batch)
total += len(batch)
print(f"Epoch {epoch+1}: Loss={total_loss/len(train_dataset):.4f}, "
f"Accuracy={correct/total:.2%}")
return reward_model
def stage3_ppo(
self,
sft_model: AutoModelForCausalLM,
reward_model: RewardModel,
prompts: List[str],
num_iterations: int = 1000
) -> AutoModelForCausalLM:
"""
Stage 3: PPO 强化学习
Args:
sft_model: SFT 模型(作为初始策略和参考模型)
reward_model: 奖励模型
prompts: 训练用的 prompt 集合
num_iterations: 训练迭代次数
Returns:
RLHF 训练后的模型
"""
print("=" * 50)
print("Stage 3: PPO Training")
print("=" * 50)
# 初始化策略模型和参考模型
policy_model = sft_model # 将被更新
ref_model = AutoModelForCausalLM.from_pretrained(
self.config.base_model # 加载原始 SFT 模型作为参考
)
ref_model.eval() # 参考模型不更新
# 初始化 PPO 训练器
ppo_trainer = PPOTrainer(
policy_model=policy_model,
ref_model=ref_model,
reward_model=reward_model,
tokenizer=self.tokenizer,
config={
'clip_range': self.config.clip_range,
'kl_coef': self.config.kl_coef,
'target_kl': self.config.target_kl
}
)
# PPO 训练循环
ppo_trainer.train(prompts, num_iterations)
return policy_model
def full_pipeline(
self,
sft_data: List[dict],
preference_data: List[dict],
ppo_prompts: List[str]
) -> AutoModelForCausalLM:
"""运行完整的 RLHF 训练流程"""
# Stage 1: SFT
sft_model = self.stage1_sft(sft_data)
# Stage 2: RM
reward_model = self.stage2_rm(sft_model, preference_data)
# Stage 3: PPO
final_model = self.stage3_ppo(sft_model, reward_model, ppo_prompts)
return final_model
# 使用示例
if __name__ == "__main__":
# 配置
config = RLHFConfig(
base_model="meta-llama/Llama-2-7b-hf",
sft_epochs=3,
rm_epochs=1,
ppo_epochs=4
)
# 加载数据
with open("sft_data.json") as f:
sft_data = json.load(f)
with open("preference_data.json") as f:
preference_data = json.load(f)
with open("ppo_prompts.json") as f:
ppo_prompts = json.load(f)
# 训练
trainer = RLHFTrainer(config)
final_model = trainer.full_pipeline(sft_data, preference_data, ppo_prompts)
# 保存模型
final_model.save_pretrained("./rlhf_model")
4. RLHF 进阶:技术演进与变体 🔄
随着 RLHF 技术的发展,研究者们提出了多种改进和简化方案。
4.1 DPO:去除奖励模型
DPO(Direct Preference Optimization)是 2023 年提出的重要方法,它直接从偏好数据优化策略,无需显式的奖励模型。
┌─────────────────────────────────────────────────────────────────────────┐
│ DPO vs RLHF 对比 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ RLHF 流程: │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │偏好数据 │ → │训练 RM │ → │RM 打分 │ → │PPO 优化 │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ ↓ ↓ │
│ 需要单独 采样+评估 │
│ 训练模型 计算开销大 │
│ │
│ DPO 流程: │
│ ┌─────────┐ ┌───────────────────────────────┐ │
│ │偏好数据 │ → │ 直接优化策略 (closed-form) │ │
│ └─────────┘ └───────────────────────────────┘ │
│ ↓ │
│ 无需 RM,无需采样 │
│ 训练更简单稳定 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
DPO 的核心思想:
DPO 证明了可以将 RLHF 的优化目标重新参数化,直接用策略模型表达,无需显式的奖励模型。
"""
DPO (Direct Preference Optimization) 实现
"""
import torch
import torch.nn.functional as F
from transformers import AutoModelForCausalLM
class DPOTrainer:
"""DPO 训练器"""
def __init__(
self,
model: AutoModelForCausalLM,
ref_model: AutoModelForCausalLM,
tokenizer,
beta: float = 0.1 # 温度参数
):
self.model = model
self.ref_model = ref_model
self.tokenizer = tokenizer
self.beta = beta
def compute_dpo_loss(
self,
prompt: str,
chosen: str,
rejected: str
) -> torch.Tensor:
"""
DPO 损失函数
L_DPO = -log σ(β * (log π_θ(y_w|x) - log π_ref(y_w|x)
- log π_θ(y_l|x) + log π_ref(y_l|x)))
其中:
y_w = chosen (winner)
y_l = rejected (loser)
π_θ = 当前策略
π_ref = 参考策略 (SFT 模型)
"""
# 计算 chosen 的 log prob
chosen_logps = self._get_logprobs(self.model, prompt, chosen)
chosen_ref_logps = self._get_logprobs(self.ref_model, prompt, chosen)
# 计算 rejected 的 log prob
rejected_logps = self._get_logprobs(self.model, prompt, rejected)
rejected_ref_logps = self._get_logprobs(self.ref_model, prompt, rejected)
# 计算 log ratio
chosen_ratio = chosen_logps - chosen_ref_logps
rejected_ratio = rejected_logps - rejected_ref_logps
# DPO loss
loss = -F.logsigmoid(self.beta * (chosen_ratio - rejected_ratio))
return loss
def _get_logprobs(
self,
model: AutoModelForCausalLM,
prompt: str,
response: str
) -> torch.Tensor:
"""计算 response 在给定 prompt 下的 log probability"""
full_text = prompt + response
tokens = self.tokenizer(full_text, return_tensors='pt')
with torch.no_grad() if model == self.ref_model else torch.enable_grad():
outputs = model(**tokens)
logits = outputs.logits
# 只计算 response 部分的 log prob
prompt_len = len(self.tokenizer(prompt)['input_ids'])
response_logits = logits[:, prompt_len-1:-1, :]
response_tokens = tokens['input_ids'][:, prompt_len:]
log_probs = F.log_softmax(response_logits, dim=-1)
token_log_probs = torch.gather(
log_probs,
dim=-1,
index=response_tokens.unsqueeze(-1)
).squeeze(-1)
return token_log_probs.sum()
def train_step(self, batch: List[dict]) -> dict:
"""执行一步训练"""
total_loss = 0
for sample in batch:
loss = self.compute_dpo_loss(
sample['prompt'],
sample['chosen'],
sample['rejected']
)
total_loss += loss
loss = total_loss / len(batch)
loss.backward()
return {'loss': loss.item()}
# DPO 的优势
"""
1. 简单性
- 无需训练单独的奖励模型
- 无需在线采样(不需要生成新回答)
- 训练流程类似 SFT
2. 稳定性
- 避免了 PPO 的训练不稳定性
- 无需调整 KL 惩罚系数
- 超参数更少
3. 效率
- 计算开销大大降低
- 无需维护多个模型
- 内存占用更少
4. 效果
- 在多个基准上与 RLHF 效果相当
- 有时甚至更好
"""
4.2 ORPO:偏好优化的简化
ORPO(Odds Ratio Preference Optimization)进一步简化了偏好学习,甚至不需要参考模型。
"""
ORPO (Odds Ratio Preference Optimization) 实现
"""
import torch
import torch.nn.functional as F
class ORPOTrainer:
"""ORPO 训练器 - 结合 SFT 和偏好优化"""
def __init__(
self,
model,
tokenizer,
lambda_orpo: float = 0.1 # ORPO 损失权重
):
self.model = model
self.tokenizer = tokenizer
self.lambda_orpo = lambda_orpo
def compute_orpo_loss(
self,
prompt: str,
chosen: str,
rejected: str
) -> tuple:
"""
ORPO 损失 = SFT损失 + λ * 偏好损失
偏好损失使用 odds ratio:
L_OR = -log σ(log(odds(y_w|x) / odds(y_l|x)))
其中 odds(y|x) = P(y|x) / (1 - P(y|x))
"""
# 计算 chosen 的 log prob
chosen_logps, chosen_avg_logps = self._get_logprobs_and_avg(prompt, chosen)
rejected_logps, rejected_avg_logps = self._get_logprobs_and_avg(prompt, rejected)
# SFT 损失(只在 chosen 上)
sft_loss = -chosen_logps
# ORPO 损失(基于 odds ratio)
chosen_odds = chosen_avg_logps - torch.log(1 - torch.exp(chosen_avg_logps) + 1e-10)
rejected_odds = rejected_avg_logps - torch.log(1 - torch.exp(rejected_avg_logps) + 1e-10)
orpo_loss = -F.logsigmoid(chosen_odds - rejected_odds)
# 总损失
total_loss = sft_loss + self.lambda_orpo * orpo_loss
return total_loss, sft_loss, orpo_loss
def _get_logprobs_and_avg(self, prompt: str, response: str):
"""获取总 log prob 和平均 log prob"""
full_text = prompt + response
tokens = self.tokenizer(full_text, return_tensors='pt')
outputs = self.model(**tokens)
logits = outputs.logits
prompt_len = len(self.tokenizer(prompt)['input_ids'])
response_logits = logits[:, prompt_len-1:-1, :]
response_tokens = tokens['input_ids'][:, prompt_len:]
log_probs = F.log_softmax(response_logits, dim=-1)
token_log_probs = torch.gather(
log_probs,
dim=-1,
index=response_tokens.unsqueeze(-1)
).squeeze(-1)
total_logps = token_log_probs.sum()
avg_logps = token_log_probs.mean()
return total_logps, avg_logps
# ORPO 的特点
"""
1. 无需参考模型
- 比 DPO 更简单
- 内存占用更少
- 从随机初始化也可以训练
2. 结合 SFT 和偏好学习
- 一阶段完成
- 更高效的数据利用
- 简化训练流程
3. 使用 Odds Ratio
- 比 log prob ratio 更稳定
- 对序列长度不敏感
- 数学上更优雅
"""
4.3 KTO:基于前景理论的对齐
KTO(Kahneman-Tversky Optimization)基于行为经济学的前景理论,不需要配对的偏好数据。
"""
KTO (Kahneman-Tversky Optimization) 实现
"""
import torch
import torch.nn.functional as F
class KTOTrainer:
"""
KTO 训练器 - 基于前景理论
核心思想:人类对损失比收益更敏感(损失厌恶)
"""
def __init__(
self,
model,
ref_model,
tokenizer,
beta: float = 0.1,
desirable_weight: float = 1.0,
undesirable_weight: float = 1.0
):
self.model = model
self.ref_model = ref_model
self.tokenizer = tokenizer
self.beta = beta
self.desirable_weight = desirable_weight
self.undesirable_weight = undesirable_weight
def compute_kto_loss(
self,
prompt: str,
response: str,
is_desirable: bool # True = 好回答, False = 坏回答
) -> torch.Tensor:
"""
KTO 损失函数
对于好回答 (y_w):
L = w_d * (1 - σ(β * (log π_θ(y|x) - log π_ref(y|x) - KL_ref)))
对于坏回答 (y_l):
L = w_u * σ(β * (log π_θ(y|x) - log π_ref(y|x) - KL_ref))
其中 KL_ref 是参考 KL 散度的估计值
"""
# 计算 log ratio
policy_logps = self._get_logprobs(self.model, prompt, response)
ref_logps = self._get_logprobs(self.ref_model, prompt, response)
log_ratio = policy_logps - ref_logps
# 计算参考 KL(通常在整个 batch 上估计)
kl_ref = 0.0 # 简化起见,这里设为0
if is_desirable:
# 好回答:最大化 log ratio
loss = self.desirable_weight * (
1 - torch.sigmoid(self.beta * (log_ratio - kl_ref))
)
else:
# 坏回答:最小化 log ratio
loss = self.undesirable_weight * torch.sigmoid(
self.beta * (log_ratio - kl_ref)
)
return loss
def train_on_unpaired_data(self, data: List[dict]):
"""
KTO 的优势:可以使用非配对数据
data: [
{"prompt": "...", "response": "...", "is_good": True},
{"prompt": "...", "response": "...", "is_good": False},
...
]
不需要 chosen/rejected 配对!
"""
for sample in data:
loss = self.compute_kto_loss(
sample['prompt'],
sample['response'],
sample['is_good']
)
loss.backward()
# KTO 的优势
"""
1. 数据效率
- 不需要配对数据
- 可以直接使用点赞/点踩数据
- 数据收集更容易
2. 理论基础
- 基于前景理论
- 考虑了人类的损失厌恶特性
- 心理学上更合理
3. 灵活性
- 可以调整好/坏数据的权重
- 适应不同的数据分布
- 易于与其他方法结合
"""
4.4 方法对比与选择
┌─────────────────────────────────────────────────────────────────────────────────┐
│ 偏好优化方法对比 │
├───────────┬──────────────┬──────────────┬──────────────┬──────────────────────────┤
│ 方法 │ 需要RM? │ 需要参考模型?│ 需要配对数据?│ 适用场景 │
├───────────┼──────────────┼──────────────┼──────────────┼──────────────────────────┤
│ RLHF │ ✓ │ ✓ │ ✓ │ 大规模训练,最高质量 │
│ (PPO) │ │ │ │ │
├───────────┼──────────────┼──────────────┼──────────────┼──────────────────────────┤
│ DPO │ ✗ │ ✓ │ ✓ │ 快速训练,资源有限 │
│ │ │ │ │ │
├───────────┼──────────────┼──────────────┼──────────────┼──────────────────────────┤
│ ORPO │ ✗ │ ✗ │ ✓ │ 从头训练,超简单流程 │
│ │ │ │ │ │
├───────────┼──────────────┼──────────────┼──────────────┼──────────────────────────┤
│ KTO │ ✗ │ ✓ │ ✗ │ 只有单边反馈数据 │
│ │ │ │ │ (点赞/点踩) │
├───────────┼──────────────┼──────────────┼──────────────┼──────────────────────────┤
│ IPO │ ✗ │ ✓ │ ✓ │ 需要更强的正则化 │
├───────────┴──────────────┴──────────────┴──────────────┴──────────────────────────┤
│ │
│ 选择指南: │
│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
│ │ 1. 有大量计算资源 + 需要最佳效果 → RLHF (PPO) │ │
│ │ 2. 计算资源有限 + 有配对偏好数据 → DPO │ │
│ │ 3. 想要最简单的训练流程 → ORPO │ │
│ │ 4. 只有单边反馈数据(点赞/点踩)→ KTO │ │
│ │ 5. 需要更强的理论保证 → IPO │ │
│ └─────────────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────────────┘
5. AgentGym:通用 Agent 训练框架 🏋️
AgentGym 是一个专门为训练 LLM-based Agent 设计的框架,提供了统一的环境接口、大规模轨迹数据集和进化训练方法。
5.1 AgentGym 架构设计
┌─────────────────────────────────────────────────────────────────────────────────┐
│ AgentGym 整体架构 │
├─────────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ │
│ │ AgentEvol │ │
│ │ 进化训练器 │ │
│ └────────┬────────┘ │
│ │ │
│ ┌──────────────────┼──────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ AgentTraj │ │ AgentEval │ │ AgentEnv │ │
│ │ 轨迹数据集 │ │ 评估基准 │ │ 环境套件 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │
│ │ │ │ │
│ ┌─────────────────┴──────────────────┴──────────────────┴─────────────────┐ │
│ │ 统一抽象层 (EnvClient) │ │
│ │ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │ │
│ │ │WebShop│ │ALFWorld│ │SciWorld│ │TextCraft│ │Weather│ │Movie│ │Todo │ │ │
│ │ └──────┘ └──────┘ └──────┘ └──────┘ └──────┘ └──────┘ └──────┘ │ │
│ │ ... 14 种任务环境 ... │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │
│ 核心特点: │
│ 1. 统一的 ReAct 格式 (Thought → Action → Observation) │
│ 2. 支持多种任务类型 (网页浏览、游戏、工具使用、知识推理等) │
│ 3. 真实交互环境 (不是静态数据集) │
│ 4. 可扩展的环境接口 │
│ │
└─────────────────────────────────────────────────────────────────────────────────┘
5.2 AgentTraj 数据集
AgentTraj 是 AgentGym 的核心数据集,包含大规模的 Agent 交互轨迹。
"""
AgentTraj 数据集结构与使用
"""
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
class TaskType(Enum):
"""任务类型枚举"""
WEB_BROWSING = "web_browsing" # 网页浏览
GAME = "game" # 游戏任务
TOOL_USE = "tool_use" # 工具使用
KNOWLEDGE = "knowledge" # 知识推理
CODING = "coding" # 代码生成
@dataclass
class AgentStep:
"""单步 Agent 交互"""
thought: str # Agent 的思考过程
action: str # 执行的动作
action_input: str # 动作参数
observation: str # 环境反馈
reward: Optional[float] # 即时奖励(可选)
@dataclass
class AgentTrajectory:
"""完整的 Agent 轨迹"""
task_id: str
task_type: TaskType
instruction: str # 任务指令
steps: List[AgentStep] # 交互步骤序列
final_reward: float # 最终奖励/成功标志
metadata: Dict # 元信息
class AgentTrajDataset:
"""AgentTraj 数据集加载器"""
def __init__(self, data_path: str):
self.data_path = data_path
self.trajectories = self._load_data()
def _load_data(self) -> List[AgentTrajectory]:
"""加载轨迹数据"""
# 实际实现会从 JSON/Parquet 文件加载
pass
def filter_by_type(self, task_type: TaskType) -> List[AgentTrajectory]:
"""按任务类型筛选"""
return [t for t in self.trajectories if t.task_type == task_type]
def filter_by_reward(self, min_reward: float) -> List[AgentTrajectory]:
"""筛选高质量轨迹"""
return [t for t in self.trajectories if t.final_reward >= min_reward]
def to_sft_format(self, trajectory: AgentTrajectory) -> Dict:
"""转换为 SFT 训练格式"""
prompt = f"Task: {trajectory.instruction}\n\n"
response = ""
for step in trajectory.steps:
response += f"Thought: {step.thought}\n"
response += f"Action: {step.action}\n"
response += f"Action Input: {step.action_input}\n"
response += f"Observation: {step.observation}\n\n"
return {
"prompt": prompt,
"response": response,
"reward": trajectory.final_reward
}
# 数据集统计(来自论文)
"""
AgentTraj-L 数据集:
- 总轨迹数: ~500K
- 覆盖 14 种环境
- 包含成功和失败轨迹
- 多个 LLM 生成 (GPT-4, Claude, Llama)
环境分布:
┌──────────────┬───────────┬──────────────┐
│ 环境 │ 轨迹数 │ 成功率 │
├──────────────┼───────────┼──────────────┤
│ WebShop │ 50K │ 35% │
│ ALFWorld │ 80K │ 45% │
│ SciWorld │ 40K │ 28% │
│ TextCraft │ 60K │ 52% │
│ BIRD │ 30K │ 40% │
│ ... │ ... │ ... │
└──────────────┴───────────┴──────────────┘
"""
5.3 AgentEvol 进化训练
AgentEvol 是 AgentGym 的核心训练方法,采用进化式的迭代训练策略。
┌─────────────────────────────────────────────────────────────────────────────────┐
│ AgentEvol 进化训练流程 │
├─────────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ 初始化阶段 │ │
│ │ ┌─────────────┐ │ │
│ │ │ Base LLM │ ────→ SFT on AgentTraj ────→ │ Agent_v0 │ │ │
│ │ └─────────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ 进化迭代 (重复 N 轮) │ │
│ │ │ │
│ │ Step 1: 探索 (Exploration) │ │
│ │ ┌───────────────────────────────────────────────────────────────┐ │ │
│ │ │ Agent_v{i} 在环境中执行任务,收集新轨迹 │ │ │
│ │ │ - 使用温度采样增加多样性 │ │ │
│ │ │ - 并行在多个环境中执行 │ │ │
│ │ │ - 记录 (observation, action, reward) 序列 │ │ │
│ │ └───────────────────────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Step 2: 筛选 (Selection) │ │
│ │ ┌───────────────────────────────────────────────────────────────┐ │ │
│ │ │ 根据最终奖励筛选高质量轨迹 │ │ │
│ │ │ - 保留 reward > threshold 的轨迹 │ │ │
│ │ │ - 可选: 使用奖励模型重新评分 │ │ │
│ │ │ - 去重和质量过滤 │ │ │
│ │ └───────────────────────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Step 3: 进化 (Evolution) │ │
│ │ ┌───────────────────────────────────────────────────────────────┐ │ │
│ │ │ 使用筛选后的数据训练 │ │ │
│ │ │ - 可选: SFT (监督微调) │ │ │
│ │ │ - 可选: DPO (直接偏好优化) │ │ │
│ │ │ - 可选: PPO (强化学习) │ │ │
│ │ │ 得到 Agent_v{i+1} │ │ │
│ │ └───────────────────────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Step 4: 评估 (Evaluation) │ │
│ │ ┌───────────────────────────────────────────────────────────────┐ │ │
│ │ │ 在保留测试集上评估 Agent_v{i+1} │ │ │
│ │ │ - 计算任务成功率 │ │ │
│ │ │ - 比较与 v{i} 的提升 │ │ │
│ │ │ - 决定是否继续迭代 │ │ │
│ │ └───────────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────────┘
"""
AgentEvol 进化训练实现
"""
import torch
from typing import List, Dict, Tuple
from dataclasses import dataclass
import random
@dataclass
class EvolConfig:
"""AgentEvol 配置"""
num_iterations: int = 5 # 进化轮数
trajectories_per_iter: int = 1000 # 每轮收集的轨迹数
selection_ratio: float = 0.3 # 保留的轨迹比例
reward_threshold: float = 0.5 # 奖励阈值
training_method: str = "sft" # sft / dpo / ppo
temperature: float = 0.8 # 探索温度
class AgentEvol:
"""AgentEvol 进化训练器"""
def __init__(
self,
base_model,
tokenizer,
environments: List,
config: EvolConfig
):
self.model = base_model
self.tokenizer = tokenizer
self.envs = environments
self.config = config
self.trajectory_buffer = []
def explore(self) -> List[AgentTrajectory]:
"""
Step 1: 探索阶段
让 Agent 在环境中执行任务,收集轨迹
"""
new_trajectories = []
for _ in range(self.config.trajectories_per_iter):
# 随机选择环境
env = random.choice(self.envs)
# 重置环境
obs = env.reset()
trajectory = AgentTrajectory(
task_id=env.task_id,
task_type=env.task_type,
instruction=env.instruction,
steps=[],
final_reward=0.0,
metadata={}
)
# 执行交互循环
done = False
while not done:
# Agent 生成动作
prompt = self._build_prompt(trajectory.instruction, trajectory.steps, obs)
action, thought = self._generate_action(prompt)
# 环境执行
obs, reward, done, info = env.step(action)
# 记录步骤
trajectory.steps.append(AgentStep(
thought=thought,
action=action['name'],
action_input=action['input'],
observation=obs,
reward=reward
))
trajectory.final_reward = env.get_final_reward()
new_trajectories.append(trajectory)
return new_trajectories
def select(self, trajectories: List[AgentTrajectory]) -> List[AgentTrajectory]:
"""
Step 2: 筛选阶段
根据奖励筛选高质量轨迹
"""
# 按奖励排序
sorted_trajectories = sorted(
trajectories,
key=lambda t: t.final_reward,
reverse=True
)
# 筛选
selected = []
for t in sorted_trajectories:
if t.final_reward >= self.config.reward_threshold:
selected.append(t)
if len(selected) >= int(len(trajectories) * self.config.selection_ratio):
break
print(f"Selected {len(selected)} trajectories from {len(trajectories)}")
print(f"Average reward: {sum(t.final_reward for t in selected) / len(selected):.3f}")
return selected
def evolve(self, trajectories: List[AgentTrajectory]):
"""
Step 3: 进化阶段
使用筛选后的数据训练模型
"""
if self.config.training_method == "sft":
self._sft_training(trajectories)
elif self.config.training_method == "dpo":
self._dpo_training(trajectories)
elif self.config.training_method == "ppo":
self._ppo_training(trajectories)
def _sft_training(self, trajectories: List[AgentTrajectory]):
"""SFT 训练"""
# 转换为 SFT 格式
sft_data = [self._to_sft_format(t) for t in trajectories]
# 标准 SFT 训练
optimizer = torch.optim.AdamW(self.model.parameters(), lr=2e-5)
for epoch in range(3):
for batch in self._batch_data(sft_data, batch_size=4):
loss = self._compute_sft_loss(batch)
loss.backward()
optimizer.step()
optimizer.zero_grad()
def _dpo_training(self, trajectories: List[AgentTrajectory]):
"""DPO 训练 - 使用成功/失败轨迹配对"""
# 分离成功和失败轨迹
successful = [t for t in trajectories if t.final_reward > 0.5]
failed = [t for t in self.trajectory_buffer if t.final_reward < 0.5]
# 创建偏好对
pairs = []
for good_traj in successful:
# 找到相同任务的失败轨迹
bad_trajs = [t for t in failed if t.task_type == good_traj.task_type]
if bad_trajs:
bad_traj = random.choice(bad_trajs)
pairs.append({
'prompt': good_traj.instruction,
'chosen': self._format_trajectory(good_traj),
'rejected': self._format_trajectory(bad_traj)
})
# DPO 训练
# ... (使用之前的 DPOTrainer)
def evaluate(self, test_envs: List) -> Dict:
"""
Step 4: 评估阶段
"""
results = {
'total': 0,
'success': 0,
'by_type': {}
}
for env in test_envs:
obs = env.reset()
done = False
steps = []
while not done:
prompt = self._build_prompt(env.instruction, steps, obs)
action, thought = self._generate_action(prompt, temperature=0.0) # greedy
obs, reward, done, info = env.step(action)
steps.append({'thought': thought, 'action': action, 'obs': obs})
success = env.get_final_reward() > 0.5
results['total'] += 1
if success:
results['success'] += 1
# 按类型统计
task_type = env.task_type.value
if task_type not in results['by_type']:
results['by_type'][task_type] = {'total': 0, 'success': 0}
results['by_type'][task_type]['total'] += 1
if success:
results['by_type'][task_type]['success'] += 1
results['success_rate'] = results['success'] / results['total']
return results
def run_evolution(self, num_iterations: int = None) -> List[Dict]:
"""运行完整的进化训练"""
if num_iterations is None:
num_iterations = self.config.num_iterations
history = []
for i in range(num_iterations):
print(f"\n{'='*50}")
print(f"Evolution Iteration {i+1}/{num_iterations}")
print(f"{'='*50}")
# Step 1: 探索
print("\n[1/4] Exploring...")
new_trajectories = self.explore()
self.trajectory_buffer.extend(new_trajectories)
# Step 2: 筛选
print("\n[2/4] Selecting...")
selected = self.select(new_trajectories)
# Step 3: 进化
print("\n[3/4] Evolving...")
self.evolve(selected)
# Step 4: 评估
print("\n[4/4] Evaluating...")
eval_results = self.evaluate(self.test_envs)
history.append({
'iteration': i + 1,
'trajectories_collected': len(new_trajectories),
'trajectories_selected': len(selected),
'eval_results': eval_results
})
print(f"\nSuccess Rate: {eval_results['success_rate']:.2%}")
return history
def _generate_action(self, prompt: str, temperature: float = None) -> Tuple[Dict, str]:
"""生成动作"""
if temperature is None:
temperature = self.config.temperature
inputs = self.tokenizer(prompt, return_tensors='pt')
outputs = self.model.generate(
**inputs,
max_new_tokens=256,
temperature=temperature,
do_sample=temperature > 0
)
response = self.tokenizer.decode(outputs[0])
# 解析 thought 和 action
thought, action = self._parse_response(response)
return action, thought
5.4 实战:使用 AgentGym 训练 Agent
"""
AgentGym 实战:训练一个网页购物 Agent
"""
from agentgym import AgentGym, WebShopEnv, AgentEvol
from transformers import AutoModelForCausalLM, AutoTokenizer
# 1. 初始化环境
print("Step 1: Setting up environments...")
webshop_env = WebShopEnv(
num_products=10000,
user_personas_path="./data/user_personas.json"
)
# 2. 加载基座模型
print("Step 2: Loading base model...")
model = AutoModelForCausalLM.from_pretrained(
"meta-llama/Llama-2-7b-hf",
torch_dtype=torch.bfloat16,
device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-hf")
# 3. 加载 AgentTraj 数据进行初始 SFT
print("Step 3: Initial SFT on AgentTraj...")
agenttraj = AgentTrajDataset("./data/agenttraj_webshop.json")
sft_data = agenttraj.filter_by_reward(0.5) # 只用成功的轨迹
sft_trainer = SFTTrainer(model, tokenizer)
sft_trainer.train(sft_data, epochs=3)
# 4. 进化训练
print("Step 4: Starting AgentEvol...")
evol_config = EvolConfig(
num_iterations=5,
trajectories_per_iter=500,
selection_ratio=0.3,
reward_threshold=0.5,
training_method="dpo"
)
agent_evol = AgentEvol(
base_model=model,
tokenizer=tokenizer,
environments=[webshop_env],
config=evol_config
)
# 设置测试环境
agent_evol.test_envs = [WebShopEnv(test=True) for _ in range(100)]
# 运行进化
history = agent_evol.run_evolution()
# 5. 查看训练历史
print("\n" + "="*50)
print("Training History")
print("="*50)
for record in history:
print(f"Iteration {record['iteration']}: "
f"Success Rate = {record['eval_results']['success_rate']:.2%}")
# 6. 保存最终模型
model.save_pretrained("./webshop_agent_evolved")
tokenizer.save_pretrained("./webshop_agent_evolved")
"""
预期输出:
==================================================
Training History
==================================================
Iteration 1: Success Rate = 42.00%
Iteration 2: Success Rate = 51.00%
Iteration 3: Success Rate = 58.00%
Iteration 4: Success Rate = 63.00%
Iteration 5: Success Rate = 67.00%
"""
6. R3:推理增强的强化学习 🧠
R3(Reinforcement Learning with Reasoning Rewards)是一种将推理能力融入强化学习训练的方法,特别适合需要复杂推理的 Agent 任务。
6.1 R3 的核心思想
💡 思考:为什么传统的 RL 方法在训练需要推理的 Agent 时效果不佳?
🤔 解答:
┌─────────────────────────────────────────────────────────────────────────────────┐
│ 传统 RL vs R3 对比 │
├─────────────────────────────────────────────────────────────────────────────────┤
│ │
│ 传统 RL 的问题: │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Task: "找一本关于机器学习的书,价格低于50元" │ │
│ │ │ │
│ │ Agent 推理过程: │ │
│ │ Thought 1: 我需要搜索机器学习相关的书籍 ←────── ✓ 正确 │ │
│ │ Action 1: search("机器学习") │ │
│ │ Thought 2: 找到了很多结果,需要筛选价格 ←────── ✓ 正确 │ │
│ │ Action 2: filter(price < 50) │ │
│ │ Thought 3: 这本看起来不错,点击查看 ←────────── ✓ 正确 │ │
│ │ Action 3: click("机器学习实战") │ │
│ │ ... │ │
│ │ Final Result: 购买成功 ✓ │ │
│ │ │ │
│ │ 传统 RL 奖励: +1 (只看最终结果) │ │
│ │ │ │
│ │ 问题: │ │
│ │ - 中间推理步骤没有奖励 │ │
│ │ - 奖励稀疏,学习效率低 │ │
│ │ - 无法区分"运气好"和"推理好" │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │
│ R3 的解决方案: │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 为每个推理步骤提供奖励: │ │
│ │ │ │
│ │ Thought 1: 我需要搜索机器学习相关的书籍 │ │
│ │ → Reasoning Reward: +0.3 (正确识别任务) │ │
│ │ │ │
│ │ Thought 2: 找到了很多结果,需要筛选价格 │ │
│ │ → Reasoning Reward: +0.4 (记住价格约束) │ │
│ │ │ │
│ │ Thought 3: 这本看起来不错,点击查看 │ │
│ │ → Reasoning Reward: +0.2 (合理的探索) │ │
│ │ │ │
│ │ Total Reward = Task Reward + Σ Reasoning Rewards │ │
│ │ = 1.0 + 0.9 = 1.9 │ │
│ │ │ │
│ │ 优势: │ │
│ │ - 密集奖励信号 │ │
│ │ - 鼓励正确的推理过程 │ │
│ │ - 学习效率大幅提升 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────────┘
6.2 推理过程奖励模型
R3 的核心是训练一个能评估推理过程质量的奖励模型。
"""
R3: 推理过程奖励模型 (Process Reward Model)
"""
import torch
import torch.nn as nn
from transformers import AutoModel
from typing import List, Dict
class ProcessRewardModel(nn.Module):
"""
推理过程奖励模型
与传统的 Outcome RM 不同,Process RM 评估每一步推理的质量
"""
def __init__(self, base_model_name: str):
super().__init__()
self.backbone = AutoModel.from_pretrained(base_model_name)
# 步骤级别奖励头
self.step_reward_head = nn.Sequential(
nn.Linear(self.backbone.config.hidden_size, 512),
nn.ReLU(),
nn.Linear(512, 1)
)
# 可选:多维度评估
self.dimension_heads = nn.ModuleDict({
'relevance': nn.Linear(self.backbone.config.hidden_size, 1),
'correctness': nn.Linear(self.backbone.config.hidden_size, 1),
'progress': nn.Linear(self.backbone.config.hidden_size, 1)
})
def forward(
self,
input_ids,
attention_mask,
step_positions: List[int] # 每个推理步骤的结束位置
) -> Dict[str, torch.Tensor]:
"""
前向传播
Args:
input_ids: token ids
attention_mask: attention mask
step_positions: 每个 Thought 结束的位置索引
Returns:
step_rewards: 每个步骤的奖励值
"""
outputs = self.backbone(input_ids, attention_mask=attention_mask)
hidden_states = outputs.last_hidden_state
# 在每个步骤位置提取隐藏状态并计算奖励
step_rewards = []
dimension_scores = {dim: [] for dim in self.dimension_heads.keys()}
for pos in step_positions:
step_hidden = hidden_states[:, pos, :]
# 总体步骤奖励
step_reward = self.step_reward_head(step_hidden)
step_rewards.append(step_reward)
# 多维度分数
for dim, head in self.dimension_heads.items():
score = head(step_hidden)
dimension_scores[dim].append(score)
return {
'step_rewards': torch.stack(step_rewards, dim=1),
'dimension_scores': {
dim: torch.stack(scores, dim=1)
for dim, scores in dimension_scores.items()
}
}
class ProcessRMTrainer:
"""推理过程奖励模型训练器"""
def __init__(self, model: ProcessRewardModel, tokenizer):
self.model = model
self.tokenizer = tokenizer
def prepare_training_data(
self,
trajectories: List[Dict]
) -> List[Dict]:
"""
准备训练数据
数据格式:每个轨迹需要标注每一步的推理质量
{
"instruction": "...",
"steps": [
{"thought": "...", "action": "...", "quality": 0.8},
{"thought": "...", "action": "...", "quality": 0.6},
...
]
}
"""
processed = []
for traj in trajectories:
# 拼接完整序列
text = f"Task: {traj['instruction']}\n\n"
step_positions = []
step_qualities = []
for step in traj['steps']:
text += f"Thought: {step['thought']}\n"
# 记录 Thought 结束位置
step_positions.append(len(self.tokenizer(text)['input_ids']) - 1)
step_qualities.append(step['quality'])
text += f"Action: {step['action']}\n"
text += f"Observation: {step.get('observation', '')}\n\n"
tokens = self.tokenizer(text, return_tensors='pt', truncation=True)
processed.append({
'input_ids': tokens['input_ids'],
'attention_mask': tokens['attention_mask'],
'step_positions': step_positions,
'step_qualities': torch.tensor(step_qualities)
})
return processed
def compute_loss(self, batch: List[Dict]) -> torch.Tensor:
"""
计算损失
使用 MSE 损失来训练步骤奖励预测
"""
total_loss = 0
for sample in batch:
outputs = self.model(
sample['input_ids'],
sample['attention_mask'],
sample['step_positions']
)
predicted_rewards = outputs['step_rewards'].squeeze(-1)
target_rewards = sample['step_qualities']
loss = nn.MSELoss()(predicted_rewards, target_rewards)
total_loss += loss
return total_loss / len(batch)
def train_with_comparisons(
self,
good_trajectories: List[Dict],
bad_trajectories: List[Dict]
):
"""
使用对比学习训练
给定相同任务的好/坏轨迹对,学习区分推理质量
"""
# 类似 DPO 的方式,但在步骤级别
pass
6.3 R3 训练流程
┌─────────────────────────────────────────────────────────────────────────────────┐
│ R3 训练完整流程 │
├─────────────────────────────────────────────────────────────────────────────────┤
│ │
│ 阶段 1: 收集带标注的推理轨迹 │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 方法 A: 人工标注 (高质量,成本高) │ │
│ │ - 标注员评估每个 Thought 的质量 │ │
│ │ - 评分维度: 相关性、正确性、进展性 │ │
│ │ │ │
│ │ 方法 B: 自动标注 (大规模,需要验证) │ │
│ │ - 使用 GPT-4 等强模型评估 │ │
│ │ - 基于最终结果回溯标注 │ │
│ │ - Monte Carlo 采样估计步骤贡献 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ 阶段 2: 训练推理过程奖励模型 (PRM) │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 输入: 任务描述 + 推理步骤序列 │ │
│ │ 输出: 每个步骤的质量分数 │ │
│ │ │ │
│ │ 训练目标: min Σ MSE(predicted_reward, human_label) │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ 阶段 3: 使用 PRM 进行强化学习 │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 修改后的奖励函数: │ │
│ │ │ │
│ │ R_total = α * R_task + β * Σ R_reasoning(step_i) + γ * R_KL │ │
│ │ ↑ ↑ ↑ │ │
│ │ 任务奖励 推理过程奖励 KL 惩罚 │ │
│ │ │ │
│ │ 使用 PPO 或其他 RL 算法优化 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ 阶段 4: 迭代优化 │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Loop: │ │
│ │ 1. 用当前策略收集新轨迹 │ │
│ │ 2. 用 PRM 评估推理质量 │ │
│ │ 3. 筛选高质量轨迹更新 PRM │ │
│ │ 4. 用更新后的 PRM 继续 RL 训练 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────────┘
"""
R3 完整训练实现
"""
import torch
from typing import List, Dict, Tuple
from dataclasses import dataclass
@dataclass
class R3Config:
"""R3 训练配置"""
# 奖励权重
task_reward_weight: float = 1.0
reasoning_reward_weight: float = 0.5
kl_penalty_weight: float = 0.1
# 训练参数
ppo_epochs: int = 4
batch_size: int = 16
learning_rate: float = 1e-6
# PRM 更新
prm_update_freq: int = 100
prm_learning_rate: float = 1e-5
class R3Trainer:
"""R3 训练器"""
def __init__(
self,
policy_model,
ref_model,
process_rm: ProcessRewardModel,
tokenizer,
config: R3Config
):
self.policy = policy_model
self.ref = ref_model
self.prm = process_rm
self.tokenizer = tokenizer
self.config = config
def compute_r3_reward(
self,
trajectory: Dict,
task_reward: float
) -> Tuple[torch.Tensor, Dict]:
"""
计算 R3 奖励
R = α * R_task + β * Σ R_reasoning + γ * R_KL
"""
# 1. 任务奖励
weighted_task_reward = self.config.task_reward_weight * task_reward
# 2. 推理过程奖励
text = self._format_trajectory(trajectory)
tokens = self.tokenizer(text, return_tensors='pt')
step_positions = self._get_step_positions(trajectory)
with torch.no_grad():
prm_outputs = self.prm(
tokens['input_ids'],
tokens['attention_mask'],
step_positions
)
reasoning_rewards = prm_outputs['step_rewards'].squeeze()
weighted_reasoning_reward = (
self.config.reasoning_reward_weight * reasoning_rewards.sum()
)
# 3. KL 惩罚
kl_penalty = self._compute_kl(trajectory)
weighted_kl = self.config.kl_penalty_weight * kl_penalty
# 总奖励
total_reward = weighted_task_reward + weighted_reasoning_reward - weighted_kl
# 返回详细信息供分析
details = {
'task_reward': task_reward,
'reasoning_rewards': reasoning_rewards.tolist(),
'kl_penalty': kl_penalty.item(),
'total_reward': total_reward.item()
}
return total_reward, details
def train_step(self, trajectories: List[Dict]) -> Dict:
"""
执行一步 R3 训练
"""
# 收集所有样本的奖励
all_rewards = []
all_details = []
for traj in trajectories:
task_reward = traj['final_reward']
reward, details = self.compute_r3_reward(traj, task_reward)
all_rewards.append(reward)
all_details.append(details)
# 计算优势函数
rewards_tensor = torch.stack(all_rewards)
values = self._estimate_values(trajectories)
advantages = self._compute_advantages(rewards_tensor, values)
# PPO 更新
policy_loss = self._ppo_update(trajectories, advantages)
# 定期更新 PRM
if self.step_count % self.config.prm_update_freq == 0:
self._update_prm(trajectories)
return {
'policy_loss': policy_loss,
'mean_reward': rewards_tensor.mean().item(),
'mean_reasoning_reward': sum(d['reasoning_rewards'][0] for d in all_details) / len(all_details),
'mean_kl': sum(d['kl_penalty'] for d in all_details) / len(all_details)
}
def _update_prm(self, trajectories: List[Dict]):
"""
使用新收集的轨迹更新 PRM
可以使用多种策略:
1. 基于最终结果的伪标签
2. 使用 Monte Carlo 估计步骤价值
3. 使用对比学习
"""
# Monte Carlo 价值估计
for traj in trajectories:
step_values = self._monte_carlo_step_values(traj)
# 用这些估计值更新 PRM
# ...
def _monte_carlo_step_values(self, trajectory: Dict) -> List[float]:
"""
使用 Monte Carlo 方法估计每个步骤的价值贡献
思路:从某个步骤开始,多次采样完成任务,
成功率反映该步骤的价值
"""
step_values = []
for i, step in enumerate(trajectory['steps']):
# 从步骤 i 开始,用当前策略完成任务
success_count = 0
num_samples = 10
for _ in range(num_samples):
# 复制前 i 步,然后继续生成
partial_traj = self._copy_partial(trajectory, i)
completed = self._complete_trajectory(partial_traj)
if completed['final_reward'] > 0.5:
success_count += 1
step_values.append(success_count / num_samples)
return step_values
# 使用示例
def train_r3_agent():
"""使用 R3 训练 Agent 的完整流程"""
# 1. 初始化模型
policy = AutoModelForCausalLM.from_pretrained("llama-7b")
ref = AutoModelForCausalLM.from_pretrained("llama-7b")
prm = ProcessRewardModel("llama-7b")
tokenizer = AutoTokenizer.from_pretrained("llama-7b")
# 2. 加载预训练的 PRM
# prm.load_state_dict(torch.load("pretrained_prm.pt"))
# 3. 配置训练
config = R3Config(
task_reward_weight=1.0,
reasoning_reward_weight=0.5,
kl_penalty_weight=0.1
)
trainer = R3Trainer(policy, ref, prm, tokenizer, config)
# 4. 训练循环
for iteration in range(1000):
# 收集轨迹
trajectories = trainer.collect_trajectories(num=32)
# R3 更新
metrics = trainer.train_step(trajectories)
print(f"Iter {iteration}: reward={metrics['mean_reward']:.3f}, "
f"reasoning={metrics['mean_reasoning_reward']:.3f}")
6.4 实验结果分析
┌─────────────────────────────────────────────────────────────────────────────────┐
│ R3 实验结果 (来自论文) │
├─────────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. 数学推理任务 (GSM8K, MATH) │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 方法 │ GSM8K │ MATH │ 训练样本数 │ │
│ │ ───────────────────────────────────────────────────────── │ │
│ │ SFT │ 45.2% │ 12.8% │ 100K │ │
│ │ RLHF (ORM) │ 52.1% │ 15.3% │ 100K + 50K RL │ │
│ │ R3 (PRM) │ 58.7% │ 19.2% │ 100K + 50K RL │ │
│ │ │ │
│ │ 提升: R3 比 RLHF 在 GSM8K 上提升 6.6%,在 MATH 上提升 3.9% │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │
│ 2. Agent 任务 (WebShop, ALFWorld) │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 方法 │ WebShop │ ALFWorld │ 平均步数 │ │
│ │ ───────────────────────────────────────────────────────── │ │
│ │ SFT │ 32.5% │ 41.2% │ 8.3 │ │
│ │ RLHF │ 45.8% │ 52.6% │ 7.1 │ │
│ │ R3 │ 54.2% │ 61.8% │ 5.9 │ │
│ │ │ │
│ │ R3 不仅成功率更高,完成任务所需步数也更少 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │
│ 3. 消融实验 │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 组件 │ GSM8K 影响 │ 说明 │ │
│ │ ───────────────────────────────────────────────── │ │
│ │ 去除 PRM │ -6.6% │ 退化为标准 RLHF │ │
│ │ 去除多维度评估 │ -2.1% │ 单一分数不够精细 │ │
│ │ 去除 PRM 更新 │ -3.4% │ PRM 需要持续优化 │ │
│ │ 减少推理奖励权重 │ -4.2% │ 推理奖励很重要 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │
│ 关键发现: │
│ • PRM 比 ORM 更能捕捉推理质量 │
│ • 密集的推理奖励显著提升学习效率 │
│ • PRM 需要随策略更新而更新(避免 reward hacking) │
│ • 多维度评估(相关性、正确性、进展性)比单一评分更有效 │
│ │
└─────────────────────────────────────────────────────────────────────────────────┘
7. Agent 微调方法大全 📖
本节总结各种 Agent 微调方法,帮助读者根据具体场景选择合适的技术。
7.1 基于轨迹的微调
"""
基于轨迹的微调方法
"""
from typing import List, Dict
from enum import Enum
class TrajectoryFinetuneMethod(Enum):
"""轨迹微调方法枚举"""
BEHAVIOR_CLONING = "bc" # 行为克隆
FILTERED_BC = "filtered_bc" # 筛选后的行为克隆
WEIGHTED_BC = "weighted_bc" # 加权行为克隆
TRAJECTORY_RANKING = "ranking" # 轨迹排序学习
class BehaviorCloning:
"""
行为克隆 (Behavior Cloning)
最简单的方法:直接在专家轨迹上做 SFT
"""
def __init__(self, model, tokenizer):
self.model = model
self.tokenizer = tokenizer
def train(self, expert_trajectories: List[Dict]):
"""
在专家轨迹上训练
优点: 简单、稳定
缺点: 需要大量高质量数据,无法超越专家
"""
for traj in expert_trajectories:
# 转换为 (prompt, response) 格式
prompt = f"Task: {traj['instruction']}\n"
response = self._format_steps(traj['steps'])
# 标准 SFT 损失
loss = self._compute_sft_loss(prompt, response)
loss.backward()
# ...
class FilteredBC:
"""
筛选后的行为克隆
只在成功的轨迹上训练
"""
def __init__(self, model, tokenizer, success_threshold: float = 0.5):
self.model = model
self.tokenizer = tokenizer
self.threshold = success_threshold
def filter_trajectories(self, trajectories: List[Dict]) -> List[Dict]:
"""筛选成功轨迹"""
return [t for t in trajectories if t['final_reward'] >= self.threshold]
def train(self, trajectories: List[Dict]):
"""在筛选后的轨迹上训练"""
filtered = self.filter_trajectories(trajectories)
print(f"Filtered {len(filtered)}/{len(trajectories)} trajectories")
# 标准 BC 训练
for traj in filtered:
# ...
pass
class WeightedBC:
"""
加权行为克隆
根据轨迹质量分配不同的学习权重
"""
def __init__(self, model, tokenizer, temperature: float = 1.0):
self.model = model
self.tokenizer = tokenizer
self.temperature = temperature
def compute_weights(self, trajectories: List[Dict]) -> List[float]:
"""计算每条轨迹的权重"""
rewards = [t['final_reward'] for t in trajectories]
# 使用 softmax 转换为权重
import numpy as np
rewards = np.array(rewards) / self.temperature
weights = np.exp(rewards) / np.exp(rewards).sum()
return weights.tolist()
def train(self, trajectories: List[Dict]):
"""加权训练"""
weights = self.compute_weights(trajectories)
for traj, weight in zip(trajectories, weights):
loss = self._compute_sft_loss(traj)
weighted_loss = weight * loss
weighted_loss.backward()
class TrajectoryRanking:
"""
轨迹排序学习
类似 DPO,但在轨迹级别进行偏好学习
"""
def __init__(self, model, ref_model, tokenizer, beta: float = 0.1):
self.model = model
self.ref_model = ref_model
self.tokenizer = tokenizer
self.beta = beta
def create_pairs(self, trajectories: List[Dict]) -> List[tuple]:
"""创建轨迹对"""
pairs = []
# 按任务分组
by_task = {}
for traj in trajectories:
task_id = traj['task_id']
if task_id not in by_task:
by_task[task_id] = []
by_task[task_id].append(traj)
# 在同一任务内创建对
for task_id, task_trajs in by_task.items():
sorted_trajs = sorted(task_trajs, key=lambda t: t['final_reward'], reverse=True)
for i in range(len(sorted_trajs)):
for j in range(i + 1, len(sorted_trajs)):
if sorted_trajs[i]['final_reward'] > sorted_trajs[j]['final_reward']:
pairs.append((sorted_trajs[i], sorted_trajs[j]))
return pairs
def train(self, trajectories: List[Dict]):
"""轨迹排序训练"""
pairs = self.create_pairs(trajectories)
for better_traj, worse_traj in pairs:
# DPO 风格的损失
loss = self._compute_ranking_loss(better_traj, worse_traj)
loss.backward()
7.2 基于反馈的微调
"""
基于反馈的微调方法
"""
from abc import ABC, abstractmethod
from typing import List, Dict, Optional
class FeedbackSource(ABC):
"""反馈来源抽象基类"""
@abstractmethod
def get_feedback(self, trajectory: Dict) -> Dict:
"""获取对轨迹的反馈"""
pass
class HumanFeedback(FeedbackSource):
"""人类反馈"""
def get_feedback(self, trajectory: Dict) -> Dict:
"""
收集人类反馈
反馈类型:
- 评分 (1-5)
- 对比 (A vs B)
- 修正 (提供更好的动作)
"""
# 实际实现会调用标注平台 API
pass
class LLMFeedback(FeedbackSource):
"""LLM 自动反馈"""
def __init__(self, critic_model):
self.critic = critic_model
def get_feedback(self, trajectory: Dict) -> Dict:
"""使用强大的 LLM 评估轨迹"""
prompt = f"""
请评估以下 Agent 执行轨迹的质量:
任务: {trajectory['instruction']}
执行步骤:
{self._format_steps(trajectory['steps'])}
最终结果: {trajectory['final_outcome']}
请从以下维度评分 (1-10):
1. 任务理解: Agent 是否正确理解了任务?
2. 推理质量: 每步推理是否合理?
3. 效率: 是否用最少的步骤完成?
4. 错误恢复: 遇到错误时是否能正确处理?
输出 JSON 格式。
"""
response = self.critic.generate(prompt)
return self._parse_feedback(response)
class EnvironmentFeedback(FeedbackSource):
"""环境反馈"""
def __init__(self, reward_function):
self.reward_fn = reward_function
def get_feedback(self, trajectory: Dict) -> Dict:
"""基于环境的自动奖励"""
return {
'task_completion': trajectory['final_reward'],
'step_rewards': [self.reward_fn(step) for step in trajectory['steps']],
'efficiency': 1.0 / len(trajectory['steps']) # 步数越少越好
}
class FeedbackBasedTrainer:
"""基于反馈的训练器"""
def __init__(
self,
model,
feedback_sources: List[FeedbackSource],
aggregation: str = "weighted_average"
):
self.model = model
self.feedback_sources = feedback_sources
self.aggregation = aggregation
def collect_feedback(self, trajectory: Dict) -> Dict:
"""从多个来源收集并聚合反馈"""
all_feedback = []
for source in self.feedback_sources:
feedback = source.get_feedback(trajectory)
all_feedback.append(feedback)
return self._aggregate_feedback(all_feedback)
def train_with_feedback(self, trajectories: List[Dict]):
"""使用反馈训练"""
for traj in trajectories:
feedback = self.collect_feedback(traj)
# 根据反馈类型选择训练方式
if 'correction' in feedback:
# 使用修正数据做 SFT
self._train_on_correction(traj, feedback['correction'])
elif 'score' in feedback:
# 使用评分加权训练
self._weighted_training(traj, feedback['score'])
elif 'comparison' in feedback:
# 使用对比数据做 DPO
self._dpo_training(traj, feedback['comparison'])
7.3 基于环境的微调
"""
基于环境的微调方法 - 让 Agent 在真实环境中学习
"""
import gymnasium as gym
from typing import List, Dict, Callable
class EnvironmentBasedTrainer:
"""基于环境交互的训练器"""
def __init__(
self,
model,
env: gym.Env,
tokenizer,
reward_shaping: Callable = None
):
self.model = model
self.env = env
self.tokenizer = tokenizer
self.reward_shaping = reward_shaping or (lambda r, s, a: r)
def collect_online_data(self, num_episodes: int) -> List[Dict]:
"""在环境中在线收集数据"""
trajectories = []
for _ in range(num_episodes):
obs = self.env.reset()
trajectory = {'steps': [], 'total_reward': 0}
done = False
while not done:
# Agent 生成动作
action = self._generate_action(obs)
# 环境执行
next_obs, reward, done, info = self.env.step(action)
# 奖励塑形
shaped_reward = self.reward_shaping(reward, obs, action)
trajectory['steps'].append({
'observation': obs,
'action': action,
'reward': shaped_reward,
'next_observation': next_obs
})
trajectory['total_reward'] += reward
obs = next_obs
trajectories.append(trajectory)
return trajectories
def online_rl_training(self, num_iterations: int):
"""
在线强化学习训练
流程:
1. 在环境中收集数据
2. 更新模型
3. 重复
"""
for i in range(num_iterations):
# 收集数据
trajectories = self.collect_online_data(num_episodes=32)
# 计算奖励统计
mean_reward = sum(t['total_reward'] for t in trajectories) / len(trajectories)
# 更新模型 (可以用 PPO, DPO 等)
self._update_model(trajectories)
print(f"Iteration {i}: Mean Reward = {mean_reward:.3f}")
class CurriculumLearning:
"""
课程学习 - 由易到难训练
"""
def __init__(
self,
model,
environments: List[gym.Env], # 按难度排序
difficulty_threshold: float = 0.8
):
self.model = model
self.envs = environments
self.threshold = difficulty_threshold
self.current_level = 0
def should_advance(self, success_rate: float) -> bool:
"""判断是否应该进入下一难度"""
return success_rate >= self.threshold
def train(self):
"""课程学习训练"""
while self.current_level < len(self.envs):
env = self.envs[self.current_level]
print(f"\n=== Level {self.current_level + 1}/{len(self.envs)} ===")
# 在当前难度训练
for epoch in range(100):
trajectories = self._collect_data(env, num_episodes=50)
success_rate = self._compute_success_rate(trajectories)
self._update_model(trajectories)
print(f"Epoch {epoch}: Success Rate = {success_rate:.2%}")
if self.should_advance(success_rate):
print(f"Advancing to level {self.current_level + 2}!")
self.current_level += 1
break
7.4 自我进化训练
"""
自我进化训练 - Agent 自我改进
"""
from typing import List, Dict, Tuple
import random
class SelfEvolvingAgent:
"""
自我进化 Agent
核心思想:Agent 生成数据 → 自我评估 → 在好数据上训练
"""
def __init__(
self,
model,
critic_model, # 用于自我评估
tokenizer,
evolution_config: Dict
):
self.model = model
self.critic = critic_model
self.tokenizer = tokenizer
self.config = evolution_config
def self_play(self, tasks: List[str], num_attempts: int = 5) -> List[Dict]:
"""
自我对弈:对每个任务多次尝试
"""
all_trajectories = []
for task in tasks:
task_trajectories = []
for _ in range(num_attempts):
# 生成轨迹(使用不同温度增加多样性)
temperature = random.uniform(0.5, 1.0)
trajectory = self._generate_trajectory(task, temperature)
task_trajectories.append(trajectory)
all_trajectories.extend(task_trajectories)
return all_trajectories
def self_evaluate(self, trajectories: List[Dict]) -> List[Tuple[Dict, float]]:
"""
自我评估:使用 critic 模型评估轨迹质量
"""
evaluated = []
for traj in trajectories:
# 构造评估 prompt
eval_prompt = f"""
评估以下任务执行的质量 (0-1 分):
任务: {traj['instruction']}
执行过程:
{self._format_steps(traj['steps'])}
评分标准:
- 任务是否完成
- 步骤是否合理
- 效率是否足够
只输出一个 0-1 之间的数字。
"""
score_text = self.critic.generate(eval_prompt, max_tokens=10)
score = float(score_text.strip())
evaluated.append((traj, score))
return evaluated
def self_select(
self,
evaluated_trajectories: List[Tuple[Dict, float]],
selection_method: str = "top_k"
) -> List[Dict]:
"""
自我选择:挑选用于训练的数据
"""
if selection_method == "top_k":
# 选择评分最高的 k%
k = self.config.get('selection_ratio', 0.3)
sorted_trajs = sorted(evaluated_trajectories, key=lambda x: x[1], reverse=True)
n_select = max(1, int(len(sorted_trajs) * k))
return [t[0] for t in sorted_trajs[:n_select]]
elif selection_method == "threshold":
# 选择超过阈值的
threshold = self.config.get('score_threshold', 0.7)
return [t[0] for t, s in evaluated_trajectories if s >= threshold]
elif selection_method == "rejection_sampling":
# 拒绝采样
selected = []
for traj, score in evaluated_trajectories:
if random.random() < score:
selected.append(traj)
return selected
def self_improve(self, selected_trajectories: List[Dict]):
"""
自我改进:使用筛选后的数据训练
"""
# 可以使用多种训练方法
if self.config.get('training_method') == 'sft':
self._sft_training(selected_trajectories)
elif self.config.get('training_method') == 'dpo':
# 需要创建偏好对
pairs = self._create_preference_pairs(selected_trajectories)
self._dpo_training(pairs)
def evolve(self, tasks: List[str], num_generations: int = 10):
"""
完整进化循环
"""
for gen in range(num_generations):
print(f"\n{'='*50}")
print(f"Generation {gen + 1}/{num_generations}")
print(f"{'='*50}")
# 1. 自我对弈
print("[1/4] Self-play...")
trajectories = self.self_play(tasks)
# 2. 自我评估
print("[2/4] Self-evaluate...")
evaluated = self.self_evaluate(trajectories)
mean_score = sum(s for _, s in evaluated) / len(evaluated)
print(f" Mean score: {mean_score:.3f}")
# 3. 自我选择
print("[3/4] Self-select...")
selected = self.self_select(evaluated)
print(f" Selected {len(selected)}/{len(trajectories)} trajectories")
# 4. 自我改进
print("[4/4] Self-improve...")
self.self_improve(selected)
class ReflectionBasedImprovement:
"""
基于反思的改进
让 Agent 反思自己的错误并学习
"""
def __init__(self, model, tokenizer):
self.model = model
self.tokenizer = tokenizer
def generate_reflection(self, failed_trajectory: Dict) -> str:
"""让 Agent 反思失败的原因"""
prompt = f"""
以下是一次失败的任务执行:
任务: {failed_trajectory['instruction']}
执行过程:
{self._format_steps(failed_trajectory['steps'])}
结果: 任务失败
请分析:
1. 哪一步开始出错了?
2. 错误的原因是什么?
3. 正确的做法应该是什么?
输出你的反思:
"""
reflection = self.model.generate(prompt, max_tokens=500)
return reflection
def generate_improved_trajectory(
self,
task: str,
reflection: str
) -> Dict:
"""基于反思生成改进的轨迹"""
prompt = f"""
任务: {task}
上次执行失败了,反思如下:
{reflection}
现在请重新执行任务,避免之前的错误:
"""
# 生成新轨迹
return self._generate_trajectory(prompt)
def reflection_learning(
self,
failed_trajectories: List[Dict],
success_trajectories: List[Dict]
):
"""
反思学习流程
1. 对失败轨迹生成反思
2. 基于反思生成改进轨迹
3. 结合成功轨迹训练
"""
improved_data = []
for failed_traj in failed_trajectories:
# 生成反思
reflection = self.generate_reflection(failed_traj)
# 生成改进轨迹
improved_traj = self.generate_improved_trajectory(
failed_traj['instruction'],
reflection
)
# 如果改进后成功,加入训练数据
if improved_traj['success']:
improved_data.append({
'original': failed_traj,
'reflection': reflection,
'improved': improved_traj
})
# 训练数据 = 原始成功轨迹 + 改进后的轨迹
training_data = success_trajectories + [d['improved'] for d in improved_data]
# 训练
self._train(training_data)
8. 实战项目:构建可训练的 Agent 🛠️
本节将通过一个完整的实战项目,演示如何构建和训练一个 Agent。
8.1 项目架构设计
┌─────────────────────────────────────────────────────────────────────────────────┐
│ 可训练 Agent 项目架构 │
├─────────────────────────────────────────────────────────────────────────────────┤
│ │
│ trainable_agent/ │
│ ├── config/ # 配置文件 │
│ │ ├── model_config.yaml # 模型配置 │
│ │ ├── training_config.yaml # 训练配置 │
│ │ └── env_config.yaml # 环境配置 │
│ │ │
│ ├── data/ # 数据处理 │
│ │ ├── collectors/ # 数据收集器 │
│ │ │ ├── trajectory_collector.py │
│ │ │ └── feedback_collector.py │
│ │ ├── processors/ # 数据处理器 │
│ │ │ ├── format_converter.py │
│ │ │ └── quality_filter.py │
│ │ └── datasets/ # 数据集定义 │
│ │ ├── sft_dataset.py │
│ │ └── preference_dataset.py │
│ │ │
│ ├── models/ # 模型相关 │
│ │ ├── agent.py # Agent 主类 │
│ │ ├── reward_model.py # 奖励模型 │
│ │ └── value_model.py # 价值模型 │
│ │ │
│ ├── training/ # 训练相关 │
│ │ ├── sft_trainer.py # SFT 训练器 │
│ │ ├── rm_trainer.py # RM 训练器 │
│ │ ├── ppo_trainer.py # PPO 训练器 │
│ │ ├── dpo_trainer.py # DPO 训练器 │
│ │ └── evol_trainer.py # 进化训练器 │
│ │ │
│ ├── environments/ # 环境接口 │
│ │ ├── base_env.py # 环境基类 │
│ │ ├── webshop_env.py # WebShop 环境 │
│ │ └── tool_env.py # 工具使用环境 │
│ │ │
│ ├── evaluation/ # 评估相关 │
│ │ ├── metrics.py # 评估指标 │
│ │ └── evaluator.py # 评估器 │
│ │ │
│ ├── utils/ # 工具函数 │
│ │ ├── logging.py │
│ │ └── checkpoint.py │
│ │ │
│ └── scripts/ # 运行脚本 │
│ ├── train_sft.py │
│ ├── train_rl.py │
│ └── evaluate.py │
│ │
└─────────────────────────────────────────────────────────────────────────────────┘
8.2 数据收集与处理
"""
trainable_agent/data/collectors/trajectory_collector.py
"""
import json
from typing import List, Dict, Generator
from dataclasses import dataclass, asdict
from concurrent.futures import ThreadPoolExecutor
@dataclass
class Step:
"""单步数据"""
thought: str
action: str
action_input: Dict
observation: str
reward: float = 0.0
@dataclass
class Trajectory:
"""轨迹数据"""
task_id: str
instruction: str
steps: List[Step]
final_reward: float
metadata: Dict
class TrajectoryCollector:
"""轨迹收集器"""
def __init__(self, agent, environments: List, config: Dict):
self.agent = agent
self.envs = environments
self.config = config
self.trajectories = []
def collect_single(self, env, task: Dict) -> Trajectory:
"""收集单条轨迹"""
obs = env.reset(task)
steps = []
done = False
while not done and len(steps) < self.config['max_steps']:
# Agent 决策
thought, action = self.agent.act(obs)
# 环境执行
next_obs, reward, done, info = env.step(action)
steps.append(Step(
thought=thought,
action=action['name'],
action_input=action['input'],
observation=next_obs,
reward=reward
))
obs = next_obs
return Trajectory(
task_id=task['id'],
instruction=task['instruction'],
steps=steps,
final_reward=env.get_final_reward(),
metadata={'env': env.name, 'num_steps': len(steps)}
)
def collect_batch(
self,
tasks: List[Dict],
num_workers: int = 4
) -> List[Trajectory]:
"""批量收集轨迹"""
trajectories = []
with ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = []
for task in tasks:
env = self._get_env_for_task(task)
future = executor.submit(self.collect_single, env, task)
futures.append(future)
for future in futures:
traj = future.result()
trajectories.append(traj)
return trajectories
def collect_with_diversity(
self,
tasks: List[Dict],
samples_per_task: int = 5
) -> List[Trajectory]:
"""收集多样化的轨迹(同一任务多次尝试)"""
trajectories = []
for task in tasks:
env = self._get_env_for_task(task)
for i in range(samples_per_task):
# 使用不同的温度
self.agent.temperature = 0.5 + i * 0.1
traj = self.collect_single(env, task)
trajectories.append(traj)
return trajectories
def save(self, path: str):
"""保存收集的轨迹"""
data = [asdict(t) for t in self.trajectories]
with open(path, 'w') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
"""
trainable_agent/data/processors/quality_filter.py
"""
from typing import List, Callable
from dataclasses import dataclass
@dataclass
class FilterConfig:
"""过滤配置"""
min_reward: float = 0.0
max_steps: int = 50
min_steps: int = 1
require_success: bool = False
custom_filters: List[Callable] = None
class QualityFilter:
"""轨迹质量过滤器"""
def __init__(self, config: FilterConfig):
self.config = config
def filter(self, trajectories: List[Trajectory]) -> List[Trajectory]:
"""过滤轨迹"""
filtered = []
for traj in trajectories:
if self._passes_all_filters(traj):
filtered.append(traj)
print(f"Filtered: {len(filtered)}/{len(trajectories)} trajectories")
return filtered
def _passes_all_filters(self, traj: Trajectory) -> bool:
"""检查是否通过所有过滤条件"""
# 基础过滤
if traj.final_reward < self.config.min_reward:
return False
if len(traj.steps) > self.config.max_steps:
return False
if len(traj.steps) < self.config.min_steps:
return False
if self.config.require_success and traj.final_reward < 0.5:
return False
# 自定义过滤
if self.config.custom_filters:
for filter_fn in self.config.custom_filters:
if not filter_fn(traj):
return False
return True
def filter_by_percentile(
self,
trajectories: List[Trajectory],
top_percentile: float = 0.3
) -> List[Trajectory]:
"""保留 top percentile 的轨迹"""
sorted_trajs = sorted(
trajectories,
key=lambda t: t.final_reward,
reverse=True
)
n_keep = max(1, int(len(sorted_trajs) * top_percentile))
return sorted_trajs[:n_keep]
8.3 训练流程实现
"""
trainable_agent/training/pipeline.py
完整的训练 Pipeline
"""
import torch
from typing import Dict, List, Optional
from pathlib import Path
import yaml
class TrainingPipeline:
"""
完整训练流程
支持多种训练方式的组合
"""
def __init__(self, config_path: str):
with open(config_path) as f:
self.config = yaml.safe_load(f)
self.model = None
self.reward_model = None
self.tokenizer = None
self._setup()
def _setup(self):
"""初始化模型和组件"""
from transformers import AutoModelForCausalLM, AutoTokenizer
model_config = self.config['model']
self.tokenizer = AutoTokenizer.from_pretrained(model_config['name'])
self.model = AutoModelForCausalLM.from_pretrained(
model_config['name'],
torch_dtype=torch.bfloat16 if model_config.get('bf16') else torch.float32,
device_map="auto"
)
# 如果使用 LoRA
if model_config.get('use_lora'):
from peft import get_peft_model, LoraConfig
lora_config = LoraConfig(
r=model_config.get('lora_r', 16),
lora_alpha=model_config.get('lora_alpha', 32),
target_modules=["q_proj", "v_proj"],
lora_dropout=0.05,
bias="none",
)
self.model = get_peft_model(self.model, lora_config)
def stage_sft(self, data_path: str) -> None:
"""
Stage 1: 监督微调
"""
print("\n" + "="*60)
print("Stage 1: Supervised Fine-Tuning")
print("="*60)
from .sft_trainer import SFTTrainer
# 加载数据
with open(data_path) as f:
sft_data = json.load(f)
# 创建训练器
trainer = SFTTrainer(
model=self.model,
tokenizer=self.tokenizer,
config=self.config['sft']
)
# 训练
trainer.train(sft_data)
# 保存
save_path = Path(self.config['output_dir']) / "sft_model"
trainer.save(save_path)
print(f"SFT model saved to {save_path}")
def stage_rm(self, preference_data_path: str) -> None:
"""
Stage 2: 奖励模型训练
"""
print("\n" + "="*60)
print("Stage 2: Reward Model Training")
print("="*60)
from .rm_trainer import RMTrainer
from ..models.reward_model import RewardModel
# 初始化奖励模型(从 SFT 模型初始化)
self.reward_model = RewardModel(self.model)
# 加载偏好数据
with open(preference_data_path) as f:
preference_data = json.load(f)
# 创建训练器
trainer = RMTrainer(
reward_model=self.reward_model,
tokenizer=self.tokenizer,
config=self.config['rm']
)
# 训练
trainer.train(preference_data)
# 保存
save_path = Path(self.config['output_dir']) / "reward_model"
trainer.save(save_path)
print(f"Reward model saved to {save_path}")
def stage_rl(
self,
prompts_path: str,
method: str = "ppo" # ppo, dpo, evol
) -> None:
"""
Stage 3: 强化学习
"""
print("\n" + "="*60)
print(f"Stage 3: Reinforcement Learning ({method.upper()})")
print("="*60)
# 加载 prompts
with open(prompts_path) as f:
prompts = json.load(f)
if method == "ppo":
from .ppo_trainer import PPOTrainer
# 加载参考模型
ref_model = AutoModelForCausalLM.from_pretrained(
self.config['model']['name'],
torch_dtype=torch.bfloat16,
device_map="auto"
)
trainer = PPOTrainer(
policy_model=self.model,
ref_model=ref_model,
reward_model=self.reward_model,
tokenizer=self.tokenizer,
config=self.config['ppo']
)
elif method == "dpo":
from .dpo_trainer import DPOTrainer
ref_model = AutoModelForCausalLM.from_pretrained(
self.config['model']['name'],
torch_dtype=torch.bfloat16,
device_map="auto"
)
trainer = DPOTrainer(
model=self.model,
ref_model=ref_model,
tokenizer=self.tokenizer,
config=self.config['dpo']
)
elif method == "evol":
from .evol_trainer import EvolTrainer
from ..environments import load_environments
envs = load_environments(self.config['environments'])
trainer = EvolTrainer(
model=self.model,
tokenizer=self.tokenizer,
environments=envs,
config=self.config['evol']
)
# 训练
trainer.train(prompts)
# 保存
save_path = Path(self.config['output_dir']) / f"{method}_model"
trainer.save(save_path)
print(f"RL model saved to {save_path}")
def run_full_pipeline(
self,
sft_data: str,
preference_data: str,
rl_prompts: str,
rl_method: str = "ppo"
):
"""运行完整训练流程"""
# Stage 1
self.stage_sft(sft_data)
# Stage 2
self.stage_rm(preference_data)
# Stage 3
self.stage_rl(rl_prompts, method=rl_method)
print("\n" + "="*60)
print("Training Complete!")
print("="*60)
# 使用示例
if __name__ == "__main__":
pipeline = TrainingPipeline("config/training_config.yaml")
pipeline.run_full_pipeline(
sft_data="data/sft_trajectories.json",
preference_data="data/preferences.json",
rl_prompts="data/rl_prompts.json",
rl_method="dpo"
)
8.4 评估与迭代
"""
trainable_agent/evaluation/evaluator.py
"""
from typing import List, Dict, Optional
from dataclasses import dataclass
import numpy as np
from collections import defaultdict
@dataclass
class EvalResult:
"""评估结果"""
success_rate: float
avg_reward: float
avg_steps: float
by_task_type: Dict[str, Dict]
by_difficulty: Dict[str, Dict]
detailed_results: List[Dict]
class AgentEvaluator:
"""Agent 评估器"""
def __init__(self, agent, environments: List, config: Dict):
self.agent = agent
self.envs = environments
self.config = config
def evaluate(
self,
test_tasks: List[Dict],
verbose: bool = True
) -> EvalResult:
"""
全面评估 Agent
"""
results = []
for task in test_tasks:
env = self._get_env_for_task(task)
result = self._evaluate_single(env, task)
results.append(result)
if verbose:
status = "✓" if result['success'] else "✗"
print(f" {status} Task {task['id']}: "
f"reward={result['reward']:.2f}, "
f"steps={result['num_steps']}")
# 汇总统计
return self._aggregate_results(results)
def _evaluate_single(self, env, task: Dict) -> Dict:
"""评估单个任务"""
obs = env.reset(task)
steps = []
done = False
# 使用 greedy decoding
self.agent.temperature = 0.0
while not done and len(steps) < self.config['max_steps']:
thought, action = self.agent.act(obs)
next_obs, reward, done, info = env.step(action)
steps.append({
'thought': thought,
'action': action,
'observation': next_obs,
'reward': reward
})
obs = next_obs
final_reward = env.get_final_reward()
return {
'task_id': task['id'],
'task_type': task.get('type', 'unknown'),
'difficulty': task.get('difficulty', 'medium'),
'success': final_reward > 0.5,
'reward': final_reward,
'num_steps': len(steps),
'steps': steps
}
def _aggregate_results(self, results: List[Dict]) -> EvalResult:
"""汇总评估结果"""
# 总体指标
successes = [r['success'] for r in results]
rewards = [r['reward'] for r in results]
steps = [r['num_steps'] for r in results]
# 按任务类型分组
by_type = defaultdict(list)
for r in results:
by_type[r['task_type']].append(r)
type_stats = {}
for task_type, type_results in by_type.items():
type_stats[task_type] = {
'count': len(type_results),
'success_rate': np.mean([r['success'] for r in type_results]),
'avg_reward': np.mean([r['reward'] for r in type_results])
}
# 按难度分组
by_difficulty = defaultdict(list)
for r in results:
by_difficulty[r['difficulty']].append(r)
diff_stats = {}
for diff, diff_results in by_difficulty.items():
diff_stats[diff] = {
'count': len(diff_results),
'success_rate': np.mean([r['success'] for r in diff_results]),
'avg_reward': np.mean([r['reward'] for r in diff_results])
}
return EvalResult(
success_rate=np.mean(successes),
avg_reward=np.mean(rewards),
avg_steps=np.mean(steps),
by_task_type=type_stats,
by_difficulty=diff_stats,
detailed_results=results
)
def compare_models(
self,
models: Dict[str, "Agent"],
test_tasks: List[Dict]
) -> Dict:
"""
对比多个模型
"""
comparison = {}
for name, model in models.items():
print(f"\nEvaluating {name}...")
self.agent = model
result = self.evaluate(test_tasks, verbose=False)
comparison[name] = {
'success_rate': result.success_rate,
'avg_reward': result.avg_reward,
'avg_steps': result.avg_steps
}
# 打印对比表格
print("\n" + "="*60)
print("Model Comparison")
print("="*60)
print(f"{'Model':<20} {'Success Rate':<15} {'Avg Reward':<15} {'Avg Steps':<10}")
print("-"*60)
for name, metrics in comparison.items():
print(f"{name:<20} {metrics['success_rate']:.2%} "
f"{metrics['avg_reward']:.3f} {metrics['avg_steps']:.1f}")
return comparison
def run_evaluation():
"""运行评估脚本"""
from trainable_agent.models.agent import TrainedAgent
from trainable_agent.environments import load_environments
# 加载模型
agent = TrainedAgent.from_pretrained("./output/dpo_model")
# 加载测试环境
envs = load_environments("config/env_config.yaml")
# 加载测试任务
with open("data/test_tasks.json") as f:
test_tasks = json.load(f)
# 评估
evaluator = AgentEvaluator(
agent=agent,
environments=envs,
config={'max_steps': 30}
)
result = evaluator.evaluate(test_tasks)
# 打印结果
print("\n" + "="*60)
print("Evaluation Results")
print("="*60)
print(f"Success Rate: {result.success_rate:.2%}")
print(f"Average Reward: {result.avg_reward:.3f}")
print(f"Average Steps: {result.avg_steps:.1f}")
print("\nBy Task Type:")
for task_type, stats in result.by_task_type.items():
print(f" {task_type}: {stats['success_rate']:.2%} ({stats['count']} tasks)")
print("\nBy Difficulty:")
for diff, stats in result.by_difficulty.items():
print(f" {diff}: {stats['success_rate']:.2%} ({stats['count']} tasks)")
if __name__ == "__main__":
run_evaluation()
9. 前沿探索:未来发展方向 🔮
9.1 多模态 Agent 训练
┌─────────────────────────────────────────────────────────────────────────────────┐
│ 多模态 Agent 训练趋势 │
├─────────────────────────────────────────────────────────────────────────────────┤
│ │
│ 当前挑战: │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ • 视觉-语言-动作的对齐 │ │
│ │ • 多模态奖励的设计 │ │
│ │ • 跨模态的泛化能力 │ │
│ │ • 实时交互的效率 │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │
│ 研究方向: │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 1. Vision-Language-Action (VLA) Models │ │
│ │ └── RT-2, OpenVLA 等模型将视觉理解与动作生成统一 │ │
│ │ │ │
│ │ 2. 多模态 RLHF │ │
│ │ └── 人类对图像/视频输出的偏好学习 │ │
│ │ │ │
│ │ 3. 世界模型 (World Model) │ │
│ │ └── 学习环境动态,在想象中进行规划 │ │
│ │ │ │
│ │ 4. 具身智能 (Embodied AI) │ │
│ │ └── 机器人控制、自动驾驶等实体交互 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────────┘
9.2 大规模 Agent 协作训练
"""
多 Agent 协作训练 - 前沿方向
"""
class MultiAgentTraining:
"""
多 Agent 协作训练
研究方向:
1. Agent 之间的通信协议学习
2. 任务分工与协作
3. 竞争与合作的平衡
"""
def __init__(self, agents: List, environment):
self.agents = agents
self.env = environment
def train_communication(self):
"""训练 Agent 之间的通信"""
# 允许 Agent 相互发送消息
# 通过任务完成奖励学习有效的通信协议
pass
def train_role_specialization(self):
"""训练角色专业化"""
# 不同 Agent 专注于不同子任务
# 通过 reward shaping 鼓励分工
pass
class HierarchicalAgentTraining:
"""
层次化 Agent 训练
高层: 规划和分解任务
低层: 执行具体动作
"""
def __init__(self):
self.high_level_agent = None # 规划者
self.low_level_agent = None # 执行者
def train_hierarchical(self):
"""
层次化训练:
1. 先训练低层 Agent 完成基础技能
2. 再训练高层 Agent 学习组合技能
"""
pass
9.3 持续学习与适应
┌─────────────────────────────────────────────────────────────────────────────────┐
│ Agent 持续学习 │
├─────────────────────────────────────────────────────────────────────────────────┤
│ │
│ 目标: Agent 能够持续从新经验中学习,而不遗忘旧知识 │
│ │
│ 技术路线: │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 1. Elastic Weight Consolidation (EWC) │ │
│ │ └── 保护重要参数,防止灾难性遗忘 │ │
│ │ │ │
│ │ 2. Progressive Neural Networks │ │
│ │ └── 为新任务添加新模块,保留旧模块 │ │
│ │ │ │
│ │ 3. Replay-based Methods │ │
│ │ └── 存储旧经验,混合新旧数据训练 │ │
│ │ │ │
│ │ 4. Meta-Learning │ │
│ │ └── 学习如何快速适应新任务 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │
│ 应用场景: │
│ • 个性化 Agent: 适应特定用户的偏好 │
│ • 领域适应: 从通用到专业领域 │
│ • 环境变化: 适应新的工具和 API │
│ │
└─────────────────────────────────────────────────────────────────────────────────┘
9.4 安全对齐的强化学习
"""
安全对齐的 Agent 训练
"""
class SafeRLTraining:
"""
安全强化学习训练
确保 Agent 行为安全、可控、符合人类价值观
"""
def __init__(self, agent, safety_constraints: List):
self.agent = agent
self.constraints = safety_constraints
def constrained_policy_optimization(self):
"""
约束策略优化
max E[reward]
s.t. E[cost_i] <= c_i for all i
cost 可以是:
- 危险动作的概率
- 有害内容的生成
- 违反规则的行为
"""
pass
def reward_hacking_prevention(self):
"""
防止奖励黑客 (Reward Hacking)
技术:
1. 奖励模型集成 (多个 RM 投票)
2. 对抗训练 (寻找 RM 漏洞)
3. 保守估计 (悲观奖励下界)
"""
pass
def interpretable_agent(self):
"""
可解释 Agent
让 Agent 的决策过程透明:
1. 思维链 (Chain-of-Thought)
2. 决策归因 (Attribution)
3. 反事实解释 (Counterfactual)
"""
pass
10. 总结与最佳实践 📝
10.1 方法选择指南
┌─────────────────────────────────────────────────────────────────────────────────┐
│ Agent 训练方法选择决策树 │
├─────────────────────────────────────────────────────────────────────────────────┤
│ │
│ 开始 │
│ │ │
│ ┌────────────┴────────────┐ │
│ │ 有高质量标注数据? │ │
│ └────────────┬────────────┘ │
│ │ │ │
│ 是 否 │
│ │ │ │
│ ▼ ▼ │
│ ┌────────────┐ ┌────────────────┐ │
│ │ SFT │ │ 可以在环境中采样?│ │
│ └─────┬──────┘ └───────┬────────┘ │
│ │ │ │
│ ┌────────┴────────┐ 是 │ 否 │
│ │ 有偏好数据? │ │ │ │
│ └────────┬────────┘ ▼ ▼ │
│ │ │ ┌──────────┐ ┌──────────┐ │
│ 是 否 │AgentEvol │ │零/少样本 │ │
│ │ │ │进化训练 │ │ 提示工程 │ │
│ ▼ ▼ └──────────┘ └──────────┘ │
│ ┌──────────┐ ┌────────────┐ │
│ │ 计算资源? │ │ 使用 KTO │ │
│ └────┬─────┘ │(单边反馈) │ │
│ │ └────────────┘ │
│ 充足 │ 有限 │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ │
│ │RLHF/PPO │ │DPO/ORPO │ │
│ └─────────┘ └─────────┘ │
│ │
│ │
│ 需要推理能力? ─── 是 ───→ 考虑 R3 (推理过程奖励) │
│ │
│ 需要环境交互? ─── 是 ───→ 考虑 AgentGym 框架 │
│ │
│ 需要自我改进? ─── 是 ───→ 考虑自进化训练 │
│ │
└─────────────────────────────────────────────────────────────────────────────────┘
10.2 最佳实践清单
┌─────────────────────────────────────────────────────────────────────────────────┐
│ Agent 训练最佳实践 │
├─────────────────────────────────────────────────────────────────────────────────┤
│ │
│ 📊 数据准备 │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ ✓ 确保数据多样性:覆盖各种任务类型和难度 │ │
│ │ ✓ 质量优先:宁可少而精,不可多而杂 │ │
│ │ ✓ 格式统一:使用一致的 Thought-Action-Observation 格式 │ │
│ │ ✓ 包含失败样本:让模型学会处理错误情况 │ │
│ │ ✓ 标注清晰:偏好标注要有明确的标准 │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │
│ 🔧 训练配置 │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ ✓ 从小规模开始:先验证流程,再扩大规模 │ │
│ │ ✓ 使用 LoRA:减少显存占用,加速迭代 │ │
│ │ ✓ 监控 KL 散度:防止策略偏离太远 │ │
│ │ ✓ 设置 early stopping:根据验证集指标 │ │
│ │ ✓ 保存检查点:方便回滚和对比 │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │
│ 📈 评估策略 │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ ✓ 多维度评估:成功率、效率、安全性 │ │
│ │ ✓ 使用保留测试集:不参与训练的独立数据 │ │
│ │ ✓ 人工抽检:定期人工评估样本 │ │
│ │ ✓ A/B 测试:与基线模型对比 │ │
│ │ ✓ 错误分析:分类分析失败案例 │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │
│ ⚠️ 常见陷阱 │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ ✗ Reward Hacking:模型找到奖励漏洞 │ │
│ │ → 解决:使用多个 RM,设置奖励上限 │ │
│ │ │ │
│ │ ✗ 模式崩塌:生成内容多样性下降 │ │
│ │ → 解决:增加 KL 惩罚,使用温度采样 │ │
│ │ │ │
│ │ ✗ 过拟合:在训练任务上好,泛化差 │ │
│ │ → 解决:数据增强,正则化,早停 │ │
│ │ │ │
│ │ ✗ 训练不稳定:损失震荡或发散 │ │
│ │ → 解决:降低学习率,减小 batch size,梯度裁剪 │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────────┘
10.3 核心要点总结
💡 本文核心要点:
-
RLHF 是 LLM 对齐的基础范式
- 三阶段流程:SFT → RM → PPO
- 通过人类偏好学习奖励函数
- InstructGPT 证明了其有效性
-
DPO/ORPO/KTO 简化了偏好学习
- 无需显式训练奖励模型
- 训练更稳定,资源需求更低
- 根据数据类型选择合适方法
-
AgentGym 提供了 Agent 训练框架
- 统一的环境接口
- 大规模轨迹数据集
- AgentEvol 进化训练方法
-
R3 通过推理奖励提升学习效率
- 过程奖励模型 (PRM) 评估每步推理
- 密集奖励信号加速学习
- 特别适合需要复杂推理的任务
-
Agent 微调需要多种技术组合
- 基于轨迹、反馈、环境的不同方法
- 自我进化和反思改进
- 根据场景灵活选择
11. 参考文献 📚
核心论文
-
InstructGPT (2022)
- Ouyang et al., “Training language models to follow instructions with human feedback”
- OpenAI
- arXiv:2203.02155
-
PPO (2017)
- Schulman et al., “Proximal Policy Optimization Algorithms”
- OpenAI
- arXiv:1707.06347
-
DPO (2023)
- Rafailov et al., “Direct Preference Optimization: Your Language Model is Secretly a Reward Model”
- Stanford
- arXiv:2305.18290
-
ORPO (2024)
- Hong et al., “ORPO: Monolithic Preference Optimization without Reference Model”
- KAIST
- arXiv:2403.07691
-
KTO (2024)
- Ethayarajh et al., “KTO: Model Alignment as Prospect Theoretic Optimization”
- Stanford
- arXiv:2402.01306
-
AgentGym (2024)
- Xi et al., “AgentGym: Evolving Large Language Model-based Agents across Diverse Environments”
- Fudan University
- arXiv:2406.04151
-
Process Reward Models (2023)
- Lightman et al., “Let’s Verify Step by Step”
- OpenAI
- arXiv:2305.20050
-
ReAct (2023)
- Yao et al., “ReAct: Synergizing Reasoning and Acting in Language Models”
- Princeton & Google
- arXiv:2210.03629
-
Self-Refine (2023)
- Madaan et al., “Self-Refine: Iterative Refinement with Self-Feedback”
- CMU
- arXiv:2303.17651
-
Constitutional AI (2022)
- Bai et al., “Constitutional AI: Harmlessness from AI Feedback”
- Anthropic
- arXiv:2212.08073
GitHub 资源
-
trl - Transformer Reinforcement Learning
- https://github.com/huggingface/trl
- HuggingFace 官方 RLHF 库
-
DeepSpeed-Chat
- https://github.com/microsoft/DeepSpeedExamples/tree/master/applications/DeepSpeed-Chat
- Microsoft 的高效 RLHF 实现
-
OpenRLHF
- https://github.com/OpenLLMAI/OpenRLHF
- 开源的 RLHF 训练框架
-
AgentGym
- https://github.com/WooooDyy/AgentGym
- Agent 训练环境和数据集
-
LLaMA-Factory
- https://github.com/hiyouga/LLaMA-Factory
- 支持多种微调方法的统一框架
-
alignment-handbook
- https://github.com/huggingface/alignment-handbook
- HuggingFace 对齐训练指南
-
MOSS-RLHF
- https://github.com/OpenLMLab/MOSS-RLHF
- 复旦 MOSS 的 RLHF 实现
-
WebShop
- https://github.com/princeton-nlp/WebShop
- 网页购物 Agent 环境
-
ALFWorld
- https://github.com/alfworld/alfworld
- 文本游戏 Agent 环境
-
AgentBench
- https://github.com/THUDM/AgentBench
- Agent 评估基准
推荐学习资源
📖 教程和博客:
- HuggingFace RLHF 教程:https://huggingface.co/blog/rlhf
- Chip Huyen 的 RLHF 解析:https://huyenchip.com/2023/05/02/rlhf.html
- Lilian Weng 的 LLM 对齐综述:https://lilianweng.github.io/posts/2023-03-15-llm-agents/
📺 视频课程:
- Stanford CS224N (NLP with Deep Learning)
- Berkeley CS285 (Deep Reinforcement Learning)
- Anthropic’s AI Safety Research
🎉 恭喜你读完了本文!
Agent 训练是一个快速发展的领域,本文介绍的方法会持续演进。建议关注上述论文和 GitHub 仓库的更新,保持对最新技术的了解。
如果本文对你有帮助,欢迎点赞、收藏、分享!有问题欢迎在评论区讨论。
更多推荐
所有评论(0)