b站链接:合集·Triton 从入门到精通
文章目录
- 算法名词解释:
- scheduler 任务调度器
- model instance、inference和request
- batching
- 一、Triton Inference Server原理
- 1. Overview of Trition
- 2. Design Basics of Trition
- 3. Auxiliary Features of Trition
- 4. Additional resources
- 5. In-house Inference Server vs Trition
- 6. 编程实战
- 6.1 Prepare the Model Repository
- 6.2 Configure the Served Model
- 6.3 Launch Triton Server
- 6.4 Configure an Ensemble Model
- 6.5 Send Requests to Triton Server
- 二、Triton Backend详细教程
- 1. Overview
- 1.1 什么时候实现backend?
- 1.2 实现backend过程中需要实现哪些内容?
- 1.3 为什么triton是这么设计去实现一个backend的?
- 2. Code Exploration
- 2.1 注意事项
- 2.2 backend写好后是如何编译、build的?
- 3. Summary
- 三、Triton Python Backend & BLS Deep Dive
- 1. Python Backend(用python实现custom backends)
- 1.1 Triton工作原理
- 1.2 为什么我们需要Python backend?
- 1.3 工作原理
- 1.4 如何实现?
- 2. BLS:Business Logic Scripting
- 2.1 何时使用?
- 2.2 如何实现?
- 2.3 工作原理?
- 四、Triton Stateful Model
- 1. Application
- 2. Stateful Models
- 2.1 Sequence batcher
- 2.2 Control Inputs
- 2.3 Direct & Oldest
- 2.4 Streaming client
- 3. Practice : CTC Streaming
- 4. Practice : WeNet
- 5. Summary
- 五、Triton 优先级管理
- Triton Priority Queue
- 工作原理
- 如何使用?
- Rate Limiter
- 原理
- 如何使用?
- 总结
算法名词解释:
scheduler 任务调度器
- Triton中的scheduler通常指的是任务调度器(scheduler),它是一种用于管理和调度计算机集群中任务执行的软件组件。在Triton中,scheduler的作用是管理GPU资源的分配和任务的调度,以确保系统的高效利用和任务的顺利执行。
- 具体来说,Triton的scheduler负责以下几个方面的功能:
- 资源管理:scheduler跟踪和管理GPU资源的可用情况,确保任务能够得到适当的GPU资源分配。
- 任务调度:根据任务的优先级、资源需求和系统负载等因素,scheduler决定何时执行哪些任务以及在哪些GPU上执行。
- 负载均衡:scheduler通过在集群中动态分配任务和资源,实现负载均衡,以最大程度地提高系统的整体性能。
- 故障恢复:scheduler能够检测并处理集群中的故障情况,例如GPU节点的故障或任务执行失败,以确保任务能够在其他可用资源上重新启动或恢复。
model instance、inference和request
- Model Instance(模型实例):
- 模型实例指的是部署在计算机系统中的一个特定的模型的实例化对象,它是模型在运行时的一个具体实例。在深度学习模型部署中,通常会将训练好的模型加载到内存中,创建模型实例以供推理(inference)使用。
- 一个模型可以有多个实例运行在不同的计算节点上,这样可以提高系统的并发性和吞吐量。
- Inference(推理):
- 推理是指利用已经训练好的模型对输入数据进行预测或分类的过程。在深度学习中,推理阶段是模型部署的核心任务,通过输入数据经过模型实例进行前向传播计算,得到输出结果。
- 推理通常是模型部署中的主要活动,用于实际应用中对新数据进行预测或分析。
- Request(请求):
- 请求是指外部系统向模型部署服务发送的请求,请求通常包含需要进行推理的输入数据。请求可以是针对单个样本的推理请求,也可以是批量处理多个样本的请求。
- 请求通常包含了需要对数据进行推理的详细信息,比如输入数据、期望的输出格式等。
- 这些概念之间的联系可以总结如下:
- 外部系统通过发送请求(request)向部署的模型实例发送数据,请求可以包含一个或多个需要进行推理的输入数据。
- 模型实例接收到请求后,对输入数据进行推理(inference)操作,利用模型进行计算,得出预测结果。
- 推理的结果可以通过响应(response)返回给外部系统,用于后续的处理或展示。
- 这些概念在模型部署过程中起着重要的作用:
- 模型实例负责加载和运行模型,处理来自外部系统的推理请求。
- 推理是模型实例执行的主要任务,它对输入数据进行处理,生成输出结果。
- 请求是外部系统与模型实例之间进行通信和交互的方式,通过发送请求,外部系统可以向模型实例提交需要进行推理的数据。
batching
-
在大模型部署中,“batching” 是指将多个输入样本一起发送到模型进行推理的过程,而不是逐个样本进行推理。这个过程通常发生在推理阶段,也就是在模型实例接收到推理请求后,对输入数据进行处理的阶段。
-
作用:
- Batching 的主要作用在于提高推理过程的效率和性能,具体表现在以下几个方面:
- 减少推理延迟:通过批量处理多个样本,可以减少推理请求的数量,从而减少整体推理过程的延迟。相比逐个样本进行推理,批处理可以更有效地利用计算资源,提高推理的并发性和吞吐量。
- 优化计算资源利用:在深度学习中,通常会使用并行计算来加速推理过程。通过 batching,可以使得计算图中的操作能够更好地被并行执行,充分利用计算资源,提高系统的整体效率。
- 降低通信开销:在分布式环境下,将多个输入样本打包成一个批次发送到模型实例,可以减少网络通信的开销。这对于跨网络较大的推理请求尤其重要,可以减少数据传输的时间和带宽消耗。
-
例子:
- 假设有一个图像分类任务,需要对一批图像进行分类,每个图像的大小为 224x224 像素。如果单独对每个图像进行推理,那么对于每个图像,模型都需要进行一次前向传播计算,这样会造成大量的计算资源浪费和推理延迟。
- 而如果使用 batching,将多个图像打包成一个批次发送到模型进行推理,比如将 32 张图像作为一个批次。这样,模型可以一次性并行处理所有 32 张图像的推理请求,大大提高了计算资源的利用效率,并且减少了整体推理时间。
- 因此,通过 batching,可以在大模型部署中有效地提高推理过程的效率和性能,特别是在处理大规模数据时尤为重要。
一、Triton Inference Server原理
1. Overview of Trition
- K8S cluster:在一个容器化的服务编排工具集中,部署和管理一个推理服务(Inference Service),该服务能够利用多个节点和多个加速卡(如 GPU)进行高效的推理计算。
- Triton:一个使用单个机器学习模型(mode),运行在一个容器内的推理服务。这个服务可以利用一个或多个 GPU 进行加速计算。
- TensorRT:一种用于加速神经网络(Neural Network, NN)模型推理过程的库。这些库通常包含优化算法、硬件加速支持和其他技术,以提高神经网络模型在推理(即预测或分类)过程中的速度和效率。
- Triton基本功能:
- 支持多个框架(TensorFlow,PyTorch,TensorRT,ONNX RT,custom backends)
- 支持CPU,GPU和多GPU
- 可以支持多线程在同一时间内同时运行多个机器学习模型的过程。
- 接受服务:支持http/rest grpt apis
- 系统或服务能够与编排系统和自动扩展工具进行集成,并基于延迟和健康状况指标来自动调整资源和优化性能。
- 模型管理:加载/卸载,模型更新…
- 在github和NGC上开源
2. Design Basics of Trition
- 基于推理生命周期的
- 支持的多个模型框架->这取决于框架,应该是用backend的方式来实现对不同模型的支持
- 共同功能:
- 公共后端管理
- 后端负载等
- 模型管理-加载/卸载、模型更新、查询模型状态等。
- 并发模型执行-实例管理
- 请求队列调度器
- 推理生命周期管理
- 推理请求管理
- 推理响应管理
- 公共后端管理
- GRPC相关
GRPC服务器
- 基于模型,大致可分为三种类型
- 单一的没有依赖的模型
- 不同模型的组合,模型之间有顺序
- 有状态的模型:语言类模型(模型内部有顺序)
- 多线程思路:
1. 对单一模型启动多个线程进行推理
2. 对多个模型进行多线程推理
- 三种模型:
- 无状态模型
- 一般是CV模型
- 两种分配方式:均匀分配(默认调度程序),动态批处理(提高吞吐率)
- 有状态模型(预测结果取决于先前的序列)
- 一般是NLP模型,模型内部是有顺序的
- 顺序批处理:直接的,最古老的
- 组合模型:有依赖关系的
- 一条模型管道
- 每个模型内部调度顺序无所谓,但模型与模型之间有调度顺序。
- 无状态模型
3. Auxiliary Features of Trition
- model analyzer:
- 模型分析器是一套工具,它提供关于如何基于特定性能优化Triton中的单个或多个模型所需,帮助用户围绕吞吐量、延迟和GPU内存做出权衡决策。
- 选择正确的模型配置时的占用空间
- 现有的两个基准功能:
- 性能分析-度量变化下的吞吐量(inf/S)和延迟
- 内存分析–测量模型在不同内存占用情况下的GPU内存足迹
4. Additional resources
- 一些学习资料
5. In-house Inference Server vs Trition
- 有问题可以去社区讨论
6. 编程实战
6.1 Prepare the Model Repository
- 我们必须正确组织好model_reposity后,triton_server才能正确加载模型到服务端。
- 模型目录中三个基本的components
6.2 Configure the Served Model
- 基本组件:
max_batch_size:一般不会超过GPU的显存/内存。
- instance groups:在一个triton上对一个模型在gpu上并行运行多个instance,增加吞吐率
- scheduling和Batching决定了triton该使用哪种调度策略去调度请求。
- 加速器
- 热身
6.3 Launch Triton Server
- Simplest Way
- –gpus all :选择可以看到哪些GPU
- -it:interactive 表示开启容器后可以在容器内部进行一些交互
- –rm:当container任务完成后自动关掉container
- –shm-size:指定container可以访问的共享内存的大小
- -p:指定需要监听的端口,(host端口:映射到container的端口)【8000端口用于http的访问;8001用于 grpc的访问;8002用于?的访问】
- -v:目录的映射,将host中的目录映射到container中的目录
- 最后是triton docker的名称
- run:指定好model_repository的路径即可运行文件
- 检查模型状态:
6.4 Configure an Ensemble Model
- 有顺序模型的组合:value用来连接不同的step
- Notice:
- 如果其中一个模型是有状态模型,那么推理请求应该包含有状态模型中的信息。
- 组合模型有自己的调度程序。
- 如果集成中的模型是框架backend(TensorFlow这种),则它们之间的数据传输不必经过CPU存储器。
6.5 Send Requests to Triton Server
- 在triton server准备好时,如何向server发送请求
- client发送请求的方式:http,grpc,capi
- 通过shared memory传递数据
二、Triton Backend详细教程
1. Overview
1.1 什么时候实现backend?
- 公司想在triton上用自己的深度学习框架:
- 腾讯 - TNN
- 百度 - Paddle
- 英伟达 - HugeCTR
- 客户想用一些custom module和一些主流的深度学习结合在一起成为一个pipeline:
- 预处理
- 后处理
- 一些在主流框架中没有实现的模块
1.2 实现backend过程中需要实现哪些内容?
- 名词:
- backend:
- model:模型,像tensorflow这种
- Execute inference:instance,可以同时多个在GPU/CPU上跑
- 接口含义:
- TRITONBACKEND_Backend对象会把backend对象放到TRITONBACKEND_initialize里面,然后在该函数中对传进来的函数做一些初始化的操作。
- TRITONBACKEND_Modelinitialize:初始化model对象(初始化一些模型共有的属性:模型名称、输入输出,在model configure中定义的那些)。
- TRITONBACKEND_ModelInstanceInitialize:对instance做一些初始化(在哪个设备上运行、包含一切和instance相关的内容)
- TRITONBACKEND_ModelInstanceExecute:做某一个模型推理时执行的函数。
- TRITONBACKEND_Finalize、TRITONBACKEND_ModeFinalize、TRITONBACKEND_ModelInstanceFinalize:做一些收尾的工作。
- ModelExecution的代码就是写在ModelState和ModelInstanceState中的。
- ModelState:
- 依附于Model对象。
- 维护与Model和ModelInstance相关的所有的属性,包括模型名称、输入输出、max_batch_size等。
- 实行推理:LoadModel——如何把模型从文件读入到TRITONBACKEND文件来的函数。
- ModelInstanceState:
- 依附于ModelInstance对象。
- 维护ModelInstance的相关信息,包括instance在哪个设备上运行等。
- SetInput Tensors:用来准备模型推理的输入数据的。
- Execute:进行模型推理。
- readOutput Tensors:把模型推理完的结果的内容拿出来,并且返回给TRITON。
1.3 为什么triton是这么设计去实现一个backend的?
- backend的作用:TRITON对于不同的backend都要有一套统一的机制去运行和管理,至于模型的效果是backend内部的内容,TRITON只负责调用。
- ModelState和ModelInstanceState两个状态类的作用:接口是纯C的函数,不是面向对象的,但同一时间可能有很多instance都要调用这些接口。为了使不同instance的推理可以安全和独立,所以这些接口函数应该在不同instance内部去执行。
- 为什么不将接口函数设计为虚基类:为了解耦backend和triton的主流程。这些接口都写在一个.cc文件中,如果想改变backend或对该backend添加一些新功能时,只需要重新编译该.cc文件即可,不需要重新编译整个TRITON。
2. Code Exploration
2.1 注意事项
- 不同的model instance可能是run在不同的device上的。不同的device要分配自己的内存和推理,所以要注意device_id。
- 批处理(batching):要backend自己实现。TRITON的pipeline只是把要打batch的requests集合在一个列表中,它本身没有做batching的工作。在执行完推理后,要把输出的数据拆散,还给不同的requests。
- 注意requests对象的管理:
- requests对象是在backend外面创建的,是TRITON创建好了后送进来的。但是执行完成后需要由backend自己来销毁。
- 如果requests发生了问题(空,超过max_batch_size,或者里面获取不到input tensor),则要及时终止整个推理流程并且release掉这些requests对象。
- responses对象的管理:
- 是在backend内部创建的,且不需要在由我们来释放,它是在TRITON pipeline中释放和销毁的。如果我们不慎销毁了response,会导致TRITON pipeline无法把response中的结果返回给用户。
- 当推理发生错误时,要立即返回ERROR response。
2.2 backend写好后是如何编译、build的?
- 要仿照一个线程的CMakeLists写一个自己的CMakeLists。
- 仿照TritonPyTorchBackendConfig.cmake.in写一个TritonPyTorchBackendConfig.cmake.in文件。
- 将libtriton/pytorch.ldscript 复制到你的backend文件夹中。(内容都是一样的)
- build:
- 只buildbackend本身:
- 用cmake去build,用make去编译得到backend.so。
- 将backend.so以特定文件格式copy到tritonserver/backends/中。
- 和整个triton一起build:
- 运行build.py文件,build.py文件是用来编译整个TRITON的,它在编译triton的过程中会编译每一个backend。
- 用 "–backend 你的backend名称:<container_tag> "选项来编译你自己的backend。
- 只buildbackend本身:
3. Summary
- backend有三个层级:backend,model和modelInstance。这三个类都是由Triton pipeline预实现和管理的,不需要由我们实现。
- 需要我们完成的工作:
- 需要为backend实现初始化、完成、执行(Initialize,Finalize,Execute)接口。
- 重点实现ModelState。它负责模型读取和维护模型相关的属性,并附加(attach)到BackendModel中去。
- 重点实现ModelnstanceState。它是真正负责推理的类,它负责发送推理完的responses,准备输入数据,负责维护和modelInstance相关的属性和数据,并将其附加(attach)到BackendModelInstance对象上。
- SetinpuTensors():进行批处理,准备输入张量
- EXECUTE():模型推断
- ReadOutputTensors():读取输出和发送响应
- 注意事项:
- 注意设备(device_id)。
- 批处理:从所有的requests中将input date汇聚成大batch,并将输出拆开,还给所有的requests对应的responses。
- 谨慎管理requests和responses对象的创建和销毁。
- 编译backend:
- 编写CMakeLists.txt。
- 编译backend,最后将共享库复制到Triton backend目录中。
三、Triton Python Backend & BLS Deep Dive
1. Python Backend(用python实现custom backends)
1.1 Triton工作原理
- 模型推理部署流程:triton上部署的所有模型都是通过backend部署在服务器上的。例如我们训练好一个tensorflow的模型,我们就用tensorflow的backend去起一个/多个的model instance,将这些model instance放在CPU/GPU上去执行推理。
- 并发执行原理:对于每一个backend来说,它里面的每一个model instance是由一个线程来管理的。因此,我们可以在一个triton sever上对一个model起多个model instance,让这些model instance并行地去做推理的执行。
- 客户端:可以用triton提供的custom的api去实现自己想要的任意的操作逻辑,包括前后处理。
1.2 为什么我们需要Python backend?
- 许多预处理/后处理模块都是用Python实现的(例如:在pipeline中加入前后处理的模块)。
- 我们现在已经有一些用python编写的处理单元,我们想把这些处理单元放到triton中去部署。我们可以用Python代码将其打包到Triton模型中。
- 比C++自定义backend更容易编写,不需要编译。
1.3 工作原理
- python backend的作用:
- 所有backend都要遵循7大关键接口,且每一个backend实例要在一个triton线程中管理。虽然Triton同样要根据7大接口来实现通用backend框架,但是用C++写的backend仅仅是ttriton关于python backend的一个agent(代理),它里面并不包含真正的python backend的执行逻辑。由C++实现的python backend也是由线程管理的,和Triton server主线程都存在于一个主进程内。
- 最后一列的python model才是python backend真正要实现的内容,python model是由进程管理的。所以triton是通过共享内存shard mem让triton server和python model进行一个交互。对每一个python model都建立一个shm block来负责python model与triton server之间的通信。
- shared mem需要负责哪些内容:
- health flag:健康标志位,标志python backend的模型实例是不是正常。我们在每次做triton backend和python backend通信时,不管是input通信还是output通信,都会先把health tag置为false。而python backend会每隔300ms将health tag置为true。
- Request Message(信息队列):将triton的数据传给python model instance。每当一个request送到triton server之后,C++ python backend agent会将该request以message的形式入队到messageQ中,MessageQ中的数据会被取出到实际的python backend中。
- Response MessageQ(信息队列):当python backend运算完成后,它的输入同样会以message的形式存放到response MessageQ中。MessageQ中的output会被C++ python backend agent取出,并打包成response发回给客户端。
- Request MessageQ和Response MessageQ中的inputQ和outputQ都是生产者消费者模型,我们需要维护两个信号量来管理生产者消费者问题。
1.4 如何实现?
- 在python backend中只需要实现3个关键接口。
- 例子1:
- “rn50_onnx_pb.py”
import triton_python_backend_utils as pb_utils
import onnxruntime
import json
import os# 类的名字不可改
class TritonPythonModel:"""Your Python model must use the same class name. Every Python modelthat is created must have "TritonPythonModel" as the class name."""def initialize(self, args):"""`initialize is called only once when the model is being loaded.Implementing initialize function is optional. This function allowsthe model to initialize any state associated with this model.Parameters----------args : dictBoth keys and values are strings. The dictionary keys and values are:*model config:A JSON string containing the model configurationmodel instance kind: A string containing model instance kindmodel instance device id: A string containing model instance device IDmodel repository: Model repository pathmodel version: Model versionmodel name: Model name"""# 获取模型的config:You must parse model config. JSON string is not parsed here self.model_config = model_config = json.loads(args['model_config'])# 取出输出的config:Get OUTPUT configurationoutput_config = pb_utils.get_output_config_by_name(model_config, "output")# 获取输出的data_type:Convert Triton types to numpy typesself.output_dtype = pb_utils.triton_string_to_numpy(output_config['data_type'])# 获取python脚本的路径:Get path of model repositoryself.model_directory = os.path.dirname(os.path.realpath( file ))# 通过上述路径生成onnx model的路径,创建onnx模型处理的session:Create nnx runtime session for inferenceself.session = onnxruntime.InferenceSession(os.path.join(self.model_directory, 'model.onnx'))print('Initialized...')def execute(self,requests):"""`execute` must be implemented in every Python model.`execute` function receives a list of pb utils.InferenceRequest as the onlyargument. This function is called when an inference is requested for this model.Parameters------------------requests :list 可能包含一个/多个requestA list of pb utils.InferenceRequestReturns------listA list of pb utils.InferenceResponse. The length of this list mustbe the same as requests."""output_dtype = self.output_dtyperesponses = [] #返回值# Every Python backend must iterate through list of requests and create# an instance of pb utils.InferenceResponse class for each of them. You# should avoid storing any of the input Tensors in the class attributes# as they will be overridden in subsequent inference requests. You can# make a copy of the underlying NumPy array and store it if it is required.for request in requests:# Get INPUT# 这个输入的tensor是python backend的tensorinput_tensor = pb_utils.get_input_tensor_by_name(request, "input")input_array = input_tensor.as numpy() #转换为numpy array,为了让input tensor可以去onnx里做input# 跑onnx的session(会话)得到输出的tensor(prediction):Run inference with onnxruntime sessionprediction = self.session.run(None,{"input": input_array})# 将prediction转换为config文件中定义好的output datatype,再打包为python backend的output tensor:Pack output as python backend tensorout_tensor = pb_utils.Tensor("output", prediction[0].astype(output_dtype))# 将out_tensor转换为request对应的输出tensorinference_response = pb_utils.InferenceResponse(output_tensors=[out_tensor])responses.append(inference_response)# You must return a list of pb utils.InferenceResponse. Length# of this list must match the length of `requests` list.return responsesreturn responsesdef finalize(self):""" finalize is called only once when the model is being unloadedImplementing finalize`function is optional. This function allowsthe model to perform any necessary clean ups before exit."""print('cleaning up...')
- 例子2:
- 默认情况下,发送到Python backend的tensor将自动复制到cpu上。
- 要在GPU上保持tensor,需要在配置文件中设置以下内容:
- parameters:{ key:“FORCE CPU ONLY INPUT TENSORS” value: (string value:“no”} }
- 这样,传入python backend的input tensor将不会被强制转换为CPU上。
- 你可以通过以下方法来检查tensor的位置:
- Pb_utils.Tensor.is_CPU()
- 可以检查tensor到底在CPU还是GPU上
- 例子3:
- 将pytorch推理的过程放到GPU上去运行。
- “rn50_torch_pb.py”
import triton_python_backend_utils as pb_utils
from torch.utils.dlpack import from dlpack
from torch.utils.dlpack import to_dlpagk
import torch
import json
import os# 类的名字不可改
class TritonPythonModel:"""Your Python model must use the same class name. Every Python modelthat is created must have "TritonPythonModel" as the class name."""def initialize(self, args):"""`initialize is called only once when the model is being loaded.Implementing initialize function is optional. This function allowsthe model to initialize any state associated with this model.Parameters----------args : dictBoth keys and values are strings. The dictionary keys and values are:*model config:A JSON string containing the model configurationmodel instance kind: A string containing model instance kindmodel instance device id: A string containing model instance device IDmodel repository: Model repository pathmodel version: Model versionmodel name: Model name"""# 获取模型的config:You must parse model config. JSON string is not parsed here self.model_config = model_config = json.loads(args['model_config'])# 取出输出的config:Get OUTPUT configurationoutput_config = pb_utils.get_output_config_by_name(model_config, "output")# 获取输出的data_type:Convert Triton types to numpy typesself.output_dtype = pb_utils.triton_string_to_numpy(output_config['data_type'])# 获取python脚本的路径:Get path of model repositoryself.model_directory = os.path.dirname(os.path.realpath( file ))# 检查系统是否有GPU,如有则设置device为GPUself.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")print(self.device)# 加载pytorch模型:Load Torchscript and put on GPU# 组织好pytorch模型所在的路径model_path = os.path.join(self.model_directory, 'model.pt')if not os.path.exists(model_path):raise pb_utils.TritonModelException("Cannot find the pytorch model")# 将pyrotch模型加载到device上self.model = torch.jit.load(model path).to(self.device)print('Initialized...')def execute(self,requests):"""`execute` must be implemented in every Python model.`execute` function receives a list of pb utils.InferenceRequest as the onlyargument. This function is called when an inference is requested for this model.Parameters------------------requests :list 可能包含一个/多个requestA list of pb utils.InferenceRequestReturns------listA list of pb utils.InferenceResponse. The length of this list mustbe the same as requests."""output_dtype =self.output_dtyperesponses = [] #返回值# Every Python backend must iterate through list of requests and create# an instance of pb utils.InferenceResponse class for each of them. You# should avoid storing any of the input Tensors in the class attributes# as they will be overridden in subsequent inference requests. You can# make a copy of the underlying NumPy array and store it if it is required.for request in requests:# Get INPUTinput_tensor = pb_utils.get_input_tensor_by_name(request, "input")# 将pb tensor转为pytorch tensor:Convert Triton tensor to Torch tensorpytorch_tensor =from_dlpack(input_tensor.to_dlpack())if pytorch_tensor.shape[2] > 1000 or pytorch_tensor.shape[3] > 1000:responses.append(pb_utils.InferenceResponse(output_tensors=[], error = pb_utils.TritonError("Image shape should not be larger than 1800"))) #error responsecontinue# 将pytorch tensor放到GPU上,运行模型得到prediction:Run inference with onnxruntime session on GPUprediction = self.model(pytorch_tensor.to(self.device))#Transfer the GPu tensor to CPU# prediction =prediction.to('cpu')#将输出的pytorch tensor转换回pb tensor:Convert Torch output tensor to Triton tensorout_tensor = pb_utils.Tensor.from_dlpack("output", to_dlpack(prediction))# 将out_tensor转换为request对应的输出tensorinference_response = pb_utils.InferenceResponse(output_tensors=[out_tensor])responses.append(inference_response)# You must return a list of pb utils.InferenceResponse. Length# of this list must match the length of `requests` list.return responsesreturn responsesdef finalize(self):""" finalize is called only once when the model is being unloadedImplementing finalize`function is optional. This function allowsthe model to perform any necessary clean ups before exit."""print('cleaning up...')
- 配置custom python execution environment
- 注意事项:
- 手动指定模型跑在什么设备上。
- requests是没有被批处理的,它是一个requests列表,需要手动批处理。
- 对于每个request,必须发送一个response。
- 将支持未来的解耦response:在C++的custom backend支持一收多发、一收不发、一收一发或者多收一发、多收不发、多收多发。
- 在python backend中必须一收一发。
- FORCE_CPU_ONLY_INPUT_TENSORS 参数是必要的,以避免cpu-gpu副本。
- 数据传输需要大的共享内存:每个实例至少需要64 MB的SHM。
- python backend绝对不如C++ backend有效,特别是对于循环。
2. BLS:Business Logic Scripting
2.1 何时使用?
- 动态pipeline:运行时根据output1的值动态地决定要去哪个分支。
- 动态pipeline用triton实现不了,可以用BLS实现。
2.2 如何实现?
- 同步执行:只有当获得了inference_response之后,才能去执行后续的代码(if else)。
- 例子:
- 在exec代码中的分类操作:
- 例子:
def execute(self,requests):responses = []input_tensors = []batch_sizes = []# Every Python backend must iterate over everyone of the requests# and create a pb utils.InferenceResponse for each of them.for request in requests:# Get INPUT Triton]tensorin_tensor = pb_utils.get_input_tensor_by_name(request, "INPUT" )# Convert to torchtensorpytorch_tensor = from_dlpack(in_tensor.to_dlpack())input_tensors.append(pytorch_tensor)batch_sizes.append(pytorch_tensor.shape[0])# Concat input tensor in all requests into batchbatch_input_tensor = torch.cat(input_tensors, axis=0).to('cuda')# 函数的第一个参数要填写你即将请求的模块的参数(config中)(INPUT_0)batch_input = pb_utils.Tensor.from_dlpack("INPUT_0",to_dlpack(batch_input_tensor))# Make inference request for the first BLs call to preprocessinfer_request = pb_utils.InferenceRequest(model_name = 'preprocess',requested_output_names = ["OUTPUT_0"],inputs = [batch_input])# 依次调用BLS的模块# 调用预处理,proprocess模块:First BLS call to preprocessbatch_preprocess_response = infer_request.exec()# Extract output tensor from the resposne of first callbatch_preprocess_output = pb_utils.get_output_tensor_by_name(batch_preprocess_response, 'OUTPUT_0')# Make inference request for the second BlS call to classifier# 对inference_request来说,它的pb tensor的名字必须和请求的模块(model_name)的输入tensor一致。infer_request = pb_utils.InferenceRequest(model_name = 'classifier'requested_output_names =「"OUTPUT_0"],inputs = [pb_utils.Tensor.from_dlpack('INPUT_0',batch_preprocess_output.to_dlpack())])# 运行第二个模块:Second BLS call to classifierbatch_classifier_response = infer_request.exec()# 判断分类器的分类结果# 把分类器的分类结果拿出来:Extract output tensor from the resposne of second callbatch_classifier_output = pb_utils.get_output_tensor_by_name(batch_classifier_response, 'OUTPUT_0')# 将结果转为torch tensor:Convert classifier output to torch tensor, shape 「batch size, 1000]batch_classifier_tensor = from_dlpack(batch_classifier_output.to_dlpack())# 找出概率值最大的那个:Get the category indices from classifier output, shape [ batch size ]batch_class_ids = torch.argmax(batch_classifier_tensor, dim=1)batch_seg_input = pb_utils.Tensor.from_dlpack("input", batch_preprocess_output.to_dlpack())#判断图片是不是猫/狗:Check if the input images contain cat or dogif 283 in batch_class_ids or 263 in batch_class_ids:# if the input images contain cat or dog, segment with deeplabv3_rn50# Make inference request for the third BLs call to deeplabv3 rn50infer_request = pb_utils.InferenceRequest(model_name = 'deeplabv3 rn50',requested_output_names = ["out",'aux'], #config里定义好的inputs = [batch_seg_input])# 运行第一个分割模块:Third BLs call to deeplabv3 rn50batch_seg_response = infer_request.exec()else:# if the input images do not contain cat or dog, segment with fcn_resnet50# Make inference request for the third BLS call to fcn resnet50infer_request = pb_utils.InferenceRequest(model_name = 'fcn resnet50',requested_output_names=["out",'aux']inputs = [batch_seg_input])# 运行第二个分割模块:Third BLs call to deeplabv3 rn50batch_seg_response = infer_request.exec()# 后处理操作:Get segmentation output tensor from responsebatch_seg_output = pb_utils.get_output_tensor_by_name(batch_seg_response, 'out')batch_seg_tensor = from dlpack(batch_seg_output.to_dlpack())batch_seg_tensor = torch.softmax(batch_seg_tensor, dim=1)batch_seg_tensor = batch_seg_tensor*255.0batch_seg_tensor = batch_seg_tensor.type(torch.uint8)# Define the mapping between classifier id and segmentation idclass_seg_id_map = {817:7,283:8,263:12,339:13,681:0,665:14, 176:13}batch_classes = batch_class_ids.cpu().detach().numpy()# Extract output data from batched tensor to individual responsecursor = 0for i in range(len(requests))batch_size = batch_sizes[i]batch_mask = []for j in range(cursor, cursor + batch_size):cls = class_seg_id_map[batch_classes[j]]print(cls)mask = batch_seg_tensor[j,cls,:,:]batch_mask.append(mask)batch_mask_tensor =torch.stack(batch_mask, dim=0)print(batch_mask_tensor.shape)batch_output = pb_utils.Tensor.from_dlpack('OUTPUT', to_dlpack(batch_mask_tensor))responses.append(pb_utils.InferenceResponse(output_tensors=[batch_output]))cursor += batch_sizereturn responses
- 异步执行
- 原理: 发出之后立刻返回,可以继续做别的事情。
2.3 工作原理?
- BLS的本质是python backend。
- 在python backend中,首先准备一个BLS Request去调用step的模型。调用过程会把request传递给python stub 进程,进程会把该request存入shared mem中。
- triton的python backend agent会将shared mem中的request input取出,调用triton的api将input送入你指定的模型上去运行。而后将输出结果存入shared mem里面,python stub进程从中将输出结果取出,传给python的model。
- 在python backend agent中会运行一个while(InferExecRequest) 循环,这个标志位也是存在shared mem中,由python stub进程设置的。如果python model一直没执行完,那么该标志位就会一直是true,直到执行完最后一个模块,该标志位才会被置为false。
- Notice:
- 内存复制开销:
- 对于cpu管道,输入1份(将输入从shared mem拷贝到两个模块上),输出2份(第一次是将模块的输出tensor拷贝到shared mem中,第二次是将shared mem中的输出tensor拷贝出来,让triton返回给客户端)。
- 对于GPU管道,使用cudalPC,非常小的开销。(不需要做shared mem到模型空间的拷贝,直接走cudaIPC)
- BLS不支持pipeline并行。
- BLS中的步骤只能按顺序执行。
- FORCE_CPU_ONLY_INPUT_TENSORS 参数是必要的,以避免cpu-gpu副本。
- 内存复制开销:
- 总结:
四、Triton Stateful Model
1. Application
- 应用:
- 语言识别:智能音箱
- 长文本识别:
2. Stateful Models
- 定义:模型中需要把同一个sequence中的不同的请求之间的状态维护住,且模型每次拿到的请求一定是把所有inferences定向到一个instance里面,因为模型的state是与instance绑定的。
- 用什么方法能把所有的请求定向到一个instance里面?用sequence batcher。
- 怎么设置control signals控制信号?要在客户端中设置信号且模型是能够识别这些信号的。
2.1 Sequence batcher
2.2 Control Inputs
2.3 Direct & Oldest
- 打batch的两种方法:direct和oldest
- direct:
- 把每个sequence和每个instance上的每一个slot做一个绑定。
- slot就是batch中第几个的位置。
- sequence instance只负责定向到每个batch上,而direct还负责定向到每个slot上。
- 参数:
- max_queue_delayr_microseconds
- float_minimum_slot_utilization
- oldest:
- slot不需要和某个sequence绑定。
- 保证一个batch里面是来自不同的sequence
2.4 Streaming client
3. Practice : CTC Streaming
- “model.py”
import numpy as np
import json
# triton python backend utils is available in every Triton Python model. You
# need to use this module to create inference requests and responses, It also
#contains some utility functions for extracting information from modol confia
#and converting Triton input/output types to numpy types.
import triton_python_backend_utils as pb_utils
from multiprocessing.pool import ThreadPoolclass Decoder(obfect):def init (self,blank):self.prev = ''self.result = ''self.blank_symbol = blankdef decode(self,input,start, ready): # 进行解码"""input: a list of characters/a string"""if start:self.prev = ''self.result = ''if ready:for li in input.decode("utf.8”):if li != self.prev:if li != self.blank_symbol:self.result += l1self.prev = lir = np.array([[self.result]])return rclass TritonPythonModel:"""Your Python model must use the same class name. Every Python modelthat is created must have "TritonPythonodel" as the class name."""def initialize(self, args):"""initialize`is called only once when the model is being loaded.Implementing `initialize`function is optional. This function allowsthe model to intialize any state associated with this model.Parameters:----------------args :dictBoth keys and values are strings. The dictionary keys and values are:*model config: A JSON string containing the model configuration*model instance kind: A string containing model instance kind*model instance device id: A string containing model instance device ID*model repository:Model repository path*model version: Model version*model name: Model name"""# 加载模型配置文件:You must parse model config. JSON string is not parsed hereself.model_config = model_config = json.loads(args['model_config'])# get max batch sizemax_batch_size = max(model config["max_batch_size"],1)# 读取blank符号:get blank symbol from configblank = self.model_config.get("blank_id",'.')# 将解码器的数量初始化为max_batch_size:initialize decodersself.decoders = [Decoder(blank)for i in range(max_batch_size)]# Get 0UTPUT configurationoutput0_config = pb_utils.get_output_config_by_name(model_config,"OUTPUTO")# Convert Triton types to nupy typesself.output0_dtype = pbvuils.triton_string_to_numpy(outpute_config['data_type'])def batch_decode(self, batch_input, batch_start, batch_ready):responses = []args = []ldx = 0for i,r,s in zip(batch_input, batch_ready, batch_start):args.append([lidx,1,r,s])idx += lwith ThreadPool()as p:responses = p.map(self.process_single_request, args)return responsesdef process_single_request(self,inp):decoder_idx,input,ready,start = inp# 对每一个request都调用对应的decoder对它进行一个操作response = self.decoders[decoder idx].decode(input[e], start[0], ready[0])out_tensor_0 = pb_utils.Tensor("0UTPUTo", response.astype(self.output0 dtype))inference_response = pb_utils.InferenceResponse(output_tensors = loqt_tensor_0])return inference_responsedef execute(self,requests):"""execute’ MuST be implemented in every python model. 'executefunction receives a list of pb utils.InferenceRequest as the onlyargument. This function is called when an inference reguest is madefor this model, Depending on the batching configuration (e.g. DynamicBatching)used,"reguests may contain multiple requests. EveryPython model, must create one pb utiis.inferenceResponse for everypb utils.InferenceRequest in "requests . if there is an error, you canset the error arqument when creating a pb utils.InferenceResponseParameters----------------------------requests :listA list of pb utils.InferenceReguestReturns----------------------------listA list of pb utils.InferenceResponse, The length of this list mustbe the same as "requests"""# print("START NEW")responses = []batch_input = []batch_ready = []batch_start =[]batch_corrid =[]# Every Python backend ausr iterate over everyone of the reguests# and create a pb utils.InferenceResponse for each of them.for request in requests:#Get INPUTein_0 = pb_utils.get_input_tensor_by_name(request, "INPUT")#in 8> <triton python backend utils.Tensor object#in 0> ndarray!'xcx }batch_input += in_0.as_numpy().tolist()in_start = pb utils.get_input_tensor_by_name(request, "START")batch_start += in_start.as_numpy().tolist()in_ready = pb_utils.get_input_tensor_by_name(request, "READY")batch_ready += in_ready.as_numpy().tolist()in_corrid = pb_utils.get_input_tensor_by_name(request, "CORRID")batch_corrid += in_corrid.as_numpy().tolist()#print("corrid",batch corrid)#print("batch input".batch input)responses = self.batch_decode(batch_input, batch_start, batch_ready)#print("batch response",responses)# You should rerurn a list of pb utils.InferenceResponse. Length#of this list must match the lenoth ofreguests listassert len(requests) == len(responses)#printf"send responses":responses)return responsesdef finalize(self):"""finalize’is called only once when the model is being unloadedImplmenting “finalize" function is OPTIONAL. This function allowsthe model to perform any necessary clean ups before exit."""print('cleaning up...')
4. Practice : WeNet
5. Summary
- 流程:
- 模型初始化:从config文件中将参数读取进来初始化我们的模型,将我们的模型放到GPU上去。
- 模型运行:对于每一个请求,先检查下其start状态,将其初始化。将请求相关的信息都batching,然后用模型去做推理。做完推理后更新所有的state,将response返回给客户端(用户)。[可以在CPU上写多线程代码,用swig包一下,然后在python backend中用python代码调用C++代码,就可以绕过全局解释的问题了。]
- 模型卸载:在model finalize中把所有模型都清掉,将state也清除掉。
五、Triton 优先级管理
- 如何在triton中进行优先级的管理?给不同的request和model设置优先级信息?
- request queue:可以对request进行优先级划分及管理。
- rate limiter:可以对model instance进行优先级管理。
Triton Priority Queue
工作原理
- 在triton中有很多不同的scheduler(任务调度器)环境进行request以及model的调度。不同的scheduler适用于不同model的类型。
- 一般一个模型只有一个调度队列(scheduler queue),接收到的所有的request都会按照顺序放入scheduler queue当中。无论模型有几个model instance,默认都只有一个scheduler queue。如果有空闲的model instance,那么request队列中的request就会按照先后顺序被调度到空闲的model instance上去一些inference推理。
- dynamic batching中priority queue的运行模式:
- 在dynamic batching中,我们可以设置priority queue来进行request优先级的管理和调度。在dynamic batching中,我们可以设置多个优先级,拥有多个不同的优先级的队列,priority_n,n越大,优先级越低。
- 当我们发送请求时,需要在客户端给请求设置一个优先级。当server接收到请求后,就会把请求加入到相应的优先级队列中,去等待调度和执行。假如有多个空闲的model instance,那么它会去首先执行高优先级队列中的request。
- 缺点:
- 如果客户端没有指定request的priority咋办?
- 低优先级的request可能会饿死。
- 怎样控制queue的大小?
- ensemble model会组合多个模型,如果其中一个模型使用到了dynamic batching且设置了priority queue,那么在dynamic batching中是否适用?(依然有效)
如何使用?
- 使用的步骤
- 配置config文件,在config文件中在dynamic batching的配置项中配置priority的字段。
- load模型。
- 通过client api向server发起请求。
- config文件的参数:
- “client发送请求”
# 发送n个请求
for i in range(inference_count):if i % 2 == 0:priority = 1else:priority = 2client.async infer(model_name,inputs,partial(completion_callback, user_data)request_id = str(i),model_version = str(1),# timeout=1000,outputs = outputs,priority =priority)
processedcount =0
while processed_count < inference_count:(results,error) = user data. completed_requests.get()processed_count += 1if error is not None:print("inference failed:"+ str(error))# sys.exit(1)continuethis_id = results.get_response().idif int(this_id)%2 == 0:this_priority = 1else:this_priority = 2print("Request id {} with priority {} has been executed".format(this id, this priority))
Rate Limiter
原理
- 当客户端发送了很多request,server接收了request之后,只要我们的model instance有空闲的,scheduler就会把request调度到空闲的model instance上,然后到硬件上执行model inference。
- 当request足够多,triton server会同时运行多个模型,多个model instance的计算的。可以提高GPU的利用率和网络的吞吐量。
如何使用?
- 使用rate limiter需要人为设置一个资源池,在资源池中可以设置很多种不同的资源以及它的数量。这些资源并不映射到任何硬件资源上,它只是我们人为设置的一个逻辑上的概念。
- 资源的分类:
- local resource:每个GPU都有的资源。
- global resource:不分GPU,整个triton server共用的资源。
- 当我们把request调度到空闲model instance上时,它首先要去资源池申请资源。只有当它把资源都申请到后,才能去执行inference。
- rate limiter起到一个限速的作用,它会使没有资源的model instance不能执行,只能等待。
- rate limiter可以选择把资源分配给哪个进程,从而给model instance制定优先级。
- 如何使用rate limiter?
-
在model的配置文件中进行rate limiter的设置。
-
如何设置资源池中的资源
-
rate limiter的表现
-