值迭代(Value Iteration)是解决马尔可夫决策过程(MDP)的经典动态规划算法。其核心基于贝尔曼最优性原理,通过迭代方式求解最优价值函数。


  1. Bellman 方程
  2.  Bellman 最优方程
  3.  值迭代
  4.  Python 例子

一  Bellman 方程

  1.1 逐元素形式

    1.2  矩阵形式

    

  • V(s) :状态 s 的值,表示处于该状态的长期奖励。
  • r(s,a) :在状态 s 下采取行动 a所获得的即时奖励。
  • γ:折扣因子(介于 0 和 1 之间),用于确定未来奖励相对于即时奖励的重要性。
  • P(s_j|s)通过采取行动a从状态 s转换到状态 s_j的概率。
  • \pi 策略


二  Bellman 最优方程

        目的是找到最优策略\pi使得状态值V最大

        通过contraction mapping 理论,可知道最优状态值可以通过迭代更新求解

      Contraction Mapping 理论(压缩映像原理),又称 Banach 不动点定理(Banach Fixed-Point Theorem),是泛函分析和度量空间理论中的核心定理,为研究非线性方程(如代数方程、积分方程、微分方程)的解的存在性与唯一性提供了关键工具。其核心思想是通过构造一个“压缩”的自映射,证明其存在唯一的不动点,并通过迭代法构造性求解。

2.1  最优策略求解

    因为

    所以

          

例子:

   

2.2 最大状态值求解

   通过迭代更新,最后以指数级收敛(contraction mapping 理论)


   三  值迭代算法

        值迭代算法通过策略更新与值更新两个步骤不断迭代,直至收敛至最优策略。
具体流程如下:

  1. 根据当前状态值函数计算各状态-动作对的Q值

  2. 采用贪婪策略选择最优动作

  3. 依据更新后的策略重新计算状态值函数
    重复以上过程直至值函数收敛。

伪代码:值迭代算法

初始化

             已知概率模型 P(s_j|s_i,a)r(s_j|s_i,a).初始状态值 v_0.

目标

             寻找最优状态值和一个最优策略,解决贝尔曼最优方程

迭代过程

             当 v_k​ 在 ∥v_k-v_{k-1}​∥ 大于一个预定义的小阈值的意义下尚未收敛时,对于第 k 次迭代,执行以下步骤:  

       对于每一个状态 s∈S

            对于每一个动作 a∈A(s)

       step1: 执行 Q值计算:       

  •          

  • step2  策略更新

  •               

  • step3 值更新   

  •   这个算法通过不断迭代更新状态值函数 v 和策略 π,直到值函数收敛,从而找到最优策略


四  Python 例子

# -*- coding: utf-8 -*-
"""
Created on Fri Sep 12 15:36:54 2025

@author: chengxf2
"""

"""
贝尔曼值迭代算法实现
基于马尔可夫决策过程(MDP)的最优价值函数求解
"""

import numpy as np
from typing import Dict, List, Tuple
import matplotlib.pyplot as plt

STATE0 = 0
STATE1 = 1
STATE2 = 2  
MOVE_LEFT = 0
MOVE_RIGHT = 1


class BellmanValueIteration:
    """
    贝尔曼值迭代算法实现类
    
    通过迭代更新状态价值函数,最终收敛到最优价值函数和最优策略
    基于贝尔曼最优方程: V*(s) = max_a [ R(s,a) + γ * Σ P(s'|s,a) * V*(s') ]
    """
    
    def __init__(self, states: List[int], actions: List[int], 
                 transition_probs: Dict[Tuple[int, int, int], float],
                 rewards: Dict[Tuple[int, int, int], float],
                 gamma: float = 0.95, theta: float = 1e-6):
        """
        初始化值迭代算法
        
        Args:
            states: 状态列表 [0, 1, 2, ..., n-1]
            actions: 动作列表 [0, 1, 2, ..., m-1]
            transition_probs: 转移概率字典 {(s, a, s'): probability}
            rewards: 奖励字典 {(s, a, s'): reward}
            gamma: 折扣因子,默认0.95
            theta: 收敛阈值,默认1e-6
        """
        self.states = states
        self.actions = actions
        self.transition_probs = transition_probs
        self.rewards = rewards
        self.gamma = gamma  # 折扣因子
        self.theta = theta  # 收敛阈值
        
        # 初始化价值函数,所有状态价值为0
        self.values = {state: 0.0 for state in states}
        
        # 存储每次迭代的价值函数变化用于分析
        self.value_history = []
        self.delta_history = []
    
    def get_transition_info(self, state: int, action: int) -> List[Tuple[int, float, float]]:
        """
        获取给定状态和动作的所有可能转移信息
        
        Args:
            state: 当前状态
            action: 执行的动作
            
        Returns:
            列表,每个元素为(下一个状态, 转移概率, 即时奖励)
        """
        transitions = []
        for next_state in self.states:
            key = (state, action, next_state)
            if key in self.transition_probs and self.transition_probs[key] > 0:
                prob = self.transition_probs[key]
                reward = self.rewards.get(key, 0.0)
                transitions.append((next_state, prob, reward))
        return transitions
    
    def compute_q_value(self, state: int, action: int) -> float:
        """
        计算状态-动作对的Q值
        
        Args:
            state: 当前状态
            action: 执行的动作
            
        Returns:
            Q(s, a) = Σ P(s'|s,a) * [R(s,a,s') + γ * V(s')]
        """
        q_value = 0.0
        transitions = self.get_transition_info(state, action)
        
        for next_state, prob, reward in transitions:
            # 贝尔曼方程的核心计算
            q_value += prob * (reward + self.gamma * self.values[next_state])
        
        return q_value
    
    def value_iteration_step(self) -> float:
        """
        执行一次值迭代更新
        
        Returns:
            本次迭代中价值函数的最大变化量
        """
        max_delta = 0.0  # 记录最大变化量
        
        # 创建新的价值函数副本用于批量更新
        new_values = self.values.copy()
        
        for state in self.states:
            # 保存旧值用于计算变化量
            old_value = self.values[state]
            
            # 计算所有可能动作的Q值
            q_values = []
            for action in self.actions:
                q_value = self.compute_q_value(state, action)
                q_values.append(q_value)
            
            # 贝尔曼最优更新:选择最大Q值作为新的状态价值
            # 确保有可用的动作
            if q_values:  
                new_values[state] = max(q_values)
            
            # 更新最大变化量
            delta = abs(old_value - new_values[state])
            max_delta = max(max_delta, delta)
        
        # 批量更新价值函数
        self.values = new_values
        
        return max_delta
    
    def solve(self, max_iterations: int = 1000) -> Dict[int, float]:
        """
        执行值迭代算法直到收敛
        
        Args:
            max_iterations: 最大迭代次数,防止无限循环
            
        Returns:
            收敛后的最优价值函数
        """
        print("开始值迭代求解...")
        iteration = 0
        
        while iteration < max_iterations:
            # 执行一次迭代更新
            max_delta = self.value_iteration_step()
            
            # 记录历史数据用于分析
            self.value_history.append(self.values.copy())
            self.delta_history.append(max_delta)
            
            # 打印迭代信息
            if iteration % 10 == 0:
                print(f"迭代 {iteration}: 最大变化量 = {max_delta:.3f}")
            
            # 检查是否收敛
            if max_delta < self.theta:
                print(f"在第 {iteration} 次迭代后收敛")
                break
            
            iteration += 1
        
        if iteration == max_iterations:
            print("达到最大迭代次数,可能尚未完全收敛")
        
        return self.values
    
    def extract_policy(self) -> Dict[int, int]:
        """
        从最优价值函数中提取最优策略
        
        Returns:
            最优策略:每个状态对应的最优动作
        """
        policy = {}
        
        for state in self.states:
            best_action = None
            best_value = float('-inf')
            
            # 对每个动作计算Q值,选择最大的
            for action in self.actions:
                q_value = self.compute_q_value(state, action)
                if q_value > best_value:
                    best_value = q_value
                    best_action = action
            
            policy[state] = best_action
        
        return policy
    
    def plot_convergence(self):
        """绘制价值函数收敛过程"""
        plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
        plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
        if not self.value_history:
            print("没有收敛历史数据")
            return
        
        plt.figure(figsize=(12, 6))
        
        # 绘制每个状态的价值收敛过程
        for state in self.states:
            state_values = [v[state] for v in self.value_history]
            plt.plot(state_values, label=f'状态 {state}')
        
        plt.xlabel('迭代次数')
        plt.ylabel('状态价值')
        plt.title('值迭代收敛过程')
        plt.legend()
        plt.grid(True)
        plt.show()
        
        # 绘制最大变化量的收敛过程
        plt.figure(figsize=(12, 6))
        plt.plot(self.delta_history)
        plt.xlabel('迭代次数')
        plt.ylabel('最大变化量')
        plt.title('值迭代变化量收敛过程')
        plt.yscale('log')  # 使用对数坐标更好地显示收敛
        plt.grid(True)
        plt.show()


# 示例:简单的3状态MDP问题
def create_example_mdp():
    """
    创建一个简单的3状态MDP示例
    
    状态: 0, 1, 2
    动作: 0 (左), 1 (右)
    状态2是终止状态,奖励为+1
    """
    
    states =  [STATE0, STATE1, STATE2]
    actions = [MOVE_LEFT, MOVE_RIGHT]  # 0: 向左, 1: 向右
    
    # 初始化转移概率和奖励字典
    transition_probs = {}
    rewards = {}
    #state,action, new_state=1
    # 状态0:向左移动到状态0,向右移动到状态1
    transition_probs[(STATE0, MOVE_LEFT, STATE0)] = 1.0  # 向左移动,保持在状态0
    transition_probs[(STATE0, MOVE_RIGHT, STATE1)] = 1.0  # 向右移动,到状态1
    rewards[(STATE0, MOVE_LEFT, STATE0)] = 0.0
    rewards[(STATE0, MOVE_RIGHT, STATE1)] = 0.0
    
    # 状态1:向左移动到状态0,向右移动到状态2
    transition_probs[(STATE1, MOVE_LEFT, STATE0)] = 1.0  # 向左移动,到状态0
    transition_probs[(STATE1, MOVE_RIGHT, STATE2)] = 1.0  # 向右移动,到状态2
    rewards[(STATE1, MOVE_LEFT, STATE0)] = 0.0
    rewards[(STATE1, MOVE_RIGHT, STATE2)] = 1.0  # 到达终止状态获得奖励
    
    # 状态2是终止状态,所有动作都保持在状态2
    transition_probs[(STATE2, MOVE_LEFT, STATE1)] = 1.0
    transition_probs[(STATE2, MOVE_RIGHT, STATE2)] = 1.0
    rewards[(STATE2, MOVE_LEFT, STATE1)] = -1.0
    rewards[(STATE2, MOVE_RIGHT, STATE2)] = -1.0
    
    return states, actions, transition_probs, rewards


def main():
    """主函数:演示值迭代算法的使用"""
    # 创建示例MDP
    states, actions, transition_probs, rewards = create_example_mdp()
    
    # 初始化值迭代算法
    vi = BellmanValueIteration(
        states=states,
        actions=actions,
        transition_probs=transition_probs,
        rewards=rewards,
        gamma=0.9,      # 折扣因子
        theta=1e-3      # 收敛阈值
    )
    
    # 执行值迭代求解
    optimal_values = vi.solve(max_iterations=300)
    
    # 输出最优价值函数
    print("\n最优价值函数:")
    for state, value in optimal_values.items():
        print(f"V*(STATE{state}) = {value:.4f}")
    
    # 提取并输出最优策略
    optimal_policy = vi.extract_policy()
    print("\n最优策略:")
    for state, action in optimal_policy.items():
        action_name = "向左" if action == 0 else "向右"
        print(f"状态 {state} -> {action_name}")
    
    # 绘制收敛过程
    vi.plot_convergence()
    
    # 验证贝尔曼最优方程
    print("\n验证贝尔曼最优方程:")
    for state in states:
        # 计算当前状态的贝尔曼最优更新
        q_values = []
        for action in actions:
            q_value = vi.compute_q_value(state, action)
            q_values.append(q_value)
            print(f"Q({state}, {action}) = {q_value:.4f}")
        
        # 检查是否满足 V*(s) = max_a Q(s,a)
        max_q = max(q_values) if q_values else 0
        print(f"V*({state}) = {optimal_values[state]:.4f}, max_a Q(s,a) = {max_q:.4f}")
        print(f"一致性检查: {'通过' if abs(optimal_values[state] - max_q) < 1e-3 else '失败'}")
        print("-" * 40)


if __name__ == "__main__":
    # 设置随机种子确保可重复性
    np.random.seed(42)
    
    # 运行主程序
    main()

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐