
[Reinforcement Learning] Solving the Cliff Walking Problem with Sarsa + Hands-On Python Code
1. A Brief Introduction to Sarsa
Below is only a brief introduction to the Sarsa algorithm.
Sarsa is an on-policy algorithm: it optimizes the policy it actually executes, and it updates the Q-table directly with the action it will take at the next step.
With an on-policy method, only one policy exists during learning: the same policy is used both to select actions and to be optimized. Since Sarsa knows that its next action might send it over the cliff, it learns, while optimizing its policy, to stay as far away from the cliff as possible.
1.1 Update Rule
The Sarsa update rule can be written as
$$Q(S, A) \leftarrow Q(S, A)+\alpha\left(R+\gamma Q\left(S^{\prime}, A^{\prime}\right)-Q(S, A)\right)$$
Sarsa's update rule has the same form as the temporal-difference (TD) update, where $S'$ denotes $s_{t+1}$. We use the Q value of the next state-action pair, $Q(S', A')$, to update the Q value of the current pair, $Q(S, A)$, continually reinforcing every Q value.
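To make the rule concrete, here is a minimal sketch of one tabular Sarsa update in Python (the function and variable names are illustrative and are not taken from the full script in Section 2.2):

import numpy as np

def sarsa_update(Q, s, a, r, s_next, a_next, alpha=0.1, gamma=0.9, done=False):
    """One tabular Sarsa step: Q(S,A) <- Q(S,A) + alpha * (R + gamma * Q(S',A') - Q(S,A))."""
    # If the episode ended, there is no next state-action pair to bootstrap from
    target = r if done else r + gamma * Q[s_next, a_next]
    Q[s, a] += alpha * (target - Q[s, a])
    return Q

# Tiny usage example with a toy 4-state, 2-action Q table (hypothetical numbers)
Q = np.zeros((4, 2))
Q = sarsa_update(Q, s=0, a=1, r=-1.0, s_next=1, a_next=0)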
$$
\begin{aligned}
n=1\ (\text{Sarsa}) \quad Q_t^1 &= r_{t+1}+\gamma Q\left(s_{t+1}, a_{t+1}\right) \\
n=2 \quad Q_t^2 &= r_{t+1}+\gamma r_{t+2}+\gamma^2 Q\left(s_{t+2}, a_{t+2}\right) \\
\vdots \\
n=\infty\ (\text{MC}) \quad Q_t^{\infty} &= r_{t+1}+\gamma r_{t+2}+\ldots+\gamma^{T-t-1} r_T
\end{aligned}
$$
We consider the $n$-step return $(n=1,2,\cdots,\infty)$, as shown above. Sarsa is a single-step update algorithm: after executing each action, it updates the value and the policy once. If, instead of single-step updates, we perform $n$-step or episodic updates, i.e., we only update the value and the policy after executing $n$ steps, we obtain $n$-step Sarsa.
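As a hedged sketch of the $n$-step idea (the helper below is illustrative and not part of the script in Section 2.2), the $n$-step Sarsa target is the discounted sum of $n$ rewards plus one bootstrap term:

import numpy as np

def n_step_sarsa_target(rewards, Q, s_n, a_n, gamma=0.9):
    """r_{t+1} + gamma*r_{t+2} + ... + gamma^(n-1)*r_{t+n} + gamma^n * Q(s_{t+n}, a_{t+n})."""
    g = sum(gamma ** i * r for i, r in enumerate(rewards))  # discounted sum of the n rewards
    return g + gamma ** len(rewards) * Q[s_n][a_n]          # bootstrap after n steps

# n = 2 example with a toy Q table (hypothetical numbers): -1 + 0.9*(-1) + 0.81*1
Q = np.ones((4, 2))
target = n_step_sarsa_target([-1.0, -1.0], Q, s_n=2, a_n=1, gamma=0.9)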
1.2 Action-Selection Policy
The Sarsa algorithm selects actions with an $\varepsilon$-greedy exploration policy.
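As a minimal illustration of this policy (the agent class in Section 2.2 implements the same idea, but with an $\varepsilon$ value that decays over time):

import numpy as np

def epsilon_greedy(Q_row, epsilon=0.1):
    """With probability epsilon pick a random action (explore), otherwise pick the greedy one (exploit)."""
    if np.random.uniform(0, 1) < epsilon:
        return int(np.random.choice(len(Q_row)))  # explore
    return int(np.argmax(Q_row))                  # exploit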
1.3 Further Reading
For a more detailed introduction to the Sarsa algorithm, see my earlier post: 【EasyRL学习笔记】第三章 表格型方法(Q-Table、Sarsa、Q-Learning)
Before studying the Sarsa algorithm, it helps to already be familiar with the following topics:
- Temporal-difference (TD) methods
- The $\varepsilon$-greedy exploration policy
- Q-Table
2. Hands-On Python Code
2.1 Setup Before Running
Prepare an RL_Utils.py file; its contents can be obtained from another post of mine: 【RL工具类】强化学习常用函数工具类(Python代码)
This step is important, because the code below imports this RL_Utils.py file. If you only want to get the script running quickly, a minimal stand-in for the utilities is sketched right below.
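The following stand-in is only a sketch: the function names and signatures (all_seed, save_args, save_results, plot_rewards) are inferred from how they are called in Section 2.2, not copied from the real RL_Utils.py, which is more complete and should be preferred. If you use the sketch, save it as RL_Utils.py and adjust the import path at the top of the script accordingly.

# Minimal stand-in sketch for RL_Utils.py (signatures inferred from their usage below;
# the real file from the linked post contains richer helpers).
import json
import os
import random
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import torch


def all_seed(env, seed=1):
    """Seed the environment and the common RNGs for reproducibility (classic gym API assumed)."""
    env.seed(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)


def save_args(arg_dict, path):
    """Dump the hyper-parameter dict to a JSON file."""
    Path(path).mkdir(parents=True, exist_ok=True)
    with open(os.path.join(path, "args.json"), "w", encoding="utf-8") as f:
        json.dump(arg_dict, f, ensure_ascii=False, indent=2)


def save_results(res_dic, tag, path):
    """Save the reward and step curves as .npy files."""
    Path(path).mkdir(parents=True, exist_ok=True)
    np.save(os.path.join(path, f"{tag}_rewards.npy"), np.array(res_dic["rewards"]))
    np.save(os.path.join(path, f"{tag}_steps.npy"), np.array(res_dic["steps"]))


def plot_rewards(rewards, arg_dict, path, tag="train"):
    """Plot the per-episode reward curve and save it next to the results."""
    Path(path).mkdir(parents=True, exist_ok=True)
    plt.figure()
    plt.title(f"{tag} rewards of {arg_dict['algo_name']} on {arg_dict['env_name']}")
    plt.plot(rewards, label="rewards")
    plt.xlabel("episode")
    plt.legend()
    plt.savefig(os.path.join(path, f"{tag}_rewards_curve.png"))
    if arg_dict.get("show_fig"):
        plt.show()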
2.2 Main Code
# -*-coding:utf-8-*-
# Author: WSKH
# Blog: wskh0929.blog.csdn.net
# Time: 2022/10/20 11:41
import argparse
import datetime
import math
import time
import turtle
from collections import defaultdict

import dill
import gym

# Change this to the path of your own RL_Utils.py file
from Python.ReinforcementLearning.EasyRL.RL_Utils import *
# Cliff walking map (turtle-based rendering wrapper)
class CliffWalkingWapper(gym.Wrapper):
    def __init__(self, env):
        gym.Wrapper.__init__(self, env)
        self.t = None
        self.unit = 50
        self.max_x = 12
        self.max_y = 4

    def draw_x_line(self, y, x0, x1, color='gray'):
        assert x1 > x0
        self.t.color(color)
        self.t.setheading(0)
        self.t.up()
        self.t.goto(x0, y)
        self.t.down()
        self.t.forward(x1 - x0)

    def draw_y_line(self, x, y0, y1, color='gray'):
        assert y1 > y0
        self.t.color(color)
        self.t.setheading(90)
        self.t.up()
        self.t.goto(x, y0)
        self.t.down()
        self.t.forward(y1 - y0)

    def draw_box(self, x, y, fillcolor='', line_color='gray'):
        self.t.up()
        self.t.goto(x * self.unit, y * self.unit)
        self.t.color(line_color)
        self.t.fillcolor(fillcolor)
        self.t.setheading(90)
        self.t.down()
        self.t.begin_fill()
        for i in range(4):
            self.t.forward(self.unit)
            self.t.right(90)
        self.t.end_fill()

    def move_player(self, x, y):
        self.t.up()
        self.t.setheading(90)
        self.t.fillcolor('red')
        self.t.goto((x + 0.5) * self.unit, (y + 0.5) * self.unit)

    def render(self):
        # Lazily create the canvas and draw the grid on the first call
        if self.t is None:
            self.t = turtle.Turtle()
            self.wn = turtle.Screen()
            self.wn.setup(self.unit * self.max_x + 100,
                          self.unit * self.max_y + 100)
            self.wn.setworldcoordinates(0, 0, self.unit * self.max_x,
                                        self.unit * self.max_y)
            self.t.shape('circle')
            self.t.width(2)
            self.t.speed(0)
            self.t.color('gray')
            for _ in range(2):
                self.t.forward(self.max_x * self.unit)
                self.t.left(90)
                self.t.forward(self.max_y * self.unit)
                self.t.left(90)
            for i in range(1, self.max_y):
                self.draw_x_line(
                    y=i * self.unit, x0=0, x1=self.max_x * self.unit)
            for i in range(1, self.max_x):
                self.draw_y_line(
                    x=i * self.unit, y0=0, y1=self.max_y * self.unit)
            for i in range(1, self.max_x - 1):
                self.draw_box(i, 0, 'black')
            self.draw_box(self.max_x - 1, 0, 'yellow')
            self.t.shape('turtle')
        # Convert the flat state index into (x, y) grid coordinates and move the player
        x_pos = self.s % self.max_x
        y_pos = self.max_y - 1 - int(self.s / self.max_x)
        self.move_player(x_pos, y_pos)
# Sarsa agent
class Sarsa:
    def __init__(self, arg_dict):
        # Number of samples taken so far
        self.sample_count = 0
        # Number of actions
        self.n_actions = arg_dict['n_actions']
        # Learning rate
        self.lr = arg_dict['lr']
        # Discount factor for future rewards
        self.gamma = arg_dict['gamma']
        # Current epsilon value
        self.epsilon = arg_dict['epsilon_start']
        # Initial epsilon value
        self.epsilon_start = arg_dict['epsilon_start']
        # Final epsilon value
        self.epsilon_end = arg_dict['epsilon_end']
        # Epsilon decay parameter
        self.epsilon_decay = arg_dict['epsilon_decay']
        # Represent Q(s,a) with a nested dict; every Q(s,a) starts at 0
        self.Q_table = defaultdict(lambda: np.zeros(self.n_actions))

    # Training: choose an action with the e-greedy policy
    def sample_action(self, state):
        # Update the sample counter
        self.sample_count += 1
        # Compute the current epsilon value (exponential decay)
        self.epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \
                       math.exp(-1. * self.sample_count / self.epsilon_decay)
        # Draw a uniform random number in [0, 1); if it exceeds the current epsilon,
        # pick the action with the largest Q value, otherwise pick a random action
        return np.argmax(self.Q_table[str(state)]) if np.random.uniform(0, 1) > self.epsilon else np.random.choice(
            self.n_actions)

    # Testing: choose the action with the largest Q value
    def predict_action(self, state):
        return np.argmax(self.Q_table[str(state)])

    # Update the Q table
    def update(self, state, action, reward, next_state, next_action, done):
        # Q estimate
        Q_predict = self.Q_table[str(state)][action]
        # Q target
        if done:
            # If the episode has ended, the target is simply the current reward
            Q_target = reward
        else:
            # Otherwise bootstrap with the Q value of the next state-action pair
            Q_target = reward + self.gamma * self.Q_table[str(next_state)][next_action]
        # Move the Q value towards the target by a TD step
        self.Q_table[str(state)][action] += self.lr * (Q_target - Q_predict)

    # Save the model
    def save_model(self, path):
        # Create the directory if it does not exist
        Path(path).mkdir(parents=True, exist_ok=True)
        torch.save(
            obj=self.Q_table,
            f=path + "checkpoint.pkl",
            pickle_module=dill
        )

    # Load the model
    def load_model(self, path):
        self.Q_table = torch.load(f=path + 'checkpoint.pkl', pickle_module=dill)
# Training loop
def train(arg_dict, env, agent):
    # Start timing
    startTime = time.time()
    print(f"Environment: {arg_dict['env_name']}, Algorithm: {arg_dict['algo_name']}, Device: {arg_dict['device']}")
    print("Starting to train the agent...")
    # Reward of each episode
    rewards = []
    # Number of steps the agent needs to reach the goal in each episode
    steps = []
    for epoch in range(arg_dict['train_eps']):
        # Total reward of this episode
        ep_reward = 0
        # Step counter of this episode
        ep_step = 0
        # Reset the environment and get the initial state
        state = env.reset()
        # Choose the first action with the e-greedy policy
        action = agent.sample_action(state)
        while True:
            # Render
            if arg_dict['train_render']:
                env.render()
            # Execute the current action; get the next state, the reward and the done flag
            next_state, reward, done, _ = env.step(action)
            # Choose the next action with the e-greedy policy
            next_action = agent.sample_action(next_state)
            # Update the agent: improve the Q function from (s, a, r, s', a')
            agent.update(state, action, reward, next_state, next_action, done)
            # The next state becomes the current state
            state = next_state
            # The next action becomes the current action
            action = next_action
            # Accumulate the reward
            ep_reward += reward
            # Increase the step counter
            ep_step += 1
            # Leave the loop when the episode ends
            if done:
                break
        # Record reward and step information
        rewards.append(ep_reward)
        steps.append(ep_step)
        # Print progress every 10 episodes
        if (epoch + 1) % 10 == 0:
            print(
                f'Epoch: {epoch + 1}/{arg_dict["train_eps"]}, Reward: {ep_reward:.2f}, Steps:{ep_step}, Epsilon: {agent.epsilon:.3f}')
    print("Agent training finished, time elapsed: " + str(time.time() - startTime) + " s")
    return {'epochs': range(len(rewards)), 'rewards': rewards, 'steps': steps}
# Testing loop
def test(arg_dict, env, agent):
    startTime = time.time()
    print("Starting to test the agent...")
    print(f"Environment: {arg_dict['env_name']}, Algorithm: {arg_dict['algo_name']}, Device: {arg_dict['device']}")
    # Reward of each episode
    rewards = []
    # Number of steps the agent needs to reach the goal in each episode
    steps = []
    for epoch in range(arg_dict['test_eps']):
        # Total reward of this episode
        ep_reward = 0
        # Step counter of this episode
        ep_step = 0
        # Reset the environment and get the initial state
        state = env.reset()
        while True:
            # Render
            if arg_dict['test_render']:
                env.render()
            # Choose the action with the largest Q value
            action = agent.predict_action(state)
            # Execute the action; get the next state, the reward and the done flag
            next_state, reward, done, _ = env.step(action)
            # The next state becomes the current state
            state = next_state
            # Accumulate the reward
            ep_reward += reward
            # Increase the step counter
            ep_step += 1
            # Leave the loop when the episode ends
            if done:
                break
        # Record reward and step information
        rewards.append(ep_reward)
        steps.append(ep_step)
        # Print test information
        print(f"Epochs: {epoch + 1}/{arg_dict['test_eps']}, Steps:{ep_step}, Reward: {ep_reward:.2f}")
    print("Testing finished, time elapsed: " + str(time.time() - startTime) + " s")
    return {'episodes': range(len(rewards)), 'rewards': rewards, 'steps': steps}
# Create the environment and the agent
def create_env_agent(arg_dict):
    # Create the environment
    env = gym.make(arg_dict['env_name'])
    env = CliffWalkingWapper(env)
    # Set the random seed
    all_seed(env, seed=arg_dict["seed"])
    # Get the number of states
    try:
        n_states = env.observation_space.n
    except AttributeError:
        n_states = env.observation_space.shape[0]
    # Get the number of actions
    n_actions = env.action_space.n
    print(f"Number of states: {n_states}, number of actions: {n_actions}")
    # Add the numbers of states and actions to the parameter dict
    arg_dict.update({"n_states": n_states, "n_actions": n_actions})
    # Instantiate the agent
    agent = Sarsa(arg_dict)
    # Return the environment and the agent
    return env, agent
if __name__ == '__main__':
    # Prevent the error: OMP: Error #15: Initializing libiomp5md.dll, but found libiomp5md.dll already initialized.
    os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
    # Current path
    curr_path = os.path.dirname(os.path.abspath(__file__))
    # Current time
    curr_time = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S")
    # Hyper-parameters
    parser = argparse.ArgumentParser(description="hyper parameters")
    parser.add_argument('--algo_name', default='Sarsa', type=str, help="name of algorithm")
    parser.add_argument('--env_name', default='CliffWalking-v0', type=str, help="name of environment")
    parser.add_argument('--train_eps', default=400, type=int, help="episodes of training")
    parser.add_argument('--test_eps', default=20, type=int, help="episodes of testing")
    parser.add_argument('--gamma', default=0.90, type=float, help="discounted factor")
    parser.add_argument('--epsilon_start', default=0.95, type=float, help="initial value of epsilon")
    parser.add_argument('--epsilon_end', default=0.01, type=float, help="final value of epsilon")
    parser.add_argument('--epsilon_decay', default=300, type=int, help="decay rate of epsilon")
    parser.add_argument('--lr', default=0.1, type=float, help="learning rate")
    parser.add_argument('--device', default='cpu', type=str, help="cpu or cuda")
    parser.add_argument('--seed', default=520, type=int, help="seed")
    parser.add_argument('--show_fig', default=False, type=bool, help="if show figure or not")
    parser.add_argument('--save_fig', default=True, type=bool, help="if save figure or not")
    parser.add_argument('--train_render', default=False, type=bool,
                        help="Whether to render the environment during training")
    parser.add_argument('--test_render', default=True, type=bool,
                        help="Whether to render the environment during testing")
    args = parser.parse_args()
    default_args = {'result_path': f"{curr_path}/outputs/{args.env_name}/{curr_time}/results/",
                    'model_path': f"{curr_path}/outputs/{args.env_name}/{curr_time}/models/",
                    }
    # Merge all parameters into a single dict
    arg_dict = {**vars(args), **default_args}
    print("Algorithm parameter dict:", arg_dict)
    # Create the environment and the agent
    env, agent = create_env_agent(arg_dict)
    # Pass in the parameters, environment and agent, then start training
    res_dic = train(arg_dict, env, agent)
    print("Training result dict:", res_dic)
    # Save the model, the parameters and the results
    agent.save_model(path=arg_dict['model_path'])
    save_args(arg_dict, path=arg_dict['result_path'])
    save_results(res_dic, tag='train', path=arg_dict['result_path'])
    plot_rewards(res_dic['rewards'], arg_dict, path=arg_dict['result_path'], tag="train")
    # =================================================================================================
    # Create a fresh environment and agent for testing
    print("=" * 300)
    env, agent = create_env_agent(arg_dict)
    # Load the saved agent
    agent.load_model(path=arg_dict['model_path'])
    res_dic = test(arg_dict, env, agent)
    save_results(res_dic, tag='test', path=arg_dict['result_path'])
    plot_rewards(res_dic['rewards'], arg_dict, path=arg_dict['result_path'], tag="test")
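As an optional, hypothetical snippet (not part of the original script), you can load the saved Q-table afterwards and print the greedy action for every visited state. The action order 0=up, 1=right, 2=down, 3=left follows gym's CliffWalking-v0, and the path below is a placeholder you need to adapt to your own output folder.

import dill
import numpy as np
import torch

actions = ['up', 'right', 'down', 'left']  # action indices of CliffWalking-v0
# Placeholder path: replace <run_time> with the timestamped folder created by the script
Q_table = torch.load(f="outputs/CliffWalking-v0/<run_time>/models/checkpoint.pkl", pickle_module=dill)
for state in range(48):
    if str(state) in Q_table:
        print(state, actions[int(np.argmax(Q_table[str(state)]))])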
2.3 Results
Since some of the output is rather long, only part of it is shown below.
Number of states: 48, number of actions: 4
Environment: CliffWalking-v0, Algorithm: Sarsa, Device: cpu
Starting to train the agent...
Epoch: 10/400, Reward: -328.00, Steps:229, Epsilon: 0.010
Epoch: 20/400, Reward: -133.00, Steps:133, Epsilon: 0.010
Epoch: 30/400, Reward: -65.00, Steps:65, Epsilon: 0.010
Epoch: 40/400, Reward: -47.00, Steps:47, Epsilon: 0.010
Epoch: 50/400, Reward: -63.00, Steps:63, Epsilon: 0.010
Epoch: 60/400, Reward: -83.00, Steps:83, Epsilon: 0.010
Epoch: 70/400, Reward: -31.00, Steps:31, Epsilon: 0.010
Epoch: 80/400, Reward: -37.00, Steps:37, Epsilon: 0.010
Epoch: 90/400, Reward: -53.00, Steps:53, Epsilon: 0.010
Epoch: 100/400, Reward: -33.00, Steps:33, Epsilon: 0.010
Epoch: 110/400, Reward: -39.00, Steps:39, Epsilon: 0.010
Epoch: 120/400, Reward: -45.00, Steps:45, Epsilon: 0.010
Epoch: 130/400, Reward: -42.00, Steps:42, Epsilon: 0.010
Epoch: 140/400, Reward: -33.00, Steps:33, Epsilon: 0.010
Epoch: 150/400, Reward: -33.00, Steps:33, Epsilon: 0.010
Epoch: 160/400, Reward: -39.00, Steps:39, Epsilon: 0.010
Epoch: 170/400, Reward: -17.00, Steps:17, Epsilon: 0.010
Epoch: 180/400, Reward: -35.00, Steps:35, Epsilon: 0.010
Epoch: 190/400, Reward: -15.00, Steps:15, Epsilon: 0.010
Epoch: 200/400, Reward: -23.00, Steps:23, Epsilon: 0.010
Epoch: 210/400, Reward: -19.00, Steps:19, Epsilon: 0.010
Epoch: 220/400, Reward: -27.00, Steps:27, Epsilon: 0.010
Epoch: 230/400, Reward: -19.00, Steps:19, Epsilon: 0.010
Epoch: 240/400, Reward: -21.00, Steps:21, Epsilon: 0.010
Epoch: 250/400, Reward: -35.00, Steps:35, Epsilon: 0.010
Epoch: 260/400, Reward: -33.00, Steps:33, Epsilon: 0.010
Epoch: 270/400, Reward: -31.00, Steps:31, Epsilon: 0.010
Epoch: 280/400, Reward: -27.00, Steps:27, Epsilon: 0.010
Epoch: 290/400, Reward: -23.00, Steps:23, Epsilon: 0.010
Epoch: 300/400, Reward: -29.00, Steps:29, Epsilon: 0.010
Epoch: 310/400, Reward: -25.00, Steps:25, Epsilon: 0.010
Epoch: 320/400, Reward: -19.00, Steps:19, Epsilon: 0.010
Epoch: 330/400, Reward: -21.00, Steps:21, Epsilon: 0.010
Epoch: 340/400, Reward: -21.00, Steps:21, Epsilon: 0.010
Epoch: 350/400, Reward: -15.00, Steps:15, Epsilon: 0.010
Epoch: 360/400, Reward: -15.00, Steps:15, Epsilon: 0.010
Epoch: 370/400, Reward: -17.00, Steps:17, Epsilon: 0.010
Epoch: 380/400, Reward: -17.00, Steps:17, Epsilon: 0.010
Epoch: 390/400, Reward: -15.00, Steps:15, Epsilon: 0.010
Epoch: 400/400, Reward: -15.00, Steps:15, Epsilon: 0.010
Agent training finished, time elapsed: 0.42161107063293457 s
============================================================================================================================================================================================================================================================================================================
Number of states: 48, number of actions: 4
Starting to test the agent...
Environment: CliffWalking-v0, Algorithm: Sarsa, Device: cpu
Epochs: 1/20, Steps:15, Reward: -15.00
Epochs: 2/20, Steps:15, Reward: -15.00
Epochs: 3/20, Steps:15, Reward: -15.00
Epochs: 4/20, Steps:15, Reward: -15.00
Epochs: 5/20, Steps:15, Reward: -15.00
Epochs: 6/20, Steps:15, Reward: -15.00
Epochs: 7/20, Steps:15, Reward: -15.00
Epochs: 8/20, Steps:15, Reward: -15.00
Epochs: 9/20, Steps:15, Reward: -15.00
Epochs: 10/20, Steps:15, Reward: -15.00
Epochs: 11/20, Steps:15, Reward: -15.00
Epochs: 12/20, Steps:15, Reward: -15.00
Epochs: 13/20, Steps:15, Reward: -15.00
Epochs: 14/20, Steps:15, Reward: -15.00
Epochs: 15/20, Steps:15, Reward: -15.00
Epochs: 16/20, Steps:15, Reward: -15.00
Epochs: 17/20, Steps:15, Reward: -15.00
Epochs: 18/20, Steps:15, Reward: -15.00
Epochs: 19/20, Steps:15, Reward: -15.00
Epochs: 20/20, Steps:15, Reward: -15.00
Testing finished, time elapsed: 14.93819785118103 s
We can see that Sarsa, as an on-policy algorithm, is not very aggressive: it walks to the goal while keeping some distance from the cliff (15 steps here, versus the 13-step shortest path), which is also a drawback of on-policy algorithms.
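For comparison, the only change needed to turn the on-policy Sarsa target into the off-policy Q-learning target is to bootstrap with the greedy action instead of the action that will actually be executed next; a hedged sketch with illustrative names:

import numpy as np

def sarsa_target(Q, r, s_next, a_next, gamma=0.9):
    # On-policy: bootstrap with the action the behavior policy actually takes next
    return r + gamma * Q[s_next][a_next]

def q_learning_target(Q, r, s_next, gamma=0.9):
    # Off-policy: bootstrap with the greedy action, regardless of what gets executed
    return r + gamma * np.max(Q[s_next])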
2.4 Configuring the Path Visualization
If you find the visualization of the path-finding process too time-consuming, you can turn it off via the corresponding setting.
Or, if you want to watch the training process as well, you can enable that too (I have watched it, and the early stage is rather boring: the agent just wanders around at random). One caveat about these settings is sketched below.
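Note that the script declares the render flags with type=bool, and argparse applies bool() to the raw string, so bool("False") is still True and passing --test_render False on the command line will not disable rendering. Changing the default values in the code is the simplest option; alternatively, a small string-to-bool converter works, as sketched here (str2bool is my own illustrative helper, not part of the original script):

def str2bool(v):
    """Parse common true/false strings into a real bool for argparse."""
    return str(v).lower() in ("yes", "true", "t", "1")

# e.g. parser.add_argument('--test_render', default=True, type=str2bool,
#                          help="Whether to render the environment during testing")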