用三个类别的图片文件夹进行模型训练,简单实现图像分类,并将另一个文件夹中图片进行分类,将预测的图像根据类别放到对应文件夹中,用已有的模型继续训练。用pytorch实现
import os# 解决OpenMP冲突nn.ReLU(),nn.ReLU(),nn.ReLU(),nn.ReLU(),return x"""训练模型并保存最佳模型"""# 训练阶段# 验证阶段# 保存最佳模型# 可视化训练过程。
import os
import shutil
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, datasets
from torch.utils.data import DataLoader
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
# 解决OpenMP冲突
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
class SimpleCNN(nn.Module):
def __init__(self, num_classes):
super(SimpleCNN, self).__init__()
self.features = nn.Sequential(
nn.Conv2d(3, 32, kernel_size=3, padding=1),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Conv2d(32, 64, kernel_size=3, padding=1),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Conv2d(64, 128, kernel_size=3, padding=1),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
)
self.classifier = nn.Sequential(
nn.Linear(128 * 28 * 28, 512),
nn.ReLU(),
nn.Dropout(0.5),
nn.Linear(512, num_classes)
)
def forward(self, x):
x = self.features(x)
x = torch.flatten(x, 1)
x = self.classifier(x)
return x
def train_model(model, train_loader, val_loader, criterion, optimizer, device, num_epochs=10, model_path='model.pth'):
"""
训练模型并保存最佳模型
"""
best_acc = 0.0
train_loss_history = []
val_loss_history = []
train_acc_history = []
val_acc_history = []
for epoch in range(num_epochs):
# 训练阶段
model.train()
running_loss = 0.0
running_corrects = 0
for inputs, labels in train_loader:
inputs = inputs.to(device)
labels = labels.to(device)
optimizer.zero_grad()
outputs = model(inputs)
_, preds = torch.max(outputs, 1)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
running_loss += loss.item() * inputs.size(0)
running_corrects += torch.sum(preds == labels.data)
epoch_loss = running_loss / len(train_loader.dataset)
epoch_acc = running_corrects.double() / len(train_loader.dataset)
train_loss_history.append(epoch_loss)
train_acc_history.append(epoch_acc)
# 验证阶段
model.eval()
val_running_loss = 0.0
val_running_corrects = 0
with torch.no_grad():
for inputs, labels in val_loader:
inputs = inputs.to(device)
labels = labels.to(device)
outputs = model(inputs)
_, preds = torch.max(outputs, 1)
loss = criterion(outputs, labels)
val_running_loss += loss.item() * inputs.size(0)
val_running_corrects += torch.sum(preds == labels.data)
val_epoch_loss = val_running_loss / len(val_loader.dataset)
val_epoch_acc = val_running_corrects.double() / len(val_loader.dataset)
val_loss_history.append(val_epoch_loss)
val_acc_history.append(val_epoch_acc)
# 保存最佳模型
if val_epoch_acc > best_acc:
best_acc = val_epoch_acc
torch.save(model.state_dict(), model_path)
print(f"New best model saved with val_acc: {val_epoch_acc:.4f}")
print(f'Epoch {epoch+1}/{num_epochs}')
print(f'Train Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
print(f'Val Loss: {val_epoch_loss:.4f} Acc: {val_epoch_acc:.4f}')
print('-' * 20)
# 可视化训练过程
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(train_loss_history, label='Train Loss')
plt.plot(val_loss_history, label='Val Loss')
plt.title('Loss')
plt.legend()
plt.subplot(1, 2, 2)
plt.plot(train_acc_history, label='Train Acc')
plt.plot(val_acc_history, label='Val Acc')
plt.title('Accuracy')
plt.legend()
plt.show()
return model
def predict_and_sort_images(model, class_names, predict_dir, output_base_dir, transform, device):
"""
预测图像并按照类别分类存储
"""
# 确保输出目录存在
os.makedirs(output_base_dir, exist_ok=True)
# 为每个类别创建子目录
for class_name in class_names:
os.makedirs(os.path.join(output_base_dir, class_name), exist_ok=True)
# 预测并分类图像
for filename in os.listdir(predict_dir):
if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
img_path = os.path.join(predict_dir, filename)
try:
# 预测图像
predicted_class, probabilities = predict_image(
img_path, model, class_names, transform, device
)
# 确定目标路径
dest_dir = os.path.join(output_base_dir, predicted_class)
dest_path = os.path.join(dest_dir, filename)
# 复制文件到对应类别目录
shutil.copy2(img_path, dest_path)
print(f"Image '{filename}' classified as '{predicted_class}' and moved to '{dest_dir}'")
except Exception as e:
print(f"Error processing {filename}: {str(e)}")
def predict_image(image_path, model, class_names, transform, device):
model.eval()
image = Image.open(image_path)
image = transform(image).unsqueeze(0).to(device)
with torch.no_grad():
outputs = model(image)
_, predicted = torch.max(outputs, 1)
probabilities = torch.nn.functional.softmax(outputs, dim=1)[0] * 100
return class_names[predicted.item()], probabilities.cpu().numpy()
def main():
# 设置路径
train_dir = 'path_to_your_training_folders' # 训练数据目录
predict_dir = 'path_to_your_prediction_images' # 待分类图像目录
output_dir = 'classified_images' # 分类后图像输出目录
model_save_path = 'pytorch_image_classifier.pth' # 模型保存路径
# 训练参数
img_size = 224
batch_size = 32
num_epochs = 10
learning_rate = 0.001
continue_training = True # 设置为True继续训练,False只进行预测
# 数据增强和转换
data_transforms = {
'train': transforms.Compose([
transforms.RandomResizedCrop(img_size),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
]),
'val': transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(img_size),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
]),
}
# 加载数据集
train_dataset = datasets.ImageFolder(train_dir, transform=data_transforms['train'])
class_names = train_dataset.classes
num_classes = len(class_names)
print(f"Detected classes: {class_names}")
# 划分训练集和验证集
dataset_size = len(train_dataset)
val_size = int(0.2 * dataset_size)
train_size = dataset_size - val_size
train_dataset, val_dataset = torch.utils.data.random_split(
train_dataset, [train_size, val_size]
)
# 修正验证集的transform
for i in range(len(val_dataset)):
val_dataset.dataset.transform = data_transforms['val']
# 创建数据加载器
num_workers = 0 if os.name == 'nt' else 4
train_loader = DataLoader(
train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers
)
val_loader = DataLoader(
val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers
)
# 初始化模型、损失函数和优化器
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = SimpleCNN(num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# 加载已有模型(如果存在)
if os.path.exists(model_save_path):
model.load_state_dict(torch.load(model_save_path))
print(f"Loaded existing model from {model_save_path}")
# 如果要继续训练
if continue_training:
print("Continuing training...")
model = train_model(
model=model,
train_loader=train_loader,
val_loader=val_loader,
criterion=criterion,
optimizer=optimizer,
device=device,
num_epochs=num_epochs,
model_path=model_save_path
)
else:
print("No existing model found, training new model...")
model = train_model(
model=model,
train_loader=train_loader,
val_loader=val_loader,
criterion=criterion,
optimizer=optimizer,
device=device,
num_epochs=num_epochs,
model_path=model_save_path
)
# 预测并分类图像
predict_and_sort_images(
model=model,
class_names=class_names,
predict_dir=predict_dir,
output_base_dir=output_dir,
transform=data_transforms['val'],
device=device
)
if __name__ == '__main__':
main()
主要改进点
-
模型继续训练功能:
-
添加了
continue_training标志控制是否继续训练 -
加载已有模型后可以继续训练
-
训练过程中会保存最佳模型
-
-
模块化训练函数:
-
将训练过程封装到
train_model函数中 -
支持从任意点继续训练
-
-
最佳模型保存:
-
在验证集上表现最好的模型会自动保存
-
避免过拟合影响最终模型质量
-
-
灵活的流程控制:
-
可以自由选择只进行预测或继续训练
-
模型不存在时会自动开始新训练
-
使用说明
扩展建议
这个实现提供了完整的模型继续训练流程,同时保留了图像分类和自动存储功能
-
设置训练参数:
-
修改
continue_training为True继续训练,False只进行预测 -
调整
num_epochs设置训练轮数
-
-
准备数据:
-
确保训练数据目录结构正确
-
预测图像放在指定目录
-
-
运行程序:
-
程序会自动检测是否存在已有模型
-
根据设置决定是否继续训练
-
最后会对预测图像进行分类并存储
-
-
监控训练:
-
训练过程中会显示损失和准确率曲线
-
最佳模型会自动保存
-
-
学习率调度:添加
torch.optim.lr_scheduler实现学习率衰减 -
早停机制:当验证损失不再下降时停止训练
-
模型检查点:定期保存模型而不仅是最佳模型
-
TensorBoard集成:可视化更多训练指标
更多推荐
所有评论(0)