Table of Contents
- Overview
- Model Structure
- Model Parameters, Optimizer, Loss Function
  - Parameter Initialization
  - Optimizer
  - Loss Function
- Model Training, Test-Set Prediction, Model Saving, Logging
  - Training
  - Testing on the Test Set
  - Model Saving
  - Complete Training Code
- TensorBoard Training Visualizations
  - train_loss
  - Test Accuracy
  - Test-Set Loss
- Model Application
  - Standalone Application Code `api.py`
  - Prediction Results
Overview
This article uses PyTorch to build and train a model on CIFAR10, tracking accuracy on the CIFAR10 test set during training. After training, a few images are collected from the web and standalone code is written to apply the trained model to them.
Links to the official documentation for the APIs used are given throughout the article where possible.
The code is split into two files:
- Training code `train.py`: data loading, model definition, training, TensorBoard logging, model saving, etc.
- Application code `api.py`: loading the saved model, preparing the data, predicting results, etc.
Note:
The goal of this article is to build a well-structured model with PyTorch that exercises its various utility functions and model-design idioms, as a way of learning deep learning, not to train a high-accuracy classifier.
Known shortcomings:
- Parameter initialization should probably use Kaiming initialization (since the network uses ReLU);
- K-fold cross-validation could be added;
- Batches of images could be logged to TensorBoard during training. With batch_size = 256, logging every batch would produce far too much data, so the code below only records the batch loss every 100 training steps; image logging could be added at that same point, as sketched below.
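For reference, here is a sketch of what logging a batch of images at that point could look like, using TensorBoard's `add_images` (`writer`, `images`, and `total_train_step` are the names used in the training code below):

```python
if total_train_step % 100 == 0:
    # Log the current batch of images alongside the loss (sketch only)
    writer.add_images('train_images', images, total_train_step)
```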
Model Structure
Source: https://www.researchgate.net/profile/Yiren-Zhou-6/publication/312170477/figure/fig1/AS:448817725218816@1484017892071/Structure-of-LeNet-5.png
On top of the structure shown in the image above, `nn.BatchNorm2d`, `nn.ReLU`, and `nn.Dropout` layers were added. The final structure:
```python
layers = nn.Sequential(
    # shape(3,32,32) -> shape(32,32,32)
    nn.Conv2d(3, 32, kernel_size=5, stride=1, padding=2),
    nn.BatchNorm2d(32),
    nn.ReLU(),
    # shape(32,32,32) -> shape(32,16,16)
    nn.MaxPool2d(kernel_size=2, stride=2),
    # shape(32,16,16) -> shape(32,16,16)
    nn.Conv2d(32, 32, kernel_size=5, stride=1, padding=2),
    nn.BatchNorm2d(32),
    nn.ReLU(),
    # shape(32,16,16) -> shape(32,8,8)
    nn.MaxPool2d(kernel_size=2, stride=2),
    # shape(32,8,8) -> shape(64,8,8)
    nn.Conv2d(32, 64, kernel_size=5, stride=1, padding=2),
    nn.BatchNorm2d(64),
    nn.ReLU(),
    # shape(64,8,8) -> shape(64,4,4)
    nn.MaxPool2d(kernel_size=2, stride=2),
    # shape(64,4,4) -> shape(64 * 4 * 4,)
    nn.Flatten(),
    nn.Linear(64 * 4 * 4, 64),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(64, 10),
)
```
You can also inspect the model graph generated with TensorBoard's `writer.add_graph` function:
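The graph is produced by tracing the model with a dummy input batch; this is the corresponding call from the full training script below:

```python
# A dummy input so add_graph can trace the model and render its graph
tmp = torch.rand((batch_size, 3, 32, 32)).to(device)
writer.add_graph(net, tmp)
```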
Model Parameters, Optimizer, Loss Function
Parameter Initialization
Model parameters are initialized with `nn.init.normal_`; however, since the model uses `ReLU`, Kaiming (He) initialization should be considered instead.
`apply` function: Module — PyTorch 2.4 documentation
Initialization functions: torch.nn.init — PyTorch 2.4 documentation
```python
def init_normal(m):
    # Consider Kaiming initialization instead
    if isinstance(m, nn.Linear):  # note: the original `m is nn.Linear` is always False
        nn.init.normal_(m.weight, mean=0, std=0.01)
        nn.init.zeros_(m.bias)

# Define the model and initialize its parameters
net = CIFAR10Net()
net.apply(init_normal)
```
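For reference, a minimal sketch of the suggested Kaiming alternative (not used in the training script; the name `init_kaiming` is my own):

```python
def init_kaiming(m):
    # Kaiming (He) initialization is designed for layers followed by ReLU
    if isinstance(m, (nn.Conv2d, nn.Linear)):
        nn.init.kaiming_normal_(m.weight, nonlinearity='relu')
        if m.bias is not None:
            nn.init.zeros_(m.bias)

net.apply(init_kaiming)
```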
Optimizer
The optimizer is Adam, which combines the ideas of Momentum and AdaGrad.
Docs: Adam — PyTorch 2.4 documentation
```python
# Optimizer
weight_decay = 0.0001
optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate, weight_decay=weight_decay)
```
Loss Function
For a classification task, cross-entropy loss is the natural choice.

```python
loss_fn = nn.CrossEntropyLoss()
```
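As a small self-contained illustration of the interface (the numbers are made up): `nn.CrossEntropyLoss` expects raw, unnormalized logits of shape `(N, C)` plus integer class indices of shape `(N,)`, and applies softmax internally:

```python
logits = torch.randn(4, 10)           # a batch of 4 samples, 10 classes (random demo values)
targets = torch.tensor([3, 0, 9, 1])  # ground-truth class indices
loss = nn.CrossEntropyLoss()(logits, targets)
print(loss.item())                    # scalar loss, averaged over the batch
```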
Model Training, Test-Set Prediction, Model Saving, Logging
Note that the device is defined earlier in the code:

```python
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
```
Training

```python
net.train()
for images, labels in train_loader:
    images, labels = images.to(device), labels.to(device)
    outputs = net(images)
    loss = loss_fn(outputs, labels)
    # Optimizer step
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    total_train_step += 1
    if total_train_step % 100 == 0:
        print(f'Epoch: {epoch + 1}, total training steps: {total_train_step}, batch loss: {loss.item():.4f}')
        writer.add_scalar('train_loss', loss.item(), total_train_step)
        current_time = time.time()
        writer.add_scalar('train_time', current_time - start_time, total_train_step)
```
Testing on the Test Set

```python
net.eval()
total_test_loss = 0
total_test_acc = 0  # number of correct predictions on the whole test set
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = net(images)
        loss = loss_fn(outputs, labels)
        total_test_loss += loss.item()
        # .item() turns the 0-dim tensor into a plain Python int
        accuracy = (outputs.argmax(1) == labels).sum().item()
        total_test_acc += accuracy

print(f'Total test-set loss: {total_test_loss:.4f}, batch_size: {batch_size}')
print(f'Test-set accuracy: {(total_test_acc / test_data_size) * 100:.4f}%')
writer.add_scalar('test_loss', total_test_loss, epoch + 1)
writer.add_scalar('test_acc', (total_test_acc / test_data_size) * 100, epoch + 1)
```
Model Saving

```python
torch.save(net.state_dict(), './save/epoch_{}_params_acc_{}.pth'.format(epoch + 1, (total_test_acc / test_data_size)))
```
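Since only the `state_dict` is saved, loading it later requires the model class to be available; a minimal sketch (the filename here is illustrative):

```python
net = CIFAR10Net()
net.load_state_dict(torch.load('./save/epoch_6_params_acc_0.7.pth'))  # illustrative path
net.eval()  # switch BatchNorm/Dropout to inference behavior
```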
Complete Training Code
`train.py`

```python
import torch
import torchvision
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms
from torch.utils import data
from torch import nn
import time
from datetime import datetime


def load_data_CIFAR10(resize=None):
    """
    Download the CIFAR10 dataset and load it into memory.
    transforms.ToTensor() converts images to FloatTensors of shape C x H x W
    and scales pixel values from [0, 255] to [0.0, 1.0].
    """
    trans = [transforms.ToTensor()]
    if resize:
        trans.insert(0, transforms.Resize(resize))
    trans = transforms.Compose(trans)
    cifar_train = torchvision.datasets.CIFAR10(root="../data", train=True, transform=trans, download=False)
    cifar_test = torchvision.datasets.CIFAR10(root="../data", train=False, transform=trans, download=False)
    return cifar_train, cifar_test


class CIFAR10Net(torch.nn.Module):
    def __init__(self):
        super(CIFAR10Net, self).__init__()
        layers = nn.Sequential(
            # shape(3,32,32) -> shape(32,32,32)
            nn.Conv2d(3, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            # shape(32,32,32) -> shape(32,16,16)
            nn.MaxPool2d(kernel_size=2, stride=2),
            # shape(32,16,16) -> shape(32,16,16)
            nn.Conv2d(32, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            # shape(32,16,16) -> shape(32,8,8)
            nn.MaxPool2d(kernel_size=2, stride=2),
            # shape(32,8,8) -> shape(64,8,8)
            nn.Conv2d(32, 64, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            # shape(64,8,8) -> shape(64,4,4)
            nn.MaxPool2d(kernel_size=2, stride=2),
            # shape(64,4,4) -> shape(64 * 4 * 4,)
            nn.Flatten(),
            nn.Linear(64 * 4 * 4, 64),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(64, 10),
        )
        self.layers = layers

    def forward(self, x):
        return self.layers(x)


def init_normal(m):
    # Consider Kaiming initialization instead
    if isinstance(m, nn.Linear):
        nn.init.normal_(m.weight, mean=0, std=0.01)
        nn.init.zeros_(m.bias)


if __name__ == '__main__':
    # Hyperparameters
    epochs = 6
    batch_size = 256
    learning_rate = 0.01
    num_workers = 0
    weight_decay = 0

    # Bookkeeping
    total_train_step = 0
    total_test_step = 0
    train_loss_list = list()
    test_loss_list = list()
    train_acc_list = list()
    test_acc_list = list()

    # Prepare the datasets
    train_data, test_data = load_data_CIFAR10()
    train_loader = data.DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    test_loader = data.DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=num_workers)
    train_data_size = len(train_data)
    test_data_size = len(test_data)
    print(f'Training set size: {train_data_size}, test set size: {test_data_size}, batch_size: {batch_size}\n')

    # device = torch.device("cpu")
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(f'\ndevice: {device}')

    # Define the model and initialize parameters
    net = CIFAR10Net().to(device)
    # net.apply(init_normal)

    # Loss function
    loss_fn = nn.CrossEntropyLoss().to(device)
    # Optimizer
    optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate, weight_decay=weight_decay)

    # now_time = datetime.now()
    # now_time = now_time.strftime("%Y%m%d-%H%M%S")
    # tensorboard
    writer = SummaryWriter('./train_logs')
    # A dummy input so add_graph can trace the model
    tmp = torch.rand((batch_size, 3, 32, 32)).to(device)
    writer.add_graph(net, tmp)

    start_time = time.time()
    for epoch in range(epochs):
        print('------------Epoch {}/{}'.format(epoch + 1, epochs))
        # Training
        net.train()
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = net(images)
            loss = loss_fn(outputs, labels)
            # Optimizer step
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_train_step += 1
            if total_train_step % 100 == 0:
                print(f'Epoch: {epoch + 1}, total training steps: {total_train_step}, batch loss: {loss.item():.4f}')
                writer.add_scalar('train_loss', loss.item(), total_train_step)
                current_time = time.time()
                writer.add_scalar('train_time', current_time - start_time, total_train_step)

        # Testing
        net.eval()
        total_test_loss = 0
        total_test_acc = 0  # number of correct predictions on the whole test set
        with torch.no_grad():
            for images, labels in test_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = net(images)
                loss = loss_fn(outputs, labels)
                total_test_loss += loss.item()
                accuracy = (outputs.argmax(1) == labels).sum().item()
                total_test_acc += accuracy
        print(f'Total test-set loss: {total_test_loss:.4f}, batch_size: {batch_size}')
        print(f'Test-set accuracy: {(total_test_acc / test_data_size) * 100:.4f}%')
        writer.add_scalar('test_loss', total_test_loss, epoch + 1)
        writer.add_scalar('test_acc', (total_test_acc / test_data_size) * 100, epoch + 1)

        torch.save(net.state_dict(), './save/epoch_{}_params_acc_{}.pth'.format(epoch + 1, (total_test_acc / test_data_size)))
    writer.close()
```
TensorBoard Training Visualizations
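These curves come from the `SummaryWriter('./train_logs')` logs created in the training script; to view them, run `tensorboard --logdir=./train_logs` and open the URL it prints in a browser.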
train_loss
The vertical axis is the loss of each batch and the horizontal axis is the training step, with batch_size = 256.
Test Accuracy
The vertical axis is the accuracy (%) on the entire CIFAR10 test set and the horizontal axis is the epoch, with epochs = 50.
Test-Set Loss
The vertical axis is the sum of per-batch loss values over the entire CIFAR10 test set (batch_size = 256) and the horizontal axis is the epoch, with epochs = 50.
Model Application
During training, the model is saved once per epoch:

```python
torch.save(net.state_dict(), './save/epoch_{}_params_acc_{}.pth'.format(epoch + 1, (total_test_acc / test_data_size)))
```

Here we load a saved model and use it to predict labels for images collected by hand.
Project structure:
- `./autodl_save/cuda_params_acc_75.pth`: the model parameter file saved during training;
- `./test_images`: truck, dog, airplane, and ship images collected from the web, of varying sizes and saved without any preprocessing, as shown below:
- `api.py`: implements image preprocessing (resizing, `ToTensor`, batching into a dataset, etc.), model loading, and inference;
Standalone Application Code `api.py`

```python
import os
import torch
import torchvision
from PIL import Image
from torch import nn


class CIFAR10Net(torch.nn.Module):
    def __init__(self):
        super(CIFAR10Net, self).__init__()
        layers = nn.Sequential(
            # shape(3,32,32) -> shape(32,32,32)
            nn.Conv2d(3, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            # shape(32,32,32) -> shape(32,16,16)
            nn.MaxPool2d(kernel_size=2, stride=2),
            # shape(32,16,16) -> shape(32,16,16)
            nn.Conv2d(32, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            # shape(32,16,16) -> shape(32,8,8)
            nn.MaxPool2d(kernel_size=2, stride=2),
            # shape(32,8,8) -> shape(64,8,8)
            nn.Conv2d(32, 64, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            # shape(64,8,8) -> shape(64,4,4)
            nn.MaxPool2d(kernel_size=2, stride=2),
            # shape(64,4,4) -> shape(64 * 4 * 4,)
            nn.Flatten(),
            nn.Linear(64 * 4 * 4, 64),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(64, 10),
        )
        self.layers = layers

    def forward(self, x):
        return self.layers(x)


def build_data(images_dir):
    image_list = os.listdir(images_dir)
    image_paths = []
    for image in image_list:
        image_paths.append(os.path.join(images_dir, image))
    transform = torchvision.transforms.Compose([
        torchvision.transforms.Resize((32, 32)),
        torchvision.transforms.ToTensor()
    ])
    # Collect the transformed tensors
    images_tensor = []
    for image_path in image_paths:
        try:
            # Load the image and convert it to RGB (a no-op if it already is)
            image_pil = Image.open(image_path).convert('RGB')
            # Apply the transform and append the result
            images_tensor.append(transform(image_pil))
        except IOError:
            print(f"Cannot open {image_path}. Skipping...")
    # Stack the list into a single tensor, if anything was loaded
    if images_tensor:
        images_tensor = torch.stack(images_tensor)
    else:
        # No images: return an empty tensor (or handle as appropriate)
        images_tensor = torch.empty(0, 3, 32, 32)
    return images_tensor, image_list


def predict(state_dict_path, image):
    net = CIFAR10Net()
    net.load_state_dict(torch.load(state_dict_path))
    net.cuda()
    net.eval()  # required: BatchNorm and Dropout must run in inference mode
    with torch.no_grad():
        image = image.cuda()
        output = net(image)
    return output


if __name__ == '__main__':
    images, labels = build_data("./test_images")
    outputs = predict("./autodl_save/cuda_params_acc_75.pth", images)
    # Pick the prediction: the index of the highest score per image
    res = outputs.argmax(dim=1)
    kinds = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
    for i in range(len(res)):
        classes_idx = res[i]
        print(f'File (true label): {labels[i]}, prediction: {classes_idx}, {kinds[classes_idx]}\n')
```
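Note that `predict` above assumes a CUDA device. On a CPU-only machine, the GPU-trained parameters can still be loaded by passing `map_location`; a minimal sketch:

```python
# Load GPU-trained parameters on a CPU-only machine (sketch)
net = CIFAR10Net()
net.load_state_dict(torch.load("./autodl_save/cuda_params_acc_75.pth", map_location=torch.device('cpu')))
net.eval()
```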
Prediction Results
4 of the 7 images were classified correctly.
Note that the correspondence between these indices and labels can be checked against the dataset itself.
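For example, the mapping can be read directly off the torchvision dataset object (assuming the same `../data` root used in training):

```python
import torchvision

dataset = torchvision.datasets.CIFAR10(root="../data", train=False, download=False)
print(dataset.classes)       # ['airplane', 'automobile', 'bird', ..., 'truck']
print(dataset.class_to_idx)  # {'airplane': 0, 'automobile': 1, ...}
```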