【PyTorch】Implementations of Common Reinforcement Learning Algorithms (Continuously Updated)

A continuously updated collection of commonly used reinforcement learning algorithms, each implemented in a single Python file to keep the code simple and easy to read. A quick dependency check is sketched after the changelog below.

  • 2024.11.09 update: PPO (with GAE); SAC
  • 2024.11.12 update: OptionCritic (PPOC)
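
All three scripts below train on the MuJoCo Walker2d task through gymnasium and additionally need torch, numpy, tqdm, and matplotlib. As a minimal sanity check of the setup (a sketch only: the versioned environment ID 'Walker2d-v4' and the gymnasium[mujoco] extra are assumptions based on the imports the scripts use):

# Minimal dependency/environment check for the scripts in this post.
# Assumes something like: pip install torch gymnasium[mujoco] tqdm matplotlib
import gymnasium as gym
import torch

env = gym.make('Walker2d-v4')      # versioned form of the 'Walker2d' used in the scripts
state, info = env.reset(seed=0)
print('observation dim:', env.observation_space.shape[0])
print('action dim:', env.action_space.shape[0])
print('torch version:', torch.__version__)
env.close()
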
"PPO"
import copy
import time
import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import gymnasium as gym
import matplotlib.pyplot as plt
from tqdm import trange
from torch.distributions import Normal


class Actor(nn.Module):
    """Gaussian policy: outputs the mean and standard deviation for each action dimension."""

    def __init__(self, state_size, action_size):
        super().__init__()
        self.fc1 = nn.Linear(state_size, 256)
        self.fc2 = nn.Linear(256, 128)
        self.mu = nn.Linear(128, action_size)
        self.sigma = nn.Linear(128, action_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        mu = F.tanh(self.mu(x))
        sigma = F.softplus(self.sigma(x))
        return mu, sigma


class Critic(nn.Module):
    """State-value network V(s)."""

    def __init__(self, state_size):
        super().__init__()
        self.fc1 = nn.Linear(state_size, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)


def ppo_training(trajectory, actor, critic, actor_optimizer, critic_optimizer,
                 clip=0.2, k_epochs=10, gamma=0.99, lam=0.95, device='cpu', T=1e-2):
    states, actions, log_probs, rewards, next_states, dones = map(
        lambda x: torch.from_numpy(np.array(x)).to(device), zip(*trajectory))
    rewards = rewards.view(-1, 1)
    dones = dones.view(-1, 1).int()

    # TD target and TD error under the current value function
    with torch.no_grad():
        next_values = critic(next_states.float())
        td_target = rewards + gamma * next_values * (1 - dones)
        td_value = critic(states.float())
        td_delta = td_target - td_value

    # Generalized advantage estimation, computed backwards over the collected batch
    td_delta = td_delta.detach().cpu().numpy()
    adv = 0.0
    advantages = []
    for delta in td_delta[::-1]:
        adv = gamma * lam * adv + delta
        advantages.append(adv)
    advantages.reverse()
    advantages = torch.from_numpy(np.array(advantages)).float().to(device)
    advantages = (advantages - advantages.mean()) / advantages.std()

    # K epochs of clipped-surrogate updates on the same batch
    for k in range(k_epochs):
        mu, sigma = actor(states.float())
        dist = Normal(mu, sigma)
        new_log_probs = dist.log_prob(actions)
        entropy = dist.entropy()
        ratio = torch.exp(new_log_probs - log_probs.detach())
        surr1 = ratio * advantages
        surr2 = torch.clamp(ratio, 1.0 - clip, 1 + clip) * advantages
        actor_loss = - torch.min(surr1, surr2).mean() - entropy.mean() * T
        critic_loss = F.mse_loss(critic(states.float()), td_target.float().detach())
        actor_optimizer.zero_grad()
        critic_optimizer.zero_grad()
        actor_loss.backward()
        actor_optimizer.step()
        critic_loss.backward()
        critic_optimizer.step()


if __name__ == '__main__':
    device = torch.device("cpu")
    env = gym.make('Walker2d')
    episodes = 1000
    train_timesteps = 1024   # environment steps collected between updates
    clip = 0.2
    k_epochs = 40
    gamma = 0.9
    lam = 0.95
    T = 1e-2                 # entropy bonus coefficient
    lr = 1e-4

    actor = Actor(env.observation_space.shape[0], env.action_space.shape[0]).to(device)
    critic = Critic(env.observation_space.shape[0]).to(device)
    actor_optimizer = torch.optim.Adam(actor.parameters(), lr=lr)
    critic_optimizer = torch.optim.Adam(critic.parameters(), lr=lr)

    trajectory = []
    timestep = 0
    pbar = trange(1, episodes + 1)
    score_list = []
    for e in pbar:
        state, _ = env.reset()
        scores = 0.0
        while True:
            timestep += 1
            s = torch.from_numpy(state).float().to(device)
            mu, sigma = actor(s)
            dist = Normal(mu, sigma)
            a = dist.sample()
            log_prob = dist.log_prob(a).detach().cpu().numpy()
            action = a.detach().cpu().numpy()
            next_state, reward, done, _, _ = env.step(action)
            scores += reward
            trajectory.append([state, action, log_prob, reward, next_state, done])
            if timestep % train_timesteps == 0:
                ppo_training(trajectory, actor, critic, actor_optimizer, critic_optimizer,
                             clip, k_epochs, gamma, lam, device, T)
                trajectory = []
            state = copy.deepcopy(next_state)
            if done:
                break
        score_list.append(scores)
        pbar.set_description("Episode {}/{}: Score: {:.2f}, Timesteps: {}".format(
            e, episodes, scores, timestep))
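
For reference, ppo_training above implements the standard clipped-surrogate PPO update on top of generalized advantage estimation (GAE). With the TD error $\delta_t = r_t + \gamma V(s_{t+1})(1-d_t) - V(s_t)$, the quantities computed in the function are

$$
\hat{A}_t = \delta_t + \gamma\lambda\,\hat{A}_{t+1}, \qquad
r_t(\theta) = \exp\big(\log\pi_\theta(a_t\mid s_t) - \log\pi_{\theta_{\mathrm{old}}}(a_t\mid s_t)\big),
$$

$$
L^{\mathrm{actor}} = -\,\mathbb{E}_t\Big[\min\big(r_t(\theta)\hat{A}_t,\ \mathrm{clip}(r_t(\theta),\,1-\epsilon,\,1+\epsilon)\,\hat{A}_t\big)\Big] - T\,\mathbb{E}_t\big[\mathcal{H}[\pi_\theta(\cdot\mid s_t)]\big], \qquad
L^{\mathrm{critic}} = \mathbb{E}_t\big[(V_\phi(s_t) - y_t)^2\big],
$$

where $y_t = r_t + \gamma V(s_{t+1})(1-d_t)$ is held fixed during the k_epochs passes and the advantages are normalized to zero mean and unit variance. Note that the backward GAE recursion in this single-file version is not reset at episode boundaries inside a batch.
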
"SAC"
from torch.distributions import Normal
from collections import deque
from tqdm import trange
import torch
import torch.nn as nn
import torch.nn.functional as F
import copy
import time
import random
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as plt


class ActorNetwork(nn.Module):
    """Gaussian policy head; actions are tanh-squashed inside choice_action()."""

    def __init__(self, state_size, action_size):
        super().__init__()
        self.fc1 = nn.Linear(state_size, 256)
        self.fc2 = nn.Linear(256, 128)
        self.mu = nn.Linear(128, action_size)
        self.sigma = nn.Linear(128, action_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        mu = self.mu(x)
        sigma = F.softplus(self.sigma(x))
        return mu, sigma


class QNetwork(nn.Module):
    """State-action value network Q(s, a)."""

    def __init__(self, state_size, action_size):
        super().__init__()
        self.fc1 = nn.Linear(state_size + action_size, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 1)

    def forward(self, s, a):
        x = torch.cat((s, a), dim=-1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)


class ReplayBuffer:
    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def __len__(self):
        return len(self.memory)

    def save_memory(self, state, action, reward, next_state, done):
        self.memory.append([state, action, reward, next_state, done])

    def sample(self, batch_size):
        sample_size = min(len(self), batch_size)
        experiences = random.sample(self.memory, sample_size)
        return experiences


def soft_update(target, source, tau=0.05):
    """Polyak averaging of the target network parameters."""
    for param, target_param in zip(source.parameters(), target.parameters()):
        target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)


def choice_action(actor, state):
    """Sample a tanh-squashed Gaussian action via the reparameterization trick."""
    mu, sigma = actor(state)
    normal_dist = Normal(torch.zeros_like(mu), torch.ones_like(sigma))
    epsilon = normal_dist.sample()
    action = torch.tanh(mu + sigma * epsilon)
    # log pi(a|s) = log N(eps; 0, 1) - log sigma - log(1 - tanh^2), summed over action dims;
    # the -log sigma term comes from the change of variables x = mu + sigma * eps
    log_prob = normal_dist.log_prob(epsilon) - torch.log(sigma)
    log_prob -= torch.log(1 - action.pow(2) + 1e-6)
    log_prob = log_prob.sum(-1, keepdim=True)
    return action, log_prob


def training(gamma, replay_buffer, models, log_alpha, target_entropy, optimizers, batch_size, tau):
    # Uses the global `device` defined in __main__ below
    (actor, q1_net, target_q1_net, q2_net, target_q2_net) = models
    (actor_optimizer, q1_optimizer, q2_optimizer, alpha_optimizer) = optimizers

    batch_data = replay_buffer.sample(batch_size)
    states, actions, rewards, next_states, dones = map(
        lambda x: torch.from_numpy(np.array(x)).float().to(device), zip(*batch_data))

    with torch.no_grad():
        alpha = torch.exp(log_alpha)

    # Soft TD target using the twin target critics and the entropy term
    with torch.no_grad():
        next_state_actions, next_state_log_probs = choice_action(actor, next_states)
        target_q1_next = target_q1_net(next_states, next_state_actions)
        target_q2_next = target_q2_net(next_states, next_state_actions)
        min_q_next_target = torch.min(target_q1_next, target_q2_next) - alpha * next_state_log_probs
        td_target_value = rewards.view(-1, 1) + (1 - dones.view(-1, 1)) * gamma * min_q_next_target

    # Critic update
    q1 = q1_net(states, actions)
    q2 = q2_net(states, actions)
    q1_loss = F.mse_loss(q1, td_target_value)
    q2_loss = F.mse_loss(q2, td_target_value)
    q1_optimizer.zero_grad()
    q2_optimizer.zero_grad()
    q1_loss.backward()
    q2_loss.backward()
    q1_optimizer.step()
    q2_optimizer.step()

    # Actor update
    state_actions, state_log_probs = choice_action(actor, states)
    q = torch.min(q1_net(states, state_actions), q2_net(states, state_actions))
    actor_loss = torch.mean((alpha * state_log_probs) - q)
    actor_optimizer.zero_grad()
    actor_loss.backward()
    actor_optimizer.step()

    # Temperature (alpha) update
    with torch.no_grad():
        _, log_prob = choice_action(actor, states)
    alpha_loss = torch.mean(- log_alpha.exp() * (log_prob + target_entropy))
    alpha_optimizer.zero_grad()
    alpha_loss.backward()
    alpha_optimizer.step()

    soft_update(target_q1_net, q1_net, tau)
    soft_update(target_q2_net, q2_net, tau)


if __name__ == '__main__':
    device = torch.device("cpu")
    env = gym.make('Walker2d')
    episodes = 1000
    train_timesteps = 4      # one gradient update every 4 environment steps
    policy_lr = 1e-4
    q_lr = 1e-4
    alpha_lr = 1e-2
    tau = 0.05
    buffer_capacity = int(1e6)
    batch_size = 64
    gamma = 0.9

    state_size = env.observation_space.shape[0]
    action_size = env.action_space.shape[0]
    # Note: the conventional SAC entropy target is -action_size;
    # this script derives it from the observation shape instead.
    target_entropy = - torch.prod(torch.tensor(env.observation_space.shape, device=device))

    actor = ActorNetwork(state_size, action_size).to(device)
    q1_net = QNetwork(state_size, action_size).to(device)
    target_q1_net = QNetwork(state_size, action_size).to(device)
    q2_net = QNetwork(state_size, action_size).to(device)
    target_q2_net = QNetwork(state_size, action_size).to(device)
    target_q1_net.load_state_dict(q1_net.state_dict())
    target_q2_net.load_state_dict(q2_net.state_dict())

    log_alpha = torch.tensor(0.0, requires_grad=True, device=device)
    actor_optimizer = torch.optim.Adam(actor.parameters(), lr=policy_lr)
    q1_optimizer = torch.optim.Adam(q1_net.parameters(), lr=q_lr)
    q2_optimizer = torch.optim.Adam(q2_net.parameters(), lr=q_lr)
    alpha_optimizer = torch.optim.Adam([log_alpha], lr=alpha_lr)
    replay_buffer = ReplayBuffer(buffer_capacity)

    pbar = trange(1, episodes + 1)
    timestep = 0
    score_list = []
    for episode in pbar:
        state, _ = env.reset()
        scores = 0.0
        while True:
            timestep += 1
            if timestep % train_timesteps == 0:
                training(gamma, replay_buffer,
                         (actor, q1_net, target_q1_net, q2_net, target_q2_net),
                         log_alpha, target_entropy,
                         (actor_optimizer, q1_optimizer, q2_optimizer, alpha_optimizer),
                         batch_size, tau)
            action, _ = choice_action(actor, torch.from_numpy(state).float().to(device))
            action = action.detach().cpu().numpy()
            next_state, reward, done, _, _ = env.step(action)
            scores += reward
            replay_buffer.save_memory(state, action, reward, next_state, done)
            state = copy.deepcopy(next_state)
            if done:
                break
        score_list.append(scores)
        pbar.set_description("Episode {}/{}: Score: {:.2f}, Timesteps: {}, Log Alpha: {:.2f}".format(
            episode, episodes, scores, timestep, log_alpha.item()))
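
For reference, the training function above performs the usual soft actor-critic updates with twin critics, target networks, and automatic temperature tuning:

$$
y = r + \gamma(1-d)\Big(\min_{i=1,2} Q_{\bar\phi_i}(s', a') - \alpha\log\pi_\theta(a'\mid s')\Big), \qquad a'\sim\pi_\theta(\cdot\mid s'),
$$

$$
L_{Q_i} = \mathbb{E}\big[(Q_{\phi_i}(s,a) - y)^2\big], \qquad
L_\pi = \mathbb{E}\big[\alpha\log\pi_\theta(a\mid s) - \min_i Q_{\phi_i}(s,a)\big], \qquad
L_\alpha = \mathbb{E}\big[-\alpha\,(\log\pi_\theta(a\mid s) + \bar{\mathcal{H}})\big],
$$

followed by a Polyak (soft) update of the two target critics. For the tanh-squashed Gaussian policy $a = \tanh(\mu + \sigma\epsilon)$ with $\epsilon\sim\mathcal{N}(0, I)$, the log-likelihood used in choice_action is

$$
\log\pi(a\mid s) = \sum_j\Big(\log\mathcal{N}(\epsilon_j;0,1) - \log\sigma_j - \log\big(1 - a_j^2 + 10^{-6}\big)\Big).
$$
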
"OptionCritic(PPOC)"import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Bernoulli, Normal
from torch import optim
from tqdm import trange
import matplotlib.pyplot as plt
import gymnasium as gym
import numpy as np
import random
import copy


class QNetwork(nn.Module):
    """Option-value network Q_Omega(s, .) over the discrete set of options."""

    def __init__(self, state_size, num_options):
        super().__init__()
        self.nn = nn.Sequential(
            nn.Linear(state_size, 256), nn.ReLU(),
            nn.Linear(256, 128), nn.ReLU(),
            nn.Linear(128, num_options))

    def forward(self, x):
        return self.nn(x)


class ActorNetwork(nn.Module):
    """Intra-option Gaussian policy (one instance per option)."""

    def __init__(self, state_size, action_size):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(state_size, 256), nn.ReLU(),
            nn.Linear(256, 128))
        self.mu = nn.Sequential(nn.ReLU(), nn.Linear(128, action_size), nn.Tanh())
        self.sigma = nn.Sequential(nn.ReLU(), nn.Linear(128, action_size), nn.Softplus())

    def forward(self, x):
        x = self.fc(x)
        return self.mu(x), self.sigma(x)


class TerminationNetwork(nn.Module):
    """Outputs the termination probability beta(s, omega) for every option."""

    def __init__(self, state_size, num_options):
        super().__init__()
        self.nn = nn.Sequential(
            nn.Linear(state_size, 256), nn.ReLU(),
            nn.Linear(256, 128), nn.ReLU(),
            nn.Linear(128, num_options), nn.Sigmoid())

    def forward(self, x):
        return self.nn(x)


class OptionCritic(nn.Module):
    def __init__(self, state_size, action_size, num_options):
        super().__init__()
        self.upper_policy_q_net = QNetwork(state_size, num_options)
        self.termination_network = TerminationNetwork(state_size, num_options)
        self.options = nn.ModuleList([ActorNetwork(state_size, action_size)
                                      for _ in range(num_options)])
        self.num_options = num_options

    def get_option_id(self, state, epsilon):
        # Epsilon-greedy option selection over Q_Omega(s, .)
        if np.random.rand() > epsilon:
            return torch.argmax(self.upper_policy_q_net(state), dim=-1).detach().cpu().numpy().item()
        else:
            return random.sample(range(self.num_options), 1)[0]

    def is_option_terminated(self, state, option_id):
        option_termination_prob = self.termination_network(state)[option_id]
        option_termination = Bernoulli(option_termination_prob).sample()
        return bool(option_termination.item())

    def select_action(self, state, epsilon, option_id):
        # Re-select the option if the current one terminates, then sample an action from it
        if self.is_option_terminated(state, option_id):
            option_id = self.get_option_id(state, epsilon)
        mu, sigma = self.options[option_id](state)
        normal_dist = Normal(mu, sigma)
        action = normal_dist.sample()
        log_prob = normal_dist.log_prob(action)
        action = action.detach().cpu().numpy()
        log_prob = log_prob.detach().cpu().numpy()
        return action, log_prob, option_id


def training(agent, optimizer, trajectory, gamma, k_epochs, clip, lam, T):
    states, actions, log_probs, option_id, rewards, next_states, dones = map(
        lambda x: torch.from_numpy(np.array(x)), zip(*trajectory))
    option_id = option_id.view(-1, 1)
    rewards = rewards.view(-1, 1)
    dones = dones.view(-1, 1).float()

    # Option-value TD target: bootstrap with Q(s', w) if the option continues,
    # otherwise with max_w' Q(s', w')
    with torch.no_grad():
        option_terminated_prob = agent.termination_network(next_states.float()).gather(-1, option_id)
        next_q = agent.upper_policy_q_net(next_states.float())
        q_target = rewards + gamma * (1 - dones) * (
            (1 - option_terminated_prob) * next_q.gather(-1, option_id)
            + option_terminated_prob * next_q.max(dim=-1, keepdim=True)[0])
        td_delta = q_target - agent.upper_policy_q_net(states.float()).gather(-1, option_id)

    # Generalized advantage estimation over the batch
    td_delta = td_delta.detach().cpu().numpy()
    adv = 0.0
    advantages = []
    for delta in td_delta[::-1]:
        adv = gamma * lam * adv + delta
        advantages.append(adv)
    advantages.reverse()
    advantages = torch.from_numpy(np.array(advantages)).float()
    advantages = (advantages - advantages.mean()) / (1e-6 + advantages.std())

    for k in range(k_epochs):
        # Recompute log-probabilities under the option that generated each transition
        mus, sigmas = [], []
        for i in range(states.shape[0]):
            mu, sigma = agent.options[option_id[i]](states[i].float())
            mus.append(mu), sigmas.append(sigma)
        mu = torch.stack(mus, 0)
        sigma = torch.stack(sigmas, 0)
        normal_dist = Normal(mu, sigma)
        new_log_probs = normal_dist.log_prob(actions)
        entropy = normal_dist.entropy()
        ratio = torch.exp(new_log_probs - log_probs.detach())
        surr1 = ratio * advantages
        surr2 = torch.clamp(ratio, 1.0 - clip, 1 + clip) * advantages
        policy_loss = - torch.min(surr1, surr2).mean() - entropy.mean() * T
        critic_loss = F.mse_loss(
            agent.upper_policy_q_net(states.float()).gather(-1, option_id), q_target.float())
        # Termination loss: increase beta(s, w) when the current option is worse than the best one
        termination_loss = agent.termination_network(states.float()).gather(-1, option_id) * (
            agent.upper_policy_q_net(states.float()).gather(-1, option_id)
            - agent.upper_policy_q_net(states.float()).max(dim=-1, keepdim=True)[0]).detach()
        losses = policy_loss + critic_loss + termination_loss.mean()
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()


if __name__ == '__main__':
    env = gym.make('Walker2d')
    episodes = 1000
    train_timesteps = 1024
    clip = 0.2
    k_epochs = 10
    gamma = 0.9
    lam = 0.95
    T = 1e-2
    lr = 1e-4
    epsilon = 1.0
    epsilon_decay = 0.995
    mini_epsilon = 0.1
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.shape[0]
    num_options = 4

    agent = OptionCritic(state_size, action_size, num_options)
    optimizer = optim.Adam(agent.parameters(), lr=lr)

    trajectory = []
    timestep = 0
    pbar = trange(1, episodes + 1)
    scores_list = []
    for e in pbar:
        state, _ = env.reset()
        scores = 0.0
        option_id = agent.get_option_id(torch.from_numpy(state).float(), epsilon)
        options = [option_id]
        while True:
            timestep += 1
            if timestep % train_timesteps == 0:
                training(agent, optimizer, trajectory, gamma, k_epochs, clip, lam, T)
                trajectory = []
            action, log_prob, option_id = agent.select_action(
                torch.from_numpy(state).float(), epsilon, option_id)
            options.append(option_id)
            next_state, reward, done, _, _ = env.step(action)
            scores += reward
            trajectory.append([state, action, log_prob, option_id, reward, next_state, done])
            state = copy.deepcopy(next_state)
            if done:
                break
        scores_list.append(scores)
        epsilon = max(mini_epsilon, epsilon * epsilon_decay)
        pbar.set_description("Episode {}/{}: Score: {:.2f}, Timesteps: {}, Epsilon: {:.2f}".format(
            e, episodes, scores, timestep, epsilon))

    plt.plot(scores_list)
    plt.show()
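
For reference, the option-value target and the termination objective optimized in training are

$$
y = r + \gamma(1-d)\Big[\big(1-\beta_\omega(s')\big)\,Q_\Omega(s',\omega) + \beta_\omega(s')\,\max_{\omega'}Q_\Omega(s',\omega')\Big],
$$

$$
L_\beta = \mathbb{E}\Big[\beta_\omega(s)\,\big(Q_\Omega(s,\omega) - \max_{\omega'}Q_\Omega(s,\omega')\big)\Big],
$$

where the Q-difference in $L_\beta$ is detached so that only the termination network receives its gradient. Each intra-option policy is trained with the same clipped PPO surrogate as in the PPO script, using GAE advantages built from $\delta = y - Q_\Omega(s,\omega)$, while options themselves are chosen $\epsilon$-greedily from $Q_\Omega$ whenever the current option terminates.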
