强化学习入门-DQN算法原理与实现
DQN算法实现CartPole控制,包括DQN算法原理、算法编写与运行。
目录
写在前面
参考学习古月居的博文:
深度强化学习专栏 —— 2.手撕DQN算法实现CartPole控制
本文记录自己复现上述文章的学习感悟。
理论方法
参考论文:
[1]《Playing Atari with Deep Reinforcement Learning》
[2]《Human-level control through deep reinforcement learning》
[3]《Neuronlike adaptive elements that can solve difficult learning control problems》
Deep Q-learning (DQN)算法的伪代码如下图所示:
Gym的Cart Pole问题
一根杆子通过非驱动接头连接到小车上,小车沿着无摩擦的轨道移动。摆锤垂直放置在小车上,目标是通过在小车上向左和向右施加力来平衡杆
Action space
Action一个形状为 (1,) 的 ndarray,它可以采用值 {0, 1} 指示推动小车的固定力方向。
0:将小车向左推
1:将小车向右推
注意:由施加力引起的速度变化是不固定的,它取决于杆所指向的角度,因为杆的重心一直在变,会改变力的作用效果。
Observation Space
Observation是一个形状为 (4,) 的 ndarray,其值对应于以下位置和速度:
注意:上面表示的是每个元素Observation Space的可能值,但它并不表示终止episode的临界状态。判定episode终止的条件是:
- 车的位置可以取(-4.8, 4.8) 之间的值,但如果车超过 (-2.4, 2.4) 范围,则episode终止。
- 杆的角度可以取 (-0.418, 0.418) 弧度(或 ±24°)之间的值,但如果角度超过 (-0.2095, 0.2095)(或 ±12°)范围,则事件episode终止。
Rewards
目标是尽可能长时间地保持杆子直立,因此每采取一步(包括终止步骤)都会获得 +1 的奖励。 v1 的奖励阈值是 500,v0 的奖励阈值是 200。
Starting State
所有Observations均分配为 (-0.05, 0.05) 范围内的均匀随机值
Episode End
如果发生以下任何一种情况,则该episode结束:
- 终止:杆转角大于±12°
- 终止:小车位置大于 ±2.4(小车中心到达显示屏边缘)
- 截断:episode长度大于 500(v0 的为 200)
Arguments
import gymnasium as gym
gym.make('CartPole-v1')
算法实现
算法模块结构
1)网络结构
2)模型初始化
3)动作选择规则
4)学习优化过程
网络结构
对于标量或者向量类型的数据,使用含有至少一个隐藏层的全连接网络即可,也即多层感知机,如下图所示:
多层感知机的网络结构包括隐藏层数、输入层维度、输出层维度等;
多层感知机的网络参数包括每层的网络权重。
算法的范畴:神经网络结构和强化学习算法;
环境的范畴:算法的输入信息由环境的输出信息决定,算法的输出信息由环境的输入信息决定。
针对小车倒立摆模型,参考文献[3],可知:
系统的非线性微分方程为:
由此可知系统的状态量为
因此,状态信息是
1
×
4
1\times4
1×4维的
[
x
,
x
˙
,
θ
,
θ
˙
]
[x,\dot x,\theta,\dot \theta]
[x,x˙,θ,θ˙],奖励信息是代表奖励信息的1个标量值,动作是向左或向右运动。
环境输出的状态信息决定了算法的输入信息形状是1x4,环境只接受1个动作值决定了算法最终的输出信息形状是1。
算法的输入信息是4个元素的向量,而输出是1个标量值。
使用神经网络来估计动作状态价值函数(Q(s,a))的值,意味着网络的输出层的形状要和动作的个数对应,即是包含2个元素,输入层4个元素。
准备
Pytorch
首先查看pytorch是否安装,在终端输入python,进入python环境
python
import torch
print(torch.__version__)
这里提示没有安装torch,参考“强化学习入门-Pytorch(CPU版本)简介、安装与测试运行”。
遇到的ModuleNotFoundError
# ModuleNotFoundError: No module named 'zipp'
pip install zipp
# ModuleNotFoundError: No module named 'matplotlib'
conda install matplotlib
# ModuleNotFoundError: No module named 'cartpole_swingup_envs'
# git的时候关闭代理,否则报错atal: 无法访问 '链接' GnuTLS recv error (-110): The TLS connection was non-properly terminated.
git clone https://github.com/jfpettit/cartpole-swingup-envs.git
pip install -e cartpole-swingup-envs
# ModuleNotFoundError: No module named 'torchviz'
pip install torchviz
安装好后,开始编写算法程序
算法程序
cd rl/src
touch dqn_cartpole.py
编辑dqn_cartpole.py文件
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as plt
import cartpole_swingup_envs
from torchviz import make_dot, make_dot_from_trace
# 超参数
# 论文中的参数对应的是处理图片和卷积网络的参数,对于Gym CartPole环境来说,输入到模型的是数字数据,不适宜使用卷积网络,所以参数值会相应的改变
# 计算每个随机梯度下降 (SGD) 更新的训练案例数
BATCH_SIZE = 32
# 学习率
LR = 0.01
# 贪心算法的选择概率
EPSILON = 0.9
GAMMA = 0.9
TARGET_REPLACE_ITER = 100
# replay memory size 随机梯度下降更新是从这个数量的最新帧中采样的。
MEMORY_CAPACITY = 2000
EPISODE=2000
# 创建环境对象
#env = gym.make('CartPoleSwingUpDiscrete-v0')
env = gym.make('CartPole-v0')
# gym的多数环境都用TimeLimit(源码)包装了,以限制Epoch,就是step的次数限制,超过一定次数就会失败
# 用env.unwrapped可以得到原始的类,不受限制
# 还原env的原始设置,env外包了一层防作弊层
# 打开限制
env = env.unwrapped
#env = gym.wrappers.Monitor(env, './video/',video_callable=lambda episode_id: True,force = True)
# 动作数
N_ACTIONS = env.action_space.n
# 状态量个数
N_STATES = env.observation_space.shape[0]
# torch挂载设备
device=torch.device("cuda" if torch.cuda.is_available() else "cpu")
#device=torch.device("cpu")
# 如果有GPU和cuda,数据将转移到GPU执行
torch.FloatTensor=torch.cuda.FloatTensor if torch.cuda.is_available() else torch.FloatTensor
torch.LongTensor=torch.cuda.LongTensor if torch.cuda.is_available() else torch.LongTensor
class Net(nn.Module):
def __init__(self, ):
super(Net, self).__init__()
# 全连接层 输入是状态量 隐藏层有50个元素
self.fc1 = nn.Linear(N_STATES, 50)
# 权重参数初始化
self.fc1.weight.data.normal_(0, 0.1)
# 全连接层 隐藏层有50个元素 输出是动作量
self.out = nn.Linear(50, N_ACTIONS)
# 权重参数初始化
self.out.weight.data.normal_(0, 0.1)
# 上面是定义各网络模块
# 这里就是定义网络顺序,把网络组起来
def forward(self, x):
x = self.fc1(x)
# 输入层和隐藏层之间包含一个ReLU()激活函数
x = F.relu(x)
actions_value = self.out(x)
return actions_value
class DQN:
def __init__(self):
# 初始化了两个网络,action-value function Q,target action-value function
self.net,self.target_net=Net().to(device),Net().to(device)
self.learn_step_counter=0
self.memory_counter=0
# 初始化 replay memory
self.memory = np.zeros((MEMORY_CAPACITY, N_STATES * 2 + 2))
# 初始化优化器
self.optimizer = torch.optim.Adam(self.net.parameters(), lr=LR)
# 初始化损失函数
self.loss_func = nn.MSELoss()
def choose_action(self, x):
x = torch.unsqueeze(torch.FloatTensor(x), 0)
# input only one sample
if np.random.uniform() < EPSILON:
actions_value = self.net.forward(x)
action = torch.max(actions_value, 1)[1].data.cpu().numpy()
action = action[0]
else:
action = np.random.randint(0, N_ACTIONS)
return action
def store_transition(self, s, a, r, s_):
transition = np.hstack((s, [a, r], s_))
# replace the old memory with new memory
index = self.memory_counter % MEMORY_CAPACITY
self.memory[index, :] = transition
self.memory_counter += 1
def learn(self):
# target parameter update
if self.learn_step_counter % TARGET_REPLACE_ITER == 0:
self.target_net.load_state_dict(self.net.state_dict())
self.learn_step_counter += 1
# sample batch transitions
sample_index = np.random.choice(MEMORY_CAPACITY, BATCH_SIZE)
batch_memory = self.memory[sample_index, :]
batch_s = torch.FloatTensor(batch_memory[:, :N_STATES])
batch_a = torch.LongTensor(batch_memory[:, N_STATES:N_STATES+1].astype(int))
batch_r = torch.FloatTensor(batch_memory[:, N_STATES+1:N_STATES+2])
batch_s_ = torch.FloatTensor(batch_memory[:, -N_STATES:])
q = self.net(batch_s).gather(1, batch_a) # shape (batch, 1)
q_target = self.target_net(batch_s_).detach() # detach from graph, don't backpropagate
y = batch_r + GAMMA * q_target.max(1)[0].view(BATCH_SIZE, 1) # shape (batch, 1)
loss = self.loss_func(q, y)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
dqn = DQN()
plot_x_data,plot_y_data=[],[]
for i_episode in range(260):
# 由于gym版本问题,此处不加[0]会报错:
# ValueError: expected sequence of length 4 at dim 1 (got 0)
s = env.reset()[0]
episode_reward = 0
while True:
#env.render()
a = dqn.choose_action(s)
# take action
# 由于gym版本问题,此处不加,_会报错:
# ValueError: too many values to unpack (expected 4)
s_, r, done, info,_ = env.step(a)
# modify the reward
x, x_dot, theta, theta_dot = s_
r1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8
r2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5
r = r1 + r2
dqn.store_transition(s, a, r, s_)
episode_reward += r
if dqn.memory_counter > MEMORY_CAPACITY:
dqn.learn()
if done:
print('Episode: ', i_episode,
'| Episode_reward: ', round(episode_reward, 2))
if done:
break
s = s_
plot_x_data.append(i_episode)
plot_y_data.append(episode_reward)
plt.plot(plot_x_data,plot_y_data)
plt.show()
在文件路径下打开终端,执行dqn_cartpole.py
python dqn_cartpole.py
运行结果:
>>>
程序源文件
更多推荐
所有评论(0)