一文读懂动态规划算法(策略迭代+价值迭代+完整代码)—强化学习(2)
在马尔可夫决策过程(MDP)中,动态规划(Dynamic Programming, DP)算法是求解最优策略和最优价值函数的经典方法。其核心思想是利用,通过迭代更新价值函数或策略,最终收敛到最优解。和。
目录
在马尔可夫决策过程(MDP)中,动态规划(Dynamic Programming, DP)算法是求解最优策略和最优价值函数的经典方法。其核心思想是利用贝尔曼方程,通过迭代更新价值函数或策略,最终收敛到最优解。动态规划的两个核心算法是:策略迭代(Policy Iteration) 和价值迭代(Value Iteration)。
1、动态规划的前提
动态规划算法适用的前提是已知环境模型,即:
- 状态转移概率
(在状态 s 执行动作 a 后转移到
的概率);
- 奖励函数
或
(执行动作后的即时奖励)。
基于此,DP 可通过迭代求解贝尔曼方程,无需与环境交互(属于 “模型已知” 的规划问题)。
2、策略迭代(Policy Iteration)
策略迭代的核心是 **“策略评估 - 策略改进” 的交替循环 **:先固定当前策略,计算其对应的状态价值函数(策略评估);再基于该价值函数优化策略(策略改进),重复直至策略不再变化(收敛到最优策略)。
2.1、算法步骤
策略迭代分为两步:策略评估和策略改进,流程如下:
(1)初始化
- 随机初始化一个策略
(例如,每个状态下随机选择一个动作);
- 初始化状态价值函数
(例如,全为 0)。
(2)策略评估(Policy Evaluation)
在当前策略 下,通过贝尔曼期望方程迭代更新状态价值函数
,直至收敛:
- 含义:当前策略下,状态 s 的价值 = 执行策略推荐的动作后,获得的即时奖励 + 未来状态价值的贴现和。
- 迭代方式:对每个状态 s,用新计算的
替换旧值,直至
的变化量小于阈值(如
)。
(3)策略改进(Policy Improvement)
基于当前收敛的,通过贪婪策略更新策略
,使其更优:
- 含义:对每个状态 s,选择能最大化 “即时奖励 + 未来价值” 的动作(贪婪选择),生成新策略
。
(4)收敛判断
若新策略与旧策略
完全一致(即所有状态的动作选择均不变),则策略收敛,此时的策略为最优策略
;否则,用
替换
,回到步骤(2)重复迭代。
2.2、 直观理解
- 策略评估:就像 “给当前策略打分”,计算每个状态在该策略下的长期价值(例如:“按当前策略走,这个状态最终能拿到多少奖励”)。
- 策略改进:基于打分结果 “优化策略”,对每个状态选择能获得更高价值的动作(例如:“发现走左边比走右边价值高,那就改走左边”)。
- 重复这两步,策略会逐渐变好,最终不再变化(此时为最优策略)。
3、价值迭代(Value Iteration)
价值迭代直接迭代优化最优价值函数,无需显式评估策略。其核心是利用贝尔曼最优方程,每次迭代都让价值函数更接近最优值,直至收敛,最后从最优价值函数中提取最优策略。
3.1、 算法步骤
价值迭代的核心是迭代更新价值函数,步骤如下:
(1)初始化
- 初始化状态价值函数
(例如,全为 0)。
(2)迭代更新价值函数
利用贝尔曼最优方程更新每个状态的价值函数:
- 含义:每个状态的最优价值 = 选择最优动作后,获得的 “即时奖励 + 未来最优价值的贴现和”。
- 迭代方式:对每个状态 s,用上述公式更新
,直至所有状态的价值函数变化量小于阈值(收敛)。
(3)提取最优策略
当价值函数收敛到最优价值 后,通过贪婪策略提取最优策略:
3.2、 直观理解
价值迭代可以看作是 “简化版的策略迭代”:它省略了策略迭代中 “完整的策略评估” 步骤,每次迭代都直接用最优动作更新价值函数(相当于 “策略评估只做一步就进行策略改进”)。 例如:“不管当前策略是什么,直接计算每个状态下最好的动作能带来的价值,反复迭代后,价值函数会收敛到最优,此时再确定最优策略”。
4、策略迭代 vs 价值迭代
| 维度 | 策略迭代(Policy Iteration) | 价值迭代(Value Iteration) |
|---|---|---|
| 核心步骤 | 策略评估(收敛到当前策略的 V)→ 策略改进(贪婪更新策略) | 直接用贝尔曼最优方程迭代 V,最后提取策略 |
| 迭代对象 | 策略和价值函数交替更新 | 仅价值函数迭代,策略最后提取 |
| 收敛速度 | 通常更快(策略改进会快速收敛),但策略评估可能耗时 | 迭代次数可能更多,但每次迭代更简单(无需完整评估策略) |
| 适用场景 | 状态空间小(策略评估成本低) | 状态空间大(可早期停止,近似最优) |
| 收敛结果 | 严格收敛到最优策略和最优 V | 收敛到最优 V,进而提取最优策略 |
总结
- 策略迭代和价值迭代都是基于动态规划的 MDP 求解算法,依赖环境模型和贝尔曼方程。
- 策略迭代通过 “评估 - 改进” 循环优化策略,收敛快但每次评估成本可能高;
- 价值迭代直接优化价值函数,实现简单,适合状态空间较大的场景。
- 两者最终都能收敛到最优策略和最优价值函数,选择哪种取决于具体问题的状态 / 动作空间大小。
5、实验代码
5.1、悬崖漫步环境
5.1.1、策略迭代
"""
文件名: 4.2
作者: 墨尘
日期: 2025/7/21
项目名: d2l_learning
"""
import copy
"""悬崖漫步环境"""
"""该类用于模拟 "悬崖漫步" 环境,定义了状态、动作、奖励和状态转移规则,是强化学习智能体交互的场景。"""
class CliffWalkingEnv:
def __init__(self, ncol=12, nrow=4):
self.ncol = ncol # 列数
self.nrow = nrow # 行数
self.P = self.createP() # 状态转移概率矩阵:P[s][a] = [(p, next_state, reward, done)]
# 存储每个状态s下执行动作a后的转移规则(概率、下一状态、奖励、是否终止)。
"""createP方法构建状态转移矩阵P,其中P[s][a]表示在状态s执行动作a后,可能的转移结果(概率、下一状态、奖励、是否终止)"""
def createP(self):
# P[s][a] = [(p, next_state, reward, done)]:s是一维状态索引,a是动作(0-3)
P = [[[] for _ in range(4)] for _ in range(self.nrow * self.ncol)]
# 动作对应的坐标变化:[左, 右, 上, 下](x变化, y变化)
change = [[0, -1], [0, 1], [-1, 0], [1, 0]]
for i in range(self.nrow): # 遍历行(y坐标:0~3)
for j in range(self.ncol): # 遍历列(x坐标:0~11)
# 状态表示:用一维索引s = i*ncol + j表示二维坐标(i,j)(如(3,0)对应s=3*12+0=36,(3,11)对应s=3*12+11=47)
s = i * self.ncol + j # 将二维坐标(i,j)转换为一维状态索引s(0~47)
for a in range(4): # 遍历4个动作(0-左,1-右,2-上,3-下)
# 计算执行动作a后的下一坐标(next_x, next_y)
next_x = j + change[a][0] # x方向变化(左/右)
next_y = i + change[a][1] # y方向变化(上/下)
"""
转移规则:
边界:超出网格则原地不动,奖励 - 1;
悬崖:最后一行中间 10 列((3,1)到(3,10)),掉入后回到起点,奖励 - 100;
终点:(3,11),到达后终止,奖励 0;
普通格子:奖励 - 1,继续移动。
"""
# 1. 处理边界情况:如果超出网格范围,停在原地
if next_x < 0 or next_x >= self.ncol or next_y < 0 or next_y >= self.nrow:
next_state = s # 下一状态仍是当前状态(原地不动)
reward = -1 # 边界移动惩罚
done = False # 未终止
# 2. 处理非边界情况
else:
# 将下一坐标转换为一维状态索引
next_state = next_y * self.ncol + next_x
# 2.1 判断是否为悬崖(最后一行,且不是起点(3,0)和终点(3,11))
if next_y == self.nrow - 1 and next_x > 0 and next_x < self.ncol - 1:
reward = -100 # 掉入悬崖的惩罚
next_state = (self.nrow - 1) * self.ncol + 0 # 回到起点(3,0)
done = False # 未终止(需重新开始)
# 2.2 判断是否为终点(3,11)
elif next_y == self.nrow - 1 and next_x == self.ncol - 1:
reward = 0 # 到达终点的奖励
done = True # 回合终止
# 2.3 普通格子(非悬崖、非终点)
else:
reward = -1 # 每步移动的基础惩罚(鼓励最短路径)
done = False # 未终止
# 确定性转移(概率p=1.0,因为环境是确定的)
P[s][a] = [(1.0, next_state, reward, done)]
return P
"""策略迭代算法"""
class PolicyIteration:
# 设置环境、收敛阈值theta(用于判断价值函数是否收敛)、
# 折扣因子gamma(通常 0<gamma<1),并初始化状态价值v和策略pi。
def __init__(self, env, theta, gamma):
self.env = env # 环境
self.theta = theta # 收敛阈值(判断状态价值是否稳定)
self.gamma = gamma # 折扣因子(权衡即时奖励和未来奖励)
self.v = [0.0] * (self.env.nrow * self.env.ncol) # 状态价值函数V(s),初始为0
# 初始策略:均匀随机策略(每个状态下4个动作的概率均为0.25)
self.pi = [[0.25, 0.25, 0.25, 0.25] for _ in range(self.env.nrow * self.env.ncol)]
# 策略评估:计算当前策略下的状态价值函数V(s)
def policy_evaluation(self):
cnt = 1 # 迭代计数
while True:
max_diff = 0.0 # 记录本轮迭代中状态价值的最大变化
new_v = [0.0] * (self.env.nrow * self.env.ncol) # 存储新的状态价值
for s in range(self.env.nrow * self.env.ncol): # 遍历所有状态
qsa_total = 0.0 # 状态s的价值(基于当前策略)
for a in range(4): # 遍历所有动作
# 遍历状态转移结果(这里是确定性转移,只有一个结果)
for (p, next_state, r, done) in self.env.P[s][a]:
# 状态价值公式:V(s) = sum(策略概率 * (即时奖励 + 折扣*下一状态价值))
# (1-done)确保终止状态的未来价值为0(done=True时系数为0)
qsa_total += self.pi[s][a] * (r + self.gamma * self.v[next_state] * (1 - done))
new_v[s] = qsa_total # 更新状态s的价值
# 计算与旧价值的差异,跟踪最大值
max_diff = max(max_diff, abs(new_v[s] - self.v[s]))
self.v = new_v # 用新价值覆盖旧价值
if max_diff < self.theta: # 当最大差异小于阈值,价值函数收敛
break
cnt += 1
print(f"策略评估进行{cnt}轮后完成")
# 策略提升:基于当前价值函数更新为贪婪策略
def policy_improvement(self):
for s in range(self.env.nrow * self.env.ncol): # 遍历所有状态
qsa_list = [] # 存储每个动作的Q值(动作价值)
for a in range(4): # 遍历所有动作
qsa = 0.0 # 动作a在状态s的Q值
for (p, next_state, r, done) in self.env.P[s][a]:
# Q值公式:Q(s,a) = 即时奖励 + 折扣*下一状态价值
qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
qsa_list.append(qsa) # 记录当前动作的Q值
# 找到最大Q值(最优动作的Q值)
max_q = max(qsa_list)
# 统计有多少个动作达到最大Q值(可能有多个最优动作)
cnt_max = qsa_list.count(max_q)
# 更新策略:最优动作的概率为1/cnt_max(平均分配概率),非最优动作为0
self.pi[s] = [1.0 / cnt_max if q == max_q else 0.0 for q in qsa_list]
print("策略提升完成")
return self.pi
# 策略迭代主循环
def policy_iteration(self):
iter_cnt = 0
while True:
iter_cnt += 1
self.policy_evaluation() # 第一步:评估当前策略
old_pi = copy.deepcopy(self.pi) # 保存旧策略(深拷贝避免引用关联)
new_pi = self.policy_improvement() # 第二步:提升策略为贪婪策略
# 若新旧策略完全一致,说明策略已收敛(最优策略)
if old_pi == new_pi:
print(f"策略迭代在第{iter_cnt}轮收敛")
break
# 打印智能体的状态价值和策略
def print_agent(agent, action_meaning, disaster, end):
print("状态价值:")
for i in range(agent.env.nrow): # 按行打印
for j in range(agent.env.ncol): # 按列打印
s = i * agent.env.ncol + j # 状态一维索引
print(f"{agent.v[s]:.3f}", end="\t") # 保留3位小数
print() # 换行
print("\n策略:")
for i in range(agent.env.nrow):
for j in range(agent.env.ncol):
s = i * agent.env.ncol + j
if s in disaster: # 悬崖位置
print("悬崖 ", end="")
elif s in end: # 终点位置
print("终点 ", end="")
else: # 普通格子:打印有概率的动作
pi_str = ""
for k in range(len(action_meaning)):
if agent.pi[s][k] > 0: # 只显示策略中概率>0的动作
pi_str += action_meaning[k]
print(f"{pi_str:4}", end="") # 对齐格式
print() # 换行
if __name__ == '__main__':
ncol = 12
nrow = 4
# 创建环境
env = CliffWalkingEnv(ncol, nrow)
# 动作含义:左、右、上、下(与createP中的change对应)
action_meaning = ['<', '>', '^', 'v']
# 定义悬崖位置(最后一行,列1~10)
disaster = [(nrow-1)*ncol + j for j in range(1, ncol-1)]
# 定义终点位置(3,11)
end = [(nrow-1)*ncol + (ncol-1)]
# 策略迭代参数
theta = 1e-6 # 收敛阈值
gamma = 0.9 # 折扣因子
# 创建并运行策略迭代智能体
agent = PolicyIteration(env, theta, gamma)
agent.policy_iteration()
# 打印结果
print_agent(agent, action_meaning, disaster, end)
5.1.2、价值迭代
"""
悬崖漫步环境下的价值迭代算法完整版
- 环境:4行12列网格,包含起点、悬崖、终点
- 算法:价值迭代(Value Iteration)
- 功能:求解最优状态价值函数和最优策略
"""
import numpy as np
class CliffWalkingEnv:
"""悬崖漫步环境"""
def __init__(self, ncol=12, nrow=4):
self.ncol = ncol # 网格列数(x轴)
self.nrow = nrow # 网格行数(y轴)
self.state_count = nrow * ncol # 总状态数:4×12=48
self.action_count = 4 # 动作数:0-左,1-右,2-上,3-下
# 状态转移矩阵:P[s][a] = [(概率p, 下一状态s', 奖励r, 是否终止done)]
self.P = self._build_transition_matrix()
def _build_transition_matrix(self):
"""构建状态转移矩阵(核心环境逻辑)"""
P = [[[] for _ in range(self.action_count)] for _ in range(self.state_count)]
# 动作对应的坐标变化:[左, 右, 上, 下](x变化, y变化)
action_deltas = [[0, -1], [0, 1], [-1, 0], [1, 0]]
for row in range(self.nrow): # 行坐标(y:0~3)
for col in range(self.ncol): # 列坐标(x:0~11)
s = row * self.ncol + col # 二维坐标转一维状态索引
for a in range(self.action_count): # 遍历所有动作
# 计算动作后的坐标
new_col = col + action_deltas[a][0]
new_row = row + action_deltas[a][1]
# 1. 处理边界:超出网格则原地不动
if new_col < 0 or new_col >= self.ncol or new_row < 0 or new_row >= self.nrow:
next_state = s
reward = -1
done = False
# 2. 处理非边界情况
else:
next_state = new_row * self.ncol + new_col
# 2.1 悬崖判断(最后一行中间10列:(3,1)~(3,10))
if new_row == self.nrow - 1 and 0 < new_col < self.ncol - 1:
reward = -100 # 悬崖惩罚
next_state = (self.nrow - 1) * self.ncol # 回到起点(3,0)
done = False
# 2.2 终点判断(右下角(3,11))
elif new_row == self.nrow - 1 and new_col == self.ncol - 1:
reward = 0 # 终点奖励
done = True # 回合终止
# 2.3 普通格子
else:
reward = -1 # 每步惩罚(鼓励最短路径)
done = False
# 确定性转移(概率p=1.0)
P[s][a] = [(1.0, next_state, reward, done)]
return P
class ValueIteration:
"""价值迭代算法实现"""
def __init__(self, env, theta=1e-6, gamma=0.9):
self.env = env # 环境
self.theta = theta # 收敛阈值(价值函数变化小于该值则停止)
self.gamma = gamma # 折扣因子(权衡即时奖励和未来奖励)
# 初始化状态价值函数(所有状态价值为0)
self.v = np.zeros(env.state_count)
# 初始化策略(存储每个状态下选择动作的概率)
self.pi = np.zeros((env.state_count, env.action_count))
def run(self):
"""执行价值迭代主循环"""
iteration_count = 0
while True:
max_value_diff = 0 # 记录价值函数的最大变化
new_v = np.zeros_like(self.v) # 存储新的状态价值
# 1. 更新每个状态的价值
for s in range(self.env.state_count):
# 计算所有动作的Q值
q_values = []
for a in range(self.env.action_count):
# 遍历状态转移结果(确定性环境只有一个结果)
p, next_state, reward, done = self.env.P[s][a][0]
# Q值公式:即时奖励 + 折扣×下一状态价值(终止状态无未来价值)
q = reward + self.gamma * self.v[next_state] * (1 - done)
q_values.append(q)
# 状态价值 = 最大Q值(贝尔曼最优方程)
new_v[s] = max(q_values)
# 更新最大价值变化
max_value_diff = max(max_value_diff, abs(new_v[s] - self.v[s]))
# 2. 检查是否收敛
self.v = new_v
iteration_count += 1
if max_value_diff < self.theta:
print(f"价值迭代收敛,共迭代 {iteration_count} 轮")
break
# 3. 从最优价值函数提取最优策略
self._extract_policy()
def _extract_policy(self):
"""根据最优价值函数生成贪婪策略"""
for s in range(self.env.state_count):
# 计算每个动作的Q值
q_values = []
for a in range(self.env.action_count):
p, next_state, reward, done = self.env.P[s][a][0]
q = reward + self.gamma * self.v[next_state] * (1 - done)
q_values.append(q)
# 选择Q值最大的动作(贪婪策略)
max_q = max(q_values)
# 处理多个动作Q值相同的情况(均分概率)
best_actions = [q == max_q for q in q_values]
self.pi[s] = best_actions / np.sum(best_actions)
def visualize_results(agent, action_symbols=['←', '→', '↑', '↓']):
"""可视化状态价值和最优策略"""
env = agent.env
# 1. 打印状态价值
print("=" * 50)
print("状态价值函数(越接近终点价值越高):")
for row in range(env.nrow):
row_values = []
for col in range(env.ncol):
s = row * env.ncol + col
row_values.append(f"{agent.v[s]:.2f}")
print(" | ".join(row_values))
print("=" * 50)
# 2. 打印最优策略
print("\n最优策略(箭头表示推荐动作):")
for row in range(env.nrow):
row_policy = []
for col in range(env.ncol):
s = row * env.ncol + col
# 标记特殊位置
if row == env.nrow - 1 and 0 < col < env.ncol - 1:
row_policy.append("悬崖") # 悬崖
elif row == env.nrow - 1 and col == env.ncol - 1:
row_policy.append("终点") # 终点
elif row == env.nrow - 1 and col == 0:
row_policy.append("起点") # 起点
else:
# 拼接所有推荐动作的符号
actions = [action_symbols[a] for a in range(env.action_count) if agent.pi[s][a] > 0]
row_policy.append("".join(actions))
print(" | ".join(row_policy))
print("=" * 50)
if __name__ == "__main__":
# 1. 创建环境
env = CliffWalkingEnv(ncol=12, nrow=4)
# 2. 初始化并运行价值迭代
agent = ValueIteration(env, theta=1e-6, gamma=0.9)
agent.run()
# 3. 可视化结果
visualize_results(agent)

5.2、冰湖环境
5.2.1、策略迭代
"""
冰湖环境(FrozenLake):
- 网格:4x4(可扩展)
- 地形:S(起点)、F(冰面)、H(冰洞)、G(目标)
- 移动:执行动作时有概率向相邻方向偏移(打滑)
- 奖励:到达目标+1,掉入冰洞0,其他0
"""
import copy
import numpy as np
class FrozenLakeEnv:
def __init__(self, nrow=4, ncol=4, slip_prob=0.2):
self.nrow = nrow # 行数
self.ncol = ncol # 列数
self.slip_prob = slip_prob # 打滑概率(动作偏移的概率)
self.P = self.createP() # 状态转移概率矩阵
self.reset()
def reset(self):
"""重置环境到起点"""
self.current_state = 0 # 起点固定为(0,0),一维索引0
return self.current_state
def createP(self):
"""创建状态转移概率矩阵:P[s][a] = [(概率, 下一状态, 奖励, 是否终止), ...]"""
# 定义地形(4x4示例):S=起点, F=冰面, H=冰洞, G=目标
# 可修改为更大网格,如8x8
grid = [
['S', 'F', 'F', 'F'],
['F', 'H', 'F', 'H'],
['F', 'F', 'F', 'H'],
['H', 'F', 'F', 'G']
]
# 保存地形信息,供render方法使用
self.grid = grid
# 动作对应的方向:0-上, 1-右, 2-下, 3-左(与冰湖标准动作一致)
directions = [(-1, 0), (0, 1), (1, 0), (0, -1)]
# 初始化转移矩阵:P[状态数][动作数] = 转移列表
P = [[[] for _ in range(4)] for _ in range(self.nrow * self.ncol)]
for i in range(self.nrow): # 行(y坐标)
for j in range(self.ncol): # 列(x坐标)
s = i * self.ncol + j # 二维坐标转一维状态索引
cell = grid[i][j]
# 若当前是冰洞或目标,无需转移(已终止)
if cell in ['H', 'G']:
continue
# 对每个动作计算转移概率
for a in range(4):
# 冰面打滑:动作有概率向相邻方向偏移(如想向上,可能实际向左/右/上)
# 概率分配:原方向60%,左右偏移各20%(总和100%)
for delta in [-1, 0, 1]: # 偏移量:-1=左偏, 0=原方向, 1=右偏
# 计算实际执行的动作(取模4确保在0-3范围内)
actual_a = (a + delta) % 4
# 偏移概率:原方向(delta=0)占60%,左右偏移各20%
prob = 0.6 if delta == 0 else 0.2
# 计算实际动作后的坐标
dy, dx = directions[actual_a]
next_i = i + dy
next_j = j + dx
# 边界检查:超出网格则停在原地
if 0 <= next_i < self.nrow and 0 <= next_j < self.ncol:
next_s = next_i * self.ncol + next_j # 下一状态索引
next_cell = grid[next_i][next_j]
else:
next_s = s # 超出边界,停在当前状态
next_cell = cell # 地形不变
# 奖励与终止条件
if next_cell == 'H': # 掉入冰洞
reward = 0.0
done = True
elif next_cell == 'G': # 到达目标
reward = 1.0
done = True
else: # 冰面
reward = 0.0
done = False
# 添加转移规则:(概率, 下一状态, 奖励, 终止标志)
P[s][a].append((prob, next_s, reward, done))
return P
def step(self, action):
"""执行动作,返回转移结果(符合Gym标准)"""
# 从当前状态和动作的转移列表中随机选择一个结果(按概率)
transitions = self.P[self.current_state][action]
probs = [t[0] for t in transitions] # 提取概率列表
# 按概率随机选择一个转移(非确定性)
chosen = np.random.choice(len(transitions), p=probs)
p, next_s, reward, done = transitions[chosen]
self.current_state = next_s # 更新当前状态
return next_s, reward, done, {} # 返回4个值(符合Gym规范)
def render(self):
"""可视化当前环境状态"""
grid = [[' ' for _ in range(self.ncol)] for _ in range(self.nrow)]
# 标记地形
for i in range(self.nrow):
for j in range(self.ncol):
s = i * self.ncol + j
if s == self.current_state:
grid[i][j] = 'A' # 智能体位置
elif self.grid[i][j] == 'H': # 简化冰洞判断逻辑
grid[i][j] = 'H' # 冰洞
elif i == self.nrow - 1 and j == self.ncol - 1:
grid[i][j] = 'G' # 目标
else:
grid[i][j] = 'F' # 冰面
# 打印网格
for row in grid:
print('|' + '|'.join(row) + '|')
print()
# 策略迭代算法(复用之前的框架,适配冰湖环境)
class PolicyIteration:
def __init__(self, env, theta=1e-6, gamma=0.9):
self.env = env
self.theta = theta # 收敛阈值
self.gamma = gamma # 折扣因子
self.v = [0.0] * (env.nrow * env.ncol) # 状态价值
self.pi = [[0.25, 0.25, 0.25, 0.25] for _ in range(env.nrow * env.ncol)] # 初始随机策略
def policy_evaluation(self):
"""评估当前策略的状态价值"""
while True:
max_diff = 0.0
new_v = [0.0] * (self.env.nrow * self.env.ncol)
for s in range(self.env.nrow * self.env.ncol):
v_total = 0.0
for a in range(4):
# 累加每个动作的概率×(奖励+折扣未来价值)
for (p, next_s, r, done) in self.env.P[s][a]:
v_total += self.pi[s][a] * p * (r + self.gamma * self.v[next_s] * (1 - done))
new_v[s] = v_total
max_diff = max(max_diff, abs(new_v[s] - self.v[s]))
self.v = new_v
if max_diff < self.theta:
break
def policy_improvement(self):
"""基于当前价值改进策略为贪婪策略"""
for s in range(self.env.nrow * self.env.ncol):
q_values = []
for a in range(4):
q = 0.0
for (p, next_s, r, done) in self.env.P[s][a]:
q += p * (r + self.gamma * self.v[next_s] * (1 - done))
q_values.append(q)
# 选择Q值最大的动作
max_q = max(q_values)
self.pi[s] = [1.0 if q == max_q else 0.0 for q in q_values]
return self.pi
def policy_iteration(self):
"""策略迭代主循环"""
iter_cnt = 0
while True:
iter_cnt += 1
self.policy_evaluation()
old_pi = copy.deepcopy(self.pi)
new_pi = self.policy_improvement()
if old_pi == new_pi:
print(f"策略在第{iter_cnt}轮迭代收敛")
break
# 打印策略和状态价值
def print_agent(agent, action_meaning=['↑', '→', '↓', '←']):
print("状态价值:")
for i in range(agent.env.nrow):
for j in range(agent.env.ncol):
s = i * agent.env.ncol + j
print(f"{agent.v[s]:.2f}", end="\t")
print()
print("\n策略:")
for i in range(agent.env.nrow):
for j in range(agent.env.ncol):
s = i * agent.env.ncol + j
# 冰洞和目标不显示策略
if any(s == (i * agent.env.ncol + j) for i in range(agent.env.nrow) for j in range(agent.env.ncol) if
(agent.env.nrow, agent.env.ncol, i, j) in [(4, 4, 1, 1), (4, 4, 1, 3), (4, 4, 2, 3), (4, 4, 3, 0)]):
print("H", end="\t")
elif s == agent.env.nrow * agent.env.ncol - 1:
print("G", end="\t")
else:
# 打印有概率的动作
pi_str = ""
for a in range(4):
if agent.pi[s][a] > 0:
pi_str += action_meaning[a]
print(pi_str, end="\t")
print()
# 测试冰湖环境
if __name__ == "__main__":
# 定义动作含义:0-上, 1-右, 2-下, 3-左
action_meaning = ['↑', '→', '↓', '←'] # 补充定义
# 创建4x4冰湖环境
env = FrozenLakeEnv(nrow=4, ncol=4, slip_prob=0.2)
# 策略迭代参数
agent = PolicyIteration(env, theta=1e-6, gamma=0.9)
agent.policy_iteration()
# 打印结果
print_agent(agent, action_meaning=action_meaning)
# 测试执行最优策略
print("\n执行最优策略:")
state = env.reset()
env.render()
done = False
step = 0
while not done:
step += 1
action = np.argmax(agent.pi[state]) # 获取最优动作索引(0-3)
next_state, reward, done, _ = env.step(action)
# 使用action_meaning将动作索引转换为符号
print(f"步骤{step}:动作{action_meaning[action]},奖励{reward}")
env.render()
state = next_state
if step > 100:
print("超过最大步数,可能陷入循环")
break
策略在第3轮迭代收敛
状态价值:
0.17 0.15 0.21 0.14
0.21 0.00 0.30 0.00
0.32 0.52 0.56 0.00
0.00 0.68 0.85 0.00策略:
↓ ↑ ↓ ←
↓ H ↓ H
→ ↓ ↓ H
H → → G执行最优策略:
|A|F|F|F|
|F|H|F|H|
|F|F|F|H|
|H|F|F|G|步骤1:动作↓,奖励0.0
|A|F|F|F|
|F|H|F|H|
|F|F|F|H|
|H|F|F|G|步骤2:动作↓,奖励0.0
|F|F|F|F|
|A|H|F|H|
|F|F|F|H|
|H|F|F|G|步骤3:动作↓,奖励0.0
|F|F|F|F|
|F|H|F|H|
|A|F|F|H|
|H|F|F|G|步骤4:动作→,奖励0.0
|F|F|F|F|
|A|H|F|H|
|F|F|F|H|
|H|F|F|G|步骤5:动作↓,奖励0.0
|F|F|F|F|
|F|H|F|H|
|A|F|F|H|
|H|F|F|G|步骤6:动作→,奖励0.0
|F|F|F|F|
|A|H|F|H|
|F|F|F|H|
|H|F|F|G|步骤7:动作↓,奖励0.0
|F|F|F|F|
|F|H|F|H|
|A|F|F|H|
|H|F|F|G|步骤8:动作→,奖励0.0
|F|F|F|F|
|F|H|F|H|
|F|A|F|H|
|H|F|F|G|步骤9:动作↓,奖励0.0
|F|F|F|F|
|F|H|F|H|
|A|F|F|H|
|H|F|F|G|步骤10:动作→,奖励0.0
|F|F|F|F|
|F|H|F|H|
|F|A|F|H|
|H|F|F|G|步骤11:动作↓,奖励0.0
|F|F|F|F|
|F|H|F|H|
|A|F|F|H|
|H|F|F|G|步骤12:动作→,奖励0.0
|F|F|F|F|
|F|H|F|H|
|F|A|F|H|
|H|F|F|G|步骤13:动作↓,奖励0.0
|F|F|F|F|
|F|H|F|H|
|A|F|F|H|
|H|F|F|G|步骤14:动作→,奖励0.0
|F|F|F|F|
|F|H|F|H|
|F|A|F|H|
|H|F|F|G|步骤15:动作↓,奖励0.0
|F|F|F|F|
|F|H|F|H|
|F|F|A|H|
|H|F|F|G|步骤16:动作↓,奖励0.0
|F|F|F|F|
|F|H|F|H|
|F|F|F|A|
|H|F|F|G|
5.2.2、价值迭代
"""
冰湖环境(FrozenLake)下的价值迭代算法
- 环境:4x4网格,包含冰面(F)、冰洞(H)、起点(S)、目标(G)
- 特性:动作存在打滑(非确定性转移),掉入冰洞或到达目标后终止
- 算法:价值迭代(Value Iteration)
"""
import numpy as np
class FrozenLakeEnv:
"""冰湖环境:非确定性转移,存在冰洞和打滑机制"""
def __init__(self, ncol=4, nrow=4, slip_prob=0.2):
self.ncol = ncol # 列数(x轴)
self.nrow = nrow # 行数(y轴)
self.state_count = nrow * ncol # 总状态数:4×4=16
self.action_count = 4 # 动作数:0-左,1-右,2-上,3-下
self.slip_prob = slip_prob # 打滑概率(左右偏移各占slip_prob)
# 定义地形:S=起点, F=冰面, H=冰洞, G=目标
self.grid = [
['S', 'F', 'F', 'F'],
['F', 'H', 'F', 'H'],
['F', 'F', 'F', 'H'],
['H', 'F', 'F', 'G']
]
# 状态转移矩阵:P[s][a] = [(概率p, 下一状态s', 奖励r, 是否终止done)]
self.P = self._build_transition_matrix()
def _build_transition_matrix(self):
"""构建非确定性状态转移矩阵(核心环境逻辑)"""
P = [[[] for _ in range(self.action_count)] for _ in range(self.state_count)]
# 动作对应的坐标变化:[左, 右, 上, 下](x变化, y变化)
action_deltas = [[0, -1], [0, 1], [-1, 0], [1, 0]]
for row in range(self.nrow):
for col in range(self.ncol):
s = row * self.ncol + col # 二维坐标转一维状态索引
cell = self.grid[row][col]
# 冰洞和目标是终止状态,无需转移
if cell in ['H', 'G']:
continue
# 遍历所有动作
for a in range(self.action_count):
# 非确定性转移:原方向60%,左右偏移各20%(打滑机制)
for delta in [-1, 0, 1]: # 偏移量:-1=左偏, 0=原方向, 1=右偏
actual_a = (a + delta) % 4 # 实际执行的动作(取模确保在0-3范围)
prob = 0.6 if delta == 0 else 0.2 # 概率分配
# 计算实际动作后的坐标
new_col = col + action_deltas[actual_a][0]
new_row = row + action_deltas[actual_a][1]
# 边界处理:超出网格则原地不动
if new_col < 0 or new_col >= self.ncol or new_row < 0 or new_row >= self.nrow:
next_state = s
next_cell = cell
else:
next_state = new_row * self.ncol + new_col
next_cell = self.grid[new_row][new_col]
# 奖励与终止条件
if next_cell == 'H': # 掉入冰洞
reward = 0.0
done = True
elif next_cell == 'G': # 到达目标
reward = 1.0
done = True
else: # 冰面
reward = 0.0
done = False
# 添加转移规则
P[s][a].append((prob, next_state, reward, done))
return P
class ValueIteration:
"""价值迭代算法(适配非确定性环境)"""
def __init__(self, env, theta=1e-6, gamma=0.9):
self.env = env
self.theta = theta # 收敛阈值
self.gamma = gamma # 折扣因子
self.v = np.zeros(env.state_count) # 状态价值函数
self.pi = np.zeros((env.state_count, env.action_count)) # 策略
def run(self):
"""执行价值迭代主循环"""
iteration_count = 0
while True:
max_value_diff = 0
new_v = np.zeros_like(self.v)
# 更新每个状态的价值
for s in range(self.env.state_count):
# 跳过终止状态(冰洞和目标)
if self.env.grid[s // self.env.ncol][s % self.env.ncol] in ['H', 'G']:
new_v[s] = 0.0
continue
# 计算所有动作的Q值
q_values = []
for a in range(self.env.action_count):
q = 0.0
# 累加所有可能转移的概率加权值
for p, next_state, reward, done in self.env.P[s][a]:
q += p * (reward + self.gamma * self.v[next_state] * (1 - done))
q_values.append(q)
# 状态价值 = 最大Q值
new_v[s] = max(q_values)
max_value_diff = max(max_value_diff, abs(new_v[s] - self.v[s]))
# 检查收敛
self.v = new_v
iteration_count += 1
if max_value_diff < self.theta:
print(f"价值迭代收敛,共迭代 {iteration_count} 轮")
break
# 提取最优策略
self._extract_policy()
def _extract_policy(self):
"""从最优价值函数生成贪婪策略"""
for s in range(self.env.state_count):
# 跳过终止状态
if self.env.grid[s // self.env.ncol][s % self.env.ncol] in ['H', 'G']:
continue
# 计算每个动作的Q值
q_values = []
for a in range(self.env.action_count):
q = 0.0
for p, next_state, reward, done in self.env.P[s][a]:
q += p * (reward + self.gamma * self.v[next_state] * (1 - done))
q_values.append(q)
# 选择Q值最大的动作(处理平局)
max_q = max(q_values)
best_actions = [q == max_q for q in q_values]
self.pi[s] = best_actions / np.sum(best_actions)
def visualize_results(agent, action_symbols=['←', '→', '↑', '↓']):
"""可视化状态价值和最优策略"""
env = agent.env
# 打印状态价值
print("=" * 40)
print("状态价值函数(越接近目标价值越高):")
for row in range(env.nrow):
row_values = []
for col in range(env.ncol):
s = row * env.ncol + col
row_values.append(f"{agent.v[s]:.2f}")
print(" | ".join(row_values))
print("=" * 40)
# 打印最优策略
print("\n最优策略(箭头表示推荐动作):")
for row in range(env.nrow):
row_policy = []
for col in range(env.ncol):
s = row * env.ncol + col
cell = env.grid[row][col]
# 标记特殊位置
if cell == 'H':
row_policy.append("冰洞")
elif cell == 'G':
row_policy.append("目标")
elif cell == 'S':
row_policy.append("起点")
else:
# 拼接推荐动作
actions = [action_symbols[a] for a in range(env.action_count) if agent.pi[s][a] > 0]
row_policy.append("".join(actions))
print(" | ".join(row_policy))
print("=" * 40)
if __name__ == "__main__":
# 创建冰湖环境(4x4网格,打滑概率20%)
env = FrozenLakeEnv(ncol=4, nrow=4, slip_prob=0.2)
# 初始化并运行价值迭代
agent = ValueIteration(env, theta=1e-6, gamma=0.9)
agent.run()
# 可视化结果
visualize_results(agent)
更多推荐

所有评论(0)