FastAPI是WEB UI接口,随着LLM的蓬勃发展,FastAPI的生态也迎来了新的机遇。本文将围绕FastAPI、OpenAI的API以及FastCRUD,来创建一个个性化的电子邮件写作助手,以展示如何结合这些技术来构建强大的应用程序。
下面我们开始分步骤操作:
一、安装环境
首先,我们创建一个文件夹
mkdir email-assistant-api
进入到该文件夹
cd email-assistant-api
我们使用poetry来管理python包,首先需要先安装poetry
pip install poetry
进入到email-assistant-api文件夹中,并初始化
poetry init
写下所要求的内容(默认为空),然后按 Enter 键
对交互式依赖项键入 no,直到您获得如下内容:
然后只需按 enter 键确认生成。您应该注意到在您的文件夹中创建了一个 pyproject.toml 文件,这是 poetry 用来管理依赖项的文件。
让我们从添加依赖项开始
poetry add fastapi fastcrud sqlmodel openai aiosqlite greenlet python-jose bcrypt
现在你应该还注意到一个 poetry.lock 文件,这是 poetry 保存已安装包的实际版本的方式。
二、项目结构
对于 FastAPI 应用程序,我们有三个主要内容:模型、架构和端点。由于这个 API 很简单,我们可以像这样创建我们的结构:
email_assistant_api/
│
├── app/
│ ├── __init__.py
│ ├── main.py # The main application file
│ ├── routes.py # Contains API route definitions and endpoint logic
│ ├── database.py # Database setup and session management
│ ├── models.py # SQLModel models for the application
│ ├── crud.py # CRUD operation implementations using FastCRUD
│ ├── schemas.py # Schemas for request and response models
│ └── .env # Environment variables
│
├── pyproject.toml # Project configuration and dependencies
├── README.md # Provides an overview and documentation
└── .gitignore # Files to be ignored by version control
- models.py 中定义我们的模型(数据库表的抽象);
- schemas.py用于验证和序列化数据;
- routes.py定义端点;
- database.py定义数据库相关信息;
- crud.py定义与数据库交互的crud操作;
- .env定义环境变量,比如API Key;
- main.py定义 FastAPI 应用程序和 API 的入口点;
请注意,此结构适用于小型应用程序,但如果您想为大型应用程序提供更强大的模板,请参考:https://github.com/igorbenav/FastAPI-boilerplate
2.1 对数据库进行建模
对于数据库,我们有一个简单的模型
- 用户有一个用户名、一个名字、一个电子邮件,我们存储一个哈希密码。
- 电子邮件日志包含我们为输入定义的内容,以及与此日志关联的用户的时间戳、generated_email 和 ID。
- 用户可能有多个电子邮件日志。
models.py代码示例:
# app/models.py
from sqlmodel import SQLModel, Field
from typing import Optional
class User(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(..., min_length=2, max_length=30)
username: str = Field(..., min_length=2, max_length=20)
email: str
hashed_password: str
class EmailLog(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
user_id: int = Field(foreign_key="user.id")
user_input: str
reply_to: Optional[str] = Field(default=None)
context: Optional[str] = Field(default=None)
length: Optional[int] = Field(default=None)
tone: str
generated_email: str
timestamp: str
为了与我们的数据库交互,我们将在 crud.py 中为每个模型实例化 FastCRUD[1]
# app/crud.py
from fastcrud import FastCRUD
from .models import User, EmailLog
crud_user = FastCRUD(User)
crud_email_log = FastCRUD(EmailLog)
2.2 创建schemas
schemas.py 中创建我们的 schemas
# app/schemas.py
from datetime import datetime
from typing import Optional
from sqlmodel import SQLModel, Field
# ------- user -------
class UserCreate(SQLModel):
name: str
username: str
email: str
password: str
class UserRead(SQLModel):
id: int
name: str
username: str
email: str
class UserCreateInternal(SQLModel):
name: str
username: str
email: str
hashed_password: str
# ------- email -------
class EmailRequest(SQLModel):
user_input: str
reply_to: Optional[str] = None
context: Optional[str] = None
length: int = 120
tone: str = "formal"
class EmailResponse(SQLModel):
generated_email: str
# ------- email log -------
class EmailLogCreate(SQLModel):
user_id: int
user_input: str
reply_to: Optional[str] = None
context: Optional[str] = None
length: Optional[int] = None
tone: Optional[str] = None
generated_email: str
timestamp: datetime = Field(
default_factory=lambda: datetime.now(UTC)
)
class EmailLogRead(SQLModel):
user_id: int
user_input: str
reply_to: Optional[str]
context: Optional[str]
length: Optional[int]
tone: Optional[str]
generated_email: str
timestamp: datetime
- 要创建用户,我们要求提供姓名、用户名、电子邮件和密码(我们将存储哈希值);
- 我们将默认长度设置为 120,默认tone设置为 “正式”;
- 我们自动生成 EmailLog 的时间戳
2.3 创建我们的应用程序并设置数据库
尽管我们已经有了模型和架构,但实际上我们既没有为终端节点提供服务的应用程序,也没有用于创建表的数据库。
下面看一下database.py:
# app/database.py
from sqlmodel import SQLModel, create_engine, AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "sqlite+aiosqlite:///./emailassistant.db"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async def create_db_and_tables():
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
async def get_session() -> AsyncSession:
async with async_session() as session:
yield session
在这里,我们连接到了一个 SQLite 数据库,创建了一个函数来创建我们的数据库和表,以及一个允许我们与该数据库交互的会话。
现在让我们最终创建我们的 FastAPI 应用程序:
# app/main.py
from fastapi import FastAPI
from .database import create_db_and_tables
async def lifespan(app):
await create_db_and_tables()
yield
app = FastAPI(lifespan=lifespan)
我们定义lifespan,以便在启动时创建 db 和 tables。
让我们运行一下代码来测试一下:
poetry run fastapi run
结果如下所示:
在浏览器登录如下地址:
127.0.0.1:8000/docs
可以看到如下界面:
可以在终端中按 Ctrl C 暂时关闭应用程序。
2.4 创建端点
接下来,创建端点来生成电子邮件。
首先在 .env 中输入 OpenAI API 密钥(这将被 .gitignore 忽略,并且不会出现在我们的存储库中):
# app/.env
OPENAI_API_KEY="my_openai_api_key"
将其写入 .gitignore 以确保不会提交此 API 密钥:
# .gitignore
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
现在从 .env 中获取 OpenAI API 密钥并实例化客户端:
# app/routes.py
import os
from starlette.config import Config
from openai import OpenAI
current_file_dir = os.path.dirname(os.path.realpath(__file__))
env_path = os.path.join(current_file_dir, ".env")
config = Config(env_path)
OPENAI_API_KEY = config("OPENAI_API_KEY")
open_ai_client = OpenAI(api_key=OPENAI_API_KEY)
为电子邮件终端节点创建一个路由器,并实际为电子邮件创建终端节点:
- 我们将创建一个系统提示符,使输出适应我们想要的结果;
- 我们将创建一个基本提示,用于格式化传递的信息;
- 然后我们将此信息传递给 OpenAI 客户端;
- 最后,我们将在数据库中创建一个日志条目并返回生成的电子邮件
# app/routes.py
...
from openai import OpenAI
from fastapi import APIRouter, Depends, HTTPException
from .schemas import EmailRequest, EmailResponse
from .database import get_session
...
# ------- email -------
email_router = APIRouter()
@email_router.post("/", response_model=EmailResponse)
async def generate_email(
request: EmailRequest,
db: AsyncSession = Depends(get_session)
):
try:
system_prompt = f"""
You are a helpful email assistant.
You get a prompt to write an email,
you reply with the email and nothing else.
"""
prompt = f"""
Write an email based on the following input:
- User Input: {request.user_input}
- Reply To: {request.reply_to if request.reply_to else 'N/A'}
- Context: {request.context if request.context else 'N/A'}
- Length: {request.length if request.length else 'N/A'} characters
- Tone: {request.tone if request.tone else 'N/A'}
"""
response = await open_ai_client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt},
],
max_tokens=request.length
)
generated_email = response.choices[0].message['content'].strip()
log_entry = EmailLogCreate(
user_id=request.user_id,
user_input=request.user_input,
reply_to=request.reply_to,
context=request.context,
length=request.length,
tone=request.tone,
generated_email=generated_email,
)
await crud_email_logs.create(db, log_entry)
return EmailResponse(generated_email=generated_email)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
现在定义 app/main.py ,将这个电子邮件路由器包含到我们的 FastAPI 应用程序中:
# app/main.py
from fastapi import FastAPI
from .database import create_db_and_tables
from .routes import email_router
async def lifespan(app):
await create_db_and_tables()
yield
app = FastAPI(lifespan=lifespan)
app.include_router(email_router, prefix="/generate", tags=["Email"])
再次保存并运行 FastAPI 应用程序 (127.0.0.1:8000/docs),会看到如下界面:
点击这个新创建的 post 端点,传递一些信息并单击 execute
可以得到如下内容:
结果是我们所希望的回应,但是目前还无法通过查看日志来判断系统是否正常工作,因此让我们也创建电子邮件日志端点:
# app/routes.py
...
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio.session import AsyncSession
from .schemas import EmailLogCreate, EmailLogRead
...
# ------- email log -------
log_router = APIRouter()
@log_router.get("/")
async def read_logs(db: AsyncSession = Depends(get_session)):
logs = await crud_email_logs.get_multi(db)
return logs
@log_router.get("/{log_id}", response_model=EmailLogRead)
async def read_log(log_id: int, db: AsyncSession = Depends(get_session)):
log = await crud_email_logs.get(db, id=log_id)
if not log:
raise HTTPException(status_code=404, detail="Log not found")
return log
我们可以按其 ID 查看多个日志或一个日志。让我们也将这个路由器包含在我们的应用程序中:
# app/main.py
from fastapi import FastAPI
from .database import create_db_and_tables
from .routes import email_router, log_router
async def lifespan(app):
await create_db_and_tables()
yield
app = FastAPI(lifespan=lifespan)
app.include_router(email_router, prefix="/generate", tags=["Email"])
app.include_router(log_router, prefix="/logs", tags=["Logs"])
三、用户功能、身份验证和安全性
现在,让我们添加实际的用户创建功能。首先在终端上运行:
openssl rand -hex 32
然后将结果写入 .env 作为SECRET_KEY
# app/.env
OPENAI_API_KEY="my_openai_api_key"
SECRET_KEY="my_secret_key"
首先创建一个文件 helper.py 并将以下代码粘贴到其中:
# app/helper.py
import os
from datetime import UTC, datetime, timedelta
from typing import Any, Annotated
import bcrypt
from jose import JWTError, jwt
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import SQLModel
from starlette.config import Config
from .database import get_session
from .crud import crud_users
current_file_dir = os.path.dirname(os.path.realpath(__file__))
env_path = os.path.join(current_file_dir, ".env")
config = Config(env_path)
# Security settings
SECRET_KEY = config("SECRET_KEY")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/users/login")
# Token models
class Token(SQLModel):
access_token: str
token_type: str
class TokenData(SQLModel):
username_or_email: str
# Utility functions
async def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a plain password against a hashed password."""
return bcrypt.checkpw(plain_password.encode(), hashed_password.encode())
def get_password_hash(password: str) -> str:
"""Hash a password."""
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
async def create_access_token(
data: dict[str, Any],
expires_delta: timedelta | None = None
) -> str:
"""Create a JWT access token."""
to_encode = data.copy()
if expires_delta:
expire = datetime.now(UTC).replace(tzinfo=None) + expires_delta
else:
expire = datetime.now(UTC).replace(tzinfo=None) + timedelta(minutes=15)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm="HS256")
async def verify_token(token: str, db: AsyncSession) -> TokenData | None:
"""Verify a JWT token and extract the user data."""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
username_or_email: str = payload.get("sub")
if username_or_email is None:
return None
return TokenData(username_or_email=username_or_email)
except JWTError:
return None
async def authenticate_user(username_or_email: str, password: str, db: AsyncSession):
if "@" in username_or_email:
db_user: dict | None = await crud_users.get(db=db, email=username_or_email, is_deleted=False)
else:
db_user = await crud_users.get(db=db, username=username_or_email, is_deleted=False)
if not db_user:
return False
elif not await verify_password(password, db_user["hashed_password"]):
return False
return db_user
# Dependency
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
db: Annotated[AsyncSession, Depends(get_session)]
) -> dict[str, Any] | None:
"""Get the current authenticated user."""
token_data = await verify_token(token, db)
if token_data is None:
raise HTTPException(status_code=401, detail="User not authenticated.")
if "@" in token_data.username_or_email:
user = await crud_users.get(
db=db, email=token_data.username_or_email, is_deleted=False
)
else:
user = await crud_users.get(
db=db, username=token_data.username_or_email, is_deleted=False
)
if user:
return user
raise HTTPException(status_code=401, detail="User not authenticated.")
- verify_password: 根据哈希密码验证普通密码。它用于检查用户提供的密码是否与存储的哈希密码匹配。
- get_password_hash: 在将用户提供的密码存储到数据库之前对其进行哈希处理。
- create_access_token: 用于为经过身份验证的用户生成 JWT 类型的令牌。
- verify_token: 验证 JWT 令牌并提取用户数据。
- authenticate_user: 负责根据用户的用户名或电子邮件和密码对用户进行身份验证。
- get_current_user: 是一种依赖项,它根据提供的令牌检索当前经过身份验证的用户。
现在,让我们使用这些实用程序函数来创建用户路由。
# app/routes.py
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException
from .database import get_session
from .schemas import UserCreate, UserRead
from .helper import (
get_password_hash,
authenticate_user,
create_access_token,
get_current_user,
Token
)
# ------- user -------
user_router = APIRouter()
@user_router.post("/register", response_model=UserRead)
async def register_user(
user: UserCreate,
db: AsyncSession = Depends(get_session)
):
hashed_password = get_password_hash(user.password)
user_data = user.dict()
user_data["hashed_password"] = hashed_password
del user_data["password"]
new_user = await crud_users.create(
db,
object=UserCreateInternal(**user_data)
)
return new_user
@user_router.post("/login", response_model=Token)
async def login_user(user: UserCreate, db: AsyncSession = Depends(get_session)):
db_user = await crud_users.get(db, email=user.email)
password_verified = await verify_password(
user.password, db_user.hashed_password
)
if not db_user or not password_verified:
raise HTTPException(status_code=400, detail="Invalid credentials")
access_token_expires = timedelta(minutes=30)
access_token = await create_access_token(
data={"sub": user["username"]},
expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
并将路由器包含在我们的应用程序中:
# app/main.py
from fastapi import FastAPI
from .database import create_db_and_tables
from .routes import user_router, email_router, log_router
async def lifespan(app):
await create_db_and_tables()
yield
app = FastAPI(lifespan=lifespan)
app.include_router(user_router, prefix="/users", tags=["Users"])
app.include_router(email_router, prefix="/generate", tags=["Email"])
app.include_router(log_router, prefix="/logs", tags=["Logs"])
最后,让我们在 generate_email 端点中注入 get_current_user 依赖项,添加用户登录以生成电子邮件的需求,此外,还会自动将用户的 ID 存储在日志中:
# app/routes.py
...
@email_router.post("/", response_model=EmailResponse)
async def generate_email(
request: EmailRequest,
db: AsyncSession = Depends(get_session),
current_user: dict = Depends(get_current_user)
):
try:
prompt = f"""
Write an email based on the following input:
- User Input: {request.user_input}
- Reply To: {request.reply_to if request.reply_to else 'N/A'}
- Context: {request.context if request.context else 'N/A'}
- Length: {request.length if request.length else 'N/A'} characters
- Tone: {request.tone if request.tone else 'N/A'}
"""
response = open_ai_client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are a helpful email assistant."},
{"role": "user", "content": prompt}
],
max_tokens=request.length
)
generated_email = response.choices[0].message.content
log_entry = EmailLogCreate(
user_id=current_user['id'],
user_input=request.user_input,
reply_to=request.reply_to,
context=request.context,
length=request.length,
tone=request.tone,
generated_email=generated_email,
)
await crud_email_logs.create(db, log_entry)
return EmailResponse(generated_email=generated_email)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
如果现在检查终端节点,您将在右侧看到一个小锁。
现在运行程序,需要进行身份验证,您可以通过单击锁并在此处传递有效的用户名和密码(您创建的用户)来完成:
现在,我们还将此依赖项设置为日志端点,此外,让我们使用 FastCRUD 仅过滤当前用户 ID 的日志。
我们可以通过注入 get_current_user 依赖项并将 user_id=current_user[“id”] 传递给 FastCRUD 来实现这一点(当前用户是 get_current_user 返回的)。
...
# ------- email log -------
log_router = APIRouter()
@log_router.get("/")
async def read_logs(
db: AsyncSession = Depends(get_session),
current_user: dict[str, Any] = Depends(get_current_user)
):
logs = await crud_email_logs.get_multi(db, user_id=current_user["id"])
return logs
@log_router.get("/{log_id}", response_model=EmailLogRead)
async def read_log(
log_id: int,
db: AsyncSession = Depends(get_session),
current_user: dict[str, Any] = Depends(get_current_user)
):
log = await crud_email_logs.get(db, id=log_id, user_id=current_user["id"])
if not log:
raise HTTPException(status_code=404, detail="Log not found")
return log
现在,我们实际上只能读取我们自己的日志,而且,只有在登录时才能读取。
最终的 routes 文件:
# app/routes.py
import os
from typing import Annotated, Any
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio.session import AsyncSession
from starlette.config import Config
from openai import OpenAI
from .crud import crud_email_logs, crud_users
from .database import get_session
from .schemas import (
EmailRequest,
EmailResponse,
EmailLogCreate,
EmailLogRead,
UserCreate,
UserRead,
UserCreateInternal,
)
from .helper import (
get_password_hash,
authenticate_user,
create_access_token,
get_current_user,
Token
)
current_file_dir = os.path.dirname(os.path.realpath(__file__))
env_path = os.path.join(current_file_dir, ".env")
config = Config(env_path)
OPENAI_API_KEY = config("OPENAI_API_KEY")
open_ai_client = OpenAI(api_key=OPENAI_API_KEY)
# ------- user -------
user_router = APIRouter()
@user_router.post("/register", response_model=UserRead)
async def register_user(
user: UserCreate,
db: AsyncSession = Depends(get_session)
):
hashed_password = get_password_hash(user.password)
user_data = user.dict()
user_data["hashed_password"] = hashed_password
del user_data["password"]
new_user = await crud_users.create(
db,
object=UserCreateInternal(**user_data)
)
return new_user
@user_router.post("/login", response_model=Token)
async def login_user(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
db: AsyncSession = Depends(get_session)
):
user = await authenticate_user(
username_or_email=form_data.username,
password=form_data.password,
db=db
)
if not user:
raise HTTPException(status_code=400, detail="Invalid credentials")
access_token_expires = timedelta(minutes=30)
access_token = await create_access_token(
data={"sub": user["username"]},
expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
# ------- email -------
email_router = APIRouter()
@email_router.post("/", response_model=EmailResponse)
async def generate_email(
request: EmailRequest,
db: AsyncSession = Depends(get_session),
current_user: dict = Depends(get_current_user)
):
try:
system_prompt = f"""
You are a helpful email assistant.
You get a prompt to write an email,
you reply with the email and nothing else.
"""
prompt = f"""
Write an email based on the following input:
- User Input: {request.user_input}
- Reply To: {request.reply_to if request.reply_to else 'N/A'}
- Context: {request.context if request.context else 'N/A'}
- Length: {request.length if request.length else 'N/A'} characters
- Tone: {request.tone if request.tone else 'N/A'}
"""
response = open_ai_client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt}
],
max_tokens=request.length
)
generated_email = response.choices[0].message.content
log_entry = EmailLogCreate(
user_id=current_user['id'],
user_input=request.user_input,
reply_to=request.reply_to,
context=request.context,
length=request.length,
tone=request.tone,
generated_email=generated_email,
)
await crud_email_logs.create(db, log_entry)
return EmailResponse(generated_email=generated_email)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ------- email log -------
log_router = APIRouter()
@log_router.get("/")
async def read_logs(
db: AsyncSession = Depends(get_session),
current_user: dict[str, Any] = Depends(get_current_user)
):
logs = await crud_email_logs.get_multi(db, user_id=current_user["id"])
return logs
@log_router.get("/{log_id}", response_model=EmailLogRead)
async def read_log(
log_id: int,
db: AsyncSession = Depends(get_session),
current_user: dict[str, Any] = Depends(get_current_user)
):
log = await crud_email_logs.get(db, id=log_id, user_id=current_user["id"])
if not log:
raise HTTPException(status_code=404, detail="Log not found")
return log
参考文献:
[1] https://github.com/igorbenav/fastcrud?tab=readme-ov-file