🎯 系列导航:本文是 Agents 系列第16篇,深入探讨 Agent 训练的核心技术,从经典的 RLHF 到最新的强化学习方法。

🏷️ 关键词:RLHF、InstructGPT、AgentGym、R3、Agent微调、强化学习


📑 目录


1. 引言:为什么 Agent 需要训练? 🎯

在这里插入图片描述

在 AI Agent 的发展历程中,训练一直是提升 Agent 能力的核心手段。虽然大语言模型(LLM)本身已经具备了强大的基础能力,但要让它成为一个高效的 Agent,还需要针对性的训练和优化。

💡 思考:为什么预训练的 LLM 不能直接成为优秀的 Agent?

🤔 解答:预训练的 LLM 主要通过大规模文本进行语言建模训练,其目标是预测下一个 token。这种训练方式使模型具备了语言理解和生成能力,但存在以下局限:

  1. 任务对齐不足:模型不知道用户的真实意图,可能生成正确但无用的回答
  2. 交互能力有限:缺乏与环境交互、使用工具的经验
  3. 推理链条不完整:难以进行多步骤的复杂推理
  4. 安全性问题:可能生成有害内容或执行危险操作
┌─────────────────────────────────────────────────────────────────────┐
│                    Agent 能力提升的训练路径                           │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   预训练 LLM      监督微调        RLHF/RL        Agent 专项训练       │
│       │              │              │                │              │
│       ▼              ▼              ▼                ▼              │
│   ┌──────┐      ┌──────┐      ┌──────┐        ┌──────────┐         │
│   │语言  │  →   │指令  │  →   │人类  │   →    │环境交互  │          │
│   │理解  │      │跟随  │      │偏好  │        │工具使用  │          │
│   │生成  │      │能力  │      │对齐  │        │任务完成  │          │
│   └──────┘      └──────┘      └──────┘        └──────────┘         │
│                                                                     │
│   基础能力        任务能力       价值对齐         Agent 能力          │
└─────────────────────────────────────────────────────────────────────┘

Agent 训练的核心目标是让模型:

  • 理解意图:准确理解用户需求和任务目标
  • 规划行动:制定合理的执行计划和步骤
  • 交互执行:与环境和工具有效交互
  • 学习改进:从反馈中持续优化行为

本文将深入探讨实现这些目标的各种训练方法。


2. 基础篇:从监督学习到强化学习 📚

2.1 监督微调 (SFT)

监督微调(Supervised Fine-Tuning,SFT)是最直接的训练方式,通过人工标注的数据让模型学习期望的行为。

"""
监督微调的基本流程
"""
from transformers import AutoModelForCausalLM, AutoTokenizer, Trainer, TrainingArguments
from datasets import load_dataset

# 1. 加载预训练模型
model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-7b-hf")
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-hf")

# 2. 准备指令微调数据
# 数据格式: {"instruction": "...", "input": "...", "output": "..."}
def format_instruction(example):
    """格式化为模型输入"""
    prompt = f"""### Instruction:
{example['instruction']}

### Input:
{example['input']}

### Response:
{example['output']}"""
    return {"text": prompt}

dataset = load_dataset("your_instruction_dataset")
formatted_dataset = dataset.map(format_instruction)

# 3. 配置训练参数
training_args = TrainingArguments(
    output_dir="./sft_model",
    num_train_epochs=3,
    per_device_train_batch_size=4,
    gradient_accumulation_steps=4,
    learning_rate=2e-5,
    warmup_ratio=0.03,
    logging_steps=10,
    save_strategy="epoch",
    fp16=True,
)

# 4. 开始训练
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=formatted_dataset["train"],
    tokenizer=tokenizer,
)
trainer.train()

💡 思考:SFT 为什么不能完全解决 Agent 训练问题?

🤔 解答:SFT 存在几个关键限制:

  1. 数据瓶颈:高质量的 Agent 交互数据难以获取和标注
  2. 静态学习:只能从固定数据学习,无法适应动态环境
  3. 错误累积:模型在推理时可能偏离训练分布,导致错误累积
  4. 奖励稀疏:无法从最终结果反向传播学习信号
┌────────────────────────────────────────────────────────────────┐
│                    SFT 的局限性分析                              │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│   训练数据分布              推理时分布                           │
│        ┌─────┐                 ┌─────┐                         │
│        │     │                 │     │     ← 分布偏移           │
│       ╱│  A  │╲               ╱│  A' │╲        (Distribution   │
│      ╱ │     │ ╲             ╱ │     │ ╲         Shift)        │
│     ╱  └─────┘  ╲           ╱  └─────┘  ╲                      │
│    ╱             ╲         ╱      ↓      ╲                     │
│   ╱               ╲       ╱    错误累积    ╲                    │
│  ────────────────────    ────────────────────                  │
│                                                                │
│   静态数据 → 固定模式       动态环境 → 需要适应                   │
└────────────────────────────────────────────────────────────────┘

2.2 强化学习基础

强化学习(Reinforcement Learning,RL)提供了一种让 Agent 在与环境交互中学习的范式。

核心概念

概念 符号 说明
状态 (State) s s s Agent 观察到的环境信息
动作 (Action) a a a Agent 可执行的操作
奖励 (Reward) r r r 环境对动作的反馈信号
策略 (Policy) π \pi π 从状态到动作的映射
价值函数 (Value) V ( s ) V(s) V(s) 状态的长期期望回报
"""
强化学习的基本框架
"""
import numpy as np
from typing import Tuple, List

class RLAgent:
    """基本的 RL Agent 框架"""
    
    def __init__(self, state_dim: int, action_dim: int):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.policy = self._init_policy()
        self.value_function = self._init_value()
        
    def _init_policy(self):
        """初始化策略网络"""
        # 通常是一个神经网络
        pass
    
    def _init_value(self):
        """初始化价值网络"""
        pass
    
    def select_action(self, state: np.ndarray) -> int:
        """根据当前状态选择动作"""
        # π(a|s) - 策略函数
        action_probs = self.policy(state)
        return np.random.choice(self.action_dim, p=action_probs)
    
    def update(self, trajectory: List[Tuple]):
        """根据轨迹更新策略
        
        trajectory: [(s_0, a_0, r_0), (s_1, a_1, r_1), ...]
        """
        # 计算回报 G_t = r_t + γ*r_{t+1} + γ²*r_{t+2} + ...
        returns = self._compute_returns(trajectory)
        
        # 策略梯度更新
        # ∇J(θ) = E[∇log π(a|s) * A(s,a)]
        self._policy_gradient_update(trajectory, returns)


class LLMAsRLAgent:
    """将 LLM 视为 RL Agent"""
    
    def __init__(self, llm):
        self.llm = llm  # 策略网络 π_θ
        
    def generate_action(self, state: str) -> str:
        """
        状态: 对话历史/任务描述
        动作: 生成的文本/API调用
        """
        # LLM 生成 = 策略采样
        # P(token_t | token_{<t}) = π(a_t | s_t)
        return self.llm.generate(state)
    
    def compute_reward(self, state: str, action: str, 
                       ground_truth: str = None) -> float:
        """
        奖励可以来自:
        1. 人类反馈
        2. 奖励模型
        3. 环境反馈 (任务成功/失败)
        """
        pass

2.3 为什么需要 RLHF

RLHF(Reinforcement Learning from Human Feedback)结合了监督学习和强化学习的优势,通过人类反馈来指导模型训练。

┌─────────────────────────────────────────────────────────────────────────┐
│                        RLHF 的必要性                                     │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   问题1: 奖励函数难以定义                                                 │
│   ┌───────────────────────────────────────────────────────────────┐     │
│   │  什么是"好"的回答?                                             │     │
│   │  - 准确性? → 有时需要承认不确定                                 │     │
│   │  - 详细性? → 有时简洁更好                                       │     │
│   │  - 创造性? → 有时需要严谨                                       │     │
│   │                                                               │     │
│   │  人类偏好复杂且情境相关,难以用简单规则表达                        │     │
│   └───────────────────────────────────────────────────────────────┘     │
│                                                                         │
│   问题2: 直接优化指标可能适得其反 (Goodhart's Law)                        │
│   ┌───────────────────────────────────────────────────────────────┐     │
│   │  "When a measure becomes a target, it ceases to be a good     │     │
│   │   measure."                                                   │     │
│   │                                                               │     │
│   │  例: 优化"回答长度" → 模型学会废话连篇                           │     │
│   │  例: 优化"包含关键词" → 模型生成不连贯的关键词堆砌                │     │
│   └───────────────────────────────────────────────────────────────┘     │
│                                                                         │
│   解决方案: RLHF                                                         │
│   ┌───────────────────────────────────────────────────────────────┐     │
│   │  1. 收集人类偏好数据 (比较 A vs B)                              │     │
│   │  2. 训练奖励模型学习人类偏好                                    │     │
│   │  3. 用 RL 优化模型以最大化奖励                                  │     │
│   │                                                               │     │
│   │  优势: 奖励模型可以捕捉复杂的人类偏好                            │     │
│   └───────────────────────────────────────────────────────────────┘     │
└─────────────────────────────────────────────────────────────────────────┘

3. InstructGPT:RLHF 的里程碑 🏆

InstructGPT 是 OpenAI 在 2022 年发布的重要工作,首次大规模验证了 RLHF 在 LLM 上的有效性。这项工作奠定了后续 ChatGPT、GPT-4 等模型的技术基础。

3.1 InstructGPT 的三阶段训练

┌─────────────────────────────────────────────────────────────────────────┐
│                    InstructGPT 三阶段训练流程                            │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐      │
│  │   Stage 1       │    │   Stage 2       │    │   Stage 3       │      │
│  │   SFT           │ →  │   RM Training   │ →  │   RL (PPO)      │      │
│  │   监督微调       │    │   奖励模型训练   │    │   强化学习       │      │
│  └────────┬────────┘    └────────┬────────┘    └────────┬────────┘      │
│           │                      │                      │               │
│           ▼                      ▼                      ▼               │
│  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐      │
│  │ 人工标注数据     │    │ 人类偏好对比     │    │ 奖励模型指导     │      │
│  │ (Prompt,Response)│    │ (A > B, B > C)  │    │ Policy 优化     │      │
│  │                 │    │                 │    │                 │      │
│  │ ~13K 样本       │    │ ~33K 对比对      │    │ ~31K Prompts    │      │
│  └─────────────────┘    └─────────────────┘    └─────────────────┘      │
│                                                                         │
│  数据来源: OpenAI API 用户提交的真实 prompts + 标注员编写的 prompts       │
└─────────────────────────────────────────────────────────────────────────┘

Stage 1: 监督微调 (SFT)

"""
Stage 1: 监督微调
目标: 让模型学会按照指令格式回答
"""
from dataclasses import dataclass
from typing import List, Dict

@dataclass
class SFTDataPoint:
    """SFT 训练数据格式"""
    prompt: str           # 用户指令
    response: str         # 标注员撰写的高质量回答
    metadata: Dict        # 元信息(来源、标注者等)

class SFTTrainer:
    """监督微调训练器"""
    
    def __init__(self, base_model, tokenizer):
        self.model = base_model
        self.tokenizer = tokenizer
        
    def prepare_data(self, data: List[SFTDataPoint]) -> List[Dict]:
        """准备训练数据"""
        processed = []
        for dp in data:
            # 拼接 prompt 和 response
            text = f"Human: {dp.prompt}\n\nAssistant: {dp.response}"
            tokens = self.tokenizer(text, truncation=True, max_length=2048)
            
            # 只在 response 部分计算 loss
            labels = tokens['input_ids'].copy()
            prompt_len = len(self.tokenizer(f"Human: {dp.prompt}\n\nAssistant: ")['input_ids'])
            labels[:prompt_len] = [-100] * prompt_len  # -100 表示忽略
            
            processed.append({
                'input_ids': tokens['input_ids'],
                'attention_mask': tokens['attention_mask'],
                'labels': labels
            })
        return processed
    
    def train(self, dataset, epochs=3, lr=1e-5):
        """执行 SFT 训练"""
        # 标准的语言模型训练
        # Loss = -Σ log P(y_t | y_{<t}, x)
        pass

Stage 2: 奖励模型训练 (RM)

"""
Stage 2: 奖励模型训练
目标: 学习人类偏好,给回答打分
"""
import torch
import torch.nn as nn
from transformers import AutoModel

class RewardModel(nn.Module):
    """奖励模型架构"""
    
    def __init__(self, base_model_name: str):
        super().__init__()
        # 使用与 SFT 相同的基座模型
        self.backbone = AutoModel.from_pretrained(base_model_name)
        # 添加一个线性层输出标量奖励
        self.reward_head = nn.Linear(self.backbone.config.hidden_size, 1)
        
    def forward(self, input_ids, attention_mask):
        """前向传播,输出奖励值"""
        outputs = self.backbone(input_ids, attention_mask=attention_mask)
        # 取最后一个 token 的隐藏状态
        last_hidden = outputs.last_hidden_state[:, -1, :]
        reward = self.reward_head(last_hidden)
        return reward.squeeze(-1)


@dataclass  
class PreferencePair:
    """人类偏好对比数据"""
    prompt: str
    chosen: str      # 被选中的(更好的)回答
    rejected: str    # 被拒绝的(较差的)回答


class RMTrainer:
    """奖励模型训练器"""
    
    def __init__(self, reward_model: RewardModel, tokenizer):
        self.model = reward_model
        self.tokenizer = tokenizer
        
    def compute_loss(self, batch: List[PreferencePair]) -> torch.Tensor:
        """
        计算偏好学习损失 (Bradley-Terry 模型)
        
        Loss = -log(σ(r_chosen - r_rejected))
        
        即:让好回答的奖励尽可能高于差回答
        """
        chosen_rewards = []
        rejected_rewards = []
        
        for pair in batch:
            # 编码 chosen
            chosen_input = f"Human: {pair.prompt}\n\nAssistant: {pair.chosen}"
            chosen_tokens = self.tokenizer(chosen_input, return_tensors='pt')
            r_chosen = self.model(**chosen_tokens)
            chosen_rewards.append(r_chosen)
            
            # 编码 rejected
            rejected_input = f"Human: {pair.prompt}\n\nAssistant: {pair.rejected}"
            rejected_tokens = self.tokenizer(rejected_input, return_tensors='pt')
            r_rejected = self.model(**rejected_tokens)
            rejected_rewards.append(r_rejected)
        
        chosen_rewards = torch.stack(chosen_rewards)
        rejected_rewards = torch.stack(rejected_rewards)
        
        # Bradley-Terry loss
        loss = -torch.log(torch.sigmoid(chosen_rewards - rejected_rewards)).mean()
        
        return loss

3.2 奖励模型的设计

💡 思考:奖励模型需要具备哪些特性?

🤔 解答

┌─────────────────────────────────────────────────────────────────────────┐
│                        奖励模型设计要点                                  │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  1. 与策略模型同源                                                       │
│     ┌─────────────────────────────────────────────────────────────┐     │
│     │  奖励模型通常从 SFT 模型初始化                                │     │
│     │  - 继承语言理解能力                                          │     │
│     │  - 减少训练数据需求                                          │     │
│     │  - 更好地理解生成质量                                        │     │
│     └─────────────────────────────────────────────────────────────┘     │
│                                                                         │
│  2. 输出标量奖励                                                         │
│     ┌─────────────────────────────────────────────────────────────┐     │
│     │  输入: (prompt, response) → 输出: scalar reward              │     │
│     │                                                             │     │
│     │  技术细节:                                                   │     │
│     │  - 通常在 EOS token 位置取隐藏状态                           │     │
│     │  - 通过线性层映射到标量                                      │     │
│     │  - 训练时使用相对比较,推理时使用绝对值                       │     │
│     └─────────────────────────────────────────────────────────────┘     │
│                                                                         │
│  3. 校准与泛化                                                           │
│     ┌─────────────────────────────────────────────────────────────┐     │
│     │  挑战:                                                       │     │
│     │  - 训练数据有限,需要泛化到新 prompt                          │     │
│     │  - 奖励尺度需要校准(防止 reward hacking)                    │     │
│     │  - 需要定期用新数据更新                                      │     │
│     └─────────────────────────────────────────────────────────────┘     │
│                                                                         │
│  4. 多维度奖励(高级)                                                   │
│     ┌─────────────────────────────────────────────────────────────┐     │
│     │  可以训练多个奖励头:                                          │     │
│     │  - Helpfulness reward                                       │     │
│     │  - Harmlessness reward                                      │     │
│     │  - Honesty reward                                           │     │
│     │                                                             │     │
│     │  最终奖励 = w1*r_help + w2*r_harmless + w3*r_honest         │     │
│     └─────────────────────────────────────────────────────────────┘     │
└─────────────────────────────────────────────────────────────────────────┘

3.3 PPO 算法详解

PPO(Proximal Policy Optimization)是 InstructGPT 使用的核心 RL 算法。

┌─────────────────────────────────────────────────────────────────────────┐
│                          PPO 算法核心思想                                │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  目标: 更新策略 π_θ 以最大化期望奖励,同时保持稳定                         │
│                                                                         │
│  核心公式:                                                               │
│  ┌─────────────────────────────────────────────────────────────────┐    │
│  │                                                                 │    │
│  │  L^{CLIP}(θ) = E[ min(r(θ)A, clip(r(θ), 1-ε, 1+ε)A) ]          │    │
│  │                                                                 │    │
│  │  其中:                                                          │    │
│  │    r(θ) = π_θ(a|s) / π_{θ_old}(a|s)  ← 新旧策略比率             │    │
│  │    A = 优势函数 (Advantage)           ← 动作好坏程度             │    │
│  │    ε = 0.2 (通常)                    ← 裁剪范围                  │    │
│  │                                                                 │    │
│  └─────────────────────────────────────────────────────────────────┘    │
│                                                                         │
│  为什么需要 clip?                                                        │
│  ┌─────────────────────────────────────────────────────────────────┐    │
│  │                                                                 │    │
│  │   r(θ)                                                          │    │
│  │    ▲                                                            │    │
│  │    │          ╱ 无限制优化                                       │    │
│  │ 1+ε├─────────┼─────  clip 上界                                  │    │
│  │    │        ╱│                                                  │    │
│  │  1 ├───────•─┼────── 初始策略                                   │    │
│  │    │      ╱  │                                                  │    │
│  │ 1-ε├─────┼───┴─────  clip 下界                                  │    │
│  │    │    ╱                                                       │    │
│  │    └────┴──────────→ 训练步数                                   │    │
│  │                                                                 │    │
│  │   clip 机制限制每次更新的幅度,防止策略变化过大                    │    │
│  └─────────────────────────────────────────────────────────────────┘    │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

PPO for LLM 的特殊考虑

"""
PPO for LLM 实现
"""
import torch
import torch.nn.functional as F
from typing import Dict, List, Tuple

class PPOTrainer:
    """PPO 训练器(用于 LLM)"""
    
    def __init__(
        self,
        policy_model,          # 当前策略模型 π_θ
        ref_model,             # 参考模型(SFT 模型)π_ref
        reward_model,          # 奖励模型 r_φ
        tokenizer,
        config: Dict
    ):
        self.policy = policy_model
        self.ref = ref_model
        self.reward_model = reward_model
        self.tokenizer = tokenizer
        
        # PPO 超参数
        self.clip_range = config.get('clip_range', 0.2)
        self.kl_coef = config.get('kl_coef', 0.1)      # KL 惩罚系数
        self.vf_coef = config.get('vf_coef', 0.1)      # 价值函数系数
        self.gamma = config.get('gamma', 1.0)          # 折扣因子
        self.lam = config.get('lam', 0.95)             # GAE lambda
        
    def generate_responses(self, prompts: List[str]) -> List[Dict]:
        """使用当前策略生成回答"""
        responses = []
        for prompt in prompts:
            # 生成回答
            input_ids = self.tokenizer.encode(prompt, return_tensors='pt')
            output = self.policy.generate(
                input_ids,
                max_new_tokens=256,
                do_sample=True,
                temperature=0.7,
                return_dict_in_generate=True,
                output_scores=True
            )
            
            response_ids = output.sequences[0][len(input_ids[0]):]
            response_text = self.tokenizer.decode(response_ids)
            
            responses.append({
                'prompt': prompt,
                'response': response_text,
                'response_ids': response_ids,
                'logprobs': self._compute_logprobs(output)
            })
        
        return responses
    
    def compute_rewards(self, samples: List[Dict]) -> torch.Tensor:
        """计算奖励(包含 KL 惩罚)"""
        rewards = []
        
        for sample in samples:
            # 1. 获取奖励模型打分
            full_text = f"{sample['prompt']}{sample['response']}"
            tokens = self.tokenizer(full_text, return_tensors='pt')
            rm_reward = self.reward_model(**tokens)
            
            # 2. 计算 KL 散度惩罚
            # KL(π_θ || π_ref) ≈ log π_θ(a|s) - log π_ref(a|s)
            with torch.no_grad():
                ref_logprobs = self._get_logprobs(
                    self.ref, 
                    sample['prompt'], 
                    sample['response']
                )
            policy_logprobs = sample['logprobs']
            kl_penalty = (policy_logprobs - ref_logprobs).sum()
            
            # 3. 最终奖励 = RM奖励 - KL惩罚
            final_reward = rm_reward - self.kl_coef * kl_penalty
            rewards.append(final_reward)
        
        return torch.stack(rewards)
    
    def compute_advantages(
        self, 
        rewards: torch.Tensor, 
        values: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        使用 GAE (Generalized Advantage Estimation) 计算优势函数
        
        A_t = δ_t + (γλ)δ_{t+1} + (γλ)²δ_{t+2} + ...
        δ_t = r_t + γV(s_{t+1}) - V(s_t)
        """
        advantages = []
        returns = []
        gae = 0
        
        # 从后往前计算 (token 级别)
        for t in reversed(range(len(rewards))):
            if t == len(rewards) - 1:
                next_value = 0
            else:
                next_value = values[t + 1]
            
            delta = rewards[t] + self.gamma * next_value - values[t]
            gae = delta + self.gamma * self.lam * gae
            advantages.insert(0, gae)
            returns.insert(0, gae + values[t])
        
        advantages = torch.tensor(advantages)
        returns = torch.tensor(returns)
        
        # 标准化优势
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
        
        return advantages, returns
    
    def ppo_step(self, samples: List[Dict]) -> Dict:
        """执行一步 PPO 更新"""
        
        # 1. 计算奖励
        rewards = self.compute_rewards(samples)
        
        # 2. 计算价值估计
        values = self._estimate_values(samples)
        
        # 3. 计算优势
        advantages, returns = self.compute_advantages(rewards, values)
        
        # 4. PPO 更新(多个 epoch)
        for _ in range(self.ppo_epochs):
            # 获取当前策略的 log prob
            curr_logprobs = self._get_batch_logprobs(samples)
            old_logprobs = torch.tensor([s['logprobs'] for s in samples])
            
            # 计算比率
            ratio = torch.exp(curr_logprobs - old_logprobs)
            
            # Clipped surrogate objective
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.clip_range, 1 + self.clip_range) * advantages
            policy_loss = -torch.min(surr1, surr2).mean()
            
            # Value function loss
            curr_values = self._estimate_values(samples)
            value_loss = F.mse_loss(curr_values, returns)
            
            # Total loss
            loss = policy_loss + self.vf_coef * value_loss
            
            # 反向传播
            loss.backward()
            self.optimizer.step()
            self.optimizer.zero_grad()
        
        return {
            'policy_loss': policy_loss.item(),
            'value_loss': value_loss.item(),
            'mean_reward': rewards.mean().item(),
            'mean_kl': self._compute_mean_kl(samples).item()
        }
    
    def train(self, prompts: List[str], num_iterations: int):
        """完整训练循环"""
        for i in range(num_iterations):
            # 1. 采样 prompts
            batch_prompts = self._sample_batch(prompts)
            
            # 2. 生成回答
            samples = self.generate_responses(batch_prompts)
            
            # 3. PPO 更新
            metrics = self.ppo_step(samples)
            
            # 4. 日志记录
            print(f"Iteration {i}: reward={metrics['mean_reward']:.3f}, "
                  f"kl={metrics['mean_kl']:.4f}")
            
            # 5. 检查 KL 散度,必要时调整
            if metrics['mean_kl'] > self.target_kl * 1.5:
                self.kl_coef *= 1.5
            elif metrics['mean_kl'] < self.target_kl / 1.5:
                self.kl_coef /= 1.5

3.4 代码实现

下面是一个完整的 RLHF 训练框架实现:

"""
完整的 RLHF 训练框架
"""
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from dataclasses import dataclass
from typing import Optional, List
import json

@dataclass
class RLHFConfig:
    """RLHF 训练配置"""
    # 模型配置
    base_model: str = "meta-llama/Llama-2-7b-hf"
    reward_model: Optional[str] = None
    
    # SFT 配置
    sft_epochs: int = 3
    sft_lr: float = 2e-5
    sft_batch_size: int = 4
    
    # RM 配置
    rm_epochs: int = 1
    rm_lr: float = 1e-5
    rm_batch_size: int = 8
    
    # PPO 配置
    ppo_epochs: int = 4
    ppo_lr: float = 1e-6
    ppo_batch_size: int = 16
    clip_range: float = 0.2
    kl_coef: float = 0.1
    target_kl: float = 0.02
    
    # 生成配置
    max_new_tokens: int = 256
    temperature: float = 0.7
    

class RLHFTrainer:
    """RLHF 完整训练流程"""
    
    def __init__(self, config: RLHFConfig):
        self.config = config
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        
        # 初始化 tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(config.base_model)
        self.tokenizer.pad_token = self.tokenizer.eos_token
        
    def stage1_sft(self, sft_data: List[dict]) -> AutoModelForCausalLM:
        """
        Stage 1: 监督微调
        
        Args:
            sft_data: [{"prompt": "...", "response": "..."}, ...]
        
        Returns:
            微调后的模型
        """
        print("=" * 50)
        print("Stage 1: Supervised Fine-Tuning")
        print("=" * 50)
        
        # 加载基座模型
        model = AutoModelForCausalLM.from_pretrained(
            self.config.base_model,
            torch_dtype=torch.bfloat16,
            device_map="auto"
        )
        
        # 准备数据
        train_dataset = self._prepare_sft_dataset(sft_data)
        
        # 训练
        # ... (使用 HuggingFace Trainer 或自定义训练循环)
        
        print(f"SFT completed. Trained on {len(sft_data)} samples.")
        return model
    
    def stage2_rm(
        self, 
        sft_model: AutoModelForCausalLM,
        preference_data: List[dict]
    ) -> RewardModel:
        """
        Stage 2: 奖励模型训练
        
        Args:
            sft_model: SFT 模型(用于初始化)
            preference_data: [{"prompt": "...", "chosen": "...", "rejected": "..."}, ...]
        
        Returns:
            训练好的奖励模型
        """
        print("=" * 50)
        print("Stage 2: Reward Model Training")
        print("=" * 50)
        
        # 从 SFT 模型初始化奖励模型
        reward_model = RewardModel(sft_model)
        
        # 准备偏好数据
        train_dataset = self._prepare_preference_dataset(preference_data)
        
        # 训练奖励模型
        optimizer = torch.optim.AdamW(reward_model.parameters(), lr=self.config.rm_lr)
        
        for epoch in range(self.config.rm_epochs):
            total_loss = 0
            correct = 0
            total = 0
            
            for batch in train_dataset:
                loss, acc = self._rm_train_step(reward_model, batch, optimizer)
                total_loss += loss
                correct += acc * len(batch)
                total += len(batch)
            
            print(f"Epoch {epoch+1}: Loss={total_loss/len(train_dataset):.4f}, "
                  f"Accuracy={correct/total:.2%}")
        
        return reward_model
    
    def stage3_ppo(
        self,
        sft_model: AutoModelForCausalLM,
        reward_model: RewardModel,
        prompts: List[str],
        num_iterations: int = 1000
    ) -> AutoModelForCausalLM:
        """
        Stage 3: PPO 强化学习
        
        Args:
            sft_model: SFT 模型(作为初始策略和参考模型)
            reward_model: 奖励模型
            prompts: 训练用的 prompt 集合
            num_iterations: 训练迭代次数
        
        Returns:
            RLHF 训练后的模型
        """
        print("=" * 50)
        print("Stage 3: PPO Training")
        print("=" * 50)
        
        # 初始化策略模型和参考模型
        policy_model = sft_model  # 将被更新
        ref_model = AutoModelForCausalLM.from_pretrained(
            self.config.base_model  # 加载原始 SFT 模型作为参考
        )
        ref_model.eval()  # 参考模型不更新
        
        # 初始化 PPO 训练器
        ppo_trainer = PPOTrainer(
            policy_model=policy_model,
            ref_model=ref_model,
            reward_model=reward_model,
            tokenizer=self.tokenizer,
            config={
                'clip_range': self.config.clip_range,
                'kl_coef': self.config.kl_coef,
                'target_kl': self.config.target_kl
            }
        )
        
        # PPO 训练循环
        ppo_trainer.train(prompts, num_iterations)
        
        return policy_model
    
    def full_pipeline(
        self,
        sft_data: List[dict],
        preference_data: List[dict],
        ppo_prompts: List[str]
    ) -> AutoModelForCausalLM:
        """运行完整的 RLHF 训练流程"""
        
        # Stage 1: SFT
        sft_model = self.stage1_sft(sft_data)
        
        # Stage 2: RM
        reward_model = self.stage2_rm(sft_model, preference_data)
        
        # Stage 3: PPO
        final_model = self.stage3_ppo(sft_model, reward_model, ppo_prompts)
        
        return final_model


# 使用示例
if __name__ == "__main__":
    # 配置
    config = RLHFConfig(
        base_model="meta-llama/Llama-2-7b-hf",
        sft_epochs=3,
        rm_epochs=1,
        ppo_epochs=4
    )
    
    # 加载数据
    with open("sft_data.json") as f:
        sft_data = json.load(f)
    with open("preference_data.json") as f:
        preference_data = json.load(f)
    with open("ppo_prompts.json") as f:
        ppo_prompts = json.load(f)
    
    # 训练
    trainer = RLHFTrainer(config)
    final_model = trainer.full_pipeline(sft_data, preference_data, ppo_prompts)
    
    # 保存模型
    final_model.save_pretrained("./rlhf_model")

4. RLHF 进阶:技术演进与变体 🔄

随着 RLHF 技术的发展,研究者们提出了多种改进和简化方案。

4.1 DPO:去除奖励模型

DPO(Direct Preference Optimization)是 2023 年提出的重要方法,它直接从偏好数据优化策略,无需显式的奖励模型。

┌─────────────────────────────────────────────────────────────────────────┐
│                    DPO vs RLHF 对比                                      │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   RLHF 流程:                                                            │
│   ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐             │
│   │偏好数据  │ →  │训练 RM  │ →  │RM 打分  │ →  │PPO 优化 │             │
│   └─────────┘    └─────────┘    └─────────┘    └─────────┘             │
│                       ↓              ↓                                  │
│                   需要单独      采样+评估                                │
│                   训练模型      计算开销大                               │
│                                                                         │
│   DPO 流程:                                                             │
│   ┌─────────┐    ┌───────────────────────────────┐                      │
│   │偏好数据  │ →  │  直接优化策略 (closed-form)   │                      │
│   └─────────┘    └───────────────────────────────┘                      │
│                                ↓                                        │
│                         无需 RM,无需采样                                │
│                         训练更简单稳定                                   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

DPO 的核心思想

DPO 证明了可以将 RLHF 的优化目标重新参数化,直接用策略模型表达,无需显式的奖励模型。

"""
DPO (Direct Preference Optimization) 实现
"""
import torch
import torch.nn.functional as F
from transformers import AutoModelForCausalLM

class DPOTrainer:
    """DPO 训练器"""
    
    def __init__(
        self,
        model: AutoModelForCausalLM,
        ref_model: AutoModelForCausalLM,
        tokenizer,
        beta: float = 0.1  # 温度参数
    ):
        self.model = model
        self.ref_model = ref_model
        self.tokenizer = tokenizer
        self.beta = beta
        
    def compute_dpo_loss(
        self,
        prompt: str,
        chosen: str,
        rejected: str
    ) -> torch.Tensor:
        """
        DPO 损失函数
        
        L_DPO = -log σ(β * (log π_θ(y_w|x) - log π_ref(y_w|x) 
                         - log π_θ(y_l|x) + log π_ref(y_l|x)))
        
        其中:
          y_w = chosen (winner)
          y_l = rejected (loser)
          π_θ = 当前策略
          π_ref = 参考策略 (SFT 模型)
        """
        # 计算 chosen 的 log prob
        chosen_logps = self._get_logprobs(self.model, prompt, chosen)
        chosen_ref_logps = self._get_logprobs(self.ref_model, prompt, chosen)
        
        # 计算 rejected 的 log prob
        rejected_logps = self._get_logprobs(self.model, prompt, rejected)
        rejected_ref_logps = self._get_logprobs(self.ref_model, prompt, rejected)
        
        # 计算 log ratio
        chosen_ratio = chosen_logps - chosen_ref_logps
        rejected_ratio = rejected_logps - rejected_ref_logps
        
        # DPO loss
        loss = -F.logsigmoid(self.beta * (chosen_ratio - rejected_ratio))
        
        return loss
    
    def _get_logprobs(
        self,
        model: AutoModelForCausalLM,
        prompt: str,
        response: str
    ) -> torch.Tensor:
        """计算 response 在给定 prompt 下的 log probability"""
        
        full_text = prompt + response
        tokens = self.tokenizer(full_text, return_tensors='pt')
        
        with torch.no_grad() if model == self.ref_model else torch.enable_grad():
            outputs = model(**tokens)
            logits = outputs.logits
        
        # 只计算 response 部分的 log prob
        prompt_len = len(self.tokenizer(prompt)['input_ids'])
        response_logits = logits[:, prompt_len-1:-1, :]
        response_tokens = tokens['input_ids'][:, prompt_len:]
        
        log_probs = F.log_softmax(response_logits, dim=-1)
        token_log_probs = torch.gather(
            log_probs, 
            dim=-1, 
            index=response_tokens.unsqueeze(-1)
        ).squeeze(-1)
        
        return token_log_probs.sum()
    
    def train_step(self, batch: List[dict]) -> dict:
        """执行一步训练"""
        total_loss = 0
        
        for sample in batch:
            loss = self.compute_dpo_loss(
                sample['prompt'],
                sample['chosen'],
                sample['rejected']
            )
            total_loss += loss
        
        loss = total_loss / len(batch)
        loss.backward()
        
        return {'loss': loss.item()}


# DPO 的优势
"""
1. 简单性
   - 无需训练单独的奖励模型
   - 无需在线采样(不需要生成新回答)
   - 训练流程类似 SFT

2. 稳定性
   - 避免了 PPO 的训练不稳定性
   - 无需调整 KL 惩罚系数
   - 超参数更少

3. 效率
   - 计算开销大大降低
   - 无需维护多个模型
   - 内存占用更少

4. 效果
   - 在多个基准上与 RLHF 效果相当
   - 有时甚至更好
"""

4.2 ORPO:偏好优化的简化

ORPO(Odds Ratio Preference Optimization)进一步简化了偏好学习,甚至不需要参考模型。

"""
ORPO (Odds Ratio Preference Optimization) 实现
"""
import torch
import torch.nn.functional as F

class ORPOTrainer:
    """ORPO 训练器 - 结合 SFT 和偏好优化"""
    
    def __init__(
        self,
        model,
        tokenizer,
        lambda_orpo: float = 0.1  # ORPO 损失权重
    ):
        self.model = model
        self.tokenizer = tokenizer
        self.lambda_orpo = lambda_orpo
    
    def compute_orpo_loss(
        self,
        prompt: str,
        chosen: str,
        rejected: str
    ) -> tuple:
        """
        ORPO 损失 = SFT损失 + λ * 偏好损失
        
        偏好损失使用 odds ratio:
        L_OR = -log σ(log(odds(y_w|x) / odds(y_l|x)))
        
        其中 odds(y|x) = P(y|x) / (1 - P(y|x))
        """
        # 计算 chosen 的 log prob
        chosen_logps, chosen_avg_logps = self._get_logprobs_and_avg(prompt, chosen)
        rejected_logps, rejected_avg_logps = self._get_logprobs_and_avg(prompt, rejected)
        
        # SFT 损失(只在 chosen 上)
        sft_loss = -chosen_logps
        
        # ORPO 损失(基于 odds ratio)
        chosen_odds = chosen_avg_logps - torch.log(1 - torch.exp(chosen_avg_logps) + 1e-10)
        rejected_odds = rejected_avg_logps - torch.log(1 - torch.exp(rejected_avg_logps) + 1e-10)
        
        orpo_loss = -F.logsigmoid(chosen_odds - rejected_odds)
        
        # 总损失
        total_loss = sft_loss + self.lambda_orpo * orpo_loss
        
        return total_loss, sft_loss, orpo_loss
    
    def _get_logprobs_and_avg(self, prompt: str, response: str):
        """获取总 log prob 和平均 log prob"""
        full_text = prompt + response
        tokens = self.tokenizer(full_text, return_tensors='pt')
        
        outputs = self.model(**tokens)
        logits = outputs.logits
        
        prompt_len = len(self.tokenizer(prompt)['input_ids'])
        response_logits = logits[:, prompt_len-1:-1, :]
        response_tokens = tokens['input_ids'][:, prompt_len:]
        
        log_probs = F.log_softmax(response_logits, dim=-1)
        token_log_probs = torch.gather(
            log_probs,
            dim=-1,
            index=response_tokens.unsqueeze(-1)
        ).squeeze(-1)
        
        total_logps = token_log_probs.sum()
        avg_logps = token_log_probs.mean()
        
        return total_logps, avg_logps


# ORPO 的特点
"""
1. 无需参考模型
   - 比 DPO 更简单
   - 内存占用更少
   - 从随机初始化也可以训练

2. 结合 SFT 和偏好学习
   - 一阶段完成
   - 更高效的数据利用
   - 简化训练流程

3. 使用 Odds Ratio
   - 比 log prob ratio 更稳定
   - 对序列长度不敏感
   - 数学上更优雅
"""

4.3 KTO:基于前景理论的对齐

KTO(Kahneman-Tversky Optimization)基于行为经济学的前景理论,不需要配对的偏好数据。

"""
KTO (Kahneman-Tversky Optimization) 实现
"""
import torch
import torch.nn.functional as F

class KTOTrainer:
    """
    KTO 训练器 - 基于前景理论
    
    核心思想:人类对损失比收益更敏感(损失厌恶)
    """
    
    def __init__(
        self,
        model,
        ref_model,
        tokenizer,
        beta: float = 0.1,
        desirable_weight: float = 1.0,
        undesirable_weight: float = 1.0
    ):
        self.model = model
        self.ref_model = ref_model
        self.tokenizer = tokenizer
        self.beta = beta
        self.desirable_weight = desirable_weight
        self.undesirable_weight = undesirable_weight
    
    def compute_kto_loss(
        self,
        prompt: str,
        response: str,
        is_desirable: bool  # True = 好回答, False = 坏回答
    ) -> torch.Tensor:
        """
        KTO 损失函数
        
        对于好回答 (y_w):
          L = w_d * (1 - σ(β * (log π_θ(y|x) - log π_ref(y|x) - KL_ref)))
        
        对于坏回答 (y_l):
          L = w_u * σ(β * (log π_θ(y|x) - log π_ref(y|x) - KL_ref))
        
        其中 KL_ref 是参考 KL 散度的估计值
        """
        # 计算 log ratio
        policy_logps = self._get_logprobs(self.model, prompt, response)
        ref_logps = self._get_logprobs(self.ref_model, prompt, response)
        
        log_ratio = policy_logps - ref_logps
        
        # 计算参考 KL(通常在整个 batch 上估计)
        kl_ref = 0.0  # 简化起见,这里设为0
        
        if is_desirable:
            # 好回答:最大化 log ratio
            loss = self.desirable_weight * (
                1 - torch.sigmoid(self.beta * (log_ratio - kl_ref))
            )
        else:
            # 坏回答:最小化 log ratio
            loss = self.undesirable_weight * torch.sigmoid(
                self.beta * (log_ratio - kl_ref)
            )
        
        return loss
    
    def train_on_unpaired_data(self, data: List[dict]):
        """
        KTO 的优势:可以使用非配对数据
        
        data: [
            {"prompt": "...", "response": "...", "is_good": True},
            {"prompt": "...", "response": "...", "is_good": False},
            ...
        ]
        
        不需要 chosen/rejected 配对!
        """
        for sample in data:
            loss = self.compute_kto_loss(
                sample['prompt'],
                sample['response'],
                sample['is_good']
            )
            loss.backward()


# KTO 的优势
"""
1. 数据效率
   - 不需要配对数据
   - 可以直接使用点赞/点踩数据
   - 数据收集更容易

2. 理论基础
   - 基于前景理论
   - 考虑了人类的损失厌恶特性
   - 心理学上更合理

3. 灵活性
   - 可以调整好/坏数据的权重
   - 适应不同的数据分布
   - 易于与其他方法结合
"""

4.4 方法对比与选择

┌─────────────────────────────────────────────────────────────────────────────────┐
│                         偏好优化方法对比                                          │
├───────────┬──────────────┬──────────────┬──────────────┬──────────────────────────┤
│   方法    │  需要RM?     │  需要参考模型?│  需要配对数据?│        适用场景          │
├───────────┼──────────────┼──────────────┼──────────────┼──────────────────────────┤
│   RLHF    │     ✓        │      ✓       │      ✓       │ 大规模训练,最高质量      │
│   (PPO)   │              │              │              │                          │
├───────────┼──────────────┼──────────────┼──────────────┼──────────────────────────┤
│   DPO     │     ✗        │      ✓       │      ✓       │ 快速训练,资源有限        │
│           │              │              │              │                          │
├───────────┼──────────────┼──────────────┼──────────────┼──────────────────────────┤
│   ORPO    │     ✗        │      ✗       │      ✓       │ 从头训练,超简单流程      │
│           │              │              │              │                          │
├───────────┼──────────────┼──────────────┼──────────────┼──────────────────────────┤
│   KTO     │     ✗        │      ✓       │      ✗       │ 只有单边反馈数据          │
│           │              │              │              │ (点赞/点踩)              │
├───────────┼──────────────┼──────────────┼──────────────┼──────────────────────────┤
│   IPO     │     ✗        │      ✓       │      ✓       │ 需要更强的正则化          │
├───────────┴──────────────┴──────────────┴──────────────┴──────────────────────────┤
│                                                                                   │
│  选择指南:                                                                         │
│  ┌─────────────────────────────────────────────────────────────────────────────┐ │
│  │  1. 有大量计算资源 + 需要最佳效果 → RLHF (PPO)                               │ │
│  │  2. 计算资源有限 + 有配对偏好数据 → DPO                                      │ │
│  │  3. 想要最简单的训练流程 → ORPO                                              │ │
│  │  4. 只有单边反馈数据(点赞/点踩)→ KTO                                       │ │
│  │  5. 需要更强的理论保证 → IPO                                                 │ │
│  └─────────────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────────────┘

5. AgentGym:通用 Agent 训练框架 🏋️

AgentGym 是一个专门为训练 LLM-based Agent 设计的框架,提供了统一的环境接口、大规模轨迹数据集和进化训练方法。

5.1 AgentGym 架构设计

┌─────────────────────────────────────────────────────────────────────────────────┐
│                           AgentGym 整体架构                                      │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│                              ┌─────────────────┐                                │
│                              │   AgentEvol     │                                │
│                              │   进化训练器     │                                │
│                              └────────┬────────┘                                │
│                                       │                                         │
│                    ┌──────────────────┼──────────────────┐                      │
│                    │                  │                  │                      │
│                    ▼                  ▼                  ▼                      │
│            ┌──────────────┐   ┌──────────────┐   ┌──────────────┐               │
│            │   AgentTraj  │   │   AgentEval  │   │   AgentEnv   │               │
│            │   轨迹数据集  │   │   评估基准    │   │   环境套件   │               │
│            └──────────────┘   └──────────────┘   └──────────────┘               │
│                    │                  │                  │                      │
│                    │                  │                  │                      │
│  ┌─────────────────┴──────────────────┴──────────────────┴─────────────────┐    │
│  │                          统一抽象层 (EnvClient)                          │    │
│  │   ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐       │    │
│  │   │WebShop│ │ALFWorld│ │SciWorld│ │TextCraft│ │Weather│ │Movie│ │Todo │   │    │
│  │   └──────┘ └──────┘ └──────┘ └──────┘ └──────┘ └──────┘ └──────┘       │    │
│  │                    ... 14 种任务环境 ...                                 │    │
│  └─────────────────────────────────────────────────────────────────────────┘    │
│                                                                                 │
│  核心特点:                                                                       │
│  1. 统一的 ReAct 格式 (Thought → Action → Observation)                          │
│  2. 支持多种任务类型 (网页浏览、游戏、工具使用、知识推理等)                         │
│  3. 真实交互环境 (不是静态数据集)                                                 │
│  4. 可扩展的环境接口                                                             │
│                                                                                 │
└─────────────────────────────────────────────────────────────────────────────────┘

5.2 AgentTraj 数据集

AgentTraj 是 AgentGym 的核心数据集,包含大规模的 Agent 交互轨迹。

"""
AgentTraj 数据集结构与使用
"""
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum

class TaskType(Enum):
    """任务类型枚举"""
    WEB_BROWSING = "web_browsing"      # 网页浏览
    GAME = "game"                       # 游戏任务
    TOOL_USE = "tool_use"              # 工具使用
    KNOWLEDGE = "knowledge"            # 知识推理
    CODING = "coding"                  # 代码生成


@dataclass
class AgentStep:
    """单步 Agent 交互"""
    thought: str              # Agent 的思考过程
    action: str               # 执行的动作
    action_input: str         # 动作参数
    observation: str          # 环境反馈
    reward: Optional[float]   # 即时奖励(可选)


@dataclass
class AgentTrajectory:
    """完整的 Agent 轨迹"""
    task_id: str
    task_type: TaskType
    instruction: str          # 任务指令
    steps: List[AgentStep]    # 交互步骤序列
    final_reward: float       # 最终奖励/成功标志
    metadata: Dict            # 元信息


class AgentTrajDataset:
    """AgentTraj 数据集加载器"""
    
    def __init__(self, data_path: str):
        self.data_path = data_path
        self.trajectories = self._load_data()
        
    def _load_data(self) -> List[AgentTrajectory]:
        """加载轨迹数据"""
        # 实际实现会从 JSON/Parquet 文件加载
        pass
    
    def filter_by_type(self, task_type: TaskType) -> List[AgentTrajectory]:
        """按任务类型筛选"""
        return [t for t in self.trajectories if t.task_type == task_type]
    
    def filter_by_reward(self, min_reward: float) -> List[AgentTrajectory]:
        """筛选高质量轨迹"""
        return [t for t in self.trajectories if t.final_reward >= min_reward]
    
    def to_sft_format(self, trajectory: AgentTrajectory) -> Dict:
        """转换为 SFT 训练格式"""
        prompt = f"Task: {trajectory.instruction}\n\n"
        response = ""
        
        for step in trajectory.steps:
            response += f"Thought: {step.thought}\n"
            response += f"Action: {step.action}\n"
            response += f"Action Input: {step.action_input}\n"
            response += f"Observation: {step.observation}\n\n"
        
        return {
            "prompt": prompt,
            "response": response,
            "reward": trajectory.final_reward
        }


# 数据集统计(来自论文)
"""
AgentTraj-L 数据集:
- 总轨迹数: ~500K
- 覆盖 14 种环境
- 包含成功和失败轨迹
- 多个 LLM 生成 (GPT-4, Claude, Llama)

环境分布:
┌──────────────┬───────────┬──────────────┐
│     环境      │  轨迹数   │   成功率     │
├──────────────┼───────────┼──────────────┤
│   WebShop    │   50K     │    35%      │
│   ALFWorld   │   80K     │    45%      │
│   SciWorld   │   40K     │    28%      │
│   TextCraft  │   60K     │    52%      │
│   BIRD       │   30K     │    40%      │
│   ...        │   ...     │    ...      │
└──────────────┴───────────┴──────────────┘
"""

5.3 AgentEvol 进化训练

AgentEvol 是 AgentGym 的核心训练方法,采用进化式的迭代训练策略。

┌─────────────────────────────────────────────────────────────────────────────────┐
│                        AgentEvol 进化训练流程                                    │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│   ┌─────────────────────────────────────────────────────────────────────────┐   │
│   │                         初始化阶段                                       │   │
│   │   ┌─────────────┐                                                       │   │
│   │   │  Base LLM   │ ────→ SFT on AgentTraj ────→ │ Agent_v0 │            │   │
│   │   └─────────────┘                               └──────────┘            │   │
│   └─────────────────────────────────────────────────────────────────────────┘   │
│                                        │                                        │
│                                        ▼                                        │
│   ┌─────────────────────────────────────────────────────────────────────────┐   │
│   │                         进化迭代 (重复 N 轮)                              │   │
│   │                                                                         │   │
│   │   Step 1: 探索 (Exploration)                                            │   │
│   │   ┌───────────────────────────────────────────────────────────────┐     │   │
│   │   │  Agent_v{i} 在环境中执行任务,收集新轨迹                        │     │   │
│   │   │  - 使用温度采样增加多样性                                       │     │   │
│   │   │  - 并行在多个环境中执行                                         │     │   │
│   │   │  - 记录 (observation, action, reward) 序列                     │     │   │
│   │   └───────────────────────────────────────────────────────────────┘     │   │
│   │                              │                                          │   │
│   │                              ▼                                          │   │
│   │   Step 2: 筛选 (Selection)                                              │   │
│   │   ┌───────────────────────────────────────────────────────────────┐     │   │
│   │   │  根据最终奖励筛选高质量轨迹                                      │     │   │
│   │   │  - 保留 reward > threshold 的轨迹                               │     │   │
│   │   │  - 可选: 使用奖励模型重新评分                                    │     │   │
│   │   │  - 去重和质量过滤                                               │     │   │
│   │   └───────────────────────────────────────────────────────────────┘     │   │
│   │                              │                                          │   │
│   │                              ▼                                          │   │
│   │   Step 3: 进化 (Evolution)                                              │   │
│   │   ┌───────────────────────────────────────────────────────────────┐     │   │
│   │   │  使用筛选后的数据训练                                           │     │   │
│   │   │  - 可选: SFT (监督微调)                                         │     │   │
│   │   │  - 可选: DPO (直接偏好优化)                                     │     │   │
│   │   │  - 可选: PPO (强化学习)                                         │     │   │
│   │   │  得到 Agent_v{i+1}                                              │     │   │
│   │   └───────────────────────────────────────────────────────────────┘     │   │
│   │                              │                                          │   │
│   │                              ▼                                          │   │
│   │   Step 4: 评估 (Evaluation)                                             │   │
│   │   ┌───────────────────────────────────────────────────────────────┐     │   │
│   │   │  在保留测试集上评估 Agent_v{i+1}                                │     │   │
│   │   │  - 计算任务成功率                                               │     │   │
│   │   │  - 比较与 v{i} 的提升                                           │     │   │
│   │   │  - 决定是否继续迭代                                             │     │   │
│   │   └───────────────────────────────────────────────────────────────┘     │   │
│   │                                                                         │   │
│   └─────────────────────────────────────────────────────────────────────────┘   │
│                                                                                 │
└─────────────────────────────────────────────────────────────────────────────────┘
"""
AgentEvol 进化训练实现
"""
import torch
from typing import List, Dict, Tuple
from dataclasses import dataclass
import random

@dataclass
class EvolConfig:
    """AgentEvol 配置"""
    num_iterations: int = 5           # 进化轮数
    trajectories_per_iter: int = 1000 # 每轮收集的轨迹数
    selection_ratio: float = 0.3      # 保留的轨迹比例
    reward_threshold: float = 0.5     # 奖励阈值
    training_method: str = "sft"      # sft / dpo / ppo
    temperature: float = 0.8          # 探索温度


class AgentEvol:
    """AgentEvol 进化训练器"""
    
    def __init__(
        self,
        base_model,
        tokenizer,
        environments: List,
        config: EvolConfig
    ):
        self.model = base_model
        self.tokenizer = tokenizer
        self.envs = environments
        self.config = config
        self.trajectory_buffer = []
        
    def explore(self) -> List[AgentTrajectory]:
        """
        Step 1: 探索阶段
        让 Agent 在环境中执行任务,收集轨迹
        """
        new_trajectories = []
        
        for _ in range(self.config.trajectories_per_iter):
            # 随机选择环境
            env = random.choice(self.envs)
            
            # 重置环境
            obs = env.reset()
            trajectory = AgentTrajectory(
                task_id=env.task_id,
                task_type=env.task_type,
                instruction=env.instruction,
                steps=[],
                final_reward=0.0,
                metadata={}
            )
            
            # 执行交互循环
            done = False
            while not done:
                # Agent 生成动作
                prompt = self._build_prompt(trajectory.instruction, trajectory.steps, obs)
                action, thought = self._generate_action(prompt)
                
                # 环境执行
                obs, reward, done, info = env.step(action)
                
                # 记录步骤
                trajectory.steps.append(AgentStep(
                    thought=thought,
                    action=action['name'],
                    action_input=action['input'],
                    observation=obs,
                    reward=reward
                ))
            
            trajectory.final_reward = env.get_final_reward()
            new_trajectories.append(trajectory)
        
        return new_trajectories
    
    def select(self, trajectories: List[AgentTrajectory]) -> List[AgentTrajectory]:
        """
        Step 2: 筛选阶段
        根据奖励筛选高质量轨迹
        """
        # 按奖励排序
        sorted_trajectories = sorted(
            trajectories, 
            key=lambda t: t.final_reward, 
            reverse=True
        )
        
        # 筛选
        selected = []
        for t in sorted_trajectories:
            if t.final_reward >= self.config.reward_threshold:
                selected.append(t)
            if len(selected) >= int(len(trajectories) * self.config.selection_ratio):
                break
        
        print(f"Selected {len(selected)} trajectories from {len(trajectories)}")
        print(f"Average reward: {sum(t.final_reward for t in selected) / len(selected):.3f}")
        
        return selected
    
    def evolve(self, trajectories: List[AgentTrajectory]):
        """
        Step 3: 进化阶段
        使用筛选后的数据训练模型
        """
        if self.config.training_method == "sft":
            self._sft_training(trajectories)
        elif self.config.training_method == "dpo":
            self._dpo_training(trajectories)
        elif self.config.training_method == "ppo":
            self._ppo_training(trajectories)
    
    def _sft_training(self, trajectories: List[AgentTrajectory]):
        """SFT 训练"""
        # 转换为 SFT 格式
        sft_data = [self._to_sft_format(t) for t in trajectories]
        
        # 标准 SFT 训练
        optimizer = torch.optim.AdamW(self.model.parameters(), lr=2e-5)
        
        for epoch in range(3):
            for batch in self._batch_data(sft_data, batch_size=4):
                loss = self._compute_sft_loss(batch)
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()
    
    def _dpo_training(self, trajectories: List[AgentTrajectory]):
        """DPO 训练 - 使用成功/失败轨迹配对"""
        # 分离成功和失败轨迹
        successful = [t for t in trajectories if t.final_reward > 0.5]
        failed = [t for t in self.trajectory_buffer if t.final_reward < 0.5]
        
        # 创建偏好对
        pairs = []
        for good_traj in successful:
            # 找到相同任务的失败轨迹
            bad_trajs = [t for t in failed if t.task_type == good_traj.task_type]
            if bad_trajs:
                bad_traj = random.choice(bad_trajs)
                pairs.append({
                    'prompt': good_traj.instruction,
                    'chosen': self._format_trajectory(good_traj),
                    'rejected': self._format_trajectory(bad_traj)
                })
        
        # DPO 训练
        # ... (使用之前的 DPOTrainer)
    
    def evaluate(self, test_envs: List) -> Dict:
        """
        Step 4: 评估阶段
        """
        results = {
            'total': 0,
            'success': 0,
            'by_type': {}
        }
        
        for env in test_envs:
            obs = env.reset()
            done = False
            steps = []
            
            while not done:
                prompt = self._build_prompt(env.instruction, steps, obs)
                action, thought = self._generate_action(prompt, temperature=0.0)  # greedy
                obs, reward, done, info = env.step(action)
                steps.append({'thought': thought, 'action': action, 'obs': obs})
            
            success = env.get_final_reward() > 0.5
            results['total'] += 1
            if success:
                results['success'] += 1
            
            # 按类型统计
            task_type = env.task_type.value
            if task_type not in results['by_type']:
                results['by_type'][task_type] = {'total': 0, 'success': 0}
            results['by_type'][task_type]['total'] += 1
            if success:
                results['by_type'][task_type]['success'] += 1
        
        results['success_rate'] = results['success'] / results['total']
        return results
    
    def run_evolution(self, num_iterations: int = None) -> List[Dict]:
        """运行完整的进化训练"""
        if num_iterations is None:
            num_iterations = self.config.num_iterations
        
        history = []
        
        for i in range(num_iterations):
            print(f"\n{'='*50}")
            print(f"Evolution Iteration {i+1}/{num_iterations}")
            print(f"{'='*50}")
            
            # Step 1: 探索
            print("\n[1/4] Exploring...")
            new_trajectories = self.explore()
            self.trajectory_buffer.extend(new_trajectories)
            
            # Step 2: 筛选
            print("\n[2/4] Selecting...")
            selected = self.select(new_trajectories)
            
            # Step 3: 进化
            print("\n[3/4] Evolving...")
            self.evolve(selected)
            
            # Step 4: 评估
            print("\n[4/4] Evaluating...")
            eval_results = self.evaluate(self.test_envs)
            
            history.append({
                'iteration': i + 1,
                'trajectories_collected': len(new_trajectories),
                'trajectories_selected': len(selected),
                'eval_results': eval_results
            })
            
            print(f"\nSuccess Rate: {eval_results['success_rate']:.2%}")
        
        return history
    
    def _generate_action(self, prompt: str, temperature: float = None) -> Tuple[Dict, str]:
        """生成动作"""
        if temperature is None:
            temperature = self.config.temperature
        
        inputs = self.tokenizer(prompt, return_tensors='pt')
        outputs = self.model.generate(
            **inputs,
            max_new_tokens=256,
            temperature=temperature,
            do_sample=temperature > 0
        )
        
        response = self.tokenizer.decode(outputs[0])
        # 解析 thought 和 action
        thought, action = self._parse_response(response)
        return action, thought

5.4 实战:使用 AgentGym 训练 Agent

"""
AgentGym 实战:训练一个网页购物 Agent
"""
from agentgym import AgentGym, WebShopEnv, AgentEvol
from transformers import AutoModelForCausalLM, AutoTokenizer

# 1. 初始化环境
print("Step 1: Setting up environments...")
webshop_env = WebShopEnv(
    num_products=10000,
    user_personas_path="./data/user_personas.json"
)

# 2. 加载基座模型
print("Step 2: Loading base model...")
model = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Llama-2-7b-hf",
    torch_dtype=torch.bfloat16,
    device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-hf")

# 3. 加载 AgentTraj 数据进行初始 SFT
print("Step 3: Initial SFT on AgentTraj...")
agenttraj = AgentTrajDataset("./data/agenttraj_webshop.json")
sft_data = agenttraj.filter_by_reward(0.5)  # 只用成功的轨迹
sft_trainer = SFTTrainer(model, tokenizer)
sft_trainer.train(sft_data, epochs=3)

# 4. 进化训练
print("Step 4: Starting AgentEvol...")
evol_config = EvolConfig(
    num_iterations=5,
    trajectories_per_iter=500,
    selection_ratio=0.3,
    reward_threshold=0.5,
    training_method="dpo"
)

agent_evol = AgentEvol(
    base_model=model,
    tokenizer=tokenizer,
    environments=[webshop_env],
    config=evol_config
)

# 设置测试环境
agent_evol.test_envs = [WebShopEnv(test=True) for _ in range(100)]

# 运行进化
history = agent_evol.run_evolution()

# 5. 查看训练历史
print("\n" + "="*50)
print("Training History")
print("="*50)
for record in history:
    print(f"Iteration {record['iteration']}: "
          f"Success Rate = {record['eval_results']['success_rate']:.2%}")

# 6. 保存最终模型
model.save_pretrained("./webshop_agent_evolved")
tokenizer.save_pretrained("./webshop_agent_evolved")

"""
预期输出:
==================================================
Training History
==================================================
Iteration 1: Success Rate = 42.00%
Iteration 2: Success Rate = 51.00%
Iteration 3: Success Rate = 58.00%
Iteration 4: Success Rate = 63.00%
Iteration 5: Success Rate = 67.00%
"""

6. R3:推理增强的强化学习 🧠

R3(Reinforcement Learning with Reasoning Rewards)是一种将推理能力融入强化学习训练的方法,特别适合需要复杂推理的 Agent 任务。

6.1 R3 的核心思想

💡 思考:为什么传统的 RL 方法在训练需要推理的 Agent 时效果不佳?

🤔 解答

┌─────────────────────────────────────────────────────────────────────────────────┐
│                      传统 RL vs R3 对比                                          │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│   传统 RL 的问题:                                                                │
│   ┌─────────────────────────────────────────────────────────────────────────┐   │
│   │                                                                         │   │
│   │   Task: "找一本关于机器学习的书,价格低于50元"                            │   │
│   │                                                                         │   │
│   │   Agent 推理过程:                                                        │   │
│   │   Thought 1: 我需要搜索机器学习相关的书籍 ←────── ✓ 正确                 │   │
│   │   Action 1: search("机器学习")                                          │   │
│   │   Thought 2: 找到了很多结果,需要筛选价格 ←────── ✓ 正确                 │   │
│   │   Action 2: filter(price < 50)                                          │   │
│   │   Thought 3: 这本看起来不错,点击查看 ←────────── ✓ 正确                 │   │
│   │   Action 3: click("机器学习实战")                                        │   │
│   │   ...                                                                   │   │
│   │   Final Result: 购买成功 ✓                                              │   │
│   │                                                                         │   │
│   │   传统 RL 奖励: +1 (只看最终结果)                                        │   │
│   │                                                                         │   │
│   │   问题:                                                                  │   │
│   │   - 中间推理步骤没有奖励                                                 │   │
│   │   - 奖励稀疏,学习效率低                                                 │   │
│   │   - 无法区分"运气好"和"推理好"                                           │   │
│   │                                                                         │   │
│   └─────────────────────────────────────────────────────────────────────────┘   │
│                                                                                 │
│   R3 的解决方案:                                                                 │
│   ┌─────────────────────────────────────────────────────────────────────────┐   │
│   │                                                                         │   │
│   │   为每个推理步骤提供奖励:                                                 │   │
│   │                                                                         │   │
│   │   Thought 1: 我需要搜索机器学习相关的书籍                                 │   │
│   │              → Reasoning Reward: +0.3 (正确识别任务)                     │   │
│   │                                                                         │   │
│   │   Thought 2: 找到了很多结果,需要筛选价格                                 │   │
│   │              → Reasoning Reward: +0.4 (记住价格约束)                     │   │
│   │                                                                         │   │
│   │   Thought 3: 这本看起来不错,点击查看                                     │   │
│   │              → Reasoning Reward: +0.2 (合理的探索)                       │   │
│   │                                                                         │   │
│   │   Total Reward = Task Reward + Σ Reasoning Rewards                      │   │
│   │                = 1.0 + 0.9 = 1.9                                         │   │
│   │                                                                         │   │
│   │   优势:                                                                  │   │
│   │   - 密集奖励信号                                                        │   │
│   │   - 鼓励正确的推理过程                                                   │   │
│   │   - 学习效率大幅提升                                                     │   │
│   │                                                                         │   │
│   └─────────────────────────────────────────────────────────────────────────┘   │
│                                                                                 │
└─────────────────────────────────────────────────────────────────────────────────┘

6.2 推理过程奖励模型

R3 的核心是训练一个能评估推理过程质量的奖励模型。

"""
R3: 推理过程奖励模型 (Process Reward Model)
"""
import torch
import torch.nn as nn
from transformers import AutoModel
from typing import List, Dict

class ProcessRewardModel(nn.Module):
    """
    推理过程奖励模型
    
    与传统的 Outcome RM 不同,Process RM 评估每一步推理的质量
    """
    
    def __init__(self, base_model_name: str):
        super().__init__()
        self.backbone = AutoModel.from_pretrained(base_model_name)
        
        # 步骤级别奖励头
        self.step_reward_head = nn.Sequential(
            nn.Linear(self.backbone.config.hidden_size, 512),
            nn.ReLU(),
            nn.Linear(512, 1)
        )
        
        # 可选:多维度评估
        self.dimension_heads = nn.ModuleDict({
            'relevance': nn.Linear(self.backbone.config.hidden_size, 1),
            'correctness': nn.Linear(self.backbone.config.hidden_size, 1),
            'progress': nn.Linear(self.backbone.config.hidden_size, 1)
        })
    
    def forward(
        self, 
        input_ids, 
        attention_mask,
        step_positions: List[int]  # 每个推理步骤的结束位置
    ) -> Dict[str, torch.Tensor]:
        """
        前向传播
        
        Args:
            input_ids: token ids
            attention_mask: attention mask
            step_positions: 每个 Thought 结束的位置索引
        
        Returns:
            step_rewards: 每个步骤的奖励值
        """
        outputs = self.backbone(input_ids, attention_mask=attention_mask)
        hidden_states = outputs.last_hidden_state
        
        # 在每个步骤位置提取隐藏状态并计算奖励
        step_rewards = []
        dimension_scores = {dim: [] for dim in self.dimension_heads.keys()}
        
        for pos in step_positions:
            step_hidden = hidden_states[:, pos, :]
            
            # 总体步骤奖励
            step_reward = self.step_reward_head(step_hidden)
            step_rewards.append(step_reward)
            
            # 多维度分数
            for dim, head in self.dimension_heads.items():
                score = head(step_hidden)
                dimension_scores[dim].append(score)
        
        return {
            'step_rewards': torch.stack(step_rewards, dim=1),
            'dimension_scores': {
                dim: torch.stack(scores, dim=1) 
                for dim, scores in dimension_scores.items()
            }
        }


class ProcessRMTrainer:
    """推理过程奖励模型训练器"""
    
    def __init__(self, model: ProcessRewardModel, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
    
    def prepare_training_data(
        self, 
        trajectories: List[Dict]
    ) -> List[Dict]:
        """
        准备训练数据
        
        数据格式:每个轨迹需要标注每一步的推理质量
        
        {
            "instruction": "...",
            "steps": [
                {"thought": "...", "action": "...", "quality": 0.8},
                {"thought": "...", "action": "...", "quality": 0.6},
                ...
            ]
        }
        """
        processed = []
        
        for traj in trajectories:
            # 拼接完整序列
            text = f"Task: {traj['instruction']}\n\n"
            step_positions = []
            step_qualities = []
            
            for step in traj['steps']:
                text += f"Thought: {step['thought']}\n"
                # 记录 Thought 结束位置
                step_positions.append(len(self.tokenizer(text)['input_ids']) - 1)
                step_qualities.append(step['quality'])
                text += f"Action: {step['action']}\n"
                text += f"Observation: {step.get('observation', '')}\n\n"
            
            tokens = self.tokenizer(text, return_tensors='pt', truncation=True)
            
            processed.append({
                'input_ids': tokens['input_ids'],
                'attention_mask': tokens['attention_mask'],
                'step_positions': step_positions,
                'step_qualities': torch.tensor(step_qualities)
            })
        
        return processed
    
    def compute_loss(self, batch: List[Dict]) -> torch.Tensor:
        """
        计算损失
        
        使用 MSE 损失来训练步骤奖励预测
        """
        total_loss = 0
        
        for sample in batch:
            outputs = self.model(
                sample['input_ids'],
                sample['attention_mask'],
                sample['step_positions']
            )
            
            predicted_rewards = outputs['step_rewards'].squeeze(-1)
            target_rewards = sample['step_qualities']
            
            loss = nn.MSELoss()(predicted_rewards, target_rewards)
            total_loss += loss
        
        return total_loss / len(batch)
    
    def train_with_comparisons(
        self,
        good_trajectories: List[Dict],
        bad_trajectories: List[Dict]
    ):
        """
        使用对比学习训练
        
        给定相同任务的好/坏轨迹对,学习区分推理质量
        """
        # 类似 DPO 的方式,但在步骤级别
        pass

6.3 R3 训练流程

┌─────────────────────────────────────────────────────────────────────────────────┐
│                           R3 训练完整流程                                        │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│   阶段 1: 收集带标注的推理轨迹                                                   │
│   ┌─────────────────────────────────────────────────────────────────────────┐   │
│   │                                                                         │   │
│   │   方法 A: 人工标注 (高质量,成本高)                                      │   │
│   │   - 标注员评估每个 Thought 的质量                                        │   │
│   │   - 评分维度: 相关性、正确性、进展性                                     │   │
│   │                                                                         │   │
│   │   方法 B: 自动标注 (大规模,需要验证)                                    │   │
│   │   - 使用 GPT-4 等强模型评估                                              │   │
│   │   - 基于最终结果回溯标注                                                 │   │
│   │   - Monte Carlo 采样估计步骤贡献                                         │   │
│   │                                                                         │   │
│   └─────────────────────────────────────────────────────────────────────────┘   │
│                                       │                                         │
│                                       ▼                                         │
│   阶段 2: 训练推理过程奖励模型 (PRM)                                             │
│   ┌─────────────────────────────────────────────────────────────────────────┐   │
│   │                                                                         │   │
│   │   输入: 任务描述 + 推理步骤序列                                          │   │
│   │   输出: 每个步骤的质量分数                                               │   │
│   │                                                                         │   │
│   │   训练目标: min Σ MSE(predicted_reward, human_label)                    │   │
│   │                                                                         │   │
│   └─────────────────────────────────────────────────────────────────────────┘   │
│                                       │                                         │
│                                       ▼                                         │
│   阶段 3: 使用 PRM 进行强化学习                                                  │
│   ┌─────────────────────────────────────────────────────────────────────────┐   │
│   │                                                                         │   │
│   │   修改后的奖励函数:                                                      │   │
│   │                                                                         │   │
│   │   R_total = α * R_task + β * Σ R_reasoning(step_i) + γ * R_KL           │   │
│   │             ↑              ↑                          ↑                 │   │
│   │         任务奖励       推理过程奖励                KL 惩罚               │   │
│   │                                                                         │   │
│   │   使用 PPO 或其他 RL 算法优化                                            │   │
│   │                                                                         │   │
│   └─────────────────────────────────────────────────────────────────────────┘   │
│                                       │                                         │
│                                       ▼                                         │
│   阶段 4: 迭代优化                                                               │
│   ┌─────────────────────────────────────────────────────────────────────────┐   │
│   │                                                                         │   │
│   │   Loop:                                                                 │   │
│   │     1. 用当前策略收集新轨迹                                              │   │
│   │     2. 用 PRM 评估推理质量                                               │   │
│   │     3. 筛选高质量轨迹更新 PRM                                            │   │
│   │     4. 用更新后的 PRM 继续 RL 训练                                       │   │
│   │                                                                         │   │
│   └─────────────────────────────────────────────────────────────────────────┘   │
│                                                                                 │
└─────────────────────────────────────────────────────────────────────────────────┘
"""
R3 完整训练实现
"""
import torch
from typing import List, Dict, Tuple
from dataclasses import dataclass

@dataclass
class R3Config:
    """R3 训练配置"""
    # 奖励权重
    task_reward_weight: float = 1.0
    reasoning_reward_weight: float = 0.5
    kl_penalty_weight: float = 0.1
    
    # 训练参数
    ppo_epochs: int = 4
    batch_size: int = 16
    learning_rate: float = 1e-6
    
    # PRM 更新
    prm_update_freq: int = 100
    prm_learning_rate: float = 1e-5


class R3Trainer:
    """R3 训练器"""
    
    def __init__(
        self,
        policy_model,
        ref_model,
        process_rm: ProcessRewardModel,
        tokenizer,
        config: R3Config
    ):
        self.policy = policy_model
        self.ref = ref_model
        self.prm = process_rm
        self.tokenizer = tokenizer
        self.config = config
        
    def compute_r3_reward(
        self,
        trajectory: Dict,
        task_reward: float
    ) -> Tuple[torch.Tensor, Dict]:
        """
        计算 R3 奖励
        
        R = α * R_task + β * Σ R_reasoning + γ * R_KL
        """
        # 1. 任务奖励
        weighted_task_reward = self.config.task_reward_weight * task_reward
        
        # 2. 推理过程奖励
        text = self._format_trajectory(trajectory)
        tokens = self.tokenizer(text, return_tensors='pt')
        step_positions = self._get_step_positions(trajectory)
        
        with torch.no_grad():
            prm_outputs = self.prm(
                tokens['input_ids'],
                tokens['attention_mask'],
                step_positions
            )
        
        reasoning_rewards = prm_outputs['step_rewards'].squeeze()
        weighted_reasoning_reward = (
            self.config.reasoning_reward_weight * reasoning_rewards.sum()
        )
        
        # 3. KL 惩罚
        kl_penalty = self._compute_kl(trajectory)
        weighted_kl = self.config.kl_penalty_weight * kl_penalty
        
        # 总奖励
        total_reward = weighted_task_reward + weighted_reasoning_reward - weighted_kl
        
        # 返回详细信息供分析
        details = {
            'task_reward': task_reward,
            'reasoning_rewards': reasoning_rewards.tolist(),
            'kl_penalty': kl_penalty.item(),
            'total_reward': total_reward.item()
        }
        
        return total_reward, details
    
    def train_step(self, trajectories: List[Dict]) -> Dict:
        """
        执行一步 R3 训练
        """
        # 收集所有样本的奖励
        all_rewards = []
        all_details = []
        
        for traj in trajectories:
            task_reward = traj['final_reward']
            reward, details = self.compute_r3_reward(traj, task_reward)
            all_rewards.append(reward)
            all_details.append(details)
        
        # 计算优势函数
        rewards_tensor = torch.stack(all_rewards)
        values = self._estimate_values(trajectories)
        advantages = self._compute_advantages(rewards_tensor, values)
        
        # PPO 更新
        policy_loss = self._ppo_update(trajectories, advantages)
        
        # 定期更新 PRM
        if self.step_count % self.config.prm_update_freq == 0:
            self._update_prm(trajectories)
        
        return {
            'policy_loss': policy_loss,
            'mean_reward': rewards_tensor.mean().item(),
            'mean_reasoning_reward': sum(d['reasoning_rewards'][0] for d in all_details) / len(all_details),
            'mean_kl': sum(d['kl_penalty'] for d in all_details) / len(all_details)
        }
    
    def _update_prm(self, trajectories: List[Dict]):
        """
        使用新收集的轨迹更新 PRM
        
        可以使用多种策略:
        1. 基于最终结果的伪标签
        2. 使用 Monte Carlo 估计步骤价值
        3. 使用对比学习
        """
        # Monte Carlo 价值估计
        for traj in trajectories:
            step_values = self._monte_carlo_step_values(traj)
            # 用这些估计值更新 PRM
            # ...
    
    def _monte_carlo_step_values(self, trajectory: Dict) -> List[float]:
        """
        使用 Monte Carlo 方法估计每个步骤的价值贡献
        
        思路:从某个步骤开始,多次采样完成任务,
              成功率反映该步骤的价值
        """
        step_values = []
        
        for i, step in enumerate(trajectory['steps']):
            # 从步骤 i 开始,用当前策略完成任务
            success_count = 0
            num_samples = 10
            
            for _ in range(num_samples):
                # 复制前 i 步,然后继续生成
                partial_traj = self._copy_partial(trajectory, i)
                completed = self._complete_trajectory(partial_traj)
                if completed['final_reward'] > 0.5:
                    success_count += 1
            
            step_values.append(success_count / num_samples)
        
        return step_values


# 使用示例
def train_r3_agent():
    """使用 R3 训练 Agent 的完整流程"""
    
    # 1. 初始化模型
    policy = AutoModelForCausalLM.from_pretrained("llama-7b")
    ref = AutoModelForCausalLM.from_pretrained("llama-7b")
    prm = ProcessRewardModel("llama-7b")
    tokenizer = AutoTokenizer.from_pretrained("llama-7b")
    
    # 2. 加载预训练的 PRM
    # prm.load_state_dict(torch.load("pretrained_prm.pt"))
    
    # 3. 配置训练
    config = R3Config(
        task_reward_weight=1.0,
        reasoning_reward_weight=0.5,
        kl_penalty_weight=0.1
    )
    
    trainer = R3Trainer(policy, ref, prm, tokenizer, config)
    
    # 4. 训练循环
    for iteration in range(1000):
        # 收集轨迹
        trajectories = trainer.collect_trajectories(num=32)
        
        # R3 更新
        metrics = trainer.train_step(trajectories)
        
        print(f"Iter {iteration}: reward={metrics['mean_reward']:.3f}, "
              f"reasoning={metrics['mean_reasoning_reward']:.3f}")

6.4 实验结果分析

┌─────────────────────────────────────────────────────────────────────────────────┐
│                        R3 实验结果 (来自论文)                                    │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│   1. 数学推理任务 (GSM8K, MATH)                                                 │
│   ┌─────────────────────────────────────────────────────────────────────────┐   │
│   │                                                                         │   │
│   │   方法          │  GSM8K  │  MATH   │  训练样本数                        │   │
│   │   ─────────────────────────────────────────────────────────            │   │
│   │   SFT           │  45.2%  │  12.8%  │  100K                            │   │
│   │   RLHF (ORM)    │  52.1%  │  15.3%  │  100K + 50K RL                   │   │
│   │   R3 (PRM)      │  58.7%  │  19.2%  │  100K + 50K RL                   │   │
│   │                                                                         │   │
│   │   提升: R3 比 RLHF 在 GSM8K 上提升 6.6%,在 MATH 上提升 3.9%             │   │
│   │                                                                         │   │
│   └─────────────────────────────────────────────────────────────────────────┘   │
│                                                                                 │
│   2. Agent 任务 (WebShop, ALFWorld)                                             │
│   ┌─────────────────────────────────────────────────────────────────────────┐   │
│   │                                                                         │   │
│   │   方法          │ WebShop │ ALFWorld │  平均步数                        │   │
│   │   ─────────────────────────────────────────────────────────            │   │
│   │   SFT           │  32.5%  │  41.2%   │  8.3                            │   │
│   │   RLHF          │  45.8%  │  52.6%   │  7.1                            │   │
│   │   R3            │  54.2%  │  61.8%   │  5.9                            │   │
│   │                                                                         │   │
│   │   R3 不仅成功率更高,完成任务所需步数也更少                               │   │
│   │                                                                         │   │
│   └─────────────────────────────────────────────────────────────────────────┘   │
│                                                                                 │
│   3. 消融实验                                                                    │
│   ┌─────────────────────────────────────────────────────────────────────────┐   │
│   │                                                                         │   │
│   │   组件            │  GSM8K 影响   │  说明                               │   │
│   │   ─────────────────────────────────────────────────                    │   │
│   │   去除 PRM        │  -6.6%        │  退化为标准 RLHF                    │   │
│   │   去除多维度评估   │  -2.1%        │  单一分数不够精细                   │   │
│   │   去除 PRM 更新   │  -3.4%        │  PRM 需要持续优化                   │   │
│   │   减少推理奖励权重 │  -4.2%        │  推理奖励很重要                     │   │
│   │                                                                         │   │
│   └─────────────────────────────────────────────────────────────────────────┘   │
│                                                                                 │
│   关键发现:                                                                      │
│   • PRM 比 ORM 更能捕捉推理质量                                                  │
│   • 密集的推理奖励显著提升学习效率                                                │
│   • PRM 需要随策略更新而更新(避免 reward hacking)                              │
│   • 多维度评估(相关性、正确性、进展性)比单一评分更有效                           │
│                                                                                 │
└─────────────────────────────────────────────────────────────────────────────────┘

7. Agent 微调方法大全 📖

本节总结各种 Agent 微调方法,帮助读者根据具体场景选择合适的技术。

7.1 基于轨迹的微调

"""
基于轨迹的微调方法
"""
from typing import List, Dict
from enum import Enum

class TrajectoryFinetuneMethod(Enum):
    """轨迹微调方法枚举"""
    BEHAVIOR_CLONING = "bc"           # 行为克隆
    FILTERED_BC = "filtered_bc"       # 筛选后的行为克隆
    WEIGHTED_BC = "weighted_bc"       # 加权行为克隆
    TRAJECTORY_RANKING = "ranking"    # 轨迹排序学习


class BehaviorCloning:
    """
    行为克隆 (Behavior Cloning)
    
    最简单的方法:直接在专家轨迹上做 SFT
    """
    
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
    
    def train(self, expert_trajectories: List[Dict]):
        """
        在专家轨迹上训练
        
        优点: 简单、稳定
        缺点: 需要大量高质量数据,无法超越专家
        """
        for traj in expert_trajectories:
            # 转换为 (prompt, response) 格式
            prompt = f"Task: {traj['instruction']}\n"
            response = self._format_steps(traj['steps'])
            
            # 标准 SFT 损失
            loss = self._compute_sft_loss(prompt, response)
            loss.backward()
            # ...


class FilteredBC:
    """
    筛选后的行为克隆
    
    只在成功的轨迹上训练
    """
    
    def __init__(self, model, tokenizer, success_threshold: float = 0.5):
        self.model = model
        self.tokenizer = tokenizer
        self.threshold = success_threshold
    
    def filter_trajectories(self, trajectories: List[Dict]) -> List[Dict]:
        """筛选成功轨迹"""
        return [t for t in trajectories if t['final_reward'] >= self.threshold]
    
    def train(self, trajectories: List[Dict]):
        """在筛选后的轨迹上训练"""
        filtered = self.filter_trajectories(trajectories)
        print(f"Filtered {len(filtered)}/{len(trajectories)} trajectories")
        
        # 标准 BC 训练
        for traj in filtered:
            # ...
            pass


class WeightedBC:
    """
    加权行为克隆
    
    根据轨迹质量分配不同的学习权重
    """
    
    def __init__(self, model, tokenizer, temperature: float = 1.0):
        self.model = model
        self.tokenizer = tokenizer
        self.temperature = temperature
    
    def compute_weights(self, trajectories: List[Dict]) -> List[float]:
        """计算每条轨迹的权重"""
        rewards = [t['final_reward'] for t in trajectories]
        
        # 使用 softmax 转换为权重
        import numpy as np
        rewards = np.array(rewards) / self.temperature
        weights = np.exp(rewards) / np.exp(rewards).sum()
        
        return weights.tolist()
    
    def train(self, trajectories: List[Dict]):
        """加权训练"""
        weights = self.compute_weights(trajectories)
        
        for traj, weight in zip(trajectories, weights):
            loss = self._compute_sft_loss(traj)
            weighted_loss = weight * loss
            weighted_loss.backward()


class TrajectoryRanking:
    """
    轨迹排序学习
    
    类似 DPO,但在轨迹级别进行偏好学习
    """
    
    def __init__(self, model, ref_model, tokenizer, beta: float = 0.1):
        self.model = model
        self.ref_model = ref_model
        self.tokenizer = tokenizer
        self.beta = beta
    
    def create_pairs(self, trajectories: List[Dict]) -> List[tuple]:
        """创建轨迹对"""
        pairs = []
        
        # 按任务分组
        by_task = {}
        for traj in trajectories:
            task_id = traj['task_id']
            if task_id not in by_task:
                by_task[task_id] = []
            by_task[task_id].append(traj)
        
        # 在同一任务内创建对
        for task_id, task_trajs in by_task.items():
            sorted_trajs = sorted(task_trajs, key=lambda t: t['final_reward'], reverse=True)
            
            for i in range(len(sorted_trajs)):
                for j in range(i + 1, len(sorted_trajs)):
                    if sorted_trajs[i]['final_reward'] > sorted_trajs[j]['final_reward']:
                        pairs.append((sorted_trajs[i], sorted_trajs[j]))
        
        return pairs
    
    def train(self, trajectories: List[Dict]):
        """轨迹排序训练"""
        pairs = self.create_pairs(trajectories)
        
        for better_traj, worse_traj in pairs:
            # DPO 风格的损失
            loss = self._compute_ranking_loss(better_traj, worse_traj)
            loss.backward()

7.2 基于反馈的微调

"""
基于反馈的微调方法
"""
from abc import ABC, abstractmethod
from typing import List, Dict, Optional

class FeedbackSource(ABC):
    """反馈来源抽象基类"""
    
    @abstractmethod
    def get_feedback(self, trajectory: Dict) -> Dict:
        """获取对轨迹的反馈"""
        pass


class HumanFeedback(FeedbackSource):
    """人类反馈"""
    
    def get_feedback(self, trajectory: Dict) -> Dict:
        """
        收集人类反馈
        
        反馈类型:
        - 评分 (1-5)
        - 对比 (A vs B)
        - 修正 (提供更好的动作)
        """
        # 实际实现会调用标注平台 API
        pass


class LLMFeedback(FeedbackSource):
    """LLM 自动反馈"""
    
    def __init__(self, critic_model):
        self.critic = critic_model
    
    def get_feedback(self, trajectory: Dict) -> Dict:
        """使用强大的 LLM 评估轨迹"""
        prompt = f"""
        请评估以下 Agent 执行轨迹的质量:
        
        任务: {trajectory['instruction']}
        
        执行步骤:
        {self._format_steps(trajectory['steps'])}
        
        最终结果: {trajectory['final_outcome']}
        
        请从以下维度评分 (1-10):
        1. 任务理解: Agent 是否正确理解了任务?
        2. 推理质量: 每步推理是否合理?
        3. 效率: 是否用最少的步骤完成?
        4. 错误恢复: 遇到错误时是否能正确处理?
        
        输出 JSON 格式。
        """
        
        response = self.critic.generate(prompt)
        return self._parse_feedback(response)


class EnvironmentFeedback(FeedbackSource):
    """环境反馈"""
    
    def __init__(self, reward_function):
        self.reward_fn = reward_function
    
    def get_feedback(self, trajectory: Dict) -> Dict:
        """基于环境的自动奖励"""
        return {
            'task_completion': trajectory['final_reward'],
            'step_rewards': [self.reward_fn(step) for step in trajectory['steps']],
            'efficiency': 1.0 / len(trajectory['steps'])  # 步数越少越好
        }


class FeedbackBasedTrainer:
    """基于反馈的训练器"""
    
    def __init__(
        self,
        model,
        feedback_sources: List[FeedbackSource],
        aggregation: str = "weighted_average"
    ):
        self.model = model
        self.feedback_sources = feedback_sources
        self.aggregation = aggregation
    
    def collect_feedback(self, trajectory: Dict) -> Dict:
        """从多个来源收集并聚合反馈"""
        all_feedback = []
        
        for source in self.feedback_sources:
            feedback = source.get_feedback(trajectory)
            all_feedback.append(feedback)
        
        return self._aggregate_feedback(all_feedback)
    
    def train_with_feedback(self, trajectories: List[Dict]):
        """使用反馈训练"""
        for traj in trajectories:
            feedback = self.collect_feedback(traj)
            
            # 根据反馈类型选择训练方式
            if 'correction' in feedback:
                # 使用修正数据做 SFT
                self._train_on_correction(traj, feedback['correction'])
            elif 'score' in feedback:
                # 使用评分加权训练
                self._weighted_training(traj, feedback['score'])
            elif 'comparison' in feedback:
                # 使用对比数据做 DPO
                self._dpo_training(traj, feedback['comparison'])

7.3 基于环境的微调

"""
基于环境的微调方法 - 让 Agent 在真实环境中学习
"""
import gymnasium as gym
from typing import List, Dict, Callable

class EnvironmentBasedTrainer:
    """基于环境交互的训练器"""
    
    def __init__(
        self,
        model,
        env: gym.Env,
        tokenizer,
        reward_shaping: Callable = None
    ):
        self.model = model
        self.env = env
        self.tokenizer = tokenizer
        self.reward_shaping = reward_shaping or (lambda r, s, a: r)
    
    def collect_online_data(self, num_episodes: int) -> List[Dict]:
        """在环境中在线收集数据"""
        trajectories = []
        
        for _ in range(num_episodes):
            obs = self.env.reset()
            trajectory = {'steps': [], 'total_reward': 0}
            done = False
            
            while not done:
                # Agent 生成动作
                action = self._generate_action(obs)
                
                # 环境执行
                next_obs, reward, done, info = self.env.step(action)
                
                # 奖励塑形
                shaped_reward = self.reward_shaping(reward, obs, action)
                
                trajectory['steps'].append({
                    'observation': obs,
                    'action': action,
                    'reward': shaped_reward,
                    'next_observation': next_obs
                })
                trajectory['total_reward'] += reward
                obs = next_obs
            
            trajectories.append(trajectory)
        
        return trajectories
    
    def online_rl_training(self, num_iterations: int):
        """
        在线强化学习训练
        
        流程:
        1. 在环境中收集数据
        2. 更新模型
        3. 重复
        """
        for i in range(num_iterations):
            # 收集数据
            trajectories = self.collect_online_data(num_episodes=32)
            
            # 计算奖励统计
            mean_reward = sum(t['total_reward'] for t in trajectories) / len(trajectories)
            
            # 更新模型 (可以用 PPO, DPO 等)
            self._update_model(trajectories)
            
            print(f"Iteration {i}: Mean Reward = {mean_reward:.3f}")


class CurriculumLearning:
    """
    课程学习 - 由易到难训练
    """
    
    def __init__(
        self,
        model,
        environments: List[gym.Env],  # 按难度排序
        difficulty_threshold: float = 0.8
    ):
        self.model = model
        self.envs = environments
        self.threshold = difficulty_threshold
        self.current_level = 0
    
    def should_advance(self, success_rate: float) -> bool:
        """判断是否应该进入下一难度"""
        return success_rate >= self.threshold
    
    def train(self):
        """课程学习训练"""
        while self.current_level < len(self.envs):
            env = self.envs[self.current_level]
            print(f"\n=== Level {self.current_level + 1}/{len(self.envs)} ===")
            
            # 在当前难度训练
            for epoch in range(100):
                trajectories = self._collect_data(env, num_episodes=50)
                success_rate = self._compute_success_rate(trajectories)
                
                self._update_model(trajectories)
                
                print(f"Epoch {epoch}: Success Rate = {success_rate:.2%}")
                
                if self.should_advance(success_rate):
                    print(f"Advancing to level {self.current_level + 2}!")
                    self.current_level += 1
                    break

7.4 自我进化训练

"""
自我进化训练 - Agent 自我改进
"""
from typing import List, Dict, Tuple
import random

class SelfEvolvingAgent:
    """
    自我进化 Agent
    
    核心思想:Agent 生成数据 → 自我评估 → 在好数据上训练
    """
    
    def __init__(
        self,
        model,
        critic_model,  # 用于自我评估
        tokenizer,
        evolution_config: Dict
    ):
        self.model = model
        self.critic = critic_model
        self.tokenizer = tokenizer
        self.config = evolution_config
    
    def self_play(self, tasks: List[str], num_attempts: int = 5) -> List[Dict]:
        """
        自我对弈:对每个任务多次尝试
        """
        all_trajectories = []
        
        for task in tasks:
            task_trajectories = []
            
            for _ in range(num_attempts):
                # 生成轨迹(使用不同温度增加多样性)
                temperature = random.uniform(0.5, 1.0)
                trajectory = self._generate_trajectory(task, temperature)
                task_trajectories.append(trajectory)
            
            all_trajectories.extend(task_trajectories)
        
        return all_trajectories
    
    def self_evaluate(self, trajectories: List[Dict]) -> List[Tuple[Dict, float]]:
        """
        自我评估:使用 critic 模型评估轨迹质量
        """
        evaluated = []
        
        for traj in trajectories:
            # 构造评估 prompt
            eval_prompt = f"""
            评估以下任务执行的质量 (0-1 分):
            
            任务: {traj['instruction']}
            
            执行过程:
            {self._format_steps(traj['steps'])}
            
            评分标准:
            - 任务是否完成
            - 步骤是否合理
            - 效率是否足够
            
            只输出一个 0-1 之间的数字。
            """
            
            score_text = self.critic.generate(eval_prompt, max_tokens=10)
            score = float(score_text.strip())
            
            evaluated.append((traj, score))
        
        return evaluated
    
    def self_select(
        self, 
        evaluated_trajectories: List[Tuple[Dict, float]],
        selection_method: str = "top_k"
    ) -> List[Dict]:
        """
        自我选择:挑选用于训练的数据
        """
        if selection_method == "top_k":
            # 选择评分最高的 k%
            k = self.config.get('selection_ratio', 0.3)
            sorted_trajs = sorted(evaluated_trajectories, key=lambda x: x[1], reverse=True)
            n_select = max(1, int(len(sorted_trajs) * k))
            return [t[0] for t in sorted_trajs[:n_select]]
        
        elif selection_method == "threshold":
            # 选择超过阈值的
            threshold = self.config.get('score_threshold', 0.7)
            return [t[0] for t, s in evaluated_trajectories if s >= threshold]
        
        elif selection_method == "rejection_sampling":
            # 拒绝采样
            selected = []
            for traj, score in evaluated_trajectories:
                if random.random() < score:
                    selected.append(traj)
            return selected
    
    def self_improve(self, selected_trajectories: List[Dict]):
        """
        自我改进:使用筛选后的数据训练
        """
        # 可以使用多种训练方法
        if self.config.get('training_method') == 'sft':
            self._sft_training(selected_trajectories)
        elif self.config.get('training_method') == 'dpo':
            # 需要创建偏好对
            pairs = self._create_preference_pairs(selected_trajectories)
            self._dpo_training(pairs)
    
    def evolve(self, tasks: List[str], num_generations: int = 10):
        """
        完整进化循环
        """
        for gen in range(num_generations):
            print(f"\n{'='*50}")
            print(f"Generation {gen + 1}/{num_generations}")
            print(f"{'='*50}")
            
            # 1. 自我对弈
            print("[1/4] Self-play...")
            trajectories = self.self_play(tasks)
            
            # 2. 自我评估
            print("[2/4] Self-evaluate...")
            evaluated = self.self_evaluate(trajectories)
            mean_score = sum(s for _, s in evaluated) / len(evaluated)
            print(f"      Mean score: {mean_score:.3f}")
            
            # 3. 自我选择
            print("[3/4] Self-select...")
            selected = self.self_select(evaluated)
            print(f"      Selected {len(selected)}/{len(trajectories)} trajectories")
            
            # 4. 自我改进
            print("[4/4] Self-improve...")
            self.self_improve(selected)


class ReflectionBasedImprovement:
    """
    基于反思的改进
    
    让 Agent 反思自己的错误并学习
    """
    
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
    
    def generate_reflection(self, failed_trajectory: Dict) -> str:
        """让 Agent 反思失败的原因"""
        prompt = f"""
        以下是一次失败的任务执行:
        
        任务: {failed_trajectory['instruction']}
        
        执行过程:
        {self._format_steps(failed_trajectory['steps'])}
        
        结果: 任务失败
        
        请分析:
        1. 哪一步开始出错了?
        2. 错误的原因是什么?
        3. 正确的做法应该是什么?
        
        输出你的反思:
        """
        
        reflection = self.model.generate(prompt, max_tokens=500)
        return reflection
    
    def generate_improved_trajectory(
        self, 
        task: str, 
        reflection: str
    ) -> Dict:
        """基于反思生成改进的轨迹"""
        prompt = f"""
        任务: {task}
        
        上次执行失败了,反思如下:
        {reflection}
        
        现在请重新执行任务,避免之前的错误:
        """
        
        # 生成新轨迹
        return self._generate_trajectory(prompt)
    
    def reflection_learning(
        self, 
        failed_trajectories: List[Dict],
        success_trajectories: List[Dict]
    ):
        """
        反思学习流程
        
        1. 对失败轨迹生成反思
        2. 基于反思生成改进轨迹
        3. 结合成功轨迹训练
        """
        improved_data = []
        
        for failed_traj in failed_trajectories:
            # 生成反思
            reflection = self.generate_reflection(failed_traj)
            
            # 生成改进轨迹
            improved_traj = self.generate_improved_trajectory(
                failed_traj['instruction'],
                reflection
            )
            
            # 如果改进后成功,加入训练数据
            if improved_traj['success']:
                improved_data.append({
                    'original': failed_traj,
                    'reflection': reflection,
                    'improved': improved_traj
                })
        
        # 训练数据 = 原始成功轨迹 + 改进后的轨迹
        training_data = success_trajectories + [d['improved'] for d in improved_data]
        
        # 训练
        self._train(training_data)

8. 实战项目:构建可训练的 Agent 🛠️

本节将通过一个完整的实战项目,演示如何构建和训练一个 Agent。

8.1 项目架构设计

┌─────────────────────────────────────────────────────────────────────────────────┐
│                        可训练 Agent 项目架构                                     │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│   trainable_agent/                                                              │
│   ├── config/                     # 配置文件                                     │
│   │   ├── model_config.yaml       # 模型配置                                     │
│   │   ├── training_config.yaml    # 训练配置                                     │
│   │   └── env_config.yaml         # 环境配置                                     │
│   │                                                                             │
│   ├── data/                       # 数据处理                                     │
│   │   ├── collectors/             # 数据收集器                                   │
│   │   │   ├── trajectory_collector.py                                           │
│   │   │   └── feedback_collector.py                                             │
│   │   ├── processors/             # 数据处理器                                   │
│   │   │   ├── format_converter.py                                               │
│   │   │   └── quality_filter.py                                                 │
│   │   └── datasets/               # 数据集定义                                   │
│   │       ├── sft_dataset.py                                                    │
│   │       └── preference_dataset.py                                             │
│   │                                                                             │
│   ├── models/                     # 模型相关                                     │
│   │   ├── agent.py                # Agent 主类                                   │
│   │   ├── reward_model.py         # 奖励模型                                     │
│   │   └── value_model.py          # 价值模型                                     │
│   │                                                                             │
│   ├── training/                   # 训练相关                                     │
│   │   ├── sft_trainer.py          # SFT 训练器                                   │
│   │   ├── rm_trainer.py           # RM 训练器                                    │
│   │   ├── ppo_trainer.py          # PPO 训练器                                   │
│   │   ├── dpo_trainer.py          # DPO 训练器                                   │
│   │   └── evol_trainer.py         # 进化训练器                                   │
│   │                                                                             │
│   ├── environments/               # 环境接口                                     │
│   │   ├── base_env.py             # 环境基类                                     │
│   │   ├── webshop_env.py          # WebShop 环境                                 │
│   │   └── tool_env.py             # 工具使用环境                                  │
│   │                                                                             │
│   ├── evaluation/                 # 评估相关                                     │
│   │   ├── metrics.py              # 评估指标                                     │
│   │   └── evaluator.py            # 评估器                                       │
│   │                                                                             │
│   ├── utils/                      # 工具函数                                     │
│   │   ├── logging.py                                                            │
│   │   └── checkpoint.py                                                         │
│   │                                                                             │
│   └── scripts/                    # 运行脚本                                     │
│       ├── train_sft.py                                                          │
│       ├── train_rl.py                                                           │
│       └── evaluate.py                                                           │
│                                                                                 │
└─────────────────────────────────────────────────────────────────────────────────┘

8.2 数据收集与处理

"""
trainable_agent/data/collectors/trajectory_collector.py
"""
import json
from typing import List, Dict, Generator
from dataclasses import dataclass, asdict
from concurrent.futures import ThreadPoolExecutor

@dataclass
class Step:
    """单步数据"""
    thought: str
    action: str
    action_input: Dict
    observation: str
    reward: float = 0.0

@dataclass
class Trajectory:
    """轨迹数据"""
    task_id: str
    instruction: str
    steps: List[Step]
    final_reward: float
    metadata: Dict

class TrajectoryCollector:
    """轨迹收集器"""
    
    def __init__(self, agent, environments: List, config: Dict):
        self.agent = agent
        self.envs = environments
        self.config = config
        self.trajectories = []
    
    def collect_single(self, env, task: Dict) -> Trajectory:
        """收集单条轨迹"""
        obs = env.reset(task)
        steps = []
        done = False
        
        while not done and len(steps) < self.config['max_steps']:
            # Agent 决策
            thought, action = self.agent.act(obs)
            
            # 环境执行
            next_obs, reward, done, info = env.step(action)
            
            steps.append(Step(
                thought=thought,
                action=action['name'],
                action_input=action['input'],
                observation=next_obs,
                reward=reward
            ))
            
            obs = next_obs
        
        return Trajectory(
            task_id=task['id'],
            instruction=task['instruction'],
            steps=steps,
            final_reward=env.get_final_reward(),
            metadata={'env': env.name, 'num_steps': len(steps)}
        )
    
    def collect_batch(
        self, 
        tasks: List[Dict], 
        num_workers: int = 4
    ) -> List[Trajectory]:
        """批量收集轨迹"""
        trajectories = []
        
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            futures = []
            for task in tasks:
                env = self._get_env_for_task(task)
                future = executor.submit(self.collect_single, env, task)
                futures.append(future)
            
            for future in futures:
                traj = future.result()
                trajectories.append(traj)
        
        return trajectories
    
    def collect_with_diversity(
        self, 
        tasks: List[Dict],
        samples_per_task: int = 5
    ) -> List[Trajectory]:
        """收集多样化的轨迹(同一任务多次尝试)"""
        trajectories = []
        
        for task in tasks:
            env = self._get_env_for_task(task)
            
            for i in range(samples_per_task):
                # 使用不同的温度
                self.agent.temperature = 0.5 + i * 0.1
                traj = self.collect_single(env, task)
                trajectories.append(traj)
        
        return trajectories
    
    def save(self, path: str):
        """保存收集的轨迹"""
        data = [asdict(t) for t in self.trajectories]
        with open(path, 'w') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)


"""
trainable_agent/data/processors/quality_filter.py
"""
from typing import List, Callable
from dataclasses import dataclass

@dataclass
class FilterConfig:
    """过滤配置"""
    min_reward: float = 0.0
    max_steps: int = 50
    min_steps: int = 1
    require_success: bool = False
    custom_filters: List[Callable] = None

class QualityFilter:
    """轨迹质量过滤器"""
    
    def __init__(self, config: FilterConfig):
        self.config = config
    
    def filter(self, trajectories: List[Trajectory]) -> List[Trajectory]:
        """过滤轨迹"""
        filtered = []
        
        for traj in trajectories:
            if self._passes_all_filters(traj):
                filtered.append(traj)
        
        print(f"Filtered: {len(filtered)}/{len(trajectories)} trajectories")
        return filtered
    
    def _passes_all_filters(self, traj: Trajectory) -> bool:
        """检查是否通过所有过滤条件"""
        # 基础过滤
        if traj.final_reward < self.config.min_reward:
            return False
        
        if len(traj.steps) > self.config.max_steps:
            return False
        
        if len(traj.steps) < self.config.min_steps:
            return False
        
        if self.config.require_success and traj.final_reward < 0.5:
            return False
        
        # 自定义过滤
        if self.config.custom_filters:
            for filter_fn in self.config.custom_filters:
                if not filter_fn(traj):
                    return False
        
        return True
    
    def filter_by_percentile(
        self, 
        trajectories: List[Trajectory],
        top_percentile: float = 0.3
    ) -> List[Trajectory]:
        """保留 top percentile 的轨迹"""
        sorted_trajs = sorted(
            trajectories, 
            key=lambda t: t.final_reward, 
            reverse=True
        )
        n_keep = max(1, int(len(sorted_trajs) * top_percentile))
        return sorted_trajs[:n_keep]

8.3 训练流程实现

"""
trainable_agent/training/pipeline.py

完整的训练 Pipeline
"""
import torch
from typing import Dict, List, Optional
from pathlib import Path
import yaml

class TrainingPipeline:
    """
    完整训练流程
    
    支持多种训练方式的组合
    """
    
    def __init__(self, config_path: str):
        with open(config_path) as f:
            self.config = yaml.safe_load(f)
        
        self.model = None
        self.reward_model = None
        self.tokenizer = None
        
        self._setup()
    
    def _setup(self):
        """初始化模型和组件"""
        from transformers import AutoModelForCausalLM, AutoTokenizer
        
        model_config = self.config['model']
        
        self.tokenizer = AutoTokenizer.from_pretrained(model_config['name'])
        self.model = AutoModelForCausalLM.from_pretrained(
            model_config['name'],
            torch_dtype=torch.bfloat16 if model_config.get('bf16') else torch.float32,
            device_map="auto"
        )
        
        # 如果使用 LoRA
        if model_config.get('use_lora'):
            from peft import get_peft_model, LoraConfig
            
            lora_config = LoraConfig(
                r=model_config.get('lora_r', 16),
                lora_alpha=model_config.get('lora_alpha', 32),
                target_modules=["q_proj", "v_proj"],
                lora_dropout=0.05,
                bias="none",
            )
            self.model = get_peft_model(self.model, lora_config)
    
    def stage_sft(self, data_path: str) -> None:
        """
        Stage 1: 监督微调
        """
        print("\n" + "="*60)
        print("Stage 1: Supervised Fine-Tuning")
        print("="*60)
        
        from .sft_trainer import SFTTrainer
        
        # 加载数据
        with open(data_path) as f:
            sft_data = json.load(f)
        
        # 创建训练器
        trainer = SFTTrainer(
            model=self.model,
            tokenizer=self.tokenizer,
            config=self.config['sft']
        )
        
        # 训练
        trainer.train(sft_data)
        
        # 保存
        save_path = Path(self.config['output_dir']) / "sft_model"
        trainer.save(save_path)
        
        print(f"SFT model saved to {save_path}")
    
    def stage_rm(self, preference_data_path: str) -> None:
        """
        Stage 2: 奖励模型训练
        """
        print("\n" + "="*60)
        print("Stage 2: Reward Model Training")
        print("="*60)
        
        from .rm_trainer import RMTrainer
        from ..models.reward_model import RewardModel
        
        # 初始化奖励模型(从 SFT 模型初始化)
        self.reward_model = RewardModel(self.model)
        
        # 加载偏好数据
        with open(preference_data_path) as f:
            preference_data = json.load(f)
        
        # 创建训练器
        trainer = RMTrainer(
            reward_model=self.reward_model,
            tokenizer=self.tokenizer,
            config=self.config['rm']
        )
        
        # 训练
        trainer.train(preference_data)
        
        # 保存
        save_path = Path(self.config['output_dir']) / "reward_model"
        trainer.save(save_path)
        
        print(f"Reward model saved to {save_path}")
    
    def stage_rl(
        self, 
        prompts_path: str,
        method: str = "ppo"  # ppo, dpo, evol
    ) -> None:
        """
        Stage 3: 强化学习
        """
        print("\n" + "="*60)
        print(f"Stage 3: Reinforcement Learning ({method.upper()})")
        print("="*60)
        
        # 加载 prompts
        with open(prompts_path) as f:
            prompts = json.load(f)
        
        if method == "ppo":
            from .ppo_trainer import PPOTrainer
            
            # 加载参考模型
            ref_model = AutoModelForCausalLM.from_pretrained(
                self.config['model']['name'],
                torch_dtype=torch.bfloat16,
                device_map="auto"
            )
            
            trainer = PPOTrainer(
                policy_model=self.model,
                ref_model=ref_model,
                reward_model=self.reward_model,
                tokenizer=self.tokenizer,
                config=self.config['ppo']
            )
            
        elif method == "dpo":
            from .dpo_trainer import DPOTrainer
            
            ref_model = AutoModelForCausalLM.from_pretrained(
                self.config['model']['name'],
                torch_dtype=torch.bfloat16,
                device_map="auto"
            )
            
            trainer = DPOTrainer(
                model=self.model,
                ref_model=ref_model,
                tokenizer=self.tokenizer,
                config=self.config['dpo']
            )
            
        elif method == "evol":
            from .evol_trainer import EvolTrainer
            from ..environments import load_environments
            
            envs = load_environments(self.config['environments'])
            
            trainer = EvolTrainer(
                model=self.model,
                tokenizer=self.tokenizer,
                environments=envs,
                config=self.config['evol']
            )
        
        # 训练
        trainer.train(prompts)
        
        # 保存
        save_path = Path(self.config['output_dir']) / f"{method}_model"
        trainer.save(save_path)
        
        print(f"RL model saved to {save_path}")
    
    def run_full_pipeline(
        self,
        sft_data: str,
        preference_data: str,
        rl_prompts: str,
        rl_method: str = "ppo"
    ):
        """运行完整训练流程"""
        # Stage 1
        self.stage_sft(sft_data)
        
        # Stage 2
        self.stage_rm(preference_data)
        
        # Stage 3
        self.stage_rl(rl_prompts, method=rl_method)
        
        print("\n" + "="*60)
        print("Training Complete!")
        print("="*60)


# 使用示例
if __name__ == "__main__":
    pipeline = TrainingPipeline("config/training_config.yaml")
    
    pipeline.run_full_pipeline(
        sft_data="data/sft_trajectories.json",
        preference_data="data/preferences.json",
        rl_prompts="data/rl_prompts.json",
        rl_method="dpo"
    )

8.4 评估与迭代

"""
trainable_agent/evaluation/evaluator.py
"""
from typing import List, Dict, Optional
from dataclasses import dataclass
import numpy as np
from collections import defaultdict

@dataclass
class EvalResult:
    """评估结果"""
    success_rate: float
    avg_reward: float
    avg_steps: float
    by_task_type: Dict[str, Dict]
    by_difficulty: Dict[str, Dict]
    detailed_results: List[Dict]

class AgentEvaluator:
    """Agent 评估器"""
    
    def __init__(self, agent, environments: List, config: Dict):
        self.agent = agent
        self.envs = environments
        self.config = config
    
    def evaluate(
        self, 
        test_tasks: List[Dict],
        verbose: bool = True
    ) -> EvalResult:
        """
        全面评估 Agent
        """
        results = []
        
        for task in test_tasks:
            env = self._get_env_for_task(task)
            result = self._evaluate_single(env, task)
            results.append(result)
            
            if verbose:
                status = "✓" if result['success'] else "✗"
                print(f"  {status} Task {task['id']}: "
                      f"reward={result['reward']:.2f}, "
                      f"steps={result['num_steps']}")
        
        # 汇总统计
        return self._aggregate_results(results)
    
    def _evaluate_single(self, env, task: Dict) -> Dict:
        """评估单个任务"""
        obs = env.reset(task)
        steps = []
        done = False
        
        # 使用 greedy decoding
        self.agent.temperature = 0.0
        
        while not done and len(steps) < self.config['max_steps']:
            thought, action = self.agent.act(obs)
            next_obs, reward, done, info = env.step(action)
            
            steps.append({
                'thought': thought,
                'action': action,
                'observation': next_obs,
                'reward': reward
            })
            
            obs = next_obs
        
        final_reward = env.get_final_reward()
        
        return {
            'task_id': task['id'],
            'task_type': task.get('type', 'unknown'),
            'difficulty': task.get('difficulty', 'medium'),
            'success': final_reward > 0.5,
            'reward': final_reward,
            'num_steps': len(steps),
            'steps': steps
        }
    
    def _aggregate_results(self, results: List[Dict]) -> EvalResult:
        """汇总评估结果"""
        # 总体指标
        successes = [r['success'] for r in results]
        rewards = [r['reward'] for r in results]
        steps = [r['num_steps'] for r in results]
        
        # 按任务类型分组
        by_type = defaultdict(list)
        for r in results:
            by_type[r['task_type']].append(r)
        
        type_stats = {}
        for task_type, type_results in by_type.items():
            type_stats[task_type] = {
                'count': len(type_results),
                'success_rate': np.mean([r['success'] for r in type_results]),
                'avg_reward': np.mean([r['reward'] for r in type_results])
            }
        
        # 按难度分组
        by_difficulty = defaultdict(list)
        for r in results:
            by_difficulty[r['difficulty']].append(r)
        
        diff_stats = {}
        for diff, diff_results in by_difficulty.items():
            diff_stats[diff] = {
                'count': len(diff_results),
                'success_rate': np.mean([r['success'] for r in diff_results]),
                'avg_reward': np.mean([r['reward'] for r in diff_results])
            }
        
        return EvalResult(
            success_rate=np.mean(successes),
            avg_reward=np.mean(rewards),
            avg_steps=np.mean(steps),
            by_task_type=type_stats,
            by_difficulty=diff_stats,
            detailed_results=results
        )
    
    def compare_models(
        self,
        models: Dict[str, "Agent"],
        test_tasks: List[Dict]
    ) -> Dict:
        """
        对比多个模型
        """
        comparison = {}
        
        for name, model in models.items():
            print(f"\nEvaluating {name}...")
            self.agent = model
            result = self.evaluate(test_tasks, verbose=False)
            
            comparison[name] = {
                'success_rate': result.success_rate,
                'avg_reward': result.avg_reward,
                'avg_steps': result.avg_steps
            }
        
        # 打印对比表格
        print("\n" + "="*60)
        print("Model Comparison")
        print("="*60)
        print(f"{'Model':<20} {'Success Rate':<15} {'Avg Reward':<15} {'Avg Steps':<10}")
        print("-"*60)
        
        for name, metrics in comparison.items():
            print(f"{name:<20} {metrics['success_rate']:.2%}         "
                  f"{metrics['avg_reward']:.3f}          {metrics['avg_steps']:.1f}")
        
        return comparison


def run_evaluation():
    """运行评估脚本"""
    from trainable_agent.models.agent import TrainedAgent
    from trainable_agent.environments import load_environments
    
    # 加载模型
    agent = TrainedAgent.from_pretrained("./output/dpo_model")
    
    # 加载测试环境
    envs = load_environments("config/env_config.yaml")
    
    # 加载测试任务
    with open("data/test_tasks.json") as f:
        test_tasks = json.load(f)
    
    # 评估
    evaluator = AgentEvaluator(
        agent=agent,
        environments=envs,
        config={'max_steps': 30}
    )
    
    result = evaluator.evaluate(test_tasks)
    
    # 打印结果
    print("\n" + "="*60)
    print("Evaluation Results")
    print("="*60)
    print(f"Success Rate: {result.success_rate:.2%}")
    print(f"Average Reward: {result.avg_reward:.3f}")
    print(f"Average Steps: {result.avg_steps:.1f}")
    
    print("\nBy Task Type:")
    for task_type, stats in result.by_task_type.items():
        print(f"  {task_type}: {stats['success_rate']:.2%} ({stats['count']} tasks)")
    
    print("\nBy Difficulty:")
    for diff, stats in result.by_difficulty.items():
        print(f"  {diff}: {stats['success_rate']:.2%} ({stats['count']} tasks)")


if __name__ == "__main__":
    run_evaluation()

9. 前沿探索:未来发展方向 🔮

9.1 多模态 Agent 训练

┌─────────────────────────────────────────────────────────────────────────────────┐
│                      多模态 Agent 训练趋势                                       │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│   当前挑战:                                                                      │
│   ┌─────────────────────────────────────────────────────────────────────────┐   │
│   │  • 视觉-语言-动作的对齐                                                  │   │
│   │  • 多模态奖励的设计                                                      │   │
│   │  • 跨模态的泛化能力                                                      │   │
│   │  • 实时交互的效率                                                        │   │
│   └─────────────────────────────────────────────────────────────────────────┘   │
│                                                                                 │
│   研究方向:                                                                      │
│   ┌─────────────────────────────────────────────────────────────────────────┐   │
│   │                                                                         │   │
│   │   1. Vision-Language-Action (VLA) Models                                │   │
│   │      └── RT-2, OpenVLA 等模型将视觉理解与动作生成统一                     │   │
│   │                                                                         │   │
│   │   2. 多模态 RLHF                                                        │   │
│   │      └── 人类对图像/视频输出的偏好学习                                   │   │
│   │                                                                         │   │
│   │   3. 世界模型 (World Model)                                             │   │
│   │      └── 学习环境动态,在想象中进行规划                                  │   │
│   │                                                                         │   │
│   │   4. 具身智能 (Embodied AI)                                             │   │
│   │      └── 机器人控制、自动驾驶等实体交互                                  │   │
│   │                                                                         │   │
│   └─────────────────────────────────────────────────────────────────────────┘   │
│                                                                                 │
└─────────────────────────────────────────────────────────────────────────────────┘

9.2 大规模 Agent 协作训练

"""
多 Agent 协作训练 - 前沿方向
"""

class MultiAgentTraining:
    """
    多 Agent 协作训练
    
    研究方向:
    1. Agent 之间的通信协议学习
    2. 任务分工与协作
    3. 竞争与合作的平衡
    """
    
    def __init__(self, agents: List, environment):
        self.agents = agents
        self.env = environment
    
    def train_communication(self):
        """训练 Agent 之间的通信"""
        # 允许 Agent 相互发送消息
        # 通过任务完成奖励学习有效的通信协议
        pass
    
    def train_role_specialization(self):
        """训练角色专业化"""
        # 不同 Agent 专注于不同子任务
        # 通过 reward shaping 鼓励分工
        pass


class HierarchicalAgentTraining:
    """
    层次化 Agent 训练
    
    高层: 规划和分解任务
    低层: 执行具体动作
    """
    
    def __init__(self):
        self.high_level_agent = None  # 规划者
        self.low_level_agent = None   # 执行者
    
    def train_hierarchical(self):
        """
        层次化训练:
        1. 先训练低层 Agent 完成基础技能
        2. 再训练高层 Agent 学习组合技能
        """
        pass

9.3 持续学习与适应

┌─────────────────────────────────────────────────────────────────────────────────┐
│                         Agent 持续学习                                           │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│   目标: Agent 能够持续从新经验中学习,而不遗忘旧知识                               │
│                                                                                 │
│   技术路线:                                                                      │
│   ┌─────────────────────────────────────────────────────────────────────────┐   │
│   │                                                                         │   │
│   │   1. Elastic Weight Consolidation (EWC)                                 │   │
│   │      └── 保护重要参数,防止灾难性遗忘                                    │   │
│   │                                                                         │   │
│   │   2. Progressive Neural Networks                                        │   │
│   │      └── 为新任务添加新模块,保留旧模块                                  │   │
│   │                                                                         │   │
│   │   3. Replay-based Methods                                               │   │
│   │      └── 存储旧经验,混合新旧数据训练                                    │   │
│   │                                                                         │   │
│   │   4. Meta-Learning                                                      │   │
│   │      └── 学习如何快速适应新任务                                         │   │
│   │                                                                         │   │
│   └─────────────────────────────────────────────────────────────────────────┘   │
│                                                                                 │
│   应用场景:                                                                      │
│   • 个性化 Agent: 适应特定用户的偏好                                             │
│   • 领域适应: 从通用到专业领域                                                   │
│   • 环境变化: 适应新的工具和 API                                                 │
│                                                                                 │
└─────────────────────────────────────────────────────────────────────────────────┘

9.4 安全对齐的强化学习

"""
安全对齐的 Agent 训练
"""

class SafeRLTraining:
    """
    安全强化学习训练
    
    确保 Agent 行为安全、可控、符合人类价值观
    """
    
    def __init__(self, agent, safety_constraints: List):
        self.agent = agent
        self.constraints = safety_constraints
    
    def constrained_policy_optimization(self):
        """
        约束策略优化
        
        max E[reward] 
        s.t. E[cost_i] <= c_i for all i
        
        cost 可以是:
        - 危险动作的概率
        - 有害内容的生成
        - 违反规则的行为
        """
        pass
    
    def reward_hacking_prevention(self):
        """
        防止奖励黑客 (Reward Hacking)
        
        技术:
        1. 奖励模型集成 (多个 RM 投票)
        2. 对抗训练 (寻找 RM 漏洞)
        3. 保守估计 (悲观奖励下界)
        """
        pass
    
    def interpretable_agent(self):
        """
        可解释 Agent
        
        让 Agent 的决策过程透明:
        1. 思维链 (Chain-of-Thought)
        2. 决策归因 (Attribution)
        3. 反事实解释 (Counterfactual)
        """
        pass

10. 总结与最佳实践 📝

10.1 方法选择指南

┌─────────────────────────────────────────────────────────────────────────────────┐
│                         Agent 训练方法选择决策树                                  │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│                              开始                                                │
│                                │                                                │
│                   ┌────────────┴────────────┐                                   │
│                   │  有高质量标注数据?       │                                   │
│                   └────────────┬────────────┘                                   │
│                        │              │                                         │
│                       是             否                                         │
│                        │              │                                         │
│                        ▼              ▼                                         │
│               ┌────────────┐  ┌────────────────┐                                │
│               │    SFT     │  │ 可以在环境中采样?│                                │
│               └─────┬──────┘  └───────┬────────┘                                │
│                     │                  │                                         │
│            ┌────────┴────────┐    是   │   否                                   │
│            │  有偏好数据?     │         │    │                                   │
│            └────────┬────────┘         ▼    ▼                                   │
│               │           │     ┌──────────┐ ┌──────────┐                       │
│              是          否     │AgentEvol │ │零/少样本 │                        │
│               │           │     │进化训练  │ │ 提示工程 │                        │
│               ▼           ▼     └──────────┘ └──────────┘                       │
│        ┌──────────┐  ┌────────────┐                                             │
│        │ 计算资源? │  │ 使用 KTO   │                                             │
│        └────┬─────┘  │(单边反馈)  │                                             │
│             │        └────────────┘                                             │
│      充足   │   有限                                                            │
│             │    │                                                              │
│             ▼    ▼                                                              │
│       ┌─────────┐ ┌─────────┐                                                   │
│       │RLHF/PPO │ │DPO/ORPO │                                                   │
│       └─────────┘ └─────────┘                                                   │
│                                                                                 │
│                                                                                 │
│   需要推理能力? ─── 是 ───→ 考虑 R3 (推理过程奖励)                               │
│                                                                                 │
│   需要环境交互? ─── 是 ───→ 考虑 AgentGym 框架                                   │
│                                                                                 │
│   需要自我改进? ─── 是 ───→ 考虑自进化训练                                        │
│                                                                                 │
└─────────────────────────────────────────────────────────────────────────────────┘

10.2 最佳实践清单

┌─────────────────────────────────────────────────────────────────────────────────┐
│                           Agent 训练最佳实践                                     │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│  📊 数据准备                                                                     │
│  ┌─────────────────────────────────────────────────────────────────────────┐    │
│  │  ✓ 确保数据多样性:覆盖各种任务类型和难度                                 │    │
│  │  ✓ 质量优先:宁可少而精,不可多而杂                                       │    │
│  │  ✓ 格式统一:使用一致的 Thought-Action-Observation 格式                  │    │
│  │  ✓ 包含失败样本:让模型学会处理错误情况                                   │    │
│  │  ✓ 标注清晰:偏好标注要有明确的标准                                       │    │
│  └─────────────────────────────────────────────────────────────────────────┘    │
│                                                                                 │
│  🔧 训练配置                                                                     │
│  ┌─────────────────────────────────────────────────────────────────────────┐    │
│  │  ✓ 从小规模开始:先验证流程,再扩大规模                                   │    │
│  │  ✓ 使用 LoRA:减少显存占用,加速迭代                                      │    │
│  │  ✓ 监控 KL 散度:防止策略偏离太远                                         │    │
│  │  ✓ 设置 early stopping:根据验证集指标                                    │    │
│  │  ✓ 保存检查点:方便回滚和对比                                             │    │
│  └─────────────────────────────────────────────────────────────────────────┘    │
│                                                                                 │
│  📈 评估策略                                                                     │
│  ┌─────────────────────────────────────────────────────────────────────────┐    │
│  │  ✓ 多维度评估:成功率、效率、安全性                                       │    │
│  │  ✓ 使用保留测试集:不参与训练的独立数据                                   │    │
│  │  ✓ 人工抽检:定期人工评估样本                                             │    │
│  │  ✓ A/B 测试:与基线模型对比                                               │    │
│  │  ✓ 错误分析:分类分析失败案例                                             │    │
│  └─────────────────────────────────────────────────────────────────────────┘    │
│                                                                                 │
│  ⚠️ 常见陷阱                                                                    │
│  ┌─────────────────────────────────────────────────────────────────────────┐    │
│  │  ✗ Reward Hacking:模型找到奖励漏洞                                       │    │
│  │    → 解决:使用多个 RM,设置奖励上限                                      │    │
│  │                                                                          │    │
│  │  ✗ 模式崩塌:生成内容多样性下降                                           │    │
│  │    → 解决:增加 KL 惩罚,使用温度采样                                     │    │
│  │                                                                          │    │
│  │  ✗ 过拟合:在训练任务上好,泛化差                                         │    │
│  │    → 解决:数据增强,正则化,早停                                         │    │
│  │                                                                          │    │
│  │  ✗ 训练不稳定:损失震荡或发散                                             │    │
│  │    → 解决:降低学习率,减小 batch size,梯度裁剪                          │    │
│  └─────────────────────────────────────────────────────────────────────────┘    │
│                                                                                 │
└─────────────────────────────────────────────────────────────────────────────────┘

10.3 核心要点总结

💡 本文核心要点

  1. RLHF 是 LLM 对齐的基础范式

    • 三阶段流程:SFT → RM → PPO
    • 通过人类偏好学习奖励函数
    • InstructGPT 证明了其有效性
  2. DPO/ORPO/KTO 简化了偏好学习

    • 无需显式训练奖励模型
    • 训练更稳定,资源需求更低
    • 根据数据类型选择合适方法
  3. AgentGym 提供了 Agent 训练框架

    • 统一的环境接口
    • 大规模轨迹数据集
    • AgentEvol 进化训练方法
  4. R3 通过推理奖励提升学习效率

    • 过程奖励模型 (PRM) 评估每步推理
    • 密集奖励信号加速学习
    • 特别适合需要复杂推理的任务
  5. Agent 微调需要多种技术组合

    • 基于轨迹、反馈、环境的不同方法
    • 自我进化和反思改进
    • 根据场景灵活选择

11. 参考文献 📚

核心论文

  1. InstructGPT (2022)

    • Ouyang et al., “Training language models to follow instructions with human feedback”
    • OpenAI
    • arXiv:2203.02155
  2. PPO (2017)

    • Schulman et al., “Proximal Policy Optimization Algorithms”
    • OpenAI
    • arXiv:1707.06347
  3. DPO (2023)

    • Rafailov et al., “Direct Preference Optimization: Your Language Model is Secretly a Reward Model”
    • Stanford
    • arXiv:2305.18290
  4. ORPO (2024)

    • Hong et al., “ORPO: Monolithic Preference Optimization without Reference Model”
    • KAIST
    • arXiv:2403.07691
  5. KTO (2024)

    • Ethayarajh et al., “KTO: Model Alignment as Prospect Theoretic Optimization”
    • Stanford
    • arXiv:2402.01306
  6. AgentGym (2024)

    • Xi et al., “AgentGym: Evolving Large Language Model-based Agents across Diverse Environments”
    • Fudan University
    • arXiv:2406.04151
  7. Process Reward Models (2023)

  8. ReAct (2023)

    • Yao et al., “ReAct: Synergizing Reasoning and Acting in Language Models”
    • Princeton & Google
    • arXiv:2210.03629
  9. Self-Refine (2023)

    • Madaan et al., “Self-Refine: Iterative Refinement with Self-Feedback”
    • CMU
    • arXiv:2303.17651
  10. Constitutional AI (2022)

    • Bai et al., “Constitutional AI: Harmlessness from AI Feedback”
    • Anthropic
    • arXiv:2212.08073

GitHub 资源

  1. trl - Transformer Reinforcement Learning

    • https://github.com/huggingface/trl
    • HuggingFace 官方 RLHF 库
  2. DeepSpeed-Chat

    • https://github.com/microsoft/DeepSpeedExamples/tree/master/applications/DeepSpeed-Chat
    • Microsoft 的高效 RLHF 实现
  3. OpenRLHF

    • https://github.com/OpenLLMAI/OpenRLHF
    • 开源的 RLHF 训练框架
  4. AgentGym

    • https://github.com/WooooDyy/AgentGym
    • Agent 训练环境和数据集
  5. LLaMA-Factory

    • https://github.com/hiyouga/LLaMA-Factory
    • 支持多种微调方法的统一框架
  6. alignment-handbook

    • https://github.com/huggingface/alignment-handbook
    • HuggingFace 对齐训练指南
  7. MOSS-RLHF

    • https://github.com/OpenLMLab/MOSS-RLHF
    • 复旦 MOSS 的 RLHF 实现
  8. WebShop

    • https://github.com/princeton-nlp/WebShop
    • 网页购物 Agent 环境
  9. ALFWorld

    • https://github.com/alfworld/alfworld
    • 文本游戏 Agent 环境
  10. AgentBench

    • https://github.com/THUDM/AgentBench
    • Agent 评估基准

推荐学习资源

📖 教程和博客

  • HuggingFace RLHF 教程:https://huggingface.co/blog/rlhf
  • Chip Huyen 的 RLHF 解析:https://huyenchip.com/2023/05/02/rlhf.html
  • Lilian Weng 的 LLM 对齐综述:https://lilianweng.github.io/posts/2023-03-15-llm-agents/

📺 视频课程

  • Stanford CS224N (NLP with Deep Learning)
  • Berkeley CS285 (Deep Reinforcement Learning)
  • Anthropic’s AI Safety Research

🎉 恭喜你读完了本文!

Agent 训练是一个快速发展的领域,本文介绍的方法会持续演进。建议关注上述论文和 GitHub 仓库的更新,保持对最新技术的了解。

如果本文对你有帮助,欢迎点赞、收藏、分享!有问题欢迎在评论区讨论。


Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐