策略梯度 ( PG )

PG 的基本思想:直接训练神经网络来输入 state 输出 action,这个过程中不去计算 Q
在这里插入图片描述

如果说 DQN是一个 TD + 神经网络 的算法,那么PG 是一个蒙地卡罗+神经网络的算法。

在这里插入图片描述

传统的做法是使用 Q_table 查表的方式或 Q网络 通过 critic 评判来反过来选择 action,这里 PG 是直接使用 actor 输出各动作的概率值

策略梯度公式推导

这里 PG 是直接使用 actor 去跟环境互动,然后一个episode 回合最后可以加得一个 total reward(R),
R = r1 + r2 + … + rn

PG 要去最大化的就是整个回合的 total reward。注意,即使同一个 actor,total reward 每次也大概率不会相同,因为 actor 和 environment 都存在随机性。Actor 看到同一个场景,会有一定概率选择不同的 action。environment 环境也具有随机性,采取同样的 action,每次看到的 observation 也会不一样。

所以我们要去最大化的不是某一次的 R,而是最大化 它的期望值。我们用 R的期望 来衡量 actor的好坏

我们假设一个回合的轨迹 trajectory:
在这里插入图片描述

如果使用 actor 去打一场游戏,每一个回合的轨迹有一个被采样的概率,这个概率依赖于actor的参数:
在这里插入图片描述

使用actor玩 N 次游戏,就会产生N 条轨迹,也就是相当于采样 N次,加和所有可能的轨迹就是期望
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

PG代码实现

在这里插入图片描述

CartPole-v1 游戏有一个车子,车子上面立一支杆。
智能体的任务是,让车子必须左右移动来保持车上的杆保持竖直。如果杆子倾斜超过12度,则游戏结束。

每坚持一帧,智能体能获得 1 reward,如果能获得 200reward, 那么游戏结束。

如果杆子掉下来,游戏失败。继续下一轮游戏。

Cart Pole 的状态是连续型的状态,所以我们可以用几个状态特征来表示。状态特征:

  • 车子位置:[-2.4, 2.4]
  • 车子速度:[-Inf, Inf]
  • 杆子角度:[-41.8, 41.8]
  • 杆子(顶端)速度:[-Inf, Inf]

智能体可以决定做两个动作:

  • 0:把车子往左拉
  • 1:把车子往右拉

Reward:
除最终最终状态外,所有状态都能获得 1 reward

最终状态条件:

  • 杆子角度 >= +12°
  • 车子位置 >= +2.4
  • 坚持 200步

所以 Cart Pole 游戏其实就是让智能体学会玩杂耍,坚持时间越长获得奖励越多

在这里插入图片描述在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

import gym
import matplotlib.pyplot as plt
import numpy as np
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

"""
This part of code is the reinforcement learning brain, which is a brain of the agent.
All decisions are made in here.

Policy Gradient, Reinforcement Learning.

View more on my tutorial page: https://morvanzhou.github.io/tutorials/

Using:
Tensorflow: 1.0
gym: 0.8.0
"""

# reproducible
np.random.seed(1)

class PolicyGradient:
    def __init__(
            self,
            n_actions,
            n_features,
            learning_rate=0.01,
            reward_decay=0.95,
            output_graph=False,
    ):
        self.n_actions = n_actions
        self.n_features = n_features
        self.lr = learning_rate
        self.gamma = reward_decay

        self.ep_obs, self.ep_as, self.ep_rs = [], [], []

        self._build_net()

        self.sess = tf.Session()

        if output_graph:
            # $ tensorboard --logdir=logs
            # http://0.0.0.0:6006/
            # tf.train.SummaryWriter soon be deprecated, use following
            tf.summary.FileWriter("logs/", self.sess.graph)

        self.sess.run(tf.global_variables_initializer())

    def _build_net(self):
        with tf.name_scope('inputs'):
            self.tf_obs = tf.placeholder(tf.float32, [None, self.n_features], name="observations")
            self.tf_acts = tf.placeholder(tf.int32, [None, ], name="actions_num")
            self.tf_vt = tf.placeholder(tf.float32, [None, ], name="actions_value")
        # fc1
        layer = tf.layers.dense(
            inputs=self.tf_obs,
            units=10,
            activation=tf.nn.tanh,  # tanh activation
            kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.3),
            bias_initializer=tf.constant_initializer(0.1),
            name='fc1'
        )
        # fc2
        all_act = tf.layers.dense(
            inputs=layer,
            units=self.n_actions,
            activation=None,
            kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.3),
            bias_initializer=tf.constant_initializer(0.1),
            name='fc2'
        )

        self.all_act_prob = tf.nn.softmax(all_act, name='act_prob')  # use softmax to convert to probability

        with tf.name_scope('loss'):
            # to maximize total reward (log_p * R) is to minimize -(log_p * R), and the tf only have minimize(loss)
            neg_log_prob = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=all_act, labels=self.tf_acts)   # this is negative log of chosen action
            # or in this way:
            # neg_log_prob = tf.reduce_sum(-tf.log(self.all_act_prob)*tf.one_hot(self.tf_acts, self.n_actions), axis=1)
            loss = tf.reduce_mean(neg_log_prob * self.tf_vt)  # reward guided loss

        with tf.name_scope('train'):
            self.train_op = tf.train.AdamOptimizer(self.lr).minimize(loss)

    def choose_action(self, observation):
        observation = observation[np.newaxis,:]
        prob_weights = self.sess.run(self.all_act_prob, feed_dict={self.tf_obs:observation})
        action = np.random.choice(range(prob_weights.shape[1]), p=prob_weights.ravel())  # select action w.r.t the actions prob
        return action

    def store_transition(self, s, a, r):
        self.ep_obs.append(s)
        self.ep_as.append(a)
        self.ep_rs.append(r)

    def learn(self):
        # discount and normalize episode reward
        discounted_ep_rs_norm = self._discount_and_norm_rewards()

        # train on episode
        self.sess.run(self.train_op, feed_dict={
             self.tf_obs: np.vstack(self.ep_obs),  # shape=[None, n_obs]
             self.tf_acts: np.array(self.ep_as),  # shape=[None, ]
             self.tf_vt: discounted_ep_rs_norm,  # shape=[None, ]
        })

        self.ep_obs, self.ep_as, self.ep_rs = [], [], []    # empty episode data
        return discounted_ep_rs_norm

    def _discount_and_norm_rewards(self):
        # discount episode rewards
        discounted_ep_rs = np.zeros_like(self.ep_rs)
        running_add = 0
        for t in reversed(range(0, len(self.ep_rs))):
            running_add = running_add * self.gamma + self.ep_rs[t]
            discounted_ep_rs[t] = running_add

        # normalize episode rewards
        discounted_ep_rs -= np.mean(discounted_ep_rs)
        discounted_ep_rs /= np.std(discounted_ep_rs)
        return discounted_ep_rs


"""
Policy Gradient, Reinforcement Learning.

The cart pole example

View more on my tutorial page: https://morvanzhou.github.io/tutorials/

Using:
Tensorflow: 1.0
gym: 0.8.0
"""

DISPLAY_REWARD_THRESHOLD = 400  # renders environment if total episode reward is greater then this threshold
RENDER = False  # rendering wastes time

env = gym.make('CartPole-v0', render_mode='human')
# env = gym.make('MountainCar-v0',render_mode='human')
env = env.unwrapped

print(env.action_space)
print(env.observation_space)
print(env.observation_space.high)
print(env.observation_space.low)

RL = PolicyGradient(
    n_actions=env.action_space.n,
    n_features=env.observation_space.shape[0],
    learning_rate=0.02,
    reward_decay=0.99,
    # output_graph=True,
)

for i_episode in range(3000):

    observation = env.reset()[0]

    while True:
        if RENDER:
            env.render()

        action = RL.choose_action(observation)

        observation_, reward, done, info, _ = env.step(action)

        RL.store_transition(observation, action, reward)

        if done:
            ep_rs_sum = sum(RL.ep_rs)

            if 'running_reward' not in globals():
                running_reward = ep_rs_sum
            else:
                running_reward = running_reward * 0.99 + ep_rs_sum * 0.01

            if running_reward > DISPLAY_REWARD_THRESHOLD:
                RENDER = True     # rendering
            print("episode:", i_episode, "  reward:", int(running_reward))

            vt = RL.learn()

            break

        observation = observation_

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐