本文参考:DDPG样例代码视频
视频博主详细地对每行代码进行解释编写,强烈建议跟视频学习,如果有所收获可以充电支持支持。本文为笔者复盘学习用,侵删。

  样例代码基于python中pytorch深度学习框架,环境为gym提供的摇摆钟控制,即通过控制钟摆旋转的角度使得其保持固定在某个特定位置,详细参数见官方文档:gym-Pendulum。基于Open AI spnning up中给出的算法编写代码:
在这里插入图片描述
分为三个python文件:train、actor、test。其中actor将完成对actor和critic以及buffer的初始化以及更新。具体代码如下:

"""
train.py
"""
import random
import time

import gym
import numpy as np
from agent import DDPGAgent
import os
import torch

# 初始化环境
env = gym.make(id='Pendulum-v1')  # 导入gym-pendulum强化学习环境
STATE_DIM = env.observation_space.shape[0]  # 导入state空间维度
ACTION_DIM = env.action_space.shape[0]  # 导入action空间维度

agent = DDPGAgent(STATE_DIM, ACTION_DIM)  # TODO

# 超参数
NUM_EPISODE = 100  # 100局
NUM_STEP = 200  # 每局的步数200
# 探索方式epsilon greedy是全局探索线性递减
EPSILON_START = 1.0  # 最开始的探索率
EPSILON_END = 0.02  # 最后的探索率,表示每100步平均有2步进行探索
EPSILON_DECAY = 10000  # 总探索步数到10000后衰减截止固定至EPSILON_END

REWARD_BUFFER = np.empty(shape=NUM_EPISODE)  # 记录每局获得多少奖励

# 在每轮游戏中
for episode_i in range(NUM_EPISODE):
    state, others = env.reset()  # 初始化状态,这里只要state,others的参数返回不用
    episode_reward = 0 # 记录每轮的reward

    # 在每步中
    for step_i in range(NUM_STEP):

        # 设置epsilon greedy exploration,随着训练进行,epsilon会越来越小
        # 设置方法用到numpy中的插值工具interp(插值点=当前步数, 横坐标 = 总步数, 纵坐标 = 始末)
        epsioln = np.interp(x=episode_i*NUM_STEP+step_i, xp=[0, EPSILON_DECAY], fp=[EPSILON_START, EPSILON_END])
        random_sample = random.random()
        # 如果随机采样的值小于epsilon,随机选择动作完成exploration;否则执行get_action选择规定action
        if random_sample <= epsioln:
            action = np.random.uniform(low=-2, high=2, size=ACTION_DIM)
        else:
            # 如果每有随机到exploration,那就执行get_action函数(actor.py中)
            action = agent.get_action(state)  # TODO

        # 执行动作并返回执行信息
        next_state, reward, done, truncation, info = env.step(action)
        # 将四元组存入buffer中
        agent.replay_buffer.add_memo(state, action, reward, next_state, done)  # TODO
        # 更新state状态
        state = next_state
        episode_reward += reward
        # 完成agent更新:利用TD方法更新actor和critic
        agent.update()  # TODO

		# 如果done = 1,说明step结束,退出本轮循环
        if done:
            break
    # 记录每轮reward并打印出
    REWARD_BUFFER[episode_i] = episode_reward
    print(f"Episode:{episode_i+1}, Reward:{round(episode_reward,2)}")

# 所有episode结束后将torch模型储存下来(在当前目录建立一个models的文件夹专门存储模型)
current_path = os.path.dirname(os.path.realpath(__file__))
model = current_path + '/models/'
timestamp = time.strftime("%Y%m%d%H%M%S")

# 保存模型
torch.save(agent.actor.state_dict(), model + f"ddpg_actor_{timestamp}.pth")
torch.save(agent.critic.state_dict(), model + f"ddpg_critic_{timestamp}.pth")

# 关闭环境
env.close()
"""
actor.py
classes:critic/actor/replay buffer/DDPGAgent
"""
import random
import torch.optim as optim
import numpy as np
import torch
import torch.nn as nn
from collections import deque

# 设置运算设备为gpu(cuda),如果没有就用cpu训练
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 打印训练用的设备:cuda or cpu
print("Device type: ", device)

# 超参数
LR_ACTOR = 1e-4  # actor和critic的学习率,此处给固定值,可以考虑用Adam
LR_CRITIC = 1e-3
GAMMA = 0.99  # 折合因子
MEMORY_SIZE = 100000  # buffer的大小
BATCH_SIZE = 64  # 一次取batch_size个四元组进行训练
TAU = 5e-3  # target网络软更新参数

# actor类:神经网络初始化
class Actor(nn.Module):
    # 初始化state和action的维度,神经网络用全连接层,隐藏层维度为64
    def __init__(self, state_dim, action_dim, hidden_dim = 64):
        super(Actor, self).__init__()
        # 神经网络有三层
        self.fc1 = nn.Linear(state_dim, hidden_dim)  # 输入层
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)  # 隐藏层
        self.fc3 = nn.Linear(hidden_dim, action_dim)  # 输出层

    # 前向传播
    def forward(self, x):
        x = torch.relu(self.fc1(x))  # 第一层激活函数Relu
        x = torch.relu(self.fc2(x))  # 第二层激活函数Relu
        # 根据实际所定输出范围进行调整,本环境下action范围是(-2,2),tanh函数范围为(-1,1),所以乘以2。relu函数范围为(0,1)
        x = torch.tanh(self.fc3(x)) * 2
        return x

# critic类:神经网络初始化
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super(Critic, self).__init__()
        # Critic用于计算V值或Q值,DDPG为Q值,也就是输入为state和action的pair,输出是Q值大小也就是一维的
        self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)  # 输入层,注意因为critic的输入是state和action的pair,所以输入维度是两者相加
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)  # 隐藏层
        self.fc3 = nn.Linear(hidden_dim, 1)  # 输出层,因为输出是一个Q值,所以输出维度就是1

    def forward(self, x, a):
        x = torch.cat([x, a], 1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# buffer类:经验池初始化以及定义存、取经验函数
class ReplayMemory:
    def __init__(self, capacity):
        # 常用存储记忆的包,deque是一个双向队列,可以从左右存,支持各种类型:列表字典float等等,传参规定其容量
        self.buffer = deque(maxlen=capacity)

    #存经验的方法
    def add_memo(self, state, action, reward, next_state, done):
        # state得到后是一个单列多维向量,需要给他升维到单维多列方便计算
        state = np.expand_dims(state, 0)
        next_state = np.expand_dims(next_state, 0)
        # 从队列右端进行插入:appand
        self.buffer.append((state, action, reward, next_state, done))

    # 取出batch_size个经验
    def sample(self, batch_size):
        # 从buffer中提取到batch_size个包后进行解包(*),然后将提取的元组对应数据分别对应赋值给左侧(zip)
        state, action, reward, next_state, done = zip(*random.sample(self.buffer, batch_size))
        # 因为state是多维要处理
        return np.concatenate(state), action, reward, np.concatenate(next_state), done

    # 判断目前buffer有多长,因为batch_size大于目前存有的量才会取
    def __len__(self):
        return len(self.buffer)

# DDPGAgent:四个网络初始化与更新
class DDPGAgent:
    def __init__(self, state_dim, action_dim):
        self.actor = Actor(state_dim, action_dim).to(device)
        self.actor_target = Actor(state_dim, action_dim).to(device)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=LR_ACTOR)

        self.critic = Critic(state_dim, action_dim).to(device)
        self.critic_target = Critic(state_dim, action_dim).to(device)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=LR_CRITIC)

        self.replay_buffer = ReplayMemory(MEMORY_SIZE)

    def get_action(self, state):
        # 转换为torch可用格式
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
        action = self.actor(state)
        return action.detach().cpu().numpy()[0]

    def update(self):
        if len(self.replay_buffer) < BATCH_SIZE:
            return

        states, actions, rewards, next_states, dones = self.replay_buffer.sample(BATCH_SIZE)
        states = torch.FloatTensor(states).to(device)
        actions = torch.FloatTensor(np.vstack(actions)).to(device)
        rewards = torch.FloatTensor(rewards).unsqueeze(1).to(device)
        next_states = torch.FloatTensor(next_states).to(device)
        dones = torch.FloatTensor(dones).unsqueeze(1).to(device)

        # 更新critic
        next_actions = self.actor_target(next_states)
        target_Q = self.critic_target(next_states, next_actions.detach())
        target_Q = rewards + (GAMMA * target_Q * (1 - dones))
        current_Q = self.critic(states, actions)
        critic_loss = nn.MSELoss()(current_Q, target_Q)
        self.critic_optimizer.zero_grad()  # clean old gradients from the last step
        critic_loss.backward()  # compute the derivates of the loss
        self.critic_optimizer.step()  # update the critic para.

        # 更新actor
        actor_loss = -self.critic(states, self.actor(states)).mean()
        self.actor_optimizer.zero_grad()  # clean old gradients from the last step
        actor_loss.backward()  # compute the derivates of the loss
        self.actor_optimizer.step()  # update the actor para.

        # 更新critic和actor的target网络
        for target_param, param, in zip(self.actor_target.parameters(), self.actor.parameters()):
            target_param.data.copy_(TAU * param.data + (1 - TAU) * target_param.data)

        for target_param, param, in zip(self.critic_target.parameters(), self.critic.parameters()):
            target_param.data.copy_(TAU * param.data + (1 - TAU) * target_param.data)

"""
test.py:脱胎于train.py,直接应用其中训练保存好的models
"""
import os.path
import gym
import torch
import torch.nn as nn
import pygame
import numpy as np

# 设置训练设备:cuda or cpu
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# 初始化环境
env = gym.make(id="Pendulum-v1", render_mode="rgb_array")
STATE_DIM = env.observation_space.shape[0]
ACTION_DIM = env.action_space.shape[0]

# 加载模型保存位置
current_path = os.path.dirname(os.path.realpath(__file__))
model = current_path + '/models/'
actor_path = model + "ddpg_actor_20240815152025.pth" # 具体文件名需要查看该目录下保存的文件名更改


class Actor(nn.Module):
    # 初始化state和action的维度,神经网络用全连接层,隐藏层维度为64
    def __init__(self, state_dim, action_dim, hidden_dim = 64):
        super(Actor, self).__init__()
        # 神经网络有三层
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)

    # 前向传播
    def forward(self, x):
        x = torch.relu(self.fc1(x))  # 第一层激活函数Relu
        x = torch.relu(self.fc2(x))  # 第二层激活函数Relu
        # 根据实际所定输出范围进行调整,本环境下action范围是(-2,2),tanh函数范围为(-1,1),relu函数范围为(0,1)
        x = torch.tanh(self.fc3(x)) * 2
        return x

# 应用pygame工具包展示训练动画
def process_frame(frame):
    frame = np.transpose(frame, (1, 0, 2))
    frame = pygame.surfarray.make_surface(frame)
    return pygame.transform.scale(frame, (width, height))

actor = Actor(STATE_DIM, ACTION_DIM).to(device)
actor.load_state_dict(torch.load(actor_path))

# 初始化渲染窗孔
pygame.init()
width, height = 600, 600
screen = pygame.display.set_mode((width,height))
click = pygame.time.Clock()

# Test phase
NUM_EPISODE = 30
NUM_STEP = 200

for episode_i in range(NUM_EPISODE):
    state, others = env.reset()
    episode_reward = 0  # 在具体测试中可能要测试其他指标,此处跟trainloop一样还是用reward

    for step_i in range(NUM_STEP):
        action = actor(torch.FloatTensor(state).unsqueeze(0).to(device)).detach().cpu().numpy()[0]
        next_state, reward, done, truncation, info = env.step(action)
        state = next_state
        episode_reward += reward
        # print(f"{step_i}:", action)

        frame = env.render()
        frame = process_frame(frame)
        screen.blit(frame, (0, 0))
        pygame.display.flip()
        click.tick(60) # FPS

    print(f"Episode:{episode_i}  Reward:{episode_reward}")
pygame.quit()
env.close()

运行train.py:
在这里插入图片描述
运行test.py:
在这里插入图片描述


Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐