路径操作的高级配置
OpenAPI 的 operationId
from fastapi import FastAPIapp = FastAPI()# 通过 operation_id 参数设置
@app.get("/items/", operation_id="some_specific_id_you_define")
async def read_items():return [{"item_id": "Foo"}]
使用 路径操作函数 的函数名作为 operationId
from fastapi import FastAPI
from fastapi.routing import APIRouteapp = FastAPI()@app.get("/items/")
async def read_items():return [{"item_id": "Foo"}]def use_route_names_as_operation_ids(app: FastAPI) -> None:"""Simplify operation IDs so that generated API clients have simpler functionnames.Should be called only after all routes have been added."""for route in app.routes:if isinstance(route, APIRoute):# 个人觉得这种操作挺不错,可以在一个地方统一处理,更有利于建立规范 且方便管理route.operation_id = route.name # in this case, 'read_items'use_route_names_as_operation_ids(app)
从 OpenAPI 中排除
from fastapi import FastAPIapp = FastAPI()# 使用 include_in_schema 参数并将其设置为 False,
# openapi 文档中就看不到这个接口的信息
#
# 个人认为,可以作为是否完成的开关使用,默认设置 False,当接口完成开发后设置为 True
@app.get("/items/", include_in_schema=False)
async def read_items():return [{"item_id": "Foo"}]
docstring 的高级描述
from typing import Set, Unionfrom fastapi import FastAPI
from pydantic import BaseModelapp = FastAPI()class Item(BaseModel):name: strdescription: Union[str, None] = Noneprice: floattax: Union[float, None] = Nonetags: Set[str] = set()@app.post("/items/", response_model=Item, summary="Create an item")
async def create_item(item: Item):"""Create an item with all the information:- **name**: each item must have a name- **description**: a long description- **price**: required- **tax**: if the item doesn't have tax, you can omit this- **tags**: a set of unique tag strings for this item> \\f : 换页 的转义字符 1 <br/>> 比较有意思的是,这里可以按照 Markdown 的语法写文档注释> Swagger UI 只支持一些简单的 Markdown 语法> 比如现在的 `>` Swagger UI就不支持,但是 ReDoc 支持> 但是比较遗憾的是 这两者对 Markdown 的语法支持的不够完善但是比较遗憾的是 这两者都不支持换行但是比较遗憾的是 这两者都不支持换行但是比较遗憾的是 这两者都不支持换行`\n` 和 `<br/>` 两者的换行还不一样对于 `>` 中的换行 要使用 `<br/>`, 不然会作为两段 引用,只能说对于 Markdown语法的支持两者都有提升空间啊\f\\f : 换页 的转义字符 2换页的内容,不会再 上面提到的两者中 进行展示比较有意思的是,这里可以按照 Markdown 的语法写文档注释然后在文档中就会按照 Markdown 的样式进行展示:param item: User input."""return item
自定义响应
使用 ORJSONResponse
如果你需要压榨性能,你可以安装并使用
orjson
并将响应设置为ORJSONResponse
from fastapi import FastAPI
from fastapi.responses import ORJSONResponseapp = FastAPI()# 使用 ORJSONResponse 代替 JSONResponse
@app.get("/items/", response_class=ORJSONResponse)
async def read_items():return ORJSONResponse([{"item_id": "Foo"}])
StreamingResponse
如果您有类似文件的对象(例如,由 open() 返回的对象),则可以在 StreamingResponse 中将其返回
from fastapi import FastAPI
from fastapi.responses import StreamingResponsesome_file_path = "large-video-file.mp4"
app = FastAPI()@app.get("/")
def main():def iterfile(): # (1)with open(some_file_path, mode="rb") as file_like: # (2)yield from file_like # (3)# 使用 StreamingResponse 返回数据return StreamingResponse(iterfile(), media_type="video/mp4")
FileResponse
异步传输文件作为响应。
from fastapi import FastAPI
from fastapi.responses import FileResponsesome_file_path = "large-video-file.mp4"
app = FastAPI()@app.get("/")
async def main():return FileResponse(some_file_path)
响应 Cookies
使用 Response
参数
app = FastAPI()@app.post("/cookie-and-object/")
def create_cookie(response: Response):# 定义一个 response: Response 的参数# 调用 set_cookieresponse.set_cookie(key="fakesession", value="fake-cookie-session-value")return {"message": "Come to the dark side, we have cookies"}
直接响应 Response
from fastapi import FastAPI
from fastapi.responses import JSONResponseapp = FastAPI()@app.post("/cookie/")
def create_cookie():content = {"message": "Come to the dark side, we have cookies"}# 创建一个 Response 或其子类 对象response = JSONResponse(content=content)# 调用 set_cookieresponse.set_cookie(key="fakesession", value="fake-cookie-session-value")return response
响应头
和 响应Cookies 类似也有两种方式
使用 Response
参数
from fastapi import FastAPI, Responseapp = FastAPI()@app.get("/headers-and-object/")
def get_headers(response: Response):# 声明一个 Response 类的形参# 像字典添加 键值对 一样,往 headers中添加键值对response.headers["X-Cat-Dog"] = "alone in the world"return {"message": "Hello World"}
直接返回 Response
from fastapi import FastAPI
from fastapi.responses import JSONResponseapp = FastAPI()@app.get("/headers/")
def get_headers():content = {"message": "Hello World"}# 将要返回的响应头信息先存到字典中 header_dict header_dict = {"X-Cat-Dog": "alone in the world", "Content-Language": "en-US"}# 通过 kv 形式设置 headers, headers=header_dict return JSONResponse(content=content, headers=header_dict )
直接使用请求
from fastapi import FastAPI, Requestapp = FastAPI()@app.get("/items/{item_id}")
def read_root(item_id: str, request: Request):# 声明一个形参 request: Request# 可以从 request 中得到请求信息client_host = request.client.hostreturn {"client_host": client_host, "item_id": item_id}
添加 ASGI 中间件
from fastapi import FastAPI
from unicorn import UnicornMiddlewareapp = FastAPI()# 通过 FastAPI 的实例 app 调用 add_middleware 方法添加中间件
app.add_middleware(UnicornMiddleware, some_config="rainbow")
使用代理
在 FastAPI 应用里设置 root_path
from fastapi import FastAPI, Request# 将 "/api/v1" 赋值给 root_path
app = FastAPI(root_path="/api/v1")@app.get("/app")
def read_main(request: Request):return {"message": "Hello World", "root_path": request.scope.get("root_path")}
附加的服务器(多环境部署时)
from fastapi import FastAPI, Request# 可以通过 servers,将多个环境的地址以列表的形式赋值给 servers
app = FastAPI(servers=[{"url": "https://stag.example.com", "description": "Staging environment"},{"url": "https://prod.example.com", "description": "Production environment"},],root_path="/api/v1",# 将 root_path 从 servers 剔除,root_path_in_servers=False# 默认 servers 会包含 root_path, root_path_in_servers=True# root_path_in_servers=False,
)@app.get("/app")
def read_main(request: Request):return {"message": "Hello World", "root_path": request.scope.get("root_path")}
WebSockets
安装 WebSockets
pip install websockets
创建 websocket
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponseapp = FastAPI()html = """
<!DOCTYPE html>
<html><head><title>Chat</title></head><body><h1>WebSocket Chat</h1><form action="" οnsubmit="sendMessage(event)"><input type="text" id="messageText" autocomplete="off"/><button>Send</button></form><ul id='messages'></ul><script>var ws = new WebSocket("ws://localhost:8000/ws");ws.onmessage = function(event) {var messages = document.getElementById('messages')var message = document.createElement('li')var content = document.createTextNode(event.data)message.appendChild(content)messages.appendChild(message)};function sendMessage(event) {var input = document.getElementById("messageText")ws.send(input.value)input.value = ''event.preventDefault()}</script></body>
</html>
"""@app.get("/")
async def get():return HTMLResponse(html)# 通过 app.websocket 创建一个 websocket
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):# 接收消息await websocket.accept()while True:data = await websocket.receive_text()# 发送消息await websocket.send_text(f"Message text was: {data}")
事件:启动 - 关闭
startup
事件
from fastapi import FastAPIapp = FastAPI()items = {}# 通过 app.on_event("startup") ”声明“一个函数为启动函数
# 可以做一些预处理工作,比如 往数据库中写入一些初始数据
@app.on_event("startup")
async def startup_event():items["foo"] = {"name": "Fighters"}items["bar"] = {"name": "Tenders"}@app.get("/items/{item_id}")
async def read_items(item_id: str):return items[item_id]
shutdown
事件
from fastapi import FastAPIapp = FastAPI()# 通过 app.on_event("shutdown") ”声明“一个函数为关闭函数
@app.on_event("shutdown")
def shutdown_event():with open("log.txt", mode="a") as log:log.write("Application shutdown")@app.get("/items/")
async def read_items():return [{"name": "Foo"}]
到此结 DragonFangQy 2024.07.18