python的websocket方法教程

WebSocket是一种网络通信协议,它在单个TCP连接上提供全双工的通信信道。在本篇文章中,我们将探讨如何在Python中使用WebSocket实现实时通信。

websockets是Python中最常用的网络库之一,也是websocket协议的Python实现。它不仅作为基础组件在众多项目中发挥着重要作用,其源码也值得广大“Python玩家”研究。
官网:https://github.com/python-websockets/websockets

1. 什么是WebSocket?

WebSocket协议是在2008年由Web应用程序设计师和开发人员创建的,目的是为了在Web浏览器和服务器之间提供更高效、更低延迟的双向通信。它允许客户端和服务器在任何时候发送消息,无需重新建立TCP连接。WebSocket可以在Web浏览器和服务器之间传输文本和二进制数据,使得构建实时Web应用程序变得更加简单。

2. 在Python中使用WebSocket

Python中有多个库可以帮助我们使用WebSocket,如:websockets、aiohttp等。在本文中,我们将使用websockets库来演示WebSocket编程。

要安装websockets库,你可以使用pip:

pip install websockets

3. 创建WebSocket服务器

使用websockets库,我们可以轻松地创建一个WebSocket服务器。以下是一个简单的示例:

import asyncio
import websocketsasync def echo(websocket, path):async for message in websocket:print(f"Received message: {message}")await websocket.send(f"Echo: {message}")start_server = websockets.serve(echo, "localhost", 8765)asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

在这个示例中,我们定义了一个名为echo的协程函数,它接收两个参数:websocket和path。该函数使用async for循环读取客户端发送的消息,并将消息发送回客户端。

然后,我们使用websockets.serve()函数创建一个WebSocket服务器,监听本地主机的8765端口。最后,我们使用asyncio的事件循环启动服务器。

4. 创建WebSocket客户端

要创建一个WebSocket客户端,我们同样可以使用websockets库。以下是一个简单的客户端示例:

import asyncio
import websocketsasync def main():async with websockets.connect("ws://localhost:8765") as websocket:message = "Hello, server!"await websocket.send(message)print(f"Sent: {message}")response = await websocket.recv()print(f"Received: {response}")asyncio.run(main())

在这个示例中,我们使用websockets.connect()函数建立与WebSocket服务器的连接。然后,我们使用send()方法向服务器发送消息,并使用recv()方法接收服务器的响应。

5. 总结

WebSocket协议为Web浏览器和服务器之间提供了实时双向通信的能力,使得构建实时Web应用程序变得更加容易。在Python中,我们可以使用websockets库轻松地实现WebSocket编程。

6. 通过websockets这个项目,从大型开源项目中学习asyncio库。

一、asyncio.Transport
在官方文档中,Transport被描述成对socket的抽象,它控制着如何传输数据。除了websockets,uvicorn、daphne等ASGI实现都会用到Transport。

Transport继承于ReadTransport和WriteTransport,两者都继承于BaseTransport。顾名思义,Transport兼备读和写的功能,可以类比为读写socket对象。
在这里插入图片描述

Transport对象提供以下常用函数——

is_reading:判断该Transport是否在读。

set_write_buffer_limits:设置写入Transport的高和低水位。考虑到网络状况,有时不希望写入过多的数据。

write、write_eof、write_line:为当前Transport写入数据,分别表示写入二进制数据、eof和二进制行数据。其中eof写入后不会关闭Transport,但会flush数据。

abort:立刻关闭Transport,不接受新的数据。留在缓冲的数据也会丢失,后续调用Protocol的connection_lost函数。

在websockets中,Transport使用场景不多,一般都是通过Protocol对象的回调参数使用的。在websocket的初始化过程中,会设置Transport的最高水位。同样,在这种场景下,该对象也是作为回调参数使用的。
在这里插入图片描述

二、asyncio.Protocol
如果Transport是对socket的抽象,那么Protocol就是对协议的抽象。它提供了如何使用Transport的方式。
在这里插入图片描述

用户使用的Protocol直接继承自BaseProtocol,并提供了六个Unimplemented函数需要用户去实现——

connection_made:当连接建立时会执行该函数,该函数包含一个Transport类型的参数。

connection_lost:当连接丢失或者关闭时会执行该函数,该函数包含一个Exception类型的参数。

pause_writing:当Transport对象写入的数据高于之前设置的高水位时被调用,一般会暂停数据的写入。

resume_writing:当Transport对象写入的数据低于之前设置的低水位时被调用,一般用于恢复数据写入。

data_received:当有数据被接受时回调,该函数包含一个二进制对象data,用来表示接受的数据。

eof_received:当被Transport对象被调用write_eof时被调用。

在websockets中,server端的connection_made实现截图如图所示。在该函数中,websockets将用户实现的handler封装成task对象,并和websocket的server绑定。
在这里插入图片描述

而在client端中实现如第一节截图所示,只是在reader中注册该Transport对象。

websockets的connection_lost函数实现方式如下。主要操作即更新状态、关闭pings、更新对应的waiter状态,以及维护reader对象。
在这里插入图片描述

在其他函数的实现中,websockets也主要用到了reader对象完成数据流的暂停和恢复,以及数据的写入。

从上面代码实现可以看出,websockets通过reader代理完成数据流的操作。这个reader是一个asyncio.StreamReader对象。这个对象具体如何使用将在下一篇介绍。

附录:进阶版本:

python使用websockets库
serve:在server端使用,等待客户端的连接。如果连接成功,返回一个websocket。

connect: 在client端使用,用于建立连接。

send:发送数据

recv:接收数据

close:关闭连接

服务端

#!/usr/bin/python3
# 主要功能:创建1个基本的websocket server, 符合asyncio 开发要求
import asyncio
import websockets
from datetime import datetimeasync def handler(websocket):data = await websocket.recv()reply = f"Data received as \"{data}\".  time: {datetime.now()}"print(reply)await websocket.send(reply)print("Send reply")async def main():async with websockets.serve(handler, "localhost", 9999):await asyncio.Future()  # run foreverif __name__ == "__main__":asyncio.run(main())

客户端

import asyncio
import websockets
import timeasync def ws_client(url):for i in range(1, 40):async with websockets.connect(url) as websocket:await websocket.send("Hello, I am PyPy.")response = await websocket.recv()print(response)time.sleep(1)asyncio.run(ws_client('ws://localhost:9999'))

服务端

import asyncio
import websocketsIP_ADDR = "127.0.0.1"
IP_PORT = "9090"# 握手,通过接收Hi,发送"success"来进行双方的握手。
async def serverHands(websocket):while True:recv_text = await websocket.recv()print("recv_text=" + recv_text)if recv_text == "Hi":print("connected success")await websocket.send("success")return Trueelse:await websocket.send("connected fail")# 接收从客户端发来的消息并处理,再返给客户端success
async def serverRecv(websocket):while True:recv_text = await websocket.recv()print("recv:", recv_text)await websocket.send("success,get mess:"+ recv_text)# 握手并且接收数据
async def serverRun(websocket, path):print(path)await serverHands(websocket)await serverRecv(websocket)# main function
if __name__ == '__main__':print("======server======")server = websockets.serve(serverRun, IP_ADDR, IP_PORT)asyncio.get_event_loop().run_until_complete(server)asyncio.get_event_loop().run_forever()

客户端

import asyncio
import websocketsIP_ADDR = "127.0.0.1"
IP_PORT = "9090"async def clientHands(websocket):while True:# 通过发送hello握手await websocket.send("Hi")response_str = await websocket.recv()# 接收"success"来进行双方的握手if "success" in response_str:print("握手成功")return True# 向服务器端发送消息
async def clientSend(websocket):while True:input_text = input("input text: ")if input_text == "exit":print(f'"exit", bye!')await websocket.close(reason="exit")return Falseawait websocket.send(input_text)recv_text = await websocket.recv()print(f"{recv_text}")# 进行websocket连接
async def clientRun():ipaddress = IP_ADDR + ":" + IP_PORTasync with websockets.connect("ws://" + ipaddress) as websocket:await clientHands(websocket)await clientSend(websocket)# main function
if __name__ == '__main__':print("======client======")asyncio.get_event_loop().run_until_complete(clientRun())

服务端

# -*- coding:utf8 -*-import json
import socket
import asyncio
import logging
import websockets
import multiprocessingIP = '127.0.0.1'
PORT_CHAT = 9090USERS ={}#提供聊天的后台
async def ServerWs(websocket,path):logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',filename="chat.log",level=logging.INFO)# 握手await websocket.send(json.dumps({"type": "handshake"}))async for message in websocket:data = json.loads(message)message = ''# 用户发信息if data["type"] == 'send':name = '404'for k, v in USERS.items():if v == websocket:name = kdata["from"] = nameif len(USERS) != 0:  # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "user", "content": data["content"], "from": name})# 用户注册elif data["type"] == 'register':try:USERS[data["uuid"]] = websocketif len(USERS) != 0:  # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "login", "content": data["content"], "user_list": list(USERS.keys())})except Exception as exp:print(exp)# 用户注销elif data["type"] == 'unregister':del USERS[data["uuid"]]if len(USERS) != 0:  # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "logout", "content": data["content"], "user_list": list(USERS.keys())})#打印日志logging.info(data)# 群发await asyncio.wait([user.send(message) for user in USERS.values()])def server_run():print("server")start_server = websockets.serve(ServerWs, '0.0.0.0', PORT_CHAT)asyncio.get_event_loop().run_until_complete(start_server)asyncio.get_event_loop().run_forever()if __name__ == "__main__":from multiprocessing import Processmultiprocessing.freeze_support()server = Process(target=server_run, daemon=False)server.start()

服务端

import asyncio
import websockets
import time
import json
import threading
# 功能模块
class OutputHandler():async def run(self,message,send_ms,websocket):# 用户发信息await send_ms(message, websocket)# 单发消息# await send_ms(message, websocket)# 群发消息#await s('hi起来')# 存储所有的客户端
Clients = {}# 服务端
class WS_Server():def __init__(self):self.ip = "127.0.0.1"self.port = 9090# 回调函数(发消息给客户端)async def callback_send(self, msg, websocket=None):await self.sendMsg(msg, websocket)# 发送消息async def sendMsg(self, msg, websocket):print('sendMsg:', msg)# websocket不为空,单发,为空,群发消息if websocket != None:await websocket.send(msg)else:# 群发消息await self.broadcastMsg(msg)# 避免被卡线程await asyncio.sleep(0.2)# 群发消息async def broadcastMsg(self, msg):for user in Clients:await user.send(msg)# 针对不同的信息进行请求,可以考虑json文本async def runCaseX(self,jsonMsg,websocket):print('runCase')op = OutputHandler()# 参数:消息、方法、socketawait op.run(jsonMsg,self.callback_send,websocket)# 连接一个客户端,起一个循环监听async def echo(self,websocket, path):# 添加到客户端列表# Clients.append(websocket)# 握手await websocket.send(json.dumps({"type": "handshake"}))# 循环监听while True:# 接受信息try:# 接受文本recv_text = await websocket.recv()message = "Get message: {}".format(recv_text)# 返回客户端信息await websocket.send(message)# 转jsondata = json.loads(recv_text)# 用户发信息if data["type"] == 'send':name = '404'for k, v in Clients.items():if v == websocket:name = kdata["from"] = nameif len(Clients) != 0:  # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "send", "content": data["content"], "from": name})await self.runCaseX(jsonMsg=message, websocket=websocket)# 用户注册elif data["type"] == 'register':try:Clients[data["uuid"]] = websocketif len(Clients) != 0:  # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "register", "content": data["content"], "user_list": list(Clients.keys())})await self.runCaseX(jsonMsg=message, websocket=websocket)except Exception as exp:print(exp)# 用户注销elif data["type"] == 'unregister':del Clients[data["uuid"]]# 对message进行解析,跳进不同功能区# await self.runCaseX(jsonMsg=data,websocket=websocket)# 链接断开except websockets.ConnectionClosed:print("ConnectionClosed...", path)# del Clientsbreak# 无效状态except websockets.InvalidState:print("InvalidState...")# del Clientsbreak# 报错except Exception as e:print("ws连接报错",e)# del Clientsbreak# 启动服务器async def runServer(self):async with websockets.serve(self.echo, self.ip, self.port):await asyncio.Future()  # run forever# 多协程模式,防止阻塞主线程无法做其他事情def WebSocketServer(self):asyncio.run(self.runServer())# 多线程启动def startServer(self):# 多线程启动,否则会堵塞thread = threading.Thread(target=self.WebSocketServer)thread.start()# thread.join()if __name__=='__main__':print("server")s = WS_Server()s.startServer()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/215335.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

NSSCTF web刷题记录7

文章目录 [SDCTF 2022]CURL Up and Read[NUSTCTF 2022 新生赛]Translate [SDCTF 2022]CURL Up and Read 考点:SSRF 打开题目发现是curl命令,提示填入url 尝试http://www.baidu.com,成功跳转 将url的字符串拿去解码,得到json格式数…

低功耗模式的通用 MCU ACM32F0X0 系列,具有高整合度、高抗干扰、 高可靠性的特点

ACM32F0X0 系列是一款支持多种低功耗模式的通用 MCU。集成 12 位 1.6 Msps 高精度 ADC 以及比 较器、运放、触控按键控制器、段式 LCD 控制器,内置高性能定时器、多路 UART、LPUART、SPI、I2C 等丰富的通讯外设,内建 AES、TRNG 等信息安全模块&#xff0…

云降水物理基础

云降水物理基础 云的分类 相对湿度变化方程 由相对湿度的定义,两边取对数之后可以推出 联立克劳修斯-克拉佩龙方程(L和R都为常数) 由右式看出,增加相对湿度的方式:增加水汽(de增大)和降低…

基于node 安装express后端脚手架

1.首先创建文件件 2.在文件夹内打开终端 npm init 3.安装express: npm install -g express-generator注意的地方:这个时候安装特别慢,最后导致不成功 解决方法:npm config set registry http://registry.npm.taobao.org/ 4.依次执行 npm install -g ex…

【基础知识】大数据概述

关键词—分布式 化整为零,再化零为整 大数据的定义 传统数据库处理起来困难的数据集。 发展历程 中国开源生态图谱2023 参考内容 中国开源生态图谱 2023.pdf 技术组件说明 数据集成 sqoop、dataX、flume 数据存储 hdfs、kafka 数据处理 mapreduce、hive…

用PHP和HTML做登录注册操作数据库Mysql

用PHP和HTML做登录注册操作数据库Mysql 两个HTML页面&#xff0c;两个PHP,两个css,两张图片&#xff0c;源码资源在上方。 目录 HTML页面 login.html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta nam…

AtCoder ABC周赛2023 11/4 (Sat) E题题解

目录 原题截图&#xff1a; 原题翻译 题目大意&#xff1a; 主要思路&#xff1a; 代码&#xff1a; 原题截图&#xff1a; 原题翻译 题目大意&#xff1a; 给你一个数组&#xff0c;给你一个公式&#xff0c;让你选k个元素&#xff0c;用公式算出最终得分。 主要思路&am…

PyQt6 表单布局Form Layout (QFormLayout)

锋哥原创的PyQt6视频教程&#xff1a; 2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~共计43条视频&#xff0c;包括&#xff1a;2024版 PyQt6 Python桌面开发 视频教程(无废话版…

嵌入式系统复习--概述

文章目录 基本概念嵌入式系统的组成结构嵌入式操作系统嵌入式软件开发环境硬件基础简介下一篇 基本概念 嵌入式计算机&#xff1a;把嵌入到对象体系中、实现对象体系智能化控制的带有微控制器的计算机&#xff0c;称作嵌入式计算机 嵌入式系统&#xff1a;以应用为中心&#…

STM32-01-认识单片机

文章目录 一、单片机简介二、Cortex-M系列介绍三、初识STM32四、STM32原理图设计五、搭建开发环境六、STM32初体验七、MDK5使用技巧 一、单片机简介 单片机是什么&#xff1f; 单片机&#xff1a;Single-Chip Microcomputer&#xff0c;单片微型计算机&#xff0c;是一种集成电…

【mysql】下一行减去上一行数据、自增序列场景应用

背景 想获取if_yc为1连续账期数据 思路 获取所有if_yc为1的账期数据下一行减去上一行账期&#xff0c;如果为1则为连续&#xff0c;不等于1就为断档获取不等于1的最小账期&#xff0c;就是离当前账期最近连续账期 代码 以下为mysql语法&#xff1a; select acct_month f…

基于 librosa和soundfile对音频进行重采样 (VITS 必备)

基于 librosa和soundfile对音频进行重采样 一、前言 在玩bert-vits2的时候有对音频进行重采样的需求&#xff0c;故写了一下批量对音频进行重采样的脚本。 优化点&#xff1a; 根据机器自适应线程数为最多&#xff0c;保证充分利用机器资源&#xff0c;提高速度>30%。支持…

Caching the Application Engine Server 缓存应用程序引擎服务器

Caching the Application Engine Server 缓存应用程序引擎服务器 Application Engine caches metadata just like the application server. This caching enhances performance because a program can refer to the local cache for any objects that it uses. 应用程序引擎…

iPaaS架构深入探讨

在数字化时代全面来临之际&#xff0c;企业正面临着前所未有的挑战与机遇。技术的迅猛发展与数字化转型正在彻底颠覆各行各业的格局&#xff0c;不断推动着企业迈向新的前程。然而&#xff0c;这一数字化时代亦衍生出一系列复杂而深奥的难题&#xff1a;各异系统之间数据孤岛、…

C语言猜数字升级版

题目概述 猜数字是一种益智游戏&#xff0c;既可以两个人一起玩&#xff0c;也可以与电脑一起玩。现在我们需要将这个游戏移到电脑上&#xff0c;让电脑与我们一起玩猜数字游戏. 需求分析 用户输入&#xff1a;确定用户输入的数据是否正确游戏过程&#xff1a;保证计算机能正…

Redis HyperLogLog 数据结构模型统计

HyperLogLog HyperLogLog 不是一种新的数据结构 &#xff0c; 本质上是字符串类型。 是一种基数算法。 通过 HyperLogLog 可以节省内存空间&#xff0c;并完成独立总数的统计。 HyperLogLog 数据结构可用于仅使用少量恒定内存来计算集合中的唯一元素&#xff0c;具体而言&…

Linux高级管理-基于域名的虚拟Web主机搭建

客服机限制地址 通过 Require 配置项&#xff0c;可以根据主机的主机名或P地址来决定是否允许客户端访问。在httpd服 务器的主配置文件的<Location>&#xff0c;<Directory>、<Files>、<Limit>配置段中均可以使用Require 配置 项来控制客户端的访问。使…

Linux——web网站服务(一)

一、安装httpd服务器Apache网站服务 1、准备工作 为了避免发送端口冲突&#xff0c;程序冲突等现象&#xff0c;卸载使用rpm方式安装的httpd #使用命令检查是否下载了httpd [rootserver ~]# rpm -qa httpd #如果有则使用 [rootserver ~]# rpm -e httpd --nodeps Apache的配置…

c语言插入排序及希尔排序详解

目录 前言&#xff1a; 插入排序&#xff1a; 希尔排序&#xff1a; 前言&#xff1a; 排序在我们生活中无处不在&#xff0c;比如学生成就排名&#xff0c;商品价格排名等等&#xff0c;所以排序在数据结构的学习中尤为重要&#xff0c;今天就为大家介绍两个经典的排序算法&…

异常处理详解

异常概述 什么是异常&#xff1f; 异常是程序在“编译”或者“执行”的过程中可能出现的问题&#xff0c;注意&#xff1a;语法错误不算在异常体系中。 比如:数组索引越界、空指针异常、 日期格式化异常&#xff0c;等。 为什么要学习异常? 异常一旦出现了&#xff0c;如果…