强化学习2.1 MDP(Markov decision process)学习
本文介绍马尔可夫决策过程(Markov Decision Process,简称 MDP)。从最广义的角度来说,MDP 的定义取决于其状态转移方式与奖励计算方式。
状态转移由概率 P ( s ′ ∣ s , a ) P(s' |s,a) P(s′∣s,a)定义,该概率表示从状态s出发、执行动作a后,最终到达状态s ′的可能性。目前有多种定义奖励的方式,但为方便起见,使用 r ( s , a , s ′ ) r(s,a,s') r(s,a,s′)函数来定义奖励。
相关资料CS294
对于初学者,这是一个简单的MDP过程
可以用如下代码进行描述
transition_probs = {
's0': {
'a0': {'s0': 0.5, 's2': 0.5},
'a1': {'s2': 1}
},
's1': {
'a0': {'s0': 0.7, 's1': 0.1, 's2': 0.2},
'a1': {'s1': 0.95, 's2': 0.05}
},
's2': {
'a0': {'s0': 0.4, 's2': 0.6},
'a1': {'s0': 0.3, 's1': 0.3, 's2': 0.4}
}
}
rewards = {
's1': {'a0': {'s0': +5}},
's2': {'a1': {'s0': -1}}
}
from mdp import MDP
mdp = MDP(transition_probs, rewards, initial_state='s0')
现在我们可以像使用其他任何 Gym 环境一样使用 MDP 了
print('initial state =', mdp.reset())
next_state, reward, done, info = mdp.step('a1')
print('next_state = %s, reward = %s, done = %s' % (next_state, reward, done))

这个 MDP Gym 环境包含其他一些在执行价值迭代(Value Iteration)时需要用到的方法
print("mdp.get_all_states =", mdp.get_all_states())
print("mdp.get_possible_actions('s1') = ", mdp.get_possible_actions('s1'))
print("mdp.get_next_states('s1', 'a0') = ", mdp.get_next_states('s1', 'a0'))
print("mdp.get_reward('s1', 'a0', 's0') = ", mdp.get_reward('s1', 'a0', 's0'))
print("mdp.get_transition_prob('s1', 'a0', 's0') = ", mdp.get_transition_prob('s1', 'a0', 's0'))

可视化MDP
在此之前,你需要为系统和 Python 环境分别安装 Graphviz 工具:
为系统安装 Graphviz
对于 Ubuntu 系统,直接运行命令:sudo apt-get install graphviz
对于 macOS(OSX)系统,运行命令:brew install graphviz
对于Windows系统,访问此链接进行下载
为 Python 环境安装 Graphviz 库
运行命令:pip install graphviz
安装完成后,需重启 Jupyter Notebook 或相关开发环境,确保 Graphviz 能被正常调用。
导入依赖库
from mdp import has_graphviz
from IPython.display import display
print("Graphviz available:", has_graphviz)
进行可视化
if has_graphviz:
from mdp import plot_graph, plot_graph_with_state_values, plot_graph_optimal_strategy_and_state_values
display(plot_graph(mdp))
结果如下所示
价值迭代
现在我们来构建求解这个 MDP 的算法。目前为止,最简单的算法是价值迭代(Value Iteration,VI)。
以下是价值迭代的伪代码:
- 初始化 V ( 0 ) ( s ) = 0 V^{(0)}(s)=0 V(0)(s)=0,对于所有状态 s
- 对于 i = 0 , 1 , 2 , … i=0, 1, 2, \dots i=0,1,2,…(迭代直到收敛)
- V ( i + 1 ) ( s ) = max a ∑ s ′ P ( s ′ ∣ s , a ) ⋅ [ r ( s , a , s ′ ) + γ V i ( s ′ ) ] V_{(i+1)}(s) = \max_a \sum_{s'} P(s' | s,a) \cdot [ r(s,a,s') + \gamma V_{i}(s')] V(i+1)(s)=maxa∑s′P(s′∣s,a)⋅[r(s,a,s′)+γVi(s′)], 对于所有状态 s
下面这一个函数,用于计算状态 - 动作价值函数 Q π Q^{\pi} Qπ,其定义如下:
Q i ( s , a ) = ∑ s ′ P ( s ′ ∣ s , a ) ⋅ [ r ( s , a , s ′ ) + γ V i ( s ′ ) ] Q_i(s, a) = \sum_{s'} P(s' | s,a) \cdot [ r(s,a,s') + \gamma V_{i}(s')] Qi(s,a)=s′∑P(s′∣s,a)⋅[r(s,a,s′)+γVi(s′)]这个函数的作用是,在当前价值函数 V i V_i Vi 下,计算在状态 s 采取动作 a 后的期望累积奖励。
def get_action_value(mdp, state_values, state, action, gamma):
""" Computes Q(s,a) as in formula above """
q_value = 0.0
# 获取从state执行action后所有可能的下一状态
next_states = mdp.get_next_states(state, action)
for next_state in next_states:
# 获取转移概率 P(s'|s,a)
prob = mdp.get_transition_prob(state, action, next_state)
# 获取即时奖励 r(s,a,s')
reward = mdp.get_reward(state, action, next_state)
# 累加计算 Q(s,a) = sum[ P(s'|s,a) * (r + gamma*V(s')) ]
q_value += prob * (reward + gamma * state_values[next_state])
return q_value
进行测试
import numpy as np
test_Vs = {s: i for i, s in enumerate(sorted(mdp.get_all_states()))}
assert np.isclose(get_action_value(mdp, test_Vs, 's2', 'a1', 0.9), 0.69)
assert np.isclose(get_action_value(mdp, test_Vs, 's1', 'a0', 0.9), 3.95)
接下来介绍状态价值更新的公式:
V ( i + 1 ) ( s ) = max a ∑ s ′ P ( s ′ ∣ s , a ) ⋅ [ r ( s , a , s ′ ) + γ V i ( s ′ ) ] = max a Q i ( s , a ) V_{(i+1)}(s) = \max_a \sum_{s'} P(s' | s,a) \cdot [ r(s,a,s') + \gamma V_{i}(s')] = \max_a Q_i(s,a) V(i+1)(s)=amaxs′∑P(s′∣s,a)⋅[r(s,a,s′)+γVi(s′)]=amaxQi(s,a)
可以用如下代码表示
def get_new_state_value(mdp, state_values, state, gamma):
""" Computes next V(s) as in formula above. Please do not change state_values in process. """
if mdp.is_terminal(state):
return 0
# 获取当前状态下所有可能的动作
possible_actions = mdp.get_possible_actions(state)
# 存储每个动作的Q值,取最大值作为新的状态价值
max_q_value = -float('inf')
for action in possible_actions:
# 计算当前动作的Q值(调用之前实现的get_action_value函数)
q_value = get_action_value(mdp, state_values, state, action, gamma)
# 更新最大值
if q_value > max_q_value:
max_q_value = q_value
return max_q_value
进行测试
test_Vs_copy = dict(test_Vs)
assert np.isclose(get_new_state_value(mdp, test_Vs, 's0', 0.9), 1.8)
assert np.isclose(get_new_state_value(mdp, test_Vs, 's2', 0.9), 1.08)
assert np.isclose(get_new_state_value(mdp, {'s0': -1e10, 's1': 0, 's2': -2e10}, 's0', 0.9), -13500000000.0), \
"Please ensure that you handle negative Q-values of arbitrary magnitude correctly"
assert test_Vs == test_Vs_copy, "Please do not change state_values in get_new_state_value"
下面是结合之前实现的 get_action_value 和 get_new_state_value 函数的完整价值迭代(Value Iteration)算法实现。该算法通过迭代更新所有状态的价值,最终收敛到最优状态价值 V ∗ ( s ) V ^∗ (s) V∗(s)。
# parameters
gamma = 0.9 # discount for MDP
num_iter = 100 # maximum iterations, excluding initialization
# stop VI if new values are this close to old values (or closer)
min_difference = 0.001
# initialize V(s)
state_values = {s: 0 for s in mdp.get_all_states()}
if has_graphviz:
display(plot_graph_with_state_values(mdp, state_values))
for i in range(num_iter):
# Compute new state values using the functions you defined above.
# It must be a dict {state : float V_new(state)}
new_state_values = {}
for state in mdp.get_all_states():
new_state_values[state] = get_new_state_value(mdp, state_values, state, gamma)
assert isinstance(new_state_values, dict)
# Compute difference
diff = max(abs(new_state_values[s] - state_values[s])
for s in mdp.get_all_states())
print("iter %4i | diff: %6.5f | " % (i, diff), end="")
print(' '.join("V(%s) = %.3f" % (s, v) for s, v in state_values.items()))
state_values = new_state_values
if diff < min_difference:
print("Terminated")
break
代码运行结果如下所示
进行可视化
打印最终的状态价值
print("Final state values:", state_values)
assert abs(state_values['s0'] - 3.781) < 0.01
assert abs(state_values['s1'] - 7.294) < 0.01
assert abs(state_values['s2'] - 4.202) < 0.01

要基于最优状态价值 V ∗ ( s ) V^{*}(s) V∗(s)提取最优策略
π ∗ ( s ) = a r g m a x a ∑ s ′ P ( s ′ ∣ s , a ) ⋅ [ r ( s , a , s ′ ) + γ V i ( s ′ ) ] = a r g m a x a Q i ( s , a ) \pi^*(s) = argmax_a \sum_{s'} P(s' | s,a) \cdot [ r(s,a,s') + \gamma V_{i}(s')] = argmax_a Q_i(s,a) π∗(s)=argmaxas′∑P(s′∣s,a)⋅[r(s,a,s′)+γVi(s′)]=argmaxaQi(s,a)核心是对每个状态计算所有动作的 Q(s,a),并选择 Q 值最大的动作(即 argmax)。以下是具体实现代码:
def get_optimal_action(mdp, state_values, state, gamma=0.9):
""" Finds optimal action using formula above. """
if mdp.is_terminal(state):
return None
# 获取当前状态下所有可能的动作
possible_actions = mdp.get_possible_actions(state)
# 计算每个动作的Q值,存储为 {动作: Q值}
action_q = {}
for action in possible_actions:
q_value = get_action_value(mdp, state_values, state, action, gamma)
action_q[action] = q_value
# 选择Q值最大的动作(argmax_a Q(s,a))
optimal_action = max(action_q, key=action_q.get)
return optimal_action
进行测试
assert get_optimal_action(mdp, state_values, 's0', gamma) == 'a1'
assert get_optimal_action(mdp, state_values, 's1', gamma) == 'a0'
assert get_optimal_action(mdp, state_values, 's2', gamma) == 'a1'
assert get_optimal_action(mdp, {'s0': -1e10, 's1': 0, 's2': -2e10}, 's0', 0.9) == 'a0', \
"Please ensure that you handle negative Q-values of arbitrary magnitude correctly"
assert get_optimal_action(mdp, {'s0': -2e10, 's1': 0, 's2': -1e10}, 's0', 0.9) == 'a1', \
"Please ensure that you handle negative Q-values of arbitrary magnitude correctly"
进行可视化
if has_graphviz:
display(plot_graph_optimal_strategy_and_state_values(mdp, state_values, get_action_value))

计算平均奖励
# Measure agent's average reward
s = mdp.reset()
rewards = []
for _ in range(10000):
s, r, done, _ = mdp.step(get_optimal_action(mdp, state_values, s, gamma))
rewards.append(r)
print("average reward: ", np.mean(rewards))
assert(0.40 < np.mean(rewards) < 0.55)

更多推荐
所有评论(0)