OpenAIGPTModel源码解析
- 1. GPT 介绍
- 2. OpenAIGPTModel类 源码解析
说到ChatGPT,大家可能都使用过吧。2022年,ChatGPT的推出引发了广泛的关注和讨论。这款对话生成模型不仅具备了强大的语言理解和生成能力,还能进行非常自然的对话,给用户带来了全新的互动体验。然而,ChatGPT的成功背后离不开它的前身——GPT。
1. GPT 介绍
GPT(Generative Pre-trained Transformer)是由OpenAI开发的一种基于Transformer架构的大型语言模型。它由多个堆叠的自注意力解码器层(Transformer Blocks)组成,每一层包含多头自注意力机制和前馈神经网络,并配有残差连接和层归一化以稳定训练。GPT采用自回归方式生成文本,通过在大规模互联网数据上进行预训练,具备强大的自然语言理解和生成能力,能够完成对话生成、文本补全等多种任务。其结构如下:
2. OpenAIGPTModel类 源码解析
源码地址:transformers/src/transformers/models/openai/modeling_openai.py
# -*- coding: utf-8 -*-
# @time: 2024/9/3 20:39
from typing import Optional, Union, Tupleimport torchfrom torch import nn
from transformers import add_start_docstrings, OpenAIGPTPreTrainedModel
from transformers.modeling_outputs import BaseModelOutput
from transformers.models.openai.modeling_openai import OPENAI_GPT_START_DOCSTRING, Block, OPENAI_GPT_INPUTS_DOCSTRING, _CHECKPOINT_FOR_DOC, _CONFIG_FOR_DOC
from transformers.utils import add_start_docstrings_to_model_forward, add_code_sample_docstrings@add_start_docstrings("The bare OpenAI GPT transformer model outputting raw hidden-states without any specific head on top.",OPENAI_GPT_START_DOCSTRING,
)
class OpenAIGPTModel(OpenAIGPTPreTrainedModel):def __init__(self, config):super().__init__(config)self.tokens_embed = nn.Embedding(config.vocab_size, config.n_embd) # 定义 token 嵌入层self.positions_embed = nn.Embedding(config.n_positions, config.n_embd) # 定义 position 嵌入层self.drop = nn.Dropout(config.embd_pdrop) # 定义 drop 层self.h = nn.ModuleList([Block(config.n_positions, config, scale=True) for _ in range(config.n_layer)]) # 定义多个 Block 层# 注册一个缓冲区用于存储position_ids,初始化为从 0 到 config.n_positions 的序列self.register_buffer("position_ids", torch.arange(config.n_positions), persistent=False)# Initialize weights and apply final processingself.post_init()def get_input_embeddings(self):return self.tokens_embeddef set_input_embeddings(self, new_embeddings):self.tokens_embed = new_embeddingsdef _prune_heads(self, heads_to_prune):"""Prunes heads of the model. heads_to_prune: dict of {layer_num: list of heads to prune in this layer}"""# 剪掉模型多头注意力机制中的一些头,heads_to_prune 是一个字典,键为layer_num,值为需要剪枝的 heads 列表。for layer, heads in heads_to_prune.items():self.h[layer].attn.prune_heads(heads)@add_start_docstrings_to_model_forward(OPENAI_GPT_INPUTS_DOCSTRING)@add_code_sample_docstrings(checkpoint=_CHECKPOINT_FOR_DOC,output_type=BaseModelOutput,config_class=_CONFIG_FOR_DOC,)def forward(self,input_ids: Optional[torch.LongTensor] = None,attention_mask: Optional[torch.FloatTensor] = None,token_type_ids: Optional[torch.LongTensor] = None,position_ids: Optional[torch.LongTensor] = None,head_mask: Optional[torch.FloatTensor] = None,inputs_embeds: Optional[torch.FloatTensor] = None,output_attentions: Optional[bool] = None,output_hidden_states: Optional[bool] = None,return_dict: Optional[bool] = None,) -> Union[Tuple[torch.Tensor], BaseModelOutput]:# 根据 config 配置设定 output_attentions, output_hidden_states, return_dict 的值output_attentions = output_attentions if output_attentions is not None else self.config.output_attentionsoutput_hidden_states = (output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states)return_dict = return_dict if return_dict is not None else self.config.use_return_dict# 获取 input_ids 或者 inputs_embeds 以及 input_shapeif input_ids is not None and inputs_embeds is not None: # 当 input_ids 和 inputs_embeds 同时存在时,抛出错误raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time")elif input_ids is not None: # 如果存在 input_ids,将其形状调整为 (batch_size, sequence_length)self.warn_if_padding_and_no_attention_mask(input_ids, attention_mask)input_shape = input_ids.size()input_ids = input_ids.view(-1, input_shape[-1])elif inputs_embeds is not None: # 如果存在 inputs_embeds,获取其形状input_shape = inputs_embeds.size()[:-1]else: # 如果 input_ids 和 inputs_embeds 都不存在,抛出错误raise ValueError("You have to specify either input_ids or inputs_embeds")# 如果没有传入 position_ids,则生成默认的 position_idsif position_ids is None:# Code is different from when we had a single embedding matrix from position and token embeddingsposition_ids = self.position_ids[None, : input_shape[-1]]# ------------------------------------- 1. 获取 attention_mask -----------------------------## Attention mask.if attention_mask is not None:# We create a 3D attention mask from a 2D tensor mask.# Sizes are [batch_size, 1, 1, to_seq_length]# So we can broadcast to [batch_size, num_heads, from_seq_length, to_seq_length]# this attention mask is more simple than the triangular masking of causal attention# used in OpenAI GPT, we just need to prepare the broadcast dimension here.attention_mask = attention_mask.unsqueeze(1).unsqueeze(2) # 将 2D 掩码扩展为 3D 掩码,适用于批量输入# Since attention_mask is 1.0 for positions we want to attend and 0.0 for# masked positions, this operation will create a tensor which is 0.0 for# positions we want to attend and the dtype's smallest value for masked positions.# Since we are adding it to the raw scores before the softmax, this is# effectively the same as removing these entirely.# 将注意力掩码转换为与模型参数相同的数据类型,并进行数值变换,torch.finfo(self.dtype).min 返回数据类型的最小值。attention_mask = attention_mask.to(dtype=next(self.parameters()).dtype) # fp16 compatibilityattention_mask = (1.0 - attention_mask) * torch.finfo(self.dtype).min# ----------------------------------------------------------------------------------------## ------------------------------------- 2. 获取 head_mask ---------------------------------## Prepare head mask if neededhead_mask = self.get_head_mask(head_mask, self.config.n_layer)# ---------------------------------------------------------- -----------------------------## ------------------------------------- 3. 获取 hidden_states -----------------------------## 如果 inputs_embeds 为 None,则使用 tokens_embed 对 input_ids 计算if inputs_embeds is None:inputs_embeds = self.tokens_embed(input_ids)# 计算 position_embedsposition_embeds = self.positions_embed(position_ids)# 如果存在 token_type_ids,使用 tokens_embed 计算;否则 token_type_embeds 为 0if token_type_ids is not None:token_type_ids = token_type_ids.view(-1, token_type_ids.size(-1))token_type_embeds = self.tokens_embed(token_type_ids)else:token_type_embeds = 0# 计算 hidden_states,即inputs_embeds、position_embeds 和 token_type_embeds 之和,并使用 dropouthidden_states = inputs_embeds + position_embeds + token_type_embedshidden_states = self.drop(hidden_states)# -------------------------------------------------------------------------------------## 获取输出形状,以及初始化输出结果 all_attentions 和 all_hidden_statesoutput_shape = input_shape + (hidden_states.size(-1),)all_attentions = () if output_attentions else Noneall_hidden_states = () if output_hidden_states else None# -----------------------------------4. Block逐层计算处理(核心部分)--------------------#for i, block in enumerate(self.h):# 如果需要输出 hidden states,将当前 hidden_states 添加到 all_hidden_statesif output_hidden_states:all_hidden_states = all_hidden_states + (hidden_states,)# 通过当前 Block 处理 hidden_states,得到新的 hidden_states 和 attentionsoutputs = block(hidden_states, attention_mask, head_mask[i], output_attentions=output_attentions)hidden_states = outputs[0]# 如果需要输出 attentions,将当前 attentions 添加到 all_attentionsif output_attentions:all_attentions = all_attentions + (outputs[1],)# ---------------------------------------------------------------------------------## 将 hidden_states 的形状调整为输出形状hidden_states = hidden_states.view(*output_shape)# 如果需要输出 hidden states,将最后的 hidden_states 添加到 all_hidden_statesif output_hidden_states:all_hidden_states = all_hidden_states + (hidden_states,)# -----------------------------------5. 根据配置的输出方式输出结果-------------------------------#if not return_dict:return tuple(v for v in [hidden_states, all_hidden_states, all_attentions] if v is not None)return BaseModelOutput(last_hidden_state=hidden_states,hidden_states=all_hidden_states,attentions=all_attentions,)