机器学习22-强化学习代码

pytorch在进行算法编写的时候,感觉使用了很少了代码就编写了一个很复杂的算法;在次针对算法中常用的函数进行拆解,先了解每个小函数的输入和计算规则是怎样,再去了解整体算法的业务逻辑

1-知识汇总

  • 1)生成服从均匀分布 U(0, 1) 的随机数:torch.rand(1, device=device)
  • 2)随机生成1个整数张量,元素值在 [0, 3) 区间内(即 0 或 1 或 2):torch.randint(0, 3, (1,), device=device).item()
  • 3)torch.rand(1, device=device)和torch.rand(1, device=device).item()区别
  • 4)解释代码:reward = float(np.random.rand() < 0.5

2-知识详解

1-生成服从均匀分布 U(0, 1) 的随机数:torch.rand(1, device=device)

这段代码的作用是:
在指定设备(device)上生成一个从均匀分布 [0, 1) 中采样的随机标量(1个元素的张量),并打印结果。


逐部分解释:

  1. torch.rand(1, device=device)

    • torch.rand 是 PyTorch 的函数,用于生成服从 均匀分布 U(0, 1) 的随机数。
    • 参数 1 表示生成一个形状为 (1,) 的张量(即单个标量)。
    • device=device 指定张量存储的设备(如 cpucuda:0)。如果 devicecuda,则张量会直接生成在 GPU 上。
  2. print('torch.rand:', ...)

    • 打印前缀 'torch.rand:',后跟生成的随机张量。例如输出可能是:
      torch.rand: tensor([0.3745], device='cuda:0')
      

注意事项:

  • device 需要先定义:例如 device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
  • 结果每次运行都不同:因为 torch.rand 每次调用都会重新采样。

示例完整代码:

import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('torch.rand:', torch.rand(1, device=device))

2-随机生成1个整数张量,元素值在 [0, 3) 区间内

# 随机生成 1 个整数张量,元素值在 [0, 2) 区间内(即 0 或 1)
# 打印结果:torch.randint: 0
print('torch.randint:', torch.randint(0, 2, (1,), device=device).item())

# 随机生成 1 个整数张量,元素值在 [0, 3) 区间内(即 0 或 1 或 2)
# 打印结果:torch.randint: 2
print('torch.randint:', torch.randint(0, 3, (1,), device=device).item())

# 打印结果:torch.randint: tensor([2], device='cuda:0')
print('torch.randint:', torch.randint(0, 3, (10,), device=device))

在指定设备(device)上,从整数集合 {0, 1} 中随机抽取 1 个整数,并立即将其转换为 Python 原生的 int 类型返回。


逐部分拆解:

代码片段 含义
torch.randint(0, 2, (1,), device=device) 随机生成 1 个整数张量,元素值在 [0, 2) 区间内(即 0 或 1)。
- 第 1 个参数 0:下限(包含)。
- 第 2 个参数 2:上限(不包含)。
- 第 3 个参数 (1,):形状,表示生成 1 个元素的 1-D 张量。
- device=device:张量直接分配在指定设备(CPU 或 GPU)。
.item() 把只有一个元素的张量转成 Python 的原生 int 值(CPU 内存),不再保留张量结构。

示例输出:

import torch
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
value = torch.randint(0, 2, (1,), device=device).item()
print(value)   # 可能打印 0 或 1

注意事项:

  • .item() 的张量位于 GPU,PyTorch 会自动将其复制到 CPU 再返回 Python 标量。
  • 每次运行结果随机:要么 0,要么 1,概率各 50%。
  • torch.randint(0, 3, (1,), device=device).item()中(1,)的含义,(1,) 是一个元组,用来指定生成张量的形状(shape)。
    在这里它的含义是:创建一个一维、长度为 1 的张量,也就是只有一个元素的“向量”。

为什么写成 (1,)

(1,) 是一个元组,用来指定生成张量的形状(shape)。
在这里它的含义是:创建一个一维、长度为 1 的张量,也就是只有一个元素的“向量”。

  1. 语法要求
    PyTorch 里凡是接受“形状”参数的函数(randint, zeros, ones, rand, …)都要求传入 元组列表
    单独写 1 会被当成整数,而不是形状元组,因此必须加逗号变成 (1,) 或写成 [1]

  2. 与标量区分

    • torch.randint(0, 3, 1) 会报错:TypeError。
    • torch.randint(0, 3, ()) 会得到 0-D 张量(标量),但后面 .item() 时效果相同。
    • (1,) 明确表示“1 个元素的一维张量”,意图清晰,也便于后续改成 (10,) 之类批量采样。
  3. .item() 配合
    .item() 只能对 单个元素 的张量使用。用 (1,) 保证张量只有一个元素,避免误用。


小结

(1,) 就是“形状为 1 的一维张量”的 PyTorch 标准写法;写成 (1,) 而不是 1 是为了满足函数签名要求,并清楚表达“生成一个元素”的意图。


3-torch.rand(1, device=device)和torch.rand(1, device=device).item()区别

总结:可以替换,一个是【张量】,一个是【数值】;可以正常参与比较符号 < 运算,返回的还是【张量】和【数值】,还可以直接进行比较

1-只想做 Python 的 if/else 分支 → 用 .item()

2-想批量生成掩码或继续 GPU 张量计算 → 不用 .item()

# 打印结果:True
print('torch.rand:', torch.rand(1, device=device).item() < 0.5)

# 打印结果:tensor([False], device='cuda:0')
print('torch.rand:', torch.rand(1, device=device) < 0.5)

核心区别只有一句话:

  • torch.rand(1, device=device) < 0.5
    返回一个形状为 (1,) 的布尔张量,仍在原设备(CPU 或 GPU)上。
  • torch.rand(1, device=device).item() < 0.5
    返回一个 Python 的 bool 标量,一定在 CPU 上。

逐项对照

维度 torch.rand(1, device=device) < 0.5 torch.rand(1, device=device).item() < 0.5
结果类型 torch.Tensor(dtype=torch.bool Python 原生 bool
形状 (1,) 无(标量)
所在设备 device 一致(可 GPU) CPU
能否直接用于 if 不行,会报错“只有一个元素的张量才能转换为 Python 标量” 可以
典型用途 批量掩码、向量索引、继续留在 GPU 做张量运算 分支判断、Python 控制流

示例

>>> t = torch.rand(1, device='cuda')
>>> mask = t < 0.5          # 仍是 GPU 上的张量
>>> mask
tensor([True], device='cuda:0')

>>> flag = t.item() < 0.5   # Python 标量
>>> flag
True

结论

  • 只想做 Python 的 if/else 分支 → 用 .item()
  • 想批量生成掩码或继续 GPU 张量计算 → 不用 .item()

一句话区别:

  • torch.rand(1, device=device) 得到 PyTorch 张量(带设备信息)。
  • torch.rand(1, device=device).item() 得到 Python 标量float,一定在 CPU)。

详细对照

维度 torch.rand(1, device=device) torch.rand(1, device=device).item()
类型 torch.Tensor Python float
形状 (1,) 无(标量)
设备 传进来的 device(可以是 GPU) CPU
后续操作 可继续做张量运算、梯度计算等 只能做普通 Python 数值运算
内存 张量占用的显存/内存 只占一个 float 内存
典型用途 需要继续在 GPU 上参与张量计算 只想要一个随机数做判断或打印

示例

>>> x = torch.rand(1, device='cuda')
>>> x
tensor([0.7314], device='cuda:0')
>>> type(x)
torch.Tensor

>>> y = torch.rand(1, device='cuda').item()
>>> y
0.2688
>>> type(y)
float

可以,这样写完全没问题,而且是最常见的写法之一。
你给出的代码:

if torch.rand(1, device=device).item() < eps:

等价于:

if random.random() < eps:          # Python 标准库

if np.random.rand() < eps:         # NumPy

区别只在于:

写法 设备 是否依赖 PyTorch RNG 备注
torch.rand(1, device=device).item() device 一致 GPU 也能用,和模型在同一 RNG 状态
random.random() 仅 CPU 与 PyTorch 的随机种子独立
np.random.rand() 仅 CPU 需单独设置 NumPy 种子

在你的强化学习循环里,用 torch.rand(...).item() 的好处是:

  1. 统一设备
    如果整个训练流程都在 GPU 上运行,使用 torch.rand 可以避免把随机数生成放到 CPU 再 .item() 拉回 CPU——虽然 .item() 最终还是把标量拉回 CPU,但张量本身可先留在 GPU,减少潜在同步开销。

  2. 共享随机数种子
    如果你用 torch.manual_seed(...) 设了全局种子,那么 torch.rand 会与网络的随机初始化、Dropout 等共享同一 RNG 状态,实验可复现性更好。

  3. 代码一致性
    整个项目都用 PyTorch 的随机接口,看起来更统一。

总结:可以这么写,而且推荐这么写,除非你有特殊理由要换到 Python 或 NumPy 的随机接口。

4-解释代码:reward = float(np.random.rand() < 0.5

这行代码:

reward = float(np.random.rand() < 0.5)

作用:
50% 的概率 返回 1.0,以 50% 的概率 返回 0.0,并将结果赋给变量 reward


拆解说明:

代码片段 含义
np.random.rand() 从均匀分布 [0.0, 1.0) 中随机生成一个 Python 浮点数
< 0.5 判断这个随机数是否小于 0.5,得到一个 Python 布尔值 TrueFalse
float(...) 将布尔值强制转换为浮点数:
- True1.0
- False0.0

示例输出:

>>> import numpy as np
>>> reward = float(np.random.rand() < 0.5)
>>> reward
0.0   # 或 1.0,概率各 50%

一句话总结:

这是一句简洁的 伯努利奖励采样 代码,模拟“成功得 1 分,失败得 0 分”的随机奖励机制。


3-强化学习案例

1-强化学习的A/B双臂性能图像绘制

编写pytorch代码完成强化学习的性能比较:A按钮以0.4的概率返回奖励1,0.6的概率返回奖励0;B按钮以0.2的概率返回奖励1,0.8的概率返回奖励0;分别使用概率为0.1和0.01的探索概率进行强化学习尝试,绘制尝试次数和平均累计奖励的性能图像

1-单条步骤执行

import torch
import numpy as np
import matplotlib.pyplot as plt

plt.rcParams['font.family'] = 'sans-serif'  # 告诉 matplotlib 使用 sans-serif 族
plt.rcParams['font.sans-serif'] = ['SimHei']  # 优先用黑体(Win 自带)
plt.rcParams['axes.unicode_minus'] = False  # 解决负号 '−' 显示成方块的问题

# 环境参数
true_probs = {'A': 0.4, 'B': 0.2}  # 真实成功概率
optimal_reward = max(true_probs.values())  # 最优期望奖励

# 超参数
num_runs = 200  # 独立实验次数(平滑曲线)
num_steps = 200  # 每轮实验的总尝试步数
epsilons = [0.1, 0.01]

# 如果显存不足,可适当降低 num_runs 或 num_steps
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)


def run_bandit(eps):
    """一次完整的 2000 步实验,返回每一步的累计奖励"""
    Q = torch.zeros(2, device=device)  # 估计值
    N = torch.zeros(2, device=device)  # 被选择次数
    cumulative_reward = torch.zeros(num_steps, device=device) # 累计奖励
    total_reward = 0.0

    for t in range(num_steps):
        # ε-greedy 选择动作
        if torch.rand(1, device=device) < eps:
            # 如果命中了探索(随机值<0.1)
            action = torch.randint(0, 2, (1,), device=device).item()
        else:
            # 如果命中了利用(随机值>=0.1)
            action = torch.argmax(Q).item()

        print('-----action:', action)

        # 环境反馈()
        prob = true_probs['A'] if action == 0 else true_probs['B']

        print('-----prob:', prob)

        reward = float(np.random.rand() < prob)
        print('-----reward:', reward)

        # 更新计数与估计值
        N[action] += 1
        Q[action] += (reward - Q[action]) / N[action]

        # 记录累计奖励
        total_reward += reward
        cumulative_reward[t] = total_reward / (t + 1)  # 平均累计奖励
    return cumulative_reward.cpu().numpy()


# 主循环:对两种 epsilon 各跑多次并取平均
results = {}
for eps in epsilons:
    all_curves = []
    for _ in range(num_runs):
        print(f"{eps} Runloop : {_} ")
        all_curves.append(run_bandit(eps))
    results[eps] = np.mean(np.vstack(all_curves), axis=0)

# 绘图
plt.figure(figsize=(8, 5))
x = np.arange(1, num_steps + 1)
for eps, curve in results.items():
    plt.plot(x, curve, label=f'ε = {eps}')
plt.axhline(optimal_reward, color='black', linestyle='--', label='最优期望奖励')
plt.xlabel('尝试次数')
plt.ylabel('平均累计奖励')
plt.title('不同探索概率 ε-greedy 的性能比较')
plt.legend()
plt.grid(alpha=0.3)
plt.show()


2-并发执行

import torch
import numpy as np
from time import time
import matplotlib.pyplot as plt

plt.rcParams['font.family'] = 'sans-serif'  # 告诉 matplotlib 使用 sans-serif 族
plt.rcParams['font.sans-serif'] = ['SimHei']  # 优先用黑体(Win 自带)
plt.rcParams['axes.unicode_minus'] = False  # 解决负号 '−' 显示成方块的问题

# 环境参数
true_probs = torch.tensor([0.4, 0.2], device='cuda')  # 2 个臂
optimal_reward = true_probs.max().item()

# 超参数
num_runs = 10000  # 并行实验数
num_steps = 10000  # 每轮实验步数
epsilons = [0.1, 0.01]

# 如果显存不足,可适当降低 num_runs 或 num_steps
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)


def batch_bandit(eps, num_runs=num_runs, num_steps=num_steps):
    """
    一次性在 GPU 上跑完所有实验所有步,返回 shape=(num_steps,) 的平均累计奖励
    """
    # 1. 预生成所有随机数
    rand_action = torch.rand(num_runs, num_steps, device=device)  # ε-greedy 用
    rand_reward = torch.rand(num_runs, num_steps, device=device)  # 环境 reward 用

    # 2. 初始化张量
    Q = torch.zeros(num_runs, 2, device=device)  # 估计值
    N = torch.zeros(num_runs, 2, device=device)  # 计数
    total_reward = torch.zeros(num_runs, num_steps, device=device)

    # 3. 主循环(矢量化)
    for t in range(num_steps):
        # ε-greedy 选臂
        greedy_arm = torch.argmax(Q, dim=1)  # (num_runs,)
        explore = rand_action[:, t] < eps  # (num_runs,) bool
        action = torch.where(explore,
                             torch.randint(0, 2, (num_runs,), device=device),
                             greedy_arm)  # (num_runs,)

        # 环境反馈
        prob = true_probs[action]  # (num_runs,)
        reward = (rand_reward[:, t] < prob).float()  # (num_runs,)

        # 更新计数 & 估计值
        N.scatter_add_(1, action.unsqueeze(1), torch.ones_like(action, dtype=torch.float).unsqueeze(1))
        # 等价写法:N[torch.arange(num_runs), action] += 1

        # Q 更新:增量式
        idx = torch.arange(num_runs, device=device)
        Q[idx, action] += (reward - Q[idx, action]) / N[idx, action]

        # 记录每一步累计奖励(后续用 cumsum 做平均)
        total_reward[:, t] = reward

    # 4. 计算平均累计奖励
    cumulative = total_reward.cumsum(dim=1) / torch.arange(1, num_steps + 1, device=device)
    return cumulative.mean(dim=0).cpu().numpy()


# ----- 主程序 -----
t0 = time()
results = {}
for eps in epsilons:
    print('Running ε =', eps)
    results[eps] = batch_bandit(eps)
print('Total time: %.2fs' % (time() - t0))

# 绘图
plt.figure(figsize=(8, 5))
x = np.arange(1, num_steps + 1)
for eps, curve in results.items():
    plt.plot(x, curve, label=f'ε = {eps}')
plt.axhline(optimal_reward, color='black', linestyle='--', label='最优期望奖励')
plt.xlabel('尝试次数')
plt.ylabel('平均累计奖励')
plt.title('不同探索概率 ε-greedy 的性能比较(矢量化并行版)')
plt.legend()
plt.grid(alpha=0.3)
plt.show()

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐