一、概念

        DQN(Deep Q-Network)是深度强化学习领域的一种算法,它将深度学习与强化学习相结合,用于解决高维状态空间的强化学习问题。传统的Q-Learning算法使用MDP(马尔可夫决策过程)表格来存储状态-动作对的价值函数。随着状态数量的增加,这种方法的学习效率会急剧下降,特别是当状态空间非常大时,Q-Learning算法可能难以求解,这被称为“维数灾难”。通俗来说,Q-Learning算法适合在有限状态和有限动作的场景中使用,否则Q值表将会无法覆盖所有可能的情况。因此,DQN利用神经网络去拟合Q-Learning中的Q表。这样,即使状态空间非常大,DQN也能够有效地处理,因为它可以通过神经网络的泛化能力来近似状态-动作对的价值函数

        此外,DQN算法还引入了一些关键技巧,如经验回放(Experience Replay)固定Q目标(Fixed Q-targets)。经验回放技术通过存储智能体与环境交互的经验,并在训练时从中随机采样一批经验用于训练,从而打破了数据之间的相关性,提高了模型的泛化能力。固定Q目标技术则通过固定一个目标网络来生成目标Q值,并定期将策略网络的参数复制到目标网络,从而避免了训练过程中的不稳定性和振荡问题(使用了两个相同结构的神经网络来实现)。

二、模型原理

        DQN的核心思想是使用神经网络来逼近状态-动作对的值函数Q(s, a),其中s表示状态,a表示动作。传统的Q-Learning维护一个Q值表,对于当前状态,我们通过查表的方式来找到最佳动作;而DQN则训练一个Q网络,实现f(s)=a的映射函数,从而避免了维度灾难。DQN算法主要包括以下几个关键部分:

1、经验回放机制(Experience Replay)

        经验回放机制通过存储agent与环境交互产生的经验(即状态、动作、奖励和下一状态的四元组),并在训练过程中随机采样这些经验来更新网络。这样做的好处是打破了数据之间的时间相关性,使得输入到网络中的数据更符合独立同分布,有助于网络的稳定训练。

2、目标网络(Target Network)

        目标网络用于计算目标Q值,其参数是延后更新的,通常是在主网络(即Q网络)参数更新一定步数后,将主网络的参数复制给目标网络。引入目标网络可以减少在训练过程中由于目标Q值的频繁变化而导致的网络训练不稳定问题

3、损失函数(Loss Function)

        DQN的损失函数基于均方误差(MSE)来计算预测Q值与目标Q值之间的差异。通过梯度下降等优化算法来最小化这个损失函数,从而更新Q网络的参数。

三、建模流程

1、初始化

  • 初始化经验池(用于存储经验)。
  • 随机初始化Q网络(即主网络)的参数。
  • 初始化目标网络,其参数与Q网络相同(或为其副本)。

2、获取初始状态

  • 重置环境,获得第一个状态。

3、选择动作

  • 使用ε-greedy策略生成一个动作:当前状态下生成一个随机数,如果随机数小于ε,则随机选择一个动作(探索模式,开启探索模式的概率为ε);否则选择使当前Q值最大的动作(利用模式,开启该模式的概率为1-ε)。

4、执行动作并获取反馈

  • 根据选择的动作与环境进行交互,获得反馈的奖励、下一个状态和是否触发终止条件。

5、存储经验

  • 将经验(即状态、动作、奖励和下一个状态)存入经验池。

6、采样经验并更新网络

  • 从经验池中随机获取一个minibatch的经验。
  • 使用目标网络计算目标Q值。
  • 根据预测Q值和目标Q值计算损失函数。
  • 使用优化算法(如梯度下降)更新Q网络的参数。

7、更新目标网络

  • 每隔一定步数(如每N步),将Q网络的参数复制给目标网络。

8、判断是否终止

  • 如果达到终止条件(如达到最大迭代次数或满足某个性能指标),则训练结束。
  • 否则,回到步骤3继续训练。

四、python实现

        这里我们使用gym库来构建交互环境。gym库由OpenAI开发,用于强化学习的实践探索,它能够通过简单的接口来实现智能体与环境的交互。

        我们在这里构建了一个CartPole的游戏环境,这个游戏中有一台随机左右移动的小车,小车上有一根可以360度旋转的棍子,我们的目标是在小车的移动过程中,在尽可能长的时间内保持棍子竖直向上(可倾斜一定角度,默认是12度以内)而不倒下来,一旦棍子与Y轴的夹角超过12度则任务失败(有点像耍杂技里面的头顶N个大碗表演)。例如,棍子向左倒的时候我们控制小车向右,棍子向右倒的时候控制小车向左。

# 导入必要的库
import torch
from torch import nn
import numpy as np
import gym
import matplotlib.pyplot as plt
device = torch.device('cuda') if torch.cuda.is_available() else 'cpu'


# 定义Q网络
class QNet(nn.Module):
    def __init__(self, STATE_SIZE, ACTION_SIZE):
        super(QNet, self).__init__()
        self.flatten = nn.Flatten()
        # 构建一个简单的全连接神经网络
        self.layers = nn.Sequential(
            nn.Linear(STATE_SIZE, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, ACTION_SIZE)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.layers(x)
        return logits


# 定义DQN_Net
class DQN_Net(object):
    def __init__(self, EPSILON, MEMORY_CAPACITY, STATE_SIZE, ACTION_SIZE, TARGET_NET_UPDATE, BATCH_SIZE, GAMMA, EPSILON_DECAY, lr):
        # 初始化策略网络和目标网络
        self.EPSILON = EPSILON
        self.MEMORY_CAPACITY = MEMORY_CAPACITY
        self.STATE_SIZE = STATE_SIZE
        self.ACTION_SIZE = ACTION_SIZE
        self.TARGET_NET_UPDATE = TARGET_NET_UPDATE
        self.BATCH_SIZE = BATCH_SIZE
        self.GAMMA = GAMMA
        self.EPSILON_DECAY = EPSILON_DECAY

        self.strategy_net = QNet(self.STATE_SIZE, self.ACTION_SIZE).to(device)
        self.target_net = QNet(self.STATE_SIZE, self.ACTION_SIZE).to(device)
        self.target_net.load_state_dict(self.strategy_net.state_dict())
        self.learn_step = 0  # 初始化学习步数记录
        self.memory_counter = 0  # 初始化回放池存储量
        # 存储空间容量:(s, a, r, s_next),s和s_next都是维度STATE_SIZE的向量,a、r为标量
        self.memory = np.zeros((self.MEMORY_CAPACITY, self.STATE_SIZE * 2 + 2))
        self.optimizer = torch.optim.Adam(self.strategy_net.parameters(), lr=lr)
        self.loss_func = nn.MSELoss()

    def choose_action(self, x):
        # 将输入x转换为FloatTensor,增加一个维度
        x = torch.unsqueeze(torch.FloatTensor(x), 0).to(device)

        # 核心部分
        action_value = self.strategy_net(x)
        # e-greedy动作选择:小于阈值,随机探索;大于阈值,贪婪策略
        if np.random.uniform() < self.EPSILON:
            action = np.random.randint(0, self.ACTION_SIZE)
        else:
            action_value = self.strategy_net(x)  # 神经网络模拟的动作值
            action = torch.max(action_value, 1)[1].data.cpu().numpy()
            action = action[0]

        return action

    def store_transition(self, s, a, r, s_next):
        transition = np.hstack((s, a, r, s_next))
        # 存储新数据,如果memory还没有满,则直接在后面插入新数据,否则覆盖之前的旧数据
        index = self.memory_counter % self.MEMORY_CAPACITY
        self.memory[index, :] = transition
        self.memory_counter += 1

    def update_epsilon(self):
        # 自定义epsilon衰减函数,目的是随着训练的进行,智能体趋于利用已有知识,而非继续探索
        self.EPSILON = max(self.EPSILON*self.EPSILON_DECAY, 0.05)

    def learn(self):
        # 如果到达目标网络更新轮数,将策略网络的参数给目标网络
        if self.learn_step % self.TARGET_NET_UPDATE == 0:
            self.target_net.load_state_dict(self.strategy_net.state_dict())
        self.learn_step += 1

        # 随机抽取BATCH_SIZE个历史经验数据
        sample_index = np.random.choice(self.MEMORY_CAPACITY, self.BATCH_SIZE)
        b_memory = self.memory[sample_index, :]

        # 分别取出当前batch内的s、a、r和s_next的值
        b_s = torch.FloatTensor(b_memory[:, :self.STATE_SIZE]).to(device)
        b_a = torch.LongTensor(b_memory[:, self.STATE_SIZE:self.STATE_SIZE + 1]).to(device)
        b_r = torch.FloatTensor(b_memory[:, self.STATE_SIZE + 1:self.STATE_SIZE + 2]).to(device)
        b_s_next = torch.FloatTensor(b_memory[:, -self.STATE_SIZE:]).to(device)

        # 使用策略网络计算当前状态下每个动作的Q值,并选择实际采取的动作对应的Q值
        q_eval = self.strategy_net(b_s).gather(1, b_a)
        # 使用目标网络计算下一个状态下的Q值,并阻止梯度回传。
        q_next = self.target_net(b_s_next).detach()
        # TD目标值提供了一个介于真实环境反馈(即时奖励)和模型预测(未来预期奖励)之间的估计,它用于指导智能体学习如何在当前状态下选择最佳动作。
        q_target = b_r + self.GAMMA * q_next.max(1)[0].view(self.BATCH_SIZE, 1)

        # 损失函数计算当前Q值估计(q_eval)与目标Q值(q_target)之间的差异,模型的目的是最小化这个差异
        loss = self.loss_func(q_eval, q_target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()


if __name__ == '__main__':
    BATCH_SIZE = 32
    lr = 0.01
    EPSILON = 0.4  # 贪婪阈值
    GAMMA = 0.8  # 折扣系数
    TARGET_NET_UPDATE = 50  # 目标网络更新频率
    # 定义epsilon的衰减因子
    EPSILON_DECAY = 0.95
    MEMORY_CAPACITY = 1500  # 经验回放池大小
    
    # 假设在这个任务中,我们的目标是智能体单轮游戏的总奖励超过500
    target_reward = 500

    # 初始化环境
    # 'CartPole-v1'是一个经典的控制任务,目标是保持一个倒立摆(cartpole)直立。
    # render_mode='human'参数指定了渲染模式,这里设置为'human'表示环境将在一个可视化窗口中渲染,以便人类观察智能体的行为。
    env = gym.make('CartPole-v1', render_mode='human')
    # 获取环境的观测空间(即状态空间)的大小,.shape属性返回状态空间的形状,这里通过索引[0]获取形状的第一个维度,即状态向量的维度数。
    STATE_SIZE = env.observation_space.shape[0]
    # .n属性返回动作空间中动作的数量,即智能体在每个状态下可以执行的不同动作的数量。
    ACTION_SIZE = env.action_space.n

    dqn = DQN_Net(EPSILON, MEMORY_CAPACITY, STATE_SIZE, ACTION_SIZE, TARGET_NET_UPDATE, BATCH_SIZE, GAMMA, EPSILON_DECAY, lr)
    reward_list = []

    for i in range(500):
        # 定期更新epsilon,不宜太频繁
        if i>0 and i%10==0:
            dqn.update_epsilon()

        # 重置环境并获取初始的状态
        s, _ = env.reset()
        # 初始化本轮训练的总奖励和为0
        episode_reward_sum = 0
        print(f'---------------第{i}轮训练---------------')
        while True:
            # 渲染环境,即显示游戏画面
            env.render()

            # 使用DQN网络选择一个动作a
            a = dqn.choose_action(s)
            # 执行动作a,获取下一个状态s_next,奖励r,结束标志done,以及其他信息
            s_next, r, done, info, _ = env.step(a)

            # 从下一个状态中提取小车的位置、速度、杆的角度和角速度
            x, x_dot, theta, theta_dot = s_next
            # 计算小车偏离中心的惩罚项r1
            r1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8
            # 计算杆偏离垂直的惩罚项r2
            r2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5
            # 计算新的奖励new_r,它是原始奖励和惩罚项的组合
            new_r = r1 + r2

            # 将状态转移元组存储到DQN的回放池中
            dqn.store_transition(s, a, new_r, s_next)
            # 更新本轮训练的总奖励和
            episode_reward_sum += r
            # 更新当前状态为下一个状态
            s = s_next

            # 如果回放池中的存储量超过了容量限制,则调用DQN的学习方法,从回放池中抽取样本进行学习
            if dqn.memory_counter > MEMORY_CAPACITY:
                dqn.learn()

            if done:
                print(f'episode:{i},reward_sum:{episode_reward_sum}')
                reward_list.append(episode_reward_sum)
                break

        # 当总奖励超过target_reward时停止训练智能体
        if reward_list[-1]>target_reward:
            break

    x_list = [x for x in range(0, len(reward_list))]
    plt.title('Episode Reward')
    plt.xlabel('Episode')
    plt.ylabel('Reward')
    plt.plot(x_list, reward_list)
    plt.show()

        通过多轮的学习,智能体逐渐学习到了较好的控制策略,进而使得游戏能够获得的总奖励越来越多。这得益于我们设计了epsilon随着迭代次数的增加而定期减小的策略,否则如果始终保持epsilon在一个较小的阈值,则智能体探索的行为将会很慢;而如果始终保持epsilon在一个较大的阈值则智能体的学习过程会出现震荡。

五、总结

        本文给出的示例充分体现了强化学习中的DQN算法的有效性。通读代码可以发现,该算法与Q-Learning算法的核心都是ε-greedy策略,通过调整超参数ε来控制智能体的学习效果。一般来说,ε应当随着迭代次数的增加而逐渐减小,即智能体从探索行为逐渐转向稳健的策略利用行为。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐