SeNet
SeNet(Squeeze-and-Excitation Networks)是ImageNet 2017年分类任务冠军,核心思想是Squeeze(挤压、压缩)和Excitation(激励)两个操作。其主要目的是显式地构建特征通道之间的相互依赖关系,采用特征重标定的策略,通过学习的方式自动获取每个特征通道的重要程度,然后依据这个重要程度去提升有用的特征,并抑制对当前任务用处不大的特征。
Squeeze特征压缩操作,将每个feature map的特征数据使用全局平均池化操作转换为一个数值(实数),这个实数在一定程度上具有全局感受野的作用。Squeeze操作的输出表示这特征通道上响应的全局分布(所有的feature map可以认为是局部描述子的集合),而且使得靠近输入层的结构中也可以获得全局感受野的信息。
Excitation特征激励操作,主要目的是显式地构建特征通道之间的相关性。为了限制模型复杂度并增强泛化能力,引入两个FC层:首先经过第一个FC层,将输入的1×1×C的特征图降维成1×1×(C/r),然后经过一个ReLU后,经过第二个FC层进行升维,转换回1×1×C,最后做一个sigmoid转换得到权重值。论文中r为16。
这里使用FC全连接的主要目的是为了建模(利用)通道间的相关性。使用两个Fully Connected层,这样做比直接用一个Fully Connected层的好处在于:
-1. 具有更多的非线性组合,可以更好地拟合通道间复杂的相关性;
-2. 极大地减少了参数量和计算量。
ReWeight操作是将Excitation输出的权重可以看成是经过特征选择后的每个特征通道的重要性,然后通过乘法逐通道的加权到之前的特征上,完成在通道维度上对于原始特征的重标定。
将SE Block嵌入到任何网络结构中,形成特有的SE结构。
import os

import numpy as np
from matplotlib import pyplot as plt
from torch.utils.data import DataLoader
from torchvision import models, datasets
import torch.nn as nn
import torch.nn.functional as F
import torch
from torchvision.transforms import transforms


class GolbalAvgPool2d(nn.Module):
    """Global average pooling over the spatial axes.

    The (misspelt) class name is kept because pickled models reference it.
    """

    def __init__(self):
        super(GolbalAvgPool2d, self).__init__()

    def forward(self, x):
        """[N, C, H, W] -> [N, C, 1, 1]"""
        return torch.mean(x, dim=(2, 3), keepdim=True)


class SeModule(nn.Module):
    """Squeeze-and-Excitation block: one learned weight per channel.

    Args:
        in_channel: number of channels C of the incoming feature map.
        r: reduction ratio of the bottleneck (C -> C // r -> C).
    """

    def __init__(self, in_channel, r=16):
        super(SeModule, self).__init__()
        # Never let the bottleneck collapse to 0 channels when in_channel < r.
        hidden = max(1, in_channel // r)
        self.avg = GolbalAvgPool2d()
        # 1x1 convolutions on a [N, C, 1, 1] tensor act as fully connected layers.
        self.fc1 = nn.Conv2d(in_channel, hidden, kernel_size=(1, 1), stride=(1, 1))
        self.fc2 = nn.Conv2d(hidden, in_channel, kernel_size=(1, 1), stride=(1, 1))

    def forward(self, x):
        """SE re-weighting: [N, C, H, W] -> [N, C, H, W]"""
        # Squeeze: one scalar per channel.
        alpha = self.avg(x)  # [N, C, H, W] -> [N, C, 1, 1]
        # Excitation: bottleneck + sigmoid gives a weight in (0, 1) per channel.
        alpha = F.relu(self.fc1(alpha))  # [N, C, 1, 1] -> [N, C//r, 1, 1]
        alpha = torch.sigmoid(self.fc2(alpha))  # [N, C//r, 1, 1] -> [N, C, 1, 1]
        # Re-weight: broadcast multiply over H and W.
        return x * alpha


class SeModuleV2(nn.Module):
    """SE variant that computes a spatial weight map instead of one scalar per channel.

    Args:
        in_channel: number of channels C of the incoming feature map.
        r: reduction ratio of the bottleneck.
    """

    def __init__(self, in_channel, r=16):
        super(SeModuleV2, self).__init__()
        # Never let the bottleneck collapse to 0 channels when in_channel < r.
        hidden = max(1, in_channel // r)
        self.fc1 = nn.Conv2d(in_channel, hidden, kernel_size=(1, 1), stride=(1, 1))
        self.fc2 = nn.Conv2d(hidden, in_channel, kernel_size=(1, 1), stride=(1, 1))
        self.pool = nn.MaxPool2d(5, 1, padding=2)
        self.bias = nn.Parameter(torch.zeros([1, in_channel, 1, 1]))

    def forward(self, x):
        """SE re-weighting: [N, C, H, W] -> [N, C, H, W]"""
        alpha = F.relu(self.fc1(x))  # [N, C, H, W] -> [N, C//r, H, W]
        alpha = torch.sigmoid(self.fc2(alpha))  # [N, C//r, H, W] -> [N, C, H, W]
        # Max-pooling the negated map and negating again is a 5x5 min-pool.
        alpha = self.pool(-1.0 * alpha) * -1.0  # [N, C, H, W] -> [N, C, H, W]
        # Learned per-channel offset in (-1, 1); relu and /2 keep the weight in [0, 1).
        # torch.tanh replaces the deprecated F.tanh (same values).
        alpha = F.relu(alpha + torch.tanh(self.bias)) / 2.0
        return x * alpha


class BasicConv2d(nn.Module):
    """Conv2d followed by ReLU."""

    def __init__(self, in_channels, out_channels, kernel_size, stride, padding):
        super(BasicConv2d, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.relu(self.conv(x))


class Inception(nn.Module):
    """Four-branch Inception block; branch outputs are concatenated on the channel axis."""

    def __init__(self, in_channels, out_channels, ):
        """
        in_channels: number of input channels, e.g. 192
        out_channels: output channels of every branch,
            e.g. [[64], [96, 128], [16, 32], [32]]
        """
        super(Inception, self).__init__()
        self.branch1 = nn.Sequential(
            BasicConv2d(in_channels, out_channels[0][0], kernel_size=1, stride=1, padding=0)
        )
        self.branch2 = nn.Sequential(
            BasicConv2d(in_channels, out_channels[1][0], kernel_size=1, stride=1, padding=0),
            BasicConv2d(out_channels[1][0], out_channels[1][1], kernel_size=3, stride=1, padding=1)
        )
        self.branch3 = nn.Sequential(
            BasicConv2d(in_channels, out_channels[2][0], kernel_size=1, stride=1, padding=0),
            BasicConv2d(out_channels[2][0], out_channels[2][1], kernel_size=5, stride=1, padding=2)
        )
        self.branch4 = nn.Sequential(
            nn.MaxPool2d(3, 1, padding=1),
            BasicConv2d(in_channels, out_channels[3][0], kernel_size=1, stride=1, padding=0)
        )

    def forward(self, x):
        x1 = self.branch1(x)  # [N, C, H, W] -> [N, C1, H, W]
        x2 = self.branch2(x)  # [N, C, H, W] -> [N, C2, H, W]
        x3 = self.branch3(x)  # [N, C, H, W] -> [N, C3, H, W]
        x4 = self.branch4(x)  # [N, C, H, W] -> [N, C4, H, W]
        return torch.concat([x1, x2, x3, x4], dim=1)  # [N, C1+C2+C3+C4, H, W]


class SeNetInception(nn.Module):
    """Inception block followed by a channel-wise SE block."""

    def __init__(self, in_channels, out_channels, r=16):
        super(SeNetInception, self).__init__()
        self.block = Inception(in_channels, out_channels)
        # The concatenated width is the sum of each branch's last layer width.
        se_in_channels = int(sum(branch[-1] for branch in out_channels))
        self.se_block = SeModule(se_in_channels, r)

    def forward(self, x):
        return self.se_block(self.block(x))


class SeNetInceptionV2(nn.Module):
    """Inception block followed by the spatial SE variant (SeModuleV2)."""

    def __init__(self, in_channels, out_channels, r=16):
        super(SeNetInceptionV2, self).__init__()
        self.block = Inception(in_channels, out_channels)
        # The concatenated width is the sum of each branch's last layer width.
        se_in_channels = int(sum(branch[-1] for branch in out_channels))
        self.se_block = SeModuleV2(se_in_channels, r)

    def forward(self, x):
        return self.se_block(self.block(x))


class GoogLeNet(nn.Module):
    """GoogLeNet built from SE-Inception blocks.

    Args:
        num_class: number of output classes.
        add_aux_stage: when True, two auxiliary classifiers are attached after
            stage1 and stage2 and ``forward`` returns three score tensors.
    """

    def __init__(self, num_class, add_aux_stage=False):
        super(GoogLeNet, self).__init__()
        _inception = SeNetInceptionV2
        self.stage1 = nn.Sequential(
            BasicConv2d(3, 64, 7, 2, 3),
            nn.MaxPool2d(3, 2, padding=1),
            # nn.LocalResponseNorm(size=10),
            BasicConv2d(64, 64, 1, 1, 0),
            BasicConv2d(64, 192, 3, 1, 1),
            nn.MaxPool2d(3, 2, padding=1),
            _inception(192, [[64], [96, 128], [16, 32], [32]]),  # inception3a
            _inception(256, [[128], [128, 192], [32, 96], [64]]),  # inception3b
            nn.MaxPool2d(3, 2, padding=1),
            _inception(480, [[192], [96, 208], [16, 48], [64]])  # inception4a
        )
        self.stage2 = nn.Sequential(
            _inception(512, [[160], [112, 224], [24, 64], [64]]),  # inception4b
            _inception(512, [[128], [128, 256], [24, 64], [64]]),  # inception4c
            _inception(512, [[112], [144, 288], [32, 64], [64]]),  # inception4d
        )
        self.stage3 = nn.Sequential(
            _inception(528, [[256], [160, 320], [32, 128], [128]]),  # inception4e
            nn.MaxPool2d(3, 2, padding=1),
            _inception(832, [[256], [160, 320], [32, 128], [128]]),  # inception5a
            _inception(832, [[384], [192, 384], [48, 128], [128]]),  # inception5b
            GolbalAvgPool2d()
        )
        self.classify = nn.Conv2d(1024, num_class, kernel_size=(1, 1), stride=(1, 1), padding=0)
        if add_aux_stage:
            self.aux_stage1 = nn.Sequential(
                nn.MaxPool2d(5, 3, padding=0),
                nn.Conv2d(512, 1024, kernel_size=(1, 1), stride=(1, 1), padding=0),
                nn.ReLU(),
                nn.AdaptiveAvgPool2d(output_size=(2, 2)),
                nn.Flatten(1),
                nn.Linear(4096, 2048),
                nn.Dropout(p=0.4),
                nn.ReLU(),
                nn.Linear(2048, num_class)
            )
            self.aux_stage2 = nn.Sequential(
                nn.MaxPool2d(5, 3, padding=0),
                nn.Conv2d(528, 1024, kernel_size=(1, 1), stride=(1, 1), padding=0),
                nn.ReLU(),
                nn.AdaptiveAvgPool2d(output_size=(2, 2)),
                nn.Flatten(1),
                nn.Linear(4096, 2048),
                nn.Dropout(p=0.4),
                nn.ReLU(),
                nn.Linear(2048, num_class)
            )
        else:
            self.aux_stage1 = None
            self.aux_stage2 = None

    def forward(self, x):
        """[N, C, H, W] -> scores [N, num_class] (or three of them with aux stages)."""
        z1 = self.stage1(x)  # [N, C, H, W] -> [N, 512, H1, W1]
        z2 = self.stage2(z1)  # [N, 512, H1, W1] -> [N, 528, H2, W2]
        z3 = self.stage3(z2)  # [N, 528, H2, W2] -> [N, 1024, 1, 1]
        # flatten(1) instead of squeeze(): squeeze() would also drop the batch
        # axis when N == 1 and return a 1-D tensor.
        scores3 = torch.flatten(self.classify(z3), 1)  # [N, num_class, 1, 1] -> [N, num_class]
        if self.aux_stage1 is not None:
            score1 = self.aux_stage1(z1)
            score2 = self.aux_stage2(z2)
            return score1, score2, scores3
        return scores3


def t1():
    """Build the three-head network, compute the combined loss and persist the model."""
    net = GoogLeNet(num_class=4, add_aux_stage=True)
    loss_fn = nn.CrossEntropyLoss()
    _x = torch.rand(2, 3, 224, 224)
    _y = torch.tensor([0, 3], dtype=torch.long)  # simulated ground-truth class ids
    _r1, _r2, _r3 = net(_x)  # one prediction per decision branch
    _loss1 = loss_fn(_r1, _y)
    _loss2 = loss_fn(_r2, _y)
    _loss3 = loss_fn(_r3, _y)
    # Sum of the three losses (the original `a + b, c` built a tuple by mistake).
    _loss = _loss1 + _loss2 + _loss3
    print(_r1)
    print(_r2)
    print(_r3)
    print(_r3.shape)

    # The save calls below fail if the target directory does not exist.
    os.makedirs('./output/modules', exist_ok=True)
    traceed_script_module = torch.jit.trace(net.eval(), _x)
    traceed_script_module.save('./output/modules/googlenet_inception.pt')
    # Persist the whole module object.
    torch.save(net, './output/modules/googlenet_inception.pkl')


def t2():
    """Reload the pickled model, drop the auxiliary heads and export to TorchScript/ONNX."""
    # NOTE(review): this unpickles a full nn.Module; recent PyTorch releases
    # may need `weights_only=False` here. Only load files you trust.
    net1 = torch.load('./output/modules/googlenet_inception.pkl')
    net2 = GoogLeNet(num_class=4, add_aux_stage=False)
    # missing keys: parameters of net2 that were not restored.
    # unexpected keys: entries of the state dict that net2 does not have (aux heads).
    missing_keeys, unexpected_keys = net2.load_state_dict(net1.state_dict(), strict=False)
    if len(missing_keeys) > 0:
        raise ValueError(f"网络有部分参数没有恢复:{missing_keeys}")

    _x = torch.rand(2, 3, 224, 224)
    traceed_script_module = torch.jit.trace(net2.eval(), _x)
    traceed_script_module.save('./output/modules/googlenet_inception.pt')

    # Convert to ONNX.
    torch.onnx.export(
        model=net2.eval().cpu(),  # model object
        args=_x,  # example input for forward
        f='./output/modules/googlenet_inception_dynamic.onnx',  # output file
        # training=TrainingMode.EVAL,
        do_constant_folding=True,
        input_names=['images'],  # names of the input tensors
        output_names=['scores'],  # names of the output tensors
        opset_version=12,
        # Keys must match input_names/output_names; the original used 'label',
        # which matches nothing, so the output batch axis was not dynamic.
        dynamic_axes={
            'images': {
                0: 'n',
                2: 'h',
                3: 'w'
            },
            'scores': {
                0: 'n'
            }
        }
    )


if __name__ == '__main__':
    # inception = Inception(192, [[64], [96, 128], [16, 32], [32]])
    # print(inception)
    # _x = torch.rand(4, 192, 100, 100)
    # _r = inception(_x)
    # print(_r.shape)
    t1()
    t2()
Residual Attention Networks
Residual Attention Networks利用Residual和Attention机制进行网络结构的堆叠,从而得到一个更深入的特征信息,在每个attention module中会做一个适应性的变化,采用上采样和下采样的结构。主要创新点:
Stacked Network Structure:堆叠多个attention module来构建网络结构。
Attention Residual Learning:一种优化思想,类似ResNet中基于残差的更新方式,可以让模型具有更好的性能。
Bottom-up top-down feedforward attention:基于下采样-上采样的机制,将特征权重加入到特征图中。
CNN-MobileNet
MobileNet是专门为移动端和嵌入式深度学习应用设计的网络结构,目标是在较低配置的资源上也能取得非常好的效果。其主要特点为:轻量化,并直接使用stride=2的卷积代替池化层;其主要创新点:引入了depthwise separable convolutions(深度可分离卷积),将标准卷积分解为两个更小的卷积操作:depthwise convolutions(深度卷积)和pointwise convolutions(逐点卷积),其主要目的是降低参数量和计算量。
输入特征图(DF ,DF ,M),输出特征图为(DG,DG,N)。
标准卷积核为(DK,DK,M,N)
将标准卷积分解为深度卷积和逐点卷积;其中深度卷积负责滤波作用(基于输入的featuremap提取更高阶的特征),核大小为(DK,DK,1,M);逐点卷积负责转换通道(合并之前的高阶特征信息,形成最终的feature map),核大小为(1,1,M,N)
深度卷积过程
对每个输入的通道分别进行卷积,得到不同的输出卷积结果。
逐点卷积过程
其实就是一个普通的1×1的卷积操作。做逐点卷积的主要原因是:深度卷积过程中是对每个feature map单独提取特征的,再做一个1×1的卷积,相当于对多个输入feature map做特征融合。
Width Multiplier: Thinner Models
引入第一个控制模型大小的超参数:宽度因子α(Width Multiplier),用于控制输入和输出的通道数,即将输入通道从M变成αM,输出通道从N变成αN。
α的一般取值为:[1.0, 0.75, 0.5, 0.25]
NOTE: 计算量和参数量降低为原来的约α²倍。
Resolution Multiplier: Reduced Representation
引入第二个模型大小控制参数:分辨率因子ρ(Resolution Multiplier),用于控制输入和内部层的表示,即输出层的分辨率控制。
常见输入分辨率为224、192、160和128,ρ取值范围一般为(0,1]。
MobileNet V2:
引入shortcut结构(残差结构)
使用1x1的卷积在depthwise之前进行feature map扩增。
在pointwise后使用linear激活函数代替relu激活函数,防止对于特征的破坏。
from pathlib import Path
import torch.jit
from PIL import Image
from torch import nn
from torchvision import models
import torch
from torchvision.transforms import transforms
from thop import profile


def calc_flops(my_net, inputs):
    """Profile a network with thop and print its operation and parameter counts.

    Args:
        my_net: the nn.Module to profile.
        inputs: a single tensor, or a list/tuple of tensors forwarded
            positionally to ``my_net``.

    Returns:
        The ``(flops, params)`` pair reported by ``thop.profile``, so callers
        can use the numbers instead of parsing stdout.
    """
    # thop expects a tuple of positional inputs.
    if isinstance(inputs, list):
        inputs = tuple(inputs)
    elif not isinstance(inputs, tuple):
        inputs = (inputs,)
    flops, params = profile(my_net, inputs=inputs)
    # NOTE(review): thop's first value is commonly described as a
    # multiply-accumulate count rather than FLOPs; confirm against thop docs.
    print(f"总的浮点计数量:{flops}")
    print(f"总的参数量:{params}")
    return flops, params


def t1():
    """Smoke test: profile a single Linear(3, 5) layer."""
    calc_flops(
        my_net=nn.Sequential(nn.Linear(3, 5)),
        inputs=torch.randn(2, 3)
    )


if __name__ == '__main__':
    # path_dir = Path("./output/modules")
    # path_dir.mkdir(parents=True, exist_ok=True)
    # net = models.mobilenet_v2(pretrained=True)
    # print(net)
    # _x = torch.randn(4, 3, 224, 224)
    # modules = torch.jit.trace(net.eval(), _x)
    # modules.save(str(path_dir / 'mobile_v2.pt'))
    #
    # net.eval().cpu()
    # tfs = transforms.ToTensor()
    #
    # image_path = {
    #     '小狗': r'../datas/小狗.png',
    #     '小狗2': r'../datas/小狗2.png',
    #     '小猫': r'../datas/小猫.jpg',
    #     '飞机': r'../datas/飞机.jpg',
    #     '飞机2': r'../datas/飞机2.jpg'
    # }
    #
    # out_dir = Path('./output/mobiliev2/features/')
    # for name in image_path.keys():
    #     img = Image.open(image_path[name]).convert("RGB")
    #     img = tfs(img)  # [3, H, W]
    #     img = img[None]  # [3, H, W] -> [1, 3, H, W]
    #
    #     score = net(img)  # [1, 1000]
    #     prob = torch.softmax(score, dim=1)
    #     top5 = torch.topk(prob, 5, dim=1)
    #     print("=" * 100)
    #     print(name)
    #     print(top5)
    t1()
CNN-ShuffleNet
ShuffleNet是一种满足在受限条件下的高效基础网络结构,基于组群卷积(Group Convolution)和深度可分离卷积(Depthwise Separable Convolution)。
简单的组群卷积会导致每个卷积操作仅从某些部分的输入通道数据中导出,会降低通道之间的信息流通,降低信息的表达能力,故在做GroupConvolution之前先做一个channel的shuffle操作,以保障信息的表达能力。
对于channel的shuffle操作,
有g*n的输出通道;reshape(g,n),转置为(n,g),扁平化,再分组作为下一层的输入
RepVGG
深度解读:RepVGG - 知乎 (zhihu.com)
https://zhuanlan.zhihu.com/p/353697121
图解RepVGG - 知乎 (zhihu.com)
https://zhuanlan.zhihu.com/p/352239591
训练时使用多分支结构,模型拟合能力强
推理时将多分支合并,减少内存开支,加快运行速度
MobileOne
全网唯一复现!手机端 1ms 级延迟的主干网模型 MobileOne - 知乎 (zhihu.com)
https://zhuanlan.zhihu.com/p/614576582
FasterNet
BatchNorm
from pathlib import Path
from typing import Optional

import torch
import torch.nn as nn
from torch import Tensor
from torch.nn import modules


class BN(nn.Module):
    """Hand-written batch normalisation for [N, C, H, W] tensors.

    Statistics are taken over the (N, H, W) axes. ``momentum`` is the weight
    of the *previous* running value:
    ``running = momentum * running + (1 - momentum) * batch``.
    This is the opposite of ``nn.BatchNorm2d``'s convention.
    """

    # Running statistics, registered as buffers in __init__.
    _mean: Tensor
    _var: Tensor

    def __init__(self, num_features):
        super(BN, self).__init__()
        self.momentum = 0.1
        self.eps = 1e-8
        # register_buffer: saved in state_dict like a Parameter, but not
        # returned by parameters() and so not updated by the optimizer.
        self.register_buffer('_mean', torch.zeros([1, num_features, 1, 1]))
        self.register_buffer('_var', torch.zeros([1, num_features, 1, 1]))
        self.gamma = nn.Parameter(torch.ones([1, num_features, 1, 1]))
        self.beta = nn.Parameter(torch.zeros(1, num_features, 1, 1))

    def forward(self, x):
        """Normalise ``x``: batch statistics in training, running statistics in eval."""
        if self.training:
            _mean = torch.mean(x, dim=(0, 2, 3), keepdim=True)
            _var = torch.var(x, dim=(0, 2, 3), keepdim=True)
            # Detach before folding into the running buffers; otherwise the
            # buffers keep every batch's autograd graph alive.
            self._mean = self.momentum * self._mean + (1 - self.momentum) * _mean.detach()
            self._var = self.momentum * self._var + (1 - self.momentum) * _var.detach()
        else:
            _mean = self._mean
            _var = self._var
        z = (x - _mean) / torch.sqrt(_var + self.eps) * self.gamma + self.beta
        return z


if __name__ == '__main__':
    torch.manual_seed(28)
    path_dir = Path("./output/modules")
    path_dir.mkdir(parents=True, exist_ok=True)

    bn = BN(num_features=12)
    bn.train()
    xs = [torch.randn(8, 12, 32, 32) for _ in range(10)]
    for _x in xs:
        bn(_x)
    print(bn._mean.view(-1))
    print(bn._var.view(-1))

    bn.eval()
    _r = bn(xs[0])
    print(_r.shape)

    # Simulate saving a model.
    # state_dict: all parameters (Parameter + register_buffer)
    torch.save(bn, str(path_dir / "bn_model.pkl"))
    torch.save(bn.state_dict(), str(path_dir / "bn_params.pkl"))

    # TorchScript (.pt) export
    traced_script_model = torch.jit.trace(bn.eval(), xs[0])
    traced_script_model.save("./output/modules/bn_model.pt")

    # Simulate restoring a model.
    bn1 = torch.load(str(path_dir / "bn_model.pkl"), map_location='cpu')
    bn2 = torch.load(str(path_dir / "bn_params.pkl"), map_location='cpu')
    print(bn2)
LN
from pathlib import Path
from typing import Optional

import torch
import torch.nn as nn
from torch import Tensor
from torch.nn import modules


class LN(nn.Module):
    """Hand-written layer normalisation for [N, C, H, W] tensors.

    Each sample is normalised over its own (C, H, W) axes, so no running
    statistics are needed and train/eval behave identically.

    Args:
        num_features: channel count C (size of the per-channel scale/shift).
        eps: constant added to the variance for numerical stability.
    """

    def __init__(self, num_features, eps=1e-8):
        super(LN, self).__init__()
        self.gamma = nn.Parameter(torch.ones([1, num_features, 1, 1]))
        self.beta = nn.Parameter(torch.zeros(1, num_features, 1, 1))
        self.eps = eps

    def forward(self, x):
        """Return ``x`` normalised per sample, then scaled and shifted per channel."""
        _mean = torch.mean(x, dim=(1, 2, 3), keepdim=True)
        _var = torch.var(x, dim=(1, 2, 3), keepdim=True)
        z = (x - _mean) / torch.sqrt(_var + self.eps) * self.gamma + self.beta
        return z


if __name__ == '__main__':
    torch.manual_seed(28)
    path_dir = Path("./output/modules")
    path_dir.mkdir(parents=True, exist_ok=True)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    net = LN(num_features=12)
    net = net.to(device)
    net.train()
    xs = [torch.randn(8, 12, 32, 32).to(device) for _ in range(10)]
    for _x in xs:
        net(_x)

    net.eval()
    _r = net(xs[0])
    print(_r.shape)

    net = net.cpu()
    # Simulate saving a model.
    # state_dict: all parameters (Parameter + register_buffer)
    torch.save(net, str(path_dir / "ln_model.pkl"))
    torch.save(net.state_dict(), str(path_dir / "ln_params.pkl"))

    # TorchScript (.pt) export
    traced_script_model = torch.jit.trace(net.eval(), xs[0].cpu())
    traced_script_model.save("./output/modules/ln_model.pt")
GN
from pathlib import Path
from typing import Optional

import torch
import torch.nn as nn
from torch import Tensor
from torch.nn import modules


class GN(nn.Module):
    """Hand-written group normalisation for [N, C, H, W] tensors.

    The channels are split into ``groups`` equal groups; each (sample, group)
    pair is normalised over its own (C // groups, H, W) values.

    Args:
        num_features: channel count C; must be divisible by ``groups``.
        groups: number of channel groups.
        eps: constant added to the variance for numerical stability.
    """

    def __init__(self, num_features, groups, eps=1e-8):
        super(GN, self).__init__()
        assert num_features % groups == 0, "要求特征数必须整除"
        self.gamma = nn.Parameter(torch.ones([1, num_features, 1, 1]))
        self.beta = nn.Parameter(torch.zeros(1, num_features, 1, 1))
        self.eps = eps
        self.groups = groups

    def forward(self, x):
        """Return ``x`` normalised per group, then scaled and shifted per channel."""
        n, c, h, w = x.shape
        if c % self.groups != 0:
            # Fail with a clear message instead of an opaque view() error.
            raise RuntimeError(
                f"GN expects a channel count divisible by groups={self.groups}, got {c}"
            )
        cg = c // self.groups
        x = x.view(n, self.groups, cg, h, w)
        _mean = torch.mean(x, dim=(2, 3, 4), keepdim=True)
        _var = torch.var(x, dim=(2, 3, 4), keepdim=True)
        x = (x - _mean) / torch.sqrt(_var + self.eps)
        x = x.view(n, c, h, w)
        z = x * self.gamma + self.beta
        return z


if __name__ == '__main__':
    torch.manual_seed(28)
    path_dir = Path("./output/modules")
    path_dir.mkdir(parents=True, exist_ok=True)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    net = GN(num_features=12, groups=3)
    net = net.to(device)
    net.train()
    xs = [torch.randn(8, 12, 32, 32).to(device) for _ in range(10)]
    for _x in xs:
        net(_x)

    net.eval()
    _r = net(xs[0])
    print(_r.shape)

    net = net.cpu()
    # Simulate saving a model.
    # state_dict: all parameters (Parameter + register_buffer)
    torch.save(net, str(path_dir / "gn_model.pkl"))
    torch.save(net.state_dict(), str(path_dir / "gn_params.pkl"))

    # TorchScript (.pt) export
    traced_script_model = torch.jit.trace(net.eval(), xs[0].cpu())
    traced_script_model.save("./output/modules/gn_model.pt")
SN
from pathlib import Path
from typing import Optional

import torch
import torch.nn as nn
from torch import Tensor
from torch.nn import modules


class SN(nn.Module):
    """Hand-written switchable normalisation for [N, C, H, W] tensors.

    Batch-, layer- and instance-norm statistics are blended with softmax
    weights learned in ``self.w``; the same three weights are used for the
    mean and for the variance.
    """

    def __init__(self, num_features):
        super(SN, self).__init__()
        self.momentum = 0.1
        self.eps = 1e-8
        # register_buffer: saved in state_dict like a Parameter, but not
        # returned by parameters() and so not updated by the optimizer.
        self.register_buffer('running_bn_mean', torch.zeros([1, num_features, 1, 1]))
        self.register_buffer('running_bn_var', torch.zeros([1, num_features, 1, 1]))
        self.running_bn_mean: Optional[Tensor]
        self.running_bn_var: Optional[Tensor]
        self.gamma = nn.Parameter(torch.ones([1, num_features, 1, 1]))
        self.beta = nn.Parameter(torch.zeros(1, num_features, 1, 1))
        # One logit per statistic source: [bn, ln, in].
        self.w = nn.Parameter(torch.ones([3]))

    def get_bn(self, x):
        """Batch-norm statistics over (N, H, W); running values in eval mode."""
        if self.training:
            _bn_mean = torch.mean(x, dim=(0, 2, 3), keepdim=True)
            _bn_var = torch.var(x, dim=(0, 2, 3), keepdim=True)
            # Detach so the running buffers do not keep each batch's autograd
            # graph alive. `momentum` weights the previous running value.
            self.running_bn_mean = (
                self.momentum * self.running_bn_mean + (1 - self.momentum) * _bn_mean.detach()
            )
            self.running_bn_var = (
                self.momentum * self.running_bn_var + (1 - self.momentum) * _bn_var.detach()
            )
        else:
            _bn_mean = self.running_bn_mean
            _bn_var = self.running_bn_var
        return _bn_mean, _bn_var

    def get_ln(self, x):
        """Layer-norm statistics over (C, H, W) of every sample."""
        _ln_mean = torch.mean(x, dim=(1, 2, 3), keepdim=True)
        _ln_var = torch.var(x, dim=(1, 2, 3), keepdim=True)
        return _ln_mean, _ln_var

    def get_in(self, x):
        """Instance-norm statistics over (H, W) of every sample and channel."""
        _in_mean = torch.mean(x, dim=(2, 3), keepdim=True)
        _in_var = torch.var(x, dim=(2, 3), keepdim=True)
        return _in_mean, _in_var

    def forward(self, x):
        """Normalise ``x`` with the learned blend of BN/LN/IN statistics."""
        _bn_mean, _bn_var = self.get_bn(x)
        _ln_mean, _ln_var = self.get_ln(x)
        _in_mean, _in_var = self.get_in(x)
        w = torch.softmax(self.w, dim=0)
        bn_w, ln_w, in_w = w[0], w[1], w[2]
        # The three statistics broadcast to [N, C, 1, 1].
        _mean = _bn_mean * bn_w + _ln_mean * ln_w + _in_mean * in_w
        _var = _bn_var * bn_w + _ln_var * ln_w + _in_var * in_w
        z = (x - _mean) / torch.sqrt(_var + self.eps) * self.gamma + self.beta
        return z


if __name__ == '__main__':
    torch.manual_seed(28)
    path_dir = Path("./output/modules")
    path_dir.mkdir(parents=True, exist_ok=True)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    net = SN(num_features=12)
    net = net.to(device)
    net.train()
    xs = [torch.randn(8, 12, 32, 32).to(device) for _ in range(10)]
    for _x in xs:
        net(_x)

    net.eval()
    _r = net(xs[0])
    print(_r.shape)

    net = net.cpu()
    # Simulate saving a model.
    # state_dict: all parameters (Parameter + register_buffer)
    torch.save(net, str(path_dir / "sn_model.pkl"))
    torch.save(net.state_dict(), str(path_dir / "sn_params.pkl"))

    # TorchScript (.pt) export
    traced_script_model = torch.jit.trace(net.eval(), xs[0].cpu())
    traced_script_model.save("./output/modules/sn_model.pt")
算子融合
属于模型量化中的一个小的分支,通过在推理之前对推理链路中的结构进行算子合并,从而降低运行的耗时。
基本上不影响模型的预测结果,但能够融合的算子只占一部分
常见的结构&模块:
-
Conv + Bn
-
Linear + Bn
-
RepVGG
conv+bn合并
from pathlib import Path

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch._C._onnx import TrainingMode


class Cnov(nn.Module):
    """Conv2d -> BatchNorm2d -> ReLU unit with a BN-free forward for fused inference.

    The (misspelt) class name is kept because pickled models reference it.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride, padding):
        super(Cnov, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        self.bn = nn.BatchNorm2d(out_channels)
        self.act = nn.ReLU()

    def forward(self, x):
        return self.act(self.bn(self.conv(x)))

    def forward_fuse(self, x):
        """Forward used once BN has been folded into ``self.conv``."""
        return self.act(self.conv(x))


class NetWork(nn.Module):
    """Small CNN classifier: five Cnov units, adaptive pooling and two linear layers."""

    def __init__(self, num_classes):
        super(NetWork, self).__init__()
        self.features = nn.Sequential(
            Cnov(3, 64, 3, 1, 1),
            Cnov(64, 128, 3, 2, 1),  # downsample
            Cnov(128, 128, 3, 1, 1),
            Cnov(128, 256, 3, 2, 1),  # downsample
            Cnov(256, 256, 3, 1, 1),
            nn.AdaptiveMaxPool2d((4, 4))
        )
        self.classify = nn.Sequential(
            nn.Linear(256 * 4 * 4, 256),
            nn.ReLU(),
            nn.Linear(256, num_classes)
        )

    def forward(self, x):
        z = self.features(x)
        z = z.flatten(1)
        z = self.classify(z)
        return z


def t0():
    """Train the network briefly on random data and save it as ``module.pkl``."""
    net = NetWork(10)
    print(net)
    loss_fn = nn.CrossEntropyLoss()
    train_opt = optim.SGD(net.parameters(), lr=0.0001)

    n = 20
    xs = [torch.rand(8, 3, 32, 32) for _ in range(n)]
    ys = [torch.randint(10, size=(8,)) for _ in range(n)]
    for epoch in range(5):
        for i, (_x, _y) in enumerate(zip(xs, ys)):
            loss = loss_fn(net(_x), _y)
            train_opt.zero_grad()
            loss.backward()
            train_opt.step()
            print(f"epoch:{epoch}, batch:{i}, loss:{loss.item():.5f}")

    path_dir = Path("./output/modules/01")
    path_dir.mkdir(parents=True, exist_ok=True)
    torch.save(net.eval(), str(path_dir / "module.pkl"))


def export(model_dir, model_path=None, name='module'):
    """Load a pickled model and export it as TorchScript (.pt) and ONNX.

    Args:
        model_dir: directory that receives ``{name}.pt`` and ``{name}.onnx``.
        model_path: pickled model to load; defaults to ``model_dir/module.pkl``.
        name: base name of the exported files.
    """
    model_dir = Path(model_dir)
    if not model_path:
        model_path = model_dir / 'module.pkl'
    # NOTE(review): this unpickles a full nn.Module; recent PyTorch releases
    # may need `weights_only=False` here. Only load files you trust.
    net = torch.load(model_path, map_location='cpu')
    net.eval().cpu()

    example = torch.rand(1, 3, 32, 32)
    traces_script_module = torch.jit.trace(net, example)
    # str(): not every torch version accepts Path objects here.
    traces_script_module.save(str(model_dir / f'{name}.pt'))
    torch.onnx.export(
        model=net,
        args=example,
        f=str(model_dir / f'{name}.onnx'),
        training=TrainingMode.EVAL,
        input_names=['images'],
        output_names=['scores'],
        opset_version=12,
        dynamic_axes={
            'images': {0: 'batch'},
            'scores': {0: "batch"}
        }
    )


def fuse_conv_bn(conv: nn.Conv2d, bn: nn.BatchNorm2d):
    """Fold an eval-mode BatchNorm2d into the preceding Conv2d.

    With ``s = gamma / sqrt(running_var + eps)``:
    ``bn(conv(x)) == conv'(x)`` where ``W' = W * s`` (per output channel) and
    ``b' = (b - running_mean) * s + beta``.

    Args:
        conv: the convolution that feeds ``bn``.
        bn: the batch norm; its running statistics are used, so the result
            only matches ``bn`` in eval mode.

    Returns:
        A new frozen ``nn.Conv2d`` equivalent to ``bn(conv(x))``.
    """
    fusedconv = nn.Conv2d(
        in_channels=conv.in_channels,
        out_channels=conv.out_channels,
        kernel_size=conv.kernel_size,
        stride=conv.stride,
        padding=conv.padding,
        # dilation/padding_mode must be carried over, otherwise dilated or
        # non-zero-padded convolutions are fused into a different operator.
        dilation=conv.dilation,
        groups=conv.groups,
        bias=True,
        padding_mode=conv.padding_mode
    ).requires_grad_(False).to(conv.weight.device)

    # no_grad: copying grad-tracking tensors in place would otherwise attach a
    # grad_fn to the frozen fused parameters.
    with torch.no_grad():
        # Per-output-channel scale gamma / sqrt(var + eps).
        w_bn = bn.weight.div(torch.sqrt(bn.eps + bn.running_var))
        w_bn_conv = w_bn[:, None, None, None]  # [OC] -> [OC, 1, 1, 1]
        fusedconv.weight.copy_(conv.weight * w_bn_conv)

        if conv.bias is None:
            conv_bias = torch.zeros(conv.out_channels, device=conv.weight.device)
        else:
            conv_bias = conv.bias
        fusedconv.bias.copy_((conv_bias - bn.running_mean) * w_bn + bn.bias)
    return fusedconv


def fuse_modules(model_dir, name="new_model"):
    """Fuse every Cnov's conv+bn in ``module.pkl`` and save/export the result."""
    model_dir = Path(model_dir)
    net = torch.load(model_dir / 'module.pkl', map_location='cpu')
    net.eval().cpu()

    # Fold BN into the convolution of every Cnov unit.
    for m in net.modules():
        if isinstance(m, Cnov):
            m.conv = fuse_conv_bn(m.conv, m.bn)
            delattr(m, 'bn')  # BN is now part of m.conv
            m.forward = m.forward_fuse  # route forward through the BN-free path

    torch.save(net.cpu(), str(model_dir / f"{name}.pkl"))
    example = torch.rand(1, 3, 28, 28)
    traces_script_module = torch.jit.trace(net, example)
    traces_script_module.save(str(model_dir / f'{name}.pt'))
    export(
        model_dir=model_dir,
        model_path=str(model_dir / f"{name}.pkl"),
        name=name
    )
    print("nih")


def tt_fuse(model_dir):
    """Print the output difference between the original and the fused TorchScript models."""
    model_dir = Path(model_dir)
    net1 = torch.jit.load(str(model_dir / 'module.pt'), map_location='cpu')
    net1.eval().cpu()
    net2 = torch.jit.load(str(model_dir / 'new_model.pt'), map_location='cpu')
    net2.eval().cpu()

    x = torch.rand(1, 3, 32, 32)
    r1 = net1(x)
    r2 = net2(x)
    print(r1 - r2)


def tt(model_dir):
    """Fuse conv+bn+relu with torch.quantization.fuse_modules and export the result."""
    model_dir = Path(model_dir)
    net = torch.load(model_dir / 'module.pkl', map_location='cpu')
    net.eval().cpu()

    # Use torch's built-in fusion API.
    fused_m = torch.quantization.fuse_modules(
        model=net,
        modules_to_fuse=[
            [f'features.{i}.conv', f'features.{i}.bn', f'features.{i}.act']
            for i in range(5)
        ]
    )
    print(fused_m)

    x = torch.rand(4, 3, 28, 28)
    r1 = net(x)
    r2 = fused_m(x)
    print(r1 - r2)

    torch.save(fused_m.cpu(), str(model_dir / f"fuse_model.pkl"))
    export(
        model_dir=model_dir,
        model_path=str(model_dir / f"fuse_model.pkl"),
        name="fuse_model"
    )


if __name__ == '__main__':
    # t0()
    # export(model_dir="./output/modules/01")
    # fuse_modules(model_dir="./output/modules/01")
    # tt_fuse(model_dir="./output/modules/01")
    tt("./output/modules/01")
RepVGG合并
import torch
import torch.nn as nn
import torch.nn.functional as F


def t0():
    """Show that parallel 1x1 and 3x3 convolutions merge into one 3x3 convolution.

    Zero-padding the 1x1 kernel to 3x3 puts its weight at the kernel centre,
    so ``conv1(x) + conv3(x)`` equals one 3x3 convolution whose weight and
    bias are the sums of the two branches.

    Returns:
        The largest absolute difference between the two-branch and the merged
        output, as a Python float.
    """
    _x = torch.rand(4, 9, 24, 24)
    conv1 = nn.Conv2d(9, 9, kernel_size=(1, 1), padding=0, stride=(1, 1))
    conv3 = nn.Conv2d(9, 9, kernel_size=(3, 3), padding=1, stride=(1, 1))
    r1 = conv1(_x) + conv3(_x)
    print(r1.shape)

    # Single-branch equivalent.
    conv = nn.Conv2d(9, 9, kernel_size=(3, 3), padding=1, stride=(1, 1)).requires_grad_(False)
    # no_grad: keeps the frozen merged parameters free of a grad_fn.
    with torch.no_grad():
        # F.pad pads the last two axes (kW, kH) by one on every side: 1x1 -> 3x3.
        conv1_weight = F.pad(conv1.weight, [1, 1, 1, 1])
        conv.weight.copy_(conv3.weight + conv1_weight)
        conv.bias.copy_(conv3.bias + conv1.bias)
    r2 = conv(_x)
    print(r2.shape)

    r = torch.abs(r1 - r2)
    max_err = torch.max(r)
    print(max_err)
    return float(max_err)


if __name__ == '__main__':
    t0()
鸢尾花数据集
import os
from pathlib import Path

import torch
from sklearn.datasets import load_iris
import torch.nn as nn
from torch.utils.tensorboard import SummaryWriter

from numpy_dataset import bulid_dataloader
import torch.optim as optim
import numpy as np
from metrics import Accuracy


class IrisNetWork(nn.Module):
    """Fully connected classifier mapping 4 input features to 3 class scores."""

    def __init__(self):
        super(IrisNetWork, self).__init__()
        self.classify = nn.Sequential(
            nn.Linear(4, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 3)
        )

    def forward(self, x):
        return self.classify(x)


def save_model(path, net, epoch, train_batch, test_batch, acc=None):
    """Save a checkpoint dict that ``load`` can read back.

    Args:
        path: target file; missing parent directories are created.
        net: the model object to store.
        epoch: epoch index to store.
        train_batch: global training batch counter.
        test_batch: global evaluation batch counter.
        acc: accuracy stored under ``'acc'`` (optional, new parameter).
    """
    parent = os.path.dirname(path)
    if parent:
        # makedirs creates nested directories; os.mkdir could not.
        os.makedirs(parent, exist_ok=True)
    # One save only, with the same underscore keys that `load` reads.
    torch.save({
        'net': net,
        'epoch': epoch,
        'train_batch': train_batch,
        'test_batch': test_batch,
        'acc': acc
    }, path)


def save(obj, path):
    """Thin wrapper around ``torch.save``."""
    torch.save(obj, path)


def load(path, net):
    """Restore ``net`` in place from a checkpoint and return the training counters.

    Returns:
        ``(start_epoch, best_acc, train_batch, test_batch)``.
    """
    print(f"模型恢复:{path}")
    # NOTE(review): this unpickles a full nn.Module; recent PyTorch releases
    # may need `weights_only=False` here. Only load files you trust.
    ss_mmodel = torch.load(path, map_location='cpu')
    net.load_state_dict(state_dict=ss_mmodel['net'].state_dict(), strict=True)
    start_epoch = ss_mmodel['epoch']
    # Older final checkpoints used the keys 'best_acc' and 'train_btch'.
    best_acc = ss_mmodel['acc'] if 'acc' in ss_mmodel else ss_mmodel['best_acc']
    if 'train_batch' in ss_mmodel:
        train_batch = ss_mmodel['train_batch']
    else:
        train_batch = ss_mmodel['train_btch']
    test_batch = ss_mmodel['test_batch']
    return start_epoch, best_acc, train_batch, test_batch


def _build_checkpoint(net, epoch, train_batch, test_batch, acc):
    """Build the checkpoint dict shared by the best/last/final saves."""
    return {
        'net': net,
        'epoch': epoch,
        'train_batch': train_batch,
        'test_batch': test_batch,
        'acc': acc
    }


def training(restore_path=None):
    """Train IrisNetWork on the iris data set with TensorBoard logging and checkpoints.

    Args:
        restore_path: optional checkpoint to resume from. When it is missing or
            None, ``best.pkl`` and then ``finall.pkl`` in the model directory
            are tried.
    """
    root_dir = Path('./output/01')
    summary_dir = root_dir / 'summary'
    summary_dir.mkdir(parents=True, exist_ok=True)
    checkout_dir = root_dir / 'model'
    checkout_dir.mkdir(parents=True, exist_ok=True)
    last_path = checkout_dir / 'last.pkl'
    best_path = checkout_dir / 'best.pkl'
    final_path = checkout_dir / 'finall.pkl'

    total_epoch = 100
    start_epoch = 0
    summary_initerval_batch = 2
    train_batch = 0
    test_batch = 0
    best_acc = -0.1
    save_interva_batch = 2

    # 1. Data loaders
    X, Y = load_iris(return_X_y=True)
    X = X.astype('float32')
    Y = Y.astype('int64')
    train_dataloader, test_dataloader, test_x, test_y = bulid_dataloader(X, Y, 0.1, 8)

    # 2. Model, loss, optimizer, metric
    net = IrisNetWork()
    loss_fn = nn.CrossEntropyLoss()
    opt = optim.SGD(params=net.parameters(), lr=0.01)
    acc_fn = Accuracy()

    # 3. Restore from the first checkpoint that exists
    candidates = [best_path, final_path]
    if restore_path is not None:
        candidates.insert(0, Path(restore_path))
    for candidate in candidates:
        if candidate.exists():
            start_epoch, best_acc, train_batch, test_batch = load(candidate, net)
            break

    # 4. Visualisation output
    writer = SummaryWriter(log_dir=summary_dir)
    # Defined up front so the final checkpoint works even if no epoch runs.
    test_acc = best_acc
    try:
        writer.add_graph(net, torch.rand(3, 4))

        # 5. Training loop
        for epoch in range(start_epoch, total_epoch + start_epoch):
            # 5.1 Train
            net.train()
            train_loss = []
            train_true, train_total = 0, 0
            for x, y in train_dataloader:
                # Forward
                scores = net(x)
                loss = loss_fn(scores, y)
                n, acc = acc_fn(scores, y)

                # Backward
                opt.zero_grad()
                loss.backward()
                opt.step()

                loss = loss.item()
                acc = acc.item()
                train_total += n
                train_true += n * acc
                if train_batch % summary_initerval_batch == 0:
                    print(f"epoch:{epoch}, train batch:{train_batch}, loss:{loss:.3f}, acc:{acc:.3f}")
                    writer.add_scalar('train_loss', loss, global_step=train_batch)
                    writer.add_scalar('train_acc', acc, global_step=train_batch)
                train_batch += 1
                train_loss.append(loss)

            # 5.2 Evaluate
            net.eval()
            test_loss = []
            test_true, test_total = 0, 0
            with torch.no_grad():
                for x, y in test_dataloader:
                    scores = net(x)
                    loss = loss_fn(scores, y)
                    n, acc = acc_fn(scores, y)

                    loss = loss.item()
                    acc = acc.item()
                    test_total += n
                    # Accumulate (the original overwrote the total each batch,
                    # so the epoch accuracy only reflected the last batch).
                    test_true += n * acc
                    print(f"epoch:{epoch}, test batch:{test_batch}, loss:{loss:.3f}, acc:{acc:.3f}")
                    writer.add_scalar('test_loss', loss, global_step=test_batch)
                    writer.add_scalar('test_acc', acc, global_step=test_batch)
                    test_batch += 1
                    test_loss.append(loss)

            # 5.3 Epoch-level summaries
            train_acc = train_true / train_total
            test_acc = test_true / test_total
            writer.add_scalars('loss', {'train': np.mean(train_loss), 'test': np.mean(test_loss)},
                               global_step=epoch)
            writer.add_scalars('acc', {'train': train_acc, 'test': test_acc}, global_step=epoch)
            # TODO: add early-stopping logic here

            # 5.4 Checkpoints
            if test_acc > best_acc:
                # Best model so far
                obj = _build_checkpoint(net, epoch, train_batch, test_batch, test_acc)
                save(obj, best_path.absolute())
                best_acc = test_acc
            if epoch % save_interva_batch == 0:
                obj = _build_checkpoint(net, epoch, train_batch, test_batch, test_acc)
                save(obj, last_path.absolute())

        # 6. Final checkpoint, with the same keys as the others so that
        # `load(final_path, ...)` works.
        obj = _build_checkpoint(net, start_epoch + total_epoch - 1, train_batch, test_batch, test_acc)
        save(obj, final_path.absolute())
    finally:
        # Close the writer exactly once, also on errors.
        writer.close()


def export(model_dir):
    """Convert the best checkpoint into deployable formats.

    NOTE: the network structure can be inspected with netron (https://netron.app).
    Common target formats:
        pt: TorchScript, PyTorch's cross-language format
        onnx: a framework-neutral deep-learning model format
        TensorRT: convert to onnx first, then to TensorRT for GPU acceleration
        OpenVINO: convert to onnx first, then to OpenVINO for CPU acceleration
    """
    model_dir = Path(model_dir)
    # Restore the model
    net = torch.load(model_dir / 'best.pkl', map_location='cpu')['net']
    net.eval().cpu()

    # TorchScript export
    example = torch.rand(1, 4)
    traced_script_module = torch.jit.trace(net, example)
    # str(): not every torch version accepts Path objects here.
    traced_script_module.save(str(model_dir / 'best.pt'))

    # ONNX export
    torch.onnx.export(
        model=net,  # model object
        args=example,  # example input for forward
        f=str(model_dir / 'best_dynamic.onnx'),  # output file
        # training=_C_onnx.TrainingMode.EVAL,
        input_names=['features'],  # names of the input tensors
        output_names=['label'],  # names of the output tensors
        opset_version=12,
        # Input and output share one dynamic batch axis (was misspelt 'bath').
        dynamic_axes={
            'features': {0: 'batch'},
            'label': {0: 'batch'}
        }
    )


@torch.no_grad()
def tt_load_model(module_dir):
    """Load the pickle, TorchScript and ONNX models and print their predictions."""
    module_dir = Path(module_dir)
    # Python (pickle) restore
    net1 = torch.load(module_dir / 'best.pkl', map_location='cpu')['net']
    net1.eval().cpu()

    # TorchScript restore
    net2 = torch.jit.load(str(module_dir / 'best.pt'), map_location='cpu')
    net2.eval().cpu()

    # ONNX restore; str() because older onnxruntime rejects Path objects.
    import onnxruntime
    net3 = onnxruntime.InferenceSession(str(module_dir / 'best_dynamic.onnx'))

    x = torch.rand(2, 4)
    print(net1(x))
    print(net2(x))
    print(net3.run(['label'], input_feed={'features': x.detach().numpy()}))


if __name__ == '__main__':
    # training()
    # export(
    #     model_dir='output/01/model'
    # )
    tt_load_model(module_dir='output/01/model')
"""
Iris预测模型处理器代码
"""
import os.pathimport numpy as np
import onnxruntime
import torch.jit


def softmx(scores):
    """Compute row-wise softmax probabilities.

    Args:
        scores: numpy array of shape [n, m] with the raw scores of n samples
            over m classes.

    Returns:
        numpy array of shape [n, m]; every row sums to 1.
    """
    scores = np.asarray(scores)
    # Shifting each row by its maximum leaves the softmax value unchanged but
    # keeps np.exp from overflowing to inf (inf / inf would give nan).
    exps = np.exp(scores - np.max(scores, axis=1, keepdims=True))
    return exps / np.sum(exps, axis=1, keepdims=True)


class IrisProcessor(object):
    """Inference wrapper around an exported Iris classifier (.pt or .onnx)."""

    def __init__(self, model_path):
        """Restore the model; the backend is chosen from the file extension.

        Args:
            model_path: path to a TorchScript ``.pt`` file or an ``.onnx`` file.

        Raises:
            ValueError: if the extension is neither ``.pt`` nor ``.onnx``.
        """
        super(IrisProcessor, self).__init__()
        model_path = os.path.abspath(model_path)
        _, ext = os.path.splitext(model_path.lower())
        self.pt, self.onnx = False, False
        if ext == '.pt':
            model = torch.jit.load(model_path, map_location='cpu')
            model.eval().cpu()
            self.modedl = model
            self.pt = True
        elif ext == '.onnx':
            session = onnxruntime.InferenceSession(model_path)
            self.session = session
            # Tensor names used by session.run below.
            self.input_name = 'features'
            self.output_name = 'label'
            self.onnx = True
        else:
            raise ValueError(f'当前仅支持pt和onnx格式,当前文件类型为:{model_path}')
        self.classes = ['类别1', '类别2', '类别3']
        print(f"模型恢复成功:pt -->{self.pt}; onnx --> {self.onnx}")

    def _process_after_model(self, x, scores):
        """Turn raw model scores into one result dict per sample.

        Args:
            x: the original feature array (not used by the post-processing).
            scores: numpy array [n, num_classes] of raw model scores.

        Returns:
            list of dicts with keys ``id`` (int class index), ``label``
            (class name) and ``proba`` (float softmax probability of the
            predicted class).
        """
        pred_probas = softmx(scores)  # [n, num_classes]
        pred_indexes = np.argmax(scores, axis=1)
        result = []
        for k, idx in enumerate(pred_indexes):
            # Convert numpy scalars to built-in types so that the result can
            # be serialised (e.g. to JSON) without extra handling.
            idx = int(idx)
            result.append({
                'id': idx,
                'label': self.classes[idx],
                'proba': float(pred_probas[k][idx])
            })
        return result

    @torch.no_grad()
    def _predict_with_pt(self, x):
        """Run the TorchScript model on a numpy batch and post-process the scores."""
        tensor_x = torch.from_numpy(x).to(torch.float)
        score = self.modedl(tensor_x)
        return self._process_after_model(x, score.numpy())

    def _preedict_with_onnx(self, x):
        """Run the ONNX Runtime session on a numpy batch and post-process the scores."""
        onnx_x = x.astype('float32')
        score = self.session.run([self.output_name], input_feed={self.input_name: onnx_x})
        score = score[0]  # first (and only requested) output
        return self._process_after_model(x, score)

    def predict(self, x):
        """Predict the class of every sample in ``x``.

        Args:
            x: numpy array (or nested sequence) holding one feature row per
                sample.

        Returns:
            list with one result dict per sample, see ``_process_after_model``.

        Raises:
            ValueError: if no backend was initialised.
        """
        # Accept plain nested lists as well as ndarrays.
        x = np.asarray(x)
        if self.pt:
            return self._predict_with_pt(x)
        elif self.onnx:
            return self._preedict_with_onnx(x)
        else:
            raise ValueError("当前环境初始化异常!")


if __name__ == '__main__':
    # processor = IrisProcessor(r"D:\my_program\study_code\output\01\model\best.pt")
    # r = processor.predict(np.asarray([[5, 2.3, 1.5, 2.2], [0.2, 1.3, 0.5, 0.2]]))
    # print(r)
    processor = IrisProcessor(r"D:\my_program\study_code\output\01\model\best_dynamic.onnx")
    r = processor.predict(np.asarray([[5, 2.3, 1.5, 2.2], [3.2, 1.3, 4.7, 2.2]]))
    print(r)
"""
Iris预测模型处理器代码
"""
import os.pathimport numpy as np
import onnxruntime
import torch.jit


def softmx(scores):
    """Compute row-wise softmax probabilities.

    Args:
        scores: numpy array of shape [n, m] with the raw scores of n samples
            over m classes.

    Returns:
        numpy array of shape [n, m]; every row sums to 1.
    """
    scores = np.asarray(scores)
    # Shifting each row by its maximum leaves the softmax value unchanged but
    # keeps np.exp from overflowing to inf (inf / inf would give nan).
    exps = np.exp(scores - np.max(scores, axis=1, keepdims=True))
    return exps / np.sum(exps, axis=1, keepdims=True)


class IrisProcessor(object):
    """Inference wrapper around an exported Iris classifier (.pt or .onnx)."""

    def __init__(self, model_path):
        """Restore the model; the backend is chosen from the file extension.

        Args:
            model_path: path to a TorchScript ``.pt`` file or an ``.onnx`` file.

        Raises:
            ValueError: if the extension is neither ``.pt`` nor ``.onnx``.
        """
        super(IrisProcessor, self).__init__()
        model_path = os.path.abspath(model_path)
        _, ext = os.path.splitext(model_path.lower())
        self.pt, self.onnx = False, False
        if ext == '.pt':
            model = torch.jit.load(model_path, map_location='cpu')
            model.eval().cpu()
            self.modedl = model
            self.pt = True
        elif ext == '.onnx':
            session = onnxruntime.InferenceSession(model_path)
            self.session = session
            # Tensor names used by session.run below.
            self.input_name = 'features'
            self.output_name = 'label'
            self.onnx = True
        else:
            raise ValueError(f'当前仅支持pt和onnx格式,当前文件类型为:{model_path}')
        self.classes = ['类别1', '类别2', '类别3']
        print(f"模型恢复成功:pt -->{self.pt}; onnx --> {self.onnx}")

    def _process_after_model(self, x, scores):
        """Turn raw model scores into one result dict per sample.

        Args:
            x: the original feature array (not used by the post-processing).
            scores: numpy array [n, num_classes] of raw model scores.

        Returns:
            list of dicts with keys ``id`` (int class index), ``label``
            (class name) and ``proba`` (float softmax probability of the
            predicted class).
        """
        pred_probas = softmx(scores)  # [n, num_classes]
        pred_indexes = np.argmax(scores, axis=1)
        result = []
        for k, idx in enumerate(pred_indexes):
            r = {
                'id': int(idx),  # numpy integer -> built-in int
                'label': self.classes[idx],
                'proba': float(pred_probas[k][idx])  # numpy float -> built-in float
            }
            result.append(r)
        return result

    @torch.no_grad()
    def _predict_with_pt(self, x):
        """Run the TorchScript model on a numpy batch and post-process the scores."""
        tensor_x = torch.from_numpy(x).to(torch.float)
        score = self.modedl(tensor_x)
        return self._process_after_model(x, score.numpy())

    def _preedict_with_onnx(self, x):
        """Run the ONNX Runtime session on a numpy batch and post-process the scores."""
        onnx_x = x.astype('float32')
        score = self.session.run([self.output_name], input_feed={self.input_name: onnx_x})
        score = score[0]  # first (and only requested) output
        return self._process_after_model(x, score)

    def predict(self, x):
        """Predict the class of every sample in ``x``.

        Args:
            x: numpy array (or nested sequence) holding one feature row per
                sample.

        Returns:
            list with one result dict per sample, see ``_process_after_model``.

        Raises:
            ValueError: if no backend was initialised.
        """
        # Accept plain nested lists as well as ndarrays.
        x = np.asarray(x)
        if self.pt:
            return self._predict_with_pt(x)
        elif self.onnx:
            return self._preedict_with_onnx(x)
        else:
            raise ValueError("当前环境初始化异常!")


if __name__ == '__main__':
    # processor = IrisProcessor(r"D:\my_program\study_code\output\01\model\best.pt")
    # r = processor.predict(np.asarray([[5, 2.3, 1.5, 2.2], [0.2, 1.3, 0.5, 0.2]]))
    # print(r)
    processor = IrisProcessor(r"D:\my_program\study_code\output\01\model\best_dynamic.onnx")
    r = processor.predict(np.asarray([[5, 2.3, 1.5, 2.2], [3.2, 1.3, 4.7, 2.2]]))
    print(r)
import numpy as np
from iris_proceeessor import IrisProcessor

# Interactive console client: reads one sample per line and prints the
# prediction of the ONNX Iris model until the user enters "q".
processor = IrisProcessor(r"D:\my_program\study_code\output\01\model\best_dynamic.onnx")

while True:
    x = input("请输入特征属性,用空格隔开:")
    if "q" == x.strip():
        break
    # split() without an argument tolerates repeated / leading / trailing
    # whitespace, which split(" ") would turn into empty fields.
    x = x.split()
    if len(x) != 4:
        print(f"输入的特征属性异常,请输入4维特征属性:{x}")
        continue
    try:
        # Convert here so that a non-numeric field is reported to the user
        # instead of crashing the loop inside the model call.
        features = np.asarray([x], dtype='float32')
    except ValueError:
        print(f"输入的特征属性异常,请输入4维特征属性:{x}")
        continue
    r = processor.predict(features)
    print(f"预测结果为:{r}")
from flask import Flask, request, jsonify
import numpy as npfrom study_code.iris_proceeessor import IrisProcessorapp = Flask(__name__)
# Single model instance shared by all requests.
processor = IrisProcessor(r"D:\my_program\study_code\output\01\model\best_dynamic.onnx")


@app.route('/')
def index():
    """Landing page: returns a short description of the service."""
    return "Iris数据分类模型接口服务"


@app.route("/predict")
def predict():
    """Classify the samples given in the ``features`` query parameter.

    The parameter uses ',' between the 4 feature values of a sample and ';'
    between samples, e.g. ``/predict?features=5,2.3,1.5,2.2;3.2,1.3,4.7,2.2``.

    Returns:
        JSON with ``code`` 0 and the predictions in ``data`` on success;
        ``code`` 1 when the parameter is missing, 2 when it contains no
        usable numeric samples, 3 when a sample does not have exactly 4
        values, and 4 for any unexpected error.
    """
    try:
        features = request.args.get('features')
        if features is None:
            return jsonify({'code': 1, 'msg': '参数异常,必须给定有效的features参数'})

        # Drop empty segments (e.g. from a trailing ';') before splitting the
        # individual values; str.split alone never yields an empty list.
        rows = [sample.split(",") for sample in features.split(";") if sample.strip()]
        if not rows:
            return jsonify({'code': 2, 'msg': f'参数异常,必须给定有效的features参数:{features}'})
        # Check every sample, not only the first one: ragged rows cannot be
        # converted into a rectangular array.
        if any(len(row) != 4 for row in rows):
            return jsonify({'code': 3, 'msg': f'参数维度异常,必须给定有效的features参数:{features}'})
        try:
            x = np.asarray(rows, dtype='float32')
        except ValueError:
            # Non-numeric values are a client error, not a server failure.
            return jsonify({'code': 2, 'msg': f'参数异常,必须给定有效的features参数:{features}'})

        print(x)
        r = processor.predict(x)
        print(r)
        return jsonify({'code': 0, 'data': r, 'msg': "成功!"})
    except Exception as e:
        # Top-level boundary of the endpoint: report instead of crashing.
        return jsonify({'code': 4, "msg": f"服务器异常“{e}"})
import os
import sys#将当前文件所在文件夹添加大环境变量
# Make the directory that contains this file importable, so that the
# ``app`` module next to it can be found regardless of the working directory.
sys.path.append(
    os.path.dirname(__file__)
)

if __name__ == '__main__':
    # Imported lazily: the import needs the sys.path entry added above.
    from app import app

    # Listen on all interfaces, port 9999.
    app.run(host='0.0.0.0', port=9999)
手写体识别
import os
from datetime import datetime
from pathlib import Pathimport matplotlib.pyplot as plt
import torch
from sklearn.datasets import load_iris
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets, transformsfrom study_code.numpy_dataset import bulid_dataloader
import torch.optim as optim
import numpy as np
from study_code.metrics import Accuracy


class NetWork(nn.Module):
    """Fully connected classifier: a Linear+ReLU stack plus a Linear output layer."""

    def __init__(self, in_features, num_classes, units=None):
        """Build the layer stack.

        Args:
            in_features: number of input features per sample after flattening.
            num_classes: number of output scores.
            units: sizes of the hidden layers; defaults to [1024, 2048, 512].
        """
        super(NetWork, self).__init__()
        if units is None:
            units = [1024, 2048, 512]
        self.in_features = in_features
        self.num_classes = num_classes
        layers = []
        for unit in units:
            layers.append(nn.Linear(in_features=in_features, out_features=unit))
            layers.append(nn.ReLU())
            in_features = unit
        layers.append(nn.Linear(in_features=in_features, out_features=self.num_classes))
        self.classify = nn.Sequential(*layers)

    def forward(self, x):
        """Flatten the input to [N, in_features] and return the class scores."""
        x = x.reshape(-1, self.in_features)  # e.g. [N, 1, 28, 28] -> [N, 1 * 28 * 28]
        return self.classify(x)


def save_model(path, net, epoch, train_batch, test_batch):
    """Persist the model together with its training counters.

    Args:
        path: destination file; missing parent directories are created.
        net: model object to store.
        epoch: epoch counter to store.
        train_batch: training batch counter to store.
        test_batch: test batch counter to store.
    """
    parent = os.path.dirname(path)
    # dirname is '' for a bare file name; makedirs also handles nested
    # directories, which os.mkdir does not.
    if parent:
        os.makedirs(parent, exist_ok=True)
    # NOTE(review): these keys contain spaces while load() reads
    # 'train_batch' / 'test_batch' / 'acc' - confirm the intended format.
    torch.save({
        'net': net,
        'epoch': epoch,
        'train batch': train_batch,
        'test batch': test_batch
    }, path)


def save(obj, path):
    """Serialise ``obj`` to ``path`` with torch.save."""
    torch.save(obj, path)


def load(path, net):
    """Restore the weights of ``net`` and the training counters from a checkpoint.

    Args:
        path: checkpoint written by ``save`` (a dict with the keys 'net',
            'epoch', 'acc', 'train_batch' and 'test_batch').
        net: model that receives the stored parameters.

    Returns:
        Tuple ``(start_epoch, best_acc, train_batch, test_batch)``.
    """
    print(f"模型恢复:{path}")
    # NOTE: torch.load unpickles arbitrary objects - only load trusted files.
    ss_model = torch.load(path, map_location='cpu')
    net.load_state_dict(state_dict=ss_model['net'].state_dict(), strict=True)
    start_epoch = ss_model['epoch']
    best_acc = ss_model['acc']
    train_batch = ss_model['train_batch']
    test_batch = ss_model['test_batch']
    return start_epoch, best_acc, train_batch, test_batch


def training(restore_path=None):
    """Train the MNIST classifier with TensorBoard logging and checkpointing.

    Summaries go to ``./output/02/summary`` and checkpoints to
    ``./output/02/model`` (best.pkl, last.pkl, final.pkl).

    Args:
        restore_path: optional checkpoint to resume from. If it is None or
            does not exist, best.pkl and then last.pkl are tried.
    """
    root_dir = Path('./output/02')
    summary_dir = root_dir / 'summary'
    summary_dir.mkdir(parents=True, exist_ok=True)
    checkout_dir = root_dir / 'model'
    checkout_dir.mkdir(parents=True, exist_ok=True)
    last_path = checkout_dir / 'last.pkl'
    best_path = checkout_dir / 'best.pkl'
    final_path = checkout_dir / 'final.pkl'

    total_epoch = 5
    start_epoch = 0
    summary_initerval_batch = 2
    train_batch = 0
    test_batch = 0
    best_acc = -0.1
    save_interva_batch = 2
    batch_size = 8

    # 1. Data loaders (ToTensor converts the images to tensors).
    train_dataset = datasets.MNIST(
        root='../../datas/MNIST',
        train=True,
        transform=transforms.ToTensor(),
        download=True
    )
    train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
    test_dataset = datasets.MNIST(
        root='../../datas/MNIST',
        train=False,
        transform=transforms.ToTensor(),
        download=True
    )
    test_dataloader = DataLoader(test_dataset, shuffle=True, batch_size=batch_size * 2)

    # 2. Model, loss, optimiser and metric.
    net = NetWork(in_features=1 * 28 * 28, num_classes=10)
    loss_fn = nn.CrossEntropyLoss()
    opt = optim.SGD(params=net.parameters(), lr=0.01)
    acc_fn = Accuracy()

    # 3. Restore: an explicitly requested checkpoint wins, then best, then last.
    candidates = [best_path, last_path]
    if restore_path is not None:
        candidates.insert(0, Path(restore_path))
    for candidate in candidates:
        if candidate.exists():
            start_epoch, best_acc, train_batch, test_batch = load(candidate, net)
            break

    # 4. TensorBoard output.
    writer = SummaryWriter(log_dir=summary_dir)
    writer.add_graph(net, torch.rand(3, 1, 28, 28))

    # 5. Training loop.
    for epoch in range(start_epoch, total_epoch + start_epoch):
        # 5.1 Train for one epoch.
        net.train()
        train_loss = []
        train_true, train_total = 0, 0
        for batch_img, batch_label in train_dataloader:
            # Forward pass.
            scores = net(batch_img)
            loss = loss_fn(scores, batch_label)
            n, acc = acc_fn(scores, batch_label)

            # Backward pass.
            opt.zero_grad()
            loss.backward()
            opt.step()

            loss = loss.item()
            acc = acc.item()
            train_total += n
            train_true += n * acc
            if train_batch % summary_initerval_batch == 0:
                print(f"epoch:{epoch}, train batch:{train_batch}, loss:{loss:.3f}, acc:{acc:.3f}")
                writer.add_scalar('train_loss', loss, global_step=train_batch)
                writer.add_scalar('train_acc', acc, global_step=train_batch)
            train_batch += 1
            train_loss.append(loss)

        # 5.2 Evaluate on the test split.
        net.eval()
        test_loss = []
        test_true, test_total = 0, 0
        with torch.no_grad():
            for batch_img, batch_label in test_dataloader:
                scores = net(batch_img)
                loss = loss_fn(scores, batch_label)
                n, acc = acc_fn(scores, batch_label)

                loss = loss.item()
                acc = acc.item()
                test_total += n
                # Accumulate over all batches; assigning here would make the
                # epoch accuracy depend on the last batch only.
                test_true += n * acc
                print(f"epoch:{epoch}, test batch:{test_batch}, loss:{loss:.3f}, acc:{acc:.3f}")
                writer.add_scalar('test_loss', loss, global_step=test_batch)
                writer.add_scalar('test_acc', acc, global_step=test_batch)
                test_batch += 1
                test_loss.append(loss)

        # 5.3 Epoch-level summaries.
        train_acc = train_true / train_total
        test_acc = test_true / test_total
        writer.add_scalars('loss', {'train': np.mean(train_loss), 'test': np.mean(test_loss)}, global_step=epoch)
        writer.add_scalars('acc', {'train': train_acc, 'test': test_acc}, global_step=epoch)
        # TODO: add an early-stopping criterion here.
        # Flush instead of closing: the writer is still needed by the
        # following epochs and is closed once after the loop.
        writer.flush()

        # 5.4 Checkpointing.
        checkpoint = {
            'net': net,
            'epoch': epoch,
            'train_batch': train_batch,
            'test_batch': test_batch,
            'acc': test_acc
        }
        if test_acc > best_acc:
            # New best model.
            save(checkpoint, best_path.absolute())
            best_acc = test_acc
        if epoch % save_interva_batch == 0:
            save(checkpoint, last_path.absolute())

    # 6. Final checkpoint. 'acc' is what load() reads; 'best_acc' holds the
    # best test accuracy seen, not merely the last one.
    obj = {
        'net': net,
        'epoch': start_epoch + total_epoch - 1,
        'train_batch': train_batch,
        'test_batch': test_batch,
        'acc': test_acc,
        'best_acc': best_acc
    }
    save(obj, final_path.absolute())
    writer.close()


def export(model_dir):
    """Convert the trained checkpoint into deployable formats.

    Reads ``best.pkl`` from ``model_dir`` and writes ``best.pt`` (TorchScript
    trace) and ``best_dynamic.onnx`` (ONNX with a dynamic batch axis) next to
    it. The resulting graphs can be inspected with netron (https://netron.app).

    Common deployment formats:
        pt: TorchScript, usable from other languages through libtorch.
        onnx: a widely supported exchange format for deep-learning models.
        TensorRT: convert to ONNX first, then to TensorRT for GPU acceleration.
        OpenVINO: convert to ONNX first, then to OpenVINO for CPU acceleration.

    Args:
        model_dir: directory that contains ``best.pkl``.
    """
    model_dir = Path(model_dir)

    # Restore the pickled module.
    net = torch.load(model_dir / 'best.pkl', map_location='cpu')['net']
    net.eval().cpu()

    # TorchScript export via tracing with one example image.
    example = torch.rand(1, 1, 28, 28)
    traced_script_module = torch.jit.trace(net, example)
    traced_script_module.save(str(model_dir / 'best.pt'))

    # ONNX export; plain string path for compatibility across torch versions.
    torch.onnx.export(
        model=net,
        args=example,  # example input passed to forward
        f=str(model_dir / 'best_dynamic.onnx'),
        input_names=['images'],
        output_names=['scores'],
        opset_version=12,
        # Axis 0 of input and output is the (variable) batch dimension.
        dynamic_axes={
            'images': {0: 'batch'},
            'scores': {0: 'batch'}
        }
    )


@torch.no_grad()
def tt_load_model(module_dir):
    """Load the exported MNIST model in its three formats and print their outputs.

    A random batch and one image read from disk are pushed through the
    pickled module, the TorchScript module and the ONNX Runtime session.

    Args:
        module_dir: directory containing ``best.pkl``, ``best.pt`` and
            ``best_dynamic.onnx``.
    """
    module_dir = Path(module_dir)

    # Pickled PyTorch module.
    net1 = torch.load(module_dir / 'best.pkl', map_location='cpu')['net']
    net1.eval().cpu()

    # TorchScript module.
    net2 = torch.jit.load(str(module_dir / 'best.pt'), map_location='cpu')
    net2.eval().cpu()

    # ONNX Runtime session; older releases only accept str/bytes paths.
    import onnxruntime
    net3 = onnxruntime.InferenceSession(str(module_dir / 'best_dynamic.onnx'))

    x = torch.rand(2, 1, 28, 28)
    print(net1(x))
    print(net2(x))
    print(net3.run(['scores'], input_feed={'images': x.detach().numpy()}))

    img_path = r"D:\my_program\datas\MNIST\MNIST\images\4\53.png"
    # Keep the first channel only and add batch / channel axes.
    # NOTE(review): assumes imread returns an [H, W, C] array - confirm.
    img = plt.imread(img_path)[:, :, 0][None, None, :, :]
    img = torch.from_numpy(img)
    print("=" * 100)
    print(net1(img))
    print(net2(img))
    # An InferenceSession is not callable; inference goes through run().
    print(net3.run(['scores'], input_feed={'images': img.detach().numpy()}))


if __name__ == '__main__':
    # training()
    export(model_dir='output/02/model')
    # Load from the directory that was just exported to.
    tt_load_model(module_dir='output/02/model')