本文介绍马尔可夫决策过程(Markov Decision Process,简称 MDP)。从最广义的角度来说,MDP 的定义取决于其状态转移方式与奖励计算方式。
状态转移由概率 P ( s ′ ∣ s , a ) P(s' |s,a) P(ss,a)定义,该概率表示从状态s出发、执行动作a后,最终到达状态s ′的可能性。目前有多种定义奖励的方式,但为方便起见,使用 r ( s , a , s ′ ) r(s,a,s') r(s,a,s)函数来定义奖励。

相关资料CS294

对于初学者,这是一个简单的MDP过程
在这里插入图片描述

可以用如下代码进行描述

transition_probs = {
    's0': {
        'a0': {'s0': 0.5, 's2': 0.5},
        'a1': {'s2': 1}
    },
    's1': {
        'a0': {'s0': 0.7, 's1': 0.1, 's2': 0.2},
        'a1': {'s1': 0.95, 's2': 0.05}
    },
    's2': {
        'a0': {'s0': 0.4, 's2': 0.6},
        'a1': {'s0': 0.3, 's1': 0.3, 's2': 0.4}
    }
}
rewards = {
    's1': {'a0': {'s0': +5}},
    's2': {'a1': {'s0': -1}}
}

from mdp import MDP
mdp = MDP(transition_probs, rewards, initial_state='s0')

现在我们可以像使用其他任何 Gym 环境一样使用 MDP 了

print('initial state =', mdp.reset())
next_state, reward, done, info = mdp.step('a1')
print('next_state = %s, reward = %s, done = %s' % (next_state, reward, done))

在这里插入图片描述

这个 MDP Gym 环境包含其他一些在执行价值迭代(Value Iteration)时需要用到的方法

print("mdp.get_all_states =", mdp.get_all_states())
print("mdp.get_possible_actions('s1') = ", mdp.get_possible_actions('s1'))
print("mdp.get_next_states('s1', 'a0') = ", mdp.get_next_states('s1', 'a0'))
print("mdp.get_reward('s1', 'a0', 's0') = ", mdp.get_reward('s1', 'a0', 's0'))
print("mdp.get_transition_prob('s1', 'a0', 's0') = ", mdp.get_transition_prob('s1', 'a0', 's0'))

在这里插入图片描述

可视化MDP

在此之前,你需要为系统和 Python 环境分别安装 Graphviz 工具:
为系统安装 Graphviz
对于 Ubuntu 系统,直接运行命令:sudo apt-get install graphviz
对于 macOS(OSX)系统,运行命令:brew install graphviz
对于Windows系统,访问此链接进行下载

为 Python 环境安装 Graphviz 库
运行命令:pip install graphviz

安装完成后,需重启 Jupyter Notebook 或相关开发环境,确保 Graphviz 能被正常调用。
导入依赖库

from mdp import has_graphviz
from IPython.display import display
print("Graphviz available:", has_graphviz)

进行可视化

if has_graphviz:
    from mdp import plot_graph, plot_graph_with_state_values, plot_graph_optimal_strategy_and_state_values
    display(plot_graph(mdp))

结果如下所示
在这里插入图片描述

价值迭代

现在我们来构建求解这个 MDP 的算法。目前为止,最简单的算法是价值迭代(Value Iteration,VI)。
以下是价值迭代的伪代码:

  1. 初始化 V ( 0 ) ( s ) = 0 V^{(0)}(s)=0 V(0)(s)=0,对于所有状态 s
  2. 对于 i = 0 , 1 , 2 , … i=0, 1, 2, \dots i=0,1,2,(迭代直到收敛)
  3. V ( i + 1 ) ( s ) = max ⁡ a ∑ s ′ P ( s ′ ∣ s , a ) ⋅ [ r ( s , a , s ′ ) + γ V i ( s ′ ) ] V_{(i+1)}(s) = \max_a \sum_{s'} P(s' | s,a) \cdot [ r(s,a,s') + \gamma V_{i}(s')] V(i+1)(s)=maxasP(ss,a)[r(s,a,s)+γVi(s)], 对于所有状态 s

下面这一个函数,用于计算状态 - 动作价值函数 Q π Q^{\pi} Qπ,其定义如下:
Q i ( s , a ) = ∑ s ′ P ( s ′ ∣ s , a ) ⋅ [ r ( s , a , s ′ ) + γ V i ( s ′ ) ] Q_i(s, a) = \sum_{s'} P(s' | s,a) \cdot [ r(s,a,s') + \gamma V_{i}(s')] Qi(s,a)=sP(ss,a)[r(s,a,s)+γVi(s)]这个函数的作用是,在当前价值函数 V i V_i Vi 下,计算在状态 s 采取动作 a 后的期望累积奖励。

def get_action_value(mdp, state_values, state, action, gamma):
    """ Computes Q(s,a) as in formula above """
    q_value = 0.0
    # 获取从state执行action后所有可能的下一状态
    next_states = mdp.get_next_states(state, action)
    
    for next_state in next_states:
        # 获取转移概率 P(s'|s,a)
        prob = mdp.get_transition_prob(state, action, next_state)
        # 获取即时奖励 r(s,a,s')
        reward = mdp.get_reward(state, action, next_state)
        # 累加计算 Q(s,a) = sum[ P(s'|s,a) * (r + gamma*V(s')) ]
        q_value += prob * (reward + gamma * state_values[next_state])
    
    return q_value

进行测试

import numpy as np
test_Vs = {s: i for i, s in enumerate(sorted(mdp.get_all_states()))}
assert np.isclose(get_action_value(mdp, test_Vs, 's2', 'a1', 0.9), 0.69)
assert np.isclose(get_action_value(mdp, test_Vs, 's1', 'a0', 0.9), 3.95)

接下来介绍状态价值更新的公式:
V ( i + 1 ) ( s ) = max ⁡ a ∑ s ′ P ( s ′ ∣ s , a ) ⋅ [ r ( s , a , s ′ ) + γ V i ( s ′ ) ] = max ⁡ a Q i ( s , a ) V_{(i+1)}(s) = \max_a \sum_{s'} P(s' | s,a) \cdot [ r(s,a,s') + \gamma V_{i}(s')] = \max_a Q_i(s,a) V(i+1)(s)=amaxsP(ss,a)[r(s,a,s)+γVi(s)]=amaxQi(s,a)

可以用如下代码表示

def get_new_state_value(mdp, state_values, state, gamma):
    """ Computes next V(s) as in formula above. Please do not change state_values in process. """
    if mdp.is_terminal(state):
        return 0

    # 获取当前状态下所有可能的动作
    possible_actions = mdp.get_possible_actions(state)
    
    # 存储每个动作的Q值,取最大值作为新的状态价值
    max_q_value = -float('inf')
    for action in possible_actions:
        # 计算当前动作的Q值(调用之前实现的get_action_value函数)
        q_value = get_action_value(mdp, state_values, state, action, gamma)
        # 更新最大值
        if q_value > max_q_value:
            max_q_value = q_value
    
    return max_q_value

进行测试

test_Vs_copy = dict(test_Vs)
assert np.isclose(get_new_state_value(mdp, test_Vs, 's0', 0.9), 1.8)
assert np.isclose(get_new_state_value(mdp, test_Vs, 's2', 0.9), 1.08)
assert np.isclose(get_new_state_value(mdp, {'s0': -1e10, 's1': 0, 's2': -2e10}, 's0', 0.9), -13500000000.0), \
    "Please ensure that you handle negative Q-values of arbitrary magnitude correctly"
assert test_Vs == test_Vs_copy, "Please do not change state_values in get_new_state_value"

下面是结合之前实现的 get_action_value 和 get_new_state_value 函数的完整价值迭代(Value Iteration)算法实现。该算法通过迭代更新所有状态的价值,最终收敛到最优状态价值 V ∗ ( s ) V ^∗ (s) V(s)

# parameters
gamma = 0.9            # discount for MDP
num_iter = 100         # maximum iterations, excluding initialization
# stop VI if new values are this close to old values (or closer)
min_difference = 0.001

# initialize V(s)
state_values = {s: 0 for s in mdp.get_all_states()}

if has_graphviz:
    display(plot_graph_with_state_values(mdp, state_values))

for i in range(num_iter):

    # Compute new state values using the functions you defined above.
    # It must be a dict {state : float V_new(state)}
    new_state_values = {}
    for state in mdp.get_all_states():
        new_state_values[state] = get_new_state_value(mdp, state_values, state, gamma)

    assert isinstance(new_state_values, dict)

    # Compute difference
    diff = max(abs(new_state_values[s] - state_values[s])
               for s in mdp.get_all_states())
    print("iter %4i   |   diff: %6.5f   |   " % (i, diff), end="")
    print('   '.join("V(%s) = %.3f" % (s, v) for s, v in state_values.items()))
    state_values = new_state_values

    if diff < min_difference:
        print("Terminated")
        break

代码运行结果如下所示
在这里插入图片描述
进行可视化
在这里插入图片描述

打印最终的状态价值

print("Final state values:", state_values)

assert abs(state_values['s0'] - 3.781) < 0.01
assert abs(state_values['s1'] - 7.294) < 0.01
assert abs(state_values['s2'] - 4.202) < 0.01

在这里插入图片描述
要基于最优状态价值 V ∗ ( s ) V^{*}(s) V(s)提取最优策略
π ∗ ( s ) = a r g m a x a ∑ s ′ P ( s ′ ∣ s , a ) ⋅ [ r ( s , a , s ′ ) + γ V i ( s ′ ) ] = a r g m a x a Q i ( s , a ) \pi^*(s) = argmax_a \sum_{s'} P(s' | s,a) \cdot [ r(s,a,s') + \gamma V_{i}(s')] = argmax_a Q_i(s,a) π(s)=argmaxasP(ss,a)[r(s,a,s)+γVi(s)]=argmaxaQi(s,a)核心是对每个状态计算所有动作的 Q(s,a),并选择 Q 值最大的动作(即 argmax)。以下是具体实现代码:

def get_optimal_action(mdp, state_values, state, gamma=0.9):
    """ Finds optimal action using formula above. """
    if mdp.is_terminal(state):
        return None

    # 获取当前状态下所有可能的动作
    possible_actions = mdp.get_possible_actions(state)
    
    # 计算每个动作的Q值,存储为 {动作: Q值}
    action_q = {}
    for action in possible_actions:
        q_value = get_action_value(mdp, state_values, state, action, gamma)
        action_q[action] = q_value
    
    # 选择Q值最大的动作(argmax_a Q(s,a))
    optimal_action = max(action_q, key=action_q.get)
    
    return optimal_action

进行测试

assert get_optimal_action(mdp, state_values, 's0', gamma) == 'a1'
assert get_optimal_action(mdp, state_values, 's1', gamma) == 'a0'
assert get_optimal_action(mdp, state_values, 's2', gamma) == 'a1'

assert get_optimal_action(mdp, {'s0': -1e10, 's1': 0, 's2': -2e10}, 's0', 0.9) == 'a0', \
    "Please ensure that you handle negative Q-values of arbitrary magnitude correctly"
assert get_optimal_action(mdp, {'s0': -2e10, 's1': 0, 's2': -1e10}, 's0', 0.9) == 'a1', \
    "Please ensure that you handle negative Q-values of arbitrary magnitude correctly"

进行可视化

if has_graphviz:
    display(plot_graph_optimal_strategy_and_state_values(mdp, state_values, get_action_value))

在这里插入图片描述

计算平均奖励

# Measure agent's average reward

s = mdp.reset()
rewards = []
for _ in range(10000):
    s, r, done, _ = mdp.step(get_optimal_action(mdp, state_values, s, gamma))
    rewards.append(r)

print("average reward: ", np.mean(rewards))

assert(0.40 < np.mean(rewards) < 0.55)

在这里插入图片描述

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐