持续更新常用的强化学习算法,采用单python文件实现,简单易读
- 2024.11.09 更新:PPO(GAE); SAC
- 2024.11.12 更新:OptionCritic(PPOC)
"PPO"
import copy
import time
import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional as Fimport gymnasium as gym
import matplotlib.pyplot as pltfrom tqdm import trange
from torch.distributions import Normalclass Actor(nn.Module):def __init__(self, state_size, action_size):super().__init__()self.fc1 = nn.Linear(state_size, 256)self.fc2 = nn.Linear(256, 128)self.mu = nn.Linear(128, action_size)self.sigma = nn.Linear(128, action_size)def forward(self, x):x = F.relu(self.fc1(x))x = F.relu(self.fc2(x))mu = F.tanh(self.mu(x))sigma = F.softplus(self.sigma(x))return mu, sigmaclass Critic(nn.Module):def __init__(self, state_size):super().__init__()self.fc1 = nn.Linear(state_size, 256)self.fc2 = nn.Linear(256, 128)self.fc3 = nn.Linear(128, 1)def forward(self, x):x = F.relu(self.fc1(x))x = F.relu(self.fc2(x))return self.fc3(x)def ppo_training(trajectory, actor, critic, actor_optimizer, critic_optimizer,clip=0.2, k_epochs=10, gamma=0.99, lam=0.95, device='cpu', T=1e-2):states, actions, log_probs, rewards, next_states, dones = map(lambda x: torch.from_numpy(np.array(x)).to(device),zip(*trajectory))rewards = rewards.view(-1, 1)dones = dones.view(-1, 1).int()with torch.no_grad():next_values = critic(next_states.float())td_target = rewards + gamma * next_values * (1 - dones)td_value = critic(states.float())td_delta = td_target - td_valuetd_delta = td_delta.detach().cpu().numpy()adv = 0.0advantages = []for delta in td_delta[::-1]:adv = gamma * lam * adv + deltaadvantages.append(adv)advantages.reverse()advantages = torch.from_numpy(np.array(advantages)).float().to(device)advantages = (advantages - advantages.mean()) / advantages.std()for k in range(k_epochs):mu, sigma = actor(states.float())dist = Normal(mu, sigma)new_log_probs = dist.log_prob(actions)entropy = dist.entropy()ratio = torch.exp(new_log_probs - log_probs.detach())surr1 = ratio * advantagessurr2 = torch.clamp(ratio, 1.0 - clip, 1 + clip) * advantagesactor_loss = - torch.min(surr1, surr2).mean() - entropy.mean() * Tcritic_loss = F.mse_loss(critic(states.float()), td_target.float().detach())actor_optimizer.zero_grad()critic_optimizer.zero_grad()actor_loss.backward()actor_optimizer.step()critic_loss.backward()critic_optimizer.step()if __name__ == '__main__':device = torch.device("cpu")env = gym.make('Walker2d')episodes = 1000train_timesteps = 1024clip = 0.2k_epochs = 40gamma = 0.9lam = 0.95T = 1e-2lr = 1e-4actor = Actor(env.observation_space.shape[0], env.action_space.shape[0]).to(device)critic = Critic(env.observation_space.shape[0]).to(device)actor_optimizer = torch.optim.Adam(actor.parameters(), lr=lr)critic_optimizer = torch.optim.Adam(critic.parameters(), lr=lr)trajectory = []timestep = 0pbar = trange(1, episodes+1)score_list = []for e in pbar:state, _ = env.reset()scores = 0.0while True:timestep += 1s = torch.from_numpy(state).float().to(device)mu, sigma = actor(s)dist = Normal(mu, sigma)a = dist.sample()log_prob = dist.log_prob(a).detach().cpu().numpy()action = a.detach().cpu().numpy()next_state, reward, done, _, _ = env.step(action)scores += rewardtrajectory.append([state, action, log_prob, reward, next_state, done])if timestep % train_timesteps == 0:ppo_training(trajectory,actor,critic,actor_optimizer,critic_optimizer,clip,k_epochs,gamma,lam,device,T)trajectory = []state = copy.deepcopy(next_state)if done: breakscore_list.append(scores)pbar.set_description("Episode {}/{}: Score: {:.2f}, Timesteps: {}".format(e, episodes, scores, timestep))
"SAC"
from torch.distributions import Normal
from collections import deque
from tqdm import trangeimport torch
import torch.nn as nn
import torch.nn.functional as Fimport copy
import time
import random
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as pltclass ActorNetwork(nn.Module):def __init__(self, state_size, action_size):super().__init__()self.fc1 = nn.Linear(state_size, 256)self.fc2 = nn.Linear(256, 128)self.mu = nn.Linear(128, action_size)self.sigma = nn.Linear(128, action_size)def forward(self, x):x = F.relu(self.fc1(x))x = F.relu(self.fc2(x))mu = self.mu(x)sigma = F.softplus(self.sigma(x))return mu, sigmaclass QNetwork(nn.Module):def __init__(self, state_size, action_size):super().__init__()self.fc1 = nn.Linear(state_size + action_size, 256)self.fc2 = nn.Linear(256, 128)self.fc3 = nn.Linear(128, 1)def forward(self, s, a):x = torch.cat((s, a), dim=-1)x = F.relu(self.fc1(x))x = F.relu(self.fc2(x))return self.fc3(x)class ReplayBuffer:def __init__(self, capacity):self.memory = deque(maxlen=capacity)def __len__(self):return len(self.memory)def save_memory(self, state, action, reward, next_state, done):self.memory.append([state, action, reward, next_state, done])def sample(self, batch_size):sample_size = min(len(self), batch_size)experiences = random.sample(self.memory, sample_size)return experiencesdef soft_update(target, source, tau=0.05):for param, target_param in zip(source.parameters(), target.parameters()):target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)def choice_action(actor, state):mu, sigma = actor(state)normal_dist = Normal(torch.zeros_like(mu), torch.ones_like(sigma))epsilon = normal_dist.sample()action = torch.tanh(mu + sigma * epsilon)log_prob = normal_dist.log_prob(epsilon)log_prob -= torch.log(1 - action.pow(2) + 1e-6)log_prob = log_prob.sum(-1, keepdim=True)return action, log_probdef training(gamma, replay_buffer, models, log_alpha, target_entropy, optimizers, batch_size, tau):(actor,q1_net,target_q1_net,q2_net,target_q2_net) = models(actor_optimizer,q1_optimizer,q2_optimizer,alpha_optimizer) = optimizersbatch_data = replay_buffer.sample(batch_size)states, actions, rewards, next_states, dones = map(lambda x: torch.from_numpy(np.array(x)).float().to(device),zip(*batch_data))with torch.no_grad():alpha = torch.exp(log_alpha)with torch.no_grad():next_state_actions, next_state_log_probs = choice_action(actor, next_states)target_q1_next = target_q1_net(next_states, next_state_actions)target_q2_next = target_q2_net(next_states, next_state_actions)min_q_next_target = torch.min(target_q1_next, target_q2_next) - alpha * next_state_log_probstd_target_value = rewards.view(-1, 1) + (1 - dones.view(-1, 1)) * gamma * min_q_next_targetq1 = q1_net(states, actions)q2 = q2_net(states, actions)q1_loss = F.mse_loss(q1, td_target_value)q2_loss = F.mse_loss(q2, td_target_value)q1_optimizer.zero_grad()q2_optimizer.zero_grad()q1_loss.backward()q2_loss.backward()q1_optimizer.step()q2_optimizer.step()state_actions, state_log_probs = choice_action(actor, states)q = torch.min(q1_net(states, state_actions), q2_net(states, state_actions))actor_loss = torch.mean((alpha * state_log_probs) - q)actor_optimizer.zero_grad()actor_loss.backward()actor_optimizer.step()with torch.no_grad():_, log_prob = choice_action(actor, states)alpha_loss = torch.mean(- log_alpha.exp() * (log_prob + target_entropy))alpha_optimizer.zero_grad()alpha_loss.backward()alpha_optimizer.step()soft_update(target_q1_net, q1_net, tau)soft_update(target_q2_net, q2_net, tau)if __name__ == '__main__':device = torch.device("cpu")env = gym.make('Walker2d')episodes = 1000train_timesteps = 4policy_lr = 1e-4q_lr = 1e-4alpha_lr = 1e-2tau = 0.05buffer_capacity = int(1e6)batch_size = 64gamma = 0.9state_size = env.observation_space.shape[0]action_size = env.action_space.shape[0]target_entropy = - torch.prod(torch.tensor(env.observation_space.shape, device=device))actor = ActorNetwork(state_size, action_size).to(device)q1_net = QNetwork(state_size, action_size).to(device)target_q1_net = QNetwork(state_size, action_size).to(device)q2_net = QNetwork(state_size, action_size).to(device)target_q2_net = QNetwork(state_size, action_size).to(device)target_q1_net.load_state_dict(q1_net.state_dict())target_q2_net.load_state_dict(q2_net.state_dict())log_alpha = torch.tensor(0.0, requires_grad=True, device=device)actor_optimizer = torch.optim.Adam(actor.parameters(), lr=policy_lr)q1_optimizer = torch.optim.Adam(q1_net.parameters(), lr=q_lr)q2_optimizer = torch.optim.Adam(q2_net.parameters(), lr=q_lr)alpha_optimizer = torch.optim.Adam([log_alpha], lr=alpha_lr)replay_buffer = ReplayBuffer(buffer_capacity)pbar = trange(1, episodes+1)timestep = 0score_list = []for episode in pbar:state, _ = env.reset()scores = 0.0while True:timestep += 1if timestep % train_timesteps == 0:training(gamma,replay_buffer,(actor,q1_net,target_q1_net,q2_net,target_q2_net),log_alpha,target_entropy,(actor_optimizer,q1_optimizer,q2_optimizer,alpha_optimizer),batch_size,tau)action, _ = choice_action(actor, torch.from_numpy(state).float().to(device))action = action.detach().cpu().numpy()next_state, reward, done, _, _ = env.step(action)scores += rewardreplay_buffer.save_memory(state, action, reward, next_state, done)state = copy.deepcopy(next_state)if done: breakscore_list.append(scores)pbar.set_description("Episode {}/{}: Score: {:.2f}, Timesteps: {}, Log Alpha: {:.2f}".format(episode, episodes, scores, timestep, log_alpha.item()))
"OptionCritic(PPOC)"import torch
import torch.nn as nn
import torch.nn.functional as Ffrom torch.distributions import Bernoulli, Normal
from torch import optimfrom tqdm import trangeimport matplotlib.pyplot as plt
import gymnasium as gym
import numpy as np
import random
import copyclass QNetwork(nn.Module):def __init__(self, state_size, num_options):super().__init__()self.nn = nn.Sequential(nn.Linear(state_size, 256),nn.ReLU(),nn.Linear(256, 128),nn.ReLU(),nn.Linear(128, num_options))def forward(self, x):return self.nn(x)class ActorNetwork(nn.Module):def __init__(self, state_size, action_size):super().__init__()self.fc = nn.Sequential(nn.Linear(state_size, 256),nn.ReLU(),nn.Linear(256, 128),)self.mu = nn.Sequential(nn.ReLU(),nn.Linear(128, action_size),nn.Tanh())self.sigma = nn.Sequential(nn.ReLU(),nn.Linear(128, action_size),nn.Softplus())def forward(self, x):x = self.fc(x)return self.mu(x), self.sigma(x)class TerminationNetwork(nn.Module):def __init__(self, state_size, num_options):super().__init__()self.nn = nn.Sequential(nn.Linear(state_size, 256),nn.ReLU(),nn.Linear(256, 128),nn.ReLU(),nn.Linear(128, num_options),nn.Sigmoid())def forward(self, x):return self.nn(x)class OptionCritic(nn.Module):def __init__(self, state_size, action_size, num_options):super().__init__()self.upper_policy_q_net = QNetwork(state_size, num_options)self.termination_network = TerminationNetwork(state_size, num_options)self.options = nn.ModuleList([ActorNetwork(state_size, action_size)for _ in range(num_options)])self.num_options = num_optionsdef get_option_id(self, state, epsilon):if np.random.rand() > epsilon:return torch.argmax(self.upper_policy_q_net(state),dim=-1).detach().cpu().numpy().item()else:return random.sample(range(self.num_options), 1)[0]def is_option_terminated(self, state, option_id):option_termination_prob = self.termination_network(state)[option_id]option_termination = Bernoulli(option_termination_prob).sample()return bool(option_termination.item())def select_action(self, state, epsilon, option_id):if self.is_option_terminated(state, option_id):option_id = self.get_option_id(state, epsilon)else: option_id = option_idmu, sigma = self.options[option_id](state)normal_dist = Normal(mu, sigma)action = normal_dist.sample()log_prob = normal_dist.log_prob(action)action = action.detach().cpu().numpy()log_prob = log_prob.detach().cpu().numpy()return action, log_prob, option_iddef training(agent, optimizer, trajectory, gamma, k_epochs, clip, lam, T):states, actions, log_probs, option_id, rewards, next_states, dones = map(lambda x: torch.from_numpy(np.array(x)), zip(*trajectory))option_id = option_id.view(-1, 1)rewards = rewards.view(-1, 1)dones = dones.view(-1, 1).float()with torch.no_grad():option_terminated_prob = agent.termination_network(next_states.float()).gather(-1, option_id)next_q = agent.upper_policy_q_net(next_states.float())q_target = rewards + gamma * (1 - dones) * ((1 - option_terminated_prob) * next_q.gather(-1, option_id)+ option_terminated_prob * next_q.max(dim=-1, keepdim=True)[0])td_delta = q_target - agent.upper_policy_q_net(states.float()).gather(-1, option_id)td_delta = td_delta.detach().cpu().numpy()adv = 0.0advantages = []for delta in td_delta[::-1]:adv = gamma * lam * adv + deltaadvantages.append(adv)advantages.reverse()advantages = torch.from_numpy(np.array(advantages)).float()advantages = ((advantages - advantages.mean())/ (1e-6 + advantages.std()))for k in range(k_epochs):mus, sigmas = [], []for i in range(states.shape[0]):mu, sigma = agent.options[option_id[i]](states[i].float())mus.append(mu), sigmas.append(sigma)mu = torch.stack(mus, 0)sigma = torch.stack(sigmas, 0)normal_dist = Normal(mu, sigma)new_log_probs = normal_dist.log_prob(actions)entropy = normal_dist.entropy()ratio = torch.exp(new_log_probs - log_probs.detach())surr1 = ratio * advantagessurr2 = torch.clamp(ratio, 1.0 - clip, 1 + clip) * advantagespolicy_loss = - torch.min(surr1, surr2).mean() - entropy.mean() * Tcritic_loss = F.mse_loss(agent.upper_policy_q_net(states.float()).gather(-1, option_id),q_target.float())termination_loss = agent.termination_network(states.float()).gather(-1, option_id) * (agent.upper_policy_q_net(states.float()).gather(-1, option_id)- agent.upper_policy_q_net(states.float()).max(dim=-1, keepdim=True)[0]).detach()losses = policy_loss + critic_loss + termination_loss.mean()optimizer.zero_grad()losses.backward()optimizer.step()if __name__ == '__main__':env = gym.make('Walker2d')episodes = 1000train_timesteps = 1024clip = 0.2k_epochs = 10gamma = 0.9lam = 0.95T = 1e-2lr = 1e-4epsilon = 1.0epsilon_decay = 0.995mini_epsilon = 0.1state_size = env.observation_space.shape[0]action_size = env.action_space.shape[0]num_options = 4agent = OptionCritic(state_size, action_size, num_options)optimizer = optim.Adam(agent.parameters(), lr=lr)trajectory = []timestep = 0pbar = trange(1, episodes + 1)scores_list = []for e in pbar:state, _ = env.reset()scores = 0.0option_id = agent.get_option_id(torch.from_numpy(state).float(), epsilon)options = [option_id]while True:timestep += 1if timestep % train_timesteps == 0:training(agent, optimizer, trajectory, gamma, k_epochs, clip, lam, T)trajectory = []action, log_prob, option_id = agent.select_action(torch.from_numpy(state).float(), epsilon, option_id)options.append(option_id)next_state, reward, done, _, _ = env.step(action)scores += rewardtrajectory.append([state, action, log_prob, option_id, reward, next_state, done])state = copy.deepcopy(next_state)if done: breakscores_list.append(scores)epsilon = max(mini_epsilon, epsilon * epsilon_decay)pbar.set_description("Episode {}/{}: Score: {:.2f}, Timesteps: {}, Epsilon: {:.2f}".format(e, episodes, scores, timestep, epsilon))plt.plot(scores_list)plt.show()