首先可以简单了解一下向量数据库相关知识:
向量数据库相关知识(搬运学习,建议还是看原文,这个只是我自己的学习记录)-CSDN博客
补充:
使用embedding API
文心千帆API
Embedding-V1是基于百度文心大模型技术的文本表示模型,Access token为调用接口的凭证,使用Embedding-V1时应先凭API Key、Secret Key获取Access token,再通过Access token调用接口来embedding text。同时千帆大模型平台还支持bge-large-zh等embedding model。
对于json的操作:【强烈推荐】Python中JSON的基本使用(超详细)_python json-CSDN博客
import requests
import json
import os
from dotenv import load_dotenv, find_dotenv def wenxin_embedding(text: str):# 获取环境变量 wenxin_api_key、wenxin_secret_keyload_dotenv(find_dotenv()) api_key = os.environ['QIANFAN_AK']secret_key = os.environ['QIANFAN_SK']print('api_key', api_key)print('secret_key', secret_key)# 使用API Key、Secret Key向https://aip.baidubce.com/oauth/2.0/token 获取Access token# url = https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions_prourl = "https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id={0}&client_secret={1}".format(api_key, secret_key)payload = json.dumps("")headers = {'Content-Type': 'application/json','Accept': 'application/json'}response_1 = requests.request("POST", url, headers=headers, data=payload)print('response_1:', response_1)print(response_1.text)# 通过获取的Access token 来embedding texturl = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/embeddings/embedding-v1?access_token=" + str(response_1.json().get("access_token")) # 从response_1json里面获取access_token的值input = []input.append(text) # 转为字符串列表payload = json.dumps({"input": input})headers = {'Content-Type': 'application/json','Accept': 'application/json'}response = requests.request("POST", url, headers=headers, data=payload)return json.loads(response.text)
# text应为List(string)
text = "要生成 embedding 的输入文本,字符串形式。"
response = wenxin_embedding(text=text)
print('response:', response)print('本次embedding id为:{}'.format(response['id']))
print('本次embedding产生时间戳为:{}'.format(response['created']))print('返回的embedding类型为:{}'.format(response['object']))
print('embedding长度为:{}'.format(len(response['data'][0]['embedding'])))
print('embedding(前10)为:{}'.format(response['data'][0]['embedding'][:10]))
加载文档、数据处理、文档划分:
1:加载,处理
'''
加载pdf
'''
from langchain.document_loaders.pdf import PyMuPDFLoader# 创建一个 PyMuPDFLoader Class 实例,输入为待加载的 pdf 文档路径
loader = PyMuPDFLoader("./llm-universe/data_base/knowledge_db/pumkin_book/pumpkin_book.pdf")# 调用 PyMuPDFLoader Class 的函数 load 对 pdf 文件进行加载
pdf_pages = loader.load()
print(f"载入后的变量类型为:{type(pdf_pages)},", f"该 PDF 一共包含 {len(pdf_pages)} 页")pdf_page = pdf_pages[1]
print(f"每一个元素的类型:{type(pdf_page)}.", f"该文档的描述性数据:{pdf_page.metadata}", f"查看该文档的内容:\n{pdf_page.page_content}", sep="\n'------------------------------------------------------------------------------'\n")'''
加载md
'''
from langchain.document_loaders.markdown import UnstructuredMarkdownLoaderloader = UnstructuredMarkdownLoader("./llm-universe/data_base/knowledge_db/prompt_engineering/1. 简介 Introduction.md")
md_pages = loader.load()
print(f"载入后的变量类型为:{type(md_pages)},", f"该 Markdown 一共包含 {len(md_pages)} 页")md_page = md_pages[0]
print(f"每一个元素的类型:{type(md_page)}.", f"该文档的描述性数据:{md_page.metadata}", f"查看该文档的内容:\n{md_page.page_content[0:][:200]}", sep="\n------------------------------------------------------------------------------\n")'''
数据清洗
'''
import re
# 去换行空格
pattern = re.compile(r'[^\u4e00-\u9fff](\n)[^\u4e00-\u9fff]', re.DOTALL)
pdf_page.page_content = re.sub(pattern, lambda match: match.group(0).replace('\n', ''), pdf_page.page_content)
print(pdf_page.page_content)
# 去•和空格
pdf_page.page_content = pdf_page.page_content.replace('•', '')
pdf_page.page_content = pdf_page.page_content.replace(' ', '')
print(pdf_page.page_content)
2:分块处理文档
由于单个文档的长度往往会超过模型支持的上下文,导致检索得到的知识太长超出模型的处理能力,因此,在构建向量知识库的过程中,我们往往需要对文档进行分割,将单个文档按长度或者按固定的规则分割成若干个 chunk,然后将每个 chunk 转化为词向量,存储到向量数据库中。
在检索时,我们会以 chunk 作为检索的元单位,也就是每一次检索到 k 个 chunk 作为模型可以参考来回答用户问题的知识,这个 k 是我们可以自由设定的。
Langchain 中文本分割器都根据 chunk_size
(块大小)和 chunk_overlap
(块与块之间的重叠大小)进行分割。
-
chunk_size 指每个块包含的字符或 Token (如单词、句子等)的数量
-
chunk_overlap 指两个块之间共享的字符数量,用于保持上下文的连贯性,避免分割丢失上下文信息
Langchain 提供多种文档分割方式,区别在怎么确定块与块之间的边界、块由哪些字符/token组成、以及如何测量块大小
- RecursiveCharacterTextSplitter(): 按字符串分割文本,递归地尝试按不同的分隔符进行分割文本。
- CharacterTextSplitter(): 按字符来分割文本。
- MarkdownHeaderTextSplitter(): 基于指定的标题来分割markdown 文件。
- TokenTextSplitter(): 按token来分割文本。
- SentenceTransformersTokenTextSplitter(): 按token来分割文本
- Language(): 用于 CPP、Python、Ruby、Markdown 等。
- NLTKTextSplitter(): 使用 NLTK(自然语言工具包)按句子分割文本。
- SpacyTextSplitter(): 使用 Spacy按句子的切割文本。
'''
文档分割
'''
'''
* RecursiveCharacterTextSplitter 递归字符文本分割
RecursiveCharacterTextSplitter 将按不同的字符递归地分割(按照这个优先级["\n\n", "\n", " ", ""]),这样就能尽量把所有和语义相关的内容尽可能长时间地保留在同一位置
RecursiveCharacterTextSplitter需要关注的是4个参数:* separators - 分隔符字符串数组
* chunk_size - 每个文档的字符数量限制
* chunk_overlap - 两份文档重叠区域的长度
* length_function - 长度计算函数
'''
#导入文本分割器
from langchain.text_splitter import RecursiveCharacterTextSplitter# 知识库中单段文本长度
CHUNK_SIZE = 500# 知识库中相邻文本重合长度
OVERLAP_SIZE = 50# 使用递归字符文本分割器
text_splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE,chunk_overlap=OVERLAP_SIZE
)
text_splitter.split_text(pdf_page.page_content[0:1000])split_docs = text_splitter.split_documents(pdf_pages)
print(f"切分后的文件数量:{len(split_docs)}")
print(f"切分后的字符数(可以用来大致评估 token 数):{sum([len(doc.page_content) for doc in split_docs])}")
langchain自定义embedding封装
要实现自定义 Embeddings,需要定义一个自定义类继承自 LangChain 的 Embeddings 基类,然后定义两个函数:① embed_query 方法,用于对单个字符串(query)进行 embedding;②embed_documents 方法,用于对字符串列表(documents)进行 embedding
from __future__ import annotationsimport logging
from typing import Dict, List, Anyfrom langchain.embeddings.base import Embeddings
from langchain.pydantic_v1 import BaseModel, root_validator logger = logging.getLogger(__name__)# 继承自 Embeddings 类的自定义 Embeddings 类
class ZhipuAIEmbeddings(BaseModel, Embeddings):"""`Zhipuai Embeddings` embedding models."""client: Any"""`zhipuai.ZhipuAI"""@root_validator() # root_validator 用于在校验整个数据模型之前对整个数据模型进行自定义校验,以确保所有的数据都符合所期望的数据结构def validate_environment(cls, values: Dict) -> Dict:"""实例化ZhipuAI为values["client"]Args:values (Dict): 包含配置信息的字典,必须包含 client 的字段.Returns:values (Dict): 包含配置信息的字典。如果环境中有zhipuai库,则将返回实例化的ZhipuAI类;否则将报错 'ModuleNotFoundError: No module named 'zhipuai''."""from zhipuai import ZhipuAIvalues["client"] = ZhipuAI()return values'''embed_query 是对单个文本(str)计算 embedding 的方法,这里我们重写该方法,调用验证环境时实例化的ZhipuAI来 调用远程 API 并返回 embedding 结果。'''def embed_query(self, text: str) -> List[float]:"""生成输入文本的 embedding.Args:texts (str): 要生成 embedding 的文本.Return:embeddings (List[float]): 输入文本的 embedding,一个浮点数值列表."""embeddings = self.client.embeddings.create(model="embedding-2",input=text)return embeddings.data[0].embedding'''embed_documents 是对字符串列表(List[str])计算embedding 的方法,对于这种类型输入我们采取循环方式挨个计算列表内子字符串的 embedding 并返回。'''def embed_documents(self, texts: List[str]) -> List[List[float]]:"""生成输入文本列表的 embedding.Args:texts (List[str]): 要生成 embedding 的文本列表.Returns:List[List[float]]: 输入列表中每个文档的 embedding 列表。每个 embedding 都表示为一个浮点值列表。"""return [self.embed_query(text) for text in texts]
对于 embed_query 可以加入一些内容处理后再请求 embedding,比如如果文本特别长,我们可以考虑对文本分段,防止超过最大 token 限制,这些都是可以的,靠大家发挥自己的主观能动性完善
这里只是给出一个简单的 demo.通过上述步骤,我们就可以基于 LangChain 与 智谱 AI 定义 embedding 的调用方式了。我们将此代码封装在 zhipuai_embedding.py 文件中。