SpringCloud源码探析(十)-Web消息推送

1.概述

消息推送在日常使用中的场景比较多,比如有人点赞了我的博客或者关注了我,这时我就会收到一条推送消息,以此来吸引我点击或者打开应用。消息推送的方式主要分为两种:web消息推送和移动端消息推送。它将所要发送的信息,发送至用户当前访问的网页或者移动设备。本文主要分析在web端进行消息推送的几种方式,实现用户在web端接收推送消息。

2.消息推送几种方式

web消息推送的方式主要分为两种:一种是主动向服务端请求(简单理解为客户端pull消息)、一种是服务端推送(简单理解为服务端push消息)。两种方式各有利弊:主动向服务端请求会按照一定周期不断去请求服务器,如果客户端数量庞大会对服务端造成极大压力,并且数据具有一定延时性;服务端推送实时性较好,但服务端需要存储客户端会话信息,如果客户端数量较多,服务端查询对应会话压力较大。

2.1 Pull消息

Pull消息主要是客户端发起的操作,定时向服务端进行轮询获取消息,轮询可分为短轮询和长轮询。

短轮询:指定时间间隔,由应用浏览器发送http请求,服务器实时返回消息至客户端,浏览器进行展示;短轮询在前端一般通过JS定时器定时发送请求来实现;
长轮询:是对短轮询的一种优化,客户端发起请求,服务器不会立即返回请求结果,而是将请求挂起等待一段时间,如果此时间段内数据变更,立即响应客户端请求,若是一直无变化则等到指定的超时时间响应请求,客户端重新发起长连接;长轮询在nacos、Kafka、RocketMQ队列中使用较多。

2.2 Push消息

服务端向客户端推送,在一定程度上能节约一部分资源,常用的方式有WebSocket、SSE等,还有一些通过中间件RabbitMQ来实现等。本文主要介绍利用SSE方式和WebSocket方式来推送消息,具体如下:

2.2.1 SSE

SSE(Server-sent events)是一种用于实现服务器向客户端实时推送数据的Web技术。与传统的短轮询和长轮询相比,SSE提供了更高效和实时的数据推送机制。SSE基于HTTP协议,允许服务器将数据以事件流(Event Stream)的形式发送给客户端。客户端通过建立持久的HTTP连接,并监听事件流,可以实时接收服务器推送的数据。SSE的主要特点包括:

简单易用:SSE使用基于文本的数据格式,如纯文本、JSON等,使得数据的发送和解析都相对简单;
单向通信:SSE支持服务器向客户端的单向通信,服务器可以主动推送数据给客户端,而客户端只能接收数据;
实时性:SSE建立长时间的连接,使得服务器可以实时地将数据推送给客户端,而无需客户端频繁地发起请求。

SSE的整体实现思路如下,它的原理其实类似于在线视频播放,视频流会连续不断的推送到浏览器,图如下:
在这里插入图片描述
其实可以简单地理解为它是一种单向实时通信技术,一旦客户端与服务端建立连接,只能接收服务端信息,不能向服务端发送信息,且拥有自动重连机制,客户端与服务端断开会进行自动重连,websocket断开不能自动重连,这是SSE优于websocket的地方。
Springboot使用SSE功能发送信息代码如下,由于springboot内嵌sse模块,因此不需要引入额外包:

package com.eckey.lab.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @Author: Marin* @CreateTime: 2023-10-08  14:29* @Description: TODO* @Version: 1.0*/
@Slf4j
@RestController
@CrossOrigin //此注解是为了解决测试过程中的跨域问题
@RequestMapping("/sse")
public class SSEController {/*** 使用Map对象来存放userId和对应的会话*/private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();/*** @description: 浏览器端注册,将会话信息存入Map,这种方式会导致一个userId只能与服务器建立一个会话,生产环境慎用这种方式* @author: Marin* @date: 2023/10/9 16:51* @param: [userId]* @return: org.springframework.web.servlet.mvc.method.annotation.SseEmitter**/@GetMapping(path = "/subscribe/{userId}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})public SseEmitter subscribe(@PathVariable("userId") String userId) throws IOException {// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutExceptionSseEmitter sseEmitter = new SseEmitter(30_000L);// 设置前端的重试时间为15s,如果不加这个发送一下,前端就不会显示连接成功sseEmitter.send("连接成功");// 注册回调sseEmitter.onCompletion(() -> {log.info("连接结束:{}", userId);});sseEmitter.onError((Throwable throwable) -> {log.error("连接异常:{}", throwable.getMessage());});sseEmitter.onTimeout(() -> {log.warn("连接超时:{}", userId);});//以userId为key,如果一个用户多个设备连接,会不准确sseEmitterMap.put(userId, sseEmitter);log.info("创建新的sse连接,当前用户:{}", userId);return sseEmitter;}/*** @description: 向指定用户发送指定信息* @author: Marin* @date: 2023/10/9 16:53* @param: [userId, message]* @return: void**/@GetMapping(path = "/sendMessage")public void sendMessage(String userId, String message) {if (sseEmitterMap.containsKey(userId)) {try {log.info("向用户:{},发送消息:{}", userId, message);sseEmitterMap.get(userId).send(message, MediaType.APPLICATION_JSON);} catch (IOException e) {log.error("用户[{}]推送异常:{}", userId, e.getMessage());removeUser(userId);}}}/*** @description: 移除用户* @author: Marin* @date: 2023/10/9 16:53* @param: [userId]* @return: void**/private void removeUser(String userId) {sseEmitterMap.remove(userId);log.info("移除用户成功:{}", userId);}/*** @description: 删除与指定用户会话* @author: Marin* @date: 2023/10/9 16:54* @param: [userId]* @return: void**/@GetMapping("/close/{userId}")public void close(@PathVariable("userId") String userId) {removeUser(userId);log.info("关闭连接成功:{}", userId);}
}

前端测试代码如下:

<!DOCTYPE html>
<html lang="en"><head><meta charset="UTF-8"><title>SseEmitter</title>
</head><body>
<button onclick="closeSse()">关闭连接</button>
<div id="message"></div>
</body>
<script>let source = null;// 用时间戳模拟登录用户const userId = new Date().getTime();if (window.EventSource) {// 建立连接source = new EventSource('http://127.0.0.1:9090/sse/subscribe/' + userId);setMessageInnerHTML("连接用户=" + userId);source.addEventListener('open', function(e) {setMessageInnerHTML("建立连接。。。");}, false);source.addEventListener('message', function(e) {setMessageInnerHTML(e.data);});source.addEventListener('error', function(e) {if (e.readyState === EventSource.CLOSED) {setMessageInnerHTML("连接关闭");} else if (e.target.readyState === EventSource.CONNECTING) { console.log('Connecting...');}else {console.log(e);}}, false);} else {setMessageInnerHTML("你的浏览器不支持SSE");}// 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据window.onbeforeunload = function() {closeSse();};// 关闭Sse连接function closeSse() {source.close();const httpRequest = new XMLHttpRequest();httpRequest.open('GET', 'http://127.0.0.1:9090/sse/close/'+ userId, true);httpRequest.send();console.log("close");}// 将消息显示在网页上function setMessageInnerHTML(innerHTML) {document.getElementById('message').innerHTML += innerHTML + '<br/>';}
</script></html>

打开前端页面,会出现连接信息,如下所示:
在这里插入图片描述
调用信息发送接口,跟据用户id发送指定消息,如下:
在这里插入图片描述
发送成功后前端接收并显示在页面上,如下:
在这里插入图片描述

2.2.2 WebSocket

WebSocket是一种用于实现实时双向通信的Web技术,它使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。它与SSE在某些方面有所不同。下面是SSE和WebSocket之间的比较:

数据推送方向:SSE是服务器向客户端的单向通信,服务器可以主动推送数据给客户端。而WebSocket是双向通信,允许服务器和客户端之间进行实时的双向数据交换;
连接建立:SSE使用基于HTTP的长连接,通过普通的HTTP请求和响应来建立连接,从而实现数据的实时推送。WebSocket使用自定义的协议,通过建立WebSocket连接来实现双向通信;
兼容性:由于SSE基于HTTP协议,它可以在大多数现代浏览器中使用,并且不需要额外的协议升级。WebSocket在绝大多数现代浏览器中也得到了支持,但在某些特殊的网络环境下可能会遇到问题;
适用场景:SSE适用于服务器向客户端实时推送数据的场景,如股票价格更新、新闻实时推送等。WebSocket适用于需要实时双向通信的场景,如聊天应用、多人协同编辑等。

WebSocket原理图如下所示:
在这里插入图片描述
Springboot整合websocket如下:
1.引入pom

    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>

2.编写socket配置

package com.eckey.lab.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.servlet.ServletContextInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;import javax.servlet.ServletContext;
import javax.servlet.ServletException;/*** @Author: Marin* @CreateTime: 2023-10-08  16:26* @Description: TODO* @Version: 1.0*/
@Slf4j
@Configuration
public class WebSocketConfig implements ServletContextInitializer {/*** 这个bean的注册,用于扫描带有@ServerEndpoint的注解成为websocket,如果你使用外置的tomcat就不需要该配置文件*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}@Overridepublic void onStartup(ServletContext servletContext) throws ServletException {String serverInfo = servletContext.getServerInfo();log.info("serverInfo:{}", serverInfo);}}

3.编写SocketServer代码

package com.eckey.lab.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;/*** @Author: Marin* @CreateTime: 2023-10-08  15:49* @Description: TODO* @Version: 1.0*/
@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}")
public class WebSocketServer {//与某个客户端的连接会话,需要通过它来给客户端发送数据private Session session;private static final CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();// 用来存在线连接数private static final Map<String, Session> sessionPool = new HashMap<String, Session>();/*** 链接成功调用的方法*/@OnOpenpublic void onOpen(Session session, @PathParam(value = "userId") String userId) {try {this.session = session;webSockets.add(this);sessionPool.put(userId, session);log.info("websocket消息: 有新的连接,总数为:" + webSockets.size());} catch (Exception e) {log.error("");}}/*** 收到客户端消息后调用的方法*/@OnMessagepublic void onMessage(String message) {log.info("websocket消息: 收到客户端消息:" + message);}/*** 此为单点消息*/public void sendOneMessage(String userId, String message) {Session session = sessionPool.get(userId);if (session != null && session.isOpen()) {try {log.info("websocket发送单点消息:" + message);session.getAsyncRemote().sendText(message);} catch (Exception e) {e.printStackTrace();}}}
}

4.编写controller代码

package com.eckey.lab.controller;import com.alibaba.fastjson.JSON;
import com.eckey.lab.config.WebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.io.IOException;
import java.util.HashMap;/*** @Author: Marin* @CreateTime: 2023-10-08  15:24* @Description: TODO* @Version: 1.0*/
@Slf4j
@RestController
@CrossOrigin
@RequestMapping("/socket")
public class WebSocketController {@Autowiredprivate WebSocketServer webSocketServer;@GetMapping(path = "/publish/{userId}")public String publish(@PathVariable("userId") String userId, String message) throws IOException {webSocketServer.sendOneMessage(userId, message);log.info("信息发送成功!userId:{},message:{}", userId, message);HashMap maps = new HashMap();maps.put("code", "0");maps.put("msg", "success");return JSON.toJSONString(maps);}}

5.测试
客户端发送消息,服务接收到消息,具体如下:
在这里插入图片描述
在这里插入图片描述
调用服务端接口发送消息,客户端接收到消息,具体如下:
在这里插入图片描述
在这里插入图片描述

3.小结

1.SSE方式是一种基于TCP协议的单向数据传输方式,当连接建立完成,只能由服务端向客户端发送信息;
2.WebSocket是一种双向通信技术,能够实现客户端和服务端的全双工通信,它在建立连接时使用HTTP协议,其它时候都是直接基于TCP协议进行通信;
3.在选择SSE或者WebSocket时,需要跟据场景、性能损耗进行综合考虑,合理的技术选型能够有效增强服务的健壮性。

4.参考文献

1.https://zhuanlan.zhihu.com/p/634581294
2.https://juejin.cn/post/7122014462181113887
3.https://javaguide.cn/system-design/web-real-time-message-push.html

5.附录

https://gitee.com/Marinc/nacos.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/154145.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

剑指offer——JZ84 二叉树中和为某一值的路径(三) 解题思路与具体代码【C++】

一、题目描述与要求 二叉树中和为某一值的路径(三)_牛客题霸_牛客网 (nowcoder.com) 题目描述 给定一个二叉树root和一个整数值 sum &#xff0c;求该树有多少路径的的节点值之和等于 sum 。 1.该题路径定义不需要从根节点开始&#xff0c;也不需要在叶子节点结束&#xff…

【已解决】msvcp140.dll丢失怎样修复?msvcp140.dll重新安装的解决方法

今天我要和大家分享的是关于msvcp140.dll丢失的五种不同解决方法。我们知道&#xff0c;在运行一些软件或游戏的时候&#xff0c;经常会遇到“msvcp140.dll丢失”的问题&#xff0c;这可能会影响到我们的使用体验。那么&#xff0c;面对这个问题&#xff0c;我们应该如何应对呢…

【2023研电赛】安谋科技企业命题特别奖:面向独居老人的智能居家监护系统

本文为2023年第十八届中国研究生电子设计竞赛安谋科技企业命题特别奖分享&#xff0c;参加极术社区的【有奖活动】分享2023研电赛作品扩大影响力&#xff0c;更有丰富电子礼品等你来领&#xff01;&#xff0c;分享2023研电赛作品扩大影响力&#xff0c;更有丰富电子礼品等你来…

Http请求响应 Ajax 过滤器

10/10/2023 近期总结&#xff1a; 最近学的后端部署&#xff0c;web服务器运行&#xff0c;各种请求响应&#xff0c;内容很多&#xff0c;学的很乱&#xff0c;还是需要好好整理&#xff0c;前面JavaSE内容还没有完全掌握&#xff0c;再加上一边刷题&#xff0c;感觉压力很大哈…

JavaScript Web APIs第五天笔记

Web APIs - 第5天笔记 目标&#xff1a; 能够利用JS操作浏览器,具备利用本地存储实现学生就业表的能力 BOM操作综合案例 js组成 JavaScript的组成 ECMAScript: 规定了js基础语法核心知识。比如&#xff1a;变量、分支语句、循环语句、对象等等 Web APIs : DOM 文档对象模型&…

IP协议总结

一、定义。 IP全称为Internet Protocol&#xff0c;是TCP/IP协议族中的一员&#xff0c;负责实现数据在网络上的传输。它是一种无连接、不可靠的数据报协议。 IP协议常用于Internet网络和局域网中&#xff0c;它通过将数据包进行分组并进行逐跳转发来实现数据在网络中的传输。…

攻防世界题目练习——Web引导模式(一)

题目目录 1. command_execution2.xff_referer3.simple_js4.php_rce5.Web_php_include6.upload17. warmup 难度1全部写过了&#xff0c;这个系列里没有 指路&#xff1a; 攻防世界题目练习——Web难度1&#xff08;一&#xff09; 攻防世界题目练习——Web难度1&#xff08;二&a…

【力扣2011】执行操作后的变量值

&#x1f451;专栏内容&#xff1a;力扣刷题⛪个人主页&#xff1a;子夜的星的主页&#x1f495;座右铭&#xff1a;前路未远&#xff0c;步履不停 目录 一、题目描述二、题目分析 一、题目描述 题目链接&#xff1a;执行操作后的变量值 存在一种仅支持 4 种操作和 1 个变量 …

Python 无废话-办公自动化Excel格式美化

设置字体 在使用openpyxl 处理excel 设置格式&#xff0c;需要导入Font类&#xff0c;设置Font初始化参数&#xff0c;常见参数如下&#xff1a; 关键字参数 数据类型 描述 name 字符串 字体名称&#xff0c;如Calibri或Times New Roman size 整型 大小点数 bold …

mysql面试题34:Hash索引和B+树区别是什么?在设计索引怎么选择?

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官:Hash索引和B+树区别是什么?在设计索引怎么选择? 在MySQL中,Hash索引和B+树索引是两种常见的索引类型,他们有以下区别: 数据结构:Hash索引:…

【小沐学Python】Python实现Web图表功能(Dash)

文章目录 1、简介2、安装3、功能示例3.1 Hello World3.2 连接到数据3.3 可视化数据3.4 控件和回调3.5 设置应用的样式3.5.1 HTML and CSS3.5.2 Dash Design Kit (DDK)3.5.3 Dash Bootstrap Components3.5.4 Dash Mantine Components 4、更多示例4.1 Basic Dashboard4.2 Using C…

从0开始学go第六天

方法一&#xff1a;gin获取querystring参数 package main//querystring import ("net/http""github.com/gin-gonic/gin" )func main() {r : gin.Default()r.GET("/web", func(c *gin.Context) {//获取浏览器那边发请求携带的query String参数//…

基于 Mtcnn(人脸检测)+Hopenet(姿态检测)+拉普拉斯算子(模糊度检测) 的人脸检测服务

写在前面 工作原因&#xff0c;顺便整理博文内容为一个 人脸检测服务分享以打包 Docker 镜像&#xff0c;可以直接使用服务目前支持 http 方式该检测器主要适用低质量人脸图片处理理解不足小伙伴帮忙指正&#xff0c;多交流&#xff0c;相互学习 对每个人而言&#xff0c;真正的…

【Python】WebUI自动化—Selenium的下载和安装、基本用法、项目实战(16)

文章目录 一.介绍二.下载安装selenium三.安装浏览器驱动四.QuickStart—自动访问百度五.Selenium基本用法1.定位节点1.1.单个元素定位1.2.多个元素定位 2.控制浏览器2.1.设置浏览器窗口大小、位置2.2.浏览器前进、刷新、后退、关闭3.3.等待3.4.Frame3.5.多窗口3.6.元素定位不到…

为Yolov7环境安装Cuba匹配的Pytorch

1. 查看Cuba版本 方法一 nvidia-smi 找到CUDA Version 方法二 Nvidia Control Panel > 系统信息 > 组件 > 2. 安装Cuba匹配版本的PyTorch https://pytorch.org/get-started/locally/这里使用conda安装 conda install pytorch torchvision torchaudio pytorch-cu…

Oracle笔记-对ROWNUM的一次理解(简单分页)

此博文记录时间&#xff1a;2023-05-05&#xff0c;发到互联网上是2023-10-09 这个在分页里面用得比较多&#xff0c;在MySQL中&#xff0c;通常使用limit去操作&#xff0c;而去感觉比较简单&#xff0c;Oracle中无此关键字。 通过查阅资料后&#xff0c;要实现分页需要用到…

Lab 1: Unix utilities汇总

这个实验主要学习了常用的一些系统调用。 Lab 1: Unix utilities Boot xv6 (easy) git克隆&#xff0c;切换分支&#xff0c;qemu。根据要求进行操作即可。 $ git clone git://g.csail.mit.edu/xv6-labs-2020 $ cd xv6-labs-2020 $ git checkout util $ make qemusleep (ea…

idea将jar包deploy到本地仓库

1、pom.xml文件引入配置&#xff0c;如下参考&#xff1a; <distributionManagement><snapshotRepository><id>maven-snapshots</id><url>http://nexus1.coralglobal.cn/repository/maven-snapshots/</url></snapshotRepository><…

唐老师讲电赛

dc-dc电源布局要点

【C++】:日期类实现

朋友们、伙计们&#xff0c;我们又见面了&#xff0c;本期来给大家解读一下有关Linux的基础知识点&#xff0c;如果看完之后对你有一定的启发&#xff0c;那么请留下你的三连&#xff0c;祝大家心想事成&#xff01; C 语 言 专 栏&#xff1a;C语言&#xff1a;从入门到精通 数…