Deep Reinforcement Learning Notes 03 [DDPG PyTorch Example Code]
Reference for this post: the DDPG example code video.
The video author explains and writes out every line of code in detail; following along with the video is strongly recommended, and if you find it helpful please consider supporting the creator. These notes are for my own review; please contact me for removal if anything infringes.
The example code uses the PyTorch deep learning framework in Python. The environment is the pendulum control task provided by gym, i.e. applying torque to the pendulum so that it stays upright at a target position; see the official documentation for details: gym-Pendulum. The code follows the algorithm given in OpenAI Spinning Up:
The code is split into three Python files: train, agent, and test. The agent file handles the initialization and updating of the actor, the critic, and the replay buffer. The full code is given below.
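Before going through the scripts, it is worth confirming the environment's dimensions and action bounds, since several values below (random actions drawn from [-2, 2], the tanh output scaled by 2) rely on them. A minimal sketch, assuming a standard gym installation with Pendulum-v1:

import gym
env = gym.make("Pendulum-v1")
print(env.observation_space.shape)  # (3,): observation is [cos(theta), sin(theta), angular velocity]
print(env.action_space.shape)       # (1,): a single torque value
print(env.action_space.low, env.action_space.high)  # torque is bounded to [-2.0, 2.0]
env.close()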
"""
train.py
"""
import random
import time
import gym
import numpy as np
from agent import DDPGAgent
import os
import torch
# Initialize the environment
env = gym.make(id='Pendulum-v1')  # load the gym Pendulum environment
STATE_DIM = env.observation_space.shape[0]  # dimension of the state space
ACTION_DIM = env.action_space.shape[0]  # dimension of the action space
agent = DDPGAgent(STATE_DIM, ACTION_DIM)  # defined in agent.py

# Hyperparameters
NUM_EPISODE = 100  # number of episodes
NUM_STEP = 200  # steps per episode
# Exploration: epsilon-greedy with a global linear decay
EPSILON_START = 1.0  # initial exploration rate
EPSILON_END = 0.02  # final exploration rate, i.e. on average 2 exploratory steps per 100 steps
EPSILON_DECAY = 10000  # after 10000 total steps the decay stops and epsilon stays at EPSILON_END
REWARD_BUFFER = np.empty(shape=NUM_EPISODE)  # records the reward obtained in each episode
# For each episode
for episode_i in range(NUM_EPISODE):
    state, others = env.reset()  # reset the environment; only state is needed, the extra return value is unused
    episode_reward = 0  # accumulate the reward of this episode
    # For each step
    for step_i in range(NUM_STEP):
        # Epsilon-greedy exploration: epsilon shrinks as training proceeds.
        # Implemented with numpy's linear interpolation: interp(x=current total step, xp=decay range, fp=start/end values)
        epsilon = np.interp(x=episode_i * NUM_STEP + step_i, xp=[0, EPSILON_DECAY], fp=[EPSILON_START, EPSILON_END])
        random_sample = random.random()
        # If the random sample is below epsilon, pick a random action (exploration); otherwise use the actor's action
        if random_sample <= epsilon:
            action = np.random.uniform(low=-2, high=2, size=ACTION_DIM)
        else:
            # No exploration this step: call get_action (defined in agent.py)
            action = agent.get_action(state)
        # Execute the action and get the transition information
        next_state, reward, done, truncation, info = env.step(action)
        # Store the transition in the replay buffer
        agent.replay_buffer.add_memo(state, action, reward, next_state, done)
        # Move to the next state
        state = next_state
        episode_reward += reward
        # Update the agent: the actor and critic are updated with the TD method
        agent.update()
        # If done is True, the episode has terminated; break out of the step loop
        if done:
            break
    # Record and print the reward of this episode
    REWARD_BUFFER[episode_i] = episode_reward
    print(f"Episode:{episode_i+1}, Reward:{round(episode_reward, 2)}")
# After all episodes, save the torch models (into a 'models' folder under the current directory)
current_path = os.path.dirname(os.path.realpath(__file__))
model = current_path + '/models/'
os.makedirs(model, exist_ok=True)  # create the folder if it does not exist yet
timestamp = time.strftime("%Y%m%d%H%M%S")
# Save the models
torch.save(agent.actor.state_dict(), model + f"ddpg_actor_{timestamp}.pth")
torch.save(agent.critic.state_dict(), model + f"ddpg_critic_{timestamp}.pth")
# Close the environment
env.close()
"""
actor.py
classes:critic/actor/replay buffer/DDPGAgent
"""
import random
import torch.optim as optim
import numpy as np
import torch
import torch.nn as nn
from collections import deque
# Use the GPU (cuda) if available, otherwise train on the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Print which device is used for training: cuda or cpu
print("Device type: ", device)

# Hyperparameters
LR_ACTOR = 1e-4  # learning rate of the actor (the optimizers below are Adam with fixed learning rates)
LR_CRITIC = 1e-3  # learning rate of the critic
GAMMA = 0.99  # discount factor
MEMORY_SIZE = 100000  # capacity of the replay buffer
BATCH_SIZE = 64  # number of transitions sampled per training step
TAU = 5e-3  # soft-update coefficient for the target networks
# Actor class: policy network
class Actor(nn.Module):
    # state_dim and action_dim set the input/output sizes; fully connected layers with hidden size 64
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super(Actor, self).__init__()
        # Three-layer network
        self.fc1 = nn.Linear(state_dim, hidden_dim)  # input layer
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)  # hidden layer
        self.fc3 = nn.Linear(hidden_dim, action_dim)  # output layer

    # Forward pass
    def forward(self, x):
        x = torch.relu(self.fc1(x))  # first layer, ReLU activation
        x = torch.relu(self.fc2(x))  # second layer, ReLU activation
        # Scale the output to the environment's action range: actions lie in (-2, 2) and tanh outputs (-1, 1), so multiply by 2
        x = torch.tanh(self.fc3(x)) * 2
        return x
# Critic class: Q-network
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super(Critic, self).__init__()
        # The critic estimates a value function; in DDPG it is the Q-value, so the input is a (state, action) pair and the output is a scalar
        self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)  # input layer; the input is the concatenated (state, action) pair, hence the summed dimension
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)  # hidden layer
        self.fc3 = nn.Linear(hidden_dim, 1)  # output layer; a single Q-value, so the output dimension is 1

    def forward(self, x, a):
        x = torch.cat([x, a], 1)  # concatenate state and action along the feature dimension
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)
# Replay buffer class: initialization plus methods to store and sample transitions
class ReplayMemory:
    def __init__(self, capacity):
        # deque is a double-ended queue commonly used for replay memories; it accepts arbitrary items (lists, dicts, floats, ...) and maxlen fixes its capacity
        self.buffer = deque(maxlen=capacity)

    # Store one transition
    def add_memo(self, state, action, reward, next_state, done):
        # The state comes back as a 1-D vector; add a leading batch dimension so batches can be concatenated later
        state = np.expand_dims(state, 0)
        next_state = np.expand_dims(next_state, 0)
        # append inserts at the right end of the deque
        self.buffer.append((state, action, reward, next_state, done))

    # Sample batch_size transitions
    def sample(self, batch_size):
        # random.sample draws batch_size tuples from the buffer; * unpacks them and zip regroups the matching fields
        state, action, reward, next_state, done = zip(*random.sample(self.buffer, batch_size))
        # the states carry a batch dimension, so concatenate them into single arrays
        return np.concatenate(state), action, reward, np.concatenate(next_state), done

    # Current buffer length; sampling only happens once the buffer holds at least batch_size transitions
    def __len__(self):
        return len(self.buffer)
# DDPGAgent: initialize and update the four networks
class DDPGAgent:
    def __init__(self, state_dim, action_dim):
        self.actor = Actor(state_dim, action_dim).to(device)
        self.actor_target = Actor(state_dim, action_dim).to(device)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=LR_ACTOR)

        self.critic = Critic(state_dim, action_dim).to(device)
        self.critic_target = Critic(state_dim, action_dim).to(device)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=LR_CRITIC)

        self.replay_buffer = ReplayMemory(MEMORY_SIZE)

    def get_action(self, state):
        # Convert the state into a torch tensor with a batch dimension
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
        action = self.actor(state)
        return action.detach().cpu().numpy()[0]

    def update(self):
        if len(self.replay_buffer) < BATCH_SIZE:
            return
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(BATCH_SIZE)
        states = torch.FloatTensor(states).to(device)
        actions = torch.FloatTensor(np.vstack(actions)).to(device)
        rewards = torch.FloatTensor(rewards).unsqueeze(1).to(device)
        next_states = torch.FloatTensor(next_states).to(device)
        dones = torch.FloatTensor(dones).unsqueeze(1).to(device)

        # Update the critic
        next_actions = self.actor_target(next_states)
        target_Q = self.critic_target(next_states, next_actions.detach())
        target_Q = rewards + (GAMMA * target_Q * (1 - dones))
        current_Q = self.critic(states, actions)
        critic_loss = nn.MSELoss()(current_Q, target_Q)
        self.critic_optimizer.zero_grad()  # clear old gradients from the last step
        critic_loss.backward()  # compute the gradients of the loss
        self.critic_optimizer.step()  # update the critic parameters

        # Update the actor
        actor_loss = -self.critic(states, self.actor(states)).mean()
        self.actor_optimizer.zero_grad()  # clear old gradients from the last step
        actor_loss.backward()  # compute the gradients of the loss
        self.actor_optimizer.step()  # update the actor parameters

        # Soft-update the target networks of the actor and the critic
        for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
            target_param.data.copy_(TAU * param.data + (1 - TAU) * target_param.data)
        for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
            target_param.data.copy_(TAU * param.data + (1 - TAU) * target_param.data)
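For reference, the update() method above is the standard DDPG update as described in the OpenAI Spinning Up write-up (this is a notation summary, not part of the original code). With online networks $Q_\phi$, $\mu_\theta$ and target networks $Q_{\phi'}$, $\mu_{\theta'}$, the quantities computed per sampled batch are:

$$y = r + \gamma (1 - d)\, Q_{\phi'}\!\big(s', \mu_{\theta'}(s')\big)$$
$$L_{\text{critic}} = \big(Q_\phi(s, a) - y\big)^2, \qquad L_{\text{actor}} = -\, Q_\phi\big(s, \mu_\theta(s)\big)$$
$$\phi' \leftarrow \tau \phi + (1 - \tau)\, \phi', \qquad \theta' \leftarrow \tau \theta + (1 - \tau)\, \theta'$$

where $\gamma$ = GAMMA, $\tau$ = TAU, and $d$ is the done flag; both losses are averaged over the BATCH_SIZE samples.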
"""
test.py:脱胎于train.py,直接应用其中训练保存好的models
"""
import os.path
import gym
import torch
import torch.nn as nn
import pygame
import numpy as np
# Select the device: cuda or cpu
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# Initialize the environment
env = gym.make(id="Pendulum-v1", render_mode="rgb_array")
STATE_DIM = env.observation_space.shape[0]
ACTION_DIM = env.action_space.shape[0]

# Path where the models were saved
current_path = os.path.dirname(os.path.realpath(__file__))
model = current_path + '/models/'
actor_path = model + "ddpg_actor_20240815152025.pth"  # change the file name to match the model saved in that folder
class Actor(nn.Module):
    # Same network as in agent.py: fully connected layers with hidden size 64
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super(Actor, self).__init__()
        # Three-layer network
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)

    # Forward pass
    def forward(self, x):
        x = torch.relu(self.fc1(x))  # first layer, ReLU activation
        x = torch.relu(self.fc2(x))  # second layer, ReLU activation
        # Scale to the environment's action range: actions lie in (-2, 2) and tanh outputs (-1, 1), so multiply by 2
        x = torch.tanh(self.fc3(x)) * 2
        return x
# Use pygame to display the rendered frames as an animation
def process_frame(frame):
    frame = np.transpose(frame, (1, 0, 2))  # pygame surfaces expect (width, height, channels)
    frame = pygame.surfarray.make_surface(frame)
    return pygame.transform.scale(frame, (width, height))
actor = Actor(STATE_DIM, ACTION_DIM).to(device)
actor.load_state_dict(torch.load(actor_path))

# Initialize the rendering window
pygame.init()
width, height = 600, 600
screen = pygame.display.set_mode((width, height))
clock = pygame.time.Clock()

# Test phase
NUM_EPISODE = 30
NUM_STEP = 200
for episode_i in range(NUM_EPISODE):
    state, others = env.reset()
    episode_reward = 0  # a real test might track other metrics; here the reward is used, just as in the training loop
    for step_i in range(NUM_STEP):
        action = actor(torch.FloatTensor(state).unsqueeze(0).to(device)).detach().cpu().numpy()[0]
        next_state, reward, done, truncation, info = env.step(action)
        state = next_state
        episode_reward += reward
        # print(f"{step_i}:", action)
        # Render the current frame and display it in the pygame window
        frame = env.render()
        frame = process_frame(frame)
        screen.blit(frame, (0, 0))
        pygame.display.flip()
        clock.tick(60)  # FPS
    print(f"Episode:{episode_i} Reward:{episode_reward}")
pygame.quit()
env.close()
Running train.py:
Running test.py: