DQN算法的主要特点包括:
-
神经网络代替Q表:在传统的Q学习中,需要维护一个Q表来存储每个状态-动作对的Q值。而在DQN中,使用神经网络来近似这些Q值,这使得算法能够处理具有大量状态和动作的问题。
-
经验回放(Experience Replay):DQN使用经验回放来存储智能体的交互经验(状态、动作、奖励、下一个状态),并从中随机抽取样本进行训练,这有助于打破数据之间的时间相关性,提高学习效率。
-
目标网络(Target Network):DQN维护两个神经网络,一个是用于预测Q值的评估网络(Evaluation Network),另一个是用于生成目标Q值的目标网络(Target Network)。目标网络的参数会定期更新,以保持稳定,这有助于减少训练过程中的不稳定性。
-
ϵ-贪心策略(ε-greedy Strategy):DQN使用ϵ-贪心策略来平衡探索和利用。在这种策略下,智能体以一定的概率ϵ随机选择动作,以探索环境;以1-ϵ的概率选择当前最优动作,以利用已知信息。
下面我们来看一个简单的DQN代码:
import gym
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim# 定义DQN网络结构
class DQN(nn.Module):def __init__(self, input_size, output_size):super(DQN, self).__init__()self.fc1 = nn.Linear(input_size, 24)self.fc2 = nn.Linear(24, 24)self.fc3 = nn.Linear(24, output_size)def forward(self, x):x = torch.relu(self.fc1(x))x = torch.relu(self.fc2(x))x = self.fc3(x)return x# 经验回放
class ReplayMemory:def __init__(self, capacity):self.capacity = capacityself.memory = []self.position = 0def push(self, state, action, reward, next_state, done):if len(self.memory) < self.capacity:self.memory.append(None)self.memory[self.position] = (state, action, reward, next_state, done)self.position = (self.position + 1) % self.capacitydef sample(self, batch_size):return random.sample(self.memory, batch_size)def can_provide_sample(self, batch_size):return len(self.memory) >= batch_size# DQN Agent
class DQNAgent:def __init__(self, input_size, output_size, epsilon_start, epsilon_end, epsilon_decay, batch_size, gamma):self.gamma = gammaself.epsilon_start = epsilon_startself.epsilon_end = epsilon_endself.epsilon_decay = epsilon_decayself.batch_size = batch_sizeself.model = DQN(input_size, output_size)self.optimizer = optim.Adam(self.model.parameters())self.memory = ReplayMemory(10000)self.epsilon = epsilon_startdef select_action(self, state):if np.random.rand() <= self.epsilon:return random.randrange(env.action_space.n)state = torch.from_numpy(state).float().unsqueeze(0)action_values = self.model(state)return np.argmax(action_values.cpu().data.numpy())def optimize_model(self):if self.memory.can_provide_sample(self.batch_size):transitions = self.memory.sample(self.batch_size)batch = self._create_batch(transitions)self._train_step(batch)def _create_batch(self, transitions):batch = {'states': torch.cat([s for s, a, r, ss, d in transitions]),'actions': torch.cat([a for s, a, r, ss, d in transitions]),'reward': torch.cat([r for s, a, r, ss, d in transitions]),'next_states': torch.cat([ss for s, a, r, ss, d in transitions]),'done': torch.cat([d for s, a, r, ss, d in transitions])}return batchdef _train_step(self, batch):self.optimizer.zero_grad()states = batch['states']actions = batch['actions']rewards = batch['reward']next_states = batch['next_states']done = batch['done']Q_values = self.model(states)next_Q_values = self.model(next_states)max_next_Q_values = next_Q_values.max(1)[0]expected_Q_values = rewards + (1 - done) * self.gamma * max_next_Q_valuesloss = (Q_values[range(self.batch_size), actions] - expected_Q_values).pow(2).mean()loss.backward()self.optimizer.step()# 设置环境和参数
env = gym.make('CartPole-v1')
input_size = env.observation_space.shape[0]
output_size = env.action_space.n
epsilon_start = 1.0
epsilon_end = 0.1
epsilon_decay = 0.995
batch_size = 64
gamma = 0.99agent = DQNAgent(input_size, output_size, epsilon_start, epsilon_end, epsilon_decay, batch_size, gamma)# 训练过程
num_episodes = 1000
for episode in range(num_episodes):state = env.reset()state = torch.from_numpy(state).float()for t in range(500):action = agent.select_action(state.numpy())next_state, reward, done, _ = env.step(action)next_state = torch.from_numpy(next_state).float()agent.memory.push(state.numpy(), action, reward, next_state.numpy(), done)state = next_stateif done:breakif len(agent.memory.memory) > agent.batch_size:agent.optimize_model()agent.epsilon = max(agent.epsilon_end, agent.epsilon * agent.epsilon_decay)
我们来分析一下每段代码
1.导入库
import gym
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
gym
用于创建和管理环境,numpy
用于数值计算,random
用于生成随机数,torch
是PyTorch库,用于构建和训练神经网络,torch.nn
用于定义神经网络层,torch.optim
用于优化网络参数。
2.定义DQN网络结构
class DQN(nn.Module):def __init__(self, input_size, output_size):super(DQN, self).__init__()self.fc1 = nn.Linear(input_size, 24)self.fc2 = nn.Linear(24, 24)self.fc3 = nn.Linear(24, output_size)def forward(self, x):x = torch.relu(self.fc1(x))x = torch.relu(self.fc2(x))x = self.fc3(x)return x
这里定义了一个简单的三层全连接神经网络,作为DQN的Q网络。它接受环境状态作为输入,输出每个可能动作的Q值。
3.定义经验回放
class ReplayMemory:def __init__(self, capacity):self.capacity = capacityself.memory = [] #初始化为空列表,用于存储经验元组self.position = 0 #用于追踪下一个要写入经验的位置。def push(self, state, action, reward, next_state, done):if len(self.memory) < self.capacity: #如果当前存储的经验数量小于 capacity,则扩展 memory 列表。self.memory.append(None)self.memory[self.position] = (state, action, reward, next_state, done)self.position = (self.position + 1) % self.capacity #capacity 是一个参数,它定义了回放缓冲区(replay buffer)的最大存储容量。def sample(self, batch_size):return random.sample(self.memory, batch_size)def can_provide_sample(self, batch_size):return len(self.memory) >= batch_size
体验回放是一种重要的技术,它允许智能体(agent)存储过去的经验,并在后续的训练过程中随机抽取这些经验来更新其策略。
4.定义DQN智能体
class DQNAgent:def __init__(self, input_size, output_size, epsilon_start, epsilon_end, epsilon_decay, batch_size, gamma):self.gamma = gamma #折扣因子,用于计算未来奖励的现值。self.epsilon_start = epsilon_start #用于epsilon贪心策略的参数,控制随机选择动作的概率从 epsilon_start 衰减到 epsilon_end。self.epsilon_end = epsilon_end self.epsilon_decay = epsilon_decayself.batch_size = batch_size #每次从回放缓冲区中抽取的经验批次大小。self.model = DQN(input_size, output_size)self.optimizer = optim.Adam(self.model.parameters())self.memory = ReplayMemory(10000)self.epsilon = epsilon_start #当前epsilon值,用于epsilon贪心策略。#根据当前状态选择一个动作,它使用ϵ-贪心策略在探索和利用之间进行平衡。def select_action(self, state):if np.random.rand() <= self.epsilon:return random.randrange(env.action_space.n)state = torch.from_numpy(state).float().unsqueeze(0)action_values = self.model(state) #模型对当前状态的估计价值,选择最大价值的动作。return np.argmax(action_values.cpu().data.numpy())#从经验回放中随机抽取一批样本,然后使用这些样本来更新神经网络的权重。def optimize_model(self):if self.memory.can_provide_sample(self.batch_size):transitions = self.memory.sample(self.batch_size)batch = self._create_batch(transitions)self._train_step(batch)#用于将抽取的样本整理成一个批次,并按类型分组,以便进行批量训练。def _create_batch(self, transitions):batch = {'states': torch.cat([s for s, a, r, ss, d in transitions]),'actions': torch.cat([a for s, a, r, ss, d in transitions]),'reward': torch.cat([r for s, a, r, ss, d in transitions]),'next_states': torch.cat([ss for s, a, r, ss, d in transitions]),'done': torch.cat([d for s, a, r, ss, d in transitions])}return batch#执行一次训练步骤,它计算损失并更新网络权重。def _train_step(self, batch):self.optimizer.zero_grad()states = batch['states']actions = batch['actions']rewards = batch['reward']next_states = batch['next_states']done = batch['done']Q_values = self.model(states)next_Q_values = self.model(next_states)max_next_Q_values = next_Q_values.max(1)[0] expected_Q_values = rewards + (1 - done) * self.gamma * max_next_Q_valuesloss = (Q_values[range(self.batch_size), actions] - expected_Q_values).pow(2).mean() #计算损失loss.backward()self.optimizer.step() #方法使用在 loss.backward() 中计算出的梯度来更新模型的参数。
5.设置环境和参数
env = gym.make('CartPole-v1')
input_size = env.observation_space.shape[0]
output_size = env.action_space.n
epsilon_start = 1.0
epsilon_end = 0.1
epsilon_decay = 0.995
batch_size = 64
gamma = 0.99
这部分代码设置了环境和DQN算法的参数。env
是Gym环境,input_size
和output_size
分别是状态和动作的数量,epsilon_start
、epsilon_end
和epsilon_decay
是ϵ-贪心策略的参数,batch_size
是每次训练的样本数量,gamma
是折扣因子。
6.训练过程
agent = DQNAgent(input_size, output_size, epsilon_start, epsilon_end, epsilon_decay, batch_size, gamma)num_episodes = 1000
for episode in range(num_episodes):state = env.reset()state = torch.from_numpy(state).float()for t in range(500):action = agent.select_action(state.numpy())next_state, reward, done, _ = env.step(action)next_state = torch.from_numpy(next_state).float()agent.memory.push(state.numpy(), action, reward, next_state.numpy(), done)state = next_stateif done:breakif len(agent.memory.memory) > agent.batch_size:agent.optimize_model()agent.epsilon = max(agent.epsilon_end, agent.epsilon * agent.epsilon_decay)
这部分代码实现了DQN的训练过程。对于每一集(episode),智能体与环境交互,选择动作,执行动作,然后观察下一个状态和奖励。这些经验被存储在经验回放中。当经验回放中的样本数量足够时,智能体从经验回放中抽取样本并更新其神经网络。随着时间的推移,ϵ值逐渐减小,使得智能体更多地利用其学到的知识而不是随机探索。
我们来看一下智能交通灯比赛里面的dqn算法
def learn(self, list_sample_data):t_data = list_sample_dataobs = torch.tensor(np.array([frame.obs for frame in t_data]), dtype=torch.float32).to(self.device)action = (torch.LongTensor(np.array([frame.act if not np.any(np.isinf(frame.act)) else 0 for frame in t_data])).long().to(self.model.device))rew = torch.tensor(np.array([frame.rew for frame in t_data]), device=self.model.device)_obs = torch.tensor(np.array([frame._obs for frame in t_data]), dtype=torch.float32).to(self.device)not_done = torch.tensor(np.array([frame.done for frame in t_data]),dtype=torch.float32,device=self.device,)# Main implementation of the multi-head output DQN algorithm# 多头输出dqn算法的主要实现self.model.eval()with torch.no_grad():# Calculate the target Q-values for each head# 计算各个头的目标q值q_targets = []for head_idx in range(self.num_head):q_targets_head = (rew[:, head_idx].unsqueeze(1)+ self._gamma * (self.model(_obs)[0][head_idx]).max(1)[0].unsqueeze(1) * not_done[:, None])q_targets.append(q_targets_head)q_targets = torch.cat(q_targets, dim=1)# Calculate the Q-values for each head# 计算各个头的q值self.model.train()q_values = []for head_idx in range(self.num_head):q_values_head = self.model(obs)[0][head_idx].gather(1, action[:, head_idx + 1].unsqueeze(1))q_values.append(q_values_head)q_values = torch.cat(q_values, dim=1)self.optim.zero_grad()loss = F.mse_loss(q_targets.float(), q_values.float())loss.backward()model_grad_norm = torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0).item()self.optim.step()self.train_step += 1value_loss = loss.detach().item()target_q_value = q_targets.mean().detach().item()q_value = q_values.mean().detach().item()# Periodically report monitoring# 按照间隔上报监控now = time.time()if now - self.last_report_monitor_time >= 60:monitor_data = {"value_loss": value_loss,"target_q_value": target_q_value,"q_value": q_value,"model_grad_norm": model_grad_norm,}self.monitor.put_data({os.getpid(): monitor_data})self.logger.info(f"value_loss: {value_loss}, target_q_value: {target_q_value},\q_value: {q_value},\model_grad_norm: {model_grad_norm}")self.last_report_monitor_time = now
提问:为什么在计算Q值的时候我下一个动作还没做,就能计算它的最大Q值?
答:
在强化学习中,计算下一个状态 s′s′ 的Q值确实是在智能体实际采取行动之前进行的。这是通过使用智能体的当前策略来预测或估计Q值实现的。具体来说,智能体使用其学习到的模型(例如,在DQN中是深度神经网络)来预测或估计下一个状态 s′s′ 的Q值。这个过程不需要智能体实际进入下一个状态或在该状态下采取任何行动。
让我们分段解释每段代码
1.数据准备
t_data = list_sample_dataobs = torch.tensor(np.array([frame.obs for frame in t_data]), dtype=torch.float32).to(self.device)
action = (torch.LongTensor(np.array([frame.act if not np.any(np.isinf(frame.act)) else 0 for frame in t_data])).long().to(self.model.device)
)
rew = torch.tensor(np.array([frame.rew for frame in t_data]), device=self.model.device)
_obs = torch.tensor(np.array([frame._obs for frame in t_data]), dtype=torch.float32).to(self.device)
not_done = torch.tensor(np.array([frame.done for frame in t_data]),dtype=torch.float32,device=self.device,
)
t_data
是传入的样本数据列表。obs
是观察数据,转换为PyTorch张量并移动到指定设备(如GPU)。action
是行动数据,如果行动值中有无穷大,则替换为0,然后转换为长整型张量并移动到模型的设备。rew
是奖励数据,直接转换为张量并移动到模型的设备。_obs
是下一个观察数据,转换为张量并移动到指定设备。not_done
表示该episode是否未结束,转换为张量并移动到指定设备。
2.目标 Q 值计算
self.model.eval()with torch.no_grad():q_targets = []for head_idx in range(self.num_head):q_targets_head = (rew[:, head_idx].unsqueeze(1)+ self._gamma * (self.model(_obs)[0][head_idx]).max(1)[0].unsqueeze(1) * not_done[:, None])q_targets.append(q_targets_head)q_targets = torch.cat(q_targets, dim=1)
- 将模型设置为评估模式。
- 使用
torch.no_grad()
来避免计算梯度,节省内存和计算资源。 - 遍历每个头,计算目标Q值。目标Q值由即时奖励加上折扣因子乘以下一个状态的最大Q值(对于未结束的episode)组成。
- 将所有头的目标Q值拼接在一起。
3.Q值计算
self.model.train()
q_values = []
for head_idx in range(self.num_head):q_values_head = self.model(obs)[0][head_idx].gather(1, action[:, head_idx + 1].unsqueeze(1))q_values.append(q_values_head)
q_values = torch.cat(q_values, dim=1)
4.损失计算和优化
self.optim.zero_grad()
loss = F.mse_loss(q_targets.float(), q_values.float())
loss.backward()
#这是PyTorch提供的一个函数,用于裁剪梯度。
model_grad_norm = torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0).item()
self.optim.step()
self.train_step += 1
- 清零优化器的梯度。
- 计算目标Q值和预测Q值之间的均方误差损失。
- 进行反向传播。
- 使用梯度裁剪来限制梯度的大小,防止梯度爆炸。
- 更新模型参数。
- 增加训练步数。
5.监控和报告
value_loss = loss.detach().item()
target_q_value = q_targets.mean().detach().item()
q_value = q_values.mean().detach().item()now = time.time()
if now - self.last_report_monitor_time >= 60:monitor_data = {"value_loss": value_loss,"target_q_value": target_q_value,"q_value": q_value,"model_grad_norm": model_grad_norm,}self.monitor.put_data({os.getpid(): monitor_data})self.logger.info(f"value_loss: {value_loss}, target_q_value: {target_q_value},\q_value: {q_value},\model_grad_norm: {model_grad_norm}")self.last_report_monitor_time = now
- 计算并记录损失、目标Q值、Q值和模型梯度范数。
- 如果达到报告间隔(60秒),则记录监控数据并更新最后报告时间