SpringBoot SseEmitter,服务器单项消息推送

防止推送消息乱码

import org.jetbrains.annotations.NotNull;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.nio.charset.StandardCharsets;/*** @Description 防止中文乱码* @Author WangKun* @Date 2024/7/30 11:22* @Version*/
public class SseUTF8 extends SseEmitter {public SseUTF8(Long timeout) {super(timeout);}@Overrideprotected void extendResponse(@NotNull ServerHttpResponse outputMessage) {super.extendResponse(outputMessage);HttpHeaders headers = outputMessage.getHeaders();headers.setContentType(new MediaType(MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8));}}
SseEmitter工具类
import com.harmonywisdom.enums.ResultCode;
import com.harmonywisdom.utils.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.lang.reflect.Field;
import java.io.IOException;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @Description SSE消息推送工具* @Author WangKun* @Date 2024/7/29 14:55* @Version*/
@Component
@Slf4j
public class SseUtils {/*** 容器*/private static final ConcurrentHashMap<String, SseUTF8> SSE_MAP_CACHE = new ConcurrentHashMap<>(0);/*** 默认时长不过期(默认30s)*/private static final long DEFAULT_TIMEOUT = 0L;/*** @param userId* @Description 创建连接* @Throws* @Return SseUTF8* @Date 2024-07-29 15:01:58* @Author WangKun**/public static SseUTF8 createConnect(String userId) {SseUTF8 sseEmitter = new SseUTF8(DEFAULT_TIMEOUT);// 需要给客户端推送IDif (SSE_MAP_CACHE.containsKey(userId)) {remove(userId);}// 长链接完成后回调接口(关闭连接时调用)sseEmitter.onCompletion(() -> {log.info("SSE连接结束:{}", userId);remove(userId);});// 连接超时回调sseEmitter.onTimeout(() -> {log.error("SSE连接超时:{}", userId);remove(userId);});// 连接异常时,回调方法sseEmitter.onError(throwable -> {try {log.info("SSE{}连接异常,{}", userId, throwable.toString());sseEmitter.send(SseUTF8.event().id(userId).name("发生异常!").data("发生异常重试!").reconnectTime(3000));SSE_MAP_CACHE.put(userId, sseEmitter);} catch (IOException e) {log.error("用户--->{} SSE连接失败重试,异常信息--->{}", userId, e.getMessage());e.printStackTrace();}});SSE_MAP_CACHE.put(userId, sseEmitter);try {// 注册成功返回用户信息sseEmitter.send(SseUTF8.event().id(String.valueOf(ResultCode.CONNECT_SUCCESS.getCode())).data(userId, MediaType.APPLICATION_JSON));} catch (IOException e) {log.error("用户--->{} SSE连接失败,异常信息--->{}", userId, e.getMessage());}return sseEmitter;}/*** @param userId* @Description 移除用户连接* @Throws* @Return void* @Date 2024-07-29 15:07:03* @Author WangKun**/private static void remove(String userId) {SSE_MAP_CACHE.remove(userId);log.info("SSE移除用户连接--->{} ", userId);}/*** @param userId* @Description 关闭连接* @Throws* @Return void* @Date 2024-07-29 15:38:16* @Author WangKun**/public static void closeConnect(String userId) {SseUTF8 sseEmitter = SSE_MAP_CACHE.get(userId);if (sseEmitter != null) {sseEmitter.complete();log.info("SSE关闭连接:{}", userId);remove(userId);}}/*** @param userId* @param message* @param sseEmitter* @Description 推送消息到客户端* @Throws* @Return void* @Date 2024-07-29 15:48:19* @Author WangKun**/private static boolean sendMsgToClient(String userId, String message, SseUTF8 sseEmitter) {// 推送之前检测心态是否存在boolean isAlive = checkSseConnectAlive(sseEmitter);if (!isAlive) {// 失去连接移除log.error("SSE推送消息失败:客户端{}未创建长链接或者关闭,失败消息:{}", userId, message);SSE_MAP_CACHE.remove(userId);return false;}SseUTF8.SseEventBuilder sendData = SseUTF8.event().id(String.valueOf(ResultCode.CONNECT_SUCCESS.getCode())).data(message, MediaType.APPLICATION_JSON);try {sseEmitter.send(sendData);return true;} catch (IOException e) {log.error("推送消息失败:{}", message);}return true;}/*** @param sseEmitter* @Description 检测连接心跳* @Throws* @Return boolean* @Date 2024-07-30 17:27:32* @Author WangKun**/public static boolean checkSseConnectAlive(SseUTF8 sseEmitter) {if (sseEmitter == null) {return false;}// 返回true代表还连接, 返回false代表失去连接return !(Boolean) getField(sseEmitter, sseEmitter.getClass(), "sendFailed") &&!(Boolean) getField(sseEmitter, sseEmitter.getClass(), "complete");}/*** @param obj* @param clazz* @param fieldName* @Description 反射获取 sendFailed complete* @Throws* @Return java.lang.Object* @Date 2024-07-30 17:27:49* @Author WangKun**/public static Object getField(Object obj, Class<?> clazz, String fieldName) {for (; clazz != Object.class; clazz = clazz.getSuperclass()) {try {Field field;field = clazz.getDeclaredField(fieldName);field.setAccessible(true);return field.get(obj);} catch (Exception ignored) {}}return null;}/*** @param msg* @Description 发送消息给所有客户端* @Throws* @Return void* @Date 2024-07-29 15:48:40* @Author WangKun**/public static boolean sendTextMessage(String msg) {if (SSE_MAP_CACHE.isEmpty()) {return false;}if (StringUtils.isEmpty(msg) || StringUtils.isBlank(msg)) {return false;}boolean isSuccess = false;for (Map.Entry<String, SseUTF8> entry : SSE_MAP_CACHE.entrySet()) {isSuccess = sendMsgToClient(entry.getKey(), msg, entry.getValue());if (!isSuccess) {log.error("群发客户端{}消息推送,失败消息:{}", entry.getKey(), msg);}}return isSuccess;}/*** @param clientId* @param msg* @Description 给指定客户端发送消息* @Throws* @Return Boolean* @Date 2024-07-29 15:51:30* @Author WangKun**/public static boolean sendTextMessage(String clientId, String msg) {return sendMsgToClient(clientId, msg, SSE_MAP_CACHE.get(clientId));}/*** @Description 检测客户端心跳(连接状态,给客户端发送信息,如果sendFailed,complete返回false 移除客户端,说明客户端关闭)* @param* @Throws* @Return void* @Date 2024-07-31 16:22:25* @Author WangKun**/@Async("threadPoolExecutor")@Scheduled(cron = "0 0/15 * * * ?")public void checkSseAlive() {log.info("检测客户端连接状态");sendTextMessage("LIVE");}}

使用方法

/*** @Description 测试sse* @Author WangKun* @Date 2024/7/29 15:56* @Version*/
@RequiredArgsConstructor
@RestController
@RequestMapping("/api/sse")
public class TestSSEController {@Log("测试SSE消息连接")@AnonymousPostMapping(value = "/connect")public SseUTF8 connect(@RequestParam String userId) {return SseUtils.createConnect(userId);}@Log("测试SSE消息推送")@AnonymousPostMapping(value = "/send")public ResponseResult<Boolean> send(@RequestParam String userId, @RequestParam String param) {boolean flag = SseUtils.sendTextMessage(userId, param);if (!flag) {return ResponseResult.error(ResultCode.UNAVAILABLE_FOR_LEGAL_REASONS.getCode(), ResultCode.UNAVAILABLE_FOR_LEGAL_REASONS.getMsg());}return ResponseResult.success(true, ResultCode.OK.getCode());}@Log("测试SSE所有客户端消息推送")@AnonymousPostMapping(value = "/sendAll")public ResponseResult<Boolean> sendAll(@RequestParam String param) {boolean flag = SseUtils.sendTextMessage(param);if (!flag) {return ResponseResult.error(ResultCode.UNAVAILABLE_FOR_LEGAL_REASONS.getCode(), ResultCode.UNAVAILABLE_FOR_LEGAL_REASONS.getMsg());}return ResponseResult.success(true, ResultCode.OK.getCode());}@Log("测试SSE消息关闭")@AnonymousPostMapping(value = "/close")public ResponseResult<String> close(@RequestParam String userId) {SseUtils.closeConnect(userId);return ResponseResult.success(ResultCode.OK.getCode());}
}

简要说明

1:

推送消息之前,检测推送得到客户端是否存在,不存在,直接移除,避免浪费。

前端关闭浏览器或者关闭界面要调用关闭接口,将其关闭。

结果:

连接

推送:

给admin的客户端推送

给全部客户端推送

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/388865.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

每日OJ_牛客HJ74 参数解析

目录 牛客HJ74 参数解析 解析代码1 解析代码2 牛客HJ74 参数解析 参数解析_牛客题霸_牛客网 解析代码1 本题通过以空格和双引号为间隔&#xff0c;统计参数个数。对于双引号&#xff0c;通过添加flag&#xff0c;保证双引号中的空格被输出。 #include <iostream> #i…

Ubuntu20.04安装Angular CLI

一、更换apt-get源 使用原来的apt-get源有几个包报错&#xff0c;下不下来 更换阿里源&#xff08;阿里巴巴开源镜像站-OPSX镜像站-阿里云开发者社区&#xff09;&#xff0c;使用网站中的内容&#xff0c;在 apt-get update 时总是报错 改用清华源&#xff1a; deb http:/…

学习日记:二维数组

目录 1. 定义 2. 初始化 3. 数组元素的引用 4. 二维字符型数组 4.1 初始化 1. 定义 C语言中并不存在真正的二维数组&#xff1b; 二维数组的本质&#xff1a;一维数组类型的一维数组。 二维数组数据存储时按行优先存储。 语法&#xff1a; 类型说明符 数组名 [常量表达…

java单链表;双向链表;双向循环链表——简单应用

一、链表(Linked List)介绍 链表是有序的列表&#xff0c;但是它在内存中是存储如下 链表是以节点的方式来存储,是链式存储每个节点包含 data 域&#xff0c; next 域&#xff1a;指向下一个节点.如图&#xff1a;发现链表的各个节点不一定是连续存储.链表分带头节点的链表和没…

LLM实战系列(1)—强强联合Langchain-Vicuna应用实战

背景 本文主要介绍一下&#xff0c;基于Langchain与Vicuna-13B的外挂OceanBase知识库项目实战以及QA使用&#xff0c;项目地址: github.com/csunny/DB-G… 在开始之前&#xff0c;我们还是先看看效果&#xff5e; 自Meta发布LLaMA大模型以来&#xff0c; 围绕LLaMA微调的模型…

基于PHP+MySQL组合开发的微信活动投票小程序源码系统 带完整的安装代码包以及搭建部署教程

系统概述 在当今数字化时代&#xff0c;微信作为社交媒体的巨头&#xff0c;为企业和个人提供了丰富的互动营销平台。其中&#xff0c;投票活动作为一种有效的用户参与和互动方式&#xff0c;被广泛应用于各种场景。为了满足这一需求&#xff0c;我们推出了一款基于PHPMySQL组…

W1R3S靶机全通详细教程

文章目录 w1r3s主机发现主机扫描 端口扫描tcp端口扫描UDP扫描漏洞扫描 攻击面分析FTP渗透匿名登录 web渗透目录爆破 cuppa cms文件包含漏洞getshell提权 w1r3s 引言 近些日子看红笔大佬的靶机精讲视频时&#xff0c;他的一句话让我感受颇深&#xff0c;很多视频在讲解时&…

数据结构:线性表(下)

那么这篇就来总结一下栈和队列 一、栈 栈 (Stack) 只允许在有序的线性数据集合的一端&#xff08;称为栈顶 top&#xff09;进行加入数据&#xff08;push&#xff09;和移除数据&#xff08;pop&#xff09;。因而按照 后进先出&#xff08;LIFO, Last In First Out&#xf…

好用的抠图小技巧

在ps里的抠图方法 方法一&#xff1a;直接在菜单栏里选择主体&#xff0c;选中主体后会出现蚂蚁线&#xff0c;这个时候可能选区还不够完整&#xff0c;需要借助快速选择工具细化选取&#xff0c;选好之后按ctrlj复制选区就抠好啦 方法二&#xff1a;用快速选择工具直接选取人…

浏览器指纹技术:如何更改浏览器指纹?

“指纹信息”是一个人独有的身份象征&#xff0c;而“浏览器指纹”&#xff0c;就是网站和在线平台使用浏览器指纹来收集有关您的浏览器、设备和网络的详细信息&#xff0c;它可以说是你上网的身份象征&#xff0c;可让网站跟踪您的在线行为。 下面我们简单科普浏览器指纹的工…

【Python体验】第五天:目录搜索、数据爬虫(评论区里写作业)

文章目录 目录搜索 os、shutil库数据爬虫 request、re作业&#xff1a;爬取案例的top250电影的关键信息&#xff08;名称、类型、日期&#xff09;&#xff0c;并保存在表格中 目录搜索 os、shutil库 os 模块提供了非常丰富的方法用来处理文件和目录。 os.listdir(path)&#x…

C语言| 文件操作详解(二)

目录 四、有关文件的随机读写函数 4.1 fseek 4.2 ftell 4.3 rewind 五、判定文件读取结束的标准与读写文件中途发生错误的解决办法 5.1 判定文件读取结束的标准 5.2 函数ferror与feof 5.2.1 函数ferror 5.2.2 函数feof 在上一章中&#xff0c;我们主要介绍了文件类型…

MySQL:管理和操作数据表

数据表是数据库的重要组成部分&#xff0c;每一个数据库都是由若干个数据表组成的。没有数据表就无法在数据库中存放数据。MySQL数据表的管理和操作是数据库管理员和开发人员日常工作中不可或缺的一部分。 创建数据表 CREATE 创建数据表的过程是规定数据列的属性的过程&#…

网工内推 | 云运维工程师,最高19K,五险一金加补充医疗险

01 云计算运维工程师 &#x1f537;岗位职责 1、负责客户云计算解决方案的运维&#xff0c;负责云计算解决方案中云、虚拟化工作&#xff1b; 2、负责客户现场H3C产品的日常问题处理、变更维护、巡检、版本升级等工作&#xff0c;保障客户网络的稳定运行&#xff1b; 3、协调…

揭秘智能工牌:如何成为房企销售团队的数字化转型加速器

在这个竞争激烈的市场环境中&#xff0c;房企想要脱颖而出&#xff0c;不仅需要优质的产品和服务&#xff0c;更需要高效的销售团队。而销售团队的能力提升&#xff0c;离不开精细化管理和科技的赋能。DuDuTalk智能语音工牌&#xff0c;正是这样一款融合了AI技术与销售实战智慧…

Python中的yieId,比return更高效!

本文旨在深入探索"yield"的基本原理和实际应用&#xff0c;帮助你理解为什么它在Python编程中如此重要。 一、深入理解Yield "yield"与常用的"return"有本质的区别。"yield"不是真正返回一个值并退出函数&#xff0c;而是暂停函数执行…

springboot报错

springboot报错&#xff1a;g.yaml.snakeyaml.error.YAMLException: java.nio.charset.MalformedInputException: Input length 1 解决办法&#xff1a; file->settings 搜索encoding 然后选择File encodings 也可以直接找 File encodings 全部都更改整utf-8&#xff…

8.1IO进程线程

笔记 进程 一.多进程引入 1.1引入目的 程序员写程序时&#xff0c;一个程序可能由多个任务组成&#xff0c;如果使用的是单进程&#xff0c;或单任务&#xff0c;那么该任务执行阻塞时&#xff0c;其他任务就无法执行&#xff0c;必须等到该任务解除阻塞后&#xff0c;才能…