第二章的代码:value_iteration.ipynb及其涉及的其他代码的更新以及注解(gym版本 >= 0.26)(二)

摘要

本系列知识点讲解基于蘑菇书EasyRL中的内容进行详细的疑难点分析!具体内容请阅读蘑菇书EasyRL


对应蘑菇书附书代码——value_iteration.ipynb


value_iteration.ipynb目录下面创建envs文件夹,然后下载simple_grid.py放到envs文件夹中。

# -*- coding: utf-8 -*-

import numpy as np
import sys, os
"""
设置当前路径和父目录
详细说明:
    - os.path.abspath(''):
        - 将当前目录转换为绝对路径。
        - 例如,如果当前工作目录是 /home/user/project/envs,那么 curr_path 得到 /home/user/project/envs。
    - os.path.dirname(curr_path):
        - 返回 curr_path 的父目录。
        - 例如,上例中父目录即为 /home/user/project。
目的:
    - 为了后续将父目录添加到模块搜索路径中,使得项目中其他模块(如 envs/simple_grid.py)能被正确导入。

"""
curr_path = os.path.abspath('')
parent_path = os.path.dirname(curr_path)
"""
作用:
    - 将父目录路径追加到 sys.path 列表中。
原因:
    - Python 在导入模块时,会按照 sys.path 列表中的路径依次查找模块。
      添加父目录后,我们可以在当前项目中导入父目录下的模块。
例子:
    - 如果 envs/simple_grid.py 位于父目录下的 envs 文件夹中,
      这一步就确保了能够通过 from envs.simple_grid import DrunkenWalkEnv 正确导入该模块。
"""
sys.path.append(parent_path)
from envs.simple_grid import DrunkenWalkEnv

def all_seed(env, seed = 1):
    ## 这个函数主要是为了固定随机种子
    import numpy as np
    import random
    import os
    '''固定环境内部随机种子
       seeding.np_random(seed)'''
    env.seed(seed) 
    np.random.seed(seed)
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed) 
    
env = DrunkenWalkEnv(map_name="theAlley")
"""
"theAlley": [
    "S...H...H...G"
],
"""
all_seed(env, seed = 1) # 设置随机种子为1

def value_iteration(env, theta=0.005, discount_factor=0.9):
    """
    定义一个名为 value_iteration 的函数,
    用于计算给定环境 env 中各状态-动作对的最优 Q 值。
    """
    """
    env:环境对象,要求具备属性
        - env.nS:状态总数
        - env.nA:动作总数
        - env.P:状态转移概率字典,其格式为:
            env.P[state][action] = [(probability, next_state, reward, done), ...]
        这些信息通常在我们构造环境时(例如继承自 DiscreteEnv 或
                                    自定义的 DrunkenWalkEnv)就已定义好。
    theta:一个小阈值,用来判断 Q 值变化是否足够小(收敛判断),这里默认设为 0.005。
    discount_factor:折扣因子 γ,用于权衡当前奖励与未来奖励的重要性,默认值为 0.9。
    """
    '''
    Q 表初始化:
    - 使用 np.zeros((env.nS, env.nA)) 创建一个二维数组,形状为 (状态数, 动作数)。
    - 该表格用于存储每个状态下每个动作的 Q 值
      (即:从当前状态采取某动作后,所能获得的累计期望奖励)。
    - 初始时所有 Q 值设为 0。
    举例:假设环境有 16 个状态和 4 个动作,Q 表初始为 16×4 的零矩阵。
    '''
    Q = np.zeros((env.nS, env.nA)) # 初始化一个Q表格
    '''
    计数器 count:
    - 用于记录迭代次数,防止无限循环,当迭代次数超过一定值(例如 100 次)时,
      即使未完全收敛也跳出循环。
    '''
    count = 0
    while True:
        """
        无限循环:使用 while True 不断更新 Q 表,直到满足收敛条件或迭代次数超过上限。
        """
        '''
        delta 变量:
        - 用于记录本次迭代中 Q 值更新的最大变化量。
        - 每次内层循环开始前将其置为 0。
        '''
        delta = 0.0
        '''
        临时 Q 表 Q_tmp:
        - 用于存储当前迭代计算得到的新 Q 值。
        - 这种做法确保每个状态-动作对的更新是“同步”的,
          即所有 Q 值的更新都基于上一轮的 Q 表,而不是边更新边使用最新值。
        '''
        Q_tmp = np.zeros((env.nS, env.nA)) # 13*4
        """
        外层循环:遍历每个状态(state = 0 到 env.nS - 1)。
        内层循环:对每个状态,遍历所有可能动作(a = 0 到 env.nA - 1)。
        在离散 MDP 中,每个状态都有固定数目的可能动作。
        环境对象中定义的 env.P[state][a] 包含了从状态 state 采取动作 a 后所有可能的转移结果。
        """
        for state in range(env.nS):
            for a in range(env.nA):
                '''
                accum 用来计算未来奖励部分,即对下一状态的 Q 值进行加权求和
                '''
                accum = 0.0
                '''
                reward_total 用于累计当前状态-动作对的即时奖励,按概率加权求和
                '''
                reward_total = 0.0
                '''
                对于当前状态 state 和动作 a,从 env.P[state][a] 中获得所有可能转移,每个转移由四个元素组成:
                - prob:该转移发生的概率;
                - next_state:转移后的下一个状态;
                - reward:立即获得的奖励;
                - done:布尔值,指示是否到达终止状态
                        (虽然本算法主要用 Q 值更新,
                          终止状态的影响可能在奖励设置中体现)。
                '''
                '''
                env.P:
                {
                    0: 
                        {0: [(0.8, 0, -0.0, False), (0.1, 0, -0.0, False), (0.1, 0, -0.0, False)], 
                         1: [(0.8, 0, -0.0, False), (0.1, 0, -0.0, False), (0.1, 1, -0.0, False)], 
                         2: [(0.8, 1, -0.0, False), (0.1, 0, -0.0, False), (0.1, 0, -0.0, False)], 
                         3: [(0.8, 0, -0.0, False), (0.1, 1, -0.0, False), (0.1, 0, -0.0, False)]}, 
                    1: 
                        {0: [(0.8, 0, -0.0, False), (0.1, 1, -0.0, False),(0.1, 1, -0.0, False)], 
                         1: [(0.8, 1, -0.0, False), (0.1, 0, -0.0, False), (0.1, 2, -0.0, False)], 
                         2: [(0.8, 2, -0.0, False), (0.1, 1, -0.0, False), (0.1, 1, -0.0, False)], 
                         3: [(0.8, 1, -0.0, False), (0.1, 2, -0.0, False), (0.1, 0, -0.0, False)]}, 
                    2: 
                        {0: [(0.8, 1, -0.0, False), (0.1, 2, -0.0, False), (0.1, 2, -0.0, False)], 
                         1: [(0.8, 2, -0.0, False), (0.1, 1, -0.0, False), (0.1, 3, -0.0, False)], 
                         2: [(0.8, 3, -0.0, False), (0.1, 2, -0.0, False), (0.1, 2, -0.0, False)], 
                         3: [(0.8, 2, -0.0, False), (0.1, 3, -0.0, False), (0.1, 1, -0.0, False)]}, 
                 ...
                    11: 
                        {0: [(0.8, 10, -0.0, False), (0.1, 11, -0.0, False), (0.1, 11, -0.0, False)],
                         1: [(0.8, 11, -0.0, False), (0.1, 10, -0.0, False), (0.1, 12, 10, True)], 
                         2: [(0.8, 12, 10, True), (0.1, 11, -0.0, False), (0.1, 11, -0.0, False)], 
                         3: [(0.8, 11, -0.0, False), (0.1, 12, 10, True), (0.1, 10, -0.0, False)]}, 
                    12: {0: [(1.0, 12, 0, True), (0.8, 11, -0.0, False), (0.1, 12, 10, True), (0.1, 12, 10, True)], 
                         1: [(1.0, 12, 0, True), (0.8, 12, 10, True), (0.1, 11, -0.0, False), (0.1, 12, 10, True)], 
                         2: [(1.0, 12, 0, True), (0.8, 12, 10, True), (0.1, 12, 10, True), (0.1, 12, 10, True)], 
                         3: [(1.0, 12, 0, True), (0.8, 12, 10, True), (0.1, 12, 10, True), (0.1, 11, -0.0, False)]}
                 }
                '''
                for prob, next_state, reward, done in env.P[state][a]:
                    '''
                    未来奖励累积:
                    - 对于每个可能的下一个状态,选择该状态所有动作中 Q 值最大的那一项(即采用贪婪策略选择最优未来奖励)。
                    - 用该值乘以转移概率,再累加到 accum 上。
                    - 这反映了贝尔曼最优方程中“未来奖励”的部分,即期望未来回报的折扣加权和。
                    '''
                    accum += prob* np.max(Q[next_state, :])
                    '''
                    即时奖励累积:
                    - 同样,对每个转移,按其概率加权当前获得的奖励,累加到 reward_total 中。
                    '''
                    reward_total += prob * reward
                    """
                    举例说明:
                    假设在某状态 s 采取动作 a 后,有两个可能转移:
                    - 50% 的概率转移到状态 s',获得奖励 10,且 np.max(Q[s', :]) 当前为 20;
                    - 50% 的概率转移到状态 s'',获得奖励 0,且 np.max(Q[s'', :]) 当前为 5。
                    则:
                    - accum = 0.5 * 20 + 0.5 * 5 = 10 + 2.5 = 12.5
                    - reward_total = 0.5 * 10 + 0.5 * 0 = 5 + 0 = 5
                    """
                '''
                贝尔曼更新:
                - 计算新的 Q 值,公式为𝑄(𝑠,𝑎)=即时奖励+𝛾×未来奖励期望
                                      Q(s,a)=即时奖励+γ×未来奖励期望
                - 其中 discount_factor 作为折扣因子 γ,用于折扣未来奖励。
                保存结果:
                - 将计算结果存入临时表 Q_tmp[state, a]。
                '''
                Q_tmp[state, a] = reward_total + discount_factor * accum
                '''
                计算最大变化:
                - 计算新旧 Q 值的绝对差值;
                - 更新 delta,使其始终保持本次迭代中所有状态-动作对更新幅度的最大值。
                - 这个 delta 用于判断收敛性:如果所有 Q 值变化都很小(小于 theta),则认为算法收敛。
                '''
                delta = max(delta, abs(Q_tmp[state, a] - Q[state, a]))
        '''
        同步更新:
        - 用临时表 Q_tmp 更新 Q 表,这样下一轮迭代时所有状态-动作对的更新均基于本次迭代得到的新值。
        '''
        Q = Q_tmp
        '''
        迭代计数:
        - 记录一次完整的状态-动作更新过程。
        '''
        count += 1
        '''
        终止判断:
        - 当所有状态-动作对的更新幅度 delta 小于阈值 theta 时,认为 Q 表已经收敛,停止迭代;
        - 或者如果迭代次数超过 100 次,也强制退出循环以防止无限循环(即使未收敛)。
        '''
        if delta < theta or count > 100: # 这里设置了即使算法没有收敛,跑100次也退出循环
            break 
    return Q

Q = value_iteration(env)
print(Q)


policy = np.zeros([env.nS, env.nA]) # 初始化一个策略表格
'''
循环遍历每个状态:
- 使用 for state in range(env.nS) 遍历所有状态(状态编号从 0 到 env.nS-1)。
'''
for state in range(env.nS):
    '''
    对于当前状态 state,
    - Q[state, :] 表示所有可能动作对应的 Q 值。
    - np.argmax 返回数组中最大值的索引,即选出具有最高期望回报的动作。
    - 例如,若 Q[2, :] = [1.0, 5.5, 3.2],那么 np.argmax 返回 1,因为 5.5 是最大值。
    '''
    best_action = np.argmax(Q[state, :]) #根据价值迭代算法得到的Q表格选择出策略
    '''
    policy[state, best_action] = 1 将对应状态行中最佳动作位置的元素设为 1。
    - 这样,每个状态对应的行就变成了 one-hot 向量,其中 1 的位置表示最佳动作,其他位置为 0。
    '''
    policy[state, best_action] = 1

# policy = [int(np.argwhere(policy[i]==1)) for i in range(env.nS) ]
# policy = [int(np.argwhere(policy[i] == 1)[0][0]) for i in range(env.nS)]
policy = [np.argwhere(policy[i] == 1)[0][0].item() for i in range(env.nS)]
print(policy)

num_episode = 1000 # 测试1000次
def test(env,policy):
    rewards = []  # 记录所有回合的奖励
    success = []  # 记录该回合是否成功走到终点
    '''
    每个回合都是从环境重置开始,直到遇到终止条件(done == True)结束。
    '''
    for i_ep in range(num_episode):
        ep_reward = 0  # 记录每个episode的reward
        state = env.reset()  # 重置环境, 重新开一局(即开始新的一个回合) 这里state=0
        while True:
            action = policy[state]  # 根据算法选择一个动作
            next_state, reward, done, _ = env.step(action)  # 与环境进行一个交互
            state = next_state  # 更新状态
            ep_reward += reward
            if done:
                break
        if state==12: # 即走到终点
            success.append(1)
        else:
            success.append(0)
        rewards.append(ep_reward)
    acc_suc = np.array(success).sum()/num_episode
    print("测试的成功率是:", acc_suc)
test(env, policy)
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐