写在前面

参考学习古月居的博文:

深度强化学习专栏 —— 2.手撕DQN算法实现CartPole控制

本文记录自己复现上述文章的学习感悟。

理论方法

参考论文:
[1]《Playing Atari with Deep Reinforcement Learning
[2]《Human-level control through deep reinforcement learning
[3]《Neuronlike adaptive elements that can solve difficult learning control problems

Deep Q-learning (DQN)算法的伪代码如下图所示:
出自文献[1]

图1. 出自文献[1]

在这里插入图片描述

图2. 出自文献[2]


参数

图2. 算法参数,出自文献[2]

Gym的Cart Pole问题

在这里插入图片描述
一根杆子通过非驱动接头连接到小车上,小车沿着无摩擦的轨道移动。摆锤垂直放置在小车上,目标是通过在小车上向左和向右施加力来平衡杆
在这里插入图片描述

Action space

Action一个形状为 (1,) 的 ndarray,它可以采用值 {0, 1} 指示推动小车的固定力方向。
0:将小车向左推
1:将小车向右推
注意:由施加力引起的速度变化是不固定的,它取决于杆所指向的角度,因为杆的重心一直在变,会改变力的作用效果。

Observation Space

Observation是一个形状为 (4,) 的 ndarray,其值对应于以下位置和速度:
在这里插入图片描述注意:上面表示的是每个元素Observation Space的可能值,但它并不表示终止episode的临界状态。判定episode终止的条件是:

  1. 车的位置可以取(-4.8, 4.8) 之间的值,但如果车超过 (-2.4, 2.4) 范围,则episode终止。
  2. 杆的角度可以取 (-0.418, 0.418) 弧度(或 ±24°)之间的值,但如果角度超过 (-0.2095, 0.2095)(或 ±12°)范围,则事件episode终止。

Rewards

目标是尽可能长时间地保持杆子直立,因此每采取一步(包括终止步骤)都会获得 +1 的奖励。 v1 的奖励阈值是 500,v0 的奖励阈值是 200。

Starting State

所有Observations均分配为 (-0.05, 0.05) 范围内的均匀随机值

Episode End

如果发生以下任何一种情况,则该episode结束:

  1. 终止:杆转角大于±12°
  2. 终止:小车位置大于 ±2.4(小车中心到达显示屏边缘)
  3. 截断:episode长度大于 500(v0 的为 200)

Arguments

import gymnasium as gym
gym.make('CartPole-v1')

算法实现

算法模块结构
1)网络结构
2)模型初始化
3)动作选择规则
4)学习优化过程

网络结构

对于标量或者向量类型的数据,使用含有至少一个隐藏层的全连接网络即可,也即多层感知机,如下图所示:
在这里插入图片描述多层感知机的网络结构包括隐藏层数、输入层维度、输出层维度等;
多层感知机的网络参数包括每层的网络权重。

算法的范畴:神经网络结构和强化学习算法;
环境的范畴:算法的输入信息由环境的输出信息决定,算法的输出信息由环境的输入信息决定。

针对小车倒立摆模型,参考文献[3],可知:
在这里插入图片描述
系统的非线性微分方程为:
在这里插入图片描述
由此可知系统的状态量为

在这里插入图片描述
因此,状态信息是 1 × 4 1\times4 1×4维的 [ x , x ˙ , θ , θ ˙ ] [x,\dot x,\theta,\dot \theta] [x,x˙,θ,θ˙],奖励信息是代表奖励信息的1个标量值,动作是向左或向右运动。

环境输出的状态信息决定了算法的输入信息形状是1x4,环境只接受1个动作值决定了算法最终的输出信息形状是1。

算法的输入信息是4个元素的向量,而输出是1个标量值。

使用神经网络来估计动作状态价值函数(Q(s,a))的值,意味着网络的输出层的形状要和动作的个数对应,即是包含2个元素,输入层4个元素。

准备

Pytorch

首先查看pytorch是否安装,在终端输入python,进入python环境

python
import torch
print(torch.__version__)

在这里插入图片描述这里提示没有安装torch,参考“强化学习入门-Pytorch(CPU版本)简介、安装与测试运行”。

遇到的ModuleNotFoundError

# ModuleNotFoundError: No module named 'zipp'
pip install zipp
# ModuleNotFoundError: No module named 'matplotlib'
conda install matplotlib
# ModuleNotFoundError: No module named 'cartpole_swingup_envs' 
# git的时候关闭代理,否则报错atal: 无法访问 '链接' GnuTLS recv error (-110): The TLS connection was non-properly terminated.
git clone https://github.com/jfpettit/cartpole-swingup-envs.git
pip install -e cartpole-swingup-envs
# ModuleNotFoundError: No module named 'torchviz'
pip install torchviz

安装好后,开始编写算法程序

算法程序

cd rl/src
touch dqn_cartpole.py

编辑dqn_cartpole.py文件

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as plt

import cartpole_swingup_envs
from torchviz import make_dot, make_dot_from_trace

# 超参数
# 论文中的参数对应的是处理图片和卷积网络的参数,对于Gym CartPole环境来说,输入到模型的是数字数据,不适宜使用卷积网络,所以参数值会相应的改变
# 计算每个随机梯度下降 (SGD) 更新的训练案例数
BATCH_SIZE = 32 
# 学习率           
LR = 0.01    
# 贪心算法的选择概率              
EPSILON = 0.9               
GAMMA = 0.9                
TARGET_REPLACE_ITER = 100  
# replay memory size 随机梯度下降更新是从这个数量的最新帧中采样的。
MEMORY_CAPACITY = 2000     
EPISODE=2000                

# 创建环境对象
#env = gym.make('CartPoleSwingUpDiscrete-v0')
env = gym.make('CartPole-v0')
# gym的多数环境都用TimeLimit(源码)包装了,以限制Epoch,就是step的次数限制,超过一定次数就会失败
# 用env.unwrapped可以得到原始的类,不受限制
# 还原env的原始设置,env外包了一层防作弊层
# 打开限制
env = env.unwrapped
#env = gym.wrappers.Monitor(env, './video/',video_callable=lambda episode_id: True,force = True)
# 动作数
N_ACTIONS = env.action_space.n
# 状态量个数
N_STATES = env.observation_space.shape[0]
# torch挂载设备
device=torch.device("cuda" if torch.cuda.is_available() else "cpu")
#device=torch.device("cpu")
# 如果有GPU和cuda,数据将转移到GPU执行
torch.FloatTensor=torch.cuda.FloatTensor if torch.cuda.is_available() else torch.FloatTensor
torch.LongTensor=torch.cuda.LongTensor if torch.cuda.is_available() else torch.LongTensor

class Net(nn.Module):
    def __init__(self, ):
        super(Net, self).__init__()
        # 全连接层 输入是状态量 隐藏层有50个元素
        self.fc1 = nn.Linear(N_STATES, 50)
        # 权重参数初始化
        self.fc1.weight.data.normal_(0, 0.1)
        # 全连接层 隐藏层有50个元素 输出是动作量
        self.out = nn.Linear(50, N_ACTIONS)
        # 权重参数初始化
        self.out.weight.data.normal_(0, 0.1)
	# 上面是定义各网络模块
    # 这里就是定义网络顺序,把网络组起来
    def forward(self, x):
        x = self.fc1(x)
        # 输入层和隐藏层之间包含一个ReLU()激活函数
        x = F.relu(x)
        actions_value = self.out(x)
        return actions_value

class DQN:
    def __init__(self):
        # 初始化了两个网络,action-value function Q,target action-value function
        self.net,self.target_net=Net().to(device),Net().to(device)
        
        self.learn_step_counter=0
        self.memory_counter=0
        # 初始化 replay memory
        self.memory = np.zeros((MEMORY_CAPACITY, N_STATES * 2 + 2))
        # 初始化优化器
        self.optimizer = torch.optim.Adam(self.net.parameters(), lr=LR)
        # 初始化损失函数
        self.loss_func = nn.MSELoss()
        

    def choose_action(self, x):
        x = torch.unsqueeze(torch.FloatTensor(x), 0)
        # input only one sample
        if np.random.uniform() < EPSILON:  
            actions_value = self.net.forward(x)
            action = torch.max(actions_value, 1)[1].data.cpu().numpy()
            action = action[0]
        else:   
            action = np.random.randint(0, N_ACTIONS)
            
        return action
        
    def store_transition(self, s, a, r, s_):
        transition = np.hstack((s, [a, r], s_))
        # replace the old memory with new memory
        index = self.memory_counter % MEMORY_CAPACITY
        self.memory[index, :] = transition
        self.memory_counter += 1

    def learn(self):
        # target parameter update
        if self.learn_step_counter % TARGET_REPLACE_ITER == 0:
            self.target_net.load_state_dict(self.net.state_dict())
        self.learn_step_counter += 1

        # sample batch transitions
        sample_index = np.random.choice(MEMORY_CAPACITY, BATCH_SIZE)
        batch_memory = self.memory[sample_index, :]
        batch_s = torch.FloatTensor(batch_memory[:, :N_STATES])
        batch_a = torch.LongTensor(batch_memory[:, N_STATES:N_STATES+1].astype(int))
        batch_r = torch.FloatTensor(batch_memory[:, N_STATES+1:N_STATES+2])
        batch_s_ = torch.FloatTensor(batch_memory[:, -N_STATES:])

        
        q = self.net(batch_s).gather(1, batch_a)  # shape (batch, 1)
        q_target = self.target_net(batch_s_).detach()     # detach from graph, don't backpropagate
        y = batch_r + GAMMA * q_target.max(1)[0].view(BATCH_SIZE, 1)   # shape (batch, 1)
        loss = self.loss_func(q, y)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
       
dqn = DQN()
   
plot_x_data,plot_y_data=[],[]
for i_episode in range(260):
		# 由于gym版本问题,此处不加[0]会报错:
		# ValueError: expected sequence of length 4 at dim 1 (got 0)
    s = env.reset()[0]
    episode_reward = 0
    while True:
        #env.render()
        a = dqn.choose_action(s)

        # take action
        # 由于gym版本问题,此处不加,_会报错:
        # ValueError: too many values to unpack (expected 4)
        s_, r, done, info,_ = env.step(a)

        # modify the reward
        x, x_dot, theta, theta_dot = s_
        r1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8
        r2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5
        r = r1 + r2

        dqn.store_transition(s, a, r, s_)

        episode_reward += r
        if dqn.memory_counter > MEMORY_CAPACITY:
            dqn.learn()
            if done:
                print('Episode: ', i_episode,
                      '| Episode_reward: ', round(episode_reward, 2))

        if done:
            break
        s = s_
    plot_x_data.append(i_episode)
    plot_y_data.append(episode_reward)
    plt.plot(plot_x_data,plot_y_data)
 
plt.show()

在文件路径下打开终端,执行dqn_cartpole.py

python dqn_cartpole.py 

运行结果:
在这里插入图片描述
在这里插入图片描述>>>
程序源文件

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐