简单的基于Transformer的滚动轴承故障诊断(Pytorch)

递归神经网络在很长一段时间内是序列转换任务的主导模型,其固有的序列本质阻碍了并行计算。因此,在2017年,谷歌的研究人员提出了一种新的用于序列转换任务的模型架构Transformer,它完全基于注意力机制建立输入与输出之间的全局依赖关系。在训练阶段,Transformer可以并行计算,大大减小了模型训练难度,提高了模型训练效率。Transformer由编码器和解码器两部分构成。其编解码器的子模块为多头注意力MHA和前馈神经网络FFN。此外,Transformer还利用了位置编码、层归一化、残差连接、dropout等技巧来增强模型性能。

图片

鉴于此,简单地采用Transformer对滚动轴承进行故障诊断,没有经过什么修改,效果不是很好,准确率较低,数据集采用江南大学轴承数据集。

import numpy as npimport torchimport torch.nn as nnimport torch.nn.functional as Fimport torch.optim as optimimport scipy.io as ioimport pandas as pdimport matplotlib.pyplot as pltfrom torch.utils.data import Dataset, DataLoaderd_k = 64d_v = 64class ScaledDotProductAttention(nn.Module):    def __init__(self):        super(ScaledDotProductAttention, self).__init__()    def forward(self, Q, K, V):        scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k)        weights = nn.Softmax(dim=-1)(scores)        context = torch.matmul(weights, V)        return context, weights
# Q = torch.randint(0, 9, (4, 8, 512)).to(torch.float32)# K = torch.randint(0, 4, (4, 8, 512)).to(torch.float32)# V = torch.randint(0, 2, (4, 8, 512)).to(torch.float32)# SDPA = ScaledDotProductAttention()# context, weights = SDPA(Q, K, V)
# print(weights.shape)# print(V.shape)# print(context.shape)d_embedding = 512n_heads = 2batch_size = 32seq_len = 4class MultiHeadAttention(nn.Module):    def __init__(self):        super(MultiHeadAttention, self).__init__()        self.W_Q = nn.Linear(d_embedding, d_k*n_heads)        self.W_K = nn.Linear(d_embedding, d_k*n_heads)        self.W_V = nn.Linear(d_embedding, d_v*n_heads)        self.Linear = nn.Linear(n_heads*d_v, d_embedding)        self.layer_norm = nn.LayerNorm(d_embedding)    def forward(self, Q, K, V):        residual, batch_size = Q, Q.size(0)        # input[batch_size, len, d_embedding]->output[batch_size, len, d_k*n_heads]->        # {view}->output[batch_size, len, n_heads, d_k]->{transpose}->output[batch_size, n_heads, len, d_k]        q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1, 2)         k_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1, 2)        v_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1, 2)        context, weights = ScaledDotProductAttention()(q_s, k_s, v_s)        context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads*d_v)        output = self.Linear(context)        output = self.layer_norm(output + residual)        return output, weights# data = torch.randn([4, 4, 512])# # [4, 4, 512]->[4, 4, 2, 64]->[4, 2, 4, 64]->{SDPA}->[4, 2, 4, 64]->{transpose}->[4, 4, 2, 64]->{view}->[4, 4, 128]->{Linear}->[4, 4, 512]# # q_s.shape[4, 2, 4, 64]=[batch_size, n_heads, len, d_v]->weight = q_s .* k_s = [4, 2, 4, 4]->weight .* q_v = [4, 2, 4, 64]# MHA = MultiHeadAttention()# output, weights = MHA(data, data, data)# print(output.shape, weights.shape)class PoswiseFeedForward(nn.Module):    def __init__(self, d_ff=1024):        super(PoswiseFeedForward, self).__init__()        self.conv1 = nn.Conv1d(in_channels=d_embedding, out_channels=d_ff, kernel_size=1)        self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_embedding, kernel_size=1)        self.layernorm = nn.LayerNorm(d_embedding)    def forward(self, inputs):        residual = inputs        output = nn.ReLU()(self.conv1(inputs.transpose(1, 2)))        output = self.conv2(output).transpose(1, 2)        output = self.layernorm(output + residual)        return output# data = torch.randn([4, 4, 512])# PFF = PoswiseFeedForward()# output = PFF(data)# print(output.shape)class EncoderLayer(nn.Module):    def __init__(self):        super(EncoderLayer, self).__init__()        self.enc_self_attn = MultiHeadAttention()        self.pos_ffn = PoswiseFeedForward()    def forward(self, enc_inputs):        enc_outputs, attn_weights = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs)        enc_outputs = self.pos_ffn(enc_outputs)        return enc_outputs, attn_weightsn_layers = 2num_classes = 4
class Encoder(nn.Module):    def __init__(self):        super(Encoder, self).__init__()        self.layers = nn.ModuleList(EncoderLayer() for _ in range(n_layers))        self.linear = nn.Linear(seq_len * d_embedding, num_classes)    def forward(self, enc_inputs):        for layer in self.layers:            enc_outputs, _ = layer(enc_inputs)        enc_outputs = enc_outputs.view(-1, seq_len * d_embedding)        enc_outputs = self.linear(enc_outputs)        return enc_outputsfrom sklearn.model_selection import train_test_splitfrom torch.utils.data import TensorDataset, DataLoaderimport os
gpu = "0"os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'os.environ['CUDA_VISIBLE_DEVICES'] = gpudevice = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
epochs = 200lr = 0.0001normal_data = torch.cat([torch.Tensor(pd.read_csv('./dataset/n600_3_2.csv', header=None).values[:409600].reshape(-1, seq_len, d_embedding)).to(torch.float32),\                           torch.Tensor(pd.read_csv('./dataset/n800_3_2.csv', header=None).values[:409600].reshape(-1, seq_len, d_embedding)).to(torch.float32),\                           torch.Tensor(pd.read_csv('./dataset/n1000_3_2.csv', header=None).values[:409600].reshape(-1, seq_len, d_embedding)).to(torch.float32)], dim=0)           
inner_data = torch.cat([torch.Tensor(pd.read_csv('./dataset/ib600_2.csv', header=None).values[:409600].reshape(-1, seq_len, d_embedding)).to(torch.float32),\                          torch.Tensor(pd.read_csv('./dataset/ib800_2.csv', header=None).values[:409600].reshape(-1, seq_len, d_embedding)).to(torch.float32),\                          torch.Tensor(pd.read_csv('./dataset/ib1000_2.csv', header=None).values[:409600].reshape(-1, seq_len, d_embedding)).to(torch.float32)])
outer_data = torch.cat([torch.Tensor(pd.read_csv('./dataset/ob600_2.csv', header=None).values[:409600].reshape(-1, seq_len, d_embedding)).to(torch.float32),\                          torch.Tensor(pd.read_csv('./dataset/ob800_2.csv', header=None).values[:409600].reshape(-1, seq_len, d_embedding)).to(torch.float32),\                          torch.Tensor(pd.read_csv('./dataset/ob1000_2.csv', header=None).values[:409600].reshape(-1, seq_len, d_embedding)).to(torch.float32)])
roller_data = torch.cat([torch.Tensor(pd.read_csv('./dataset/tb600_2.csv', header=None).values[:409600].reshape(-1, seq_len, d_embedding)).to(torch.float32),\                           torch.Tensor(pd.read_csv('./dataset/tb800_2.csv', header=None).values[:409600].reshape(-1, seq_len, d_embedding)).to(torch.float32),\                           torch.Tensor(pd.read_csv('./dataset/tb1000_2.csv', header=None).values[:409600].reshape(-1, seq_len, d_embedding)).to(torch.float32)])
data = torch.cat((normal_data, inner_data, outer_data, roller_data), dim=0)labels = torch.cat([torch.Tensor(np.repeat(i, normal_data.shape[0])).to(torch.long) for i in range(num_classes)])x_train, x_test, y_train, y_test = train_test_split(data, labels, test_size=0.3)
train_dataset = TensorDataset(x_train, y_train)test_dataset = TensorDataset(x_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)model = Encoder()model = model.to(device)
criterion = nn.CrossEntropyLoss()optimizer = optim.Adam(model.parameters(), lr=lr)
epoch_counter = []# train_counter = [i for i in range(1, EPOCH + 1)]# test_counter = [i for i in range(1, EPOCH + 1)]train_error = []test_error = []
train_acc = []train_loss = []test_acc = []test_loss = []
for epoch in range(epochs):    print('------------------------\nEpoch: %d------------------------' % (epoch + 1))    model.train()    sum_loss = 0.0    correct = 0.0    total = 0.0    for i, data in enumerate(train_loader, 0):        length = len(train_loader)        inputs, labels = data        inputs, labels = inputs.to(device), labels.to(device)        optimizer.zero_grad()        outputs = model(inputs)        loss = criterion(outputs, labels)        loss.backward()        optimizer.step()        sum_loss += loss.item()        _, predicted = torch.max(outputs.data, 1) # predicted denotes indice of the max number in output's second dimension        total += labels.size(0)        correct += predicted.eq(labels.data).cpu().sum()        print('[epoch:%d, iter:%d] Loss: %.03f | Acc: %.3f%% ' % (epoch + 1, (i + 1 + epoch * length), sum_loss / (i + 1), 100. * correct / total))    epoch_counter.append(epoch+1)    train_loss.append(sum_loss / len(train_loader))    train_acc.append(correct / total)    train_error.append(100. - 100. * correct / total)
    print("-------------------TEST------------------")    with torch.no_grad():        real_label = []        pred_label = []        sum_test_loss = 0.0        correct_test = 0        total_test = 0 # num of total test samples        for data in test_loader:            model.eval()            input_test, label_test = data            input_test, label_test = input_test.to(device), label_test.to(device)            output_test = model(input_test)            test_losses = criterion(output_test, label_test)            sum_test_loss += test_losses.item()            _, predicted_test = torch.max(output_test.data, 1)            total_test += label_test.size(0)            correct_test += (predicted_test == label_test.data).cpu().sum()            real_label.append(label_test.cpu())            pred_label.append(predicted_test.cpu())        test_loss.append(sum_test_loss / len(test_loader))        test_acc.append(correct_test / total_test)        print('=================测试分类准确率为:%.2f%%===================' % (100. * correct_test / total_test))        acc = 100. * correct / totalreal_label = np.array(torch.cat(real_label, dim=0))pred_label = np.array(torch.cat(pred_label, dim=0))def save_np_files(filename, data):    np.save('./acc&loss/' + filename + '.npy', np.array(data))save_np_files("real_label", real_label)save_np_files("pred_label", pred_label)save_np_files("epoch", epoch_counter)save_np_files("train_acc", train_acc)save_np_files("train_loss", train_loss)save_np_files("test_acc", test_acc)save_np_files("test_loss", test_loss)train_acc = np.load('./acc&loss/train_acc.npy')train_loss = np.load('./acc&loss/train_loss.npy')test_acc = np.load('./acc&loss/test_acc.npy')test_loss = np.load('./acc&loss/test_loss.npy')epoch = np.load('./acc&loss/epoch.npy')plt.figure(figsize=(8, 4))
plt.subplot(121)plt.plot(epoch, train_acc, label="train_acc")plt.plot(epoch, test_acc, label="test_acc")plt.legend()plt.subplot(122)plt.plot(epoch, train_loss, label="train_loss")plt.plot(epoch, test_loss, label="test_loss")plt.legend()plt.tight_layout()plt.show()

图片

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/351173.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker-Portainer可视化管理工具

Docker-Portainer可视化管理工具 文章目录 Docker-Portainer可视化管理工具介绍资源列表基础环境一、安装Docker二、配置Docker加速器三、拉取Portainer汉化版本镜像四、运行容器五、访问可视化界面 介绍 Portainer是一款开源的容器管理平台,它提供了一个直观易用的…

Jmeter多个请求按照比例并发压测的几种方式

🍅 视频学习:文末有免费的配套视频可观看 🍅 点击文末小卡片 ,免费获取软件测试全套资料,资料在手,涨薪更快 一、需求 在压测的过程中,为了能够压测整个链路,通常需要多个接口进行并…

基于Matlab停车场车牌识别计时计费管理系统 【W2】

简介 停车场车牌识别计时计费管理系统在现代城市管理中具有重要意义。随着城市化进程的加快和车辆数量的增加,传统的人工管理停车场的方式已经难以满足效率和精确度的要求。因此引入车牌识别技术的自动化管理系统成为一种趋势和解决方案。 背景意义 提升管理效率&a…

linux 部署瑞数6实战(维普,药监局)sign第二部分

声明 本文章中所有内容仅供学习交流使用,不用于其他任何目的,抓包内容、敏感网址、数据接口等均已做脱敏处理,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关!wx …

诊断解决方案——CANdesc和MICROSAR

文章目录 一、CANdesc二、MICROSAR一、CANdesc canbeded是Vector汽车电子开发软件Nun Autosar标准的工具链之一。 canbeded是以源代码的形式提供的可重用的组件,包括CAN Driver,交互层(IL),网络管理(NM),传输层(TP),诊断层(CANdesc) , 通信测量和标定协议(CCP,XCP) 和 通信控…

【C++】C++入门的杂碎知识点

思维导图大纲: namespac命名空间 什么是namespace命名空间namespace命名空间有什么用 什么是命名空间 namespace命名空间是一种域,它可以将内部的成员隔绝起来。举个例子,我们都知道有全局变量和局部变量,全局变量存在于全局域…

联想电脑电池只能充到80%,就不在充电了,猛一看以为坏了,只是设置了养护模式。

现在电池管理模式有三种: 1)常规 2)养护 3)快充 好久没有用联想的电脑了,猛一看,咱充到了80%不充了,难道电池是坏的?我们要如何设置才可以让其充电到100%呢? 右下角…

贪心算法学习五

例题一 解法(贪⼼): 贪⼼策略: 我们的任何选择,应该让这个数尽可能快的变成 1 。 对于偶数:只能执⾏除 2 操作,没有什么分析的; 对于奇数: i. 当 n 1 的时候…

如何使用ios自带语音转文字工具?

ios自带语音转文字是iOS系统中自带的语音转文字功能主要应用于以下几个方面: 1. 语音输入:在iOS的任何文本输入框中,通常都有一个麦克风图标,点击后可以进行语音输入,系统会将你的语音实时转换成文字。 2. Siri&…

1. NAS和SAN存储

NAS和SAN存储 一、存储设备1、根据工作方式2、DAS 直接附加存储3、NAS存储4、SAN存储 二、模拟配置SAN存储1、创建虚拟机、安装openfiler2、访问openfiler webUI3、创建RAID设备4、开启iSCSI服务5、配置SAN存储设备共享空间5.1 设置IQN 6、业务服务器连接使用存储6.1 安装客户端…

JDK17 你的下一个白月光

JDK版本升级的非常快,现在已经到JDK20了。JDK版本虽多,但应用最广泛的还得是JDK8,正所谓“他发任他发,我用Java8”。 但实际情况却不是这样,越来越多的java工程师拥抱 JDK17,于是了解了一下 JDK17新语法&a…

C#开发-集合使用和技巧(二)Lambda 表达式介绍和应用

C#开发-集合使用和技巧 Lambda 表达式介绍和应用 C#开发-集合使用和技巧介绍简单的示例:集合查询示例: 1. 基本语法从主体语句上区分:1. 主体为单一表达式2. 主体是代码块(多个表达式语句) 从参数上区分1. 带输入参数的…

69. UE5 RPG 使用Gameplay Cue 实现技能表现效果

在上一章中,我们实现了敌人的攻击技能的特效和音效。如果我们在多人模式下打开,发现,其它客户端看不到对应的效果。 造成这种问题的原因是因为敌人的技能是运行在服务器端的,它只复制到拥有它的客户端,而敌人的效果对于…

仿FC数学金刚游戏介绍

简介 Math Monkey是Simple2l工作室开发的第二款小游戏,灵感来源于FC游戏平台的数学金刚游戏。小学时玩FC游戏是业余时间最期待的事情,还记得有一次和玩伴玩游戏时已经晚上了,于是约定再玩一把就各回各家,没想到又连玩了N把每一把…

Postman下发流表至Opendaylight

目录 任务目的 任务内容 实验原理 实验环境 实验过程 1、打开ODL控制器 2、网页端打开ODL控制页面 3、创建拓扑 4、Postman中查看交换机的信息 5、L2层流表下发 6、L3层流表下发 7、L4层流表下发 任务目的 1、掌握OpenFlow流表相关知识,理解SDN网络中L…

Jira,一个强大灵活的项目和任务管理工具 Python 库

目录 01初识 Jira 为什么选择 Jira? 02安装与配置 安装 jira 库 配置 Jira 访问 获取 API token: 配置 Python 环境: 03基本操作 创建项目 创建任务 查询任务 更新任务 删除任务 04高级操作 处理子任务 搜索任务 添加附件 评论任务 05实战案例 自动化创建…

001 Spring介绍

文章目录 特点1.方便解耦,简化开发2.AOP编程的支持3.声明式事务的支持4.方便程序的测试5.方便集成各种优秀框架6.降低Java EE API的使用难度7.Java源码是经典学习范例 好处什么是耦合和内聚耦合性,也叫耦合度,是对模块间关联程度的度量内聚标…

react-day1

1.react是什么呢? react是由Meta公司开发,是一个用于构建web和原生交互界面的库 2.react 项目修改文件保存后 ,不能实时更新,需要: 在和package.json文件同目录的地方,新建.env文件:里面加入…

MySQL数据操作与查询- 连接查询

一、引入 1、为什么需要使用连接查询? 查询信息的来源如果来自多张表,则必须对这些表进行连接查询。 2、连接查询的分类 内连接和外连接。 二、内连接 1、概述 将两张表的记录组合在一起,产生一个新的结果。 (1&#xff09…

C++初学者指南第一步---2. Hello world

C初学者指南第一步—2. Hello world 目录 C初学者指南第一步---2. Hello world1.源文件 “Hello.cpp”2.编译hello.cpp3.术语4.编译器标志5.不要使用 “using namespace std;” &#xff01; 1.源文件 “Hello.cpp” #include <iostream> // our first program int main…