# [0628] Task04 DQN 算法及进阶

  • easy-rl PDF版本 笔记整理 P6 - P8
  • joyrl 比对 补充 P7 - P8
  • 相关 代码 整理 待整理 !!

在这里插入图片描述

最新版PDF下载
地址:https://github.com/datawhalechina/easy-rl/releases
国内地址(推荐国内读者使用)
链接: https://pan.baidu.com/s/1isqQnpVRWbb3yh83Vs0kbw 提取码: us6a

easy-rl 在线版本链接 (用于 copy 代码)
参考链接 2:https://datawhalechina.github.io/joyrl-book/

其它:
【勘误记录 链接】
——————
5、深度强化学习基础 ⭐️
开源内容:https://linklearner.com/learn/summary/11
——————————
蒙特卡洛: 要等到游戏结束。方差很大
时序差分: 不用等到游戏结束。更常用

不同的方法考虑了不同的假设,所以运算结果不同。

在这里插入图片描述
在这里插入图片描述

π ( a ∣ s ) = e Q ( s , a ) T ∑ a ′ ∈ A e Q ( s , a ′ ) T \pi(a|s)=\frac{e^{\frac{Q(s, a)}{T}}}{\sum\limits_{a^\prime\in \cal A}e^{\frac{Q(s, a^\prime)}{T}}} π(as)=aAeTQ(s,a)eTQ(s,a)

T T T 大, 探索
T T T 小, 利用

经验回放 异策略

最耗时的环节: 与环境互动。

在这里插入图片描述

DQN 和 其它 Q-learning 学习方法的区别:

DQN其它 Q-learning
神经网络 拟合 动作值 函数表格
经验回放下一状态的数据

————————

7 个技巧: 深度 Q 网络、双深度 Q 网络、优先级经验回放的双深度 Q 网络、竞争深度 Q 网络、异步优势演员-评论员算法(A3C)、分布式 Q 函数、噪声网络。

序号问题解决方案其它
1数据存储效率;泛化深度 Q 网络 (DQN)经验回放 + 目标网络
Q ( s t , a t ) ↔ r t + max ⁡ a Q ( s t + 1 , a ) Q(s_t,a_t)\leftrightarrow r_t+\max\limits_aQ(s_{t+1},a) Q(st,at)rt+amaxQ(st+1,a)
2Q 值往往被高估双深度 Q 网络 (double DQN,DDQN) Q ( s t , a t ) ↔ r t + Q ′ ( s t + 1 , arg ⁡ max ⁡ a Q ( s t + 1 , a ) ) Q(s_t,a_t)\leftrightarrow r_t+\textcolor{blue}{Q^\prime\Big(s_{t+1},\arg}\max\limits_aQ(s_{t+1},a)\textcolor{blue}{\Big)} Q(st,at)rt+Q(st+1,argamaxQ(st+1,a))
3竞争深度 Q 网络(dueling DQN)第一步计算一个与输入有关的标量 V(s);
第二步计算一个向量 A(s, a) 对应每一个动作。
最后的网络将两步的结果相加,得到最终需要的 Q 值。 Q(s, a) = V(s) + A(s, a)
4使用经验回放时,考虑数据间的权重大小优先级经验回放(prioritized experience replay,PER)
5蒙特卡洛:方差大多步方法在蒙特卡洛方法和时序差分方法中取得平衡
A3C
6网络的权重等参数加上一个高斯噪声噪声网络(noisy net)类似于 ε \varepsilon ε 贪心策略,但在同一个回合里面参数是固定的 以保证 看到同一状态采取同一动作
7分布式 Q 函数(distributional Q-function)将最终网络的输出的每一 类别的动作再进行分布操作。低估 奖励
彩虹(rainbow)

主网络: 更新参数, 选动作
目标网络: 某段时间固定, 计算值

在这里插入图片描述

在这里插入图片描述

7.6 分布式 Q 函数

不易实现

Q 函数是累积奖励的期望值,所以我们算出来的 Q 值其实是一个期望值

因为环境是有随机性的,所以在某一个状态采取某一个动作后,计算后续不同轨迹得到的奖励,得到的是一个分布。

在这里插入图片描述
对这个 分布 求均值 得到 Q 值。

这个分布 可能对于选取 最优策略有好处, 当直接用 均值 建模时,可能会丢失重要的信息。

在原来的 Q 函数里面,假设我们只能采取 a 1 a_1 a1 a 2 a_2 a2 a 3 a_3 a3 这 3 个动作,我们输入一个状态,输出 3 个值。这 3 个值分别代表 3 个动作的 Q 值,但是这些 Q 值是一个分布的期望值。所以分布式 Q 函数就是直接输出分布

假设分布的值就分布在某一个范围里面,比如 −10 ~ 10,把 −10 ~ 10 拆成一个一个的长条。例如,每一个动作的奖励空间拆成 5 个长条。假设奖励空间可以拆成 5 个长条,Q 函数的输出就是要预测我们在某一个状态采取某一个动作得到的奖励,其落在某一个长条里面的概率。所以绿色长条概率的和应该是 1,其高度代表在某一个状态采取某一个动作的时候,它落在某一个长条内的概率。绿色的代表动作 a 1 a_1 a1,红色的代表动作 a 2 a_2 a2,蓝色的代表动作 a 3 a_3 a3

实际上在做测试的时候,我们选平均值最大的动作执行

在这里插入图片描述

类似地, 也可对 动作值 进行分布建模。
如果 动作值 的分布方差很大,这代表采取这个动作虽然平均而言很不错,但也许风险很高,我们可以训练一个网络来规避风险。在两个动作平均值 (动作值)都差不多的情况下,也许可以选一个风险比较小 【方差小】的动作来执行,这就是分布式 Q 函数的好处。

7.7 rainbow

分数 取 中位数

在这里插入图片描述

异步优势演员-评论员(asynchronous advantage actor-critic,A3C)是演员-评论员的方法,A3C 算法又被译作 异步优势动作评价算法

在这里插入图片描述

为什么分布式深度 Q 网络会低估奖励呢?因为分布式深度 Q 网络输出的是一个分布的范围,输出的范围不可能是无限的,我们一定会设一个限制,比如最大输出范围就是从 −10 ~10。假设得到的奖励超过 10,比如 100 怎么办?我们就当作没看到这件事。

超出设定范围的奖励值被丢弃——> 分布式深度 Q 网络低估奖励

————————

DQN 比 策略梯度 稳定

DQN 比较容易训练的原因: 只要能够估计出 Q 函数,就保证一定可以找到一个 比较好的策略。

回归问题

难以处理连续动作

方案不足
采样动作采样次数有限,动作不一定精确
梯度上升运算量大;局部最优
设计网络架构函数不能随便设置。
AC演员-评论员方法 = PPO 【基于策略】+ DQN【基于价值】

在这里插入图片描述



JoyRL

梯度下降法 基于一个假设:训练集中的样本是独立同分布的。

避免训练的不稳定性

经验回放的容量不能太小,太小了会导致收集到的样本具有一定的局限性,也不能太大,太大了会失去经验本身的意义。

Double DQN 并不是每隔 C 步复制参数到目标网络,而是每次随机选择其中一个网络选择动作进行更新。

竞争Dueling DQN 算法: 优化神经网络的结构
在这里插入图片描述

在网络模型中增加噪声层

根据经验回放中的每个样本计算出的 TD 误差赋予对应的优先级,然后在采样的时候取出优先级较大的样本。

单纯根据 TD 误差进行优先采样有可能会错过对当前网络“信息量”更大的样本
过拟合

分布式 DQN 算法 C51 算法

【Code】OpenAI Gym 中的 CartPole-v0 车杆平衡 【连续状态空间】

参考链接 1: https://github.com/datawhalechina/joyrl-book/tree/main/notebooks
参考链接 2: https://github.com/datawhalechina/easy-rl/tree/master/notebooks

通过向左 (action=0) 或向右 (action=1) 推车让杆保持直立。每进行一个 step 就会给一个 +1 的 reward,如果无法保持平衡那么 done 等于 true,本次 episode 失败。

—— 基本信息查看

import gym env = gym.make('CartPole-v0')  # 创建环境
n_states = env.observation_space.shape[0]  # 状态数
n_actions = env.action_space.n  # 动作数
print(f"状态数:{n_states}, 动作数: {n_actions}")

在这里插入图片描述
状态数是 4 个,分别为车的位置、车的速度、杆的角度以及杆顶部的速度
动作数为 2 个,并且是离散的向左或者向右。

state = env.reset()  # 初始化环境
print(f"初始状态:{state}")

在这里插入图片描述

训练思路: 执行动作,环境反馈,智能体更新

交互采样 + 模型更新

本地环境:

pip install gymnasium==0.28.1
pip install gym==0.25.2

PyTorch 环境配置链接

在这里插入图片描述

DQN_2015

论文 链接: https://sci-hub.se/10.1038/nature14236

在这里插入图片描述

算法: 带 经验回放 的 deep Q-learning
初始化容量为 N N N 的回放存储 D D D
用随机权重 θ \theta θ 初始化 动作-值 函数 Q Q Q
用权重 θ − = θ \theta^-=\theta θ=θ 初始化 目标动作-值函数 Q ^ \hat Q Q^
遍历 episode = 1,M:
~~~~~~        初始化 序列 s 1 = { x 1 } s_1=\{x_1\} s1={x1},预处理序列 ϕ 1 = ϕ ( s 1 ) \phi_1=\phi(s_1)~~~~ ϕ1=ϕ(s1)     【预处理的目的:降低输入维数并处理 Atari 2600 模拟器的一些伪影。】
~~~~~~        对于 t = 1 , T t=1, ~T t=1, T:
~~~~~~~~~~~~              以概率 ε \varepsilon ε 选择随机动作 a t a_t at
~~~~~~~~~~~~              否则 a t = arg ⁡ max ⁡ a Q ( ϕ ( s t ) , a ; θ ) a_t=\arg\max\limits_aQ(\phi(s_t),a;\theta) at=argamaxQ(ϕ(st),a;θ)
~~~~~~~~~~~~              在仿真器 执行动作 a t a_t at,观察 奖励 r t r_t rt 和 图像 x t + 1 x_{t+1} xt+1
~~~~~~~~~~~~              s t + 1 = { s t , a t , x t + 1 } s_{t+1}=\{s_t,a_t,x_{t+1}\} st+1={st,at,xt+1},并预处理 ϕ t + 1 = ϕ ( s t + 1 ) \phi_{t+1}=\phi(s_{t+1}) ϕt+1=ϕ(st+1)
~~~~~~~~~~~~              将 transition ( ϕ t , a t , r t , ϕ t + 1 ) (\phi_t,a_t,r_t,\phi_{t+1}) (ϕt,at,rt,ϕt+1) 存到 D D D
~~~~~~~~~~~~              D D D 中随机抽样 小批次 transitions ( ϕ j , a j , r j , ϕ j + 1 ) (\phi_j,a_j,r_j,\phi_{j+1}) (ϕj,aj,rj,ϕj+1)
~~~~~~~~~~~~              y j = { r j 在第  j + 1 步回合终止 r j + γ max ⁡ a ′ Q ^ ( ϕ j + 1 , a ′ ; θ − ) 其它 ~~y_j=\left\{\begin{aligned}&r_j~~~~~~~~~~~~~在 第 ~j+1~步回合终止\\ &r_j+\gamma\max\limits_{a^\prime}\hat Q(\phi_{j+1}, a^\prime;\theta^-)&其它\end{aligned}\right.   yj= rj             在第 j+1 步回合终止rj+γamaxQ^(ϕj+1,a;θ)其它
~~~~~~~~~~~~              梯度下降 ( y j − Q ( ϕ j , a j ; θ ) ) 2 \Big(y_j-Q(\phi_j,a_j;\theta)\Big)^2 (yjQ(ϕj,aj;θ))2,针对网络参数 θ \theta θ
~~~~~~~~~~~~              C C C 步 重置 Q ^ = Q \hat Q=Q Q^=Q

test.py

————————————
✔ 版本 1

# pip install seaborn
# pip install gym==0.25.2import torch.backends
import torch.nn as nn 
import torch.nn.functional as F from collections import deque 
import random import torch 
import torch.optim as optim 
import math 
import numpy as np import gym 
import os import argparse
import matplotlib.pyplot as plt 
import seaborn as sns # 
###### 1、算法模块
# 定义 用于 拟合 Q 函数 的 神经网络  Q(s,a; θ) 输入状态, 输出动作
class MLP(nn.Module):def __init__(self, n_states, n_actions, hidden_dim=128): # hidden_dim=128 隐藏层维度super(MLP, self).__init__()self.fc1 = nn.Linear(n_states, hidden_dim)  # 输入层self.fc2 = nn.Linear(hidden_dim, hidden_dim)  # 隐藏层self.fc3 = nn.Linear(hidden_dim, n_actions)  # 输出层 def forward(self, x):x = F.relu(self.fc1(x))x = F.relu(self.fc2(x))return self.fc3(x)# 定义 经验回放
class ReplayBuffer(object):def __init__(self, capacity):self.capacity = capacity self.buffer = deque(maxlen=self.capacity)def push(self, transitions):  # 存储  (s, a, r, s') 到 buffer 中self.buffer.append(transitions)def sample(self, batch_size, sequential=False):  # 抽取 小批次 buffer 数据用于 更新 主网络if batch_size > len(self.buffer):batch_size = len(self.buffer)if sequential:  # 顺序采样rand = random.randint(0, len(self.buffer) - batch_size)batch = [self.buffer[i] for i in range(rand, rand + batch_size)]return zip(*batch)else: # 随机采样batch = random.sample(self.buffer, batch_size)return zip(*batch)def __len__(self):   # 返回 buffer中 存储的经验 数量return len(self.buffer)class DQN:def __init__(self, model, memory, cfg): # cfg 定义了一些 超参数self.n_actions = cfg['n_actions']self.device = torch.device(cfg['device'])self.gamma = cfg['gamma']  # 奖励 的 折扣因子# ε 贪心策略 参数self.sample_count = 0  # 用于 ε 衰减计数self.epsilon = cfg['epsilon_start']self.epsilon_start = cfg['epsilon_start']self.epsilon_end = cfg['epsilon_end']self.epsilon_decay = cfg['epsilon_decay']self.batch_size = cfg['batch_size']self.policy_net = model.to(self.device)self.target_net = model.to(self.device)  # 复制参数到 目标网络for target_param, param in zip(self.target_net.parameters(), self.policy_net.parameters()):target_param.data.copy_(param.data)self.optimizer = optim.Adam(self.policy_net.parameters(), lr=cfg['lr'])  # 优化器self.memory = memory  # 经验回放def sample_action(self, state):  # 状态 s 采取 动作 a 是否是 好的策略self.sample_count += 1# 让 ε 随着步数指数衰减。 权衡 探索和利用 self.epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \math.exp(-1. * self.sample_count / self.epsilon_decay)if random.random() > self.epsilon:  # ε 已经很小with torch.no_grad():state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)q_values = self.policy_net(state)action = q_values.max(1)[1].item()  # 选择 最大动作值 对应的动作else:action = random.randrange(self.n_actions)return action ## 用于 测试@torch.no_grad()def predict_action(self, state):  # 策略已学好。 遇到状态 s 应输出 astate = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)q_values = self.policy_net(state)  # 学到的 Q 函数网络 输出 相应的 q 值action = q_values.max(1)[1].item() return action def update(self):  # 基于数据  不断更新  Q函数 的拟合模型if len(self.memory) < self.batch_size:    # 缓存器 中样本数不足 一批,不更新策 return # 在 缓存器 中抽样 batch (s, a, r, s')state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample(self.batch_size)# 将数据 转为 tensor state_batch = torch.tensor(np.array(state_batch), device=self.device, dtype=torch.float)action_batch = torch.tensor(action_batch, device=self.device).unsqueeze(1)reward_batch = torch.tensor(reward_batch, device=self.device, dtype=torch.float)next_state_batch = torch.tensor(np.array(next_state_batch), device=self.device, dtype=torch.float)done_batch = torch.tensor(np.float32(done_batch), device=self.device)q_values = self.policy_net(state_batch).gather(dim=1, index=action_batch)next_q_values = self.target_net(next_state_batch).max(1)[0].detach() expected_q_values = reward_batch + self.gamma * next_q_values * (1 - done_batch)loss = nn.MSELoss()(q_values, expected_q_values.unsqueeze(1))  # 计算均方根损失# 优化self.optimizer.zero_grad()loss.backward()# Clip 防止 梯度爆炸for param in self.policy_net.parameters():param.grad.data.clamp_(-1,1)self.optimizer.step() ###### 2、训练模块 和 测试模块  定义
def train(cfg, env, agent):  # 训练模块print("------开始训练------")rewards = []for i_ep in range(cfg['train_eps']):ep_reward = 0   # 一个 回合 内的奖励state = env.reset(seed=cfg['seed'])   # 重置环境for _ in range(cfg['ep_max_steps']):action = agent.sample_action(state)  next_state, reward, done, _ = env.step(action)agent.memory.push((state, action, reward, next_state, done))state = next_state  # 更新下一个状态agent.update()  # 更新智能体ep_reward += reward  # 累加奖励if done:break if (i_ep + 1) % cfg['C'] == 0:  # 智能体 目标网络更新agent.target_net.load_state_dict(agent.policy_net.state_dict())rewards.append(ep_reward)if (i_ep + 1) % 10 == 0:  # 每 10 个 回合 打印print(f"回合: {i_ep + 1} % {cfg['train_eps']}, 奖励:  {ep_reward:.2f}, Epsolon: {agent.epsilon:.3f}")print("------  完成训练 ! ------")env.close() return {'rewards': rewards}def test(cfg, env, agent):  # 训练模块print("------ 开始测试: ------")rewards = []for i_ep in range(cfg['test_eps']):ep_reward = 0state = env.reset(seed=cfg['seed'])  # 重置环境     for _ in range(cfg['ep_max_steps']):env.render()  # pygame 将直接可视化action = agent.predict_action(state) next_state, reward, done, _ = env.step(action)ep_reward += reward   # 累加奖励if done:break rewards.append(ep_reward)print(f"回合: {i_ep + 1} / {cfg['test_eps']}, 奖励: {ep_reward :.2f}")print("------ 完成测试 ------")env.close()return {'rewards': rewards}  # 字典###### 3、环境
def all_seed(env, seed = 1):np.random.seed(seed)random.seed(seed)torch.manual_seed(seed)  # CPU torch.cuda.manual_seed(seed)  # GPU os.environ['PYTHONHASHSEED'] = str(seed)  # pythontorch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False torch.backends.cudnn.enabled = False def env_agent_config(cfg):env = gym.make(cfg['env_name'])if cfg['seed'] != 0:all_seed(env, seed=cfg['seed'])n_states = env.observation_space.shape[0]n_actions = env.action_space.n print(f"状态数: {n_states}, 动作数: {n_actions}")cfg.update({"n_states": n_states, "n_actions": n_actions})  # 参数配置字典 更新model = MLP(n_states, n_actions, hidden_dim=cfg['hidden_dim'])memory = ReplayBuffer(cfg['memory_capacity'])agent = DQN(model, memory, cfg)return env, agent ###### 4、参数 设置  可视化模块
def get_args():  # 超参数parser = argparse.ArgumentParser(description="hyperparameters")parser.add_argument('--algorithm_name', default='DQN', type=str, help='name of algorithm')  # 算法名称parser.add_argument('--env_name', default='CartPole-v0', type=str, help="name of environment")parser.add_argument('--train_eps', default=200, type=int, help="episodes of training")parser.add_argument('--test_eps', default = 20, type=int, help="episodes of testing")parser.add_argument('--ep_max_steps',default = 100000,type=int,help="steps per episode, much larger value can simulate infinite steps")parser.add_argument('--gamma',default=0.95,type=float,help="discounted factor")parser.add_argument('--epsilon_start',default=0.95,type=float,help="initial value of epsilon")parser.add_argument('--epsilon_end',default=0.01,type=float,help="final value of epsilon")parser.add_argument('--epsilon_decay',default=500,type=int,help="decay rate of epsilon, the higher value, the slower decay")parser.add_argument('--lr',default=0.0001,type=float,help="learning rate")parser.add_argument('--memory_capacity',default=100000,type=int,help="memory capacity")parser.add_argument('--batch_size',default=64,type=int)parser.add_argument('--C',default=4,type=int,help="the Q of target network update after C steps of a episode")parser.add_argument('--hidden_dim',default=256,type=int)parser.add_argument('--device',default='cpu',type=str,help="cpu or cuda") parser.add_argument('--seed',default=10,type=int,help="seed") args = parser.parse_args([])args = {**vars(args)} # 打印 超参数print("超参数")print(''.join(['=']*80))tplt = "{:^20}\t{:^20}\t{:^20}"print(tplt.format("Name", "Value", "Type"))for k,v in args.items():print(tplt.format(k,v,str(type(v))))   print(''.join(['=']*80))      return argsdef smooth(data, weight=0.9): # 平滑曲线last = data[0]smoothed = []for point in data:smoothed_val = last * weight + (1 - weight) * point  # 计算平滑值smoothed.append(smoothed_val)last = smoothed_valreturn smoothed  # 返回计算好的平滑数据def plot_rewards(rewards, cfg, tag='train'):  # 训练过程的奖励 可视化sns.set(style='whitegrid')  #  可设置 seaborn 绘图风格plt.figure()plt.title(f"{tag}ing curve on {cfg['device']} of {cfg['algorithm_name']} for {cfg['env_name']}") # 算法名称  环境名称plt.xlabel('episodes')plt.plot(rewards, label='rewards')plt.plot(smooth(rewards), label='smoothed')# plt.legend() plt.show() ###### 5、训练 和 测试cfg = get_args()  # 获取参数
env, agent = env_agent_config(cfg)
res_dic = train(cfg, env, agent)plot_rewards(res_dic['rewards'], cfg, tag = "train")# 测试
res_dic = test(cfg, env, agent)
plot_rewards(res_dic['rewards'], cfg, tag="test")

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

待改版本

拟改成 gymnasium 版本, 目前版本仍有报错

import torch.nn as nn 
import torch.nn.functional as F from collections import deque 
import random import torch 
import torch.optim as optim 
import math 
import numpy as np import gymnasium as gym
import os import argparse
import matplotlib.pyplot as plt 
import seaborn as sns # 
###### 1、算法模块
# 定义 用于 拟合 Q 函数 的 神经网络  Q(s,a; θ) 输入状态, 输出动作
class MLP(nn.Module):def __init__(self, n_states, n_actions, hidden_dim=128): # hidden_dim=128 隐藏层维度super(MLP, self).__init__()self.fc1 = nn.Linear(n_states, hidden_dim)  # 输入层self.fc2 = nn.Linear(hidden_dim, hidden_dim)  # 隐藏层self.fc3 = nn.Linear(hidden_dim, n_actions)  # 输出层 def forward(self, x):x = F.relu(self.fc1(x))x = F.relu(self.fc2(x))return self.fc3(x)# 定义 经验回放
class ReplayBuffer(object):def __init__(self, capacity):self.capacity = capacity self.buffer = deque(maxlen=self.capacity)def push(self, transitions):  # 存储  (s, a, r, s') 到 buffer 中self.buffer.append(transitions)def sample(self, batch_size, sequential=False):  # 抽取 小批次 buffer 数据用于 更新 主网络if batch_size > len(self.buffer):batch_size = len(self.buffer)if sequential:  # 顺序采样rand = random.randint(0, len(self.buffer) - batch_size)batch = [self.buffer[i] for i in range(rand, rand + batch_size)]return zip(*batch)else: # 随机采样batch = random.sample(self.buffer, batch_size)return zip(*batch)def __len__(self):   # 返回 buffer中 存储的经验/数据 数量return len(self.buffer)class DQN:def __init__(self, model, memory, cfg): # cfg 定义了一些 超参数self.n_actions = cfg['n_actions']self.device = torch.device(cfg['device'])self.gamma = cfg['gamma']  # 奖励 的 折扣因子# ε 贪心策略 参数self.sample_count = 0  # 用于 ε 衰减计数self.epsilon_start = cfg['epsilon_start']self.epsilon_end = cfg['epsilon_end']self.epsilon_decay = cfg['epsilon_decay']self.batch_size = cfg['batch_size']self.policy_net = model.to(self.device)self.target_net = model.to(self.device)  # 复制参数到 目标网络for target_param, param in zip(self.target_net.parameters(), self.policy_net.parameters()):target_param.data.copy_(param.data)self.optimizer = optim.Adam(self.policy_net.parameters(), lr=cfg['lr'])  # 优化器self.memory = memory  # 经验回放def sample_action(self, state):  # 状态 s 执行 动作 a 是否是 好的策略self.sample_count += 1# 让 ε 随着步数指数衰减。 权衡 探索和利用 self.epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \math.exp(-1. * self.sample_count / self.epsilon_decay)if random.random() > self.epsilon:  # ε 已经很小with torch.no_grad():state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)q_values = self.policy_net(state)action = q_values.max(1)[1].item()  # 选择 最大动作值 对应的动作else:action = random.randrange(self.n_actions)return action ## 用于 测试@torch.no_grad()def predict_action(self, state):  # 策略已学好。 遇到状态 s 应输出 astate = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)q_values = self.policy_net(state)  # 学到的 Q 函数网络 输出 相应的 q 值action = q_values.max(1)[1].item() return action def update(self):  # 基于数据  不断更新  Q函数 的拟合模型if len(self.memory) < self.batch_size:    # 缓存器 中样本数不足 一批,不更新策 return # 在 缓存器 中抽样 batch (s, a, r, s')state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample(self.batch_size)# 将数据 转为 tensor state_batch = torch.tensor(np.array(state_batch), device=self.device, dtype=torch.float)action_batch = torch.tensor(action_batch, device=self.device).unsqueeze(1)reward_batch = torch.tensor(reward_batch, device=self.device, dtype=torch.float)next_state_batch = torch.tensor(np.array(next_state_batch), device=self.device, dtype=torch.float)done_batch = torch.tensor(np.float32(done_batch), device=self.device)q_values = self.policy_net(state_batch).gather(dim=1, index=action_batch)next_q_values = self.target_net(next_state_batch).max(1)[0].detach() # 最大的 qexpected_q_values = reward_batch + self.gamma * next_q_values * (1 - done_batch)  # done_batch 记录 是否是批次的结束。若是,要更新 主网络loss = nn.MSELoss()(q_values, expected_q_values)  # 计算均方根损失# 优化self.optimizer.zero_grad()loss.backward()# Clip 防止 梯度爆炸for param in self.policy_net.parameters():param.grad.data.clamp_(-1,1)self.optimizer.step() ###### 2、训练模块 和 测试模块  定义
def train(cfg, env, agent):  # 训练模块print("------开始训练------")rewards = []steps = []for i_ep in range(cfg['train_eps']):ep_reward = 0   # 一个 回合 内的奖励ep_step = 0 state, info = env.reset(seed=cfg['seed'])   # 重置环境for _ in range(cfg['ep_max_steps']):ep_step += 1action = agent.sample_action(state)  next_state, reward, terminated, truncated, info = env.step(action)agent.memory.push((state, action, reward, next_state, terminated))state = next_state  # 更新下一个状态agent.update()  # 更新智能体ep_reward += reward  # 累加奖励if terminated:break if (i_ep + 1) % cfg['target_update'] == 0:  # 智能体 目标网络更新  每 C 步 更新 目标网络agent.target_net.load_state_dict(agent.policy_net.state_dict())steps.append(ep_reward)rewards.append(ep_reward)if (i_ep + 1) % 10 == 0:  # 每 10 个 回合 打印print(f"回合: {i_ep + 1} % {cfg['train_eps']}, 奖励:  {ep_reward:.2f}, Epsolon: {agent.epsilon:.3f}")print("------  完成训练 ! ------")env.close() return {'rewards': rewards}def test(cfg, env, agent):  # 训练模块print("------ 开始测试: ------")# 测试时 不用 计步数了rewards = []for i_ep in range(cfg['test_eps']):ep_reward = 0state, info = env.reset(seed=cfg['seed'])   # 重置环境 for _ in range(cfg['ep_max_steps']):action = agent.predict_action(state) next_state, reward, terminated, truncated, info = env.step(action)ep_reward += reward   # 累加奖励if terminated:break rewards.append(ep_reward)print(f"回合: {i_ep + 1} / {cfg['test_eps']}, 奖励: {ep_reward :.2f}")print("------ 完成测试 ------")env.close()return {'rewards': rewards}  # 字典###### 3、环境
def all_seed(env, seed = 1):# env.seed(seed)np.random.seed(seed)random.seed(seed)torch.manual_seed(seed)  # CPU torch.cuda.manual_seed(seed)  # GPU os.environ['PYTHONHASHSEED'] = str(seed)  # pythontorch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False torch.backends.cudnn.enabled = False def env_agent_config(cfg):env = gym.make(cfg['env_name'])if cfg['seed'] != 0:all_seed(env, seed=cfg['seed'])n_states = env.observation_space.shape[0]n_actions = env.action_space.n print(f"状态数: {n_states}, 动作数: {n_actions}")cfg.update({"n_states": n_states, "n_actions": n_actions})  # 参数配置字典 更新model = MLP(n_states, n_actions, hidden_dim=cfg['hidden_dim'])memory = ReplayBuffer(cfg['memory_capacity'])agent = DQN(model, memory, cfg)return env, agent ###### 4、参数 设置  可视化模块
def get_args():  # 超参数parser = argparse.ArgumentParser(description="hyperparameters")parser.add_argument('--algorithm_name', default='DQN', type=str, help='name of algorithm')  # 算法名称parser.add_argument('--env_name', default='CartPole-v1', type=str, help="name of environment")parser.add_argument('--train_eps', default=200, type=int, help="episodes of training")parser.add_argument('--test_eps', default = 20, type=int, help="episodes of testing")parser.add_argument('--ep_max_steps',default = 100000,type=int,help="steps per episode, much larger value can simulate infinite steps")parser.add_argument('--gamma',default=0.95,type=float,help="discounted factor")parser.add_argument('--epsilon_start',default=0.95,type=float,help="initial value of epsilon")parser.add_argument('--epsilon_end',default=0.01,type=float,help="final value of epsilon")parser.add_argument('--epsilon_decay',default=500,type=int,help="decay rate of epsilon, the higher value, the slower decay")parser.add_argument('--lr',default=0.0001,type=float,help="learning rate")parser.add_argument('--memory_capacity',default=100000,type=int,help="memory capacity")parser.add_argument('--batch_size',default=64,type=int)parser.add_argument('--target_update',default=4,type=int)  # 伪代码里的 Cparser.add_argument('--hidden_dim',default=256,type=int)parser.add_argument('--device',default='cpu',type=str,help="CPU or GPU") parser.add_argument('--seed',default=10,type=int,help="seed") args = parser.parse_args([])args = {**vars(args)} # 打印 超参数print("超参数")print(''.join(['=']*80))tplt = "{:^20}\t{:^20}\t{:^20}"print(tplt.format("Name", "Value", "Type"))for k, v in args.items():print(tplt.format(k,v,str(type(v))))   print(''.join(['=']*80))      return argsdef smooth(data, weight=0.9): # 平滑曲线last = data[0]smoothed = []for point in data:smoothed_val = last * weight + (1 - weight) * point  # 计算平滑值smoothed.append(smoothed_val)last = smoothed_valreturn smoothed  # 返回计算好的平滑数据def plot_rewards(rewards, cfg, tag='train'):  # 训练过程的奖励 可视化sns.set(style='whitegrid') #  可设置 seaborn 绘图风格plt.figure()plt.title(f"{tag}ing curve on {cfg['device']} of {cfg['algorithm_name']} for {cfg['env_name']}") # 算法名称  环境名称plt.xlabel('episodes')plt.plot(rewards, label='rewards')plt.plot(smooth(rewards), label='smoothed')plt.show() ###### 5、训练 和 测试cfg = get_args()  # 获取参数
env, agent = env_agent_config(cfg)
res_dic = train(cfg, env, agent)plot_rewards(res_dic['rewards'], cfg, tag = "train")# 测试
res_dic = test(cfg, env, agent)
plot_rewards(res_dic['rewards'], cfg, tag="test")

▢ Double DQN_2016

论文 PDF 链接:Deep Reinforcement Learning with Double Q-learning

来自 https://arxiv.org/pdf/1511.06581 的伪代码

在这里插入图片描述

算法: Double DQN
输入:空的回放缓存 replay buffer D \cal D D、初始网络权重 θ \theta θ、目标网络权重 θ − = θ \theta^-=\theta θ=θ
~~~~~~~~~           回放缓存 replay buffer 的最大容量 N r N_r Nr,训练 batch-size N b N_b Nb 、目标网络的替换频次 N − N^- N
遍历 episode e ∈ { 1 , 2 , ⋯ , M } e\in\{1, 2,\cdots,M\} e{1,2,,M}
~~~~~~        初始化 帧序列 x ← ( ) \bf x\leftarrow() x()
~~~~~~        对于 t ∈ { 0 , 1 , ⋯ } t\in\{0,1, \cdots\} t{0,1,}:
~~~~~~~~~~~~              令 状态 s ← x s\leftarrow\bf x sx,抽样动作 a ∼ π B a\sim\pi_{\cal B}~~~ aπB    【必须从 buffer里抽取?——>模拟 均匀分布】
~~~~~~~~~~~~              给定 ( s , a ) (s, a) (s,a), 根据 E \mathcal{E} E 抽取下一个环境 x t x^t xt, 得到奖励 r r r, 将 x t x^t xt 添加到 x \bf x x 中。
~~~~~~~~~~~~              如果 ∣ x ∣ > N f |{\bf x}|>N_f x>Nf, 删掉 x {\bf x} x 中最旧的帧 x t min ⁡ x_{t_{\min}}~~~ xtmin    【设的阈值?】
~~~~~~~~~~~~              s ′ ← x s^\prime\leftarrow\bf x sx, 将 ( s , a , r , s ′ ) (s, a, r, s^\prime) (s,a,r,s) 添加到 D \cal D D,若是 ∣ D ∣ ≥ N r |{\cal D}|\geq N_r DNr, 替换最旧的数据
~~~~~~~~~~~~              抽样长度为 N b N_b Nb 的小批量元组 ( s , a , r , s ′ ) ∼ U n i f ( D ) (s, a,r,s^\prime)\sim {\rm Unif}(\cal D)~~~~ (s,a,r,s)Unif(D)     【近似 均匀分布】
~~~~~~~~~~~~             对于长度为 N b N_b Nb 的元组中的每一个样本计算 目标值
~~~~~~~~~~~~              y j = { r s ′ 是  e p i s o d e 终点 r + γ Q ( s ′ , arg ⁡ max ⁡ a ′ Q ( s ′ , a ′ ; θ ) ; θ − ) 其它 ~~y_j=\left\{\begin{aligned}&r ~~~~~~~~~~~~s^\prime ~是~{\rm episode}~ 终点\\ &r+\gamma Q\Big(s^\prime,\arg\max\limits_{a^\prime}Q(s^\prime, a^\prime;\theta);\theta^-\Big)~~~~~~其它\end{aligned}\right.   yj= r            s  episode 终点r+γQ(s,argamaxQ(s,a;θ);θ)      其它
~~~~~~~~~~~~              对损失 ∣ ∣ y j − Q ( s , a ; θ ) ∣ ∣ 2 ~||y_j-Q(s,a;\theta)||^2~  ∣∣yjQ(s,a;θ)2  梯度下降
~~~~~~~~~~~~              N − N^- N 步 替换目标参数 θ − ← θ ~\theta^-\leftarrow\theta  θθ

$\mathcal{E}$ E ~~~\mathcal{E}    E

test.py

在这里插入代码片

▢ 竞争 Dueling DQN 算法_2016

论文 PDF 链接: Dueling Network Architectures for Deep Reinforcement Learning

Q ( s , a ; θ , α , β ) = V ( s ; θ , β ) + ( A ( s , a ; θ , α ) − 1 ∣ A ∣ ∑ a ′ A ( s , a ′ ; θ , α ) ) Q(s,a;\theta,\alpha,\beta)=V(s;\theta,\beta)+\Big(A(s,a;\theta,\alpha)-\frac{1}{|\mathcal A|}\sum\limits_{a^\prime}A(s,a^\prime;\theta,\alpha)\Big) Q(s,a;θ,α,β)=V(s;θ,β)+(A(s,a;θ,α)A1aA(s,a;θ,α))

  • θ \theta θ 是卷积层参数。 α \alpha α β \beta β 分别是两个输出全连接层的参数
  • 状态-动作值函数 Q ( s , a ; θ , α , β ) Q(s,a;\theta,\alpha,\beta) Q(s,a;θ,α,β) 是真实 Q Q Q 函数的参数化估计
  • V ( s ; θ , β ) V(s;\theta,\textcolor{blue}{\beta}) V(s;θ,β) 不是 状态值 函数好的估计
  • 取决于状态的动作优势函数 A ( s , a ; θ , α ) A(s,a;\theta,\textcolor{blue}{\alpha}) A(s,a;θ,α) 也不是 优势函数的合理估计。每个行动重要性的度量

▢ Noisy DQN_2018

论文 PDF 链接:Noisy Networks for Exploration

在这里插入图片描述

▢ PER DQN_2016

论文 PDF 链接:Prioritized Experience Replay

▢ 分布式 DQN_2017

论文 PDF 链接:A Distributional Perspective on Reinforcement Learning

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/363535.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++笔记:实现一个字符串类(构造函数、拷贝构造函数、拷贝赋值函数)

实现一个字符串类String&#xff0c;为其提供可接受C风格字符串的构造函数、析构函数、拷贝构造函数和拷贝赋值函数。 声明依赖文件 其中ostream库用于打印标准输入输出&#xff0c;cstring库为C风格的字符串库 #include <iostream> #include <cstring> 声明命…

基于lio-sam的重定位和增量式建图

文章目录 遇到的问题解决思路预览效果详细过程预先构建地图订阅初始估计姿态加载全局地图ICP配准计算初始位姿参考遇到的问题 为了复用上个生命周期录制的轨迹,我需要用到重定位功能,现有的开源方案中,可以实现该功能,但存在以下问题:在预先构建的地图之外,无法实现定位…

车辆轨迹预测系列 (五):Argoverse API Forecasting Tutorial代码解析

车辆轨迹预测系列 (五)&#xff1a;Argoverse API Forecasting Tutorial代码解析 文章目录 车辆轨迹预测系列 (五)&#xff1a;Argoverse API Forecasting Tutorial代码解析一、argoverse.data_loading.argoverse_forecasting_loader二、argoverse.visualization.visualize_seq…

标准版小程序订单中心path审核不通过处理教程

首先看自己小程序是不是已经审核通过并上线状态才在站内信里面提醒的&#xff1f; 如果没有提交过审核&#xff0c;请在提交的时候填写。path地址为&#xff1a;pages/goods/order_list/index 如果是已经上线的小程序&#xff0c;当时没要求填这个&#xff0c;但新的政策要求填…

深度学习 --- stanford cs231学习笔记七(训练神经网络之梯度下降优化器)

5&#xff0c;梯度下降优化器 5&#xff0c;1 梯度下降在深度学习中的作用 在深度学习中&#xff0c;权重W的值是否合理是由损失函数L来判断的。L越小&#xff0c;表示W的设置越happy。L越大&#xff0c;表示W的值越unhappy。 为了让L越来越小&#xff0c;常用的方法是梯度下降…

华为5288 V5服务器安装BCLinux8U4手记

本文记录了华为5288 V5服务器安装BCLinux8U4操作系统的过程。 一、系统环境 1、服务器 华为FusionServer Pro 5288 V5服务器 2、操作系统 BCLinux-R8-U4-Server-x86_64-220725.iso 官网下载地址 sha256sum&#xff1a;1d31d3b8e02279e89965bd3bea61f14c65b9d32ad2ab6d4eb…

【PyTorch函数解析】einsum的用法示例

一、前言 einsum 是一个非常强大的函数&#xff0c;用于执行张量&#xff08;Tensor&#xff09;运算。它的名称来源于爱因斯坦求和约定&#xff08;Einstein summation convention&#xff09;&#xff0c;在PyTorch中&#xff0c;einsum 可以方便地进行多维数组的操作和计算…

GPT-5:AI新纪元的领航者,多维度的审视与准备

一、引言&#xff1a;GPT-5与AI的多维演进 GPT-5作为AI领域的里程碑式突破&#xff0c;不仅仅代表了技术的飞跃&#xff0c;更预示着社会、文化以及经济等多个层面的深刻变革。从技术的角度看&#xff0c;GPT-5代表着AI在自然语言处理领域的最新高度&#xff1b;而从更宽广的视…

Linux双网卡默认路由的metric设置不正确,导致SSH连接失败问题定位

测试环境 VMware虚拟机 RockyLinux 9 x86_64 双网卡&#xff1a;eth0(访问外网): 10.206.216.92/24; eth1(访问内网) 192.168.1.4/24 问题描述 虚拟机重启后&#xff0c;SSH连接失败&#xff0c;提示"Connection time out"&#xff0c;重启之前SSH连接还是正常的…

音视频入门基础:H.264专题(8)——H.264官方文档的描述符

音视频入门基础&#xff1a;H.264专题系列文章&#xff1a; 音视频入门基础&#xff1a;H.264专题&#xff08;1&#xff09;——H.264官方文档下载 音视频入门基础&#xff1a;H.264专题&#xff08;2&#xff09;——使用FFmpeg命令生成H.264裸流文件 音视频入门基础&…

Java代码基础算法练习-删除有序数组中的重复项-2024.05.07

任务描述&#xff1a; 有一批同学需要计算各自的出生年月是否闰年。请使用算法计算出他们的出生年份是否闰年。 解决思路&#xff1a; 如果要一次性输出结果&#xff0c;就是先输入数字n&#xff0c;确定首先循环几次&#xff0c;在每次循环中进行闰年判断操作&#xff0c;每次…

RK3588/算能/Nvidia智能盒子:[AI智慧油站」,以安全为基,赋能精准经营

2021年9月&#xff0c;山东省应急管理厅印发了关于《全省危险化学品安全生产信息化建设与应用工作方案&#xff08;2021-2022 年&#xff09;》的通知&#xff0c;要求全省范围内加快推进危险化学品安全生产信息化、智能化建设与应用工作&#xff0c;建设完善全省危险化学品安全…

遥感数据并行运算(satellite remote sensing data parallell processing)

文章内容仅用于自己知识学习和分享&#xff0c;如有侵权&#xff0c;还请联系并删除 &#xff1a;&#xff09; 之前不太会用&#xff0c;单纯想记录一下&#xff0c;后面或许还会用到 1. 教程 [1] Pleasingly Parallel Programming: link 1.1 处理器&#xff0c;核和线程 …

山东水利职业学院空调集控系统案例,节能减排、降低维护成本

日常在公共办公场所使用空调时&#xff0c;人离开办公室空调依然开着&#xff0c;由于适用空调的不良行为导致能源浪费。良好的学习环境是保持学生好的学习状态的前提条件&#xff0c;让学生在炎热的夏季都能享受到舒适的室内空气环境是很重要的&#xff0c;对空调集中管理&…

ASUS/华硕天选Air 2021 FX516P系列 原厂win10系统

安装后恢复到您开箱的体验界面&#xff0c;带原机所有驱动和软件&#xff0c;包括myasus mcafee office 奥创等。 最适合您电脑的系统&#xff0c;经厂家手调试最佳状态&#xff0c;性能与功耗直接拉满&#xff0c;体验最原汁原味的系统。 原厂系统下载网址&#xff1a;http:…

Spring Clude 是什么?

目录 认识微服务 单体架构 集群和分布式架构 集群和分布式 集群和分布式区别和联系 微服务架构 分布式架构&微服务架构 微服务的优势和带来的挑战 微服务解决方案- Spring Cloud 什么是 Spring Cloud Spring Cloud 版本 Spring Cloud 和 SpringBoot 的关系 Sp…

深度学习 —— 1.单一神经元

深度学习初级课程 1.单一神经元2.深度神经网络3.随机梯度下降法4.过拟合和欠拟合5.剪枝、批量标准化6.二分类 前言 本套课程仍为 kaggle 课程《Intro to Deep Learning》&#xff0c;仍按之前《机器学习》系列课程模式进行。前一系列《Keras入门教程》内容&#xff0c;与本系列…

STM32 IWDG(独立看门狗)

1 IWDG简介 STM32有两个看门狗&#xff1a;一个是独立看门狗&#xff08;IWDG&#xff09;&#xff0c;另外一个是窗口看门狗。独立看门狗也称宠物狗&#xff0c;窗口看门狗也称警犬。本文主要分析独立看门狗的功能和它的应用。 独立看门狗用通俗一点的话来解释就是一个12位的…

在Ubuntu上安装VNC服务器教程

Ubuntu上安装VNC服务器方法&#xff1a;按照root安装TeactVnc&#xff0c;随后运行vncserver输入密码&#xff0c;安装并打开RickVNC客户端&#xff0c;输入服务器的IP&#xff0c;最后连接输入密码即可。 VNC或虚拟网络计算&#xff0c;可让您连接到远程Linux / Unix服务器的…

udp Socket组播 服务器

什么是组播 组播也可以称之为多播这也是 UDP 的特性之一。组播是主机间一对多的通讯模式&#xff0c;是一种允许一个或多个组播源发送同一报文到多个接收者的技术。组播源将一份报文发送到特定的组播地址&#xff0c;组播地址不同于单播地址&#xff0c;它并不属于特定某个主机…