强化学习DQN实践(gymnasium+pytorch)

Pytorch官方教程中有强化学习教程,但是很多中文翻译都太老了,里面的代码也不能跑了
这篇blog按照官方最新教程实现,并加入了一些个人理解

工具

  • gymnasium:由gym升级而来,官方定义:An API standard for reinforcement learning with a diverse collection of reference environments。提供强化学习的“环境”
    • pip install gymnasium
  • pytorch

任务

倒立摆模型,使用强化学习控制小车来使倒立摆稳定,有小车向左和向右两个action
在这里插入图片描述
gym中提供了很多类似的强化学习“环境”

代码

准备工作

import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import countimport torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as Fenv = gym.make("CartPole-v1")# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:from IPython import displayplt.ion()# if GPU is to be used
device = torch.device("cuda" if torch.cuda.is_available() else"mps" if torch.backends.mps.is_available() else"cpu"
)

定义transition,包括state,action,next_state,reward:

Transition = namedtuple('Transition',('state', 'action', 'next_state', 'reward'))class ReplayMemory(object):'''样本经验池,用于存储过往的transition,这些信息会被用来训练模型'''def __init__(self, capacity):self.memory = deque([], maxlen=capacity)def push(self, *args):"""Save a transition"""self.memory.append(Transition(*args))def sample(self, batch_size):return random.sample(self.memory, batch_size)def __len__(self):return len(self.memory)

DQN算法

class DQN(nn.Module):def __init__(self, n_observations, n_actions):super(DQN, self).__init__()self.layer1 = nn.Linear(n_observations, 128)self.layer2 = nn.Linear(128, 128)self.layer3 = nn.Linear(128, n_actions)# Called with either one element to determine next action, or a batch# during optimization. Returns tensor([[left0exp,right0exp]...]).def forward(self, x):x = F.relu(self.layer1(x))x = F.relu(self.layer2(x))return self.layer3(x)

根据上篇强化学习理论blog,强化学习的流程是通过贝尔曼最优公式得到最优策略 π ( s ) \pi(s) π(s),求解需要知道reward函数 r ( s , a ) r(s,a) r(s,a),action value函数 Q ( s , a ) Q(s,a) Q(s,a),其中reward函数可以自行定义,action value如何得到?
在Q-learning算法中,设定策略为贪心策略,即每次都是选择action value最大的action执行,这样就可以将原本原贝尔曼公式中的state value变为action value中,将 V ( s ′ ) V(s^{\prime}) V(s) Q π ( s ′ , π ( s ′ ) ) Q^{\pi}(s^{\prime},\pi (s^{\prime})) Qπ(s,π(s))来代替:
Q π ( s , a ) = r + γ Q π ( s ′ , π ( s ′ ) ) Q^{\pi}(s,a) = r + \gamma Q^{\pi}(s^{\prime},\pi (s^{\prime})) Qπ(s,a)=r+γQπ(s,π(s))
实际更新流程变为:

  • 初始化 Q 0 Q_0 Q0
  • 使用结果估计 Q ~ ( s , a ) = r + γ Q 0 ( s ′ , π ( s ′ ) ) \tilde{Q}(s,a) = r + \gamma Q_0(s^{\prime},\pi (s^{\prime})) Q~(s,a)=r+γQ0(s,π(s))
  • 计算误差 δ \delta δ δ = Q 0 ( s , a ) − Q ~ ( s , a ) \delta = Q_0(s,a)-\tilde{Q}(s,a) δ=Q0(s,a)Q~(s,a)
  • 使用损失函数 L L L Δ = L ( δ ) \Delta = L(\delta) Δ=L(δ)
  • 更新 Q Q Q Q 1 = Q 0 − α Δ Q_1 = Q_0-\alpha \Delta Q1=Q0αΔ

经过迭代后便会收敛到最优策略。
DNQ将Q-learning中的Q表改进为了神经网络,对于连续状态的环境更合适。而且会将过往的决策过程记录下来维护一个样本池,方便一次更新多个样本,而不是按时间一次次运行更新,这样回放机制就会减少应用于高度相关的状态序列时因为前后样本存在关联导致的强化学习震荡和发散的问题。

强化学习分类:
通过学习目标分类:

  • 基于价值的方法,训练agent学习行为价值函数,隐式学习了策略,如Q-learning
  • 基于策略的方法,训练agent直接学习策略
  • 把 value-based 和 policy-based 结合起来就是 演员-评论家(Actor-Critic)方法。这一类 agent 需要显式地学习价值函数和策略。如DDPG

通过交互策略和更新策略间的关系分类

  • on-policy方法:目标策略和交互策略是同一个策略 π ( s ) \pi(s) π(s)
  • off-policy方法:使用一种行为策略 μ ( s ) \mu(s) μ(s)来与环境交互,学习的目标策略是 π ( s ) \pi(s) π(s)

训练准备

# BATCH_SIZE is the number of transitions sampled from the replay buffer
# GAMMA 折扣因子,用于计算discounted
# EPS_START:随机选择action的概率初始值。
# EPS_END 随机选择action的概率末尾值
# EPS_DECAY controls the rate of exponential decay of epsilon, higher means a slower decay
# TAU is the update rate of the target network
# LR 优化器的学习率
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4# Get number of actions from gym action space
n_actions = env.action_space.n
# Get the number of state observations
state, info = env.reset()
n_observations = len(state)policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)steps_done = 0def select_action(state):'''选择一个action,随机选择or使用模型输出,这么做是为了能够全面学习所有行为,而不陷入局部最优随着迭代随机选择的概率会逐渐减小至EPS_END'''global steps_donesample = random.random()eps_threshold = EPS_END + (EPS_START - EPS_END) * \math.exp(-1. * steps_done / EPS_DECAY)steps_done += 1if sample > eps_threshold:with torch.no_grad():# t.max(1) will return the largest column value of each row.# second column on max result is index of where max element was# found, so we pick action with the larger expected reward.return policy_net(state).max(1).indices.view(1, 1)else:return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)episode_durations = []def plot_durations(show_result=False):'''画过去的随机概率的变化过程'''plt.figure(1)durations_t = torch.tensor(episode_durations, dtype=torch.float)if show_result:plt.title('Result')else:plt.clf()plt.title('Training...')plt.xlabel('Episode')plt.ylabel('Duration')plt.plot(durations_t.numpy())# Take 100 episode averages and plot them tooif len(durations_t) >= 100:means = durations_t.unfold(0, 100, 1).mean(1).view(-1)means = torch.cat((torch.zeros(99), means))plt.plot(means.numpy())plt.pause(0.001)  # pause a bit so that plots are updatedif is_ipython:if not show_result:display.display(plt.gcf())display.clear_output(wait=True)else:display.display(plt.gcf())

train loop

def optimize_model():'''模型更新函数,从memory中抽取一个batch,然后更新'''if len(memory) < BATCH_SIZE:returntransitions = memory.sample(BATCH_SIZE)# Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for# detailed explanation). This converts batch-array of Transitions# to Transition of batch-arrays.batch = Transition(*zip(*transitions))# Compute a mask of non-final states and concatenate the batch elements# (a final state would've been the one after which simulation ended)non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,batch.next_state)), device=device, dtype=torch.bool)non_final_next_states = torch.cat([s for s in batch.next_stateif s is not None])state_batch = torch.cat(batch.state)action_batch = torch.cat(batch.action)reward_batch = torch.cat(batch.reward)# 计算Q(s,a),模型输出是每个action的概率,根据这个输出到action_batch中获取action value# These are the actions which would've been taken# for each batch state according to policy_netstate_action_values = policy_net(state_batch).gather(1, action_batch)# Compute V(s_{t+1}) for all next states.# Expected values of actions for non_final_next_states are computed based# on the "older" target_net; selecting their best reward with max(1).values# This is merged based on the mask, such that we'll have either the expected# state value or 0 in case the state was final.next_state_values = torch.zeros(BATCH_SIZE, device=device)with torch.no_grad():next_state_values[non_final_mask] = target_net(non_final_next_states).max(1).values# Compute the expected Q valuesexpected_state_action_values = (next_state_values * GAMMA) + reward_batch# Compute Huber losscriterion = nn.SmoothL1Loss()loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))# Optimize the modeloptimizer.zero_grad()loss.backward()# In-place gradient clippingtorch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)optimizer.step()if torch.cuda.is_available() or torch.backends.mps.is_available():num_episodes = 600
else:num_episodes = 50for i_episode in range(num_episodes):# Initialize the environment and get its statestate, info = env.reset()state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)for t in count():action = select_action(state)observation, reward, terminated, truncated, _ = env.step(action.item())reward = torch.tensor([reward], device=device)done = terminated or truncatedif terminated:next_state = Noneelse:next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)# Store the transition in memorymemory.push(state, action, next_state, reward)# Move to the next statestate = next_state# Perform one step of the optimization (on the policy network)optimize_model()# 软更新,即老的权重使用新的权重进行加权更新# θ′ ← τ θ + (1 −τ )θ′target_net_state_dict = target_net.state_dict()policy_net_state_dict = policy_net.state_dict()for key in policy_net_state_dict:target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)target_net.load_state_dict(target_net_state_dict)if done:episode_durations.append(t + 1)plot_durations()breakprint('Complete')
plot_durations(show_result=True)
plt.ioff()
plt.show()

Ref

强化学习基本原理
Pytorch官方tutorial

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/461137.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ubuntu22.04安装向日葵

1、下载deb安装包 进入官网下载图形版本&#xff1a;https://sunlogin.oray.com/download/linux?typepersonal 2、命令行安装 sudo chmod x 文件名.deb sudo dpkg -i 文件名.deb 3、开始报错的看这里&#xff01; 首先展示一下安装成功的效果图&#xff1a; 接下来是我安…

Vuestic 数据表格 使用demo

<template><br><div class"grid sm:grid-cols-3 gap-6 mb-6"><VaButton click"()>{for(const it in this.selectedItems){console.log(this.selectedItems);}}">参数设置</VaButton><VaButton>参数刷新</VaButt…

深入了解 美国高防 CN2 :如何提升全球化业务的网络安全与性能

美国高防 CN2 的重要性 在跨国企业和全球化业务的不断扩展下&#xff0c;对高性能和安全的网络连接需求不断增加。美国高防 CN2&#xff08;Global Internet Access&#xff09;以其卓越的跨境传输效率和强大的防护能力&#xff0c;成为许多企业关注的焦点。尤其是对电商、游戏…

NVR批量管理软件/平台EasyNVR多个NVR同时管理支持视频投放在电视墙上

在当今智能化、数字化的时代&#xff0c;视频监控已经成为各行各业不可或缺的一部分&#xff0c;无论是公共安全、交通管理、企业监控还是智慧城市建设&#xff0c;都离不开高效、稳定的视频监控系统的支持。而在这些应用场景中&#xff0c;将监控视频实时投放到大屏幕电视墙上…

新材料产业数据管理:KPaaS平台的创新驱动

近日&#xff0c;工业和信息化部、财政部、国家数据局联合印发《新材料大数据中心总体建设方案》&#xff08;以下简称《建设方案》&#xff09;&#xff0c;为新材料产业的发展注入了强大动力。该方案规划清晰&#xff0c;目标明确&#xff0c;旨在充分发挥大数据、人工智能对…

AI代币是什么?AI与Web3结合的未来方向在哪里?

近两年随着人工智能的崛起&#xff0c;AI已经渗透到制造业、电商、广告、医药等各个行业&#xff0c;加密货币领域也不例外&#xff0c;人工智能与区块链的融合&#xff0c;让我们看到了独特的数字资产 — AI加密代币。 它的流行始于2022年底&#xff0c;随着OpenAI智能聊天机…

关于springboot跨域与拦截器的问题

今天写代码的时候遇到的一个问题&#xff0c;在添加自己设置的token拦截器之后&#xff0c;报错&#xff1a; “ERROR Network Error AxiosError: Network Error at XMLHttpRequest.handleError (webpack-internal:///./node_modules/axios/lib/adapters/xhr.js:112:14) at Axi…

基于微信小程序实现信阳毛尖茶叶商城系统设计与实现

作者简介&#xff1a;Java领域优质创作者、CSDN博客专家 、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO特邀作者、多年架构师设计经验、多年校企合作经验&#xff0c;被多个学校常年聘为校外企业导师&#xff0c;指导学生毕业设计并参与学生毕业答辩指导&#xff0c;…

FPGA开发verilog语法基础1

文章目录 主体内容1.1 逻辑值1.2 数字进制格式1.3 数据类型1.3.1 寄存器类型1.3.2 线网类型1.3.3 参数类型1.3.4 存储器类型 参考资料 主体内容 1.1 逻辑值 1&#xff0c;逻辑0&#xff0c;表示低电平 2&#xff0c;逻辑1&#xff0c;表示高电平 3&#xff0c;逻辑X&#xff0…

Java阶段三02

第3章-第2节 一、知识点 面向接口编程、什么是spring、什么是IOC、IOC的使用、依赖注入 二、目标 了解什么是spring 理解IOC的思想和使用 了解IOC的bean的生命周期 理解什么是依赖注入 三、内容分析 重点 了解什么是spring 理解IOC的思想 掌握IOC的使用 难点 理解IO…

Android Preference浅析(设置Setting)

各位&#xff0c;好久不见&#xff0c;最近时间较为充裕&#xff0c;更新一下博客。 本篇在我的理解、认识范围内&#xff0c;讲述一下Android中的Preference&#xff08;破粉斯~&#xff09;这玩意&#xff0c;常用于项目中的设置模块中。在工作中我也主要负责了设置模块相关…

鸿道Intewell操作系统架构介绍之Intewell-Hyper I 虚拟化构型

鸿道Intewell-Hyper I 虚拟化构型是鸿道Intewell-V虚拟化架构下的构型体系&#xff01;鸿道Intewell-V是科东软件自主研发的实时虚拟化操作系统&#xff0c;包括鸿道Intewell-Hyper I 和鸿道Intewell-Hyper II。鸿道Intewell-V可以实现多个操作系统在同一物理硬件上并行运行&am…

讲一讲 kafka 的 ack 的三种机制?

大家好&#xff0c;我是锋哥。今天分享关于【K讲一讲 kafka 的 ack 的三种机制&#xff1f;】面试题&#xff1f;希望对大家有帮助&#xff1b; 讲一讲 kafka 的 ack 的三种机制&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 Kafka的消息确认机制&…

多租户系统的应用架构

大家好&#xff0c;我是汤师爷~ 我们看下多租户系统的应用架构是如何从一层层构建起来的。 1、应用层设计 应用层的主要作用是为具体的用户场景提供应用服务&#xff0c;帮助用户在特定场景下完成操作。通过编排领域层的各项能力&#xff0c;实现SaaS产品的核心功能。应用层包…

波兰喜嘎嘎

之前做的一个项目&#xff0c;需要用c写一个服务去访问和控制硬件。这个服务是同事写的&#xff0c;今年年中离职了&#xff0c;很自然地&#xff0c;轮到我接手。 一、认知 我捣鼓了几天&#xff0c;勉强读懂一点原来的代码&#xff0c;并在原来基础上&#xff0c;做了一些修…

基于LORA的一主多从监测系统_4G模块上巴法云

临时添加一个更新&#xff0c;更换云平台为巴法云&#xff0c;事情的起因是因为阿里云这个老六&#xff0c;早上睡了一觉起来发短信告诉我云平台给我停了&#xff0c;得交钱&#xff0c;好嘛&#xff0c;不过也没办法现在这基本都收费&#xff0c;当然还有onenet可以用&#xf…

.NET Core WebApi第4讲:控制器、路由

一、控制器是什么&#xff1f; 1、创建一个空的API控制器&#xff1a;TestController.cs 2、里面有一个类叫TestController&#xff0c;把它叫做控制器 因为它继承了ControllerBase类&#xff0c;ControllerBase类里提供了一系列的方法&#xff0c;使得TestController这个类具…

Java面试经典 150 题.P55. 跳跃游戏(009)

本题来自&#xff1a;力扣-面试经典 150 题 面试经典 150 题 - 学习计划 - 力扣&#xff08;LeetCode&#xff09;全球极客挚爱的技术成长平台https://leetcode.cn/studyplan/top-interview-150/ 题解&#xff1a; class Solution {public boolean canJump(int[] nums) {int…

计算机网络网络层笔记

互联网提供的两种服务 1.虚电路服务 2.数据报服务 需要记住的是现在只用第二种也就是数据报服务 网际协议IP 物理层的中断系统:转发器(hub) 链路层的中断系统:交换机 网络层的中断系统:路由器 网络层以上:网关 如上图所示,网关是用来访问其他的网段的一个接口,网关的地…