- 🍨 本文为🔗365天深度学习训练营 中的学习记录博客
- 🍖 原作者:K同学啊
一、实验目的:
- 阅读ResNeXt论文,了解作者的构建思路
- 对比之前介绍的ResNet50V2、DenseNet算法
- 使用ResNeXt-50算法完成猴痘病识别
二、实验环境:
- 语言环境:python 3.8
- 编译器:Jupyter notebook
- 深度学习环境:Pytorch
- torch==2.4.0+cu124
- torchvision==0.19.0+cu124
三、模型介绍
ResNeXt是由何凯明团队在2017年CVPR会议上提出来的新型图像分类网络。ResNeXt是ResNet的升级版,在ResNet的基础上,引入了cardinality的概念,类似于ResNet,ResNeXt也有ResNeXt-50,ResNeXt-101的版本。
ResNeXt论文原文:Aggregated Residual Transformations for Deep Neural Networks。
这篇文章介绍了一种用于图像分类的简单而有效的网络架构,该网络采用了VGG/ResNets的策略,通过重复层来增加深度和宽度,并利用分裂-变换-合并策略以易于扩展的方式进行转换。文章还提出了一个新的维度——“基数”,它是指转换集合的大小,可以在保持复杂性不变的情况下提高分类准确性。作者在ImageNet-1K数据集上进行了实证研究,证明了这种方法的有效性。
下图是ResNet(左)与ResNeXt(右)block的差异。在ResNet中,输入的具有256个通道的特征经过1×1卷积压缩4倍到64个通道,之后3×3的卷积核用于处理特征,经1×1卷积扩大通道数与原特征残差连接后输出。
ResNeXt也是相同的处理策略,但在ResNeXt中,输入的具有256个通道的特征被分为32个组,每组被压缩64倍到4个通道后进行处理。32个组相加后与原特征残差连接后输出。这里cardinatity指的是一个block中所具有的相同分支的数目。
分组卷积
ResNeXt中采用的分组卷机简单来说就是将特征图分为不同的组,再对每组特征图分别进行卷积,这个操作可以有效的降低计算量。
在分组卷积中,每个卷积核只处理部分通道,比如下图中,红色卷积核只处理红色的通道,绿色卷积核只处理绿色通道,黄色卷积核只处理黄色通道。此时每个卷积核有2个通道,每个卷积核生成一张特征图。
总结:ResNeXt-50网络简单讲就是在ResNet结构的基础上采用了聚合残差结构和局部连接结构,同时引入了Random Erasing和Mixup等数据增强和正则化方法。
- Random Erasing是一种数据增强技术,随机删除图像中的一些像素,并用随机值填充,从而增强模型的泛化性能。该技术可以防止模型过分关注图像中的一些细节和特定的区域,从而更好地适应新的数据。此外,Random Erasing还可以增加数据集的多样性,从而降低过拟合的风险。
- Mixup则是一种数据增强和正则化技术,将两张图像的像素按比例混合,生成一张新的图像作为输入,从而提高模型的鲁棒性和泛化性能。Mixup的基本思想是在训练过程中使用凸组合的方法,将输入的不同样本进行线性组合,从而生成一些新的数据样本。这种方法可以有效地增加数据集的多样性,从而提高模型的泛化性能。此外,Mixup还可以作为一种正则化技术,可以降低模型的过拟合风险。
四、使用Pytorch实现ResNeXt-50
设置GPU、导入数据、划分数据集等步骤同前。
1. 构建模型
class Bottleneck(nn.Module):expansion = 4def __init__(self, in_channel, out_channel, stride=1, downsample=None,groups=1, width_per_group=64):super(Bottleneck, self).__init__()width = int(out_channel * (width_per_group / 64.)) * groupsself.conv1 = nn.Conv2d(in_channels=in_channel, out_channels=width,kernel_size=1, stride=1, bias=False) # squeeze channelsself.bn1 = nn.BatchNorm2d(width)# -----------------------------------------self.conv2 = nn.Conv2d(in_channels=width, out_channels=width, groups=groups,kernel_size=3, stride=stride, bias=False, padding=1)self.bn2 = nn.BatchNorm2d(width)# -----------------------------------------self.conv3 = nn.Conv2d(in_channels=width, out_channels=out_channel*self.expansion,kernel_size=1, stride=1, bias=False) # unsqueeze channelsself.bn3 = nn.BatchNorm2d(out_channel*self.expansion)self.relu = nn.ReLU(inplace=True)self.downsample = downsampledef forward(self, x):identity = xif self.downsample is not None:identity = self.downsample(x)out = self.conv1(x)out = self.bn1(out)out = self.relu(out)out = self.conv2(out)out = self.bn2(out)out = self.relu(out)out = self.conv3(out)out = self.bn3(out)out += identityout = self.relu(out)return out
class ResNet(nn.Module):def __init__(self,block,blocks_num,num_classes=1000,include_top=True,groups=1,width_per_group=64):super(ResNet, self).__init__()self.include_top = include_topself.in_channel = 64self.groups = groupsself.width_per_group = width_per_groupself.conv1 = nn.Conv2d(3, self.in_channel, kernel_size=7, stride=2,padding=3, bias=False)self.bn1 = nn.BatchNorm2d(self.in_channel)self.relu = nn.ReLU(inplace=True)self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)self.layer1 = self._make_layer(block, 64, blocks_num[0])self.layer2 = self._make_layer(block, 128, blocks_num[1], stride=2)self.layer3 = self._make_layer(block, 256, blocks_num[2], stride=2)self.layer4 = self._make_layer(block, 512, blocks_num[3], stride=2)if self.include_top:self.avgpool = nn.AdaptiveAvgPool2d((1, 1)) # output size = (1, 1)self.fc = nn.Linear(512 * block.expansion, num_classes)for m in self.modules():if isinstance(m, nn.Conv2d):nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')def _make_layer(self, block, channel, block_num, stride=1):downsample = Noneif stride != 1 or self.in_channel != channel * block.expansion:downsample = nn.Sequential(nn.Conv2d(self.in_channel, channel * block.expansion, kernel_size=1, stride=stride, bias=False),nn.BatchNorm2d(channel * block.expansion))layers = []layers.append(block(self.in_channel,channel,downsample=downsample,stride=stride,groups=self.groups,width_per_group=self.width_per_group))self.in_channel = channel * block.expansionfor _ in range(1, block_num):layers.append(block(self.in_channel,channel,groups=self.groups,width_per_group=self.width_per_group))return nn.Sequential(*layers)def forward(self, x):x = self.conv1(x)x = self.bn1(x)x = self.relu(x)x = self.maxpool(x)x = self.layer1(x)x = self.layer2(x)x = self.layer3(x)x = self.layer4(x)if self.include_top:x = self.avgpool(x)x = torch.flatten(x, 1)x = self.fc(x)return x
def resnext50_32x4d(num_classes=1000, include_top=True):# 预训练权重:https://download.pytorch.org/models/resnext50_32x4d-7cdf4587.pthgroups = 32width_per_group = 4return ResNet(Bottleneck, [3, 4, 6, 3],num_classes=num_classes,include_top=include_top,groups=groups,width_per_group=width_per_group)model = resnext50_32x4d(num_classes=4, include_top=True)
model.to(device)# 统计模型参数量以及其他指标
import torchsummary as summary
summary.summary(model,(3,224,224))
代码输出部分截图:
2. 编写训练与测试函数
# 编写训练函数
def train(dataloader, model, loss_fn, optimizer):size = len(dataloader.dataset)num_batches = len(dataloader)train_acc, train_loss = 0, 0for X, y in dataloader:X, y = X.to(device), y.to(device)pred = model(X)loss = loss_fn(pred, y)optimizer.zero_grad()loss.backward()optimizer.step()train_loss += loss.item()train_acc += (pred.argmax(1) == y).type(torch.float).sum().item()train_loss /= num_batchestrain_acc /= sizereturn train_acc, train_loss
# 编写测试函数
def test(dataloader, model, loss_fn):size = len(dataloader.dataset) # 测试集的大小num_batches = len(dataloader) # 批次数目, (size/batch_size,向上取整)test_loss, test_acc = 0, 0# 当不进行训练时,停止梯度更新,节省计算内存消耗with torch.no_grad():for imgs, target in dataloader:imgs, target = imgs.to(device), target.to(device)# 计算losstarget_pred = model(imgs)loss = loss_fn(target_pred, target)test_loss += loss.item()test_acc += (target_pred.argmax(1) == target).type(torch.float).sum().item()test_acc /= sizetest_loss /= num_batchesreturn test_acc, test_loss
3. 设置损失函数和学习率
import copyloss_fn = nn.CrossEntropyLoss()
learn_rate = 1e-4
opt = torch.optim.Adam(model.parameters(), lr=learn_rate)scheduler = torch.optim.lr_scheduler.StepLR(opt, step_size=1, gamma=0.9) # 定义学习率高度器epochs = 100 # 设置训练模型的最大轮数为100,但可能到不了100
patience = 10 # 早停的耐心值,即如果模型连续10个周期没有准确率提升,则跳出训练train_loss = []
train_acc = []
test_loss = []
test_acc = []
best_acc = 0 # 设置一个最佳的准确率,作为最佳模型的判别指标
no_improve_epoch = 0 # 用于跟踪准确率是否提升的计数器
epoch = 0 # 用于统计最终的训练模型的轮数,这里设置初始值为0;为绘图作准备,这里的绘图范围不是epochs = 100
4. 正式训练
# 开始训练
for epoch in range(epochs):model.train()epoch_train_acc, epoch_train_loss = train(train_dl, model, loss_fn, opt)model.eval()epoch_test_acc, epoch_test_loss = test(test_dl, model, loss_fn)if epoch_test_acc > best_acc:best_acc = epoch_test_accbest_model = copy.deepcopy(model)no_improve_epoch = 0 # 重置计数器# 保存最佳模型的检查点PATH = 'J6_best_model.pth'torch.save({'epoch': epoch,'model_state_dict': best_model.state_dict(),'optimizer_state_dict': opt.state_dict(),'loss': epoch_test_loss,}, PATH)else:no_improve_epoch += 1if no_improve_epoch >= patience:print(f"Early stop triggered at epoch {epoch + 1}")break # 早停train_acc.append(epoch_train_acc)train_loss.append(epoch_train_loss)test_acc.append(epoch_test_acc)test_loss.append(epoch_test_loss)scheduler.step() # 更新学习率lr = opt.state_dict()['param_groups'][0]['lr']template = ('Epoch:{:2d}, Train_acc:{:.1f}%, Train_loss:{:.3f}, Test_acc:{:.1f}%, Test_loss:{:.3f}, Lr:{:.2E}')print(template.format(epoch + 1, epoch_train_acc * 100, epoch_train_loss, epoch_test_acc * 100, epoch_test_loss, lr))
代码输出部分截图:
5. 结果可视化
# 结果可视化
# Loss与Accuracy图import matplotlib.pyplot as plt
# 隐藏警告
import warnings
warnings.filterwarnings("ignore") # 忽略警告信息
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
plt.rcParams['figure.dpi'] = 100 # 分辨率epochs_range = range(epoch)plt.figure(figsize=(12, 3))
plt.subplot(1, 2, 1)plt.plot(epochs_range, train_acc, label='Training Accuracy')
plt.plot(epochs_range, test_acc, label='Test Accuracy')
plt.legend(loc='lower right')
plt.title('Training and Validation Accuracy')plt.subplot(1, 2, 2)
plt.plot(epochs_range, train_loss, label='Training Loss')
plt.plot(epochs_range, test_loss, label='Test Loss')
plt.legend(loc='upper right')
plt.title('Training and Validation Loss')
plt.show()
6. 预测
from PIL import Imageclasses = list(total_data.class_to_idx)def predict_one_image(image_path, model, transform, classes):test_img = Image.open(image_path).convert('RGB')plt.imshow(test_img) # 展示预测的图片test_img = transform(test_img)img = test_img.to(device).unsqueeze(0)model.eval()output = model(img)_, pred = torch.max(output, 1)pred_class = classes[pred]print(f'预测结果是:{pred_class}')import os
from pathlib import Path
import random#从所有的图片的随机选择一张图片image=[]
def image_path(data_dir):file_list=os.listdir(data_dir) #列出四个分类标签data_file_dir=file_list #从四个分类标签中随机选择一个data_dir=Path(data_dir)for i in data_file_dir:i=Path(i)image_file_path=data_dir.joinpath(i) #拼接路径data_file_paths=image_file_path.iterdir() #罗列文件夹的内容data_file_paths=list(data_file_paths) #要转换为列表image.append(data_file_paths)file=random.choice(image) #从所有的图像中随机选择一类file=random.choice(file) #从选择的类中随机选择一张图片return filedata_dir='./monkeypox_photos'
image_path=image_path(data_dir)# 预测训练集中的某张照片
predict_one_image(image_path=image_path,model=model,transform=train_transforms,classes=classes)
# 模型评估
# 将参数加载到model当中
best_model.load_state_dict(torch.load(PATH,map_location=device))
epoch_test_acc,epoch_test_loss=test(test_dl,best_model,loss_fn)
epoch_test_acc,epoch_test_loss
(0.8508158508158508, 0.39013327977487017)
总结
ResNeXt是在ResNet的网络架构上,使用类似于Inception的分治思想,即split-tranform-merge策略,将模块中的网络拆开分组,与Inception不同,每组的卷积核大小一致,这样其感受野一致,但由于每组的卷积核参数不同,提取的特征自然不同。然后将每组得到的特征进行concat操作后,再与原输入特征x或者经过卷积等处理(即进行非线性变换)的特征进行Add操作。这样做的好处是,在不增加参数复杂度的前提下提高准确率,同时还能提高超参数的数量。
另外,cardinality是基的意思,将数个通道特征进行分组,不同的特征组之间可以看作是由不同基组成的子空间,每个组的核虽然一样,但参数不同,在各自的子空间中学到的特征就多种多样,这点跟transformer中的Multi-head attention不谋而合(Multi-head attention allows the model to jointly attend to information from different representation subspaces.)而且分组进行特征提取,使得学到的特征冗余度降低,获取能起到正则化的作用。
ResNeXt-50与ResNet50V2、DenseNet的对比:
- 网络结构:
- ResNeXt-50:基于ResNet结构改进而来,采用聚合残差结构和局部连接结构。它通过重复构建块来构建,每个构建块聚合了一组具有相同拓扑结构的转换。引入了分组卷积的方法,可以将不同的通道分组处理,还使用了深度可分离卷积的方法进一步减少计算量。
- ResNet50V2:是ResNet系列中的经典模型,由50层卷积层、批量归一化、激活函数和池化层构成。引入了一种全新的残差块结构,即bottleneck结构,使得网络参数量大幅度降低,同时精度也有所提升。
- DenseNet:其特点是不同于传统的网络结构,每一层的输出不仅和前一层的输出有关,还和之前所有层的输出有关,这种密集连接的结构可以有效地缓解梯度消失和参数稀疏问题,提高了模型的泛化能力和精度。它由多个denseblock和transition层组成,denseblock内部采用密集连接,相邻denseblock之间通过transition层连接并降低特征图大小。
- 精度和计算量:
- ResNeXt-50:在相同的深度下具有更高的精度,并且在参数量和计算量上都显著降低。在较深的网络结构下,优势更加明显,可以达到更高的精度。
- ResNet50V2:能在保持较低参数量的同时,实现较高的精度。
- DenseNet:在参数和计算成本更少的情形下实现比ResNet更优的性能,通过特征在channel上的连接来实现特征重用,减少了网络的参数总量,但由于密集连接方式,计算量相对较大。
- 适用范围:
- ResNeXt-50:适用于各种图像分类任务,在对精度要求较高且计算资源相对充足的场景下表现良好。
- ResNet50V2:广泛适用于各种图像分类任务,尤其在对模型复杂度和精度有一定平衡要求的场景中应用较多。
- DenseNet:适用于对特征重用和模型紧凑性要求较高的任务,例如图像分类、目标检测等,但在计算资源有限的情况下,可能需要对其进行适当的调整或优化。
它们各自的优点和创新之处如下:
- ResNeXt-50:
- 优点:在不明显增加参数量的情况下提升了准确率,具有很好的可扩展性和可适应性,超参数数量相对较少,便于模型移植。
- 创新:提出aggregated residual transformations结构,利用分组卷积构建平行堆叠相同拓扑结构的blocks,代替原来ResNet的三层卷积的block;同时引入了cardinality的概念,即通过增加分组的数量(基数),可以在不增加模型复杂度的前提下提高性能,实验表明增加基数比增加深度或宽度更有效。
- ResNet50V2:
- 优点:通过改进残差结构,先进行批量归一化和激活函数计算后再卷积,并将addition后的ReLU计算放到残差结构内部,提高了模型的精度,同时降低了参数量。
- 创新:全新的残差块结构(bottleneck结构),减少了网络参数量,使得在保持较高精度的同时,模型更容易训练和优化,这种结构上的创新为后续许多网络的设计提供了借鉴思路。
- DenseNet:
- 优点:缓解了梯度消失问题,加强了特征传播,鼓励了特征复用,极大地减少了网络的参数总量,在参数较少的情况下能取得较好的性能,而且通过密集连接方式,提升了梯度的反向传播,使得网络更容易训练,对过拟合有一定的抑制作用。
- 创新:建立了前面所有层与后面层的密集连接机制,实现了特征重用,每个层都会与前面所有层在channel维度上连接并作为下一层的输入,这一创新的连接方式充分利用了特征信息,与传统的网络结构相比,在相同性能下可以减少参数数量,提高了模型的效率和泛化能力。此外,在denseblock中使用bottleneck层来减少计算量,以及在transition层采用特定的结构来处理特征图的尺寸匹配问题,也是其重要的创新点。