强化学习中的Actor-Critic算法是一种普遍经常用到的经典算法,Actor-Critic 翻译成“演员—评论家”方法。策略网络 π(a|s;θ) 相当于演员,它基于状态 s 做出动作 a。价值网络 q(s,a;w) 相当于评论家,它给演员的表现打分,量化在状态 s的情况下做出动作 a 的好坏程度。策略网络(演员)和价值网络(评委)的关系如下图所示。
A2C算法在Github上有许多,其中比较经典的如下:
minimalRL/a2c.py at master · seungeunrho/minimalRL · GitHubImplementations of basic RL algorithms with minimal lines of codes! (pytorch based) - minimalRL/a2c.py at master · seungeunrho/minimalRLhttps://github.com/seungeunrho/minimalRL/blob/master/a2c.py
但该源代码没有任何注释,对要学习的小白同学很不友好,因此尝试采用ChatGPT对该代码添加了完整的注释,通过读这些注释,对大家理解A2C及其他衍射算法的实现必然带来帮助。
下面附上代码注释讲解
1、导入必要的库
import gym # 导入gym库,用于创建和管理强化学习环境
import torch # 导入torch库,用于搭建和训练神经网络
import torch.nn as nn # 导入torch.nn模块,用于定义神经网络的层和损失函数
import torch.nn.functional as F # 导入torch.nn.functional模块,用于定义激活函数和其他操作
import torch.optim as optim # 导入torch.optim模块,用于定义优化器
from torch.distributions import Categorical # 导入Categorical类,用于创建离散分布
import torch.multiprocessing as mp # 导入torch.multiprocessing模块,用于实现多进程通信
import numpy as np # 导入numpy库,用于处理数组和矩阵
2、定义参数
#Hyperparameters
n_train_processes = 3 # 定义训练进程的数量
learning_rate = 0.0002 # 定义学习率
update_interval = 5 # 定义更新间隔,即每隔多少个时间步更新一次网络参数
gamma = 0.98 # 定义折扣因子,即未来奖励的衰减系数
max_train_steps = 60000 # 定义最大训练步数,即训练结束的条件
PRINT_INTERVAL = update_interval * 100 # 定义打印间隔,即每隔多少个时间步打印一次训练信息
3、定义AC类
class ActorCritic(nn.Module): # 定义ActorCritic类,继承自nn.Module类 def init(self): # 定义初始化方法 super(ActorCritic, self).init() # 调用父类的初始化方法 self.fc1 = nn.Linear(4, 256) # 定义一个全连接层,输入维度为4(状态空间的大小),输出维度为256(隐藏层的大小) self.fc_pi = nn.Linear(256, 2) # 定义一个全连接层,输入维度为256(隐藏层的大小),输出维度为2(动作空间的大小) self.fc_v = nn.Linear(256, 1) # 定义一个全连接层,输入维度为256(隐藏层的大小),输出维度为1(价值函数的大小)def pi(self, x, softmax_dim=1): # 定义一个方法,用于计算动作概率分布x = F.relu(self.fc1(x)) # 将输入x通过第一个全连接层,并使用relu激活函数x = self.fc_pi(x) # 将输出x通过第二个全连接层,并得到动作概率分布的对数值prob = F.softmax(x, dim=softmax_dim) # 使用softmax函数将对数值转换为概率值,并指定softmax的维度return prob # 返回概率值def v(self, x): # 定义一个方法,用于计算状态价值函数x = F.relu(self.fc1(x)) # 将输入x通过第一个全连接层,并使用relu激活函数v = self.fc_v(x) # 将输出x通过第三个全连接层,并得到状态价值函数的值return v # 返回价值函数的值
4、定义worker函数
def worker(worker_id, master_end, worker_end):# 定义一个工作进程的函数,接受三个参数:工作进程的编号,主进程的管道端口,工作进程的管道端口master_end.close() # 关闭工作进程对主进程端口的访问,只允许使用工作进程端口进行通信env = gym.make('CartPole-v1') # 创建一个gym环境,用于模拟倒立摆问题env.seed(worker_id) # 为环境设置随机种子,使每个工作进程有不同的初始状态while True: # 无限循环,直到收到关闭命令cmd, data = worker_end.recv() # 从工作进程端口接收一个命令和一个数据if cmd == 'step': # 如果命令是执行一个动作ob, reward, done, info = env.step(data) # 在环境中执行该动作,返回观察值,奖励值,是否结束和其他信息if done: # 如果环境结束了ob = env.reset() # 重置环境,返回初始观察值worker_end.send((ob, reward, done, info)) # 把结果发送回主进程elif cmd == 'reset': # 如果命令是重置环境ob = env.reset() # 重置环境,返回初始观察值worker_end.send(ob) # 把结果发送回主进程elif cmd == 'reset_task': # 如果命令是重置任务ob = env.reset_task() # 重置任务,返回初始观察值worker_end.send(ob) # 把结果发送回主进程elif cmd == 'close': # 如果命令是关闭工作进程worker_end.close() # 关闭工作进程端口break # 跳出循环,结束函数elif cmd == 'get_spaces': # 如果命令是获取环境的观察空间和动作空间worker_end.send((env.observation_space, env.action_space)) # 把空间信息发送回主进程else: # 如果命令是其他未定义的情况raise NotImplementedError # 抛出一个异常,表示没有实现该功能
5、定义进程(并行运算)
# 定义一个并行环境类
class ParallelEnv:# 初始化方法,接受训练进程的数量作为参数def __init__(self, n_train_processes):# 设置环境的数量为训练进程的数量self.nenvs = n_train_processes# 设置等待标志为False,表示没有等待环境的结果self.waiting = False# 设置关闭标志为False,表示没有关闭环境self.closed = False# 创建一个空列表,用于存储工作进程self.workers = list()# 创建一对管道,用于主进程和工作进程之间的通信master_ends, worker_ends = zip(*[mp.Pipe() for _ in range(self.nenvs)])# 将主进程端和工作进程端分别赋值给类属性self.master_ends, self.worker_ends = master_ends, worker_ends# 遍历每个工作进程的编号和管道端口for worker_id, (master_end, worker_end) in enumerate(zip(master_ends, worker_ends)):# 创建一个子进程,执行worker函数,传入工作进程的编号和管道端口作为参数p = mp.Process(target=worker,args=(worker_id, master_end, worker_end))# 设置子进程为守护进程,即随主进程退出而退出p.daemon = True# 启动子进程p.start()# 将子进程添加到工作进程列表中self.workers.append(p)# 禁止主进程使用工作进程端口进行通信for worker_end in worker_ends:worker_end.close()# 定义一个异步执行动作的方法,接受动作列表作为参数def step_async(self, actions):# 遍历每个主进程端口和对应的动作for master_end, action in zip(self.master_ends, actions):# 通过管道发送一个元组,包含'step'指令和动作值master_end.send(('step', action))# 设置等待标志为True,表示正在等待环境的结果self.waiting = True# 定义一个等待结果的方法,不接受参数def step_wait(self):# 从每个主进程端口接收一个元组,包含观察值、奖励值、结束标志和信息字典,并将它们放入一个列表中results = [master_end.recv() for master_end in self.master_ends]# 设置等待标志为False,表示已经收到环境的结果self.waiting = False# 将列表中的元组拆分成四个列表,并将每个列表转换成numpy数组,分别赋值给观察值、奖励值、结束标志和信息字典变量obs, rews, dones, infos = zip(*results)return np.stack(obs), np.stack(rews), np.stack(dones), infos# 定义一个重置环境的方法,不接受参数def reset(self):# 遍历每个主进程端口for master_end in self.master_ends:# 通过管道发送一个元组,包含'reset'指令和None值master_end.send(('reset', None))# 从每个主进程端口接收一个观察值,并将它们放入一个列表中,并将列表转换成numpy数组返回return np.stack([master_end.recv() for master_end in self.master_ends])# 定义一个同步执行动作的方法,接受动作列表作为参数def step(self, actions):# 调用异步执行动作的方法,并传入动作列表self.step_async(actions)# 调用等待结果的方法,并返回结果return self.step_wait()# 定义一个关闭环境的方法,不接受参数def close(self): # For clean up resources# 如果已经关闭环境,直接返回if self.closed:return# 如果正在等待环境的结果,从每个主进程端口接收一个元组,并忽略它们if self.waiting:[master_end.recv() for master_end in self.master_ends]# 遍历每个主进程端口for master_end in self.master_ends:# 通过管道发送一个元组,包含'close'指令和None值master_end.send(('close', None))# 遍历每个工作进程for worker in self.workers:# 等待工作进程结束worker.join()# 设置关闭标志为True,表示已经关闭环境self.closed = True
6、定义测试函数
# 定义一个测试模型的函数,接受步数和模型作为参数
def test(step_idx, model):# 创建一个CartPole-v1环境env = gym.make('CartPole-v1')# 初始化分数为0.0score = 0.0# 初始化结束标志为Falsedone = False# 设置测试次数为10num_test = 10# 循环测试次数次for _ in range(num_test):# 重置环境,获取初始状态s = env.reset()# 当没有结束时,循环执行以下操作while not done:# 用模型计算状态的动作概率分布,并指定softmax的维度为0prob = model.pi(torch.from_numpy(s).float(), softmax_dim=0)# 从动作概率分布中采样一个动作,并转换成numpy数组a = Categorical(prob).sample().numpy()# 在环境中执行动作,获取下一个状态、奖励、结束标志和信息字典s_prime, r, done, info = env.step(a)# 将下一个状态赋值给当前状态s = s_prime# 将奖励累加到分数上score += r# 将结束标志重置为False,以便进行下一次测试done = False# 打印步数和平均分数,保留一位小数print(f"Step # :{step_idx}, avg score : {score/num_test:.1f}")# 关闭环境env.close()
7、定义目标函数
# 定义一个计算目标值的函数,接受最终状态的值、奖励列表和掩码列表作为参数
def compute_target(v_final, r_lst, mask_lst):# 将最终状态的值转换成一维的张量,并赋值给G变量G = v_final.reshape(-1)# 创建一个空列表,用于存储目标值td_target = list()# 从后往前遍历奖励列表和掩码列表中的元素for r, mask in zip(r_lst[::-1], mask_lst[::-1]):# 用贝尔曼方程更新G变量,即G = r + gamma * G * mask,其中gamma是折扣因子G = r + gamma * G * mask# 将G变量添加到目标值列表中td_target.append(G)# 将目标值列表反转,并转换成浮点类型的张量,返回结果return torch.tensor(td_target[::-1]).float()
8、主程序
# 如果当前文件是主文件,执行以下代码
if __name__ == '__main__':# 创建一个并行环境对象,传入训练进程的数量作为参数envs = ParallelEnv(n_train_processes)# 创建一个演员-评论家模型对象model = ActorCritic()# 创建一个优化器对象,传入模型的参数和学习率作为参数optimizer = optim.Adam(model.parameters(), lr=learning_rate)# 初始化步数为0step_idx = 0# 重置环境,获取初始状态s = envs.reset()# 当步数小于最大训练步数时,循环执行以下操作while step_idx < max_train_steps:# 创建四个空列表,用于存储状态、动作、奖励和掩码s_lst, a_lst, r_lst, mask_lst = list(), list(), list(), list()# 循环更新间隔次for _ in range(update_interval):# 用模型计算状态的动作概率分布,并转换成张量prob = model.pi(torch.from_numpy(s).float())# 从动作概率分布中采样一个动作,并转换成numpy数组a = Categorical(prob).sample().numpy()# 在环境中执行动作,获取下一个状态、奖励、结束标志和信息字典s_prime, r, done, info = envs.step(a)# 将当前状态、动作、奖励和掩码添加到对应的列表中s_lst.append(s)a_lst.append(a)r_lst.append(r/100.0) # 将奖励除以100,进行归一化处理mask_lst.append(1 - done) # 将结束标志取反,得到掩码# 将下一个状态赋值给当前状态s = s_prime# 步数加一step_idx += 1# 将最终状态转换成张量,并用模型计算它的值,并从张量中获取numpy数组,并复制一份,避免被修改s_final = torch.from_numpy(s_prime).float()v_final = model.v(s_final).detach().clone().numpy()# 调用计算目标值的函数,传入最终状态的值、奖励列表和掩码列表,得到目标值张量td_target = compute_target(v_final, r_lst, mask_lst)# 将目标值张量转换成一维的向量,并赋值给td_target_vec变量td_target_vec = td_target.reshape(-1)# 将状态列表转换成浮点类型的张量,并转换成一维的向量,并赋值给s_vec变量,4是状态的维度s_vec = torch.tensor(s_lst).float().reshape(-1, 4) # 4 == Dimension of state# 将动作列表转换成张量,并转换成一维的向量,并增加一个维度,并赋值给a_vec变量a_vec = torch.tensor(a_lst).reshape(-1).unsqueeze(1)# 计算优势函数,即目标值减去模型预测的值,并赋值给advantage变量advantage = td_target_vec - model.v(s_vec).reshape(-1)# 用模型计算状态向量的动作概率分布,并指定softmax的维度为1,并赋值给pi变量pi = model.pi(s_vec, softmax_dim=1)# 从动作概率分布中获取对应动作的概率,并转换成一维的向量,并赋值给pi_a变量pi_a = pi.gather(1, a_vec).reshape(-1)# 计算损失函数,即负的对数似然乘以优势函数的均值,加上值函数的平滑L1损失,并赋值给loss变量loss = -(torch.log(pi_a) * advantage.detach()).mean() +\F.smooth_l1_loss(model.v(s_vec).reshape(-1), td_target_vec)# 清空优化器的梯度optimizer.zero_grad()# 反向传播损失函数,计算梯度loss.backward()# 优化器更新模型的参数optimizer.step()# 如果步数能被打印间隔整除,调用测试模型的函数,传入步数和模型作为参数if step_idx % PRINT_INTERVAL == 0:test(step_idx, model)# 关闭环境envs.close()