10-Day Crash Course in Reinforcement Learning - 008

TRPO

Key idea: TRPO is an on-policy algorithm that keeps each policy update inside a given trust region to prevent unstable updates.

In Actor-Critic methods, as the policy network gets deeper, a gradient step that is too long can suddenly make the policy much worse. The remedy is to restrict updates to a trust region, which gives the Trust Region Policy Optimization (TRPO) algorithm:

Core idea:

In each iteration, limit the size of the policy update so that optimization stays inside a "trust region", which guarantees stability and convergence.

How it is implemented:

Solve a constrained optimization problem to find the best policy-update direction. Concretely, TRPO uses the conjugate gradient method to solve this problem efficiently, together with a few tricks to estimate how the policy update affects the value function.

Implementation details

Policy objective (relating the new policy to the old one)

The policy is \pi_{\theta} with parameters \theta. The goal is to find better parameters \theta' such that J(\theta')\geq J(\theta). Start from:

\begin{aligned}J(\theta) & =\mathbb{E}_{s_{0}}\left[V^{\pi_{\theta}}\left(s_{0}\right)\right] \\& =\mathbb{E}_{\pi_{\theta^{\prime}}}\left[\sum_{t=0}^{\infty} \gamma^{t} V^{\pi_{\theta}}\left(s_{t}\right)-\sum_{t=1}^{\infty} \gamma^{t} V^{\pi_{\theta}}\left(s_{t}\right)\right] \\& =-\mathbb{E}_{\pi_{\theta^{\prime}}}\left[\sum_{t=0}^{\infty} \gamma^{t}\left(\gamma V^{\pi_{\theta}}\left(s_{t+1}\right)-V^{\pi_{\theta}}\left(s_{t}\right)\right)\right]\end{aligned}

Line 1: the expectation, over initial states s_0, of the state-value function under the current policy.

Line 2: write the value at the initial state as a telescoping sum, the discounted values summed from t = 0 minus those summed from t = 1, so that only the state value at t = 0 survives.

The key point: the state distribution here is generated by the new policy \pi_{\theta'}, while the value function V is still the old policy's; J(\theta) is simply re-expressed using trajectories sampled from the new policy (valid because the distribution of s_0 does not depend on the policy).

Difference between the new and old policies:

\begin{aligned}J\left(\theta^{\prime}\right)-J(\theta) & =\mathbb{E}_{s_{0}}\left[V^{\pi_{\theta^{\prime}}}\left(s_{0}\right)\right]-\mathbb{E}_{s_{0}}\left[V^{\pi_{\theta}}\left(s_{0}\right)\right] \\& =\mathbb{E}_{\pi_{\theta^{\prime}}}\left[\sum_{t=0}^{\infty} \gamma^{t} r\left(s_{t}, a_{t}\right)\right]+\mathbb{E}_{\pi_{\theta^{\prime}}}\left[\sum_{t=0}^{\infty} \gamma^{t}\left(\gamma V^{\pi_{\theta}}\left(s_{t+1}\right)-V^{\pi_{\theta}}\left(s_{t}\right)\right)\right] \\& =\mathbb{E}_{\pi_{\theta^{\prime}}}\left[\sum_{t=0}^{\infty} \gamma^{t}\left[r\left(s_{t}, a_{t}\right)+\gamma V^{\pi_{\theta}}\left(s_{t+1}\right)-V^{\pi_{\theta}}\left(s_{t}\right)\right]\right]\end{aligned}

In line 2, the first expectation is just J(\theta') written out as the expected discounted return under the new policy.

The remaining bracket is the temporal-difference (TD) error: the immediate reward actually received plus the discounted value estimate of the next state, minus the value estimate of the current state. In other words, it is the gap between the current value estimate and the one-step bootstrapped estimate obtained from the next state.

So we obtain:

$\begin{aligned} & =\mathbb{E}_{\pi_{\theta^{\prime}}}\left[\sum_{t=0}^\infty\gamma^tA^{\pi_\theta}(s_t,a_t)\right] \\ & =\sum_{t=0}^{\infty}\gamma^{t}\mathbb{E}_{s_{t}\sim P_{t}^{\pi_{\theta^{\prime}}}}\mathbb{E}_{a_{t}\sim\pi_{\theta^{\prime}}(\cdot|s_{t})}\left[A^{\pi_{\theta}}(s_{t},a_{t})\right] \\ & =\frac{1}{1-\gamma}\mathbb{E}_{s\sim\nu^{\pi_{\theta^{\prime}}}}\mathbb{E}_{a\sim\pi_{\theta^{\prime}}(\cdot|s)}\left[A^{\pi_{\theta}}(s,a)\right] \end{aligned}$

Consider the last step first, since it is easier:

it follows directly from the definition of the (discounted) state-visitation distribution \nu^{\pi}.

For the second-to-last step, expand \mathbb{E}_{\pi_{\theta^{\prime}}}\left[\sum_{t=0}^\infty\gamma^tA^{\pi_\theta}(s_t,a_t)\right] as an iterated expectation, writing the expectation over the policy \pi_{\theta'} explicitly as expectations over states and actions under their respective distributions:

\sum_{t=0}^{\infty}\gamma^{t}\mathbb{E}_{s_{t}\sim P_{t}^{\pi_{\theta^{\prime}}}}\mathbb{E}_{a_{t}\sim\pi_{\theta^{\prime}}(\cdot|s_{t})}\left[A^{\pi_{\theta}}(s_{t},a_{t})\right]

Here the state s_t follows the state distribution induced by the policy, and the action follows that policy's conditional distribution \pi_{\theta'}(\cdot|s_t).

(P_{t}^{\pi_{\theta'}} is the state distribution of policy \pi_{\theta'} at time step t.)

If this difference between the new and old policies is \geq 0, policy performance is guaranteed to improve monotonically. It looks like we have found our trust region, but the expression is very hard to optimize directly, because the final line involves the new policy, which is exactly what we are solving for. Collecting data with every possible new policy and then checking which one satisfies the condition is clearly infeasible.

So we approximate: ignore the change in the state-visitation distribution between the two policies and use the old policy's state distribution directly.

When the new and old policies are very close, the state-visitation distributions change very little, so the approximation is reasonable. Actions are still sampled from the new policy, while states are drawn from the old policy's distribution.

The resulting ratio \pi_{\theta'}(a|s)/\pi_{\theta}(a|s) is an importance weight: since we do not want to run every candidate new policy to collect data, we estimate the new policy's objective by reweighting data already collected under the old policy. This lets us use old-policy data to update toward the new policy. But how do we measure the distance between two policies?

KL divergence: a measure of the difference between two probability distributions.
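For two discrete distributions P and Q over the action set, it is defined as

D_{KL}(P \| Q)=\sum_{a} P(a)\log\frac{P(a)}{Q(a)}

and it is zero exactly when the two distributions coincide.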

That is, we get the following constrained optimization problem.
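Written out (with \theta_k denoting the current/old parameters and \delta the trust-region size), the standard TRPO problem is:

\max_{\theta^{\prime}}\;\mathbb{E}_{s\sim\nu^{\pi_{\theta_k}}}\mathbb{E}_{a\sim\pi_{\theta_k}(\cdot|s)}\left[\frac{\pi_{\theta^{\prime}}(a|s)}{\pi_{\theta_k}(a|s)}A^{\pi_{\theta_k}}(s,a)\right]

\text{s.t.}\quad\mathbb{E}_{s\sim\nu^{\pi_{\theta_k}}}\left[D_{KL}\left(\pi_{\theta_k}(\cdot|s)\,\|\,\pi_{\theta^{\prime}}(\cdot|s)\right)\right]\leq\delta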

This problem has two lines. The first says: find parameters \theta' that maximize the surrogate objective.

The second, "s.t." (subject to), is the constraint: under the state distribution induced by the old policy, the expected KL divergence between the new and old policies' action distributions must not exceed the threshold \delta, so that the update cannot change the policy too much. A minimal sketch of computing this average KL in PyTorch follows.
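As a rough sketch (assuming a discrete action space and a policy_net that outputs action probabilities; these names are illustrative, not from the full code below), the constraint term can be estimated from a batch of sampled states like this:

import torch

def average_kl(states, old_probs, policy_net):
    # old_probs: action probabilities of the old policy, detached from the graph
    old_dist = torch.distributions.Categorical(old_probs)
    new_dist = torch.distributions.Categorical(policy_net(states))
    # averaging over sampled states approximates the expectation over the old state distribution
    return torch.distributions.kl.kl_divergence(old_dist, new_dist).mean()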

In other words: the previous policy was stable, and importance sampling lets us treat the two consecutive policies' state distributions as (approximately) the same, so an update that stays inside the trust region will not be unstable either.

So this constrained problem is the final formulation; all that remains is to solve it.

Approximate solution (maximizing the objective under the constraint) — the derivation details can be skipped.

Let \theta_k denote the policy parameters after the k-th iteration. To make the problem tractable, Taylor-expand the objective to first order and the KL constraint to second order around \theta_k.

Here g denotes the gradient of the surrogate objective with respect to \theta', evaluated at \theta_k,

and H denotes the Hessian matrix of the average KL divergence between the two policies, also evaluated at \theta_k.

With these approximations the optimization problem becomes:

\theta_{k+1}=\arg\max_{\theta^{\prime}}\;g^{T}(\theta^{\prime}-\theta_{k})\quad\text{s.t.}\quad\frac{1}{2}(\theta^{\prime}-\theta_{k})^{T}H(\theta^{\prime}-\theta_{k})\leq\delta

Applying the KKT conditions (the standard tool for constrained nonlinear optimization with equality and inequality constraints) yields the solution:

\theta_{k+1}=\theta_{k}+\sqrt{\frac{2\delta}{g^{T}H^{-1}g}}\,H^{-1}g

This update is what we ultimately need to compute.

Conjugate gradient (computing the parameter-update direction)

In general the network has many parameters, so explicitly forming and inverting H is far too expensive; the conjugate gradient method is used instead.

Core idea:

The update direction is x = H^{-1}g, where x is the direction of the parameter update.

Let \beta be the largest step length along this direction that still satisfies the KL constraint.

Plugging \theta' - \theta_k = \beta x into the quadratic constraint gives

\frac{1}{2}(\beta x)^{T}H(\beta x)=\delta

Solving for \beta:

\beta=\sqrt{\frac{2\delta}{x^{T}Hx}}

So the parameter update becomes:

\theta_{k+1}=\theta_{k}+\sqrt{\frac{2\delta}{x^{T}Hx}}\,x

and the problem reduces to solving the linear system

Hx=g

for x, without ever forming H or H^{-1} explicitly.

Pseudocode:
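A minimal sketch of the conjugate-gradient loop for solving Hx = g, matching the conjugate_gradient method in the full implementation below; here hvp(p) stands for a function (an assumption of this sketch) that returns the Hessian-vector product Hp:

import torch

def conjugate_gradient(hvp, g, iters=10, tol=1e-10):
    x = torch.zeros_like(g)   # current solution estimate
    r = g.clone()             # residual g - Hx (x starts at zero)
    p = g.clone()             # search direction
    rdotr = torch.dot(r, r)
    for _ in range(iters):
        Hp = hvp(p)
        alpha = rdotr / torch.dot(p, Hp)
        x += alpha * p
        r -= alpha * Hp
        new_rdotr = torch.dot(r, r)
        if new_rdotr < tol:
            break
        p = r + (new_rdotr / rdotr) * p
        rdotr = new_rdotr
    return x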

Line search

How it works: gradually scale down the step size of the parameter update until the constraint is still satisfied and the objective improves.

  • Step-size scaling: introduce a hyperparameter \alpha \in (0,1); the basic update step is scaled by \alpha^{i} (i a non-negative integer). \alpha^{i} shrinks as i grows, so progressively smaller steps are tried.
  • Condition check: starting from i = 0, check whether the candidate \theta_{k+1} (a) still satisfies the original KL constraint, keeping the update in the stable range, and (b) improves the objective L_{\theta_k}, i.e. the new policy is better than the old one.
  • Accepting the update: the smallest non-negative integer i for which \theta_{k+1} satisfies both conditions is used to update the policy network's parameters, completing this round of policy optimization.

Note that i keeps increasing until both conditions hold (see the sketch below).
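A minimal sketch of this loop, assuming kl_to_old(theta) and surrogate(theta) are hypothetical helpers that evaluate the average KL and the surrogate objective at candidate parameters theta:

def line_search(theta_k, full_step, surrogate, kl_to_old, delta, alpha=0.5, max_tries=15):
    old_obj = surrogate(theta_k)
    for i in range(max_tries):
        theta_new = theta_k + (alpha ** i) * full_step  # shrink the step as i grows
        if kl_to_old(theta_new) < delta and surrogate(theta_new) > old_obj:
            return theta_new  # smallest i satisfying both conditions
    return theta_k            # fall back to the old parameters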

Full algorithm (pseudocode):

  • Initialize policy network parameters \theta and value network parameters w
  • for episode e = 1 -> E do:
    • sample trajectories with the current policy
    • estimate the advantage A of every state-action pair using the collected data and the value network
    • compute the gradient of the policy objective
    • compute the update direction x with the conjugate gradient method
    • find the smallest non-negative integer i via line search and update the policy network parameters
    • update the value network parameters (same update as in Actor-Critic)
  • end for

Generalized Advantage Estimation (computing A)

GAE (Generalized Advantage Estimation). Core idea: take an exponentially weighted average of advantage estimates computed over different numbers of steps.

For each horizon k there is a k-step advantage estimate built from TD errors (shown below).

GAE then takes the exponentially weighted average of these estimates:
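For reference, the standard definitions are (with \delta_t the TD error at step t):

\delta_{t}=r_{t}+\gamma V(s_{t+1})-V(s_{t}),\qquad A_{t}^{(k)}=\sum_{l=0}^{k-1}\gamma^{l}\delta_{t+l}

A_{t}^{GAE}=(1-\lambda)\left(A_{t}^{(1)}+\lambda A_{t}^{(2)}+\lambda^{2}A_{t}^{(3)}+\cdots\right)=\sum_{l=0}^{\infty}(\gamma\lambda)^{l}\delta_{t+l}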

\lambda \in [0,1] is a hyperparameter:

when \lambda = 0, only the one-step TD error is used as the advantage estimate;

when \lambda = 1, the estimate uses the full sum of discounted TD errors over every remaining step.

The code:

Note: the TD error and the advantage are not the same thing in general; only the one-step advantage estimate equals the TD error.

def compute_advantage(gamma, lmbda, td_delta):
    td_delta = td_delta.detach().numpy()
    advantage_list = []
    advantage = 0.0
    for delta in td_delta[::-1]:
        advantage = gamma * lmbda * advantage + delta
        advantage_list.append(advantage)
    advantage_list.reverse()
    return torch.tensor(advantage_list, dtype=torch.float)

Code

Action spaces: discrete and continuous

import torch
import numpy as np
import gym
import matplotlib.pyplot as plt
import torch.nn.functional as F
import rl_utils
import copy

Define the policy network and the value network (same as in Actor-Critic):

class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return F.softmax(self.fc2(x), dim=1)


class ValueNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super(ValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)


class TRPO:
    """ TRPO algorithm """
    def __init__(self, hidden_dim, state_space, action_space, lmbda,
                 kl_constraint, alpha, critic_lr, gamma, device):
        state_dim = state_space.shape[0]
        action_dim = action_space.n
        # the policy network's parameters are not updated by an optimizer
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda  # GAE parameter
        self.kl_constraint = kl_constraint  # maximum allowed KL divergence
        self.alpha = alpha  # line-search parameter
        self.device = device

    def take_action(self, state):
        # get the action distribution from the policy network, then sample from it
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        probs = self.actor(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

    def hessian_matrix_vector_product(self, states, old_action_dists, vector):
        # compute the product of the Hessian matrix and a vector
        new_action_dists = torch.distributions.Categorical(self.actor(states))
        kl = torch.mean(
            torch.distributions.kl.kl_divergence(old_action_dists,
                                                 new_action_dists))  # average KL divergence
        kl_grad = torch.autograd.grad(kl, self.actor.parameters(), create_graph=True)
        kl_grad_vector = torch.cat([grad.view(-1) for grad in kl_grad])
        # first take the dot product of the KL gradient and the vector
        kl_grad_vector_product = torch.dot(kl_grad_vector, vector)
        grad2 = torch.autograd.grad(kl_grad_vector_product, self.actor.parameters())
        grad2_vector = torch.cat([grad.view(-1) for grad in grad2])
        return grad2_vector

    def conjugate_gradient(self, grad, states, old_action_dists):
        # conjugate gradient method for solving Hx = g
        x = torch.zeros_like(grad)
        r = grad.clone()
        p = grad.clone()
        rdotr = torch.dot(r, r)
        for i in range(10):  # main conjugate-gradient loop
            Hp = self.hessian_matrix_vector_product(states, old_action_dists, p)
            alpha = rdotr / torch.dot(p, Hp)
            x += alpha * p
            r -= alpha * Hp
            new_rdotr = torch.dot(r, r)
            if new_rdotr < 1e-10:
                break
            beta = new_rdotr / rdotr
            p = r + beta * p
            rdotr = new_rdotr
        return x

    def compute_surrogate_obj(self, states, actions, advantage, old_log_probs, actor):
        # compute the surrogate policy objective
        log_probs = torch.log(actor(states).gather(1, actions))
        ratio = torch.exp(log_probs - old_log_probs)
        return torch.mean(ratio * advantage)

    def line_search(self, states, actions, advantage, old_log_probs,
                    old_action_dists, max_vec):  # line search
        old_para = torch.nn.utils.convert_parameters.parameters_to_vector(
            self.actor.parameters())
        old_obj = self.compute_surrogate_obj(states, actions, advantage,
                                             old_log_probs, self.actor)
        for i in range(15):  # main line-search loop
            coef = self.alpha**i
            new_para = old_para + coef * max_vec
            new_actor = copy.deepcopy(self.actor)
            torch.nn.utils.convert_parameters.vector_to_parameters(
                new_para, new_actor.parameters())
            new_action_dists = torch.distributions.Categorical(new_actor(states))
            kl_div = torch.mean(
                torch.distributions.kl.kl_divergence(old_action_dists,
                                                     new_action_dists))
            new_obj = self.compute_surrogate_obj(states, actions, advantage,
                                                 old_log_probs, new_actor)
            if new_obj > old_obj and kl_div < self.kl_constraint:
                return new_para
        return old_para

    def policy_learn(self, states, actions, old_action_dists, old_log_probs, advantage):
        # update the policy: compute the gradient of the surrogate objective,
        # determine the update direction with conjugate gradient, find a suitable
        # step with line search, and finally write the new parameters into the policy network
        surrogate_obj = self.compute_surrogate_obj(states, actions, advantage,
                                                   old_log_probs, self.actor)
        grads = torch.autograd.grad(surrogate_obj, self.actor.parameters())
        obj_grad = torch.cat([grad.view(-1) for grad in grads]).detach()
        # use conjugate gradient to compute x = H^(-1)g
        descent_direction = self.conjugate_gradient(obj_grad, states, old_action_dists)
        Hd = self.hessian_matrix_vector_product(states, old_action_dists,
                                                descent_direction)
        max_coef = torch.sqrt(2 * self.kl_constraint /
                              (torch.dot(descent_direction, Hd) + 1e-8))
        new_para = self.line_search(states, actions, advantage, old_log_probs,
                                    old_action_dists,
                                    descent_direction * max_coef)  # line search
        torch.nn.utils.convert_parameters.vector_to_parameters(
            new_para, self.actor.parameters())  # update the policy with the parameters found by line search

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        td_delta = td_target - self.critic(states)
        advantage = compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device)
        old_log_probs = torch.log(self.actor(states).gather(1, actions)).detach()
        old_action_dists = torch.distributions.Categorical(self.actor(states).detach())
        critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()  # update the value function
        # update the policy
        self.policy_learn(states, actions, old_action_dists, old_log_probs, advantage)

Training TRPO on the CartPole environment

num_episodes = 500
hidden_dim = 128
gamma = 0.98
lmbda = 0.95
critic_lr = 1e-2
kl_constraint = 0.0005
alpha = 0.5
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
env_name = 'CartPole-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
agent = TRPO(hidden_dim, env.observation_space, env.action_space, lmbda,
             kl_constraint, alpha, critic_lr, gamma, device)
return_list = rl_utils.train_on_policy_agent(env, agent, num_episodes)

episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('TRPO on {}'.format(env_name))
plt.show()

mv_return = rl_utils.moving_average(return_list, 9)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('TRPO on {}'.format(env_name))
plt.show()

Convergence is quite fast.

For the continuous-action Pendulum environment: because the action space is continuous, the policy network outputs the mean and the standard deviation of a Gaussian distribution over actions.

class PolicyNetContinuous(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNetContinuous, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc_mu = torch.nn.Linear(hidden_dim, action_dim)
        self.fc_std = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        mu = 2.0 * torch.tanh(self.fc_mu(x))
        std = F.softplus(self.fc_std(x))
        return mu, std  # mean and standard deviation of the Gaussian


class TRPOContinuous:
    """ TRPO for continuous actions """
    def __init__(self, hidden_dim, state_space, action_space, lmbda,
                 kl_constraint, alpha, critic_lr, gamma, device):
        state_dim = state_space.shape[0]
        action_dim = action_space.shape[0]
        self.actor = PolicyNetContinuous(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda
        self.kl_constraint = kl_constraint
        self.alpha = alpha
        self.device = device

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        mu, std = self.actor(state)
        action_dist = torch.distributions.Normal(mu, std)
        action = action_dist.sample()
        return [action.item()]

    def hessian_matrix_vector_product(self, states, old_action_dists, vector, damping=0.1):
        mu, std = self.actor(states)
        new_action_dists = torch.distributions.Normal(mu, std)
        kl = torch.mean(
            torch.distributions.kl.kl_divergence(old_action_dists, new_action_dists))
        kl_grad = torch.autograd.grad(kl, self.actor.parameters(), create_graph=True)
        kl_grad_vector = torch.cat([grad.view(-1) for grad in kl_grad])
        kl_grad_vector_product = torch.dot(kl_grad_vector, vector)
        grad2 = torch.autograd.grad(kl_grad_vector_product, self.actor.parameters())
        grad2_vector = torch.cat([grad.contiguous().view(-1) for grad in grad2])
        return grad2_vector + damping * vector

    def conjugate_gradient(self, grad, states, old_action_dists):
        x = torch.zeros_like(grad)
        r = grad.clone()
        p = grad.clone()
        rdotr = torch.dot(r, r)
        for i in range(10):
            Hp = self.hessian_matrix_vector_product(states, old_action_dists, p)
            alpha = rdotr / torch.dot(p, Hp)
            x += alpha * p
            r -= alpha * Hp
            new_rdotr = torch.dot(r, r)
            if new_rdotr < 1e-10:
                break
            beta = new_rdotr / rdotr
            p = r + beta * p
            rdotr = new_rdotr
        return x

    def compute_surrogate_obj(self, states, actions, advantage, old_log_probs, actor):
        mu, std = actor(states)
        action_dists = torch.distributions.Normal(mu, std)
        log_probs = action_dists.log_prob(actions)
        ratio = torch.exp(log_probs - old_log_probs)
        return torch.mean(ratio * advantage)

    def line_search(self, states, actions, advantage, old_log_probs,
                    old_action_dists, max_vec):
        old_para = torch.nn.utils.convert_parameters.parameters_to_vector(
            self.actor.parameters())
        old_obj = self.compute_surrogate_obj(states, actions, advantage,
                                             old_log_probs, self.actor)
        for i in range(15):
            coef = self.alpha**i
            new_para = old_para + coef * max_vec
            new_actor = copy.deepcopy(self.actor)
            torch.nn.utils.convert_parameters.vector_to_parameters(
                new_para, new_actor.parameters())
            mu, std = new_actor(states)
            new_action_dists = torch.distributions.Normal(mu, std)
            kl_div = torch.mean(
                torch.distributions.kl.kl_divergence(old_action_dists, new_action_dists))
            new_obj = self.compute_surrogate_obj(states, actions, advantage,
                                                 old_log_probs, new_actor)
            if new_obj > old_obj and kl_div < self.kl_constraint:
                return new_para
        return old_para

    def policy_learn(self, states, actions, old_action_dists, old_log_probs, advantage):
        surrogate_obj = self.compute_surrogate_obj(states, actions, advantage,
                                                   old_log_probs, self.actor)
        grads = torch.autograd.grad(surrogate_obj, self.actor.parameters())
        obj_grad = torch.cat([grad.view(-1) for grad in grads]).detach()
        descent_direction = self.conjugate_gradient(obj_grad, states, old_action_dists)
        Hd = self.hessian_matrix_vector_product(states, old_action_dists,
                                                descent_direction)
        max_coef = torch.sqrt(2 * self.kl_constraint /
                              (torch.dot(descent_direction, Hd) + 1e-8))
        new_para = self.line_search(states, actions, advantage, old_log_probs,
                                    old_action_dists, descent_direction * max_coef)
        torch.nn.utils.convert_parameters.vector_to_parameters(
            new_para, self.actor.parameters())

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions'], dtype=torch.float).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)
        rewards = (rewards + 8.0) / 8.0  # rescale rewards to ease training
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        td_delta = td_target - self.critic(states)
        advantage = compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device)
        mu, std = self.actor(states)
        old_action_dists = torch.distributions.Normal(mu.detach(), std.detach())
        old_log_probs = old_action_dists.log_prob(actions)
        critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        self.policy_learn(states, actions, old_action_dists, old_log_probs, advantage)

The Pendulum environment

num_episodes = 2000
hidden_dim = 128
gamma = 0.9
lmbda = 0.9
critic_lr = 1e-2
kl_constraint = 0.00005
alpha = 0.5
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
env_name = 'Pendulum-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
agent = TRPOContinuous(hidden_dim, env.observation_space, env.action_space,
                       lmbda, kl_constraint, alpha, critic_lr, gamma, device)
return_list = rl_utils.train_on_policy_agent(env, agent, num_episodes)

episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('TRPO on {}'.format(env_name))
plt.show()

mv_return = rl_utils.moving_average(return_list, 9)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('TRPO on {}'.format(env_name))
plt.show()

Main differences between the discrete-action and continuous-action code

Policy network definition:

class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return F.softmax(self.fc2(x), dim=1)
class PolicyNetContinuous(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNetContinuous, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc_mu = torch.nn.Linear(hidden_dim, action_dim)
        self.fc_std = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        mu = 2.0 * torch.tanh(self.fc_mu(x))
        std = F.softplus(self.fc_std(x))
        return mu, std

Action sampling (categorical distribution for discrete actions):

def take_action(self, state):
    state = torch.tensor([state], dtype=torch.float).to(self.device)
    probs = self.actor(state)
    action_dist = torch.distributions.Categorical(probs)
    action = action_dist.sample()
    return action.item()

Gaussian distribution for continuous actions:

def take_action(self, state):
    state = torch.tensor([state], dtype=torch.float).to(self.device)
    mu, std = self.actor(state)
    action_dist = torch.distributions.Normal(mu, std)
    action = action_dist.sample()
    return [action.item()]

Computing action log-probabilities and the surrogate objective:

def compute_surrogate_obj(self, states, actions, advantage, old_log_probs, actor):
    log_probs = torch.log(actor(states).gather(1, actions))
    ratio = torch.exp(log_probs - old_log_probs)
    return torch.mean(ratio * advantage)
def compute_surrogate_obj(self, states, actions, advantage, old_log_probs, actor):
    mu, std = actor(states)
    action_dists = torch.distributions.Normal(mu, std)
    log_probs = action_dists.log_prob(actions)
    ratio = torch.exp(log_probs - old_log_probs)
    return torch.mean(ratio * advantage)

 PPO 

PPO optimizes the same objective as TRPO.

As we saw above, TRPO needs several techniques (conjugate gradient, line search) to obtain its final update, which is fairly complex. PPO is a simpler alternative and comes in two variants: PPO-penalty and PPO-clip.

PPO-penalty

Core idea: constrain the policy-update size via the KL divergence, turning the constraint into a penalty term in the objective whose coefficient is adjusted dynamically.

Method: a Lagrange-multiplier-style relaxation that moves the KL limit into the objective function.

Objective design
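For reference, the standard PPO-penalty objective (with \theta_k the current parameters and \beta_k the penalty coefficient) is:

\arg\max_{\theta}\;\mathbb{E}_{s\sim\nu^{\pi_{\theta_k}}}\mathbb{E}_{a\sim\pi_{\theta_k}(\cdot|s)}\left[\frac{\pi_{\theta}(a|s)}{\pi_{\theta_k}(a|s)}A^{\pi_{\theta_k}}(s,a)-\beta_k D_{KL}\left(\pi_{\theta_k}(\cdot|s)\,\|\,\pi_{\theta}(\cdot|s)\right)\right]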

  • The first term is the importance-sampling ratio times the advantage, estimating the improvement direction of the new policy
  • The second term is the KL-divergence penalty, limiting how far the new policy moves away from the old one

 Dynamically adjusting the penalty coefficient

After each update, adjust \beta according to d_k = D_{KL}(\pi_{\theta_k}, \pi_{\theta}):

  1. If d_k < \delta / 1.5 (the two policies are still very similar, the constraint is too strong), relax the penalty: \beta_{k+1} = \beta_k / 2
  2. If d_k > \delta \times 1.5 (the two policies differ too much, the constraint is too weak), strengthen the penalty: \beta_{k+1} = \beta_k \times 2 (a minimal sketch of this rule follows)
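A minimal sketch of the adaptive rule, assuming a mean_kl value has already been computed for the latest update (the variable names here are illustrative, not from the code below):

def adjust_beta(beta, mean_kl, delta):
    # PPO-penalty heuristic: keep the measured KL close to the target delta
    if mean_kl < delta / 1.5:
        beta = beta / 2      # constraint too strong, relax the penalty
    elif mean_kl > delta * 1.5:
        beta = beta * 2      # constraint too weak, strengthen the penalty
    return beta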

Drawbacks: the threshold \delta has to be tuned by hand, and computing the KL divergence adds extra cost.

PPO-clip

Core idea: directly restrict the ratio between the new and old policies inside the objective, using a clipping operation to prevent overly large policy updates, with no explicit KL computation.

Objective design
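For reference, the standard PPO-clip objective is:

\arg\max_{\theta}\;\mathbb{E}_{s\sim\nu^{\pi_{\theta_k}}}\mathbb{E}_{a\sim\pi_{\theta_k}(\cdot|s)}\left[\min\left(\frac{\pi_{\theta}(a|s)}{\pi_{\theta_k}(a|s)}A^{\pi_{\theta_k}}(s,a),\;\operatorname{clip}\left(\frac{\pi_{\theta}(a|s)}{\pi_{\theta_k}(a|s)},1-\epsilon,1+\epsilon\right)A^{\pi_{\theta_k}}(s,a)\right)\right]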

  • clip operation: restrict the importance-sampling ratio to the interval [1-\epsilon, 1+\epsilon]
  • min operation: take the smaller of the unclipped and clipped terms, keeping the update size under control

Case analysis

  1. When A > 0 (the action is better than average): the ratio is pushed up, but no further than 1+\epsilon
  2. When A < 0 (the action is worse than average): the ratio is pushed down, but no lower than 1-\epsilon

Drawbacks: \epsilon has to be chosen empirically, and the handling of extreme values may not be flexible enough.

A simple example:

Suppose that in some state s:

  • the old policy selects action a with probability 0.5,

  • the new policy assigns it probability 0.8,

  • and the advantage is A = +1 (action a is better than average).

PPO-penalty:

  1. Compute the KL divergence between the old and new action distributions.

  2. Compare it against the threshold \delta and adjust \beta accordingly.

  3. In the objective, if the KL divergence is too large, \beta grows, penalizing deviation from the old policy more strongly.

PPO-clip (simple and efficient):

  1. Raw ratio: 0.8 / 0.5 = 1.6

  2. Clip operation: with \epsilon = 0.2 the ratio is capped at 1 + \epsilon = 1.2

  3. The objective takes min(1.6 × 1, 1.2 × 1) = 1.2
    ➔ the effective update weight is limited to 1.2 rather than the raw 1.6 (checked in the short sketch below).
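A quick sketch that reproduces these numbers with the same clamp/min operations used in the PPO code below (the values are just the ones from this example):

import torch

ratio = torch.tensor(0.8 / 0.5)           # new prob / old prob = 1.6
advantage = torch.tensor(1.0)              # A = +1
eps = 0.2
surr1 = ratio * advantage                  # unclipped term: 1.6
surr2 = torch.clamp(ratio, 1 - eps, 1 + eps) * advantage  # clipped term: 1.2
objective = torch.min(surr1, surr2)        # PPO-clip keeps the smaller one
print(objective.item())                    # prints 1.2 (up to float precision)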

Code

Experiments show that PPO-clip generally performs better than PPO-penalty, so the implementation below uses the clipped objective.

import gym
import torch
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import rl_utils


class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return F.softmax(self.fc2(x), dim=1)


class ValueNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super(ValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)


class PPO:
    ''' PPO algorithm, clipped version '''
    def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
                 lmbda, epochs, eps, gamma, device):
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda
        self.epochs = epochs  # number of training epochs per collected trajectory
        self.eps = eps  # clipping-range parameter in PPO
        self.device = device

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        probs = self.actor(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        td_delta = td_target - self.critic(states)
        advantage = rl_utils.compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device)
        old_log_probs = torch.log(self.actor(states).gather(1, actions)).detach()
        for _ in range(self.epochs):
            log_probs = torch.log(self.actor(states).gather(1, actions))
            ratio = torch.exp(log_probs - old_log_probs)
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage  # clipped term
            actor_loss = torch.mean(-torch.min(surr1, surr2))  # PPO loss
            critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))
            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()
            actor_loss.backward()
            critic_loss.backward()
            self.actor_optimizer.step()
            self.critic_optimizer.step()

Training PPO on the CartPole environment

actor_lr = 1e-3
critic_lr = 1e-2
num_episodes = 500
hidden_dim = 128
gamma = 0.98
lmbda = 0.95
epochs = 10
eps = 0.2
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
env_name = 'CartPole-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda,
            epochs, eps, gamma, device)
return_list = rl_utils.train_on_policy_agent(env, agent, num_episodes)
episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('PPO on {}'.format(env_name))
plt.show()

mv_return = rl_utils.moving_average(return_list, 9)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('PPO on {}'.format(env_name))
plt.show()

For the continuous-action Pendulum environment, the policy again needs a Gaussian distribution:

class PolicyNetContinuous(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNetContinuous, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc_mu = torch.nn.Linear(hidden_dim, action_dim)
        self.fc_std = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        mu = 2.0 * torch.tanh(self.fc_mu(x))
        std = F.softplus(self.fc_std(x))
        return mu, std


class PPOContinuous:
    ''' PPO for continuous actions '''
    def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
                 lmbda, epochs, eps, gamma, device):
        self.actor = PolicyNetContinuous(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda
        self.epochs = epochs
        self.eps = eps
        self.device = device

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        mu, sigma = self.actor(state)
        action_dist = torch.distributions.Normal(mu, sigma)
        action = action_dist.sample()
        return [action.item()]

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions'], dtype=torch.float).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)
        rewards = (rewards + 8.0) / 8.0  # rescale rewards as in TRPO to ease training
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        td_delta = td_target - self.critic(states)
        advantage = rl_utils.compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device)
        mu, std = self.actor(states)
        action_dists = torch.distributions.Normal(mu.detach(), std.detach())
        # actions follow a normal distribution
        old_log_probs = action_dists.log_prob(actions)
        for _ in range(self.epochs):
            mu, std = self.actor(states)
            action_dists = torch.distributions.Normal(mu, std)
            log_probs = action_dists.log_prob(actions)
            ratio = torch.exp(log_probs - old_log_probs)
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage
            actor_loss = torch.mean(-torch.min(surr1, surr2))
            critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))
            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()
            actor_loss.backward()
            critic_loss.backward()
            self.actor_optimizer.step()
            self.critic_optimizer.step()
actor_lr = 1e-4
critic_lr = 5e-3
num_episodes = 2000
hidden_dim = 128
gamma = 0.9
lmbda = 0.9
epochs = 10
eps = 0.2
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
env_name = 'Pendulum-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]  # 连续动作空间
agent = PPOContinuous(state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
                      lmbda, epochs, eps, gamma, device)
return_list = rl_utils.train_on_policy_agent(env, agent, num_episodes)

episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('PPO on {}'.format(env_name))
plt.show()

mv_return = rl_utils.moving_average(return_list, 21)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('PPO on {}'.format(env_name))
plt.show()

Summary

TRPO and PPO are both on-policy algorithms. Note that although the optimization objective involves importance sampling, it only reuses data from the previous round's policy, not data from all past policies.
