消息推送只会用websocket、轮询?试试SSE,轻松高效。

SSE介绍

HTTP Server-Sent Events (SSE) 是一种基于 HTTP 的服务器推送技术,它允许服务器向客户端推送数据,而无需客户端发起请求。以下是 HTTP SSE 的主要特点:

单向通信:
SSE 是一种单向通信协议,服务器可以主动向客户端推送数据,而客户端只能被动接收数据。

持久连接:
SSE 使用 HTTP 持久连接(long-lived connection)来保持客户端与服务器之间的连接,避免频繁地重新建立连接。

事件驱动:
SSE 采用事件驱动的方式,服务器将数据封装成事件推送给客户端,客户端可以根据事件类型进行相应的处理。

简单易用:
SSE 的协议简单,基于标准的 HTTP 协议,可以在任何支持 HTTP 的环境中使用。
客户端和服务器端的实现也相对简单,开发成本较低。

可靠性:
SSE 基于 HTTP 协议,可以利用 HTTP 的重试机制来提高数据传输的可靠性。
如果连接断开,客户端可以自动重新连接并恢复数据传输。

浏览器支持:
主流浏览器(Chrome、Firefox、Safari 等)都原生支持 SSE。
对于不支持 SSE 的浏览器,可以使用 polyfill 库来实现兼容性。

应用场景:
SSE 适用于实时性要求较高的场景,如聊天应用、体育赛事直播、股票行情更新等。
与 WebSocket 相比,SSE 更加轻量级,适用于一些对实时性要求不太高但需要持续更新的场景。

总的来说,HTTP SSE 提供了一种简单、可靠、高效的服务器推送机制,可以在各种 Web 应用中得到广泛应用。它是 Web 实时通信技术的一种重要补充。

与websocket对比

HTTP Server-Sent Events (SSE) 和 WebSocket 都是实现服务器与客户端之间实时双向通信的技术,但它们在某些方面存在一些差异。以下是它们的对比:

  1. 通信模式:
    • SSE 是单向通信,服务器只能主动推送数据给客户端,客户端只能被动接收。
    • WebSocket 是双向通信,服务器和客户端可以互相发送和接收数据。
  2. 连接方式:
    • SSE 使用标准的 HTTP 连接,利用 HTTP 持久连接来保持连接。
    • WebSocket 使用独立的 WebSocket 协议,建立全双工的 TCP 连接。
  3. 传输协议:
    • SSE 使用标准的 HTTP 协议,数据以文本的形式传输。
    • WebSocket 使用自己的二进制协议,可以传输二进制数据。
  4. 浏览器支持:
    • SSE 被大多数现代浏览器原生支持。
    • WebSocket 也被大多数现代浏览器原生支持。
  5. 可靠性:
    • SSE 可以利用 HTTP 的重试机制来提高数据传输的可靠性。
    • WebSocket 建立在 TCP 协议之上,也具有较高的可靠性。
  6. 实时性:
    • SSE 的实时性略低于 WebSocket,因为它需要依赖 HTTP 的连接机制。
    • WebSocket 建立在独立的 TCP 连接之上,实时性更高。
  7. 应用场景:
    • SSE 更适合于一些实时性要求不太高但需要持续更新的场景,如聊天应用、体育赛事直播等。
    • WebSocket 更适合于需要实时双向通信的场景,如在线游戏、视频会议等。

总的来说,SSE 和 WebSocket 都是实现服务器与客户端实时通信的有效方式,它们各有优缺点,适用于不同的应用场景。在选择时需要根据具体的需求来权衡取舍。

上代码

主体工具类 SseUtil

import com.alibaba.fastjson.JSON;
import com.enums.EnumDeviceType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.annotation.Resource;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;/*** SSE 通信工具类** @author Supreme_Sir* @version V1.0.0*/
@Component
@Slf4j
public class SseUtil {/*** SSE 超时时间 24小时*/private static final Long TIMEOUT_24_HOUR = 86400000L;@Resourceprivate ThreadPoolTaskExecutor threadPoolTaskExecutor;@Resourceprivate UnreadMessageCountCacheUtil unreadMessageCountCacheUtil;/*** 订阅SSE*/public SseEmitter subscribe(EnumDeviceType deviceType, Long userId) {SseEmitter sseEmitter = SingletonConcurrentHashMap.INSTANCE.get(deviceType, userId);if (sseEmitter == null) {//生成连接并存储sseEmitter = new SseEmitter(TIMEOUT_24_HOUR);SingletonConcurrentHashMap.INSTANCE.put(deviceType, userId, sseEmitter);}//设置回调函数sseEmitter.onCompletion(completionCallBack(deviceType, userId));sseEmitter.onTimeout(timeoutCallBack(deviceType, userId));sseEmitter.onError(errorCallBack(deviceType, userId));// 立即发送未读消息数量,消除前端等待Long cnt = unreadMessageCountCacheUtil.getWithCallBack(userId);sendMessage(userId, new SseMessageVo(cnt, null));log.info("用户-{}-{} SSE连接成功", userId, deviceType.getName());return sseEmitter;}/*** 退订消息** @param userId 用户ID*/public String unsubscribe(EnumDeviceType deviceType, Long userId) {SseEmitter sseEmitter = SingletonConcurrentHashMap.INSTANCE.get(deviceType, userId);if (sseEmitter != null) {log.info("用户-{}-{} 主动断开连接", userId, deviceType.getName());//注意:此方法应由应用程序调用,以完成请求处理。它不应在容器相关事件(如发送时出错)发生后使用。sseEmitter.complete();SingletonConcurrentHashMap.INSTANCE.remove(deviceType, userId);}return "退订成功";}/*** 发送SSE消息** @param userId  用户ID* @param content 消息内容*/public void sendMessage(Long userId, SseMessageVo content) {for (EnumDeviceType deviceType : EnumDeviceType.values()) {SseEmitter sseEmitter = SingletonConcurrentHashMap.INSTANCE.get(deviceType, userId);if (sseEmitter != null) {try {log.info("向用户-{} SSE发送消息-{}", userId, JSON.toJSONString(content));sseEmitter.send(content);} catch (IOException e) {log.error("用户-{}-{} SSE发送消息异常-{}", userId, deviceType.getName(), e.getMessage());SingletonConcurrentHashMap.INSTANCE.remove(deviceType, userId);log.error("用户-{}-{} SSE发送消息异常被移除", userId, deviceType.getName());}}}}/*** SSE 单向通信心跳检测(需配合定时任务)*/public void heartbeat() {SingletonConcurrentHashMap.INSTANCE.getMap().forEach((key, value) -> {Long userId = extractNumbers(key.toString());Long cnt = unreadMessageCountCacheUtil.getWithCallBack(userId);sendMessage(userId, new SseMessageVo(cnt, null));});}/*** SSE 连接成功回调** @param userId 用户ID*/private Runnable completionCallBack(EnumDeviceType deviceType, Long userId) {return threadPoolTaskExecutor.newThread(() -> log.info("用户-{}-{} SSE连接断开", userId, deviceType.getName()));}/*** 出现超时,将当前用户缓存删除** @param userId 用户ID*/private Runnable timeoutCallBack(EnumDeviceType deviceType, Long userId) {return threadPoolTaskExecutor.newThread(() -> {log.error("用户-{}-{} SSE连接超时", userId, deviceType.getName());unsubscribe(deviceType, userId);log.error("用户-{}-{} SSE连接超时被移除", userId, deviceType.getName());});}/*** 出现异常,将当前用户缓存删除** @param userId 用户ID*/private Consumer<Throwable> errorCallBack(EnumDeviceType deviceType, Long userId) {return throwable -> {log.error("用户-{}-{} SSE连接异常", userId, deviceType.getName());unsubscribe(deviceType, userId);log.error("用户-{}-{} SSE连接异常被移除", userId, deviceType.getName());};}/*** 截取字符串中的数字** @param input 待截取的字符串*/private Long extractNumbers(String input) {Pattern pattern = Pattern.compile("[a-zA-Z](\\d+)");Matcher matcher = pattern.matcher(input);if (matcher.find()) {// 返回第一个匹配的数字序列return Long.valueOf(matcher.group(1));} else {// 如果没有找到匹配项,可以返回null或抛出异常return null;}}
}

要点:

  1. 新建好的 SSE 对象需要用容器存储起来,以服务于后续消息通信。
  2. 回调使用 ThreadPool 进行管理避免线程过多。
  3. 一个 SSE 对象只能与一端保持通信,如果存在多端的话,需要创建多个对象。

SSE对象单例存储容器 SingletonConcurrentHashMap

import com.enums.EnumDeviceType;
import lombok.Getter;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.util.concurrent.ConcurrentHashMap;/*** 基于ConcurrentHashMap的单例版SSE存储容器*/
@Getter
public enum SingletonConcurrentHashMap {/*** 单例版存储容器*/INSTANCE;private final ConcurrentHashMap<Object, SseEmitter> map = new ConcurrentHashMap<>();/*** 存入对象*/public void put(EnumDeviceType deviceType, Object key, SseEmitter value) {map.put(deviceType.getCode() + key, value);}/*** 获取对象*/public SseEmitter get(EnumDeviceType deviceType, Object key) {return map.get(deviceType.getCode() + key);}/*** 判断缓存中是否存在当前用户的SSE实例** @param key 用户ID*/public boolean haveInstance(Object key) {// 分别查询PC、小程序的SSE实例for (EnumDeviceType deviceType : EnumDeviceType.values()) {if (map.get(deviceType.getCode() + key) != null) {return true;}}return false;}/*** 移除对象*/public void remove(EnumDeviceType deviceType, Object key) {map.remove(deviceType.getCode() + key);}/*** 判断是否存在*/public boolean containsKey(EnumDeviceType deviceType, Object key) {return map.containsKey(deviceType.getCode() + key);}/*** 获取对象数量*/public int size() {return map.size();}/*** 清空*/public void clear() {map.clear();}}

心跳数据缓存工具 UnreadMessageCountCacheUtil

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.core.redis.RedisTemplateUtils;
import com.enums.EnumYesOrNo;
import com.util.RedisKeyUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Objects;
import java.util.concurrent.TimeUnit;/*** @author Supreme_Sir* @description 未读消息条数缓存工具**/
@Component
@Slf4j
public class UnreadMessageCountCacheUtil {@Resourceprivate IDao dao;// 过期时间30分钟private static final Long TIMEOUT = 30L;/*** 添加缓存*/private void put(Long key, Object value) {if (Objects.isNull(key) || Objects.isNull(value)) {return;}RedisTemplateUtils.setCacheObject(RedisKeyUtils.getUnreadMessageCount() + key, value, TIMEOUT, TimeUnit.MINUTES);}/*** 获取缓存(缓存中如果没有则回数据库查询)*/public Long getWithCallBack(Long key) {if (Objects.isNull(key)) {return null;}Object cnt = RedisTemplateUtils.getCacheObject(RedisKeyUtils.getUnreadMessageCount() + key);if (Objects.isNull(cnt)) {cnt = queryCount(key);put(key, cnt);}return Long.valueOf(cnt.toString());}/*** 获取最新缓存** @return {@link Long} 最新未读数据条数*/public Long getWithRefresh(Long key) {if (Objects.isNull(key)) {return null;}Long cnt = queryCount(key);put(key, cnt);return cnt;}/*** 手动刷新缓存*/public void refresh(Long key) {if (Objects.isNull(key)) {return;}put(key, queryCount(key));}/*** 回库查询未读消息条数** @param userId 用户ID* @return {@link Long} 未读消息数量*/private Long queryCount(Long userId) {QueryWrapper<> wrapper = new QueryWrapper<>();// 连接数据库查询数据return dao.selectCount(wrapper);}
}

注意:该缓存工具对象由 Spring 容器管理,以确保单例。

前端关键代码

import { fetchEventSource } from '@microsoft/fetch-event-source';
const ctrl = new AbortController();
fetchEventSource(`${env.VITE_API_URL_PREFIX}/xxx/sse/xxx`, {signal: ctrl.signal,method: 'POST',headers: {'Auth-Token': localStorage.getItem(TOKEN_NAME),},body: JSON.stringify({UserID: localStorage.getItem('userID'),}),openWhenHidden: true,onopen: async (event: any) => {console.log('sse open:', event);},onmessage: async (event: any) => {const data = JSON.parse(event.data);this.setMsgCount(data.UnreadMsgCount || 0);console.log('SSE 消息:', data);if (data.Data) {const NotifyInstance = await NotifyPlugin.info({class: 'global-notify-card-wrap',icon: false,duration: 10000,closeBtn: false,offset: [0, 53],content: (h) =>h(MessageBox, {Data: data.Data,onHide: () => {NotifyInstance.close();},}),} as any);}},
});
this.see = {close: () => ctrl.abort(),
};

-------------------------------------------风雨里做个大人,阳光下做个孩子。-------------------------------------------

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/387060.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LLM推理优化——KV Cache篇(百倍提速)

LLM推理优化——KV Cache篇&#xff08;百倍提速&#xff09; 注意&#xff1a;KV Cache本质上是空间换时间的技术。与计算机组成原理中的cache不同&#xff0c;它不涉及访存优化。 不知道大家在用LLM的时候&#xff0c;有没有注意到一个问题&#xff1a;我们在输入我们的问题…

vscode搭建rust开发环境

由于rustrover不是免费的&#xff0c;此处教学搭建一套基于vscode的rust开发环境&#xff0c;可运行&#xff0c;可调式 1.下载vscode1.91.1 Download Visual Studio Code - Mac, Linux, Windows 2.下载插件 打开网站下载插件 rust-analyzer-0.4.2049、vscode-lldb-1.10.0、…

java使用hutool工具判断ip或者域名是否可用,java使用ping判断ip或者域名是否可用

1.导入hutool的maven依赖 <dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.16</version></dependency>2.复制以下代码直接运行 import cn.hutool.core.net.NetUtil;public class …

论文解读:DiAD之SG网络

目录 一、SG网络功能介绍二、SG网络代码实现 一、SG网络功能介绍 DiAD论文最主要的创新点就是使用SG网络解决多类别异常检测中的语义信息丢失问题&#xff0c;那么它是怎么实现的保留原始图像语义信息的同时重建异常区域&#xff1f; 与稳定扩散去噪网络的连接&#xff1a; S…

将 magma example 改写成 cusolver example eqrf

1&#xff0c;简单安装Magma 1.1 下载编译 OpenBLAS $ git clone https://github.com/OpenMathLib/OpenBLAS.git $ cd OpenBLAS/ $ make -j DEBUG1 $ make install PREFIX/home/hipper/ex_magma/local_d/OpenBLAS/1.2 下载编译 magma $ git clone https://bitbucket.org/icl…

【Kubernetes】二进制部署k8s集群(中)之cni网络插件flannel和calico

&#xff01;&#xff01;&#xff01;继续上一篇实验部署&#xff01;&#xff01;&#xff01; 目录 一.k8s的三种网络模式 1.Pod 内容器与容器之间的通信 2.同一个 Node 内 Pod 之间的通信 3.不同 Node 上 Pod 之间的通信 二.k8s的三种接口 三.Flannel 网络插件 1.U…

美摄科技企业级视频拍摄与编辑SDK解决方案

在数字化浪潮汹涌的今天&#xff0c;视频已成为企业传递信息、塑造品牌、连接用户不可或缺的强大媒介。为了帮助企业轻松驾驭这一视觉盛宴的制作过程&#xff0c;美摄科技凭借其在影视级非编技术领域的深厚积累&#xff0c;推出了面向企业的专业视频拍摄与编辑SDK解决方案&…

每日OJ_牛客CM26 二进制插入

目录 牛客CM26 二进制插入 解析代码 牛客CM26 二进制插入 二进制插入_牛客题霸_牛客网 解析代码 m:1024&#xff1a;100000000 00 n:19 &#xff1a; 10011 要把n的二进制值插入m的第j位到第i位&#xff0c;只需要把n先左移j位&#xff0c;然后再进行或运算&#xff08;|&am…

高品质定制线缆知名智造品牌推荐-精工电联:高压线缆行业定制服务的领航者

定制线缆源头厂家推荐-精工电联&#xff1a;高压线缆行业定制服务的领航者 在当今这个高度信息化的社会&#xff0c;电力传输与分配系统的稳定运行至关重要。作为连接各个电力设备的纽带&#xff0c;高压线缆的质量直接关系到电力系统的安全性和稳定性。在定制高压线缆行业中&a…

android(安卓)最简单明了解释版本控制之MinSdkVersion、CompileSdkVersion、TargetSdkVersion

1、先明白几个概念 &#xff08;1&#xff09;平台版本&#xff08;Android SDK版本号&#xff09; 平台版本也就是我们平时说的安卓8、安卓9、安卓10 &#xff08;2&#xff09;API级别&#xff08;API Level&#xff09; Android 平台提供的框架 API 被称作“API 级别” …

Hugo 部署与自动更新(Git)

文章目录 Nginx部署Hugonginx.confhugo.conf Hugo自动更新Hugo自动更新流程添加访问令牌添加web hookrust实现自动更新接口 Nginx部署Hugo nginx.conf user nginx; worker_processes auto;error_log /var/log/nginx/error.log notice; pid /var/run/nginx.pid;even…

C++STL简介(三)

目录 1.vector的模拟实现 1.1begin&#xff08;&#xff09; 1.2end&#xff08;&#xff09; 1.3打印信息 1.4 reserve&#xff08;&#xff09; 1.5 size&#xff08;&#xff09; 1.6 capacity&#xff08;&#xff09; 1.7 push_back() 1.8[ ] 1.9 pop_back() 1.10 insert&…

合并K个有序链表

题目 给你一个链表数组&#xff0c;每个链表都已经按升序排列。 请你将所有链表合并到一个升序链表中&#xff0c;返回合并后的链表。 示例1&#xff1a; 输入&#xff1a; 输出&#xff1a; 示例2&#xff1a; 输入&#xff1a; 输出&#xff1a; 示例3&#xff1a; 输入&…

【音视频之SDL2】Windows配置SDL2项目模板

文章目录 前言 SDL2 简介核心功能 Windows配置SDL2项目模板下载SDL2编译好的文件VS配置SDL2 测试代码效果展示 总结 前言 在开发跨平台的音视频应用程序时&#xff0c;SDL2&#xff08;Simple DirectMedia Layer 2&#xff09;是一个备受欢迎的选择。SDL2 是一个开源库&#x…

自研Vue3开源Tree组件:节点拖拽bug修复

当dropType为after&#xff0c;且dropNode为父节点时&#xff0c;bug出现了&#xff1a; bug原因&#xff1a;插入扁平化列表的位置insertIndex计算的不对&#xff1a; 正确的逻辑&#xff0c;同inner要算上子孙节点所占的位置&#xff1a; bug修复&#xff01;

「数组」实现动态数组的功能(C++)

概述 动态数组&#xff0c;顾名思议即可变长度的数组。数组这种数据结构的实现是在栈空间或堆空间申请一段连续的可操作区域。 实现可变长度的动态数组结构&#xff0c;应该有以下操作&#xff1a;申请一段足够长的空间&#xff0c;如果数据的存入导致空间已满&#xff0c;则…

PackagesNotFoundError 错误表明 conda 在当前使用的镜像源中找不到 contourpy 版本 1.2.1。以下是可能的解决方法:

PackagesNotFoundError 错误表明 conda 在当前使用的镜像源中找不到 contourpy 版本 1.2.1。以下是可能的解决方法&#xff1a; PackagesNotFoundError 错误表明 conda 在当前使用的镜像源中找不到 contourpy 版本 1.2.1。以下是可能的解决方法&#xff1a; 1. 更换镜像源 虽…

rust 桌面 sip 软电话(基于tauri 、pjsip库)

本文尝试下rust 的tauri 桌面运用 原因在于体积小 1、pjsip 提供了rust 接口官方的 rust demo 没编译出来 在git找了个sip-phone-rs-master https://github.com/Charles-Schleich/sip-phone-rs 可以自己编译下pjsip lib库替换该项目的lib 2、创建一个tauri demo 引用 [depe…

计算机毕业设计选题推荐-某炼油厂盲板管理系统-Java/Python项目实战

✨作者主页&#xff1a;IT研究室✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Python…

实战:Zookeeper 简介和单点部署ZooKeeper

Zookeeper 简介 ZooKeeper是一个开源的分布式协调服务&#xff0c;它是Apache软件基金会下的一个项目&#xff0c;旨在解决分布式系统中的协调和管理问题。以下是ZooKeeper的详细简介&#xff1a; 一、基本定义 ZooKeeper是一个分布式的、开放源码的分布式应用程序协调服务&a…