PyTorch Study Notes--Building SEResNet50
Contents
1--Introduction to ResNet50
2--Introduction to SENet
3--Introduction to SEResNet50
4--Example: CIFAR10 classification with SEResNet50
5--References
1--Introduction to ResNet50
Analysis: The figure above shows the overall structure of ResNet50. Besides the Input and Output, the network consists of a Stem Block, four Stages (Stage1-4), and a Subsequent Processing stage.
1-1--Stem Block
Analysis: The input to the Stem Block is a three-channel image (C = 3, W = 224, H = 224). It first goes through a convolution (kernel_size = 7 x 7, stride = 2, 64 kernels), batch normalization, and ReLU, followed by max pooling, giving an output of (C = 64, W = 56, H = 56). This stage can be viewed as preprocessing before the four Stages; its core code is as follows:
self.Stem = nn.Sequential(
    nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),
    nn.BatchNorm2d(64),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
)
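As a quick sanity check (a minimal standalone sketch, not part of the original notes), the stem's output shape can be verified with a dummy input:

import torch
from torch import nn

# The same stem layers as above, built standalone for a shape check.
stem = nn.Sequential(
    nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),
    nn.BatchNorm2d(64),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
)
x = torch.randn(1, 3, 224, 224)  # dummy batch of one 3 x 224 x 224 image
print(stem(x).shape)             # expected: torch.Size([1, 64, 56, 56])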
1-2--The Stage blocks
The figure above shows the structure of the four Stages; each Stage contains two kinds of blocks: the Conv Block and the Identity Block.
Taking Stage1 as an example, the difference between the two structures is:
As the figure shows, the Conv Block has an extra Conv + BN operation on its right-hand (shortcut) branch that the Identity Block lacks. The core code for building both blocks is as follows:
## Import third-party libraries
import torch
from torch import nn

# Build the Conv Block and Identity Block structures
class Block(nn.Module):
    def __init__(self, in_channels, filters, stride=1, is_1x1conv=False):
        super(Block, self).__init__()
        # Output channels of each sub-block within a Stage (filter1 = filter2 = filter3 / 4)
        filter1, filter2, filter3 = filters
        self.is_1x1conv = is_1x1conv       # whether this is a Conv Block
        self.relu = nn.ReLU(inplace=True)  # ReLU applied after the residual addition
        # First sub-block: stride = 1 (Stage1) or stride = 2 (Stage2-4)
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels, filter1, kernel_size=1, stride=stride, bias=False),
            nn.BatchNorm2d(filter1),
            nn.ReLU()
        )
        # Middle sub-block
        self.conv2 = nn.Sequential(
            nn.Conv2d(filter1, filter2, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(filter2),
            nn.ReLU()
        )
        # Last sub-block: no ReLU here (it is applied after the shortcut addition)
        self.conv3 = nn.Sequential(
            nn.Conv2d(filter2, filter3, kernel_size=1, stride=1, bias=False),
            nn.BatchNorm2d(filter3),
        )
        # The Conv Block's shortcut needs an extra Conv + BN (see the Conv Block diagram)
        if is_1x1conv:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, filter3, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(filter3)
            )

    def forward(self, x):
        x_shortcut = x        # the shortcut branch (right side of the diagram)
        x1 = self.conv1(x)    # first sub-block
        x1 = self.conv2(x1)   # middle sub-block
        x1 = self.conv3(x1)   # last sub-block
        if self.is_1x1conv:   # Conv Block: extra Conv + BN on the shortcut
            x_shortcut = self.shortcut(x_shortcut)
        x1 = x1 + x_shortcut  # Add
        x1 = self.relu(x1)    # ReLU
        return x1
Detailed notes (a quick shape check follows this list):
- The output of each Stage is the input of the next Stage;
- In the Conv Block, the stride of the first sub-block differs between Stage1 and Stage2-4 (Stage1: stride = 1; Stage2-4: stride = 2);
- In both the Conv Block and the Identity Block, the channel count of the last sub-block is four times that of the first and middle sub-blocks;
- In both the Conv Block and the Identity Block, the last sub-block has no ReLU inside it (unlike the first two sub-blocks); ReLU is applied after the shortcut addition.
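As promised above, a quick shape check (a sketch assuming the Block class just defined, using the configuration of Stage2's first block): the Conv Block halves the spatial size and expands the channels four-fold:

block = Block(in_channels=256, filters=(128, 128, 512), stride=2, is_1x1conv=True)
x = torch.randn(1, 256, 56, 56)  # dummy Stage1 output
print(block(x).shape)            # expected: torch.Size([1, 512, 28, 28])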
1-3--ResNet50 core code:
# Build ResNet50
class Resnet(nn.Module):
    def __init__(self, cfg):
        super(Resnet, self).__init__()
        classes = cfg['classes']  # number of output classes
        num = cfg['num']          # ResNet50: [3, 4, 6, 3]; number of Conv Blocks and Identity Blocks per Stage
        # Stem Block
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        # Stage1
        filters = (64, 64, 256)  # channels
        self.Stage1 = self._make_layer(in_channels=64, filters=filters, num=num[0], stride=1)
        # Stage2
        filters = (128, 128, 512)  # channels
        self.Stage2 = self._make_layer(in_channels=256, filters=filters, num=num[1], stride=2)
        # Stage3
        filters = (256, 256, 1024)  # channels
        self.Stage3 = self._make_layer(in_channels=512, filters=filters, num=num[2], stride=2)
        # Stage4
        filters = (512, 512, 2048)  # channels
        self.Stage4 = self._make_layer(in_channels=1024, filters=filters, num=num[3], stride=2)
        # Global average pooling
        self.global_average_pool = nn.AdaptiveAvgPool2d((1, 1))
        # Fully connected layer; together these form the Subsequent Processing stage after the four Stages
        self.fc = nn.Sequential(
            nn.Linear(2048, classes)
        )

    # Build the network structure of a single Stage
    def _make_layer(self, in_channels, filters, num, stride=1):
        layers = []
        # Conv Block
        block_1 = Block(in_channels, filters, stride=stride, is_1x1conv=True)
        layers.append(block_1)
        # Stack Identity Blocks, based on [3, 4, 6, 3]
        for i in range(1, num):
            layers.append(Block(filters[2], filters, stride=1, is_1x1conv=False))
        # Return the Conv Block plus Identity Blocks as one Stage
        return nn.Sequential(*layers)

    def forward(self, x):
        # Stem Block
        x = self.conv1(x)
        # The four Stages
        x = self.Stage1(x)
        x = self.Stage2(x)
        x = self.Stage3(x)
        x = self.Stage4(x)
        # Subsequent Processing
        x = self.global_average_pool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x
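A minimal usage sketch (the cfg keys follow the class above; 1000 classes is an assumption matching the usual ImageNet setting, not taken from the original notes):

net = Resnet({'num': (3, 4, 6, 3), 'classes': 1000})
x = torch.randn(1, 3, 224, 224)
print(net(x).shape)                              # torch.Size([1, 1000])
print(sum(p.numel() for p in net.parameters()))  # roughly 25.6 million parameters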
2--Introduction to SENet
Analysis: The figure above is taken from the paper "Squeeze-and-Excitation Networks". Intuitively, SENet reweights the channels of the input feature map; the figure below shows the main steps of this weighting.
Analysis: As the figure shows, the per-channel weights are computed by global average pooling, a fully connected layer, ReLU, another fully connected layer, and a Sigmoid; the weighting is then applied by a Scale (elementwise multiplication) operation. In the example here, 1x1 convolution layers replace the fully connected layers, aiming to reduce the semantic loss of the image. The core code for computing the weights is as follows:
# SENet (see the SENet diagram)
self.se = nn.Sequential(
    nn.AdaptiveAvgPool2d((1, 1)),                      # global average pooling
    nn.Conv2d(filter3, filter3 // 16, kernel_size=1),  # 16 is the reduction ratio r; filter3 // 16 is C/r; a 1x1 conv replaces the FC layer
    nn.ReLU(),
    nn.Conv2d(filter3 // 16, filter3, kernel_size=1),
    nn.Sigmoid()
)
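The same squeeze-and-excitation idea as a standalone sketch (the names channels and r are illustrative, not from the original notes): the computed weights lie in (0, 1) and broadcast over H x W during the Scale step:

channels, r = 256, 16
se = nn.Sequential(
    nn.AdaptiveAvgPool2d((1, 1)),
    nn.Conv2d(channels, channels // r, kernel_size=1),
    nn.ReLU(),
    nn.Conv2d(channels // r, channels, kernel_size=1),
    nn.Sigmoid()
)
feat = torch.randn(1, channels, 28, 28)
weights = se(feat)             # shape (1, 256, 1, 1), values in (0, 1)
print((feat * weights).shape)  # torch.Size([1, 256, 28, 28]); broadcast over H x W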
3--Introduction to SEResNet50
Analysis: The figure above adds SENet to the Residual module, i.e. the SE module is inserted after the last sub-block of both the Conv Block and the Identity Block in ResNet. The core code for building the SE-based Conv Block and Identity Block is as follows:
# Build the SE-based Conv Block and Identity Block structures
class Block(nn.Module):
    def __init__(self, in_channels, filters, stride=1, is_1x1conv=False):
        super(Block, self).__init__()
        # Output channels of each sub-block within a Stage (filter1 = filter2 = filter3 / 4)
        filter1, filter2, filter3 = filters
        self.is_1x1conv = is_1x1conv       # whether this is a Conv Block
        self.relu = nn.ReLU(inplace=True)  # ReLU applied after the residual addition
        # First sub-block: stride = 1 (Stage1) or stride = 2 (Stage2-4)
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels, filter1, kernel_size=1, stride=stride, bias=False),
            nn.BatchNorm2d(filter1),
            nn.ReLU()
        )
        # Middle sub-block
        self.conv2 = nn.Sequential(
            nn.Conv2d(filter1, filter2, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(filter2),
            nn.ReLU()
        )
        # Last sub-block: no ReLU here (it is applied after the shortcut addition)
        self.conv3 = nn.Sequential(
            nn.Conv2d(filter2, filter3, kernel_size=1, stride=1, bias=False),
            nn.BatchNorm2d(filter3),
        )
        # The Conv Block's shortcut needs an extra Conv + BN (see the Conv Block diagram)
        if is_1x1conv:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, filter3, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(filter3)
            )
        # SENet (see the SENet diagram)
        self.se = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),                      # global average pooling
            nn.Conv2d(filter3, filter3 // 16, kernel_size=1),  # 16 is the reduction ratio r; a 1x1 conv replaces the FC layer
            nn.ReLU(),
            nn.Conv2d(filter3 // 16, filter3, kernel_size=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        x_shortcut = x        # the shortcut branch
        x1 = self.conv1(x)    # first sub-block
        x1 = self.conv2(x1)   # middle sub-block
        x1 = self.conv3(x1)   # last sub-block
        x2 = self.se(x1)      # compute the per-channel weights with SENet
        x1 = x1 * x2          # reweight the channels
        if self.is_1x1conv:   # Conv Block: extra Conv + BN on the shortcut
            x_shortcut = self.shortcut(x_shortcut)
        x1 = x1 + x_shortcut  # Add
        x1 = self.relu(x1)    # ReLU
        return x1
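The SE module only rescales channels, so the block's output shape is unchanged; a quick check (same sketch configuration as in Section 1-2):

se_block = Block(in_channels=256, filters=(128, 128, 512), stride=2, is_1x1conv=True)
x = torch.randn(1, 256, 56, 56)
print(se_block(x).shape)  # still torch.Size([1, 512, 28, 28])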
4--Example: CIFAR10 classification with SEResNet50
Complete, directly runnable code (with detailed comments):
## Import third-party libraries
from torch import nn
import time
import torch
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import torch.optim as optim

# Build the SE-based Conv Block and Identity Block structures
class Block(nn.Module):
    def __init__(self, in_channels, filters, stride=1, is_1x1conv=False):
        super(Block, self).__init__()
        # Output channels of each sub-block within a Stage (filter1 = filter2 = filter3 / 4)
        filter1, filter2, filter3 = filters
        self.is_1x1conv = is_1x1conv       # whether this is a Conv Block
        self.relu = nn.ReLU(inplace=True)  # ReLU applied after the residual addition
        # First sub-block: stride = 1 (Stage1) or stride = 2 (Stage2-4)
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels, filter1, kernel_size=1, stride=stride, bias=False),
            nn.BatchNorm2d(filter1),
            nn.ReLU()
        )
        # Middle sub-block
        self.conv2 = nn.Sequential(
            nn.Conv2d(filter1, filter2, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(filter2),
            nn.ReLU()
        )
        # Last sub-block: no ReLU here (it is applied after the shortcut addition)
        self.conv3 = nn.Sequential(
            nn.Conv2d(filter2, filter3, kernel_size=1, stride=1, bias=False),
            nn.BatchNorm2d(filter3),
        )
        # The Conv Block's shortcut needs an extra Conv + BN (see the Conv Block diagram)
        if is_1x1conv:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, filter3, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(filter3)
            )
        # SENet (see the SENet diagram)
        self.se = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),                      # global average pooling
            nn.Conv2d(filter3, filter3 // 16, kernel_size=1),  # 16 is the reduction ratio r; a 1x1 conv replaces the FC layer
            nn.ReLU(),
            nn.Conv2d(filter3 // 16, filter3, kernel_size=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        x_shortcut = x        # the shortcut branch
        x1 = self.conv1(x)    # first sub-block
        x1 = self.conv2(x1)   # middle sub-block
        x1 = self.conv3(x1)   # last sub-block
        x2 = self.se(x1)      # compute the per-channel weights with SENet
        x1 = x1 * x2          # reweight the channels
        if self.is_1x1conv:   # Conv Block: extra Conv + BN on the shortcut
            x_shortcut = self.shortcut(x_shortcut)
        x1 = x1 + x_shortcut  # Add
        x1 = self.relu(x1)    # ReLU
        return x1
# Build SEResNet50
class SEResnet(nn.Module):
    def __init__(self, cfg):
        super(SEResnet, self).__init__()
        classes = cfg['classes']  # number of output classes
        num = cfg['num']          # ResNet50: [3, 4, 6, 3]; number of Conv Blocks and Identity Blocks per Stage
        # Stem Block
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        # Stage1
        filters = (64, 64, 256)  # channels
        self.Stage1 = self._make_layer(in_channels=64, filters=filters, num=num[0], stride=1)
        # Stage2
        filters = (128, 128, 512)  # channels
        self.Stage2 = self._make_layer(in_channels=256, filters=filters, num=num[1], stride=2)
        # Stage3
        filters = (256, 256, 1024)  # channels
        self.Stage3 = self._make_layer(in_channels=512, filters=filters, num=num[2], stride=2)
        # Stage4
        filters = (512, 512, 2048)  # channels
        self.Stage4 = self._make_layer(in_channels=1024, filters=filters, num=num[3], stride=2)
        # Adaptive average pooling; (1, 1) is the output size (H x W)
        self.global_average_pool = nn.AdaptiveAvgPool2d((1, 1))
        # Fully connected layer; together these form the Subsequent Processing stage after the four Stages
        self.fc = nn.Sequential(
            nn.Linear(2048, classes)
        )

    # Build the network structure of a single Stage
    def _make_layer(self, in_channels, filters, num, stride=1):
        layers = []
        # Conv Block
        block_1 = Block(in_channels, filters, stride=stride, is_1x1conv=True)
        layers.append(block_1)
        # Stack Identity Blocks, based on [3, 4, 6, 3]
        for i in range(1, num):
            layers.append(Block(filters[2], filters, stride=1, is_1x1conv=False))
        # Return the Conv Block plus Identity Blocks as one Stage
        return nn.Sequential(*layers)

    def forward(self, x):
        # Stem Block
        x = self.conv1(x)
        # The four Stages
        x = self.Stage1(x)
        x = self.Stage2(x)
        x = self.Stage3(x)
        x = self.Stage4(x)
        # Subsequent Processing
        x = self.global_average_pool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x
# SEResNet50 configuration (calling this function indirectly builds SEResnet; keeping it
# separate makes it easy to switch to other ResNet variants)
def SeResNet50():
    cfg = {
        'num': (3, 4, 6, 3),  # ResNet50: Blocks per Stage (1 Conv Block each, the rest Identity Blocks)
        'classes': 10         # number of classes in the dataset
    }
    return SEResnet(cfg)  # build the SEResnet network
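Since only cfg changes between variants, other depths are easy to add; for example, a hypothetical SeResNet101 helper (not in the original notes) would use the standard ResNet101 stage layout [3, 4, 23, 3]:

def SeResNet101():
    # Hypothetical variant: same Block/Stage machinery, deeper Stage3.
    return SEResnet({'num': (3, 4, 23, 3), 'classes': 10})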
## Load the dataset
def load_dataset(batch_size):
    # Download the training set
    train_set = torchvision.datasets.CIFAR10(
        root="data/cifar-10", train=True,
        download=True, transform=transforms.ToTensor()
    )
    # Download the test set
    test_set = torchvision.datasets.CIFAR10(
        root="data/cifar-10", train=False,
        download=True, transform=transforms.ToTensor()
    )
    train_iter = torch.utils.data.DataLoader(
        train_set, batch_size=batch_size, shuffle=True, num_workers=4
    )
    test_iter = torch.utils.data.DataLoader(
        test_set, batch_size=batch_size, shuffle=False, num_workers=4  # no need to shuffle the test set
    )
    return train_iter, test_iter
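A common refinement (an assumption, not part of the original code) is to normalize with CIFAR-10's per-channel statistics instead of using ToTensor alone; the mean/std below are the widely quoted values:

# Hypothetical drop-in replacement for transform = transforms.ToTensor() above.
normalize = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.4914, 0.4822, 0.4465),  # widely quoted CIFAR-10 channel means
                         std=(0.2470, 0.2435, 0.2616))   # and standard deviations
])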
# Train the model
def train(net, train_iter, criterion, optimizer, num_epochs, device, num_print, lr_scheduler=None, test_iter=None):
    net.train()            # training mode
    record_train = list()  # training-set accuracy per epoch
    record_test = list()   # test-set accuracy per epoch
    for epoch in range(num_epochs):
        print("========== epoch: [{}/{}] ==========".format(epoch + 1, num_epochs))
        total, correct, train_loss = 0, 0, 0
        start = time.time()
        for i, (X, y) in enumerate(train_iter):
            X, y = X.to(device), y.to(device)  # run on GPU or CPU
            output = net(X)                    # forward pass
            loss = criterion(output, y)        # compute the loss
            optimizer.zero_grad()              # zero the gradients
            loss.backward()                    # backward pass
            optimizer.step()                   # update the parameters
            train_loss += loss.item()          # accumulate the loss
            total += y.size(0)                 # accumulate the sample count
            correct += (output.argmax(dim=1) == y).sum().item()  # accumulate correct predictions
            train_acc = 100.0 * correct / total  # running accuracy
            if (i + 1) % num_print == 0:
                print("step: [{}/{}], train_loss: {:.3f} | train_acc: {:6.3f}% | lr: {:.6f}"
                      .format(i + 1, len(train_iter), train_loss / (i + 1),
                              train_acc, get_cur_lr(optimizer)))
        # Adjust the learning rate
        if lr_scheduler is not None:
            lr_scheduler.step()
        # Print the epoch's training time
        print("--- cost time: {:.4f}s ---".format(time.time() - start))
        if test_iter is not None:  # evaluate after every epoch if a test set is given (this calls test below)
            record_test.append(test(net, test_iter, criterion, device))
        record_train.append(train_acc)
    # Return the per-epoch training and test accuracies
    return record_train, record_test
# Evaluate the model
def test(net, test_iter, criterion, device):
    total, correct = 0, 0
    net.eval()  # evaluation mode
    with torch.no_grad():  # no gradient computation
        print("*************** test ***************")
        for X, y in test_iter:
            X, y = X.to(device), y.to(device)  # run on GPU or CPU
            output = net(X)              # forward pass
            loss = criterion(output, y)  # compute the loss
            total += y.size(0)           # accumulate the test sample count
            correct += (output.argmax(dim=1) == y).sum().item()  # accumulate correct predictions
    test_acc = 100.0 * correct / total  # test-set accuracy
    # Print the loss of the last batch and the overall accuracy
    print("test_loss: {:.3f} | test_acc: {:6.3f}%"
          .format(loss.item(), test_acc))
    print("************************************\n")
    # Back to training mode (the model is tested after every epoch, so switch back before the next epoch)
    net.train()
    return test_acc
# Return the current learning rate
def get_cur_lr(optimizer):
    for param_group in optimizer.param_groups:
        return param_group['lr']
# Plot the per-epoch training and test accuracies
def learning_curve(record_train, record_test=None):
    plt.style.use("ggplot")
    plt.plot(range(1, len(record_train) + 1), record_train, label="train acc")
    if record_test is not None:
        plt.plot(range(1, len(record_test) + 1), record_test, label="test acc")
    plt.legend(loc=4)
    plt.title("learning curve")
    plt.xticks(range(0, len(record_train) + 1, 5))
    plt.yticks(range(0, 101, 5))
    plt.xlabel("epoch")
    plt.ylabel("accuracy")
    plt.show()
BATCH_SIZE = 128       # batch size
NUM_EPOCHS = 12        # number of epochs
NUM_CLASSES = 10       # number of classes
LEARNING_RATE = 0.01   # SGD learning rate
MOMENTUM = 0.9         # momentum
WEIGHT_DECAY = 0.0005  # weight decay coefficient
NUM_PRINT = 100
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"  # run on GPU or CPU
def main():
    net = SeResNet50()
    net = net.to(DEVICE)  # run on GPU or CPU
    train_iter, test_iter = load_dataset(BATCH_SIZE)  # load the training and test sets
    criterion = nn.CrossEntropyLoss()  # loss function
    # Optimizer
    optimizer = optim.SGD(
        net.parameters(),
        lr=LEARNING_RATE,
        momentum=MOMENTUM,
        weight_decay=WEIGHT_DECAY,
        nesterov=True
    )
    # Learning-rate schedule (step_size: decay every step_size epochs; gamma: multiplicative lr factor)
    lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)
    record_train, record_test = train(net, train_iter, criterion, optimizer, NUM_EPOCHS, DEVICE, NUM_PRINT,
                                      lr_scheduler, test_iter)
    learning_curve(record_train, record_test)  # plot the accuracy curves

if __name__ == "__main__":  # guard needed because the DataLoaders use num_workers = 4 (worker processes)
    main()
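To keep the trained weights (a minimal sketch; the file name is arbitrary), the standard state_dict save/load pattern can be appended at the end of main():

# Inside main(), after learning_curve(record_train, record_test):
torch.save(net.state_dict(), "seresnet50_cifar10.pth")  # save the trained weights
# Later, rebuild the architecture and load the weights back:
net2 = SeResNet50()
net2.load_state_dict(torch.load("seresnet50_cifar10.pth", map_location="cpu"))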
5--References
SENet: "Squeeze-and-Excitation Networks" (the paper cited in Section 2)