【通用消息通知服务】0x3 - 发送我们第一条消息(Websocket)

【通用消息通知服务】0x3 - 发送我们第一条消息

项目地址: A generic message notification system[Github]

实现接收/发送Websocket消息

Websocket Connection Pool

import asyncio
from asyncio.queues import Queue
from asyncio.queues import QueueEmpty
from contextlib import suppress
from typing import Anyimport async_timeout
import orjson
from sanic.log import logger
from ulid import ULIDfrom common.depend import DependencyPING = "#ping"
PONG = "#pong"class WebsocketConnectionPoolDependency(Dependency, dependency_name="WebsocketPool", dependency_alias="ws_pool"
):def __init__(self, app) -> None:super().__init__(app)self.lock = asyncio.Lock()self.connections = {}	# 存储websocket connectionsself.send_queues = {}   # 各websocket发送队列self.recv_queues = {}   # 各websocket接收消息队列self.close_callbacks = {} # websocket销毁回调self.listeners = {} # 连接监听函数def _gen_id(self) -> str:return str(ULID())async def add_connection(self, connection) -> str:async with self.lock:id = self._gen_id()self.connections[id] = connectionself.send_queues[id] = Queue()self.app.add_task(self.send_task(self.send_queues[id], connection),name=f"websocket_{id}_send_task",)self.recv_queues[id] = Queue()self.app.add_task(self.recv_task(self.recv_queues[id], connection),name=f"websocket_{id}_recv_task",)self.app.add_task(self.notify_task(id), name=f"websocket_{id}_notify_task")self.app.add_task(self.is_alive_task(id), name=f"websocket_{id}_is_alive_task")setattr(connection, "_id", id)return connection._iddef get_connection(self, connection_id: str):return self.connections.get(connection_id)async def add_listener(self, connection_id, handler) -> str:async with self.lock:id = self._gen_id()self.listeners.setdefault(connection_id, {}).update({id: handler})return idasync def remove_listener(self, connection_id, listener_id):async with self.lock:self.listeners.get(connection_id, {}).pop(listener_id, None)async def add_close_callback(self, connection_id, callback):async with self.lock:self.close_callbacks.setdefault(connection_id, []).append(callback)def is_alive(self, connection_id: str):if hasattr(connection_id, "_id"):connection_id = connection_id._idreturn connection_id in self.connectionsasync def remove_connection(self, connection: Any):if hasattr(connection, "_id"):connection_id = connection._idelse:connection_id = connectionif connection_id not in self.connections:# removed alreadyreturnasync with self.lock:logger.info(f"remove connection: {connection_id}")with suppress(Exception):await self.app.cancel_task(f"websocket_{connection_id}_send_task")with suppress(Exception):await self.app.cancel_task(f"websocket_{connection_id}_recv_task")with suppress(Exception):await self.app.cancel_task(f"websocket_{connection_id}_notify_task")with suppress(Exception):await self.app.cancel_task(f"websocket_{connection_id}_is_alive_task")if connection_id in self.send_queues:del self.send_queues[connection_id]if connection_id in self.recv_queues:del self.recv_queues[connection_id]if connection_id in self.listeners:del self.listeners[connection_id]if connection_id in self.close_callbacks:await self.do_close_callbacks(connection_id)del self.close_callbacks[connection_id]if connection_id in self.connections:del self.connections[connection_id]async def do_close_callbacks(self, connection_id):for cb in self.close_callbacks.get(connection_id, []):self.app.add_task(cb(connection_id))async def prepare(self):self.is_prepared = Truelogger.info("dependency:WebsocketPool is prepared")return self.is_preparedasync def check(self):return Trueasync def send_task(self, queue, connection):while self.is_alive(connection):try:data = queue.get_nowait()except QueueEmpty:await asyncio.sleep(0)continuetry:if isinstance(data, (bytes, str, int)):await connection.send(data)else:await connection.send(orjson.dumps(data).decode())queue.task_done()except Exception as err:breakasync def recv_task(self, queue, connection):while self.is_alive(connection):try:data = await connection.recv()await queue.put(data)logger.info(f"recv message: {data} from connection: {connection._id}")except Exception as err:breakasync def notify_task(self, connection_id):while self.is_alive(connection_id):try:logger.info(f"notify connection: {connection_id}'s listeners")data = await self.recv_queues[connection_id].get()for listener in self.listeners.get(connection_id, {}).values():await listener(connection_id, data)except Exception as err:passasync def is_alive_task(self, connection_id: str):if hasattr(connection_id, "_id"):connection_id = connection_id._idget_pong = asyncio.Event()async def wait_pong(connection_id, data):if data != PONG:returnget_pong.set()while True:get_pong.clear()await self.send(connection_id, PING)listener_id = await self.add_listener(connection_id, wait_pong)with suppress(asyncio.TimeoutError):async with async_timeout.timeout(self.app.config.WEBSOCKET_PING_TIMEOUT):await get_pong.wait()await self.remove_listener(connection_id, listener_id)if get_pong.is_set():# this connection is closedawait asyncio.sleep(self.app.config.WEBSOCKET_PING_INTERVAL)else:await self.remove_connection(connection_id)async def wait_closed(self, connection_id: str):"""if negative=True, only release when client close this connection."""while self.is_alive(connection_id):await asyncio.sleep(0)return Falseasync def send(self, connection_id: str, data: Any) -> bool:if not self.is_alive(connection_id):return Falseif connection_id not in self.send_queues:return Falseawait self.send_queues[connection_id].put(data)return True

Websocket Provider

from typing import Dict
from typing import List
from typing import Unionfrom pydantic import BaseModel
from pydantic import field_serializer
from sanic.log import loggerfrom apps.message.common.constants import MessageProviderType
from apps.message.common.constants import MessageStatus
from apps.message.common.interfaces import SendResult
from apps.message.providers.base import MessageProviderModel
from apps.message.validators.types import EndpointExID
from apps.message.validators.types import EndpointTag
from apps.message.validators.types import ETag
from apps.message.validators.types import ExID
from utils import get_appclass WebsocketMessageProviderModel(MessageProviderModel):class Info:name = "websocket"description = "Bio-Channel Communication"type = MessageProviderType.WEBSOCKETclass Capability:is_enabled = Truecan_send = Trueclass Message(BaseModel):connections: List[Union[EndpointTag, EndpointExID, str]]action: strpayload: Union[List, Dict, str, bytes]@field_serializer("connections")def serialize_connections(self, connections):return list(set(map(str, connections)))async def send(self, provider_id, message: Message) -> SendResult:app = get_app()websocket_pool = app.ctx.ws_poolsent_list = set()connections = []for connection in message.connections:if isinstance(connection, ETag):connections.extend([wfor c in await connection.decode()for w in c.get("websockets", [])])elif isinstance(connection, ExID):endpoint = await connection.decode()if endpoint:connections.extend(endpoint.get("websockets", []))else:connections.append(connection)connections = list(set(filter(lambda x: app.ctx.ws_pool.is_alive(connection), connections)))# logger.info(f"sending websocket message to {connections}")for connection in connections:if await websocket_pool.send(connection, data=message.model_dump_json(exclude=["connections"])):sent_list.add(connection)if sent_list:return SendResult(provider_id=provider_id, message=message, status=MessageStatus.SUCCEEDED)else:return SendResult(provider_id=provider_id, message=message, status=MessageStatus.FAILED)

websocket接口


@app.websocket("/websocket")
async def handle_websocket(request, ws):from apps.endpoint.listeners import register_websocket_endpointfrom apps.endpoint.listeners import unregister_websocket_endpointcon_id = Nonetry:ctx = request.app.ctxcon_id = await ctx.ws_pool.add_connection(ws)logger.info(f"new connection connected -> {con_id}")await ctx.ws_pool.add_listener(con_id, register_websocket_endpoint)await ctx.ws_pool.add_close_callback(con_id, unregister_websocket_endpoint)await ctx.ws_pool.send(con_id, data={"action": "on.connect", "payload": {"connection_id": con_id}})await ctx.ws_pool.wait_closed(con_id) # 等待连接断开finally:# 如果连接被客户端断开, handle_websocket将会被直接销毁, 所以销毁处理需要放在finally。request.app.add_task(request.app.ctx.ws_pool.remove_connection(con_id))

结果截图

websocket connected

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/109665.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java 版 spring cloud 工程系统管理 工程项目管理系统源码 工程项目各模块及其功能点清单

工程项目各模块及其功能点清单 一、系统管理 1、数据字典:实现对数据字典标签的增删改查操作 2、编码管理:实现对系统编码的增删改查操作 3、用户管理:管理和查看用户角色 4、菜单管理:实现对系统菜单的增删改查操…

C++ IO流

文章目录 一.C语言的输入与输出二.流是什么三.CIO流1.C标准IO流2.C文件IO流&#xff08;1&#xff09;文件操作步骤&#xff08;2&#xff09;以二进制的形式操作文件&#xff08;3&#xff09;以文本的形式操作文件&#xff08;4&#xff09;使用>>和<<对文件进行…

get请求报错400 非法参数

get请求报错400 非法参数 背景&#xff1a;get请求数据&#xff0c;SpringBoot提供接口&#xff0c;返回400&#xff0c;报错非法参数此种情况排除接口本身错误之外&#xff0c;检查参数中有没有特殊字符 " < > [ \ ] ^ { | } 我这边就是因为其中一个参数中有中括…

Spring MVC 四:Context层级

这一节我们来回答上篇文章中避而不谈的有关什么是RootApplicationContext的问题。 这就需要引入Spring MVC的有关Context Hierarchy的问题。Context Hierarchy意思就是Context层级&#xff0c;既然说到Context层级&#xff0c;说明在Spring MVC项目中&#xff0c;可能存在不止…

怎么建设ITIIL运维管理体系?

市场上大多数ITIL解决方案都过于复杂&#xff0c;让我们举一个客户希望实施ITIL方案的例子。首先&#xff0c;客户要通过ITIL咨询来定义ITIL流程&#xff0c;并使其与业务目标保持一致。接下来就是购买ITIL软件&#xff1b;大多数ITIL解决方案将事件、问题和变更管理作为不同的…

STM32CubeMX配置STM32G0 Standby模式停止IWDG(HAL库开发)

1.打开STM32CubeMX选择好对应的芯片&#xff0c;打开IWDG 2.打开串口1进行调试 3.配置好时钟 4.写好项目名称&#xff0c;选好开发环境&#xff0c;最后获取代码。 5.打开工程&#xff0c;点击魔术棒&#xff0c;勾选Use Micro LIB 6.修改main.c #include "main.h"…

【InsCode】InsCode打造的JavaSE与Linux命令互融的伪Linux文件系统小项目

&#x1f9d1;‍&#x1f4bb;作者名称&#xff1a;DaenCode &#x1f3a4;作者简介&#xff1a;啥技术都喜欢捣鼓捣鼓&#xff0c;喜欢分享技术、经验、生活。 &#x1f60e;人生感悟&#xff1a;尝尽人生百味&#xff0c;方知世间冷暖。 &#x1f4d6;所属专栏&#xff1a;Ja…

浅谈 Linux 下 vim 的使用

Vim 是从 vi 发展出来的一个文本编辑器&#xff0c;其代码补全、编译及错误跳转等方便编程的功能特别丰富&#xff0c;在程序员中被广泛使用。 Vi 是老式的字处理器&#xff0c;功能虽然已经很齐全了&#xff0c;但还有可以进步的地方。Vim 可以说是程序开发者的一项很好用的工…

Redis三种特殊数据类型

Redis三种特殊数据类型 geospatial 地理位置 Redis 地理空间数据类型简介 Redis 地理空间索引允许您存储坐标并搜索它们。 此数据结构可用于查找给定半径或边界框内的邻近点。 基本命令 GEOADD 将位置添加到给定的地理空间索引&#xff08;请注意&#xff0c;使用此命令&a…

开始MySQL之路—— DDL语法、DML语法、DQL语法基本操作详解

DDL语法 DDL&#xff08;Data Definition Language&#xff09; 数据定义语言&#xff0c;该语言部分包括以下内容。 对数据库的常用操作 对表结构的常用操作 修改表结构 对数据库的常用操作 1: 查看当前所有的数据库 show databases; 2&#xff1a;创建数据库 create dat…

【项目实战典型案例】05.前后端分离的好处(发送调查问卷)

目录 一、背景二、思路三、过程1、主要的业务逻辑2、解决问题的思路 四、总结五、面向对象的好处 一、背景 以下流程图是给用户发送调查问的整体流程&#xff0c;将不必要的业务逻辑放到前端进行处理。这样导致逻辑混乱难以维护。前后端分离的其中一个目的是将功能的样式放在了…

数据结构 - 线性表的顺序存储

一、顺序存储定义&#xff1a; 把逻辑上相邻的数据元素存储在物理上相邻的存储单元中。简言之&#xff0c;逻辑上相邻&#xff0c;物理上也相邻顺序表中&#xff0c;任一元素可以随机存取&#xff08;优点&#xff09; 二、顺序表中元素存储位置的计算 三、顺序表在算法中的实…

RISC-V公测平台发布 · 在SG2042上配置Jupiter+Octave科学计算环境

简介 JupyterHub是一个开源的共享计算平台&#xff0c;它为每个用户管理一个单独的 Jupyter 环境&#xff0c; 可以用于学生班级、企业数据科学小组或科学研究小组。它是一个多用户中心&#xff0c;可以生成、管理和代理多个单用户Jupyter笔记本服务器的实例。 GNU Octave是一…

视频怎么变成动态gif图?一个方法轻松转换

怎么将视频转换成gif动态图片呢&#xff1f;大家在日常看电影、电视剧&#xff0c;刷短视频的时候想要将其做成gif表情包时&#xff0c;应该如何操作呢&#xff1f;这时候&#xff0c;给大家分享一款操作简单无需下载的视频gif转换&#xff08;https://www.gif.cn/&#xff09;…

Spring相关知识

0、Spring的核心就是AOP和IOC IOC&#xff1a; AOP&#xff1a;AOP&#xff08;Aspect Oriented Programming&#xff09;是面向切面编程&#xff0c;它是一种编程思想&#xff0c;是面向对象编程&#xff08;OOP&#xff09;的一种补充。面向对象编程将程序抽象成各个层次的…

华为数通方向HCIP-DataCom H12-821题库(单选题:41-60)

第41题 以下关于IS-IS协议说法错误的是? A、IS-IS协议支持CLNP网络 B、IS-IS 协议支持IP 网络 C、IS-IS 协议的报文直接由数据链路层封装 D、IS-IS协议是运行在AS之间的链路状态协议 答案&#xff1a;D 解析&#xff1a; 关于IS-IS协议的说法错误是D. IS-IS协议是运行在A…

融媒行业落地客户旅程编排,详解数字化用户运营实战

移动互联网时代是流量红利的时代&#xff0c;企业常用低成本的方式进行获客&#xff0c;“增长黑客”的概念大范围传播。与此同时&#xff0c;机构媒体受到传播环境的影响&#xff0c;也开始启动全行业的媒体融合转型。在此背景下&#xff0c;2015 年神策数据成立&#xff0c;核…

Django(6)-django项目自动化测试

Django 应用的测试应该写在应用的 tests.py 文件里。测试系统会自动的在所有以 tests 开头的文件里寻找并执行测试代码。 我们的 polls 应用现在有一个小 bug 需要被修复&#xff1a;我们的要求是如果 Question 是在一天之内发布的&#xff0c; Question.was_published_recentl…

【VRTK4.0运动专题】轴移动AxisMove(真实身体的移动)

文章目录 1、概览2、释义3、属性设置 1、概览 2、释义 “竖直轴”控制的行为“水平轴”控制的行为1Vertical-Slide 滑动Horizontal-Slide 滑动2Vertical-Slide 滑动Horizontal-SmoothRotate 转动3Vertical-Slide 滑动Horizontal-SnapRotate 转动&#xff08;不连续&#xff09…

5G网关如何提升智慧乡村农业生产效率

得益于我国持续推进5G建设&#xff0c;截至今年5月&#xff0c;我国5G基站总数已达284.4万个&#xff0c;覆盖全国所有地级市、县城城区和9成以上的乡镇镇区&#xff0c;实现“镇镇通5G”&#xff0c;全面覆盖了从城市到农村的延伸。 依托5G网络的技术优势&#xff0c;智慧乡村…