TRPO
Notes: TRPO is an on-policy method; it constrains each update to a trust region to prevent unstable updates.
In Actor-Critic methods, as the policy network gets deeper, an overly long step along the gradient can make the update worse rather than better. The fix is to restrict updates to a trust region: Trust Region Policy Optimization (TRPO).
Core idea:
At each iteration, the magnitude of the policy update is limited so that optimization stays inside a "trust region", which guarantees the algorithm's stability and convergence.
Implementation:
TRPO solves a constrained optimization problem to find the best policy-update direction. Concretely, it uses the conjugate gradient method to solve that problem efficiently, together with some tricks to estimate the effect of the policy update on the value function.
Derivation in detail
Policy objective (relating the new and old policies)
The policy is π_θ with parameters θ. The goal is to find better parameters θ' such that J(θ') ≥ J(θ), where the objective is
J(θ) = E_{s_0}[ V^{π_θ}(s_0) ].
Re-expressing the old policy's objective along trajectories of the new policy π_{θ'}:
E_{s_0}[ V^{π_θ}(s_0) ] = E_{π_{θ'}}[ Σ_{t=0}^∞ γ^t V^{π_θ}(s_t) - Σ_{t=1}^∞ γ^t V^{π_θ}(s_t) ] = -E_{π_{θ'}}[ Σ_{t=0}^∞ γ^t ( γ V^{π_θ}(s_{t+1}) - V^{π_θ}(s_t) ) ]
First line: the expectation, over the possible initial states, of the state value obtained under the initial (old) policy.
Second line: the state values summed from t = 0 minus the state values summed from t = 1; the difference telescopes so that only the state value of the initial state at t = 0 remains.
The key point: all the states here are generated by the new policy, while the value function V is still the old policy's; V is simply re-expressed over trajectories produced by the new policy.
Now take the difference between the new and old policies:
J(θ') - J(θ) = E_{s_0}[ V^{π_{θ'}}(s_0) ] - E_{s_0}[ V^{π_θ}(s_0) ]
             = E_{π_{θ'}}[ Σ_{t=0}^∞ γ^t r(s_t, a_t) ] + E_{π_{θ'}}[ Σ_{t=0}^∞ γ^t ( γ V^{π_θ}(s_{t+1}) - V^{π_θ}(s_t) ) ]
             = E_{π_{θ'}}[ Σ_{t=0}^∞ γ^t ( r(s_t, a_t) + γ V^{π_θ}(s_{t+1}) - V^{π_θ}(s_t) ) ]
In the second line, the first term is just how the state value function is actually computed: the expected discounted return under the new policy.
The term inside the final sum is the temporal-difference (TD) error, δ_t = r(s_t, a_t) + γ V^{π_θ}(s_{t+1}) - V^{π_θ}(s_t): the immediate reward actually obtained in the current state, plus the discounted estimate of the next state, minus the current state's value estimate. In other words, it is the gap between the current value estimate and the value obtained by bootstrapping one step from the next state's estimate.
Since the TD error, in expectation, is exactly the old policy's advantage A^{π_θ}(s_t, a_t), we obtain:
J(θ') - J(θ) = E_{π_{θ'}}[ Σ_{t=0}^∞ γ^t A^{π_θ}(s_t, a_t) ]
             = Σ_{t=0}^∞ γ^t E_{s_t ~ P_t^{π_{θ'}}} E_{a ~ π_{θ'}(·|s_t)}[ A^{π_θ}(s_t, a) ]
             = (1 / (1 - γ)) E_{s ~ ν^{π_{θ'}}} E_{a ~ π_{θ'}(·|s)}[ A^{π_θ}(s, a) ]
Take the simple last step first: it is just the definition of the state visitation distribution, ν^π(s) = (1 - γ) Σ_{t=0}^∞ γ^t P_t^π(s).
The second-to-last step expands the expectation over the policy, via conditional expectation, into explicit expectations over states and actions with their respective distributions: the state s_t follows the state distribution P_t^{π_{θ'}} induced by the policy π_{θ'} at time step t, and the action follows that policy's conditional distribution π_{θ'}(·|s_t).
As long as this new-minus-old difference is ≥ 0, policy performance is guaranteed to improve monotonically; at this point it looks like we have found the trust region. But optimizing this expression directly is very hard, because the last step involves the new policy, which is exactly what we are trying to solve for. Collecting data with every possible new policy and then checking which one satisfies the condition is clearly unrealistic.
So we approximate: ignore the change in the state visitation distribution between the two policies and use the old policy's state distribution directly, defining the surrogate objective
L_θ(θ') = J(θ) + (1 / (1 - γ)) E_{s ~ ν^{π_θ}} E_{a ~ π_{θ'}(·|s)}[ A^{π_θ}(s, a) ].
When the new and old policies are very close, the state visitation distributions change very little, so the approximation is reasonable. Actions are still sampled from the new policy while the states use the old policy's distribution; applying importance sampling to the expectation over actions gives
L_θ(θ') = J(θ) + (1 / (1 - γ)) E_{s ~ ν^{π_θ}} E_{a ~ π_θ(·|s)}[ (π_{θ'}(a|s) / π_θ(a|s)) A^{π_θ}(s, a) ].
The ratio here is the importance weight: we do not want to run every candidate new policy to collect data, so the new policy's objective is estimated by reweighting data already available from the old policy. This lets us compute the update toward the new policy using only old-policy data. But how do we measure the distance between two policies?
KL divergence: a measure of the difference between two probability distributions.
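As a small illustration (toy probabilities, not from these notes), the average KL divergence between the old and new action distributions over a batch of states can be computed with the same torch.distributions API that the TRPO code below relies on:

import torch

# Made-up action probabilities of the old and new policies on a batch of 3 states
old_probs = torch.tensor([[0.5, 0.5], [0.9, 0.1], [0.2, 0.8]])
new_probs = torch.tensor([[0.6, 0.4], [0.8, 0.2], [0.3, 0.7]])

old_dist = torch.distributions.Categorical(old_probs)
new_dist = torch.distributions.Categorical(new_probs)

# Per-state KL(old || new), then the average over states, i.e. E_s[D_KL]
kl_per_state = torch.distributions.kl.kl_divergence(old_dist, new_dist)
print(kl_per_state.mean())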
The resulting trust-region optimization problem is:
max_{θ'}  L_θ(θ')
s.t.  E_{s ~ ν^{π_{θ_k}}}[ D_KL( π_{θ_k}(·|s), π_{θ'}(·|s) ) ] ≤ δ
The expression has two lines. The first line says: find a set of parameters θ' that maximizes the surrogate objective.
"s.t." stands for "subject to" and introduces the constraint: under the state distribution induced by the old policy, the expected KL divergence between the old and new action distributions must not exceed a given threshold δ, which guarantees the policy does not change too much in one update.
In other words: since the previous step was stable, the next step is built on top of it using importance sampling (so the two rounds' state distributions can be treated as the same) and kept inside the trust region, so it will not be unstable either.
This constrained problem is the final formulation; all that remains is to solve it.
Approximate solution (maximizing the objective under the constraint); the derivation details can be skimmed.
Let θ_k denote the policy parameters after the k-th iteration. Both the objective and the constraint are smooth functions of θ', so they can be approximated by Taylor expansions around θ_k (first order for the objective, second order for the KL term):
L_{θ_k}(θ') ≈ g^T (θ' - θ_k)
E_{s ~ ν^{π_{θ_k}}}[ D_KL( π_{θ_k}(·|s), π_{θ'}(·|s) ) ] ≈ (1/2) (θ' - θ_k)^T H (θ' - θ_k)
where g = ∇_{θ'} L_{θ_k}(θ') |_{θ'=θ_k} is the gradient of the objective and H is the Hessian matrix of the average KL divergence between the policies, evaluated at θ_k.
The optimization problem therefore becomes:
θ_{k+1} = argmax_{θ'} g^T (θ' - θ_k)   s.t.   (1/2) (θ' - θ_k)^T H (θ' - θ_k) ≤ δ
Applying the KKT conditions (used to solve constrained nonlinear programs with equality and inequality constraints) yields the closed-form solution:
θ_{k+1} = θ_k + sqrt( 2δ / (g^T H^{-1} g) ) H^{-1} g
This is what we ultimately need to compute.
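A toy numerical check of this update (made-up g, H and δ; in TRPO itself H^{-1} g is obtained with the conjugate gradient method rather than a direct solve):

import numpy as np

# theta_{k+1} = theta_k + sqrt(2 * delta / (g^T H^{-1} g)) * H^{-1} g
g = np.array([1.0, 2.0])                  # gradient of the surrogate objective (toy values)
H = np.array([[2.0, 0.0], [0.0, 4.0]])    # Hessian of the average KL (positive definite, toy values)
delta = 0.01                              # trust-region size
theta_k = np.zeros(2)

Hinv_g = np.linalg.solve(H, g)            # H^{-1} g
step = np.sqrt(2 * delta / (g @ Hinv_g))
theta_next = theta_k + step * Hinv_g
print(theta_next)
# The constraint is tight at the solution: 0.5 * (step*Hinv_g)^T H (step*Hinv_g) equals delta
print(0.5 * (step * Hinv_g) @ H @ (step * Hinv_g))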
Conjugate gradient (computing the update direction)
In practice the policy network has a large number of parameters, so forming and inverting H explicitly wastes time; the conjugate gradient method is used instead.
Core idea: let x = H^{-1} g be the parameter-update direction, and let β be the maximum step length along x that still satisfies the KL constraint.
Then the constraint is tight at the maximum step: (1/2) (βx)^T H (βx) = δ.
Solving for β gives β = sqrt( 2δ / (x^T H x) ),
so the parameter update becomes
θ_{k+1} = θ_k + sqrt( 2δ / (x^T H x) ) x.
The problem therefore reduces to solving the linear system H x = g, which the conjugate gradient method does using only Hessian-vector products H v, without ever forming H explicitly.
Pseudocode: the conjugate gradient iteration for solving H x = g, sketched below.
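A minimal runnable sketch of that iteration (it mirrors the conjugate_gradient method in the TRPO class further below; the explicit H and the toy numbers here are made up for illustration):

import torch

def conjugate_gradient(hvp, g, n_iters=10, tol=1e-10):
    # Solve Hx = g given only a function hvp(v) that returns the product Hv
    x = torch.zeros_like(g)
    r = g.clone()                    # residual r = g - Hx (x starts at zero)
    p = g.clone()                    # search direction
    rdotr = torch.dot(r, r)
    for _ in range(n_iters):
        Hp = hvp(p)
        alpha = rdotr / torch.dot(p, Hp)
        x += alpha * p
        r -= alpha * Hp
        new_rdotr = torch.dot(r, r)
        if new_rdotr < tol:
            break
        p = r + (new_rdotr / rdotr) * p
        rdotr = new_rdotr
    return x

# Toy check with an explicit positive-definite H
H = torch.tensor([[2.0, 0.0], [0.0, 4.0]])
g = torch.tensor([1.0, 2.0])
print(conjugate_gradient(lambda v: H @ v, g))  # approximately H^{-1} g = [0.5, 0.5]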
Line search
Mechanism: gradually scale down the parameter-update step so that the constraint is satisfied while the objective still improves.
- Step-size scaling: introduce a hyperparameter α ∈ (0, 1) that controls the line search, and scale the base update step by α^i (i a non-negative integer): θ_{k+1} = θ_k + α^i sqrt( 2δ / (x^T H x) ) x. Since α^i shrinks as i grows, this amounts to trying the update with successively smaller steps.
- Condition check: starting from i = 0, check whether θ_{k+1} satisfies two conditions: first, it still respects the original KL divergence limit, keeping the update in the stable range; second, it improves the objective L_{θ_k}, i.e. the new policy is better than the old one.
- Parameter acceptance: once the smallest non-negative integer i is found such that θ_{k+1} satisfies both conditions, that θ_{k+1} is used to update the policy network parameters, completing this round of policy optimization.
Note that i is the quantity that varies during the search (see the sketch below).
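A compact sketch of this backtracking loop, assuming hypothetical helpers surrogate(θ) (returns the surrogate objective) and kl_between(θ_k, θ) (returns the average KL divergence); the actual implementation is the line_search method in the TRPO class below:

def backtracking_line_search(theta_k, full_step, kl_limit, surrogate, kl_between,
                             alpha=0.5, max_backtracks=15):
    # Try the full step scaled by alpha**i for i = 0, 1, 2, ... and accept the
    # first candidate that both respects the KL limit and improves the objective.
    old_obj = surrogate(theta_k)
    for i in range(max_backtracks):
        theta_new = theta_k + (alpha ** i) * full_step
        if kl_between(theta_k, theta_new) < kl_limit and surrogate(theta_new) > old_obj:
            return theta_new
    return theta_k  # otherwise keep the old parameters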
Full pseudocode:
- Initialize the policy network parameters θ_0 and the value network parameters ω_0
- for episode e = 1 -> E do:
- Sample a trajectory with the current policy π_{θ_k}
- Estimate the advantage A(s_t, a_t) of every state-action pair from the collected data and the value network
- Compute the gradient g of the policy objective
- Use the conjugate gradient method to compute x = H^{-1} g, the update direction
- Use line search to find the smallest non-negative integer i and update the policy network parameters: θ_{k+1} = θ_k + α^i sqrt( 2δ / (x^T H x) ) x
- Update the value network parameters (same method as in Actor-Critic)
- end for
Generalized Advantage Estimation (computing A)
GAE: generalized advantage estimation. Core idea: take an exponentially weighted average of the advantage estimates obtained from different numbers of steps.
The multi-step advantage estimates at each time step are:
A_t^{(1)} = δ_t
A_t^{(2)} = δ_t + γ δ_{t+1}
A_t^{(3)} = δ_t + γ δ_{t+1} + γ^2 δ_{t+2}
...
A_t^{(k)} = Σ_{l=0}^{k-1} γ^l δ_{t+l}
Exponentially weighting and averaging them gives
A_t^{GAE} = (1 - λ)( A_t^{(1)} + λ A_t^{(2)} + λ^2 A_t^{(3)} + ... ) = Σ_{l=0}^∞ (γλ)^l δ_{t+l}.
When λ = 0, only the advantage estimate from the one-step TD error is used: A_t = δ_t.
When λ = 1, the advantages from every number of steps are fully averaged in: A_t = Σ_{l=0}^∞ γ^l δ_{t+l}.
The code is given below.
Note: the TD error and the advantage are not the same thing in general; only the single-step advantage estimate equals the TD error.
def compute_advantage(gamma, lmbda, td_delta):
    td_delta = td_delta.detach().numpy()
    advantage_list = []
    advantage = 0.0
    for delta in td_delta[::-1]:
        advantage = gamma * lmbda * advantage + delta
        advantage_list.append(advantage)
    advantage_list.reverse()
    return torch.tensor(advantage_list, dtype=torch.float)
Code
Action spaces: both discrete and continuous are covered.
import torch
import numpy as np
import gym
import matplotlib.pyplot as plt
import torch.nn.functional as F
import rl_utils
import copy
Define the policy network and the value network (the same as in Actor-Critic):
class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return F.softmax(self.fc2(x), dim=1)


class ValueNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super(ValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)


class TRPO:
    """ TRPO algorithm """
    def __init__(self, hidden_dim, state_space, action_space, lmbda,
                 kl_constraint, alpha, critic_lr, gamma, device):
        state_dim = state_space.shape[0]
        action_dim = action_space.n
        # The policy network's parameters are not updated by an optimizer
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),
                                                 lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda  # GAE parameter
        self.kl_constraint = kl_constraint  # maximum KL divergence allowed
        self.alpha = alpha  # line search parameter
        self.device = device

    def take_action(self, state):
        # Get the action distribution from the policy network, then sample
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        probs = self.actor(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

    def hessian_matrix_vector_product(self, states, old_action_dists, vector):
        # Compute the product of the Hessian matrix and a vector
        new_action_dists = torch.distributions.Categorical(self.actor(states))
        kl = torch.mean(
            torch.distributions.kl.kl_divergence(old_action_dists,
                                                 new_action_dists))  # average KL divergence
        kl_grad = torch.autograd.grad(kl,
                                      self.actor.parameters(),
                                      create_graph=True)
        kl_grad_vector = torch.cat([grad.view(-1) for grad in kl_grad])
        # Take the dot product of the KL gradient with the vector first
        kl_grad_vector_product = torch.dot(kl_grad_vector, vector)
        grad2 = torch.autograd.grad(kl_grad_vector_product,
                                    self.actor.parameters())
        grad2_vector = torch.cat([grad.view(-1) for grad in grad2])
        return grad2_vector

    def conjugate_gradient(self, grad, states, old_action_dists):
        # Conjugate gradient method for solving Hx = g
        x = torch.zeros_like(grad)
        r = grad.clone()
        p = grad.clone()
        rdotr = torch.dot(r, r)
        for i in range(10):  # main conjugate gradient loop
            Hp = self.hessian_matrix_vector_product(states, old_action_dists,
                                                    p)
            alpha = rdotr / torch.dot(p, Hp)
            x += alpha * p
            r -= alpha * Hp
            new_rdotr = torch.dot(r, r)
            if new_rdotr < 1e-10:
                break
            beta = new_rdotr / rdotr
            p = r + beta * p
            rdotr = new_rdotr
        return x

    def compute_surrogate_obj(self, states, actions, advantage, old_log_probs,
                              actor):  # compute the surrogate policy objective
        log_probs = torch.log(actor(states).gather(1, actions))
        ratio = torch.exp(log_probs - old_log_probs)
        return torch.mean(ratio * advantage)

    def line_search(self, states, actions, advantage, old_log_probs,
                    old_action_dists, max_vec):  # line search
        old_para = torch.nn.utils.convert_parameters.parameters_to_vector(
            self.actor.parameters())
        old_obj = self.compute_surrogate_obj(states, actions, advantage,
                                             old_log_probs, self.actor)
        for i in range(15):  # main line search loop
            coef = self.alpha**i
            new_para = old_para + coef * max_vec
            new_actor = copy.deepcopy(self.actor)
            torch.nn.utils.convert_parameters.vector_to_parameters(
                new_para, new_actor.parameters())
            new_action_dists = torch.distributions.Categorical(
                new_actor(states))
            kl_div = torch.mean(
                torch.distributions.kl.kl_divergence(old_action_dists,
                                                     new_action_dists))
            new_obj = self.compute_surrogate_obj(states, actions, advantage,
                                                 old_log_probs, new_actor)
            if new_obj > old_obj and kl_div < self.kl_constraint:
                return new_para
        return old_para

    def policy_learn(self, states, actions, old_action_dists, old_log_probs,
                     advantage):
        # Update the policy: compute the gradient of the surrogate objective,
        # determine the update direction with conjugate gradient, find a
        # suitable step with line search, then update the policy network.
        surrogate_obj = self.compute_surrogate_obj(states, actions, advantage,
                                                   old_log_probs, self.actor)
        grads = torch.autograd.grad(surrogate_obj, self.actor.parameters())
        obj_grad = torch.cat([grad.view(-1) for grad in grads]).detach()
        # Use conjugate gradient to compute x = H^(-1)g
        descent_direction = self.conjugate_gradient(obj_grad, states,
                                                    old_action_dists)
        Hd = self.hessian_matrix_vector_product(states, old_action_dists,
                                                descent_direction)
        max_coef = torch.sqrt(2 * self.kl_constraint /
                              (torch.dot(descent_direction, Hd) + 1e-8))
        new_para = self.line_search(states, actions, advantage, old_log_probs,
                                    old_action_dists,
                                    descent_direction * max_coef)  # line search
        torch.nn.utils.convert_parameters.vector_to_parameters(
            new_para, self.actor.parameters())  # set the policy to the searched parameters

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'],
                              dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(
            self.device)
        rewards = torch.tensor(transition_dict['rewards'],
                               dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'],
                                   dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'],
                             dtype=torch.float).view(-1, 1).to(self.device)
        td_target = rewards + self.gamma * self.critic(next_states) * (1 -
                                                                       dones)
        td_delta = td_target - self.critic(states)
        advantage = compute_advantage(self.gamma, self.lmbda,
                                      td_delta.cpu()).to(self.device)
        old_log_probs = torch.log(self.actor(states).gather(1,
                                                            actions)).detach()
        old_action_dists = torch.distributions.Categorical(
            self.actor(states).detach())
        critic_loss = torch.mean(
            F.mse_loss(self.critic(states), td_target.detach()))
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()  # update the value function
        # Update the policy function
        self.policy_learn(states, actions, old_action_dists, old_log_probs,
                          advantage)
Training TRPO in the CartPole environment:
num_episodes = 500
hidden_dim = 128
gamma = 0.98
lmbda = 0.95
critic_lr = 1e-2
kl_constraint = 0.0005
alpha = 0.5
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
env_name = 'CartPole-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
agent = TRPO(hidden_dim, env.observation_space, env.action_space, lmbda,kl_constraint, alpha, critic_lr, gamma, device)
return_list = rl_utils.train_on_policy_agent(env, agent, num_episodes)
episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('TRPO on {}'.format(env_name))
plt.show()
mv_return = rl_utils.moving_average(return_list, 9)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('TRPO on {}'.format(env_name))
plt.show()
Convergence is quite fast.
For the continuous Pendulum environment: since the action space is continuous, the policy network outputs the mean and the standard deviation of a Gaussian distribution over actions.
class PolicyNetContinuous(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNetContinuous, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc_mu = torch.nn.Linear(hidden_dim, action_dim)
        self.fc_std = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        mu = 2.0 * torch.tanh(self.fc_mu(x))
        std = F.softplus(self.fc_std(x))
        return mu, std  # mean and standard deviation of the Gaussian


class TRPOContinuous:
    """ TRPO for continuous action spaces """
    def __init__(self, hidden_dim, state_space, action_space, lmbda,
                 kl_constraint, alpha, critic_lr, gamma, device):
        state_dim = state_space.shape[0]
        action_dim = action_space.shape[0]
        self.actor = PolicyNetContinuous(state_dim, hidden_dim,
                                         action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),
                                                 lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda
        self.kl_constraint = kl_constraint
        self.alpha = alpha
        self.device = device

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        mu, std = self.actor(state)
        action_dist = torch.distributions.Normal(mu, std)
        action = action_dist.sample()
        return [action.item()]

    def hessian_matrix_vector_product(self,
                                      states,
                                      old_action_dists,
                                      vector,
                                      damping=0.1):
        mu, std = self.actor(states)
        new_action_dists = torch.distributions.Normal(mu, std)
        kl = torch.mean(
            torch.distributions.kl.kl_divergence(old_action_dists,
                                                 new_action_dists))
        kl_grad = torch.autograd.grad(kl,
                                      self.actor.parameters(),
                                      create_graph=True)
        kl_grad_vector = torch.cat([grad.view(-1) for grad in kl_grad])
        kl_grad_vector_product = torch.dot(kl_grad_vector, vector)
        grad2 = torch.autograd.grad(kl_grad_vector_product,
                                    self.actor.parameters())
        grad2_vector = torch.cat(
            [grad.contiguous().view(-1) for grad in grad2])
        return grad2_vector + damping * vector

    def conjugate_gradient(self, grad, states, old_action_dists):
        x = torch.zeros_like(grad)
        r = grad.clone()
        p = grad.clone()
        rdotr = torch.dot(r, r)
        for i in range(10):
            Hp = self.hessian_matrix_vector_product(states, old_action_dists,
                                                    p)
            alpha = rdotr / torch.dot(p, Hp)
            x += alpha * p
            r -= alpha * Hp
            new_rdotr = torch.dot(r, r)
            if new_rdotr < 1e-10:
                break
            beta = new_rdotr / rdotr
            p = r + beta * p
            rdotr = new_rdotr
        return x

    def compute_surrogate_obj(self, states, actions, advantage, old_log_probs,
                              actor):
        mu, std = actor(states)
        action_dists = torch.distributions.Normal(mu, std)
        log_probs = action_dists.log_prob(actions)
        ratio = torch.exp(log_probs - old_log_probs)
        return torch.mean(ratio * advantage)

    def line_search(self, states, actions, advantage, old_log_probs,
                    old_action_dists, max_vec):
        old_para = torch.nn.utils.convert_parameters.parameters_to_vector(
            self.actor.parameters())
        old_obj = self.compute_surrogate_obj(states, actions, advantage,
                                             old_log_probs, self.actor)
        for i in range(15):
            coef = self.alpha**i
            new_para = old_para + coef * max_vec
            new_actor = copy.deepcopy(self.actor)
            torch.nn.utils.convert_parameters.vector_to_parameters(
                new_para, new_actor.parameters())
            mu, std = new_actor(states)
            new_action_dists = torch.distributions.Normal(mu, std)
            kl_div = torch.mean(
                torch.distributions.kl.kl_divergence(old_action_dists,
                                                     new_action_dists))
            new_obj = self.compute_surrogate_obj(states, actions, advantage,
                                                 old_log_probs, new_actor)
            if new_obj > old_obj and kl_div < self.kl_constraint:
                return new_para
        return old_para

    def policy_learn(self, states, actions, old_action_dists, old_log_probs,
                     advantage):
        surrogate_obj = self.compute_surrogate_obj(states, actions, advantage,
                                                   old_log_probs, self.actor)
        grads = torch.autograd.grad(surrogate_obj, self.actor.parameters())
        obj_grad = torch.cat([grad.view(-1) for grad in grads]).detach()
        descent_direction = self.conjugate_gradient(obj_grad, states,
                                                    old_action_dists)
        Hd = self.hessian_matrix_vector_product(states, old_action_dists,
                                                descent_direction)
        max_coef = torch.sqrt(2 * self.kl_constraint /
                              (torch.dot(descent_direction, Hd) + 1e-8))
        new_para = self.line_search(states, actions, advantage, old_log_probs,
                                    old_action_dists,
                                    descent_direction * max_coef)
        torch.nn.utils.convert_parameters.vector_to_parameters(
            new_para, self.actor.parameters())

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'],
                              dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions'],
                               dtype=torch.float).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'],
                               dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'],
                                   dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'],
                             dtype=torch.float).view(-1, 1).to(self.device)
        rewards = (rewards + 8.0) / 8.0  # rescale rewards to ease training
        td_target = rewards + self.gamma * self.critic(next_states) * (1 -
                                                                       dones)
        td_delta = td_target - self.critic(states)
        advantage = compute_advantage(self.gamma, self.lmbda,
                                      td_delta.cpu()).to(self.device)
        mu, std = self.actor(states)
        old_action_dists = torch.distributions.Normal(mu.detach(),
                                                      std.detach())
        old_log_probs = old_action_dists.log_prob(actions)
        critic_loss = torch.mean(
            F.mse_loss(self.critic(states), td_target.detach()))
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        self.policy_learn(states, actions, old_action_dists, old_log_probs,
                          advantage)
The Pendulum environment:
num_episodes = 2000
hidden_dim = 128
gamma = 0.9
lmbda = 0.9
critic_lr = 1e-2
kl_constraint = 0.00005
alpha = 0.5
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
env_name = 'Pendulum-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
agent = TRPOContinuous(hidden_dim, env.observation_space, env.action_space,lmbda, kl_constraint, alpha, critic_lr, gamma, device)
return_list = rl_utils.train_on_policy_agent(env, agent, num_episodes)
episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('TRPO on {}'.format(env_name))
plt.show()
mv_return = rl_utils.moving_average(return_list, 9)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('TRPO on {}'.format(env_name))
plt.show()
Main differences between the discrete-action and continuous-action code
Policy network definition:
class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return F.softmax(self.fc2(x), dim=1)
class PolicyNetContinuous(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNetContinuous, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc_mu = torch.nn.Linear(hidden_dim, action_dim)
        self.fc_std = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        mu = 2.0 * torch.tanh(self.fc_mu(x))
        std = F.softplus(self.fc_std(x))
        return mu, std
Action sampling. Categorical distribution for discrete actions:
def take_action(self, state):
    state = torch.tensor([state], dtype=torch.float).to(self.device)
    probs = self.actor(state)
    action_dist = torch.distributions.Categorical(probs)
    action = action_dist.sample()
    return action.item()
Gaussian distribution for continuous actions:
def take_action(self, state):
    state = torch.tensor([state], dtype=torch.float).to(self.device)
    mu, std = self.actor(state)
    action_dist = torch.distributions.Normal(mu, std)
    action = action_dist.sample()
    return [action.item()]
Computing action probabilities and distributions:
def compute_surrogate_obj(self, states, actions, advantage, old_log_probs, actor):
    log_probs = torch.log(actor(states).gather(1, actions))
    ratio = torch.exp(log_probs - old_log_probs)
    return torch.mean(ratio * advantage)
def compute_surrogate_obj(self, states, actions, advantage, old_log_probs, actor):
    mu, std = actor(states)
    action_dists = torch.distributions.Normal(mu, std)
    log_probs = action_dists.log_prob(actions)
    ratio = torch.exp(log_probs - old_log_probs)
    return torch.mean(ratio * advantage)
PPO
PPO optimizes the same objective as TRPO.
As discussed in the previous section, TRPO needs several numerical methods to obtain its final update, which is fairly complex. PPO is a simpler alternative and comes in two variants: PPO-penalty and PPO-clip.
PPO-penalty
Core idea: constrain the size of the policy update with the KL divergence, but turn the constraint into a penalty term in the objective and adjust the penalty coefficient dynamically.
Method: a Lagrangian-style relaxation moves the KL limit into the objective:
argmax_θ  E_{s ~ ν^{π_{θ_k}}} E_{a ~ π_{θ_k}(·|s)}[ (π_θ(a|s) / π_{θ_k}(a|s)) A^{π_{θ_k}}(s, a) - β D_KL( π_{θ_k}(·|s), π_θ(·|s) ) ]
Objective design
- The first term is the importance-sampling ratio multiplied by the advantage; it estimates the direction in which the new policy improves.
- The second term is the KL-divergence penalty, which limits how far the new policy may move from the old one.
Dynamically adjusting the penalty coefficient
Let d_k be the measured KL divergence E_{s ~ ν^{π_{θ_k}}}[ D_KL( π_{θ_k}(·|s), π_θ(·|s) ) ] and δ the target threshold; β is adjusted according to d_k (a sketch follows after this list):
- If d_k is well below δ (the two policies are still very similar), decrease β (the constraint is too strong, so relax the penalty); a common rule is β ← β / 2 when d_k < δ / 1.5.
- If d_k is well above δ (the two policies have drifted too far apart), increase β (the constraint is too weak, so strengthen the penalty); correspondingly β ← β × 2 when d_k > δ × 1.5.
- Otherwise β is left unchanged.
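A minimal sketch of the penalized objective and this adaptive rule (the function names and the /2, ×2 factors follow the common adaptive-KL recipe; note that the code later in these notes implements only PPO-clip):

import torch

def ppo_penalty_loss(new_log_probs, old_log_probs, advantage, mean_kl, beta):
    # Objective to maximize: E[(pi_new / pi_old) * A] - beta * KL(pi_old, pi_new)
    ratio = torch.exp(new_log_probs - old_log_probs)
    return torch.mean(ratio * advantage) - beta * mean_kl

def adjust_beta(beta, mean_kl, delta):
    # Relax the penalty while the policies stay close, strengthen it once they
    # drift past the target KL value delta.
    if mean_kl < delta / 1.5:
        beta = beta / 2.0
    elif mean_kl > delta * 1.5:
        beta = beta * 2.0
    return beta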
Drawbacks: the threshold δ has to be tuned by hand, and computing the KL divergence adds extra overhead.
PPO-clip
Core idea: directly restrict the ratio between the new and old policies inside the objective; the clipping operation prevents overly large policy updates without computing the KL divergence explicitly.
Objective design
argmax_θ  E_{s ~ ν^{π_{θ_k}}} E_{a ~ π_{θ_k}(·|s)}[ min( r(θ) A^{π_{θ_k}}(s, a), clip( r(θ), 1 - ε, 1 + ε ) A^{π_{θ_k}}(s, a) ) ],  where r(θ) = π_θ(a|s) / π_{θ_k}(a|s)
- clip operation: restricts the importance-sampling ratio to the interval [1 - ε, 1 + ε].
- min operation: takes the smaller of the raw term and the clipped term, keeping the size of the policy update under control.
Case analysis
- When A > 0 (the current action is better than average): the ratio is encouraged to grow, but not beyond 1 + ε.
- When A < 0 (the current action is worse than average): the ratio is encouraged to shrink, but not below 1 - ε.
Drawbacks: ε has to be set empirically, and the handling of extreme values may not be flexible enough.
A simple example. Suppose that in some state s:
- the old policy chooses action a with probability 0.5;
- the new policy assigns it probability 0.8;
- the advantage is A = +1 (action a is better than average).
PPO-penalty:
- Compute the KL divergence between the old and new policies.
- Adjust β depending on whether the KL divergence exceeds the threshold δ.
- In the objective, if the KL divergence gets too large, β grows and suppresses the new policy's deviation; the larger the deviation, the stronger the penalty.
PPO-clip: simple and efficient
- Raw ratio: 0.8 / 0.5 = 1.6
- Clip operation (with ε = 0.2): the ratio is limited to 1 + ε = 1.2
- The objective takes min(1.6 × 1, 1.2 × 1) = 1.2
➔ The effective update magnitude is limited to 1.2×, rather than the raw 1.6×.
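The same arithmetic checked with torch (ε = 0.2 is assumed here, matching 1 + ε = 1.2 above and the eps value used in the code below):

import torch

ratio = torch.tensor(0.8 / 0.5)                 # 1.6, from the example above
advantage = torch.tensor(1.0)
eps = 0.2
clipped = torch.clamp(ratio, 1 - eps, 1 + eps)  # 1.2
objective = torch.min(ratio * advantage, clipped * advantage)
print(objective)  # tensor(1.2000): the update is capped at 1.2x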
Code
Experiments show that PPO-clip performs better than PPO-penalty, so the implementation below uses the clipped objective.
import gym
import torch
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import rl_utils


class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return F.softmax(self.fc2(x), dim=1)


class ValueNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super(ValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)


class PPO:
    ''' PPO algorithm, clipped version '''
    def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
                 lmbda, epochs, eps, gamma, device):
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(),
                                                lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),
                                                 lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda
        self.epochs = epochs  # number of training epochs over one batch of trajectory data
        self.eps = eps  # clipping range parameter in PPO
        self.device = device

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        probs = self.actor(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'],
                              dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(
            self.device)
        rewards = torch.tensor(transition_dict['rewards'],
                               dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'],
                                   dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'],
                             dtype=torch.float).view(-1, 1).to(self.device)
        td_target = rewards + self.gamma * self.critic(next_states) * (1 -
                                                                       dones)
        td_delta = td_target - self.critic(states)
        advantage = rl_utils.compute_advantage(self.gamma, self.lmbda,
                                               td_delta.cpu()).to(self.device)
        old_log_probs = torch.log(self.actor(states).gather(1,
                                                            actions)).detach()
        for _ in range(self.epochs):
            log_probs = torch.log(self.actor(states).gather(1, actions))
            ratio = torch.exp(log_probs - old_log_probs)
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - self.eps,
                                1 + self.eps) * advantage  # clipped term
            actor_loss = torch.mean(-torch.min(surr1, surr2))  # PPO loss function
            critic_loss = torch.mean(
                F.mse_loss(self.critic(states), td_target.detach()))
            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()
            actor_loss.backward()
            critic_loss.backward()
            self.actor_optimizer.step()
            self.critic_optimizer.step()
Training PPO in the CartPole environment:
actor_lr = 1e-3
critic_lr = 1e-2
num_episodes = 500
hidden_dim = 128
gamma = 0.98
lmbda = 0.95
epochs = 10
eps = 0.2
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
env_name = 'CartPole-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda,
            epochs, eps, gamma, device)
return_list = rl_utils.train_on_policy_agent(env, agent, num_episodes)
episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('PPO on {}'.format(env_name))
plt.show()
mv_return = rl_utils.moving_average(return_list, 9)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('PPO on {}'.format(env_name))
plt.show()
Moving to an environment with continuous actions (Pendulum), the policy needs a Gaussian distribution:
class PolicyNetContinuous(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNetContinuous, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc_mu = torch.nn.Linear(hidden_dim, action_dim)
        self.fc_std = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        mu = 2.0 * torch.tanh(self.fc_mu(x))
        std = F.softplus(self.fc_std(x))
        return mu, std


class PPOContinuous:
    ''' PPO for continuous action spaces '''
    def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
                 lmbda, epochs, eps, gamma, device):
        self.actor = PolicyNetContinuous(state_dim, hidden_dim,
                                         action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(),
                                                lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),
                                                 lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda
        self.epochs = epochs
        self.eps = eps
        self.device = device

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        mu, sigma = self.actor(state)
        action_dist = torch.distributions.Normal(mu, sigma)
        action = action_dist.sample()
        return [action.item()]

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'],
                              dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions'],
                               dtype=torch.float).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'],
                               dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'],
                                   dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'],
                             dtype=torch.float).view(-1, 1).to(self.device)
        rewards = (rewards + 8.0) / 8.0  # rescale rewards to ease training, as in TRPO
        td_target = rewards + self.gamma * self.critic(next_states) * (1 -
                                                                       dones)
        td_delta = td_target - self.critic(states)
        advantage = rl_utils.compute_advantage(self.gamma, self.lmbda,
                                               td_delta.cpu()).to(self.device)
        mu, std = self.actor(states)
        action_dists = torch.distributions.Normal(mu.detach(), std.detach())
        # actions follow a normal distribution
        old_log_probs = action_dists.log_prob(actions)
        for _ in range(self.epochs):
            mu, std = self.actor(states)
            action_dists = torch.distributions.Normal(mu, std)
            log_probs = action_dists.log_prob(actions)
            ratio = torch.exp(log_probs - old_log_probs)
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage
            actor_loss = torch.mean(-torch.min(surr1, surr2))
            critic_loss = torch.mean(
                F.mse_loss(self.critic(states), td_target.detach()))
            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()
            actor_loss.backward()
            critic_loss.backward()
            self.actor_optimizer.step()
            self.critic_optimizer.step()
actor_lr = 1e-4
critic_lr = 5e-3
num_episodes = 2000
hidden_dim = 128
gamma = 0.9
lmbda = 0.9
epochs = 10
eps = 0.2
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
env_name = 'Pendulum-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0] # 连续动作空间
agent = PPOContinuous(state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
                      lmbda, epochs, eps, gamma, device)
return_list = rl_utils.train_on_policy_agent(env, agent, num_episodes)
episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('PPO on {}'.format(env_name))
plt.show()
mv_return = rl_utils.moving_average(return_list, 21)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('PPO on {}'.format(env_name))
plt.show()
Summary
TRPO and PPO are both on-policy algorithms. Note that although their optimization objective involves importance sampling, it only reuses data from the previous round's policy, not data from all past policies.