**Typical Convolutional Neural Network (CNN) Architectures**

SENet

SENet (Squeeze-and-Excitation Networks) won the ImageNet 2017 classification task. Its core idea is a pair of operations, Squeeze (compression) and Excitation: explicitly model the interdependencies between feature channels and, following a feature-recalibration strategy, automatically learn the importance of each feature channel; then use that importance to strengthen the useful features and suppress the features that contribute little to the task at hand.


Squeeze is a feature-compression operation: global average pooling reduces each feature map to a single value (a real number), which to some extent carries a global receptive field. The output of Squeeze describes the global distribution of responses across the feature channels (the set of feature maps can be viewed as a collection of local descriptors), and it lets even layers close to the input access global receptive-field information.


Excitation is a feature-excitation operation whose main purpose is to explicitly model the dependencies between feature channels. To limit model complexity and aid generalization, two FC layers are introduced: the first FC layer reduces the 1×1×C input to 1×1×(C/r); after a ReLU, the second FC layer restores the dimension to 1×1×C; finally a sigmoid yields the channel weights. The paper uses r = 16.


The fully-connected layers here are used to model the dependencies between channels. Using two FC layers as a bottleneck, rather than a single FC layer, has two advantages:

1. more non-linearity, so the complex correlations between channels can be fitted better;

2. far fewer parameters and much less computation.

The Reweight operation treats the output of Excitation as the importance of each feature channel obtained through feature selection, and multiplies it channel-wise onto the original features, completing the recalibration of the original features along the channel dimension.


The SE block can be embedded into any network structure, forming the corresponding SE variant of that network.


import torch
import torch.nn as nn
import torch.nn.functional as F


class GlobalAvgPool2d(nn.Module):
    def __init__(self):
        super(GlobalAvgPool2d, self).__init__()

    def forward(self, x):
        """[N, C, H, W] -> [N, C, 1, 1]"""
        return torch.mean(x, dim=(2, 3), keepdim=True)


class SeModule(nn.Module):
    def __init__(self, in_channel, r=16):
        super(SeModule, self).__init__()
        self.avg = GlobalAvgPool2d()
        self.fc1 = nn.Conv2d(in_channel, in_channel // r, kernel_size=(1, 1), stride=(1, 1))
        self.fc2 = nn.Conv2d(in_channel // r, in_channel, kernel_size=(1, 1), stride=(1, 1))

    def forward(self, x):
        """SE operation: [N, C, H, W] -> [N, C, H, W]"""
        # compute the per-channel weights
        alpha = self.avg(x)                      # [N, C, H, W] -> [N, C, 1, 1]
        alpha = F.relu(self.fc1(alpha))          # [N, C, 1, 1] -> [N, C//r, 1, 1]
        alpha = torch.sigmoid(self.fc2(alpha))   # [N, C//r, 1, 1] -> [N, C, 1, 1]
        # re-weight the features channel-wise
        return x * alpha


class SeModuleV2(nn.Module):
    def __init__(self, in_channel, r=16):
        super(SeModuleV2, self).__init__()
        self.fc1 = nn.Conv2d(in_channel, in_channel // r, kernel_size=(1, 1), stride=(1, 1))
        self.fc2 = nn.Conv2d(in_channel // r, in_channel, kernel_size=(1, 1), stride=(1, 1))
        self.pool = nn.MaxPool2d(5, 1, padding=2)
        self.bias = nn.Parameter(torch.zeros([1, in_channel, 1, 1]))

    def forward(self, x):
        """SE operation: [N, C, H, W] -> [N, C, H, W]"""
        alpha = F.relu(self.fc1(x))              # [N, C, H, W] -> [N, C//r, H, W]
        alpha = torch.sigmoid(self.fc2(alpha))   # [N, C//r, H, W] -> [N, C, H, W]
        alpha = self.pool(-1.0 * alpha) * -1.0   # min-pooling via negated max-pooling
        alpha = F.relu(alpha + torch.tanh(self.bias)) / 2.0
        return x * alpha


class BasicConv2d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride, padding):
        super(BasicConv2d, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.relu(self.conv(x))


class Inception(nn.Module):
    def __init__(self, in_channels, out_channels):
        """
        in_channels: number of input channels, e.g. 192
        out_channels: output channels of each branch, e.g. [[64], [96, 128], [16, 32], [32]]
        """
        super(Inception, self).__init__()
        self.branch1 = nn.Sequential(
            BasicConv2d(in_channels, out_channels[0][0], kernel_size=1, stride=1, padding=0)
        )
        self.branch2 = nn.Sequential(
            BasicConv2d(in_channels, out_channels[1][0], kernel_size=1, stride=1, padding=0),
            BasicConv2d(out_channels[1][0], out_channels[1][1], kernel_size=3, stride=1, padding=1)
        )
        self.branch3 = nn.Sequential(
            BasicConv2d(in_channels, out_channels[2][0], kernel_size=1, stride=1, padding=0),
            BasicConv2d(out_channels[2][0], out_channels[2][1], kernel_size=5, stride=1, padding=2)
        )
        self.branch4 = nn.Sequential(
            nn.MaxPool2d(3, 1, padding=1),
            BasicConv2d(in_channels, out_channels[3][0], kernel_size=1, stride=1, padding=0)
        )

    def forward(self, x):
        x1 = self.branch1(x)    # [N, C, H, W] -> [N, C1, H, W]
        x2 = self.branch2(x)    # [N, C, H, W] -> [N, C2, H, W]
        x3 = self.branch3(x)    # [N, C, H, W] -> [N, C3, H, W]
        x4 = self.branch4(x)    # [N, C, H, W] -> [N, C4, H, W]
        return torch.cat([x1, x2, x3, x4], dim=1)   # [N, C1+C2+C3+C4, H, W]


class SeNetInception(nn.Module):
    def __init__(self, in_channels, out_channels, r=16):
        super(SeNetInception, self).__init__()
        self.block = Inception(in_channels, out_channels)
        se_in_channels = int(sum(c[-1] for c in out_channels))
        self.se_block = SeModule(se_in_channels, r)

    def forward(self, x):
        x = self.block(x)
        x = self.se_block(x)
        return x


class SeNetInceptionV2(nn.Module):
    def __init__(self, in_channels, out_channels, r=16):
        super(SeNetInceptionV2, self).__init__()
        self.block = Inception(in_channels, out_channels)
        se_in_channels = int(sum(c[-1] for c in out_channels))
        self.se_block = SeModuleV2(se_in_channels, r)

    def forward(self, x):
        x = self.block(x)
        x = self.se_block(x)
        return x


class GoogLeNet(nn.Module):
    def __init__(self, num_class, add_aux_stage=False):
        super(GoogLeNet, self).__init__()
        _inception = SeNetInceptionV2
        self.stage1 = nn.Sequential(
            BasicConv2d(3, 64, 7, 2, 3),
            nn.MaxPool2d(3, 2, padding=1),
            BasicConv2d(64, 64, 1, 1, 0),
            BasicConv2d(64, 192, 3, 1, 1),
            nn.MaxPool2d(3, 2, padding=1),
            _inception(192, [[64], [96, 128], [16, 32], [32]]),     # inception3a
            _inception(256, [[128], [128, 192], [32, 96], [64]]),   # inception3b
            nn.MaxPool2d(3, 2, padding=1),
            _inception(480, [[192], [96, 208], [16, 48], [64]])     # inception4a
        )
        self.stage2 = nn.Sequential(
            _inception(512, [[160], [112, 224], [24, 64], [64]]),   # inception4b
            _inception(512, [[128], [128, 256], [24, 64], [64]]),   # inception4c
            _inception(512, [[112], [144, 288], [32, 64], [64]]),   # inception4d
        )
        self.stage3 = nn.Sequential(
            _inception(528, [[256], [160, 320], [32, 128], [128]]),  # inception4e
            nn.MaxPool2d(3, 2, padding=1),
            _inception(832, [[256], [160, 320], [32, 128], [128]]),  # inception5a
            _inception(832, [[384], [192, 384], [48, 128], [128]]),  # inception5b
            GlobalAvgPool2d()
        )
        self.classify = nn.Conv2d(1024, num_class, kernel_size=(1, 1), stride=(1, 1), padding=0)
        if add_aux_stage:
            self.aux_stage1 = nn.Sequential(
                nn.MaxPool2d(5, 3, padding=0),
                nn.Conv2d(512, 1024, kernel_size=(1, 1), stride=(1, 1), padding=0),
                nn.ReLU(),
                nn.AdaptiveAvgPool2d(output_size=(2, 2)),
                nn.Flatten(1),
                nn.Linear(4096, 2048),
                nn.Dropout(p=0.4),
                nn.ReLU(),
                nn.Linear(2048, num_class)
            )
            self.aux_stage2 = nn.Sequential(
                nn.MaxPool2d(5, 3, padding=0),
                nn.Conv2d(528, 1024, kernel_size=(1, 1), stride=(1, 1), padding=0),
                nn.ReLU(),
                nn.AdaptiveAvgPool2d(output_size=(2, 2)),
                nn.Flatten(1),
                nn.Linear(4096, 2048),
                nn.Dropout(p=0.4),
                nn.ReLU(),
                nn.Linear(2048, num_class)
            )
        else:
            self.aux_stage1 = None
            self.aux_stage2 = None

    def forward(self, x):
        """[N, C, H, W]"""
        z1 = self.stage1(x)     # [N, C, H, W] -> [N, 512, H1, W1]
        z2 = self.stage2(z1)    # [N, 512, H1, W1] -> [N, 528, H2, W2]
        z3 = self.stage3(z2)    # [N, 528, H2, W2] -> [N, 1024, 1, 1]
        # main classification head
        scores3 = torch.squeeze(self.classify(z3))  # [N, 1024, 1, 1] -> [N, num_class]
        if self.aux_stage1 is not None:
            # two auxiliary classification heads used only during training
            score1 = self.aux_stage1(z1)
            score2 = self.aux_stage2(z2)
            return score1, score2, scores3
        else:
            return scores3


def t1():
    net = GoogLeNet(num_class=4, add_aux_stage=True)
    loss_fn = nn.CrossEntropyLoss()
    _x = torch.rand(2, 3, 224, 224)
    _y = torch.tensor([0, 3], dtype=torch.long)  # simulated ground-truth label ids
    _r1, _r2, _r3 = net(_x)  # predictions of the three branches; each is paired with the labels to build the loss
    _loss1 = loss_fn(_r1, _y)
    _loss2 = loss_fn(_r2, _y)
    _loss3 = loss_fn(_r3, _y)
    _loss = _loss1 + _loss2 + _loss3
    print(_r1)
    print(_r2)
    print(_r3)
    print(_r3.shape)
    traced_script_module = torch.jit.trace(net.eval(), _x)
    traced_script_module.save('./output/modules/googlenet_inception.pt')
    # persist the model
    torch.save(net, './output/modules/googlenet_inception.pkl')


def t2():
    net1 = torch.load('./output/modules/googlenet_inception.pkl')
    net2 = GoogLeNet(num_class=4, add_aux_stage=False)
    # missing_keys: parameters of net2 that were not restored
    # unexpected_keys: entries in the state dict that net2 does not have
    missing_keys, unexpected_keys = net2.load_state_dict(net1.state_dict(), strict=False)
    if len(missing_keys) > 0:
        raise ValueError(f"Some parameters were not restored: {missing_keys}")
    _x = torch.rand(2, 3, 224, 224)
    traced_script_module = torch.jit.trace(net2.eval(), _x)
    traced_script_module.save('./output/modules/googlenet_inception.pt')
    # export to ONNX
    torch.onnx.export(
        model=net2.eval().cpu(),   # the model object
        args=_x,                   # example input for forward
        f='./output/modules/googlenet_inception_dynamic.onnx',
        do_constant_folding=True,
        input_names=['images'],    # names of the input tensors
        output_names=['scores'],   # names of the output tensors
        opset_version=12,
        dynamic_axes={             # dynamic shape axes
            'images': {0: 'n', 2: 'h', 3: 'w'},
            'scores': {0: 'n'}
        }
    )


if __name__ == '__main__':
    t1()
    t2()

Residual Attention Networks

Residual Attention Networks stack residual and attention mechanisms to obtain deeper feature information; each attention module applies an adaptive transformation built from down-sampling and up-sampling. Main contributions:

Stacked network structure: the network is built by stacking multiple attention modules.

Attention residual learning: an optimization idea similar to the residual update in ResNet, giving the model better performance.

Bottom-up top-down feedforward attention: a down-sampling/up-sampling mechanism that injects the feature weights into the feature maps.
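The attention residual learning rule can be written as H(x) = (1 + M(x)) · F(x), where F is the trunk branch output and M the soft mask. A minimal sketch (the tensor shapes and the sigmoid mask here are illustrative assumptions, not values from the paper):

```python
import torch

def attention_residual(trunk_out: torch.Tensor, mask: torch.Tensor) -> torch.Tensor:
    # H(x) = (1 + M(x)) * F(x): mask values lie in (0, 1) after a sigmoid,
    # so the identity term keeps good trunk features from being suppressed
    # as attention modules stack.
    return (1.0 + mask) * trunk_out

trunk = torch.randn(2, 64, 8, 8)                 # trunk branch output F(x)
mask = torch.sigmoid(torch.randn(2, 64, 8, 8))   # soft mask branch output M(x)
out = attention_residual(trunk, mask)
print(out.shape)  # torch.Size([2, 64, 8, 8])
```

With an all-zero mask the block degenerates to the identity on the trunk features, which is exactly why stacking many modules stays trainable.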


CNN-MobileNet

MobileNet is a network architecture designed specifically for mobile and embedded deep-learning applications, aiming for strong results under tight resource budgets. Its main characteristics are its light weight and its use of stride-2 convolutions in place of pooling layers. Its main innovation is the introduction of depthwise separable convolutions, which factor a standard convolution into two smaller operations, depthwise convolutions and pointwise convolutions, in order to reduce the parameter count and the computational cost.

The input feature map is (D_F, D_F, M) and the output feature map is (D_G, D_G, N).

The standard convolution kernel is (D_K, D_K, M, N).


The standard convolution is factored into a depthwise convolution and a pointwise convolution. The depthwise convolution does the filtering (extracting higher-order features from the input feature maps) with a kernel of shape (D_K, D_K, 1, M); the pointwise convolution does the channel transformation (merging the preceding higher-order features into the final feature map) with a kernel of shape (1, 1, M, N).


Depthwise convolution

Each input channel is convolved separately, producing a separate output per channel.

Pointwise convolution

This is simply an ordinary 1×1 convolution. It is needed because the depthwise stage extracts features from each feature map independently; the 1×1 convolution then fuses the features across the input feature maps.
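Putting the two stages together: in PyTorch a depthwise convolution is an ordinary `nn.Conv2d` with `groups=in_channels`, followed by a 1×1 pointwise convolution. A sketch with illustrative channel counts (M=32, N=64 are assumptions, not from the paper):

```python
import torch
import torch.nn as nn

class DepthwiseSeparableConv(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        # depthwise: one 3x3 filter per input channel, kernel (DK, DK, 1, M)
        self.depthwise = nn.Conv2d(in_channels, in_channels, kernel_size=3, stride=stride,
                                   padding=1, groups=in_channels, bias=False)
        # pointwise: 1x1 conv that fuses channels, kernel (1, 1, M, N)
        self.pointwise = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False)

    def forward(self, x):
        return self.pointwise(self.depthwise(x))

sep = DepthwiseSeparableConv(32, 64)
std = nn.Conv2d(32, 64, kernel_size=3, padding=1, bias=False)
n_sep = sum(p.numel() for p in sep.parameters())   # 32*3*3 + 32*64 = 2336
n_std = sum(p.numel() for p in std.parameters())   # 32*64*3*3 = 18432
print(n_sep, n_std)  # 2336 18432
```

Even at these small sizes the factored version uses about 8× fewer parameters than the standard 3×3 convolution.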



Width Multiplier: Thinner Models

The first hyper-parameter controlling model size is the width multiplier α: it scales the number of input and output channels, turning M input channels into αM and N output channels into αN.

Typical values of α: 1.0, 0.75, 0.5, 0.25.

NOTE: computation and parameter count drop by roughly a factor of α².
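A quick numeric check of the ≈α² claim, with illustrative sizes (D_K=3, D_F=14, M=N=512 are assumptions, not the paper's figures):

```python
# Depthwise-separable cost: DK*DK*(aM)*DF*DF + (aM)*(aN)*DF*DF.
# The pointwise term dominates, so total cost scales roughly with alpha^2.
DK, DF, M, N = 3, 14, 512, 512

def ds_cost(alpha):
    m, n = int(alpha * M), int(alpha * N)
    return DK * DK * m * DF * DF + m * n * DF * DF

ratio = ds_cost(1.0) / ds_cost(0.5)
print(round(ratio, 2))  # 3.93, close to the ideal 1 / 0.5**2 = 4
```

The small gap to the ideal factor 4 comes from the depthwise term, which only scales linearly with α.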


Resolution Multiplier: Reduced Representation

The second hyper-parameter controlling model size is the resolution multiplier ρ: it scales the representation of the input and of the internal layers, i.e. it controls the spatial resolution.

Common input resolutions are 224, 192, 160 and 128; ρ usually lies in (0, 1].


MobileNet V2:

Introduces shortcut (residual) connections.

Uses a 1×1 convolution to expand the feature maps before the depthwise convolution.

After the pointwise convolution, uses a linear activation instead of ReLU, to avoid destroying the features.
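These three points combine into the inverted-residual block. A minimal sketch assuming stride 1 and expansion ratio t=6 (the layer names and sizes are ours, not torchvision's):

```python
import torch
import torch.nn as nn

class InvertedResidual(nn.Module):
    def __init__(self, channels, t=6):
        super().__init__()
        hidden = channels * t
        self.block = nn.Sequential(
            nn.Conv2d(channels, hidden, 1, bias=False),    # 1x1 expansion before the depthwise conv
            nn.BatchNorm2d(hidden),
            nn.ReLU6(inplace=True),
            nn.Conv2d(hidden, hidden, 3, padding=1, groups=hidden, bias=False),  # depthwise
            nn.BatchNorm2d(hidden),
            nn.ReLU6(inplace=True),
            nn.Conv2d(hidden, channels, 1, bias=False),    # pointwise projection, *linear* (no ReLU)
            nn.BatchNorm2d(channels),
        )

    def forward(self, x):
        return x + self.block(x)  # shortcut connection

x = torch.randn(2, 32, 56, 56)
print(InvertedResidual(32)(x).shape)  # torch.Size([2, 32, 56, 56])
```

Note the shape is "narrow → wide → narrow", the inverse of a classic ResNet bottleneck, and the final projection deliberately has no non-linearity.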


from pathlib import Path
import torch.jit
from PIL import Image
from torch import nn
from torchvision import models
import torch
from torchvision.transforms import transforms
from thop import profile


def calc_flops(my_net, inputs):
    if isinstance(inputs, list):
        inputs = tuple(inputs)
    elif not isinstance(inputs, tuple):
        inputs = (inputs,)
    flops, params = profile(my_net, inputs=inputs)
    print(f"Total FLOPs: {flops}")
    print(f"Total parameters: {params}")


def t1():
    calc_flops(my_net=nn.Sequential(nn.Linear(3, 5)), inputs=torch.randn(2, 3))


if __name__ == '__main__':
    # path_dir = Path("./output/modules")
    # path_dir.mkdir(parents=True, exist_ok=True)
    # net = models.mobilenet_v2(pretrained=True)
    # print(net)
    # _x = torch.randn(4, 3, 224, 224)
    # modules = torch.jit.trace(net.eval(), _x)
    # modules.save(str(path_dir / 'mobile_v2.pt'))
    #
    # net.eval().cpu()
    # tfs = transforms.ToTensor()
    #
    # image_path = {
    #     '小狗': r'../datas/小狗.png',
    #     '小狗2': r'../datas/小狗2.png',
    #     '小猫': r'../datas/小猫.jpg',
    #     '飞机': r'../datas/飞机.jpg',
    #     '飞机2': r'../datas/飞机2.jpg'
    # }
    #
    # for name in image_path.keys():
    #     img = Image.open(image_path[name]).convert("RGB")
    #     img = tfs(img)   # [3, H, W]
    #     img = img[None]  # [3, H, W] -> [1, 3, H, W]
    #
    #     score = net(img)  # [1, 1000]
    #     prob = torch.softmax(score, dim=1)
    #     top5 = torch.topk(prob, 5, dim=1)
    #     print("=" * 100)
    #     print(name)
    #     print(top5)
    t1()

CNN-ShuffleNet

ShuffleNet is an efficient base network structure for resource-constrained settings, built on group convolution and depthwise separable convolution.

A naive group convolution means each output is derived from only part of the input channels, which reduces information flow between channels and weakens representational power. ShuffleNet therefore performs a channel shuffle before the group convolution, preserving the representational power.

The channel shuffle operation works as follows:

given g×n output channels, reshape to (g, n), transpose to (n, g), flatten, then regroup as the input to the next layer.
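The reshape → transpose → flatten steps above translate directly into code:

```python
import torch

def channel_shuffle(x: torch.Tensor, groups: int) -> torch.Tensor:
    n_batch, c, h, w = x.shape
    n = c // groups
    x = x.view(n_batch, groups, n, h, w)   # split the g*n channels into (g, n)
    x = x.transpose(1, 2).contiguous()     # (g, n) -> (n, g)
    return x.view(n_batch, c, h, w)        # flatten back to g*n channels

# channels [0..5] with g=2, n=3: the groups (0,1,2) and (3,4,5) get interleaved
x = torch.arange(6).float().view(1, 6, 1, 1)
print(channel_shuffle(x, 2).view(-1).tolist())  # [0.0, 3.0, 1.0, 4.0, 2.0, 5.0]
```

After the shuffle, every group seen by the next group convolution contains channels from every previous group, which is exactly the cross-group information flow the text describes.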



RepVGG

In-depth interpretation: RepVGG - Zhihu (zhihu.com)

https://zhuanlan.zhihu.com/p/353697121

RepVGG illustrated - Zhihu (zhihu.com)

https://zhuanlan.zhihu.com/p/352239591

Training uses a multi-branch structure, which gives the model strong fitting capacity.

At inference time the branches are merged into one, reducing memory overhead and speeding up execution.

MobileOne

The only reproduction online! MobileOne, a backbone with 1 ms latency on mobile - Zhihu (zhihu.com)

https://zhuanlan.zhihu.com/p/614576582

FasterNet

BatchNorm

from pathlib import Path
from typing import Optional

import torch
import torch.nn as nn
from torch import Tensor


class BN(nn.Module):
    def __init__(self, num_features):
        super(BN, self).__init__()
        self.momentum = 0.1
        self.eps = 1e-8
        # register_buffer: the attribute is handled like a Parameter in state_dict,
        # the only difference being that it takes no part in gradient computation
        self.register_buffer('_mean', torch.zeros([1, num_features, 1, 1]))
        self.register_buffer('_var', torch.zeros([1, num_features, 1, 1]))
        self._mean: Optional[Tensor]
        self._var: Optional[Tensor]
        self.gamma = nn.Parameter(torch.ones([1, num_features, 1, 1]))
        self.beta = nn.Parameter(torch.zeros(1, num_features, 1, 1))

    def forward(self, x):
        if self.training:
            # batch statistics over (N, H, W), one value per channel
            _mean = torch.mean(x, dim=(0, 2, 3), keepdim=True)
            _var = torch.var(x, dim=(0, 2, 3), keepdim=True)
            self._mean = self.momentum * self._mean + (1 - self.momentum) * _mean
            self._var = self.momentum * self._var + (1 - self.momentum) * _var
        else:
            # evaluation uses the running statistics
            _mean = self._mean
            _var = self._var
        z = (x - _mean) / torch.sqrt(_var + self.eps) * self.gamma + self.beta
        return z


if __name__ == '__main__':
    torch.manual_seed(28)
    path_dir = Path("./output/modules")
    path_dir.mkdir(parents=True, exist_ok=True)
    bn = BN(num_features=12)
    bn.train()
    xs = [torch.randn(8, 12, 32, 32) for _ in range(10)]
    for _x in xs:
        bn(_x)
    print(bn._mean.view(-1))
    print(bn._var.view(-1))
    bn.eval()
    _r = bn(xs[0])
    print(_r.shape)
    # simulate model saving
    # state_dict: returns all current parameters (Parameter + register_buffer)
    torch.save(bn, str(path_dir / "bn_model.pkl"))
    torch.save(bn.state_dict(), str(path_dir / "bn_params.pkl"))
    # save as TorchScript
    traced_script_model = torch.jit.trace(bn.eval(), xs[0])
    traced_script_model.save("./output/modules/bn_model.pt")
    # simulate model restoring
    bn1 = torch.load(str(path_dir / "bn_model.pkl"), map_location='cpu')
    bn2 = torch.load(str(path_dir / "bn_params.pkl"), map_location='cpu')
    print(bn2)

LN

from pathlib import Path

import torch
import torch.nn as nn


class LN(nn.Module):
    def __init__(self, num_features, eps=1e-8):
        super(LN, self).__init__()
        # LayerNorm keeps no running statistics, so no register_buffer is needed
        self.gamma = nn.Parameter(torch.ones([1, num_features, 1, 1]))
        self.beta = nn.Parameter(torch.zeros(1, num_features, 1, 1))
        self.eps = eps

    def forward(self, x):
        # normalize over (C, H, W), one statistic per sample
        _mean = torch.mean(x, dim=(1, 2, 3), keepdim=True)
        _var = torch.var(x, dim=(1, 2, 3), keepdim=True)
        z = (x - _mean) / torch.sqrt(_var + self.eps) * self.gamma + self.beta
        return z


if __name__ == '__main__':
    torch.manual_seed(28)
    path_dir = Path("./output/modules")
    path_dir.mkdir(parents=True, exist_ok=True)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    net = LN(num_features=12).to(device)
    net.train()
    xs = [torch.randn(8, 12, 32, 32).to(device) for _ in range(10)]
    for _x in xs:
        net(_x)
    net.eval()
    _r = net(xs[0])
    print(_r.shape)
    net = net.cpu()
    # simulate model saving
    # state_dict: returns all current parameters (Parameter + register_buffer)
    torch.save(net, str(path_dir / "ln_model.pkl"))
    torch.save(net.state_dict(), str(path_dir / "ln_params.pkl"))
    # save as TorchScript
    traced_script_model = torch.jit.trace(net.eval(), xs[0].cpu())
    traced_script_model.save("./output/modules/ln_model.pt")

GN

from pathlib import Path

import torch
import torch.nn as nn


class GN(nn.Module):
    def __init__(self, num_features, groups, eps=1e-8):
        super(GN, self).__init__()
        assert num_features % groups == 0, "num_features must be divisible by groups"
        self.gamma = nn.Parameter(torch.ones([1, num_features, 1, 1]))
        self.beta = nn.Parameter(torch.zeros(1, num_features, 1, 1))
        self.eps = eps
        self.groups = groups

    def forward(self, x):
        n, c, h, w = x.shape
        cg = c // self.groups
        x = x.view(n, self.groups, cg, h, w)
        # normalize over each group of channels and the spatial dimensions
        _mean = torch.mean(x, dim=(2, 3, 4), keepdim=True)
        _var = torch.var(x, dim=(2, 3, 4), keepdim=True)
        x = (x - _mean) / torch.sqrt(_var + self.eps)
        x = x.view(n, c, h, w)
        z = x * self.gamma + self.beta
        return z


if __name__ == '__main__':
    torch.manual_seed(28)
    path_dir = Path("./output/modules")
    path_dir.mkdir(parents=True, exist_ok=True)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    net = GN(num_features=12, groups=3).to(device)
    net.train()
    xs = [torch.randn(8, 12, 32, 32).to(device) for _ in range(10)]
    for _x in xs:
        net(_x)
    net.eval()
    _r = net(xs[0])
    print(_r.shape)
    net = net.cpu()
    # simulate model saving
    torch.save(net, str(path_dir / "gn_model.pkl"))
    torch.save(net.state_dict(), str(path_dir / "gn_params.pkl"))
    # save as TorchScript
    traced_script_model = torch.jit.trace(net.eval(), xs[0].cpu())
    traced_script_model.save("./output/modules/gn_model.pt")

SN

from pathlib import Path
from typing import Optional

import torch
import torch.nn as nn
from torch import Tensor


class SN(nn.Module):
    """Switchable Normalization: a learned, softmax-weighted mixture of BN, LN and IN statistics."""

    def __init__(self, num_features):
        super(SN, self).__init__()
        self.momentum = 0.1
        self.eps = 1e-8
        # register_buffer: saved in state_dict like a Parameter, but excluded from gradients
        self.register_buffer('running_bn_mean', torch.zeros([1, num_features, 1, 1]))
        self.register_buffer('running_bn_var', torch.zeros([1, num_features, 1, 1]))
        self.running_bn_mean: Optional[Tensor]
        self.running_bn_var: Optional[Tensor]
        self.gamma = nn.Parameter(torch.ones([1, num_features, 1, 1]))
        self.beta = nn.Parameter(torch.zeros(1, num_features, 1, 1))
        self.w = nn.Parameter(torch.ones([3]))

    def get_bn(self, x):
        if self.training:
            _bn_mean = torch.mean(x, dim=(0, 2, 3), keepdim=True)
            _bn_var = torch.var(x, dim=(0, 2, 3), keepdim=True)
            self.running_bn_mean = self.momentum * self.running_bn_mean + (1 - self.momentum) * _bn_mean
            self.running_bn_var = self.momentum * self.running_bn_var + (1 - self.momentum) * _bn_var
        else:
            _bn_mean = self.running_bn_mean
            _bn_var = self.running_bn_var
        return _bn_mean, _bn_var

    def get_ln(self, x):
        _ln_mean = torch.mean(x, dim=(1, 2, 3), keepdim=True)
        _ln_var = torch.var(x, dim=(1, 2, 3), keepdim=True)
        return _ln_mean, _ln_var

    def get_in(self, x):
        _in_mean = torch.mean(x, dim=(2, 3), keepdim=True)
        _in_var = torch.var(x, dim=(2, 3), keepdim=True)
        return _in_mean, _in_var

    def forward(self, x):
        _bn_mean, _bn_var = self.get_bn(x)
        _ln_mean, _ln_var = self.get_ln(x)
        _in_mean, _in_var = self.get_in(x)
        # learned mixing weights over the three normalizers
        w = torch.softmax(self.w, dim=0)
        bn_w, ln_w, in_w = w[0], w[1], w[2]
        _mean = _bn_mean * bn_w + _ln_mean * ln_w + _in_mean * in_w
        _var = _bn_var * bn_w + _ln_var * ln_w + _in_var * in_w
        z = (x - _mean) / torch.sqrt(_var + self.eps) * self.gamma + self.beta
        return z


if __name__ == '__main__':
    torch.manual_seed(28)
    path_dir = Path("./output/modules")
    path_dir.mkdir(parents=True, exist_ok=True)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    net = SN(num_features=12).to(device)
    net.train()
    xs = [torch.randn(8, 12, 32, 32).to(device) for _ in range(10)]
    for _x in xs:
        net(_x)
    net.eval()
    _r = net(xs[0])
    print(_r.shape)
    net = net.cpu()
    # simulate model saving
    torch.save(net, str(path_dir / "sn_model.pkl"))
    torch.save(net.state_dict(), str(path_dir / "sn_params.pkl"))
    # save as TorchScript
    traced_script_model = torch.jit.trace(net.eval(), xs[0].cpu())
    traced_script_model.save("./output/modules/sn_model.pt")

Operator Fusion

Operator fusion is a small branch of model quantization: before inference, the operators along the inference path are merged, reducing runtime cost.

It has essentially no effect on the model's predictions, but only a subset of operators can be fused.

Common structures & modules:

  1. Conv + Bn

  2. Linear + Bn

  3. RepVGG

Conv + BN fusion

from pathlib import Path

import torch
import torch.nn as nn
import torch.optim as optim
from torch._C._onnx import TrainingMode


class Conv(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride, padding):
        super(Conv, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        self.bn = nn.BatchNorm2d(out_channels)
        self.act = nn.ReLU()

    def forward(self, x):
        return self.act(self.bn(self.conv(x)))

    def forward_fuse(self, x):
        # used once conv and bn have been folded into a single conv
        return self.act(self.conv(x))


class NetWork(nn.Module):
    def __init__(self, num_classes):
        super(NetWork, self).__init__()
        self.features = nn.Sequential(
            Conv(3, 64, 3, 1, 1),
            Conv(64, 128, 3, 2, 1),    # downsample
            Conv(128, 128, 3, 1, 1),
            Conv(128, 256, 3, 2, 1),   # downsample
            Conv(256, 256, 3, 1, 1),
            nn.AdaptiveMaxPool2d((4, 4))
        )
        self.classify = nn.Sequential(
            nn.Linear(256 * 4 * 4, 256),
            nn.ReLU(),
            nn.Linear(256, num_classes)
        )

    def forward(self, x):
        z = self.features(x)
        z = z.flatten(1)
        z = self.classify(z)
        return z


def t0():
    net = NetWork(10)
    print(net)
    loss_fn = nn.CrossEntropyLoss()
    train_opt = optim.SGD(net.parameters(), lr=0.0001)
    n = 20
    xs = [torch.rand(8, 3, 32, 32) for _ in range(n)]
    ys = [torch.randint(10, size=(8,)) for _ in range(n)]
    for epoch in range(5):
        for i in range(n):
            _x = xs[i]
            _y = ys[i]
            loss = loss_fn(net(_x), _y)
            train_opt.zero_grad()
            loss.backward()
            train_opt.step()
            print(f"epoch:{epoch}, batch:{i}, loss:{loss.item():.5f}")
    path_dir = Path("./output/modules/01")
    path_dir.mkdir(parents=True, exist_ok=True)
    torch.save(net.eval(), str(path_dir / "module.pkl"))


def export(model_dir, model_path=None, name='module'):
    model_dir = Path(model_dir)
    if model_path is None:
        model_path = model_dir / 'module.pkl'
    net = torch.load(model_path, map_location='cpu')
    net.eval().cpu()
    example = torch.rand(1, 3, 32, 32)
    traced_script_module = torch.jit.trace(net, example)
    traced_script_module.save(model_dir / f'{name}.pt')
    torch.onnx.export(
        model=net,
        args=example,
        f=model_dir / f'{name}.onnx',
        training=TrainingMode.EVAL,
        input_names=['images'],
        output_names=['scores'],
        opset_version=12,
        dynamic_axes={'images': {0: 'batch'}, 'scores': {0: 'batch'}}
    )


def fuse_conv_bn(conv: nn.Conv2d, bn: nn.BatchNorm2d):
    fusedconv = nn.Conv2d(
        in_channels=conv.in_channels,
        out_channels=conv.out_channels,
        kernel_size=conv.kernel_size,
        stride=conv.stride,
        padding=conv.padding,
        groups=conv.groups,
        bias=True
    ).requires_grad_(False).to(conv.weight.device)
    # fuse the weights: scale each output channel by gamma / sqrt(var + eps)
    w_bn = bn.weight.div(torch.sqrt(bn.eps + bn.running_var))
    w_bn_conv = w_bn[:, None, None, None]   # [OC] -> [OC, 1, 1, 1]
    fusedconv.weight.copy_(conv.weight.clone() * w_bn_conv)
    # fuse the bias
    conv_bias = torch.zeros(conv.out_channels, device=conv.weight.device) if conv.bias is None else conv.bias.clone()
    fusedconv.bias.copy_((conv_bias - bn.running_mean) * w_bn + bn.bias)
    return fusedconv


def fuse_modules(model_dir, name="new_model"):
    model_dir = Path(model_dir)
    net = torch.load(model_dir / 'module.pkl', map_location='cpu')
    net.eval().cpu()
    # merge conv and bn inside each Conv module
    for m in net.modules():
        if type(m) is Conv:
            m.conv = fuse_conv_bn(m.conv, m.bn)
            delattr(m, 'bn')            # remove the bn attribute from m
            m.forward = m.forward_fuse  # swap in the fused forward method
    torch.save(net.cpu(), str(model_dir / f"{name}.pkl"))
    example = torch.rand(1, 3, 28, 28)
    traced_script_module = torch.jit.trace(net, example)
    traced_script_module.save(model_dir / f'{name}.pt')
    export(model_dir=model_dir, model_path=str(model_dir / f"{name}.pkl"), name=name)


def tt_fuse(model_dir):
    model_dir = Path(model_dir)
    net1 = torch.jit.load(str(model_dir / 'module.pt'), map_location='cpu')
    net1.eval().cpu()
    net2 = torch.jit.load(str(model_dir / 'new_model.pt'), map_location='cpu')
    net2.eval().cpu()
    x = torch.rand(1, 3, 32, 32)
    r1 = net1(x)
    r2 = net2(x)
    print(r1 - r2)


def tt(model_dir):
    model_dir = Path(model_dir)
    net = torch.load(model_dir / 'module.pkl', map_location='cpu')
    net.eval().cpu()
    # use PyTorch's built-in quantization fusion API
    fused_m = torch.quantization.fuse_modules(
        model=net,
        modules_to_fuse=[
            ['features.0.conv', 'features.0.bn', 'features.0.act'],
            ['features.1.conv', 'features.1.bn', 'features.1.act'],
            ['features.2.conv', 'features.2.bn', 'features.2.act'],
            ['features.3.conv', 'features.3.bn', 'features.3.act'],
            ['features.4.conv', 'features.4.bn', 'features.4.act'],
        ]
    )
    print(fused_m)
    x = torch.rand(4, 3, 28, 28)
    r1 = net(x)
    r2 = fused_m(x)
    print(r1 - r2)
    torch.save(fused_m.cpu(), str(model_dir / "fuse_model.pkl"))
    export(model_dir=model_dir, model_path=str(model_dir / "fuse_model.pkl"), name="fuse_model")


if __name__ == '__main__':
    # t0()
    # export(model_dir="./output/modules/01")
    # fuse_modules(model_dir="./output/modules/01")
    # tt_fuse(model_dir="./output/modules/01")
    tt("./output/modules/01")
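The Linear + BN case listed earlier folds with the same algebra as the Conv + BN code: scale each output row of W by γ/σ and fold the running mean into the bias. A hedged sketch (the helper name `fuse_linear_bn` is ours, not a library function):

```python
import torch
import torch.nn as nn

@torch.no_grad()
def fuse_linear_bn(linear: nn.Linear, bn: nn.BatchNorm1d) -> nn.Linear:
    fused = nn.Linear(linear.in_features, linear.out_features, bias=True)
    # gamma / sqrt(var + eps), one scale per output unit
    scale = bn.weight / torch.sqrt(bn.running_var + bn.eps)
    fused.weight.copy_(linear.weight * scale[:, None])
    bias = torch.zeros_like(bn.running_mean) if linear.bias is None else linear.bias
    fused.bias.copy_((bias - bn.running_mean) * scale + bn.bias)
    return fused

linear, bn = nn.Linear(4, 8), nn.BatchNorm1d(8)
bn.running_mean.uniform_(-1.0, 1.0)   # pretend the stats were learned during training
bn.running_var.uniform_(0.5, 2.0)
bn.eval()                             # fusion only makes sense with fixed running stats
x = torch.randn(3, 4)
same = torch.allclose(bn(linear(x)), fuse_linear_bn(linear, bn)(x), atol=1e-5)
print(same)  # True
```

As with Conv + BN, the fused layer is exactly equivalent only in eval mode, since training mode would recompute batch statistics.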
    
RepVGG fusion

import torch
import torch.nn as nn
import torch.nn.functional as F


def t0():
    _x = torch.rand(4, 9, 24, 24)
    conv1 = nn.Conv2d(9, 9, kernel_size=(1, 1), padding=0, stride=(1, 1))
    conv3 = nn.Conv2d(9, 9, kernel_size=(3, 3), padding=1, stride=(1, 1))
    # multi-branch output
    r1 = conv1(_x) + conv3(_x)
    print(r1.shape)
    # single-branch equivalent: pad the 1x1 kernel to 3x3 and add the kernels
    conv = nn.Conv2d(9, 9, kernel_size=(3, 3), padding=1, stride=(1, 1)).requires_grad_(False)
    conv1_weight = F.pad(conv1.weight.clone(), [1, 1, 1, 1])
    conv1_bias = conv1.bias.clone()
    conv3_weight = conv3.weight.clone()
    conv3_bias = conv3.bias.clone()
    conv.weight.copy_(conv3_weight + conv1_weight)
    conv.bias.copy_(conv3_bias + conv1_bias)
    r2 = conv(_x)
    print(r2.shape)
    r = torch.abs(r1 - r2)
    print(torch.max(r))


if __name__ == '__main__':
    t0()
    

Iris Dataset

from pathlib import Path

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import load_iris
from torch.utils.tensorboard import SummaryWriter

from numpy_dataset import bulid_dataloader
from metrics import Accuracy


class IrisNetWork(nn.Module):
    def __init__(self):
        super(IrisNetWork, self).__init__()
        self.classify = nn.Sequential(
            nn.Linear(4, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 3)
        )

    def forward(self, x):
        return self.classify(x)


def save(obj, path):
    torch.save(obj, path)


def load(path, net):
    print(f"Restoring model: {path}")
    ss_model = torch.load(path, map_location='cpu')
    net.load_state_dict(state_dict=ss_model['net'].state_dict(), strict=True)
    start_epoch = ss_model['epoch']
    best_acc = ss_model['acc']
    train_batch = ss_model['train_batch']
    test_batch = ss_model['test_batch']
    return start_epoch, best_acc, train_batch, test_batch


def training(restore_path=None):
    root_dir = Path('./output/01')
    summary_dir = root_dir / 'summary'
    summary_dir.mkdir(parents=True, exist_ok=True)
    checkout_dir = root_dir / 'model'
    checkout_dir.mkdir(parents=True, exist_ok=True)
    last_path = checkout_dir / 'last.pkl'
    best_path = checkout_dir / 'best.pkl'
    final_path = checkout_dir / 'final.pkl'
    total_epoch = 100
    start_epoch = 0
    summary_interval_batch = 2
    train_batch = 0
    test_batch = 0
    best_acc = -0.1
    save_interval_epoch = 2

    # 1. build the data loaders
    X, Y = load_iris(return_X_y=True)
    X = X.astype('float32')
    Y = Y.astype('int64')
    train_dataloader, test_dataloader, test_x, test_y = bulid_dataloader(X, Y, 0.1, 8)

    # 2. define model, loss, optimizer and metric
    net = IrisNetWork()
    loss_fn = nn.CrossEntropyLoss()
    opt = optim.SGD(params=net.parameters(), lr=0.01)
    acc_fn = Accuracy()

    # 3. restore the model if a checkpoint exists
    if best_path.exists():
        start_epoch, best_acc, train_batch, test_batch = load(best_path, net)
    elif final_path.exists():
        start_epoch, best_acc, train_batch, test_batch = load(final_path, net)

    # 4. visualization output
    writer = SummaryWriter(log_dir=summary_dir)
    writer.add_graph(net, torch.rand(3, 4))

    # 5. training loop
    for epoch in range(start_epoch, total_epoch + start_epoch):
        # 5.1 train
        net.train()
        train_loss = []
        train_true, train_total = 0, 0
        for x, y in train_dataloader:
            # forward
            scores = net(x)
            loss = loss_fn(scores, y)
            n, acc = acc_fn(scores, y)
            # backward
            opt.zero_grad()
            loss.backward()
            opt.step()
            loss = loss.item()
            acc = acc.item()
            train_total += n
            train_true += n * acc
            if train_batch % summary_interval_batch == 0:
                print(f"epoch:{epoch}, train batch:{train_batch}, loss:{loss:.3f}, acc:{acc:.3f}")
                writer.add_scalar('train_loss', loss, global_step=train_batch)
                writer.add_scalar('train_acc', acc, global_step=train_batch)
            train_batch += 1
            train_loss.append(loss)

        # 5.2 evaluate
        net.eval()
        test_loss = []
        test_true, test_total = 0, 0
        with torch.no_grad():
            for x, y in test_dataloader:
                scores = net(x)
                loss = loss_fn(scores, y)
                n, acc = acc_fn(scores, y)
                loss = loss.item()
                acc = acc.item()
                test_total += n
                test_true += n * acc
                print(f"epoch:{epoch}, test batch:{test_batch}, loss:{loss:.3f}, acc:{acc:.3f}")
                writer.add_scalar('test_loss', loss, global_step=test_batch)
                writer.add_scalar('test_acc', acc, global_step=test_batch)
                test_batch += 1
                test_loss.append(loss)

        # 5.3 per-epoch visualization
        train_acc = train_true / train_total
        test_acc = test_true / test_total
        writer.add_scalars('loss', {'train': np.mean(train_loss), 'test': np.mean(test_loss)}, global_step=epoch)
        writer.add_scalars('acc', {'train': train_acc, 'test': test_acc}, global_step=epoch)
        # TODO: add your own early-stopping logic here

        # 5.4 checkpointing
        if test_acc > best_acc:
            # save the best model
            obj = {'net': net, 'epoch': epoch, 'train_batch': train_batch,
                   'test_batch': test_batch, 'acc': test_acc}
            save(obj, (checkout_dir / 'best.pkl').absolute())
            best_acc = test_acc
        if epoch % save_interval_epoch == 0:
            obj = {'net': net, 'epoch': epoch, 'train_batch': train_batch,
                   'test_batch': test_batch, 'acc': test_acc}
            save(obj, last_path.absolute())

    # 6. final model persistence
    obj = {'net': net, 'epoch': start_epoch + total_epoch - 1, 'train_batch': train_batch,
           'test_batch': test_batch, 'acc': test_acc}
    save(obj, (checkout_dir / 'final.pkl').absolute())
    writer.close()


def export(model_dir):
    """
    NOTE: the network structure can be inspected with netron (https://netron.app).
    Convert the trained model into deployable formats; common targets are:
        pt: the cross-language TorchScript format of the Torch framework
        onnx: a fairly universal deep-learning model exchange format
        TensorRT: convert to onnx first, then convert again for GPU acceleration
        OpenVINO: convert to onnx first, then convert again for CPU acceleration
    """
    model_dir = Path(model_dir)
    # restore the model
    net = torch.load(model_dir / 'best.pkl', map_location='cpu')['net']
    net.eval().cpu()
    # convert to TorchScript
    example = torch.rand(1, 4)
    traced_script_module = torch.jit.trace(net, example)
    traced_script_module.save(model_dir / 'best.pt')
    # convert to ONNX
    torch.onnx.export(
        model=net,                 # the model object
        args=example,              # example input for forward
        f=model_dir / 'best_dynamic.onnx',
        input_names=['features'],  # names of the input tensors
        output_names=['label'],    # names of the output tensors
        opset_version=12,
        dynamic_axes={             # dynamic shape axes
            'features': {0: 'batch'},
            'label': {0: 'batch'}
        }
    )


@torch.no_grad()
def tt_load_model(module_dir):
    module_dir = Path(module_dir)
    # restore the Python model
    net1 = torch.load(module_dir / 'best.pkl', map_location='cpu')['net']
    net1.eval().cpu()
    # restore the TorchScript model
    net2 = torch.jit.load(module_dir / 'best.pt', map_location='cpu')
    net2.eval().cpu()
    # restore the ONNX model
    import onnxruntime
    net3 = onnxruntime.InferenceSession(str(module_dir / 'best_dynamic.onnx'))
    x = torch.rand(2, 4)
    print(net1(x))
    print(net2(x))
    print(net3.run(['label'], input_feed={'features': x.detach().numpy()}))


if __name__ == '__main__':
    # training()
    # export(model_dir='output/01/model')
    tt_load_model(module_dir='output/01/model')
"""
Iris预测模型处理器代码
"""
import os.pathimport numpy as np
"""
Iris prediction model processor.
"""
import os
import numpy as np
import onnxruntime
import torch
import torch.jit


def softmx(scores):
    """Compute softmax probabilities.
    scores: numpy array [n, m]
    returns: probabilities over the m classes
    """
    a = np.exp(scores)
    b = np.sum(a, axis=1, keepdims=True)
    p = a / b
    return p


class IrisProcessor(object):
    def __init__(self, model_path):
        """Restore the model; supports .pt and .onnx files."""
        super(IrisProcessor, self).__init__()
        model_path = os.path.abspath(model_path)
        _, ext = os.path.splitext(model_path.lower())
        self.pt, self.onnx = False, False
        if ext == '.pt':
            model = torch.jit.load(model_path, map_location='cpu')
            model.eval().cpu()
            self.model = model
            self.pt = True
        elif ext == '.onnx':
            session = onnxruntime.InferenceSession(model_path)
            self.session = session
            self.input_name = 'features'
            self.output_name = 'label'
            self.onnx = True
        else:
            raise ValueError(f'Only pt and onnx formats are supported, got: {model_path}')
        self.classes = ['category 1', 'category 2', 'category 3']
        print(f"Model restored successfully: pt --> {self.pt}; onnx --> {self.onnx}")

    def _process_after_model(self, x, scores):
        """Post-processing.
        x: original features, numpy array [n, 4]
        scores: model confidences, numpy array [n, 3]
        returns: one dict per sample with the predicted class name, id and probability
        """
        pred_probas = softmx(scores)  # [n, 3]
        pred_indexes = np.argmax(scores, axis=1)
        result = []
        for k, idx in enumerate(pred_indexes):
            r = {
                'id': int(idx),  # convert numpy int to python int
                'label': self.classes[idx],
                'proba': float(pred_probas[k][idx])  # convert numpy float to python float
            }
            result.append(r)
        return result

    @torch.no_grad()
    def _predict_with_pt(self, x):
        tensor_x = torch.from_numpy(x).to(torch.float)
        score = self.model(tensor_x)  # [n, 4] -> [n, 3]
        return self._process_after_model(x, score.numpy())

    def _predict_with_onnx(self, x):
        onnx_x = x.astype('float32')
        score = self.session.run([self.output_name], input_feed={self.input_name: onnx_x})  # [n, 4] -> [n, 3]
        score = score[0]  # take the first output, i.e. the one named output_name
        return self._process_after_model(x, score)

    def predict(self, x):
        """Predict labels from raw iris features.
        x: numpy array of shape [n, 4] -- n samples, 4 attributes
        returns: list of length n
        """
        if self.pt:
            return self._predict_with_pt(x)
        elif self.onnx:
            return self._predict_with_onnx(x)
        else:
            raise ValueError("Processor was not initialized correctly!")


if __name__ == '__main__':
    # processor = IrisProcessor(r"D:\my_program\study_code\output\01\model\best.pt")
    # r = processor.predict(np.asarray([[5, 2.3, 1.5, 2.2], [0.2, 1.3, 0.5, 0.2]]))
    # print(r)
    processor = IrisProcessor(r"D:\my_program\study_code\output\01\model\best_dynamic.onnx")
    r = processor.predict(np.asarray([[5, 2.3, 1.5, 2.2], [3.2, 1.3, 4.7, 2.2]]))
    print(r)
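The `softmx` helper above exponentiates the raw scores directly, which can overflow once logits get large. A common numerically stable variant (a sketch, not part of the original code) subtracts the per-row maximum before exponentiating, which leaves the result unchanged:

```python
import numpy as np

def stable_softmax(scores):
    # exp(a - c) / sum(exp(a - c)) == exp(a) / sum(exp(a)),
    # so shifting by the row max changes nothing but avoids overflow.
    shifted = scores - np.max(scores, axis=1, keepdims=True)
    e = np.exp(shifted)
    return e / np.sum(e, axis=1, keepdims=True)

# Logits around 1000 would overflow the naive version; here each row still sums to 1.
p = stable_softmax(np.asarray([[1000.0, 1001.0], [0.0, 0.0]]))
print(p)
```

Either version can back `_process_after_model`; the stable one simply never produces `inf` in the numerator.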
import numpy as np

from iris_proceeessor import IrisProcessor

processor = IrisProcessor(r"D:\my_program\study_code\output\01\model\best_dynamic.onnx")
while True:
    x = input("Enter the feature attributes, separated by spaces: ")
    if "q" == x:
        break
    x = x.split(" ")
    if len(x) != 4:
        print(f"Invalid input, please enter 4 feature attributes: {x}")
        continue
    x = np.asarray([x], dtype='float32')  # cast the string tokens to floats
    r = processor.predict(x)
    print(f"Prediction: {r}")
from flask import Flask, request, jsonify
import numpy as np

from study_code.iris_proceeessor import IrisProcessor

app = Flask(__name__)
processor = IrisProcessor(r"D:\my_program\study_code\output\01\model\best_dynamic.onnx")


@app.route('/')
def index():
    return "Iris classification model API service"


@app.route("/predict")
def predict():
    try:
        # GET request: the 'features' parameter is required; ',' separates
        # attributes within a sample and ';' separates samples.
        features = request.args.get('features')
        if features is None:
            return jsonify({'code': 1, 'msg': 'Invalid arguments: a valid features parameter is required'})
        x = [xx.split(",") for xx in features.split(";")]
        x = np.asarray(x, dtype='float32')
        if len(x) == 0:
            return jsonify({'code': 2, 'msg': f'Invalid arguments: a valid features parameter is required: {features}'})
        if len(x[0]) != 4:
            return jsonify({'code': 3, 'msg': f'Invalid feature dimension: a valid features parameter is required: {features}'})
        print(x)
        r = processor.predict(x)
        print(r)
        return jsonify({'code': 0, 'data': r, 'msg': "Success!"})
    except Exception as e:
        return jsonify({'code': 4, "msg": f"Server error: {e}"})
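The `/predict` handler splits `features` on ';' between samples and ',' within a sample. That parsing step can be exercised on its own (the helper name here is just for illustration):

```python
import numpy as np

def parse_features(features: str) -> np.ndarray:
    # ',' separates attributes, ';' separates samples -- same rule as /predict.
    rows = [sample.split(",") for sample in features.split(";")]
    return np.asarray(rows, dtype='float32')

x = parse_features("5,2.3,1.5,2.2;3.2,1.3,4.7,2.2")
print(x.shape)  # (2, 4)
```

A matching request would then look like `http://127.0.0.1:9999/predict?features=5,2.3,1.5,2.2;3.2,1.3,4.7,2.2`, using the port from the launcher script below.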
import os
import sys

# add the folder containing this file to the module search path
sys.path.append(os.path.dirname(__file__))

if __name__ == '__main__':
    from app import app

    app.run(host='0.0.0.0', port=9999)

Handwritten Digit Recognition

import os
from datetime import datetime
from pathlib import Path

import matplotlib.pyplot as plt
import torch
from sklearn.datasets import load_iris
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets, transforms

from study_code.numpy_dataset import bulid_dataloader
import torch.optim as optim
import numpy as np
from study_code.metrics import Accuracy


class NetWork(nn.Module):
    def __init__(self, in_features, num_classes, units=None):
        super(NetWork, self).__init__()
        if units is None:
            units = [1024, 2048, 512]
        self.in_features = in_features
        self.num_classes = num_classes
        layers = []
        for unit in units:
            layers.append(nn.Linear(in_features=in_features, out_features=unit))
            layers.append(nn.ReLU())
            in_features = unit
        layers.append(nn.Linear(in_features=in_features, out_features=self.num_classes))
        self.classify = nn.Sequential(*layers)

    def forward(self, x):
        x = x.reshape(-1, self.in_features)  # [N, 1, 28, 28] -> [N, 1 * 28 * 28]
        return self.classify(x)


def save_model(path, net, epoch, train_batch, test_batch):
    if not os.path.exists(os.path.dirname(path)):
        os.mkdir(os.path.dirname(path))
    torch.save({
        'net': net,
        'epoch': epoch,
        'train_batch': train_batch,
        'test_batch': test_batch
    }, path)


def save(obj, path):
    torch.save(obj, path)


def load(path, net):
    print(f"Restoring model: {path}")
    ss_model = torch.load(path, map_location='cpu')
    net.load_state_dict(state_dict=ss_model['net'].state_dict(), strict=True)
    start_epoch = ss_model['epoch']
    best_acc = ss_model['acc']
    train_batch = ss_model['train_batch']
    test_batch = ss_model['test_batch']
    return start_epoch, best_acc, train_batch, test_batch


def training(restore_path=None):
    # now = datetime.now().strftime("%y%m%d%H%M%S")
    root_dir = Path('./output/02')
    summary_dir = root_dir / 'summary'
    if not summary_dir.exists():
        summary_dir.mkdir(parents=True)
    checkout_dir = root_dir / 'model'
    if not checkout_dir.exists():
        checkout_dir.mkdir(parents=True)
    last_path = checkout_dir / 'last.pkl'
    best_path = checkout_dir / 'best.pkl'
    final_path = checkout_dir / 'final.pkl'

    total_epoch = 5
    start_epoch = 0
    summary_initerval_batch = 2
    train_batch = 0
    test_batch = 0
    best_acc = -0.1
    save_interva_batch = 2
    batch_size = 8

    # 1. Build the data loaders
    train_dataset = datasets.MNIST(
        root='../../datas/MNIST',
        train=True,
        transform=transforms.ToTensor(),  # convert the raw data (numpy by default) to tensors
        download=True
    )
    train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
    test_dataset = datasets.MNIST(
        root='../../datas/MNIST',
        train=False,
        transform=transforms.ToTensor(),
        download=True
    )
    test_dataloader = DataLoader(test_dataset, shuffle=True, batch_size=batch_size * 2)

    # 2. Build the model
    net = NetWork(in_features=1 * 28 * 28, num_classes=10)
    loss_fn = nn.CrossEntropyLoss()
    opt = optim.SGD(params=net.parameters(), lr=0.01)
    acc_fn = Accuracy()

    # 3. Model restore
    if best_path.exists():
        start_epoch, best_acc, train_batch, test_batch = load(best_path, net)
    elif last_path.exists():
        start_epoch, best_acc, train_batch, test_batch = load(last_path, net)

    # 4. Visualization output
    writer = SummaryWriter(log_dir=summary_dir)
    writer.add_graph(net, torch.rand(3, 1, 28, 28))

    # 5. Training loop
    for epoch in range(start_epoch, total_epoch + start_epoch):
        # 5.1 Train
        net.train()
        train_loss = []
        train_true, train_total = 0, 0
        for batch_img, batch_label in train_dataloader:
            # forward pass
            scores = net(batch_img)
            loss = loss_fn(scores, batch_label)
            n, acc = acc_fn(scores, batch_label)
            # backward pass
            opt.zero_grad()
            loss.backward()
            opt.step()

            loss = loss.item()
            acc = acc.item()
            train_total += n
            train_true += n * acc
            if train_batch % summary_initerval_batch == 0:
                print(f"epoch:{epoch}, train batch:{train_batch}, loss:{loss:.3f}, acc:{acc:.3f}")
                writer.add_scalar('train_loss', loss, global_step=train_batch)
                writer.add_scalar('train_acc', acc, global_step=train_batch)
            train_batch += 1
            train_loss.append(loss)

        # 5.2 Evaluate
        net.eval()
        test_loss = []
        test_true, test_total = 0, 0
        with torch.no_grad():
            for batch_img, batch_label in test_dataloader:
                # forward pass
                scores = net(batch_img)
                loss = loss_fn(scores, batch_label)
                n, acc = acc_fn(scores, batch_label)

                loss = loss.item()
                acc = acc.item()
                test_total += n
                test_true += n * acc
                print(f"epoch:{epoch}, test batch:{test_batch}, loss:{loss:.3f}, acc:{acc:.3f}")
                writer.add_scalar('test_loss', loss,
                                  global_step=test_batch)
                writer.add_scalar('test_acc', acc, global_step=test_batch)
                test_batch += 1
                test_loss.append(loss)

        # 5.3 Per-epoch visualization
        train_acc = train_true / train_total
        test_acc = test_true / test_total
        writer.add_scalars('loss', {'train': np.mean(train_loss), 'test': np.mean(test_loss)}, global_step=epoch)
        writer.add_scalars('acc', {'train': train_acc, 'test': test_acc}, global_step=epoch)
        # TODO: add your own early-stopping logic here

        # 5.4 Model persistence
        if test_acc > best_acc:
            # save the best model
            obj = {
                'net': net,
                'epoch': epoch,
                'train_batch': train_batch,
                'test_batch': test_batch,
                'acc': test_acc
            }
            save(obj, (checkout_dir / 'best.pkl').absolute())
            best_acc = test_acc
        if epoch % save_interva_batch == 0:
            obj = {
                'net': net,
                'epoch': epoch,
                'train_batch': train_batch,
                'test_batch': test_batch,
                'acc': test_acc
            }
            save(obj, last_path.absolute())

    # 6. Final model persistence
    obj = {
        'net': net,
        'epoch': start_epoch + total_epoch - 1,
        'train_batch': train_batch,
        'test_batch': test_batch,
        'acc': test_acc
    }
    save(obj, (checkout_dir / 'final.pkl').absolute())
    writer.close()


def export(model_dir):
    """
    NOTE: you can inspect the network structure with netron (https://netron.app).
    Convert the trained model into formats that support multi-platform deployment, commonly:
        pt: cross-language TorchScript format
        onnx: a fairly universal deep learning model exchange format
        tensorRT: convert to onnx first, then use TensorRT for GPU acceleration
        openvino: convert to onnx first, then use OpenVINO for CPU acceleration
    """
    model_dir = Path(model_dir)
    # restore the model
    net = torch.load(model_dir / 'best.pkl', map_location='cpu')['net']
    net.eval().cpu()

    # convert to TorchScript (pt)
    example = torch.rand(1, 1, 28, 28)
    traced_script_module = torch.jit.trace(net, example)
    traced_script_module.save(model_dir / 'best.pt')

    # convert to onnx
    torch.onnx.export(
        model=net,    # the model object
        args=example,  # example inputs for the model's forward
        f=model_dir / 'best_dynamic.onnx',  # output file name
        # training=_C_onnx.TrainingMode.EVAL,
        input_names=['images'],   # names of the input tensors
        output_names=['scores'],  # names of the output tensors
        opset_version=12,
        # dynamic_axes=None  # whether the shape is dynamic
        dynamic_axes={
            'images': {0: 'batch'},
            'scores': {0: 'batch'}
        }
    )


@torch.no_grad()
def tt_load_model(module_dir):
    module_dir = Path(module_dir)
    # restore the plain PyTorch model
    net1 = torch.load(module_dir / 'best.pkl', map_location='cpu')['net']
    net1.eval().cpu()
    # restore the TorchScript model
    net2 = torch.jit.load(module_dir / 'best.pt', map_location='cpu')
    net2.eval().cpu()
    # restore the onnx model
    import onnxruntime
    net3 = onnxruntime.InferenceSession(str(module_dir / 'best_dynamic.onnx'))

    x = torch.rand(2, 1, 28, 28)
    print(net1(x))
    print(net2(x))
    print(net3.run(['scores'], input_feed={'images': x.detach().numpy()}))

    img_path = r"D:\my_program\datas\MNIST\MNIST\images\4\53.png"
    img = plt.imread(img_path)[:, :, 0][None, None, :, :]  # [28, 28, 4] -> [1, 1, 28, 28]
    img = torch.from_numpy(img)
    print("=" * 100)
    print(net1(img))
    print(net2(img))
    print(net3.run(['scores'], input_feed={'images': img.detach().numpy()}))


if __name__ == '__main__':
    # training()
    export(model_dir='output/02/model')
    tt_load_model(module_dir='output/01/model')
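The training script imports `Accuracy` from `study_code.metrics`, whose implementation is not shown. Judging by how it is called (`n, acc = acc_fn(scores, batch_label)`, returning the batch size and the batch accuracy), a minimal sketch could look like the following; the exact original API is an assumption.

```python
import torch


class Accuracy:
    """Hypothetical reconstruction: returns (batch_size, accuracy) for one batch."""

    def __call__(self, scores: torch.Tensor, target: torch.Tensor):
        # scores: [N, num_classes] raw logits; target: [N] class indices
        pred = torch.argmax(scores, dim=1)
        n = target.shape[0]
        acc = (pred == target).to(torch.float32).mean()
        return n, acc


acc_fn = Accuracy()
scores = torch.tensor([[2.0, 0.1], [0.3, 1.5], [0.9, 0.2]])
target = torch.tensor([0, 1, 1])
n, acc = acc_fn(scores, target)
print(n, float(acc))
```

Returning `n` alongside the accuracy lets the training loop weight per-batch accuracies by batch size when computing the epoch average (`train_true += n * acc; train_true / train_total`).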
