python 协程
python 生成器的作用
协程在多个模型流式输出中的使用实例
继续学习:
https://www.cnblogs.com/traditional/p/17398542.html
https://blog.csdn.net/m0_51180924/article/details/124612738
fastapi 并发
使用 fastapi api 的形式处理任务,每个任务使用协程的方式,处理多个流式输出。
该方式支持多个 fastapi 并发请求。
服务端
import json
from fastapi import FastAPI, Query, Path
import uvicorn
from starlette.requests import Request
from sse_starlette import EventSourceResponseimport asyncio
from asyncio import Queueapp = FastAPI()@app.get("/v1/models")
async def get_models():data = {"data": [{"id": "Qwen1.5-7B", # openai 支持模型id"object": "model", # openai 支持模型类别"owned_by": "organization-owner", # openai 支持模型所有者"permission": [] # openai 支持模型权限,暂时不支持},{"id": "chatglm3-6b","object": "model","owned_by": "organization-owner","permission": []}],"object": "list" # data 类型}return dataasync def output_data(text: str, model: str):output = ""for idx, word in enumerate(text):output += wordchunk = {"id": None,"choices": [{"delta": {"content": f"{model} {idx} {output}","function_call": None, # OpenAI返回,未知"role": "assistant", # OpenAI系统消息角色"tool_calls": None # OpenAI返回,未知},"finish_reason": "length", # OpenAI停止码"index": 0, # OpenAI返回,未知"logprobs": None # OpenAI返回,未知}],"created": 1715238637, # 时间戳"model": model, # OpenAI模型id"object": "chat.completion.chunk", # OpenAI消息类型"system_fingerprint": None # OpenAI返回,未知}data = json.dumps(chunk, ensure_ascii=False)yield dataawait asyncio.sleep(1)@app.post("/v1/chat/completions")
async def flush_stream(request: Request):models = ["chatglm3", "qwen"]async def async_generate(index:int, model: str, queue: Queue):text = "这是一个流式输出他会将每个字挨个的输出哈哈!!!"if model == "chatglm3":text = "我是chatglm3的流式输出嘿嘿!!!"items_data = output_data(text=text, model=model)async for item_data in items_data:# print(f"generate### {model} {item_data}")# yield item_dataqueue.put_nowait((index, item_data))queue.put_nowait((index, None))async def async_consumer(queue: Queue, indices: list, timeout: float):indices = set(indices)finished = set()while indices != finished:try:index, response = await asyncio.wait_for(queue.get(), timeout)if response is None:finished.add(index)print("consumer queue indices finished", indices, finished)yield (index, response)except TimeoutError:breakasync def async_process(models: list):### 1. 支持多个模型的流式输出queue = Queue()tasks = [asyncio.create_task(async_generate(index=index, model=model, queue=queue))for index, model in enumerate(models)]print("###################", tasks)all_text = [dict() for _ in range(len(models))]async for index, response in async_consumer(queue=queue, indices=list(range(len(tasks))), timeout=10):if response is not None:all_text[index] = responseprint(f"thread00000_master###{all_text}")res_text = json.dumps(all_text)+"\n"yield res_textprint(f"thread00000_master ENDENDENDEND")# async def async_process(models: list):# ### 2.支持单个模型的流式输出# text = "这是一个流式输出他会将每个字挨个的输出哈哈!!!"# items_data = output_data(text=text, model=models[0])# async for item_data in items_data:# yield item_datareturn EventSourceResponse(async_process(models), media_type="text/event-stream")# return EventSourceResponse(async_process(models), media_type="text/plain")if __name__ == '__main__':uvicorn.run(app, host="0.0.0.0", port=8080)
运行:
> python.exe .\main.py
INFO: Started server process [12872]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)
INFO: 127.0.0.1:64346 - "POST /v1/chat/completions HTTP/1.1" 200 OK
process ####### ['{"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}', {}]
process ####### ['{"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}', '{"id": null, "choices": [{"delta": {"content": "qwen 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}']
......
process ####### ['{"id": null, "choices": [{"delta": {"content": "chatglm3 19 我是chatglm3的流式输出嘿嘿!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}', '{"id": null, "choices": [{"delta": {"content": "qwen 23 这是一个流式输出他会将每个字挨个的输出哈哈!!!", "function_call": null, "role": "assistant",
"tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}']
consumer queue indices finished {0, 1} {0, 1}
process END END
客户端
from openai import OpenAIclient = OpenAI(api_key="EMPTY",base_url="http://127.0.0.1:8080/v1/")
response = client.chat.completions.create(model="EMPTY",messages=[# {"role": "system", "content": "You are a helpful assistant."},{"role": "user", "content": "你好"}# {"role": "user", "content": "谁是特朗普"},# {"role": "assistant", "content": "特朗普是美国前总统"},# {"role": "user", "content": "特朗普多大年纪了"},],functions=None,temperature=1,top_p=0,max_tokens=20,stream=True,)print("#####", response)
ret_text = ""
for part in response:# print("\n33333", type(part), part)print("\n33333", type(part), part[0])print("33333", type(part), part[1])
运行:
> python.exe .\stream.py
##### <openai.Stream object at 0x0000024D1C322590>33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}
33333 <class 'list'> ChatCompletionChunk(id=None, choices=None, created=None, model=None, object=None, service_tier=None, system_fingerprint=None, usage=None) 33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}
33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "qwen 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}
......33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "chatglm3 19 我是chatglm3的流式输出嘿嘿!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}
33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "qwen 23 这是一个流式输出他会将每个字挨个的输出哈哈!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}
多线程执行任务
开启多个线程执行任务,每个任务使用协程的方式,处理多个流式输出。
服务端
import json
from fastapi import FastAPI, Query, Path
import uvicorn
from starlette.requests import Request
from sse_starlette import EventSourceResponseimport asyncio
from asyncio import Queueimport time
from threading import Threadapp = FastAPI()@app.get("/v1/models")
async def get_models():......async def output_data(text: str, model: str):output = ""for idx, word in enumerate(text):output += wordchunk = {"id": None,"choices": [{"delta": {"content": f"{model} {idx} {output}","function_call": None, # OpenAI返回,未知"role": "assistant", # OpenAI系统消息角色"tool_calls": None # OpenAI返回,未知},"finish_reason": "length", # OpenAI停止码"index": 0, # OpenAI返回,未知"logprobs": None # OpenAI返回,未知}],"created": 1715238637, # 时间戳"model": model, # OpenAI模型id"object": "chat.completion.chunk", # OpenAI消息类型"system_fingerprint": None # OpenAI返回,未知}data = json.dumps(chunk, ensure_ascii=False)yield dataawait asyncio.sleep(1)@app.post("/v1/chat/completions")
async def flush_stream(request: Request):......async def async_task(task_name: str, models: list):async def task_generate(task_name: str, model: str):text = f"这是{model}模型的流式输出他会将每个字挨个的输出哈哈!!!"# if model == "chatglm3":# text = "我是chatglm3的流式输出嘿嘿!!!"items_data = output_data(text=text, model=model)output = ""async for item_data in items_data:print(f"{task_name}### {model} {item_data}")# yield item_data# queue.put_nowait((index, item_data))output = item_data# queue.put_nowait((index, None))return output# results = await asyncio.gather(task_generate("chatglm3"), task_generate("qwen1.5"))results = await asyncio.gather(*[task_generate(task_name=task_name, model=model)for model in models])print(f"{task_name} ENDENDENDEND {results}")def task_thread(thread_name: str):print("另外开始一个子线程做任务啦", thread_name)models = ["model11111", "model22222"]# 1. 只执行一次任务# asyncio.run(async_task(thread_name, models))# 2. 可以执行多此任务count = 0flag = Truewhile True:# 执行任务if flag:task_name = f"{thread_name}_task"asyncio.run(async_task(task_name, models))flag = False# 没有任务等待time.sleep(1)print(f"{thread_name} count:{count}")count+=1print("子线程任务结束啦", thread_name)if __name__ == '__main__':# 1. 批量线程创建thread_names = ["thread11111", "thread22222", "thread33333"]threads = [Thread(target=task_thread, args=(thread_name,)).start()for thread_name in thread_names]# 2. 逐个线程创建# t1 = Thread(target=task, args=("task11111",))# t1.start()# time.sleep(1)# t2 = Thread(target=task, args=("task22222",))# t2.start()uvicorn.run(app, host="0.0.0.0", port=8080)
运行:
> python.exe .\main.py
另外开始一个子线程做任务啦 thread11111
另外开始一个子线程做任务啦 thread22222
另外开始一个子线程做任务啦 thread33333
thread22222_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread33333_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread33333_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
thread22222_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
thread11111_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread11111_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
INFO: Started server process [19308]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)
thread33333_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread22222_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread33333_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
thread22222_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
thread11111_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread11111_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
thread22222_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 2 这是m", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread33333_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 2 这是m", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread22222_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 2 这是m", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
thread33333_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 2 这是m", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
......
thread22222_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 34 这是model22222模型的流式输出他会将每个字挨个的输出哈哈!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
thread33333_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 34 这是model11111模型的流式输出他会将每个字挨个的输出哈哈!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread11111_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 34 这是model11111模型的流式输出他会将每个字挨个的输出哈哈!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread11111_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 34 这是model22222模型的流式输出他会将每个字挨个的输出哈哈!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
thread33333_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 34 这是model22222模型的流式输出他会将每个字挨个的输出哈哈!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null
thread22222_task ENDENDENDEND ['{"id": null, "choices": [{"delta": {"content": "model11111 34 这是model11111模型的流式输出他会将每个字挨个的输出哈哈!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}', '{"id": null, "choices": [{"delta": {"content": "model22222 34 这是model22222模型的流式输出他会将每个字挨个的输出哈
哈!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}']
thread11111_task ENDENDENDEND ['{"id": null, "choices": [{"delta": {"content": "model11111 34 这是model11111模型的流式输出他会将每个字挨个的输出哈哈!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}', '{"id": null, "choices": [{"delta": {"content": "model22222 34 这是model22222模型的流式输出他会将每个字挨个的输出哈
哈!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}']
thread33333_task ENDENDENDEND ['{"id": null, "choices": [{"delta": {"content": "model11111 34 这是model11111模型的流式输出他会将每个字挨个的输出哈哈!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}', '{"id": null, "choices": [{"delta": {"content": "model22222 34 这是model22222模型的流式输出他会将每个字挨个的输出哈
哈!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}']
Ctrl+C 终止多线程
demo
import threading
import timedef thread_function(name, flag):while not flag.is_set():print(f"Thread {name} is running")time.sleep(1)print(f"Thread {name} is exiting")def signal_handler(signum, frame):print("\nYou pressed Ctrl+C! Exiting gracefully.")for flag in flags: # Set the flags to terminate the threadsflag.set()for t in threads:t.join() # Wait for the threads to finishexit()if __name__ == "__main__":import signalfrom threading import Event# Create flags to control the threadsflags = [Event() for _ in range(3)]# Start three threadsthreads = [threading.Thread(target=thread_function, args=('Thread-{}'.format(i), flags[i])) for i in range(3)]for t in threads:t.start()# Register the signal handlersignal.signal(signal.SIGINT, signal_handler)# Wait for keyboard interrupttry:while True:time.sleep(0.1)except KeyboardInterrupt:pass
运行:
> python.exe .\test.py
Thread Thread-0 is running
Thread Thread-1 is running
Thread Thread-2 is running
Thread Thread-2 is running
Thread Thread-1 is running
Thread Thread-0 is running
Thread Thread-1 is running
Thread Thread-2 is running
Thread Thread-0 is runningYou pressed Ctrl+C! Exiting gracefully. # 此时执行了 Ctrl+C
Thread Thread-0 is exiting
Thread Thread-2 is exiting
Thread Thread-1 is exiting