基于“蘑菇书”的强化学习知识点(十):第二章的代码:value_iteration.ipynb及其涉及的其他代码的更新以及注解(gym版本 >= 0.26)(二)
第二章的代码:value_iteration.ipynb及其涉及的其他代码的更新以及注解(gym版本 >= 0.26)(二)
·
第二章的代码:value_iteration.ipynb及其涉及的其他代码的更新以及注解(gym版本 >= 0.26)(二)
摘要
本系列知识点讲解基于蘑菇书EasyRL中的内容进行详细的疑难点分析!具体内容请阅读蘑菇书EasyRL!
对应蘑菇书附书代码——value_iteration.ipynb
在value_iteration.ipynb目录下面创建envs文件夹,然后下载simple_grid.py放到envs文件夹中。
# -*- coding: utf-8 -*-
import numpy as np
import sys, os
"""
设置当前路径和父目录
详细说明:
- os.path.abspath(''):
- 将当前目录转换为绝对路径。
- 例如,如果当前工作目录是 /home/user/project/envs,那么 curr_path 得到 /home/user/project/envs。
- os.path.dirname(curr_path):
- 返回 curr_path 的父目录。
- 例如,上例中父目录即为 /home/user/project。
目的:
- 为了后续将父目录添加到模块搜索路径中,使得项目中其他模块(如 envs/simple_grid.py)能被正确导入。
"""
curr_path = os.path.abspath('')
parent_path = os.path.dirname(curr_path)
"""
作用:
- 将父目录路径追加到 sys.path 列表中。
原因:
- Python 在导入模块时,会按照 sys.path 列表中的路径依次查找模块。
添加父目录后,我们可以在当前项目中导入父目录下的模块。
例子:
- 如果 envs/simple_grid.py 位于父目录下的 envs 文件夹中,
这一步就确保了能够通过 from envs.simple_grid import DrunkenWalkEnv 正确导入该模块。
"""
sys.path.append(parent_path)
from envs.simple_grid import DrunkenWalkEnv
def all_seed(env, seed = 1):
## 这个函数主要是为了固定随机种子
import numpy as np
import random
import os
'''固定环境内部随机种子
seeding.np_random(seed)'''
env.seed(seed)
np.random.seed(seed)
random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
env = DrunkenWalkEnv(map_name="theAlley")
"""
"theAlley": [
"S...H...H...G"
],
"""
all_seed(env, seed = 1) # 设置随机种子为1
def value_iteration(env, theta=0.005, discount_factor=0.9):
"""
定义一个名为 value_iteration 的函数,
用于计算给定环境 env 中各状态-动作对的最优 Q 值。
"""
"""
env:环境对象,要求具备属性
- env.nS:状态总数
- env.nA:动作总数
- env.P:状态转移概率字典,其格式为:
env.P[state][action] = [(probability, next_state, reward, done), ...]
这些信息通常在我们构造环境时(例如继承自 DiscreteEnv 或
自定义的 DrunkenWalkEnv)就已定义好。
theta:一个小阈值,用来判断 Q 值变化是否足够小(收敛判断),这里默认设为 0.005。
discount_factor:折扣因子 γ,用于权衡当前奖励与未来奖励的重要性,默认值为 0.9。
"""
'''
Q 表初始化:
- 使用 np.zeros((env.nS, env.nA)) 创建一个二维数组,形状为 (状态数, 动作数)。
- 该表格用于存储每个状态下每个动作的 Q 值
(即:从当前状态采取某动作后,所能获得的累计期望奖励)。
- 初始时所有 Q 值设为 0。
举例:假设环境有 16 个状态和 4 个动作,Q 表初始为 16×4 的零矩阵。
'''
Q = np.zeros((env.nS, env.nA)) # 初始化一个Q表格
'''
计数器 count:
- 用于记录迭代次数,防止无限循环,当迭代次数超过一定值(例如 100 次)时,
即使未完全收敛也跳出循环。
'''
count = 0
while True:
"""
无限循环:使用 while True 不断更新 Q 表,直到满足收敛条件或迭代次数超过上限。
"""
'''
delta 变量:
- 用于记录本次迭代中 Q 值更新的最大变化量。
- 每次内层循环开始前将其置为 0。
'''
delta = 0.0
'''
临时 Q 表 Q_tmp:
- 用于存储当前迭代计算得到的新 Q 值。
- 这种做法确保每个状态-动作对的更新是“同步”的,
即所有 Q 值的更新都基于上一轮的 Q 表,而不是边更新边使用最新值。
'''
Q_tmp = np.zeros((env.nS, env.nA)) # 13*4
"""
外层循环:遍历每个状态(state = 0 到 env.nS - 1)。
内层循环:对每个状态,遍历所有可能动作(a = 0 到 env.nA - 1)。
在离散 MDP 中,每个状态都有固定数目的可能动作。
环境对象中定义的 env.P[state][a] 包含了从状态 state 采取动作 a 后所有可能的转移结果。
"""
for state in range(env.nS):
for a in range(env.nA):
'''
accum 用来计算未来奖励部分,即对下一状态的 Q 值进行加权求和
'''
accum = 0.0
'''
reward_total 用于累计当前状态-动作对的即时奖励,按概率加权求和
'''
reward_total = 0.0
'''
对于当前状态 state 和动作 a,从 env.P[state][a] 中获得所有可能转移,每个转移由四个元素组成:
- prob:该转移发生的概率;
- next_state:转移后的下一个状态;
- reward:立即获得的奖励;
- done:布尔值,指示是否到达终止状态
(虽然本算法主要用 Q 值更新,
终止状态的影响可能在奖励设置中体现)。
'''
'''
env.P:
{
0:
{0: [(0.8, 0, -0.0, False), (0.1, 0, -0.0, False), (0.1, 0, -0.0, False)],
1: [(0.8, 0, -0.0, False), (0.1, 0, -0.0, False), (0.1, 1, -0.0, False)],
2: [(0.8, 1, -0.0, False), (0.1, 0, -0.0, False), (0.1, 0, -0.0, False)],
3: [(0.8, 0, -0.0, False), (0.1, 1, -0.0, False), (0.1, 0, -0.0, False)]},
1:
{0: [(0.8, 0, -0.0, False), (0.1, 1, -0.0, False),(0.1, 1, -0.0, False)],
1: [(0.8, 1, -0.0, False), (0.1, 0, -0.0, False), (0.1, 2, -0.0, False)],
2: [(0.8, 2, -0.0, False), (0.1, 1, -0.0, False), (0.1, 1, -0.0, False)],
3: [(0.8, 1, -0.0, False), (0.1, 2, -0.0, False), (0.1, 0, -0.0, False)]},
2:
{0: [(0.8, 1, -0.0, False), (0.1, 2, -0.0, False), (0.1, 2, -0.0, False)],
1: [(0.8, 2, -0.0, False), (0.1, 1, -0.0, False), (0.1, 3, -0.0, False)],
2: [(0.8, 3, -0.0, False), (0.1, 2, -0.0, False), (0.1, 2, -0.0, False)],
3: [(0.8, 2, -0.0, False), (0.1, 3, -0.0, False), (0.1, 1, -0.0, False)]},
...
11:
{0: [(0.8, 10, -0.0, False), (0.1, 11, -0.0, False), (0.1, 11, -0.0, False)],
1: [(0.8, 11, -0.0, False), (0.1, 10, -0.0, False), (0.1, 12, 10, True)],
2: [(0.8, 12, 10, True), (0.1, 11, -0.0, False), (0.1, 11, -0.0, False)],
3: [(0.8, 11, -0.0, False), (0.1, 12, 10, True), (0.1, 10, -0.0, False)]},
12: {0: [(1.0, 12, 0, True), (0.8, 11, -0.0, False), (0.1, 12, 10, True), (0.1, 12, 10, True)],
1: [(1.0, 12, 0, True), (0.8, 12, 10, True), (0.1, 11, -0.0, False), (0.1, 12, 10, True)],
2: [(1.0, 12, 0, True), (0.8, 12, 10, True), (0.1, 12, 10, True), (0.1, 12, 10, True)],
3: [(1.0, 12, 0, True), (0.8, 12, 10, True), (0.1, 12, 10, True), (0.1, 11, -0.0, False)]}
}
'''
for prob, next_state, reward, done in env.P[state][a]:
'''
未来奖励累积:
- 对于每个可能的下一个状态,选择该状态所有动作中 Q 值最大的那一项(即采用贪婪策略选择最优未来奖励)。
- 用该值乘以转移概率,再累加到 accum 上。
- 这反映了贝尔曼最优方程中“未来奖励”的部分,即期望未来回报的折扣加权和。
'''
accum += prob* np.max(Q[next_state, :])
'''
即时奖励累积:
- 同样,对每个转移,按其概率加权当前获得的奖励,累加到 reward_total 中。
'''
reward_total += prob * reward
"""
举例说明:
假设在某状态 s 采取动作 a 后,有两个可能转移:
- 50% 的概率转移到状态 s',获得奖励 10,且 np.max(Q[s', :]) 当前为 20;
- 50% 的概率转移到状态 s'',获得奖励 0,且 np.max(Q[s'', :]) 当前为 5。
则:
- accum = 0.5 * 20 + 0.5 * 5 = 10 + 2.5 = 12.5
- reward_total = 0.5 * 10 + 0.5 * 0 = 5 + 0 = 5
"""
'''
贝尔曼更新:
- 计算新的 Q 值,公式为𝑄(𝑠,𝑎)=即时奖励+𝛾×未来奖励期望
Q(s,a)=即时奖励+γ×未来奖励期望
- 其中 discount_factor 作为折扣因子 γ,用于折扣未来奖励。
保存结果:
- 将计算结果存入临时表 Q_tmp[state, a]。
'''
Q_tmp[state, a] = reward_total + discount_factor * accum
'''
计算最大变化:
- 计算新旧 Q 值的绝对差值;
- 更新 delta,使其始终保持本次迭代中所有状态-动作对更新幅度的最大值。
- 这个 delta 用于判断收敛性:如果所有 Q 值变化都很小(小于 theta),则认为算法收敛。
'''
delta = max(delta, abs(Q_tmp[state, a] - Q[state, a]))
'''
同步更新:
- 用临时表 Q_tmp 更新 Q 表,这样下一轮迭代时所有状态-动作对的更新均基于本次迭代得到的新值。
'''
Q = Q_tmp
'''
迭代计数:
- 记录一次完整的状态-动作更新过程。
'''
count += 1
'''
终止判断:
- 当所有状态-动作对的更新幅度 delta 小于阈值 theta 时,认为 Q 表已经收敛,停止迭代;
- 或者如果迭代次数超过 100 次,也强制退出循环以防止无限循环(即使未收敛)。
'''
if delta < theta or count > 100: # 这里设置了即使算法没有收敛,跑100次也退出循环
break
return Q
Q = value_iteration(env)
print(Q)
policy = np.zeros([env.nS, env.nA]) # 初始化一个策略表格
'''
循环遍历每个状态:
- 使用 for state in range(env.nS) 遍历所有状态(状态编号从 0 到 env.nS-1)。
'''
for state in range(env.nS):
'''
对于当前状态 state,
- Q[state, :] 表示所有可能动作对应的 Q 值。
- np.argmax 返回数组中最大值的索引,即选出具有最高期望回报的动作。
- 例如,若 Q[2, :] = [1.0, 5.5, 3.2],那么 np.argmax 返回 1,因为 5.5 是最大值。
'''
best_action = np.argmax(Q[state, :]) #根据价值迭代算法得到的Q表格选择出策略
'''
policy[state, best_action] = 1 将对应状态行中最佳动作位置的元素设为 1。
- 这样,每个状态对应的行就变成了 one-hot 向量,其中 1 的位置表示最佳动作,其他位置为 0。
'''
policy[state, best_action] = 1
# policy = [int(np.argwhere(policy[i]==1)) for i in range(env.nS) ]
# policy = [int(np.argwhere(policy[i] == 1)[0][0]) for i in range(env.nS)]
policy = [np.argwhere(policy[i] == 1)[0][0].item() for i in range(env.nS)]
print(policy)
num_episode = 1000 # 测试1000次
def test(env,policy):
rewards = [] # 记录所有回合的奖励
success = [] # 记录该回合是否成功走到终点
'''
每个回合都是从环境重置开始,直到遇到终止条件(done == True)结束。
'''
for i_ep in range(num_episode):
ep_reward = 0 # 记录每个episode的reward
state = env.reset() # 重置环境, 重新开一局(即开始新的一个回合) 这里state=0
while True:
action = policy[state] # 根据算法选择一个动作
next_state, reward, done, _ = env.step(action) # 与环境进行一个交互
state = next_state # 更新状态
ep_reward += reward
if done:
break
if state==12: # 即走到终点
success.append(1)
else:
success.append(0)
rewards.append(ep_reward)
acc_suc = np.array(success).sum()/num_episode
print("测试的成功率是:", acc_suc)
test(env, policy)
更多推荐
所有评论(0)