高级 Python Web 开发:基于 FastAPI 构建高效实时聊天系统与并发控制

高级 Python Web 开发:基于 FastAPI 构建高效实时聊天系统与并发控制

目录
  1. 🌐 WebSocket 实时通讯概述
  2. 💬 FastAPI 中实现 WebSocket 聊天系统
  3. 🔧 WebSocket 并发控制与性能优化
  4. 🔒 WebSocket 安全性与认证机制
  5. 🚀 WebSocket 在生产环境的优化与部署

1. 🌐 WebSocket 实时通讯概述

WebSocket 是一种网络协议,旨在提供在客户端与服务器之间持久化、全双工的通信通道。与传统的 HTTP 请求-响应模型不同,WebSocket 允许客户端和服务器之间建立一个持续的连接,这使得它非常适合用于构建实时应用,如聊天系统、在线游戏和实时数据流等。

在传统的 HTTP 通信中,每次请求都需要重新建立连接,而 WebSocket 通过单一的握手过程就可以保持连接的持久性。这种技术可以显著减少延迟,使得数据交换更为迅速与高效。特别是在需要实时交互的应用场景中,WebSocket 可以显著提升用户体验。

FastAPI 是一个现代化、快速(高性能)的 Web 框架,支持构建高效的 RESTful API,同时也具备了对 WebSocket 协议的原生支持。通过 FastAPI,我们可以快速构建一个强大且高效的实时通讯系统。接下来将详细讲解如何在 FastAPI 中实现 WebSocket 协议并构建一个简单的聊天系统。

2. 💬 FastAPI 中实现 WebSocket 聊天系统

WebSocket 连接管理

在构建聊天系统时,我们需要处理多个用户之间的消息传递。为了支持多用户之间的消息广播,我们需要管理每个 WebSocket 连接,并确保消息能够正确地从发送者转发到接收者。

在 FastAPI 中,WebSocket 连接的管理非常简单。通过 WebSocket 对象,可以进行连接的建立、接收和发送消息。下面是一个简单的 WebSocket 聊天系统的实现:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Listapp = FastAPI()# 存储所有的 WebSocket 连接
active_connections: List[WebSocket] = []# 处理 WebSocket 连接
@app.websocket("/chat/{username}")
async def chat(websocket: WebSocket, username: str):# 等待连接await websocket.accept()active_connections.append(websocket)try:while True:# 接收客户端消息message = await websocket.receive_text()# 广播消息给所有连接for connection in active_connections:if connection != websocket:await connection.send_text(f"{username}: {message}")except WebSocketDisconnect:# 断开连接时移除 WebSocketactive_connections.remove(websocket)await websocket.close()
代码解析
  1. 连接管理:通过 active_connections 列表保存当前所有活跃的 WebSocket 连接。每当一个新的客户端连接时,系统会将其 WebSocket 对象添加到这个列表中。
  2. 接受连接websocket.accept() 允许客户端与服务器建立 WebSocket 连接。
  3. 消息接收与广播:通过 await websocket.receive_text() 来接收客户端发送的文本消息。然后,系统将该消息广播给所有其他连接的客户端(除了发送者)。
  4. 断开连接处理:当客户端断开连接时,抛出 WebSocketDisconnect 异常,系统会从 active_connections 列表中移除该 WebSocket 对象,并关闭连接。

通过这种方式,聊天系统能够支持多个客户端之间的实时消息传递。

3. 🔧 WebSocket 并发控制与性能优化

在实际应用中,尤其是高并发的场景下,如何有效地管理 WebSocket 连接并确保系统的高性能是至关重要的。虽然 FastAPI 本身就具备了异步处理能力,但仍然需要一些策略来优化性能,处理大量并发连接。

并发管理
  1. 异步操作:FastAPI 利用 Python 的异步 I/O(asyncio)模型,可以在不阻塞的情况下高效处理多个并发的 WebSocket 连接。每个 WebSocket 的处理都是独立的异步任务,不会阻塞其他连接。
  2. 连接数控制:在高并发情况下,可以根据需要限制最大连接数,防止过多连接带来的资源耗尽。
MAX_CONNECTIONS = 1000  # 最大连接数限制@app.websocket("/chat/{username}")
async def chat(websocket: WebSocket, username: str):if len(active_connections) >= MAX_CONNECTIONS:await websocket.close(code=1000)  # 关闭新连接returnawait websocket.accept()active_connections.append(websocket)...
  1. 心跳检测:为了确保连接的健康性,可以定期发送心跳包,检测客户端是否仍然在线。如果客户端断开连接但没有显式关闭 WebSocket,心跳包可以帮助检测并及时清理无效连接。
import asyncioasync def send_heartbeat(websocket: WebSocket):while True:await asyncio.sleep(30)  # 每30秒发送一次心跳await websocket.send_text("heartbeat")
性能优化
  1. 消息压缩:在大量消息交换的场景下,采用压缩算法(如 gzip)可以减少数据传输量,提高性能。
  2. 负载均衡:在高并发的情况下,可以采用负载均衡技术将 WebSocket 请求分发到多个 FastAPI 实例上,从而分担流量压力。

4. 🔒 WebSocket 安全性与认证机制

在实时通讯系统中,安全性是一个不可忽视的问题,尤其是 WebSocket 连接的认证与授权。以下是常见的安全措施:

WebSocket 身份验证

通过 HTTP 头部或 Cookie 传递认证信息,是一种常见的做法。在 FastAPI 中,可以通过 Depends 依赖注入机制来实现 WebSocket 的身份验证。

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBeareroauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")async def get_current_user(token: str = Depends(oauth2_scheme)):# 校验 token 并获取用户信息user = get_user_from_token(token)if user is None:raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")return user

通过这种方式,每当用户尝试连接 WebSocket 时,可以在连接之前通过 OAuth2 或 JWT token 校验其身份。

数据加密与防篡改

在 WebSocket 数据传输过程中,建议使用 WSS(WebSocket Secure)协议来确保数据传输的加密性。此外,为了防止数据篡改,可以在传输数据时使用 HMAC 等签名机制进行数据验证。

import hmac
import hashlibdef verify_message_signature(message: str, signature: str, secret: str) -> bool:computed_signature = hmac.new(secret.encode(), message.encode(), hashlib.sha256).hexdigest()return hmac.compare_digest(computed_signature, signature)

5. 🚀 WebSocket 在生产环境的优化与部署

在生产环境中,WebSocket 服务需要处理更多的请求,并且通常需要与其他微服务进行协作。为了确保 WebSocket 服务的高效运行,下面是一些常见的优化措施:

服务部署
  1. 水平扩展:通过多个 FastAPI 实例并行运行 WebSocket 服务,可以有效地分担负载。
  2. WebSocket 与负载均衡器:在大规模部署时,采用支持 WebSocket 的负载均衡器(如 NGINX、HAProxy)可以将客户端请求合理地分发到多个 FastAPI 实例。
持久化与状态管理

对于聊天系统,用户的聊天记录和会话状态可能需要持久化。可以使用 Redis 或数据库来存储聊天历史记录,并且可以使用 Redis 进行会话的共享。

import redisr = redis.Redis(host='localhost', port=6379, db=0)def store_message(message: str, channel: str):r.publish(channel, message)  # 将消息存入 Redis
高可用性与容错

为了保证 WebSocket 服务的高可用性,采用分布式系统设计、故障转移机制(如 Kubernetes 自动扩容、健康检查等)是必要的。

通过这些部署和优化策略,可以确保 WebSocket 服务在生产环境中具有良好的性能和稳定性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/16327.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【力扣】148.排序链表

AC截图 题目 思路 基本情况处理: 如果链表为空 (head NULL) 或者链表仅有一个节点 (head->next NULL),则链表已经是有序的,直接返回头节点 head。 分割链表: 使用快慢指针法找到链表的中间节点。slow 指针每次前进一格&…

新数据结构(7)——Object

Object类是所有类的父类,也就是说所有类都是object类的子类可以使用Object里的方法。 equals()和hashCode()是Object包含的方法 和equals的区别和联系 和equals()都是用于判断是否相等 基本数据类型只能用判断其是否相等,引用数据类型用判断的是其指…

保研考研机试攻略:python笔记(4)

🐨🐨🐨15各类查找 🐼🐼二分法 在我们写程序之前,我们要定义好边界,主要是考虑区间边界的闭开问题。 🐶1、左闭右闭 # 左闭右闭 def search(li, target): h = len(li) - 1l = 0#因为都是闭区间,h和l都可以取到并且相等while h >= l:mid = l + (h - l) // 2…

doris集群

开发doris的团队厉害,这个百度工程师确实也干了一些实事,不像领导层只会跑火车。 1 参数配置 1.1 文件句柄数 vim /etc/security/limits.conf * soft nofile 655350 * hard nofile 6553501.2 关闭透明大页 echo never > /sys/kernel/mm/transpare…

STM32的HAL库开发---高级定时器---互补输出带死区实验

一、互补输出简介 互补输出:OCx输出高电平,则互补通道OCxN输出低电平。OCx输出低电平,则互补通道OCxN输出高电平。 带死区控制的互补输出:OCx输出高电平时,则互补通道OCxN过一会再输出输出低电平。这个时间里输出的电…

京东广告生成式召回基于 NVIDIA TensorRT-LLM 的推理加速实践

0000 生成式推荐系统优势介绍 推荐系统的主要任务是根据用户的历史行为预测其兴趣点,并向其推荐相应的商品。传统的推荐系统在处理用户请求时,会触发多个召回模块(包括热门商品召回、个性化召回、深度召回等),以召回大…

3.React 组件化开发

react:版本 18.2.0node: 版本18.19.1脚手架:版本 5.0.1 一、类组件 (一) 一个干净的脚手架 【1】使用已经被废弃的 CRA (create-react-app) create-react-app 已经被废弃,且目前使用会报错,官方已经不推荐使用&…

51单片机(国信长天)矩阵键盘的基本操作

在CT107D单片机综合训练平台上,首先将J5处的跳帽接到1~2引脚,使按键S4~S19按键组成4X4的矩阵键盘。在扫描按键的过程中,发现有按键触发信号后(不做去抖动),待按键松开后,在数码管的第一位显示相应的数字:从左至右&…

【AI赋能】蓝耘智算平台实战指南:3步构建企业级DeepSeek智能助手

蓝耘智算平台实战指南:3步构建企业级DeepSeek智能助手 引言:AI大模型时代的算力革命 在2025年全球AI技术峰会上,DeepSeek-R1凭借其开源架构与实时推理能力,成为首个通过图灵测试的中文大模型。该模型在语言理解、跨模态交互等维…

机器学习 - 词袋模型(Bag of Words)实现文本情感分类的详细示例

为了简单直观的理解模型训练,我这里搜集了两个简单的实现文本情感分类的例子,第一个例子基于朴素贝叶斯分类器,第二个例子基于逻辑回归,通过这两个例子,掌握词袋模型(Bag of Words)实现文本情感…

没有服务器和显卡电脑如何本地化使用deepseek|如何通过API使用满血版deepseek

目录 一、前言二、使用siliconflow硅基流动 API密钥1、注册硅基流动2、创建API密钥3、下载AI客户端4、使用API密钥5、效果演示 三、使用deepseek官方API密钥1、创建API密钥2、使用API密钥3、效果演示 四、总结 一、前言 上篇文章我介绍了如何通过云服务器或者显卡电脑来本地化…

算法学习笔记之贪心算法

导引(硕鼠的交易) 硕鼠准备了M磅猫粮与看守仓库的猫交易奶酪。 仓库有N个房间,第i个房间有 J[i] 磅奶酪并需要 F[i] 磅猫粮交换,硕鼠可以按比例来交换,不必交换所有的奶酪 计算硕鼠最多能得到多少磅奶酪。 输入M和…

oracle执行grant授权sql被阻塞问题处理

一 问题描述 执行普通的grant授权sql(grant select,update on 表名 to 用户名)好几分钟都没反应,跟被阻塞了似的。 二 问题排查 #排查是否有阻塞 用OEM可以看到阻塞信息: 点‘性能’-‘阻塞会话’: 下面那个会话2958是我执行grant sql的…

SSM仓库物品管理系统 附带详细运行指导视频

文章目录 一、项目演示二、项目介绍三、运行截图四、主要代码1.用户登录代码:2.保存物品信息代码:3.删除仓库信息代码: 一、项目演示 项目演示地址: 视频地址 二、项目介绍 项目描述:这是一个基于SSM框架开发的仓库…

Deepseek 接入Word处理对话框(隐藏密钥)

硅基流动邀请码:1zNe93Cp 邀请链接:网页链接 亲测deepseek接入word,自由调用对话,看截图有兴趣的复用代码(当然也可以自己向deepseek提问,帮助你完成接入,但是提问逻辑不一样给出的答案是千差万…

Docker Compose介绍及安装使用MongoDB数据库详解

在现代容器化应用部署中,Docker Compose是一种非常实用的工具,它允许我们通过一个docker-compose.yml文件来定义和运行多容器应用程序。然而,除了Docker之外,Podman也提供了类似的工具——Podman Compose,它允许我们在…

IntelliJ IDEA Console控制台输出成json的配置方式

【IntelliJ IDEA Console控制台输出成json的配置方式】 1.帮助->查找操作 2.搜索注册表 3.ctrlf 搜索pty 控制台右键 结果

基础入门-HTTP数据包红蓝队研判自定义构造请求方法请求头修改状态码判断

知识点: 1、请求头&返回包-方法&头修改&状态码等 2、数据包分析-红队攻击工具&蓝队流量研判 3、数据包构造-Reqable自定义添加修改请求 一、演示案例-请求头&返回包-方法&头修改&状态码等 数据包 客户端请求Request 请求方法 …

react redux用法学习

参考资料: https://www.bilibili.com/video/BV1ZB4y1Z7o8 https://cn.redux.js.org/tutorials/essentials/part-5-async-logic AI工具:deepseek,通义灵码 第一天 安装相关依赖: 使用redux的中间件: npm i react-redu…

机器学习 - 线性回归(最大后验估计)

最大似然估计的一个缺点是当训练数据比较少时会发生过拟合,估计的参数可能不准确.为了避免过拟合,我们可以给参数加上一些先验知识. 一、先从最大似然估计的一个缺点入手 最大似然估计(MLE)在处理小样本数据时,容易发…