1. Building a Simple Chatbot
1.1 Calling the OpenAI API
import os
from openai import OpenAI
from dotenv import load_dotenv, find_dotenv

load_dotenv()
client = OpenAI()

# Print the supported models
model_lst = client.models.list()
for model in model_lst:
    print(model.id)

# Call the chat completions API
completion = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "system", "content": "你是一名专业的英语助教,给学生提供必要的支持如提供提示、纠正错误等,但对于学生的请求不能直接给出答案,而是要一步步引导学生完成任务。你仅需要回复跟英语有关的问题,如与英语无关,不要回复。"},
        {"role": "user", "content": "如何学好英语?"},
        {"role": "assistant", "content": "xxxx"},
        {"role": "user", "content": "xx"},
    ],
    max_tokens=500,
    # n=5,
    temperature=0.7,
)
print(completion)
# print(completion.choices[0].message)
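The same endpoint can also stream tokens as they are generated, which the chatbot in section 1.3 relies on. A minimal sketch, reusing the client created above:

# Minimal streaming sketch (assumes the `client` created above)
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "如何学好英语?"}],
    max_tokens=200,
    stream=True,  # receive chat completion chunks instead of one final object
)
for chunk in stream:
    delta = chunk.choices[0].delta.content  # None for chunks that carry no text
    if delta is not None:
        print(delta, end="", flush=True)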
1.2 AI Image Generation
# Install these libraries if they are missing:
# pip install openai python-dotenv
from dotenv import load_dotenv

load_dotenv()

from openai import OpenAI

client = OpenAI()  # the client reads OPENAI_API_KEY from the environment

response = client.images.generate(
    model="dall-e-3",
    prompt="在一个教室里,很多学生在上数学课,并且激烈地在跟老师讨论问题。",
    size="1024x1024",
    quality="standard",
    n=1,
)
image_url = response.data[0].url
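The images endpoint returns a temporary URL rather than the image bytes. A minimal sketch of saving the result locally, assuming the requests package is installed (the filename is arbitrary):

import requests

img_bytes = requests.get(image_url, timeout=30).content  # download from the temporary URL
with open("dalle3_classroom.png", "wb") as f:
    f.write(img_bytes)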
1.3 llm-developing-mygpt
This section wires the pieces above into a simple chatbot app.
app.py
import gradio as gr
from loguru import logger
from MyGPT import MyGPT
from config import MODELS, DEFAULT_MODEL, MODEL_TO_MAX_TOKENS

mygpt = MyGPT()


def fn_prehandle_user_input(user_input, chat_history):
    # Validate the input
    if not user_input:
        gr.Warning("Please enter your question")
        logger.warning("Please enter your question")
        return chat_history
    # Show the user message in the chat window
    chat_history.append([user_input, None])
    logger.info(f"\nUser input: {user_input}, \n"
                f"Chat history: {chat_history}")
    return chat_history


def fn_predict(user_input, chat_history, model, max_tokens, temperature, stream):
    # If the user input is empty, return the current chat history
    if not user_input:
        return chat_history

    # Log the input parameters
    logger.info(f"\nUser input: {user_input}, \n"
                f"Chat history: {chat_history}, \n"
                f"Model: {model}, \n"
                f"Max tokens to generate: {max_tokens}\n"
                f"Temperature: {temperature}\n"
                f"Stream output: {stream}")

    # Build the messages parameter
    messages = user_input  # or [{"role": "user", "content": user_input}]
    if len(chat_history) > 1:
        messages = []
        for chat in chat_history:
            if chat[0] is not None:
                messages.append({"role": "user", "content": chat[0]})
            if chat[1] is not None:
                messages.append({"role": "assistant", "content": chat[1]})
    print(messages)

    # Generate the reply
    bot_response = mygpt.get_completion(messages, model, max_tokens, temperature, stream)

    if stream:
        # Streaming output
        chat_history[-1][1] = ""
        for character in bot_response:
            character_content = character.choices[0].delta.content
            if character_content is not None:
                chat_history[-1][1] += character_content
                yield chat_history
    else:
        # Non-streaming output
        chat_history[-1][1] = bot_response
        logger.info(f"Chat history: {chat_history}")
        yield chat_history


def fn_update_max_tokens(model, origin_set_tokens):
    """
    Update the max_tokens slider when the selected model changes.

    :param model: the model whose token limit applies.
    :param origin_set_tokens: the value currently set on the slider.
    :return: a new slider component with the updated maximum.
    """
    # Look up the model's maximum; fall back to the current setting if the model is unknown
    new_max_tokens = MODEL_TO_MAX_TOKENS.get(model)
    new_max_tokens = new_max_tokens if new_max_tokens else origin_set_tokens

    # If the current setting exceeds the new maximum, reset it to the default (500 here; adjust as needed)
    new_set_tokens = origin_set_tokens if origin_set_tokens <= new_max_tokens else 500

    # Build the new max_tokens slider component
    new_max_tokens_component = gr.Slider(
        minimum=0,
        maximum=new_max_tokens,
        value=new_set_tokens,
        step=1.0,
        label="max_tokens",
        interactive=True,
    )
    return new_max_tokens_component


with gr.Blocks() as demo:
    # Title
    gr.Markdown("# MyGPT")

    with gr.Row(equal_height=True):
        # Left: chat area
        with gr.Column(scale=4):
            chatbot = gr.Chatbot(label="Chatbot")
            user_input_textbox = gr.Textbox(label="User input", value="你好")
            with gr.Row():
                submit_btn = gr.Button("Submit")
                clear_btn = gr.Button("Clear", elem_id="btn")

        # Right: toolbox
        with gr.Column(scale=1):
            # A tab with controls for the model, max_tokens, temperature and streaming
            with gr.Tab(label="Parameters"):
                # Model selection
                model_dropdown = gr.Dropdown(
                    label="model",
                    choices=MODELS,
                    value=DEFAULT_MODEL,
                    multiselect=False,
                    interactive=True,
                )
                max_tokens_slider = gr.Slider(
                    minimum=0,
                    maximum=4096,
                    value=500,
                    step=1.0,
                    label="max_tokens",
                    interactive=True,
                )
                temperature_slider = gr.Slider(
                    minimum=0,
                    maximum=1,
                    value=0.5,
                    step=0.01,
                    label="temperature",
                    interactive=True,
                )
                stream_radio = gr.Radio(
                    choices=[True, False],
                    label="stream",
                    value=True,
                    interactive=True,
                )

    # When the model changes, update the maximum of the max_tokens slider accordingly.
    # https://www.gradio.app/docs/dropdown
    model_dropdown.change(
        fn=fn_update_max_tokens,
        inputs=[model_dropdown, max_tokens_slider],
        outputs=max_tokens_slider,
    )

    # Triggered when the user presses Enter while the textbox has focus.
    # https://www.gradio.app/docs/textbox
    user_input_textbox.submit(
        fn=fn_prehandle_user_input,
        inputs=[user_input_textbox, chatbot],
        outputs=[chatbot],
    ).then(
        fn=fn_predict,
        inputs=[user_input_textbox, chatbot, model_dropdown,
                max_tokens_slider, temperature_slider, stream_radio],
        outputs=[chatbot],
    )

    # Triggered when the button is clicked.
    # https://www.gradio.app/docs/button
    submit_btn.click(
        fn=fn_prehandle_user_input,
        inputs=[user_input_textbox, chatbot],
        outputs=[chatbot],
    ).then(
        fn=fn_predict,
        inputs=[user_input_textbox, chatbot, model_dropdown,
                max_tokens_slider, temperature_slider, stream_radio],
        outputs=[chatbot],
    )

    clear_btn.click(lambda: None, None, chatbot, queue=False)

demo.queue().launch(share=True)
config.py
import os
from dotenv import load_dotenv, find_dotenv

# Load environment variables
load_dotenv(find_dotenv())

# Read the OpenAI API key
OPENAI_API_KEY = os.environ['OPENAI_API_KEY']

# Official docs - Models: https://platform.openai.com/docs/models
MODELS = [
    # The latest GPT-3.5 Turbo model, with improved instruction following, JSON mode,
    # reproducible outputs, parallel function calling, etc. Returns at most 4,096 output tokens.
    "gpt-3.5-turbo-1106",
    "gpt-3.5-turbo",      # Currently points to gpt-3.5-turbo-0613; points to gpt-3.5-turbo-1106 starting December 11, 2023.
    "gpt-3.5-turbo-16k",  # Currently points to gpt-3.5-turbo-16k-0613; points to gpt-3.5-turbo-1106 starting December 11, 2023.
    # "gpt-3.5-turbo-instruct",  # Similar capabilities to text-davinci-003, but compatible with the legacy Completions endpoint rather than Chat Completions.
    # "gpt-4-1106-preview",  # The latest GPT-4 model, with improved instruction following, JSON mode, reproducible outputs, parallel function calling, etc. Returns at most 4,096 output tokens. This preview model is not yet suited for production traffic.
    # "gpt-4-vision-preview",  # Understands images in addition to all other GPT-4 Turbo capabilities. Returns at most 4,096 output tokens. A preview model version, not yet suited for production traffic.
    "gpt-4",  # Currently points to gpt-4-0613. 8,192 tokens.
    # "gpt-4-32k",  # Currently points to gpt-4-32k-0613. 32,768 tokens.
    # "gpt-4-0613",  # Snapshot of gpt-4 from June 13, 2023, with improved function calling support.
    # "gpt-4-32k-0613",  # Snapshot of gpt-4-32k from June 13, 2023, with improved function calling support.
]

DEFAULT_MODEL = MODELS[1]

MODEL_TO_MAX_TOKENS = {
    "gpt-3.5-turbo-1106": 4096,
    "gpt-3.5-turbo": 4096,
    "gpt-3.5-turbo-16k": 16385,
    "gpt-4": 8192,
}
MyGPT.py
import openai
from loguru import logger
from config import *


class MyGPT:
    def __init__(self, api_key=OPENAI_API_KEY):
        """
        Initialize the MyGPT class.

        :param api_key: the OpenAI API key
        """
        self.client = openai.OpenAI(api_key=api_key)

    def get_completion(self, messages, model=DEFAULT_MODEL, max_tokens=200,
                       temperature=0, stream=False):
        """
        Creates a model response for the given chat conversation.
        API docs: https://platform.openai.com/docs/api-reference/chat/create

        :param messages: the list of messages comprising the conversation so far.
        :param model: ID of the model to use.
        :param max_tokens: the maximum number of tokens that can be generated in the chat completion.
        :param temperature: sampling temperature between 0 and 2. Higher values (e.g. 0.8) make the output more random; lower values (e.g. 0.2) make it more focused and deterministic.
        :param stream: whether to stream the output.
        :return: a chat completion object, or a stream of chat completion chunk objects if the request is streamed.
        """
        if isinstance(messages, str):
            messages = [{"role": "user", "content": messages}]
        elif not isinstance(messages, list):
            return "Invalid 'messages' type. It should be a string or a list of messages."

        response = self.client.chat.completions.create(
            messages=messages,
            model=model,
            max_tokens=max_tokens,
            stream=stream,
            temperature=temperature,
        )

        if stream:
            # Streaming output
            return response

        # Non-streaming output
        logger.debug(response.choices[0].message.content)
        logger.info(f"Total tokens: {response.usage.total_tokens}")
        return response.choices[0].message.content

    def get_embedding(self, input):
        """
        Creates an embedding vector representing the input text.
        API docs: https://platform.openai.com/docs/api-reference/embeddings/create

        :param input: text to embed, encoded as a string or array of tokens. To embed multiple inputs in a single request, pass an array of strings or an array of token arrays. The input must not exceed the model's maximum input tokens (8,192 for text-embedding-ada-002), cannot be an empty string, and any array must have 2,048 dimensions or fewer.
        :return: a list of embedding vectors.
        """
        response = self.client.embeddings.create(
            input=input,
            model='text-embedding-ada-002',
        )
        embeddings = [data.embedding for data in response.data]
        return embeddings


if __name__ == "__main__":
    # Quick test
    mygpt = MyGPT()

    # prompt
    prompt = '你好'
    response = mygpt.get_completion(prompt, temperature=1)
    print(response)

    # # messages
    # messages = [
    #     {'role': 'user', 'content': '什么是大模型'},
    # ]
    # response = mygpt.get_completion(messages, temperature=1)
    # print(response)

    # vectors = mygpt.get_embedding("input text")
    # print(len(vectors), len(vectors[0]))
    # # 1 1536
    #
    # vectors = mygpt.get_embedding(["input text 1", "input text 2"])
    # print(len(vectors), len(vectors[0]))
    # # 2 1536
2. Building a Vector Database
2.1 Text Splitting: English
Split by Sentence
import nltk
nltk.download('punkt')
from nltk.tokenize import sent_tokenize

text = ("The Earth's atmosphere is composed of layers, including the troposphere, "
        "stratosphere, mesosphere, thermosphere, and exosphere. The troposphere is "
        "the lowest layer where all weather takes place and contains 75% of the atmosphere's mass. "
        "Above this, the stratosphere contains the ozone layer, which protects the Earth "
        "from harmful ultraviolet radiation.")

# Split the text into sentences
chunks = sent_tokenize(text)
for i, chunk in enumerate(chunks):
    print(f"Chunk {i+1}: {len(chunk)}: {chunk}")
Fixed length chunks
def fixed_length_chunks(text, chunk_size):
    return [text[i:i+chunk_size] for i in range(0, len(text), chunk_size)]

chunks = fixed_length_chunks(text, 100)  # say we want 100-character chunks
for i, chunk in enumerate(chunks):
    print(f"Chunk {i+1}: {len(chunk)}: {chunk}")
Chunks with overlapping window
def sliding_window_chunks(text, chunk_size, stride):
    # With stride < chunk_size, consecutive chunks overlap by chunk_size - stride characters
    return [text[i:i+chunk_size] for i in range(0, len(text), stride)]

chunks = sliding_window_chunks(text, 100, 50)  # 100-character chunks with a stride of 50
for i, chunk in enumerate(chunks):
    print(f"Chunk {i+1}: {len(chunk)}: {chunk}")
RecursiveCharacterTextSplitter from langchain
# pip install langchain may be required
from langchain.text_splitter import RecursiveCharacterTextSplitter

text = "The Earth's atmosphere is a layer of gases surrounding the planet Earth and retained by Earth's gravity. It contains roughly 78% nitrogen and 21% oxygen, with trace amounts of other gases. The atmosphere protects life on Earth by absorbing ultraviolet solar radiation and reducing temperature extremes between day and night."

splitter = RecursiveCharacterTextSplitter(
    chunk_size=150,
    chunk_overlap=20,
    length_function=len,
)
chunks = splitter.split_text(text)
for i, chunk in enumerate(chunks):
    print(f"Chunk {i+1}: {len(chunk)}: {chunk}")
2.2 Text Splitting: Chinese
# Split by sentence
# %pip install jieba
import re

text = "在这里,我们有一段超过200字的中文文本作为输入例子。这段文本是关于自然语言处理的简介。自然语言处理(NLP)是计算机科学、人工智能和语言学的交叉领域,它旨在让计算机能够理解和处理人类语言。在这一领域中,机器学习技术扮演着核心角色。通过使用各种算法,计算机可以解析、理解、甚至生成人类可以理解的语言。这一技术已广泛应用于机器翻译、情感分析、自动摘要、实体识别等多个方面。随着深度学习技术的发展,自然语言处理的准确性和效率都得到了显著提升。当前,一些高级的NLP系统已经能够完成复杂的语言理解任务,例如问答系统、语音识别和对话系统等。自然语言处理的研究不仅有助于改善人机交互,而且对于提高机器的自主性和智能化水平也具有重要意义。"

# Regex matching the punctuation that ends a Chinese sentence
sentences = re.split(r'([。?!]|……)', text)

# Recombine each sentence with its trailing punctuation
chunks = [sentence + (punctuation if punctuation else '')
          for sentence, punctuation in zip(sentences[::2], sentences[1::2])]

for i, chunk in enumerate(chunks):
    print(f"Chunk {i+1}: {len(chunk)}: {chunk}")
# Split by a fixed number of characters
def split_by_fixed_char_count(text, count):
    return [text[i:i+count] for i in range(0, len(text), count)]

# Split the text into chunks of 100 characters each
chunks = split_by_fixed_char_count(text, 100)
for i, chunk in enumerate(chunks):
    print(f"Chunk {i+1}: {len(chunk)}: {chunk}")
# Split by a fixed number of sentences
def split_by_fixed_sentence_count(sentences, count):
    return [sentences[i:i+count] for i in range(0, len(sentences), count)]

# Split into chunks of 5 sentences each.
# Note: `sentences` here is the raw re.split output, so each chunk is a list
# of alternating sentence/punctuation fragments rather than a single string.
chunks = split_by_fixed_sentence_count(sentences, 5)
for i, chunk in enumerate(chunks):
    print(f"Chunk {i+1}: {len(chunk)}: {chunk}")
from langchain.text_splitter import RecursiveCharacterTextSplitter

text = """
在这里,我们有一段超过200字的中文文本作为输入例子。这段文本是关于自然语言处理的简介。自然语言处理(NLP)是计算机科学、人工智能和语言学的交叉领域,它旨在让计算机能够理解和处理人类语言。在这一领域中,机器学习技术扮演着核心角色。通过使用各种算法,计算机可以解析、理解、甚至生成人类可以理解的语言。这一技术已广泛应用于机器翻译、情感分析、自动摘要、实体识别等多个方面。随着深度学习技术的发展,自然语言处理的准确性和效率都得到了显著提升。当前,一些高级的NLP系统已经能够完成复杂的语言理解任务,例如问答系统、语音识别和对话系统等。自然语言处理的研究不仅有助于改善人机交互,而且对于提高机器的自主性和智能化水平也具有重要意义。
"""

splitter = RecursiveCharacterTextSplitter(
    chunk_size=150,
    chunk_overlap=0,
    length_function=len,
)
chunks = splitter.split_text(text)
for i, chunk in enumerate(chunks):
    print(f"Chunk {i+1}: {len(chunk)}: {chunk}")
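By default, RecursiveCharacterTextSplitter tries the separators ["\n\n", "\n", " ", ""] in order, none of which match Chinese punctuation, so Chinese text tends to be cut exactly at the chunk_size boundary. A sketch that passes Chinese sentence delimiters as custom separators (this particular separator list is an assumption; tune it for your corpus):

splitter_zh = RecursiveCharacterTextSplitter(
    chunk_size=150,
    chunk_overlap=0,
    length_function=len,
    separators=["\n\n", "\n", "。", "!", "?", ",", ""],  # coarser separators are tried first
)
for i, chunk in enumerate(splitter_zh.split_text(text)):
    print(f"Chunk {i+1}: {len(chunk)}: {chunk}")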
2.3 Text Vectorization
vector_embedding_and_similarity
from openai import OpenAI
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())  # read the local .env file, which defines OPENAI_API_KEY

client = OpenAI()


def get_embedding(text, model="text-embedding-ada-002"):
    text = text.replace("\n", " ")
    return client.embeddings.create(input=[text], model=model).data[0].embedding


import numpy as np

def cosine_similarity(A, B):
    dot_product = np.dot(A, B)
    norm_A = np.linalg.norm(A)
    norm_B = np.linalg.norm(B)
    return dot_product / (norm_A * norm_B)
emb1 = get_embedding("大模型的应用场景很多")
emb2 = get_embedding("大模型")
emb3 = get_embedding("大模型有很多应用场景")
emb4 = get_embedding("Java开发")

cosine_similarity(emb1, emb2)  # 0.9228796051583866
cosine_similarity(emb1, emb4)  # 0.796571635600008
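emb3, the near-paraphrase of the first sentence, is computed above but never compared. Printing all three scores makes the ranking visible; the two values in the comments above come from the original run:

print(cosine_similarity(emb1, emb2))  # related phrase
print(cosine_similarity(emb1, emb3))  # near-paraphrase: expected to score highest
print(cosine_similarity(emb1, emb4))  # unrelated topic: expected to score lowest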
2.4 Reading Files and Writing to the Vector Database
import json
import os
from ast import literal_eval

from utils import *
from db_qdrant import Qdrant


def preprocess_data(df_news):
    # Data preprocessing
    # Convert columns holding string representations of lists into actual lists.
    # pd.notna(x) checks that x is not missing (NaN), so missing values are not converted.
    # literal_eval(x) is a safe way to turn a string into the corresponding Python object.
    df_news['title_entities'] = df_news['title_entities'].apply(
        lambda x: literal_eval(x) if pd.notna(x) else [])
    df_news['abstract_entities'] = df_news['abstract_entities'].apply(
        lambda x: literal_eval(x) if pd.notna(x) else [])

    # Fill NaN values in the other columns with empty strings
    df_news = df_news.fillna('')

    # Add a news_info column combining category, sub-category, title and abstract
    concatenation_order = ["category", "sub_category", "title", "abstract"]
    df_news['news_info'] = df_news.apply(
        lambda row: ' '.join(f"[{col}]:{row[col]}" for col in concatenation_order), axis=1)
    news_info_list = df_news['news_info'].values.tolist()
    logger.trace(f"Added news_info column | len(news_info_list): {len(news_info_list)}")
    return df_news, news_info_list


def store_embeddings_to_json(embeddings, ids, payloads, file_path):
    # Persist one batch of embeddings as a JSON file
    json_data = {
        "batch_ids": ids,
        "batch_embeddings": embeddings,
        "batch_payloads": payloads,
    }
    with open(file_path, 'w') as json_file:
        json.dump(json_data, json_file)


def compute_and_store_embeddings(data_list, embedding_folder, batch_size=1000):
    # Compute embeddings batch by batch and store them
    ids = list(range(1, len(data_list) + 1))  # generate an increasing list of ids
    for batch_idx, i in enumerate(range(0, len(data_list), batch_size)):
        # Slice out this batch's ids and payloads (note: relies on the global df_news)
        batch_ids = ids[i:i + batch_size]
        df_news_batch = df_news.iloc[i:i + batch_size]
        batch_payloads = df_news_batch.to_dict(orient='records')

        # Compute the batch embeddings
        batch_data = data_list[i:i + batch_size]
        batch_embeddings = embed_sentences(batch_data)

        # Store as a JSON file
        file_path = os.path.join(embedding_folder, f"batch_{batch_idx + 1}.json")
        store_embeddings_to_json(batch_embeddings, batch_ids, batch_payloads, file_path)

        # Log progress
        logger.info(f"Batch {batch_idx + 1} stored successfully, file path: {file_path}")


def load_embeddings_and_save_to_qdrant(collection_name, embedding_folder, batch_size):
    # Load the stored embeddings and write them into the vector database
    qdrant = Qdrant()

    # Create a new collection
    if qdrant.create_collection(collection_name):
        logger.success(f"Collection created | collection_name: {collection_name}")
    else:
        logger.error(f"Failed to create collection | collection_name: {collection_name}")

    # Insert into the vector database batch by batch
    for batch_idx, i in enumerate(range(0, len(news_info_list), batch_size)):
        # Read the JSON file
        file_path = os.path.join(embedding_folder, f"batch_{batch_idx + 1}.json")
        if os.path.exists(file_path):
            with open(file_path, 'r') as json_file:
                json_data = json.load(json_file)
            batch_ids = json_data["batch_ids"]
            batch_embeddings = json_data["batch_embeddings"]
            batch_payloads = json_data["batch_payloads"]

            # Insert the batch into Qdrant
            if qdrant.add_points(collection_name, batch_ids, batch_embeddings, batch_payloads):
                logger.success(f"Batch {batch_idx + 1} inserted successfully")
            else:
                logger.error(f"Batch {batch_idx + 1} insertion failed")
        else:
            logger.warning(f"File {file_path} does not exist; skipping this batch.")
    logger.info("All data inserted.")


# Read the news data
df_news = get_df_news()

# Preprocess the data
df_news, news_info_list = preprocess_data(df_news)

# Folder for storing the embeddings
embedding_folder = 'embeddings_folder'
os.makedirs(embedding_folder, exist_ok=True)

# Compute and store the embeddings
compute_and_store_embeddings(news_info_list, embedding_folder, 1000)

# Load the embeddings and write them to the vector database
collection_name = "all_news"
# load_embeddings_and_save_to_qdrant(collection_name, embedding_folder, 1000)
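get_df_news and embed_sentences come from the project's utils module, which is not shown here. As an illustration only, a hypothetical embed_sentences could wrap the same OpenAI embeddings endpoint used elsewhere in this article (the name and signature are taken from the calling code; the implementation is an assumption):

# Hypothetical sketch of utils.embed_sentences
from openai import OpenAI

_client = OpenAI()

def embed_sentences(sentences, model="text-embedding-ada-002"):
    # One request per batch; returns a list of 1536-dimensional vectors
    response = _client.embeddings.create(input=sentences, model=model)
    return [item.embedding for item in response.data]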
2.5 Building the Vector Database Class
from loguru import logger
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams, Batch
from qdrant_client.http.exceptions import UnexpectedResponse  # for catching error responses

from config import QDRANT_HOST, QDRANT_PORT, QDRANT_EMBEDDING_DIMS


class Qdrant:
    def __init__(self):
        self.client = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT)  # create the client instance
        self.size = QDRANT_EMBEDDING_DIMS  # OpenAI embedding dimensions = 1536

    def get_points_count(self, collection_name):
        """
        First check whether the collection exists.
        - If the collection exists, return its points_count (the exact number of points in it).
        - If the collection does not exist, create it.
          - If creation succeeds, return points_count (0: a freshly created collection has no points).
          - If creation fails, return points_count (-1: defined here as the failure value).

        Returns:
            points_count

        Raises:
            UnexpectedResponse: if an unexpected response occurs while fetching collection info.
            ValueError: Collection test_collection not found
        """
        try:
            collection_info = self.get_collection(collection_name)
        except (UnexpectedResponse, ValueError) as e:
            # The collection does not exist; create a new one
            if self.create_collection(collection_name):
                logger.success(f"Collection created | collection_name: {collection_name} points_count: 0")
                return 0
            else:
                logger.error(f"Failed to create collection | collection_name: {collection_name} error: {e}")
                return -1
        except Exception as e:
            logger.error(f"Error fetching collection info | collection_name: {collection_name} error: {e}")
            return -1  # return an error code or another appropriate value
        else:
            points_count = collection_info.points_count
            logger.success(f"Collection already exists | collection_name: {collection_name} points_count: {points_count}")
            return points_count

    def list_all_collection_names(self):
        """
        Example CollectionsResponse values:
        CollectionsResponse(collections=[
            CollectionDescription(name='GreedyAIEmployeeHandbook'),
            CollectionDescription(name='python')
        ])
        CollectionsResponse(collections=[])
        """
        CollectionsResponse = self.client.get_collections()
        collection_names = [CollectionDescription.name for CollectionDescription in CollectionsResponse.collections]
        return collection_names

    # Fetch collection info
    def get_collection(self, collection_name):
        """
        Fetch collection info.

        Args:
            collection_name (str): the collection name.

        Returns:
            collection_info: information about the collection.
        """
        collection_info = self.client.get_collection(collection_name=collection_name)
        return collection_info

    # Create a collection
    def create_collection(self, collection_name) -> bool:
        """
        Create a collection.

        Args:
            collection_name (str): the collection name.

        Returns:
            bool: True if the collection was created successfully, otherwise False.
        """
        return self.client.recreate_collection(
            collection_name=collection_name,
            vectors_config=VectorParams(size=self.size, distance=Distance.COSINE),
        )

    def add_points(self, collection_name, ids, vectors, payloads):
        # Add points to Qdrant
        self.client.upsert(
            collection_name=collection_name,
            wait=True,
            points=Batch(ids=ids, payloads=payloads, vectors=vectors),
        )
        return True

    # Search
    def search(self, collection_name, query_vector, limit=3):
        return self.client.search(
            collection_name=collection_name,
            query_vector=query_vector,
            limit=limit,
            with_payload=True,
        )

    def search_with_query_filter(self, collection_name, query_vector, query_filter, limit=3):
        """
        Search the collection for the most similar points by vector similarity, subject to a filter.
        API docs: https://qdrant.github.io/qdrant/redoc/index.html#tag/points/operation/search_points

        :param collection_name: the collection to search
        :param query_vector: the vector used for the similarity comparison
        :param query_filter: the filter conditions
        :param limit: the maximum number of results to return
        :return:
        """
        return self.client.search(
            collection_name=collection_name,
            query_filter=query_filter,
            query_vector=query_vector,
            limit=limit,
            with_payload=True,
        )


if __name__ == "__main__":
    qdrant = Qdrant()

    # Create a collection
    # collection_name = "test"

    # Fetch collection info
    # qdrant.get_collection(collection_name)
    # If the collection was never created, this raises:
    # qdrant_client.http.exceptions.UnexpectedResponse: Unexpected Response: 404 (Not Found)
    # Raw response content:
    # b'{"status":{"error":"Not found: Collection `test` doesn\'t exist!"},"time":0.000198585}'

    # Fetch collection info, creating the collection if it does not exist
    collection_name = "all_news"
    count = qdrant.get_points_count(collection_name)
    print(count)
    # If the collection did not exist and was created successfully, this prints 0.
    # If the collection already existed, this prints the number of points it contains.

    # Delete a collection
    # collection_name = "test"
    # qdrant.client.delete_collection(collection_name)
2.6 Computing Vector Similarity
from sentence_transformers import SentenceTransformer

texts1 = ["胡子长得太快怎么办?", "在香港哪里买手表好"]
texts2 = ["胡子长得快怎么办?", "怎样使胡子不浓密!", "香港买手表哪里好", "在杭州手机到哪里买"]

model = SentenceTransformer('../Dmeta-embedding')  # DMetaSoul/Dmeta-embedding
print("Use pytorch device: {}".format(model.device))

# Load the model on a GPU
# import torch
# device = "cuda" if torch.cuda.is_available() else "cpu"
# print("Use pytorch device: {}".format(device))
# model = SentenceTransformer('../Dmeta-embedding', device=device)

embs1 = model.encode(texts1, normalize_embeddings=True)
embs2 = model.encode(texts2, normalize_embeddings=True)

# Compute all pairwise similarities
similarity = embs1 @ embs2.T
print(similarity)

# For each texts1[i], find the most similar texts2[j]
for i in range(len(texts1)):
    scores = []
    for j in range(len(texts2)):
        scores.append([texts2[j], similarity[i][j]])
    scores = sorted(scores, key=lambda x: x[1], reverse=True)
    print(f"Query text: {texts1[i]}")
    for text2, score in scores:
        print(f"Similar text: {text2}, score: {score}")
    print()
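Because encode(..., normalize_embeddings=True) returns L2-normalized vectors, the matrix product above is exactly the cosine similarity. sentence_transformers also ships a helper that computes the same matrix without requiring pre-normalized inputs:

from sentence_transformers import util

cos_scores = util.cos_sim(embs1, embs2)  # same values as embs1 @ embs2.T, as a torch tensor
print(cos_scores)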
2.7 A Recommender System Example
from utils import *
from db_qdrant import Qdrant
from qdrant_client.http import models

# Load the data
df_behaviors_sample = get_df_behaviors_sample()
df_news = get_df_news()

# Iterate over the rows of df_behaviors_sample
for _, row in df_behaviors_sample.iterrows():
    user_id = row['user_id']
    click_history = row['click_history'].split()

    # Recall stage
    # Build the historical-interaction string historical_records_str
    historical_records = generate_historical_records(df_news, click_history)
    historical_records_str = '\n'.join(historical_records)
    logger.info(f"Historical interaction string | historical_records_str: \n{historical_records_str}")

    # Generate the user profile user_profile
    user_profile = generate_user_profile(historical_records_str)

    # Embed the user profile as user_emb
    user_emb = embed_sentences([user_profile])[0]

    # Build the filter query_filter:
    # collect the user's preferred (category, sub_category) combinations
    user_favorite_combinations = get_user_favorite_combinations(click_history, df_news)
    should_items = []
    for category, sub_category in user_favorite_combinations:
        should_item = models.Filter(
            must=[
                models.FieldCondition(
                    key="category",
                    match=models.MatchValue(value=category),
                ),
                models.FieldCondition(
                    key="sub_category",
                    match=models.MatchValue(value=sub_category),
                ),
            ]
        )
        should_items.append(should_item)
    query_filter = models.Filter(should=should_items)

    # Query Qdrant for the news most similar to the user profile
    qdrant = Qdrant()
    scored_point_list = qdrant.search_with_query_filter("all_news", user_emb, query_filter, 20)
    coarse_top_news = [scored_point.payload for scored_point in scored_point_list]
    logger.info(f"len(coarse_top_news): {len(coarse_top_news)}")

    if coarse_top_news:
        # Fine ranking stage
        coarse_top_news_str = '\n'.join(f"{idx}. {news}" for idx, news in enumerate(coarse_top_news))
        fine_top_news = fine_ranking(user_profile, historical_records_str, coarse_top_news_str, 5)
        for idx in fine_top_news:
            news = coarse_top_news[int(idx)]
            logger.success(int(idx))
            logger.success(news)

    break  # only process the first sample user
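generate_historical_records, generate_user_profile, get_user_favorite_combinations and fine_ranking are helpers from the project's utils module and are not shown. As one hypothetical illustration, get_user_favorite_combinations might simply count (category, sub_category) pairs over the clicked news (the news_id column name is an assumption):

# Hypothetical sketch of utils.get_user_favorite_combinations
from collections import Counter

def get_user_favorite_combinations(click_history, df_news, top_n=3):
    clicked = df_news[df_news['news_id'].isin(click_history)]  # rows the user clicked
    pair_counts = Counter(zip(clicked['category'], clicked['sub_category']))
    return [pair for pair, _ in pair_counts.most_common(top_n)]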
Building a Prompt from Vector Database Content
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
from openai import OpenAI

# Initialize the embedding model and the vector store
model = SentenceTransformer('all-MiniLM-L6-v2')  # lightweight embedding model
index = faiss.read_index("path/to/your_vector_index.faiss")  # load a pre-built index
documents = ["doc1 text...", "doc2 text...", ...]  # assume the original texts are already loaded
client = OpenAI()


def retrieve_context(query: str, top_k: int = 3) -> str:
    """Retrieve the most relevant context for the query."""
    query_embedding = model.encode([query])  # encode the query as a vector
    distances, indices = index.search(query_embedding, top_k)  # FAISS search
    return "\n".join([documents[i] for i in indices[0]])


def generate_answer(query: str, context: str) -> str:
    """Call the LLM to generate an answer."""
    prompt = f"""根据以下上下文回答问题:
{context}

问题:{query}
答案:"""
    # The original snippet called the legacy Completions endpoint with text-davinci-003;
    # with openai>=1.0 (used throughout this article) the chat completions API replaces it.
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=500,
        temperature=0.3,
    )
    return response.choices[0].message.content.strip()


# Handle the user query
user_query = "如何学习机器学习?"
context = retrieve_context(user_query)
answer = generate_answer(user_query, context)
print(answer)
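The snippet above assumes a FAISS index already exists at the placeholder path. A minimal sketch of how such an index could be built and saved, assuming an exact (flat) L2 index is acceptable at this scale:

# Hypothetical index-building step
import faiss
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('all-MiniLM-L6-v2')
documents = ["doc1 text...", "doc2 text..."]

doc_embeddings = model.encode(documents)            # float32 array, shape (n_docs, 384)
index = faiss.IndexFlatL2(doc_embeddings.shape[1])  # exact L2 search
index.add(doc_embeddings)                           # add all document vectors
faiss.write_index(index, "path/to/your_vector_index.faiss")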