目录

1、动态规划的前提

2、策略迭代(Policy Iteration)

2.1、算法步骤

(1)初始化

(2)策略评估(Policy Evaluation)

(3)策略改进(Policy Improvement)

(4)收敛判断

2.2、 直观理解

3、价值迭代(Value Iteration)

3.1、 算法步骤

(1)初始化

(2)迭代更新价值函数

(3)提取最优策略

3.2、 直观理解

4、策略迭代 vs 价值迭代

5、实验代码

5.1、悬崖漫步环境

5.1.1、策略迭代

​编辑

5.1.2、价值迭代

5.2、冰湖环境

5.2.1、策略迭代

5.2.2、价值迭代


在马尔可夫决策过程(MDP)中,动态规划(Dynamic Programming, DP)算法是求解最优策略和最优价值函数的经典方法。其核心思想是利用贝尔曼方程,通过迭代更新价值函数或策略,最终收敛到最优解。动态规划的两个核心算法是:策略迭代(Policy Iteration) 和价值迭代(Value Iteration)

1、动态规划的前提

动态规划算法适用的前提是已知环境模型,即:

  • 状态转移概率P(s'|s,a)(在状态 s 执行动作 a 后转移到s'的概率);
  • 奖励函数 R(s,a,s') 或 R(s,a)(执行动作后的即时奖励)。

基于此,DP 可通过迭代求解贝尔曼方程,无需与环境交互(属于 “模型已知” 的规划问题)。

2、策略迭代(Policy Iteration)

策略迭代的核心是 **“策略评估 - 策略改进” 的交替循环 **:先固定当前策略,计算其对应的状态价值函数(策略评估);再基于该价值函数优化策略(策略改进),重复直至策略不再变化(收敛到最优策略)。

2.1、算法步骤

策略迭代分为两步:策略评估策略改进,流程如下:

(1)初始化
  • 随机初始化一个策略 \pi(例如,每个状态下随机选择一个动作);
  • 初始化状态价值函数 V(s)(例如,全为 0)。
(2)策略评估(Policy Evaluation)

在当前策略\pi 下,通过贝尔曼期望方程迭代更新状态价值函数 V(s),直至收敛:V_{\pi}(s) = \sum_{a} \pi(a|s) \left[ R(s,a) + \gamma \sum_{s'} P(s'|s,a) V_{\pi}(s') \right]

  • 含义:当前策略下,状态 s 的价值 = 执行策略推荐的动作后,获得的即时奖励 + 未来状态价值的贴现和。
  • 迭代方式:对每个状态 s,用新计算的V(s) 替换旧值,直至V(s) 的变化量小于阈值(如 \theta = 1e-6)。
(3)策略改进(Policy Improvement)

基于当前收敛的V_{\pi}(s),通过贪婪策略更新策略 \pi,使其更优:\pi'(a|s) = \arg\max_{a} \left[ R(s,a) + \gamma \sum_{s'} P(s'|s,a) V_{\pi}(s') \right]

  • 含义:对每个状态 s,选择能最大化 “即时奖励 + 未来价值” 的动作(贪婪选择),生成新策略 \pi'
(4)收敛判断

若新策略\pi'与旧策略\pi完全一致(即所有状态的动作选择均不变),则策略收敛,此时的策略为最优策略 \pi^*;否则,用 \pi'替换 \pi,回到步骤(2)重复迭代。

2.2、 直观理解

  • 策略评估:就像 “给当前策略打分”,计算每个状态在该策略下的长期价值(例如:“按当前策略走,这个状态最终能拿到多少奖励”)。
  • 策略改进:基于打分结果 “优化策略”,对每个状态选择能获得更高价值的动作(例如:“发现走左边比走右边价值高,那就改走左边”)。
  • 重复这两步,策略会逐渐变好,最终不再变化(此时为最优策略)。

3、价值迭代(Value Iteration)

价值迭代直接迭代优化最优价值函数,无需显式评估策略。其核心是利用贝尔曼最优方程,每次迭代都让价值函数更接近最优值,直至收敛,最后从最优价值函数中提取最优策略。

3.1、 算法步骤

价值迭代的核心是迭代更新价值函数,步骤如下:

(1)初始化
  • 初始化状态价值函数 V(s)(例如,全为 0)。
(2)迭代更新价值函数

利用贝尔曼最优方程更新每个状态的价值函数:V(s) \leftarrow \max_{a} \left[ R(s,a) + \gamma \sum_{s'} P(s'|s,a) V(s') \right]

  • 含义:每个状态的最优价值 = 选择最优动作后,获得的 “即时奖励 + 未来最优价值的贴现和”。
  • 迭代方式:对每个状态 s,用上述公式更新V(s),直至所有状态的价值函数变化量小于阈值(收敛)。
(3)提取最优策略

当价值函数收敛到最优价值 V^*(s)后,通过贪婪策略提取最优策略:\pi^*(a|s) = \arg\max_{a} \left[ R(s,a) + \gamma \sum_{s'} P(s'|s,a) V^*(s') \right]

3.2、 直观理解

价值迭代可以看作是 “简化版的策略迭代”:它省略了策略迭代中 “完整的策略评估” 步骤,每次迭代都直接用最优动作更新价值函数(相当于 “策略评估只做一步就进行策略改进”)。 例如:“不管当前策略是什么,直接计算每个状态下最好的动作能带来的价值,反复迭代后,价值函数会收敛到最优,此时再确定最优策略”。

4、策略迭代 vs 价值迭代

维度 策略迭代(Policy Iteration) 价值迭代(Value Iteration)
核心步骤 策略评估(收敛到当前策略的 V)→ 策略改进(贪婪更新策略) 直接用贝尔曼最优方程迭代 V,最后提取策略
迭代对象 策略和价值函数交替更新 仅价值函数迭代,策略最后提取
收敛速度 通常更快(策略改进会快速收敛),但策略评估可能耗时 迭代次数可能更多,但每次迭代更简单(无需完整评估策略)
适用场景 状态空间小(策略评估成本低) 状态空间大(可早期停止,近似最优)
收敛结果 严格收敛到最优策略和最优 V 收敛到最优 V,进而提取最优策略

总结

  • 策略迭代和价值迭代都是基于动态规划的 MDP 求解算法,依赖环境模型和贝尔曼方程。
  • 策略迭代通过 “评估 - 改进” 循环优化策略,收敛快但每次评估成本可能高;
  • 价值迭代直接优化价值函数,实现简单,适合状态空间较大的场景。
  • 两者最终都能收敛到最优策略和最优价值函数,选择哪种取决于具体问题的状态 / 动作空间大小。

5、实验代码

5.1、悬崖漫步环境

5.1.1、策略迭代

"""
文件名: 4.2
作者: 墨尘
日期: 2025/7/21
项目名: d2l_learning
"""
import copy

"""悬崖漫步环境"""
"""该类用于模拟 "悬崖漫步" 环境,定义了状态、动作、奖励和状态转移规则,是强化学习智能体交互的场景。"""
class CliffWalkingEnv:
    def __init__(self, ncol=12, nrow=4):
        self.ncol = ncol  # 列数
        self.nrow = nrow  # 行数
        self.P = self.createP()  # 状态转移概率矩阵:P[s][a] = [(p, next_state, reward, done)]
        # 存储每个状态s下执行动作a后的转移规则(概率、下一状态、奖励、是否终止)。

    """createP方法构建状态转移矩阵P,其中P[s][a]表示在状态s执行动作a后,可能的转移结果(概率、下一状态、奖励、是否终止)"""

    def createP(self):
        # P[s][a] = [(p, next_state, reward, done)]:s是一维状态索引,a是动作(0-3)
        P = [[[] for _ in range(4)] for _ in range(self.nrow * self.ncol)]
        # 动作对应的坐标变化:[左, 右, 上, 下](x变化, y变化)
        change = [[0, -1], [0, 1], [-1, 0], [1, 0]]
        for i in range(self.nrow):  # 遍历行(y坐标:0~3)
            for j in range(self.ncol):  # 遍历列(x坐标:0~11)
                # 状态表示:用一维索引s = i*ncol + j表示二维坐标(i,j)(如(3,0)对应s=3*12+0=36,(3,11)对应s=3*12+11=47)
                s = i * self.ncol + j  # 将二维坐标(i,j)转换为一维状态索引s(0~47)
                for a in range(4):  # 遍历4个动作(0-左,1-右,2-上,3-下)
                    # 计算执行动作a后的下一坐标(next_x, next_y)
                    next_x = j + change[a][0]  # x方向变化(左/右)
                    next_y = i + change[a][1]  # y方向变化(上/下)
                    """
                    转移规则:
                        边界:超出网格则原地不动,奖励 - 1;
                        悬崖:最后一行中间 10 列((3,1)到(3,10)),掉入后回到起点,奖励 - 100;
                        终点:(3,11),到达后终止,奖励 0;
                        普通格子:奖励 - 1,继续移动。
                    """
                    # 1. 处理边界情况:如果超出网格范围,停在原地
                    if next_x < 0 or next_x >= self.ncol or next_y < 0 or next_y >= self.nrow:
                        next_state = s  # 下一状态仍是当前状态(原地不动)
                        reward = -1  # 边界移动惩罚
                        done = False  # 未终止

                    # 2. 处理非边界情况
                    else:
                        # 将下一坐标转换为一维状态索引
                        next_state = next_y * self.ncol + next_x

                        # 2.1 判断是否为悬崖(最后一行,且不是起点(3,0)和终点(3,11))
                        if next_y == self.nrow - 1 and next_x > 0 and next_x < self.ncol - 1:
                            reward = -100  # 掉入悬崖的惩罚
                            next_state = (self.nrow - 1) * self.ncol + 0  # 回到起点(3,0)
                            done = False  # 未终止(需重新开始)

                        # 2.2 判断是否为终点(3,11)
                        elif next_y == self.nrow - 1 and next_x == self.ncol - 1:
                            reward = 0  # 到达终点的奖励
                            done = True  # 回合终止

                        # 2.3 普通格子(非悬崖、非终点)
                        else:
                            reward = -1  # 每步移动的基础惩罚(鼓励最短路径)
                            done = False  # 未终止

                    # 确定性转移(概率p=1.0,因为环境是确定的)
                    P[s][a] = [(1.0, next_state, reward, done)]
        return P


"""策略迭代算法"""
class PolicyIteration:
    # 设置环境、收敛阈值theta(用于判断价值函数是否收敛)、
    # 折扣因子gamma(通常 0<gamma<1),并初始化状态价值v和策略pi。

    def __init__(self, env, theta, gamma):
        self.env = env  # 环境
        self.theta = theta  # 收敛阈值(判断状态价值是否稳定)
        self.gamma = gamma  # 折扣因子(权衡即时奖励和未来奖励)
        self.v = [0.0] * (self.env.nrow * self.env.ncol)  # 状态价值函数V(s),初始为0
        # 初始策略:均匀随机策略(每个状态下4个动作的概率均为0.25)
        self.pi = [[0.25, 0.25, 0.25, 0.25] for _ in range(self.env.nrow * self.env.ncol)]

    # 策略评估:计算当前策略下的状态价值函数V(s)
    def policy_evaluation(self):
        cnt = 1  # 迭代计数
        while True:
            max_diff = 0.0  # 记录本轮迭代中状态价值的最大变化
            new_v = [0.0] * (self.env.nrow * self.env.ncol)  # 存储新的状态价值
            for s in range(self.env.nrow * self.env.ncol):  # 遍历所有状态
                qsa_total = 0.0  # 状态s的价值(基于当前策略)
                for a in range(4):  # 遍历所有动作
                    # 遍历状态转移结果(这里是确定性转移,只有一个结果)
                    for (p, next_state, r, done) in self.env.P[s][a]:
                        # 状态价值公式:V(s) = sum(策略概率 * (即时奖励 + 折扣*下一状态价值))
                        # (1-done)确保终止状态的未来价值为0(done=True时系数为0)
                        qsa_total += self.pi[s][a] * (r + self.gamma * self.v[next_state] * (1 - done))
                new_v[s] = qsa_total  # 更新状态s的价值
                # 计算与旧价值的差异,跟踪最大值
                max_diff = max(max_diff, abs(new_v[s] - self.v[s]))
            self.v = new_v  # 用新价值覆盖旧价值
            if max_diff < self.theta:  # 当最大差异小于阈值,价值函数收敛
                break
            cnt += 1
        print(f"策略评估进行{cnt}轮后完成")

    # 策略提升:基于当前价值函数更新为贪婪策略
    def policy_improvement(self):
        for s in range(self.env.nrow * self.env.ncol):  # 遍历所有状态
            qsa_list = []  # 存储每个动作的Q值(动作价值)
            for a in range(4):  # 遍历所有动作
                qsa = 0.0  # 动作a在状态s的Q值
                for (p, next_state, r, done) in self.env.P[s][a]:
                    # Q值公式:Q(s,a) = 即时奖励 + 折扣*下一状态价值
                    qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                qsa_list.append(qsa)  # 记录当前动作的Q值
            # 找到最大Q值(最优动作的Q值)
            max_q = max(qsa_list)
            # 统计有多少个动作达到最大Q值(可能有多个最优动作)
            cnt_max = qsa_list.count(max_q)
            # 更新策略:最优动作的概率为1/cnt_max(平均分配概率),非最优动作为0
            self.pi[s] = [1.0 / cnt_max if q == max_q else 0.0 for q in qsa_list]
        print("策略提升完成")
        return self.pi

    # 策略迭代主循环
    def policy_iteration(self):
        iter_cnt = 0
        while True:
            iter_cnt += 1
            self.policy_evaluation()  # 第一步:评估当前策略
            old_pi = copy.deepcopy(self.pi)  # 保存旧策略(深拷贝避免引用关联)
            new_pi = self.policy_improvement()  # 第二步:提升策略为贪婪策略
            # 若新旧策略完全一致,说明策略已收敛(最优策略)
            if old_pi == new_pi:
                print(f"策略迭代在第{iter_cnt}轮收敛")
                break


# 打印智能体的状态价值和策略
def print_agent(agent, action_meaning, disaster, end):
    print("状态价值:")
    for i in range(agent.env.nrow):  # 按行打印
        for j in range(agent.env.ncol):  # 按列打印
            s = i * agent.env.ncol + j  # 状态一维索引
            print(f"{agent.v[s]:.3f}", end="\t")  # 保留3位小数
        print()  # 换行

    print("\n策略:")
    for i in range(agent.env.nrow):
        for j in range(agent.env.ncol):
            s = i * agent.env.ncol + j
            if s in disaster:  # 悬崖位置
                print("悬崖 ", end="")
            elif s in end:  # 终点位置
                print("终点 ", end="")
            else:  # 普通格子:打印有概率的动作
                pi_str = ""
                for k in range(len(action_meaning)):
                    if agent.pi[s][k] > 0:  # 只显示策略中概率>0的动作
                        pi_str += action_meaning[k]
                print(f"{pi_str:4}", end="")  # 对齐格式
        print()  # 换行


if __name__ == '__main__':
    ncol = 12
    nrow = 4
    # 创建环境
    env = CliffWalkingEnv(ncol, nrow)
    # 动作含义:左、右、上、下(与createP中的change对应)
    action_meaning = ['<', '>', '^', 'v']
    # 定义悬崖位置(最后一行,列1~10)
    disaster = [(nrow-1)*ncol + j for j in range(1, ncol-1)]
    # 定义终点位置(3,11)
    end = [(nrow-1)*ncol + (ncol-1)]
    # 策略迭代参数
    theta = 1e-6  # 收敛阈值
    gamma = 0.9  # 折扣因子
    # 创建并运行策略迭代智能体
    agent = PolicyIteration(env, theta, gamma)
    agent.policy_iteration()
    # 打印结果
    print_agent(agent, action_meaning, disaster, end)

5.1.2、价值迭代

"""
悬崖漫步环境下的价值迭代算法完整版
- 环境:4行12列网格,包含起点、悬崖、终点
- 算法:价值迭代(Value Iteration)
- 功能:求解最优状态价值函数和最优策略
"""
import numpy as np


class CliffWalkingEnv:
    """悬崖漫步环境"""
    def __init__(self, ncol=12, nrow=4):
        self.ncol = ncol  # 网格列数(x轴)
        self.nrow = nrow  # 网格行数(y轴)
        self.state_count = nrow * ncol  # 总状态数:4×12=48
        self.action_count = 4  # 动作数:0-左,1-右,2-上,3-下

        # 状态转移矩阵:P[s][a] = [(概率p, 下一状态s', 奖励r, 是否终止done)]
        self.P = self._build_transition_matrix()

    def _build_transition_matrix(self):
        """构建状态转移矩阵(核心环境逻辑)"""
        P = [[[] for _ in range(self.action_count)] for _ in range(self.state_count)]
        # 动作对应的坐标变化:[左, 右, 上, 下](x变化, y变化)
        action_deltas = [[0, -1], [0, 1], [-1, 0], [1, 0]]

        for row in range(self.nrow):  # 行坐标(y:0~3)
            for col in range(self.ncol):  # 列坐标(x:0~11)
                s = row * self.ncol + col  # 二维坐标转一维状态索引

                for a in range(self.action_count):  # 遍历所有动作
                    # 计算动作后的坐标
                    new_col = col + action_deltas[a][0]
                    new_row = row + action_deltas[a][1]

                    # 1. 处理边界:超出网格则原地不动
                    if new_col < 0 or new_col >= self.ncol or new_row < 0 or new_row >= self.nrow:
                        next_state = s
                        reward = -1
                        done = False

                    # 2. 处理非边界情况
                    else:
                        next_state = new_row * self.ncol + new_col

                        # 2.1 悬崖判断(最后一行中间10列:(3,1)~(3,10))
                        if new_row == self.nrow - 1 and 0 < new_col < self.ncol - 1:
                            reward = -100  # 悬崖惩罚
                            next_state = (self.nrow - 1) * self.ncol  # 回到起点(3,0)
                            done = False

                        # 2.2 终点判断(右下角(3,11))
                        elif new_row == self.nrow - 1 and new_col == self.ncol - 1:
                            reward = 0  # 终点奖励
                            done = True  # 回合终止

                        # 2.3 普通格子
                        else:
                            reward = -1  # 每步惩罚(鼓励最短路径)
                            done = False

                    # 确定性转移(概率p=1.0)
                    P[s][a] = [(1.0, next_state, reward, done)]

        return P


class ValueIteration:
    """价值迭代算法实现"""
    def __init__(self, env, theta=1e-6, gamma=0.9):
        self.env = env  # 环境
        self.theta = theta  # 收敛阈值(价值函数变化小于该值则停止)
        self.gamma = gamma  # 折扣因子(权衡即时奖励和未来奖励)

        # 初始化状态价值函数(所有状态价值为0)
        self.v = np.zeros(env.state_count)
        # 初始化策略(存储每个状态下选择动作的概率)
        self.pi = np.zeros((env.state_count, env.action_count))

    def run(self):
        """执行价值迭代主循环"""
        iteration_count = 0

        while True:
            max_value_diff = 0  # 记录价值函数的最大变化
            new_v = np.zeros_like(self.v)  # 存储新的状态价值

            # 1. 更新每个状态的价值
            for s in range(self.env.state_count):
                # 计算所有动作的Q值
                q_values = []
                for a in range(self.env.action_count):
                    # 遍历状态转移结果(确定性环境只有一个结果)
                    p, next_state, reward, done = self.env.P[s][a][0]
                    # Q值公式:即时奖励 + 折扣×下一状态价值(终止状态无未来价值)
                    q = reward + self.gamma * self.v[next_state] * (1 - done)
                    q_values.append(q)

                # 状态价值 = 最大Q值(贝尔曼最优方程)
                new_v[s] = max(q_values)
                # 更新最大价值变化
                max_value_diff = max(max_value_diff, abs(new_v[s] - self.v[s]))

            # 2. 检查是否收敛
            self.v = new_v
            iteration_count += 1
            if max_value_diff < self.theta:
                print(f"价值迭代收敛,共迭代 {iteration_count} 轮")
                break

        # 3. 从最优价值函数提取最优策略
        self._extract_policy()

    def _extract_policy(self):
        """根据最优价值函数生成贪婪策略"""
        for s in range(self.env.state_count):
            # 计算每个动作的Q值
            q_values = []
            for a in range(self.env.action_count):
                p, next_state, reward, done = self.env.P[s][a][0]
                q = reward + self.gamma * self.v[next_state] * (1 - done)
                q_values.append(q)

            # 选择Q值最大的动作(贪婪策略)
            max_q = max(q_values)
            # 处理多个动作Q值相同的情况(均分概率)
            best_actions = [q == max_q for q in q_values]
            self.pi[s] = best_actions / np.sum(best_actions)


def visualize_results(agent, action_symbols=['←', '→', '↑', '↓']):
    """可视化状态价值和最优策略"""
    env = agent.env

    # 1. 打印状态价值
    print("=" * 50)
    print("状态价值函数(越接近终点价值越高):")
    for row in range(env.nrow):
        row_values = []
        for col in range(env.ncol):
            s = row * env.ncol + col
            row_values.append(f"{agent.v[s]:.2f}")
        print(" | ".join(row_values))
    print("=" * 50)

    # 2. 打印最优策略
    print("\n最优策略(箭头表示推荐动作):")
    for row in range(env.nrow):
        row_policy = []
        for col in range(env.ncol):
            s = row * env.ncol + col

            # 标记特殊位置
            if row == env.nrow - 1 and 0 < col < env.ncol - 1:
                row_policy.append("悬崖")  # 悬崖
            elif row == env.nrow - 1 and col == env.ncol - 1:
                row_policy.append("终点")  # 终点
            elif row == env.nrow - 1 and col == 0:
                row_policy.append("起点")  # 起点
            else:
                # 拼接所有推荐动作的符号
                actions = [action_symbols[a] for a in range(env.action_count) if agent.pi[s][a] > 0]
                row_policy.append("".join(actions))

        print(" | ".join(row_policy))
    print("=" * 50)


if __name__ == "__main__":
    # 1. 创建环境
    env = CliffWalkingEnv(ncol=12, nrow=4)

    # 2. 初始化并运行价值迭代
    agent = ValueIteration(env, theta=1e-6, gamma=0.9)
    agent.run()

    # 3. 可视化结果
    visualize_results(agent)

 

5.2、冰湖环境

5.2.1、策略迭代

"""
冰湖环境(FrozenLake):
- 网格:4x4(可扩展)
- 地形:S(起点)、F(冰面)、H(冰洞)、G(目标)
- 移动:执行动作时有概率向相邻方向偏移(打滑)
- 奖励:到达目标+1,掉入冰洞0,其他0
"""
import copy
import numpy as np


class FrozenLakeEnv:
    def __init__(self, nrow=4, ncol=4, slip_prob=0.2):
        self.nrow = nrow  # 行数
        self.ncol = ncol  # 列数
        self.slip_prob = slip_prob  # 打滑概率(动作偏移的概率)
        self.P = self.createP()  # 状态转移概率矩阵
        self.reset()

    def reset(self):
        """重置环境到起点"""
        self.current_state = 0  # 起点固定为(0,0),一维索引0
        return self.current_state

    def createP(self):
        """创建状态转移概率矩阵:P[s][a] = [(概率, 下一状态, 奖励, 是否终止), ...]"""
        # 定义地形(4x4示例):S=起点, F=冰面, H=冰洞, G=目标
        # 可修改为更大网格,如8x8
        grid = [
            ['S', 'F', 'F', 'F'],
            ['F', 'H', 'F', 'H'],
            ['F', 'F', 'F', 'H'],
            ['H', 'F', 'F', 'G']
        ]

        # 保存地形信息,供render方法使用
        self.grid = grid

        # 动作对应的方向:0-上, 1-右, 2-下, 3-左(与冰湖标准动作一致)
        directions = [(-1, 0), (0, 1), (1, 0), (0, -1)]

        # 初始化转移矩阵:P[状态数][动作数] = 转移列表
        P = [[[] for _ in range(4)] for _ in range(self.nrow * self.ncol)]

        for i in range(self.nrow):  # 行(y坐标)
            for j in range(self.ncol):  # 列(x坐标)
                s = i * self.ncol + j  # 二维坐标转一维状态索引
                cell = grid[i][j]

                # 若当前是冰洞或目标,无需转移(已终止)
                if cell in ['H', 'G']:
                    continue

                # 对每个动作计算转移概率
                for a in range(4):
                    # 冰面打滑:动作有概率向相邻方向偏移(如想向上,可能实际向左/右/上)
                    # 概率分配:原方向60%,左右偏移各20%(总和100%)
                    for delta in [-1, 0, 1]:  # 偏移量:-1=左偏, 0=原方向, 1=右偏
                        # 计算实际执行的动作(取模4确保在0-3范围内)
                        actual_a = (a + delta) % 4
                        # 偏移概率:原方向(delta=0)占60%,左右偏移各20%
                        prob = 0.6 if delta == 0 else 0.2

                        # 计算实际动作后的坐标
                        dy, dx = directions[actual_a]
                        next_i = i + dy
                        next_j = j + dx

                        # 边界检查:超出网格则停在原地
                        if 0 <= next_i < self.nrow and 0 <= next_j < self.ncol:
                            next_s = next_i * self.ncol + next_j  # 下一状态索引
                            next_cell = grid[next_i][next_j]
                        else:
                            next_s = s  # 超出边界,停在当前状态
                            next_cell = cell  # 地形不变

                        # 奖励与终止条件
                        if next_cell == 'H':  # 掉入冰洞
                            reward = 0.0
                            done = True
                        elif next_cell == 'G':  # 到达目标
                            reward = 1.0
                            done = True
                        else:  # 冰面
                            reward = 0.0
                            done = False

                        # 添加转移规则:(概率, 下一状态, 奖励, 终止标志)
                        P[s][a].append((prob, next_s, reward, done))

        return P

    def step(self, action):
        """执行动作,返回转移结果(符合Gym标准)"""
        # 从当前状态和动作的转移列表中随机选择一个结果(按概率)
        transitions = self.P[self.current_state][action]
        probs = [t[0] for t in transitions]  # 提取概率列表
        # 按概率随机选择一个转移(非确定性)
        chosen = np.random.choice(len(transitions), p=probs)
        p, next_s, reward, done = transitions[chosen]

        self.current_state = next_s  # 更新当前状态
        return next_s, reward, done, {}  # 返回4个值(符合Gym规范)

    def render(self):
        """可视化当前环境状态"""
        grid = [[' ' for _ in range(self.ncol)] for _ in range(self.nrow)]
        # 标记地形
        for i in range(self.nrow):
            for j in range(self.ncol):
                s = i * self.ncol + j
                if s == self.current_state:
                    grid[i][j] = 'A'  # 智能体位置
                elif self.grid[i][j] == 'H':  # 简化冰洞判断逻辑
                    grid[i][j] = 'H'  # 冰洞
                elif i == self.nrow - 1 and j == self.ncol - 1:
                    grid[i][j] = 'G'  # 目标
                else:
                    grid[i][j] = 'F'  # 冰面
        # 打印网格
        for row in grid:
            print('|' + '|'.join(row) + '|')
        print()


# 策略迭代算法(复用之前的框架,适配冰湖环境)
class PolicyIteration:
    def __init__(self, env, theta=1e-6, gamma=0.9):
        self.env = env
        self.theta = theta  # 收敛阈值
        self.gamma = gamma  # 折扣因子
        self.v = [0.0] * (env.nrow * env.ncol)  # 状态价值
        self.pi = [[0.25, 0.25, 0.25, 0.25] for _ in range(env.nrow * env.ncol)]  # 初始随机策略

    def policy_evaluation(self):
        """评估当前策略的状态价值"""
        while True:
            max_diff = 0.0
            new_v = [0.0] * (self.env.nrow * self.env.ncol)
            for s in range(self.env.nrow * self.env.ncol):
                v_total = 0.0
                for a in range(4):
                    # 累加每个动作的概率×(奖励+折扣未来价值)
                    for (p, next_s, r, done) in self.env.P[s][a]:
                        v_total += self.pi[s][a] * p * (r + self.gamma * self.v[next_s] * (1 - done))
                new_v[s] = v_total
                max_diff = max(max_diff, abs(new_v[s] - self.v[s]))
            self.v = new_v
            if max_diff < self.theta:
                break

    def policy_improvement(self):
        """基于当前价值改进策略为贪婪策略"""
        for s in range(self.env.nrow * self.env.ncol):
            q_values = []
            for a in range(4):
                q = 0.0
                for (p, next_s, r, done) in self.env.P[s][a]:
                    q += p * (r + self.gamma * self.v[next_s] * (1 - done))
                q_values.append(q)
            # 选择Q值最大的动作
            max_q = max(q_values)
            self.pi[s] = [1.0 if q == max_q else 0.0 for q in q_values]
        return self.pi

    def policy_iteration(self):
        """策略迭代主循环"""
        iter_cnt = 0
        while True:
            iter_cnt += 1
            self.policy_evaluation()
            old_pi = copy.deepcopy(self.pi)
            new_pi = self.policy_improvement()
            if old_pi == new_pi:
                print(f"策略在第{iter_cnt}轮迭代收敛")
                break


# 打印策略和状态价值
def print_agent(agent, action_meaning=['↑', '→', '↓', '←']):
    print("状态价值:")
    for i in range(agent.env.nrow):
        for j in range(agent.env.ncol):
            s = i * agent.env.ncol + j
            print(f"{agent.v[s]:.2f}", end="\t")
        print()
    print("\n策略:")
    for i in range(agent.env.nrow):
        for j in range(agent.env.ncol):
            s = i * agent.env.ncol + j
            # 冰洞和目标不显示策略
            if any(s == (i * agent.env.ncol + j) for i in range(agent.env.nrow) for j in range(agent.env.ncol) if
                   (agent.env.nrow, agent.env.ncol, i, j) in [(4, 4, 1, 1), (4, 4, 1, 3), (4, 4, 2, 3), (4, 4, 3, 0)]):
                print("H", end="\t")
            elif s == agent.env.nrow * agent.env.ncol - 1:
                print("G", end="\t")
            else:
                # 打印有概率的动作
                pi_str = ""
                for a in range(4):
                    if agent.pi[s][a] > 0:
                        pi_str += action_meaning[a]
                print(pi_str, end="\t")
        print()


# 测试冰湖环境
if __name__ == "__main__":
    # 定义动作含义:0-上, 1-右, 2-下, 3-左
    action_meaning = ['↑', '→', '↓', '←']  # 补充定义

    # 创建4x4冰湖环境
    env = FrozenLakeEnv(nrow=4, ncol=4, slip_prob=0.2)
    # 策略迭代参数
    agent = PolicyIteration(env, theta=1e-6, gamma=0.9)
    agent.policy_iteration()
    # 打印结果
    print_agent(agent, action_meaning=action_meaning)

    # 测试执行最优策略
    print("\n执行最优策略:")
    state = env.reset()
    env.render()
    done = False
    step = 0
    while not done:
        step += 1
        action = np.argmax(agent.pi[state])  # 获取最优动作索引(0-3)
        next_state, reward, done, _ = env.step(action)
        # 使用action_meaning将动作索引转换为符号
        print(f"步骤{step}:动作{action_meaning[action]},奖励{reward}")
        env.render()
        state = next_state
        if step > 100:
            print("超过最大步数,可能陷入循环")
            break

策略在第3轮迭代收敛
状态价值:
0.17    0.15    0.21    0.14    
0.21    0.00    0.30    0.00    
0.32    0.52    0.56    0.00    
0.00    0.68    0.85    0.00    

策略:
↓    ↑    ↓    ←    
↓    H    ↓    H    
→    ↓    ↓    H    
H    →    →    G    

执行最优策略:
|A|F|F|F|
|F|H|F|H|
|F|F|F|H|
|H|F|F|G|

步骤1:动作↓,奖励0.0
|A|F|F|F|
|F|H|F|H|
|F|F|F|H|
|H|F|F|G|

步骤2:动作↓,奖励0.0
|F|F|F|F|
|A|H|F|H|
|F|F|F|H|
|H|F|F|G|

步骤3:动作↓,奖励0.0
|F|F|F|F|
|F|H|F|H|
|A|F|F|H|
|H|F|F|G|

步骤4:动作→,奖励0.0
|F|F|F|F|
|A|H|F|H|
|F|F|F|H|
|H|F|F|G|

步骤5:动作↓,奖励0.0
|F|F|F|F|
|F|H|F|H|
|A|F|F|H|
|H|F|F|G|

步骤6:动作→,奖励0.0
|F|F|F|F|
|A|H|F|H|
|F|F|F|H|
|H|F|F|G|

步骤7:动作↓,奖励0.0
|F|F|F|F|
|F|H|F|H|
|A|F|F|H|
|H|F|F|G|

步骤8:动作→,奖励0.0
|F|F|F|F|
|F|H|F|H|
|F|A|F|H|
|H|F|F|G|

步骤9:动作↓,奖励0.0
|F|F|F|F|
|F|H|F|H|
|A|F|F|H|
|H|F|F|G|

步骤10:动作→,奖励0.0
|F|F|F|F|
|F|H|F|H|
|F|A|F|H|
|H|F|F|G|

步骤11:动作↓,奖励0.0
|F|F|F|F|
|F|H|F|H|
|A|F|F|H|
|H|F|F|G|

步骤12:动作→,奖励0.0
|F|F|F|F|
|F|H|F|H|
|F|A|F|H|
|H|F|F|G|

步骤13:动作↓,奖励0.0
|F|F|F|F|
|F|H|F|H|
|A|F|F|H|
|H|F|F|G|

步骤14:动作→,奖励0.0
|F|F|F|F|
|F|H|F|H|
|F|A|F|H|
|H|F|F|G|

步骤15:动作↓,奖励0.0
|F|F|F|F|
|F|H|F|H|
|F|F|A|H|
|H|F|F|G|

步骤16:动作↓,奖励0.0
|F|F|F|F|
|F|H|F|H|
|F|F|F|A|
|H|F|F|G|

 

5.2.2、价值迭代

"""
冰湖环境(FrozenLake)下的价值迭代算法
- 环境:4x4网格,包含冰面(F)、冰洞(H)、起点(S)、目标(G)
- 特性:动作存在打滑(非确定性转移),掉入冰洞或到达目标后终止
- 算法:价值迭代(Value Iteration)
"""
import numpy as np


class FrozenLakeEnv:
    """冰湖环境:非确定性转移,存在冰洞和打滑机制"""
    def __init__(self, ncol=4, nrow=4, slip_prob=0.2):
        self.ncol = ncol  # 列数(x轴)
        self.nrow = nrow  # 行数(y轴)
        self.state_count = nrow * ncol  # 总状态数:4×4=16
        self.action_count = 4  # 动作数:0-左,1-右,2-上,3-下
        self.slip_prob = slip_prob  # 打滑概率(左右偏移各占slip_prob)

        # 定义地形:S=起点, F=冰面, H=冰洞, G=目标
        self.grid = [
            ['S', 'F', 'F', 'F'],
            ['F', 'H', 'F', 'H'],
            ['F', 'F', 'F', 'H'],
            ['H', 'F', 'F', 'G']
        ]

        # 状态转移矩阵:P[s][a] = [(概率p, 下一状态s', 奖励r, 是否终止done)]
        self.P = self._build_transition_matrix()

    def _build_transition_matrix(self):
        """构建非确定性状态转移矩阵(核心环境逻辑)"""
        P = [[[] for _ in range(self.action_count)] for _ in range(self.state_count)]
        # 动作对应的坐标变化:[左, 右, 上, 下](x变化, y变化)
        action_deltas = [[0, -1], [0, 1], [-1, 0], [1, 0]]

        for row in range(self.nrow):
            for col in range(self.ncol):
                s = row * self.ncol + col  # 二维坐标转一维状态索引
                cell = self.grid[row][col]

                # 冰洞和目标是终止状态,无需转移
                if cell in ['H', 'G']:
                    continue

                # 遍历所有动作
                for a in range(self.action_count):
                    # 非确定性转移:原方向60%,左右偏移各20%(打滑机制)
                    for delta in [-1, 0, 1]:  # 偏移量:-1=左偏, 0=原方向, 1=右偏
                        actual_a = (a + delta) % 4  # 实际执行的动作(取模确保在0-3范围)
                        prob = 0.6 if delta == 0 else 0.2  # 概率分配

                        # 计算实际动作后的坐标
                        new_col = col + action_deltas[actual_a][0]
                        new_row = row + action_deltas[actual_a][1]

                        # 边界处理:超出网格则原地不动
                        if new_col < 0 or new_col >= self.ncol or new_row < 0 or new_row >= self.nrow:
                            next_state = s
                            next_cell = cell
                        else:
                            next_state = new_row * self.ncol + new_col
                            next_cell = self.grid[new_row][new_col]

                        # 奖励与终止条件
                        if next_cell == 'H':  # 掉入冰洞
                            reward = 0.0
                            done = True
                        elif next_cell == 'G':  # 到达目标
                            reward = 1.0
                            done = True
                        else:  # 冰面
                            reward = 0.0
                            done = False

                        # 添加转移规则
                        P[s][a].append((prob, next_state, reward, done))

        return P


class ValueIteration:
    """价值迭代算法(适配非确定性环境)"""
    def __init__(self, env, theta=1e-6, gamma=0.9):
        self.env = env
        self.theta = theta  # 收敛阈值
        self.gamma = gamma  # 折扣因子
        self.v = np.zeros(env.state_count)  # 状态价值函数
        self.pi = np.zeros((env.state_count, env.action_count))  # 策略

    def run(self):
        """执行价值迭代主循环"""
        iteration_count = 0

        while True:
            max_value_diff = 0
            new_v = np.zeros_like(self.v)

            # 更新每个状态的价值
            for s in range(self.env.state_count):
                # 跳过终止状态(冰洞和目标)
                if self.env.grid[s // self.env.ncol][s % self.env.ncol] in ['H', 'G']:
                    new_v[s] = 0.0
                    continue

                # 计算所有动作的Q值
                q_values = []
                for a in range(self.env.action_count):
                    q = 0.0
                    # 累加所有可能转移的概率加权值
                    for p, next_state, reward, done in self.env.P[s][a]:
                        q += p * (reward + self.gamma * self.v[next_state] * (1 - done))
                    q_values.append(q)

                # 状态价值 = 最大Q值
                new_v[s] = max(q_values)
                max_value_diff = max(max_value_diff, abs(new_v[s] - self.v[s]))

            # 检查收敛
            self.v = new_v
            iteration_count += 1
            if max_value_diff < self.theta:
                print(f"价值迭代收敛,共迭代 {iteration_count} 轮")
                break

        # 提取最优策略
        self._extract_policy()

    def _extract_policy(self):
        """从最优价值函数生成贪婪策略"""
        for s in range(self.env.state_count):
            # 跳过终止状态
            if self.env.grid[s // self.env.ncol][s % self.env.ncol] in ['H', 'G']:
                continue

            # 计算每个动作的Q值
            q_values = []
            for a in range(self.env.action_count):
                q = 0.0
                for p, next_state, reward, done in self.env.P[s][a]:
                    q += p * (reward + self.gamma * self.v[next_state] * (1 - done))
                q_values.append(q)

            # 选择Q值最大的动作(处理平局)
            max_q = max(q_values)
            best_actions = [q == max_q for q in q_values]
            self.pi[s] = best_actions / np.sum(best_actions)


def visualize_results(agent, action_symbols=['←', '→', '↑', '↓']):
    """可视化状态价值和最优策略"""
    env = agent.env

    # 打印状态价值
    print("=" * 40)
    print("状态价值函数(越接近目标价值越高):")
    for row in range(env.nrow):
        row_values = []
        for col in range(env.ncol):
            s = row * env.ncol + col
            row_values.append(f"{agent.v[s]:.2f}")
        print(" | ".join(row_values))
    print("=" * 40)

    # 打印最优策略
    print("\n最优策略(箭头表示推荐动作):")
    for row in range(env.nrow):
        row_policy = []
        for col in range(env.ncol):
            s = row * env.ncol + col
            cell = env.grid[row][col]

            # 标记特殊位置
            if cell == 'H':
                row_policy.append("冰洞")
            elif cell == 'G':
                row_policy.append("目标")
            elif cell == 'S':
                row_policy.append("起点")
            else:
                # 拼接推荐动作
                actions = [action_symbols[a] for a in range(env.action_count) if agent.pi[s][a] > 0]
                row_policy.append("".join(actions))

        print(" | ".join(row_policy))
    print("=" * 40)


if __name__ == "__main__":
    # 创建冰湖环境(4x4网格,打滑概率20%)
    env = FrozenLakeEnv(ncol=4, nrow=4, slip_prob=0.2)

    # 初始化并运行价值迭代
    agent = ValueIteration(env, theta=1e-6, gamma=0.9)
    agent.run()

    # 可视化结果
    visualize_results(agent)

 

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐