import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torch.nn.functional as F
from torchvision.utils import save_image# 变分自编码器
class CVAE(nn.Module):def __init__(self):super(CVAE, self).__init__()self.labels = 10 # 标签数量# 编码器层self.fc1 = nn.Linear(input_size + self.labels, 512) # 编码器输入层self.fc2 = nn.Linear(512, latent_size)self.fc3 = nn.Linear(512, latent_size)# 解码器层self.fc4 = nn.Linear(latent_size + self.labels, 512) # 解码器输入层self.fc5 = nn.Linear(512, input_size) # 解码器输出层# 编码器部分def encode(self, x):x = F.relu(self.fc1(x)) # 编码器的隐藏表示mu = self.fc2(x) # 潜在空间均值log_var = self.fc3(x) # 潜在空间对数方差return mu, log_var# 重参数化技巧def reparameterize(self, mu, log_var): # 从编码器输出的均值和对数方差中采样得到潜在变量zstd = torch.exp(0.5 * log_var) # 计算标准差eps = torch.randn_like(std) # 从标准正态分布中采样得到随机噪声return mu + eps * std # 根据重参数化公式计算潜在变量z# 解码器部分def decode(self, z):z = F.relu(self.fc4(z)) # 将潜在变量 z 解码为重构图像return torch.sigmoid(self.fc5(z)) # 将隐藏表示映射回输入图像大小,并应用 sigmoid 激活函数,以产生重构图像# 前向传播def forward(self, x, y): # 输入图像 x,标签 y 通过编码器和解码器,得到重构图像和潜在变量的均值和对数方差x = torch.cat([x, y], dim=1)mu, log_var = self.encode(x)z = self.reparameterize(mu, log_var)z = torch.cat([z, y], dim=1)return self.decode(z), mu, log_var# 使用重构损失和 KL 散度作为损失函数
def loss_function(recon_x, x, mu, log_var): # 参数:重构的图像、原始图像、潜在变量的均值、潜在变量的对数方差MSE = F.mse_loss(recon_x, x.view(-1, input_size), reduction='sum') # 计算重构图像 recon_x 和原始图像 x 之间的均方误差KLD = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp()) # 计算潜在变量的KL散度return MSE + KLD # 返回二进制交叉熵损失和 KLD 损失的总和作为最终的损失值def sample_images(epoch):with torch.no_grad(): # 上下文管理器,确保在该上下文中不会进行梯度计算。因为在这里只是生成样本而不需要梯度number = 10# 生成标签sample_labels = torch.arange(10).long().to(device) # 0-9的标签sample_labels_onehot = F.one_hot(sample_labels, num_classes=10).float()# 生成随机噪声sample = torch.randn(number, latent_size).to(device) # 生成一个形状为 (64, latent_size) 的张量,其中包含从标准正态分布中采样的随机数sample = torch.cat([sample, sample_labels_onehot], dim=1) # 连接图片和标签sample = model.decode(sample).cpu() # 将随机样本输入到解码器中,解码器将其映射为图像save_image(sample.view(number, 1, 28, 28), f'sample{epoch}.png', nrow=int(number / 2)) # 将生成的图像保存为文件if __name__ == '__main__':batch_size = 512 # 批次大小epochs = 30 # 学习周期sample_interval = 10 # 保存结果的周期learning_rate = 0.001 # 学习率input_size = 784 # 输入大小latent_size = 64 # 潜在变量大小# 载入 MNIST 数据集中的图片进行训练transform = torchvision.transforms.Compose([torchvision.transforms.ToTensor()]) # 将图像转换为张量train_dataset = torchvision.datasets.MNIST(root="~/torch_datasets", train=True, transform=transform, download=True) # 加载 MNIST 数据集的训练集,设置路径、转换和下载为 Truetrain_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True) # 创建一个数据加载器,用于加载训练数据,设置批处理大小和是否随机打乱数据# 在使用定义的 AE 类之前,有以下事情要做:# 配置要在哪个设备上运行device = torch.device("cuda" if torch.cuda.is_available() else "cpu")# 建立 CVAE 模型并载入到 CPU 设备model = CVAE().to(device)# Adam 优化器,学习率optimizer = optim.Adam(model.parameters(), lr=learning_rate)# 训练for epoch in range(epochs):train_loss = 0for batch_idx, (data, labels) in enumerate(train_loader):data = data.to(device) # 将输入数据移动到设备(GPU 或 CPU)上data = data.view(-1, input_size) # 重塑维度labels = F.one_hot(labels, num_classes=10).float().to(device) # 转换为独热编码# print(labels[1])optimizer.zero_grad() # 进行反向传播之前,需要将优化器中的梯度清零,以避免梯度的累积# 重构图像 recon_batch、潜在变量的均值 mu 和对数方差 log_varrecon_batch, mu, log_var = model(data, labels)loss = loss_function(recon_batch, data, mu, log_var) # 计算损失loss.backward() # 计算损失相对于模型参数的梯度train_loss += loss.item()optimizer.step() # 更新模型参数train_loss = train_loss / len(train_loader) # # 计算每个周期的训练损失print('Epoch [{}/{}], Loss: {:.3f}'.format(epoch + 1, epochs, train_loss))# 每10次保存图像if (epoch + 1) % sample_interval == 0:sample_images(epoch + 1)# 每训练10次保存模型if (epoch + 1) % sample_interval == 0:torch.save(model.state_dict(), f'vae{epoch + 1}.pth')