开篇
嘿,各位小伙伴!今天我们来聊一个超级有趣的话题:embedding向量化数据。听起来很高大上对不对?别担心,让我用最简单的方式告诉你这是啥。 想象一下,你有一大堆文字、图片或者其他数据,想要让计算机理解它们该怎么办?这就需要我们把它们变成计算机能读懂的"数字",这个过程就叫embedding向量化!
简单来说,就是把数据转换成一串数字,这串数字就像是数据的"指纹",我们管它叫向量。
比如,"我爱吃火锅"这句话,经过向量化后可能变成[-0.2, 0.5, 0.8]这样的数字组合。 那这些数字有什么用呢?最神奇的是,通过计算这些向量之间的余弦值(简单理解就是数学上的一个相似度打分),我们就能知道不同数据之间有多相似!余弦值越接近1,说明两个数据越相似;越接近-1,说明越不相似。 所以下次当你使用搜索引擎,或者看到AI给出惊人的相似推荐时,别忘了,背后可能就是这些小向量们在默默发挥作用哦!
如何生成向量
这张图是我经常用在博客中讲关于Rag类应用的。这是因为Rag自2023年年初被越来越多的人所接受时第一个出现的标准数据流就是这样的。
关键在于这里面的一个个向量是怎么计算出来的,以及怎么选向量模型。
选向量模型
总体来说向量模型目前在市面上分成两类
SAAS化
最有代表的就是OpenAI的Text Embedding Large 3,目前支持达到3172个维度,在Azure OpenAI Studio里可以直接调用,收费相当的便宜、速度快而且支持高并发(零点几厘每千token);
本地化开源
但是考虑到数据安全性以及解决方案一致和一体性同时要兼顾考虑到中文、英文多场景结合这样的跨语言问题我们还是首选本地开源布署向量模型。
比较著名的有以下三种模型,我们分别从几个能力来比对一下这三种模型。
多语言处理能力
- BGE - BGE可以将任意文本映射到低维的稠密向量,在文本向量化任务中得到了广泛的应用。 BGE系列模型在C-MTEB中文排行榜中名列前茅,显示了其强大的文本处理和语义表征能力。
- M3E - M3E采用大规模混合嵌入技术,旨在提高词向量的表达能力和泛化能力。M3E在训练过程中使用千万级的中文句对数据集进行训练,表现出色的向量编码能力。
- BCE - BCE模型主要用于提升RAG应用的准确度,具体细节较少,但其作为开源大模型的一部分,应具备较强的文本处理能力。
文本处理能力
- BGE - BGE可以将任意文本映射到低维的稠密向量,在文本向量化任务中得到了广泛的应用。 BGE系列模型在C-MTEB中文排行榜中名列前茅,显示了其强大的文本处理和语义表征能力。
- M3E - M3E采用大规模混合嵌入技术,旨在提高词向量的表达能力和泛化能力。M3E在训练过程中使用千万级的中文句对数据集进行训练,表现出色的向量编码能力。
- BCE - BCE模型主要用于提升RAG应用的准确度,具体细节较少,但其作为开源大模型的一部分,应具备较强的文本处理能力。
检索精度与整体语义表征能力
-
BGE - BGE在中英文语义检索精度与整体语义表征能力方面均超越了社区所有同类模型,如OpenAI的text embedding 003等。其保持了同等参数量级模型中的最小向量维度,使用成本更低。
-
M3E - M3E在私有部署和大规模文本处理方面表现出色,适用于需要私有化和资源节约的场景。它通过大规模混合嵌入技术提高了词向量的表达能力和泛化能力,适用于各种文本处理任务。
- BCE - BCE的具体检索精度和语义表征能力未详细说明,但其在RAG应用中的表现表明其具有较高的准确性。
资源使用情况
- BGE - BGE系列模型在全球下载量超过1500万,位居国内开源AI模型首位,表明其资源使用高效且受欢迎。
- M3E - M3E属于小模型,资源使用不高,CPU也可以运行,适合私有化部署和资源受限的环境。
- BCE - BCE的具体资源使用情况未明确提及,但作为开源大模型的一部分,其资源使用可能相对较高。
选定本地化的BGE布署方案
最终我们发觉在多语言支持、文本处理能力和检索精度方面表现优异,尤其适合需要高精度和高效率的场景。同时,其资源使用较为经济。
因此我们就选用BGE Large ZH v1.5模型来做我们的向量模型。
Ollama自带向量模型
当然,还有一种就是Ollama自带embedding模型,叫MX_BAI_Embedding,也是1024位维度的。
你可以通过以下指令安装它,前提是你已经安装有ollama了。
ollama pull mxbai-embed-large
它安装完后即是restful service的。
构建向量化服务的代码
官方例子太简单无法应用在正式生产环境
官方的例子很简单
先安装(记得要用conda虚拟化一个python环境)
conda create -n bge-rerank-large python=3.10 -yconda init powershellconda activate bge-rerank-largepip install -U FlagEmbedding
然后就是直接调用了
rom FlagEmbedding import FlagModel
sentences_1 = ["样例数据-1", "样例数据-2"]
sentences_2 = ["样例数据-3", "样例数据-4"]
model = FlagModel('BAAI/bge-large-zh-v1.5', query_instruction_for_retrieval="为这个句子生成表示以用于检索相关文章:",use_fp16=True) # Setting use_fp16 to True speeds up computation with a slight performance degradation
embeddings_1 = model.encode(sentences_1)
embeddings_2 = model.encode(sentences_2)
similarity = embeddings_1 @ embeddings_2.T
print(similarity)# for s2p(short query to long passage) retrieval task, suggest to use encode_queries() which will automatically add the instruction to each query
# corpus in retrieval task can still use encode() or encode_corpus(), since they don't need instruction
queries = ['query_1', 'query_2']
passages = ["样例文档-1", "样例文档-2"]
q_embeddings = model.encode_queries(queries)
p_embeddings = model.encode(passages)
scores = q_embeddings @ p_embeddings.T
这样的调用根本没法用在生产环境,在生产环境一切都为“服务”即API。更何况:
- query_instruction_for_retrieval="为这个句子生成表示以用于检索相关文章:"
这一个点,网上无任何解释,这个参数是干什么的?
我看到过很多生产环境里都有这样一句代码,这太搞笑了。
这个参数的值可以为空但也可以填,这是为什么呢?
告诉大家吧,这个参数是为每一个向量生成时添加“前缀”时,以便于在检索时碰到类似前缀可以更精准得把这个向量从向量库里搜索得到用的,你可以认为这是一个“打标签”的功能。
下面就来构建生产环境用API服务。
生产环境的实战例子
提出需求
- 可满足生产环境中各种业务在需要embedding时取得向量化用,生产环境里有各种系统有Java的有Python的甚至还有Go、.Net、SAP应用,当这些异质架构混在一起时如果都要调用embedding时怎么办?因此我们需要把它构建成API服务;
- API在被调用时需要调用方在http header头带上一个自定义的企业用api-key;
- 可以把向量返回成浮点数组同时还能返回一个spentTime用于统计每一次调用所消耗时间;
- 任何错误以.log文件日志落盘方式记录它
为此,我们选用轻量级的Flask来构建基于python的restful service。如果你是django当然也可以使用django去构建它。
不过我这介绍的还是最轻量级的flask restful service方案,它轻量、启动方便。
下面上全代码:
from flask import Flask, request, jsonify
import os
import time
import logging
from logging.handlers import RotatingFileHandler
from FlagEmbedding import FlagReranker
from FlagEmbedding import FlagModel
app = Flask(__name__)model = FlagModel('BAAI/bge-large-zh-v1.5',query_instruction_for_retrieval="",use_fp16=False) # 设置use_fp16为True可以加快计算,效果会稍有下降
# 配置日志
if not os.path.exists('logs'):os.makedirs('logs')file_handler = RotatingFileHandler('logs/FountainRank.log', maxBytes=10240000, backupCount=10)
file_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'
))
file_handler.setLevel(logging.INFO)
app.logger.addHandler(file_handler)
app.logger.setLevel(logging.INFO)@app.route('/getBGEEmbedding', methods=['POST'])
def get_vector():start_time = time.time()error_messages = [] # 用于收集错误信息api_key = request.headers.get('apiKey')if api_key != os.environ.get('X_API_KEY'):error_msg = 'Invalid API Key'app.logger.error(error_msg)error_messages.append(error_msg)return jsonify({'data': {'embeddings': [],'spentTime': time.time() - start_time}}), 403#if not api_key or api_key != os.environ.get('LOCAL_MODEL_API_KEY'):# return jsonify({'error': '无效的登录'}), 403inputText = request.json.get('input', None)if not inputText:return jsonify({'error': '请输入有效文本'}), 400else:embedding = model.encode([inputText])vec_list = embedding.tolist()return jsonify({'data': {'embeddings': vec_list,'spentTime': time.time() - start_time}}), 200if __name__ == '__main__':app.run(host='0.0.0.0', port=5000)
核心代码解读
- 把bge large zh1.5模型作为全局加载,这样每次调用都是从内存里加载模型而不是每调用一次就中加载一次,这是为性能作考虑(第一次加载时会下载690-800Mb模型至本地的HF_HOME所指向目录,即hugging face home所在路径);
- 在python当前目录创建logs目录,如果已经存在就使用它,以便于落盘相关log日志;
- 用flask的@app.route构建api,这个flask运行在5000端口因此它的访问方式就为:http://localhost:5000/getBGEEmbedding;
- 请求时http header请求头必须带上一个apiKey,并且代码用http header里的apiKey和服务端环境变量设置的X_API_KEY的值去做比对,以防止乱访问造成的安全问题;
- 代码通过请求体里的input获取用于请求生成向量的文本段并通过model.encode返回,最终把它以tolist() - 浮点数数组的形式返回给到调用端;
如何调用
启动该例子
我们可以使用python embedding.py直接这样启动它,不过这是开发调试模式。
我们也可以使用gunicorn启动它
gunicorn -w 4 -b 0.0.0.0:5000 --timeout 120 --access-logfile logs/access.log --error-logfile logs/error.log embedding:app
这儿的:app前写的就是这个python文件的名字(不带.py)。
生产环境如何调用
curl --location --request POST 'http://localhost:5000/getBGEEmbedding' \
--header 'apiKey: secret_password' \
--header 'Content-Type: application/json' \
--header 'Host: localhost:5000' \
--data-raw '{"input": "圣诞节水晶球"
}'
于是它就会得到下面这样的向量
在对接时我们取data.embedding里的数据成浮点数组即可用来存储和搜索用了,如我们使用spring boot对接中台代码获取向量示例:
String jsonStr = okHttpHelper.postJsonWithMultiHeaders(embeddingUrl, input, headers, 4000, 4000);logger.info(">>>>>>getEmbedding模型返回的result->{}", jsonStr);if (StringUtil.isNotBlank(jsonStr)) {// 使用JsonNode来解析JsonNode rootNode = objectMapper.readTree(jsonStr);// 先获取data节点JsonNode dataNode = rootNode.get("data");if (dataNode != null) {// 从data节点中获取embeddingsJsonNode embeddingsNode = dataNode.get("embeddings");if (embeddingsNode != null && embeddingsNode.isArray() && embeddingsNode.size() > 0) {// 获取第一个embedding数组JsonNode firstEmbedding = embeddingsNode.get(0);if (firstEmbedding != null && firstEmbedding.isArray()) {vectorResult = new ArrayList<>();for (JsonNode value : firstEmbedding) {vectorResult.add(value.floatValue());}}}// 如果需要获取spentTimeJsonNode spentTimeNode = dataNode.get("spentTime");if (spentTimeNode != null) {double spentTime = spentTimeNode.doubleValue();logger.info(">>>>>>BgeEmbedding spent time->{}",spentTime);// 处理spentTime...}}}
上述代码中的vectorResult是一个List<Float>类型的变量。平均耗时在15-30毫秒,性能相当好(注意了,你要用GPU环境运行)。
如何实现重排序
什么是重排序
在上一篇《一文看懂最前沿得高级RAG+设计是如何消灭AI幻觉的架构设计》 中我们提到了目前最先进的RAG架构-RAG+又称为:高级RAG中,必须使用到重排序这一步骤。
重排序步骤其实就好比:
- 我们先要为用户的提问生成>=提问所用的答案;
- 再像一个渔网一样,逐步把“不合格的鱼”筛选出去;
- 最终留下精确的语料送给LLM去做仲裁,以便得LLM削除不必要的幻觉同时取得最佳性能;
好比用户如果问的是:附近有什么吃的?
RAG会检索出10-30条含有附近设施、配套、相关信息的介绍,而真正和“吃的”有关系的可能只有4条,因此我们不能把30条全部一股脑送给AI,而是把这有用的4条信息:筛选出来,这就要用到重排序了。
为此,我们也使用BGE Rrerank Large模型,它可以本地布署同时性能很棒。
BGE Rerank的使用
官方的便子给出的也是太简单了,无法使用在含有异质架构的生产环境里。
from FlagEmbedding import FlagReranker
reranker = FlagReranker('BAAI/bge-reranker-large', use_fp16=True) # Setting use_fp16 to True speeds up computation with a slight performance degradationscore = reranker.compute_score(['query', 'passage'])
print(score)scores = reranker.compute_score([['what is panda?', 'hi'], ['what is panda?', 'The giant panda (Ailuropoda melanoleuca), sometimes called a panda bear or simply panda, is a bear species endemic to China.']])
print(scores)
为此我们也构建flask服务来构建它。
我们把它可以放在刚才的embedding.py文件里,做成另一个API,这样就不需要分成两个flask服务去分别运行了。
全代码
from flask import Flask, request, jsonify
import os
import time
import logging
from logging.handlers import RotatingFileHandler
from FlagEmbedding import FlagReranker
from FlagEmbedding import FlagModel
app = Flask(__name__)# 全局 reranker 对象
global_reranker = FlagReranker('BAAI/bge-reranker-large', use_fp16=True)
model = FlagModel('BAAI/bge-large-zh-v1.5',query_instruction_for_retrieval="",use_fp16=False) # 设置use_fp16为True可以加快计算,效果会稍有下降
# 配置日志
if not os.path.exists('logs'):os.makedirs('logs')file_handler = RotatingFileHandler('logs/FountainRank.log', maxBytes=10240000, backupCount=10)
file_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'
))
file_handler.setLevel(logging.INFO)
app.logger.addHandler(file_handler)
app.logger.setLevel(logging.INFO)@app.route('/getBGEEmbedding', methods=['POST'])
def get_vector():start_time = time.time()error_messages = [] # 用于收集错误信息api_key = request.headers.get('apiKey')if api_key != os.environ.get('X_API_KEY'):error_msg = 'Invalid API Key'app.logger.error(error_msg)error_messages.append(error_msg)return jsonify({'data': {'embeddings': [],'spentTime': time.time() - start_time}}), 403#if not api_key or api_key != os.environ.get('LOCAL_MODEL_API_KEY'):# return jsonify({'error': '无效的登录'}), 403inputText = request.json.get('input', None)if not inputText:return jsonify({'error': '请上传有效的文本'}), 400else:embedding = model.encode([inputText])vec_list = embedding.tolist()return jsonify({'data': {'embeddings': vec_list,'spentTime': time.time() - start_time}}), 200@app.route('/rank', methods=['POST'])
def rank():start_time = time.time()error_messages = [] # 用于收集错误信息try:# 验证 API Keyapi_key = request.headers.get('apiKey')if api_key != os.environ.get('X_API_KEY'):error_msg = 'Invalid API Key'app.logger.error(error_msg)error_messages.append(error_msg)return jsonify({'data': {'resultList': [],'spentTime': time.time() - start_time,'failReason': ' | '.join(error_messages)}}), 403# 获取请求数据data = request.get_json()user_input = data.get('userInput', '')knowledge_list = data.get('knowledgeList', [])# 如果知识列表为空,返回空结果if not knowledge_list:return jsonify({'data': {'resultList': [],'spentTime': time.time() - start_time,'failReason': ''}})# 处理每条知识并计算相似度result_list = []potential_records = [] # 新增一个列表存储分数小于0的记录for idx, item in enumerate(knowledge_list):try:score = global_reranker.compute_score([user_input, item['fileContent']])[0]# 构建通用的结果项result_item = {'id': item.get('id', ''),'pointId': item.get('pointId', 0),'fileName': item.get('fileName', ''),'fileContent': item.get('fileContent', ''),'score': float(score)}# 根据分数分别添加到不同列表if score >=-1.8:result_list.append(result_item)else:potential_records.append(result_item)except Exception as e:error_msg = f'Error processing item {idx}: {str(e)}'app.logger.error(error_msg)error_messages.append(error_msg)continue# 按 score 降序排序result_list.sort(key=lambda x: x['score'], reverse=True)return jsonify({'data': {'resultList': result_list,'potentialRecords': potential_records,'spentTime': time.time() - start_time,'failReason': ' | '.join(error_messages) if error_messages else ''}})except Exception as e:error_msg = f'Error in rank API: {str(e)}'app.logger.error(error_msg)error_messages.append(error_msg)return jsonify({'data': {'resultList': [],'spentTime': time.time() - start_time,'failReason': ' | '.join(error_messages)}})if __name__ == '__main__':app.run(host='0.0.0.0', port=5000)
代码核心解读
- 我们在embedding.py里定义了2个API一个是/getBGEEmbeddingg,一个是/rank,这样只要我们用gunicorn把这个flask服务启动起来就可以满足生产环境中对于embedding和rerank的API调用了;
- 和embedding时一样,我们在一启动时就把rerank模式加载到内存里去而不是每一次调用都去重复加载它;
- rerank会接收一个这样的输入
{"userInput": "外公79岁, 眼压高, 视力模糊, 头痛, 眼睛胀痛, 青光眼, 老年人眼健康, 眼科药物, 眼部维生素补充, 护眼产品","knowledgeList": [{"id": "1","pointId": 3398761,"fileName": "eye.txt","fileContent": "眼压高需要及时就医...","score": 0},{"id": "2","pointId": 1109871,"fileName": "eye.txt","fileContent": "局部用药","score": 0},{"id": "3","pointId": 4788771,"fileName": "eye.txt","fileContent": "外用药","score": 0},{"id": "4","pointId": 538871,"fileName": "被充眼药文档.txt","fileContent": "常用眼药水","score": 0}]
}
- userInput:用户的提问
- knowledgeList,内含从RAG中被检索出来的topK条原始语料;
4. 把rerank排序后得-2分以下的记录全部删除(具体得分段需要根据你自己的项目不断去做调整,一般通用情况下我们是把0分以下的记录全部删除只作为potentialRecords返回,而正确的数据作为resultList返回。也有是全部rerank后返回,取1-10(topP)条记录的作法),这个是没有一定的规定的,而是根据你当前系统中的语料特性不断去做适应得以取得一个最佳的组合,所谓最佳使用就是:正确的语料+潜在的语料是最后检索出的结果。
如何访问
访问很简单
curl --location --request POST 'http://localhost:5000/rank' \
--header 'apiKey: secret_password' \
--header 'Content-Type: application/json' \
--data-raw '{"userInput": "外公79岁, 眼压高, 视力模糊, 头痛, 眼睛胀痛, 青光眼, 老年人眼健康, 眼科药物, 眼部维生素补充, 护眼产品","knowledgeList": [{"id": "1","pointId": 3398761,"fileName": "eye.txt","fileContent": "眼压高需要及时就医...","score": 0},{"id": "2","pointId": 1109871,"fileName": "eye.txt","fileContent": "局部用药","score": 0},{"id": "3","pointId": 4788771,"fileName": "eye.txt","fileContent": "外用药","score": 0},{"id": "4","pointId": 538871,"fileName": "补充眼药文档.txt","fileContent": "常用眼药水","score": 0}]
}'
最后我们来看返回:
{"data": {"failReason": "","potentialRecords": [{"fileContent": "局部用药","fileName": "eye.txt","id": "2","pointId": 1109871,"score": -3.58418345451355}],"resultList": [{"fileContent": "眼压高需要及时就医...","fileName": "eye.txt","id": "1","pointId": 3398761,"score": 4.278597354888916},{"fileContent": "常用眼药水","fileName": "补充眼药文档.txt","id": "4","pointId": 538871,"score": 0.8522164225578308},{"fileContent": "外用药","fileName": "eye.txt","id": "3","pointId": 4788771,"score": -1.5808547735214233}],"spentTime": 0.9499616622924805}
}
请大家观察那几个>0分的记录,可以看到结果相当的精准。
何时可能会取-2分及以上呢?我这给大家举一个例子:
提问:我住在正大大美白广场,停车费找谁?语料:停车位收费,正大大美白附近联系电话:xxxxxxxx。福建路书店附近:联系:xxxxxxxx。
在使用MX_BAI或者是其它一类的embedding模型时这样的匹配会产生负分,因为它有偏差。当然,情况各异,具体还是需要遵守:正确的语料+潜在的语料,而不能也不可能追究到达:100%精准的。
好了,结束今天的分享。
关键还是自己需要多动动手。