在springboot中实现WebSocket协议通信

前面介绍了使用netty实现websocket通信,有些时候,如果我们的服务并不复杂或者连接数并不高,单独搭建一个websocket服务端有些浪费资源,这时候我们就可以在web服务内提供简单的websocket连接支持。其实springboot已经支持了websocket通信协议,只需要几步简单的配置就可以实现。
老规矩,首先需要引入相关的依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.12</version><scope>provided</scope>
</dependency>
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version>
</dependency>

springboot的配置文件application.yaml不需要额外内容,简单指定一下端口号和服务名称就可以了:

server:port: 8081shutdown: gracefulspring:application:name: test-ws

由于我这里使用了日志,简单配置一下日志文件logback-spring.xml输出内容:

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false"><contextName>api-logger-server</contextName><!-- 控制台 --><appender name="console" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}|%thread|[%-5level]|%logger{36}.%method|%msg%n</pattern><charset>UTF-8</charset></encoder></appender><!--业务日志 文件--><appender name="msg" class="ch.qos.logback.core.rolling.RollingFileAppender"><file>${user.dir}/logs/msg.log</file><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}|%thread|[%-5level]|%logger{36}.%method|%msg%n</pattern><charset>UTF-8</charset></encoder><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><FileNamePattern>${user.dir}/logs/msg.log.%d{yyyy-MM-dd}</FileNamePattern></rollingPolicy></appender><logger name="msg" level="ERROR" additivity="false"><appender-ref ref="msg"/></logger><!--收集除error级别以外的日志--><appender name="INFO" class="ch.qos.logback.core.rolling.RollingFileAppender"><filter class="ch.qos.logback.classic.filter.LevelFilter"><level>ERROR</level><onMatch>DENY</onMatch><onMismatch>ACCEPT</onMismatch></filter><encoder><pattern>%d|%t|%-5p|%c|%m%n</pattern><charset>UTF-8</charset></encoder><rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"><!--路径--><fileNamePattern>${user.dir}/logs/info/%d.%i.log</fileNamePattern><maxFileSize>100MB</maxFileSize><!--日志文件保留天数--><maxHistory>15</maxHistory><!--超过该大小,删除旧文件--><totalSizeCap>10GB</totalSizeCap></rollingPolicy></appender><appender name="ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender"><filter class="ch.qos.logback.classic.filter.ThresholdFilter"><level>ERROR</level></filter><encoder><pattern>%d|%t|%-5p|%c|%m%n</pattern><charset>UTF-8</charset></encoder><!--滚动策略--><rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"><!--路径--><fileNamePattern>${user.dir}/logs/error/%d.%i.log</fileNamePattern><maxFileSize>100MB</maxFileSize><!--日志文件保留天数--><maxHistory>15</maxHistory><!--超过该大小,删除旧文件--><totalSizeCap>10GB</totalSizeCap></rollingPolicy></appender><root level="INFO"><appender-ref ref="console"/><appender-ref ref="INFO"/><appender-ref ref="ERROR"/></root>
</configuration>

本项目只是简单演示在springboot中使用websocket功能,所以没有涉及到复杂的业务逻辑,但还是需要定义一个用户服务类,用来存储用户身份信息和登录时的身份校验。

import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.util.concurrent.ConcurrentHashMap;/*** 用户服务类** @Author xingo* @Date 2023/11/22*/
@Slf4j
@Service
public class UserService {static final ConcurrentHashMap<String, User> USER_MAP = new ConcurrentHashMap<>();static final ConcurrentHashMap<String, String> TOKEN_MAP = new ConcurrentHashMap<>();/*** 启动时存入信息*/@PostConstructpublic void run() {User user1 = User.builder().userName("zhangsan").nickName("张三").build();User user2 = User.builder().userName("lisi").nickName("李四").build();// 用户信息集合USER_MAP.put(user1.getUserName(), user1);USER_MAP.put(user2.getUserName(), user2);// 模拟用户登录成功,将身份认证的token放入集合String random1 = "token_" + RandomStringUtils.random(18, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890");String random2 = "token_" + RandomStringUtils.random(18, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890");log.info("用户身份信息|{}|{}", user1.getUserName(), random1);log.info("用户身份信息|{}|{}", user2.getUserName(), random2);TOKEN_MAP.put(random1, user1.getUserName());TOKEN_MAP.put(random2, user2.getUserName());}/*** 根据用户名获取用户信息*/public User getUserByUserName(String userName) {return USER_MAP.get(userName);}/*** 校验token和用户是否匹配*/public boolean checkToken(String token, String userName) {return userName.equals(TOKEN_MAP.get(token));}/*** 用户信息实体类*/@Data@Builderpublic static final class User {private String userName;private String nickName;}
}

接下来就是websocket相关注入到容器中,首先需要注入的是ServerEndpointExporter,这个类用来扫描ServerEndpoint相关内容:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** 注入ServerEndpointExporter,用来扫描ServerEndpoint相关注解** @author xingo* @Date 2023/11/22*/
@Configuration
public class WebsocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}

接下来需要再注入一个Bean,这个Bean需要添加ServerEndpoint注解,主要用来处理websocket连接。注意这个Bean是多例的,每个websocket连接都会新建一个实例。

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.example.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;/*** websocket服务类* 连接ws服务这里要两个参数:userName 和 token* userName 用于用户身份标识* token    用于用户身份认证,用户每次登录进入系统都有可能不同** @author xingo* @Date 2023/11/22*/
@Slf4j
@Component
@ServerEndpoint("/{userName}/{token}")
public class WebSocketEndpoint {/*** 存放所有在线的客户端:键为用户名,值为用户的所有连接*/public static final Map<String, List<Session>> USER_SESSIONS = new ConcurrentHashMap<>();/*** 存放连接最近一次写数据的时间戳*/public static final Map<Session, Long> LAST_REQUEST_TIME = new ConcurrentHashMap<>();// ServerEndpoint 是多例的,需要设置为静态的类成员,否则程序运行会出错private static UserService userService;// 只能通过属性的set方法注入@Autowiredpublic void setUserService(UserService userService) {WebSocketEndpoint.userService = userService;}/*** 客户端连接* @param session*/@OnOpenpublic void onOpen(Session session, EndpointConfig config, @PathParam("userName") String userName, @PathParam("token") String token) {System.out.println("客户端连接|" + userName + "|" + token + "|" + session);System.out.println(this);System.out.println(userService);LAST_REQUEST_TIME.put(session, System.currentTimeMillis());if(StringUtils.isNotBlank(userName) && StringUtils.isNotBlank(token)) {boolean flag = false;boolean check = userService.checkToken(token, userName);if(check) {UserService.User user = userService.getUserByUserName(userName);if(user != null) {if(!USER_SESSIONS.containsKey(userName)) {USER_SESSIONS.put(userName, new ArrayList<>());}USER_SESSIONS.get(userName).add(session);flag = true;}}if(flag) {session.getAsyncRemote().sendText("连接服务端成功");} else {session.getAsyncRemote().sendText("用户信息认证失败,连接服务端失败");}} else {session.getAsyncRemote().sendText("未获取到用户身份验证信息");}}/*** 客户端关闭* @param session session*/@OnClosepublic void onClose(Session session, CloseReason closeReason, @PathParam("userName") String userName, @PathParam("token") String token) {System.out.println("客户端断开|" + userName + "|" + token + "|" + session);if(StringUtils.isNotBlank(userName)) {USER_SESSIONS.get(userName).remove(session);LAST_REQUEST_TIME.remove(session);}LAST_REQUEST_TIME.remove(session);}/*** 发生错误* @param throwable e*/@OnErrorpublic void onError(Session session, Throwable throwable) {throwable.printStackTrace();}/*** 收到客户端发来消息* @param message  消息对象*/@OnMessagepublic void onMessage(Session session, String message, @PathParam("userName") String userName, @PathParam("token") String token) {log.info("接收到客户端消息|{}|{}|{}|{}", userName, token, session.getId(), message);LAST_REQUEST_TIME.put(session, System.currentTimeMillis());String resp = null;try {if("PING".equals(message)) {resp = "PONG";} else if("PONG".equals(message)) {log.info("客户端响应心跳|{}", session.getId());} else {resp = "服务端收到信息 : " + message;}} catch (Exception e) {e.printStackTrace();}if(resp != null) {sendMessage(userName, resp);}}/*** 发送消息* @param userName      用户名* @param data          数据体*/public static void sendMessage(String userName, String data) {List<Session> sessions = USER_SESSIONS.get(userName);if(sessions != null && !sessions.isEmpty()) {sessions.forEach(session -> session.getAsyncRemote().sendText(data));} else {log.error("客户端未连接|{}", userName);}}/*** 初始化方法执行标识*/public static final AtomicBoolean INIT_RUN = new AtomicBoolean(false);/*** 处理长时间没有与服务器进行通信的连接*/@PostConstructpublic void run() {if(INIT_RUN.compareAndSet(false, true)) {log.info("检查连接定时任务启动");ScheduledExecutorService service = Executors.newScheduledThreadPool(1);service.scheduleAtFixedRate(() -> {// 超时关闭时间:超过5分钟未更新时间long closeTimeout = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES);// 心跳包时间:超过2分钟未更新时间long heartbeatTimeout = System.currentTimeMillis() - TimeUnit.MICROSECONDS.convert(2, TimeUnit.MINUTES);Iterator<Map.Entry<Session, Long>> iterator = LAST_REQUEST_TIME.entrySet().iterator();while (iterator.hasNext()) {Map.Entry<Session, Long> next = iterator.next();Session session = next.getKey();long lastTimestamp = next.getValue();if(lastTimestamp < closeTimeout) {    // 超时链接关闭log.error("关闭超时连接|{}", session.getId());try {session.close();iterator.remove();USER_SESSIONS.entrySet().forEach(entry -> entry.getValue().remove(session));} catch (IOException e) {e.printStackTrace();}} else if(lastTimestamp < heartbeatTimeout) {   // 发送心跳包log.info("发送心跳包|{}", session.getId());session.getAsyncRemote().sendText("PING");}}}, 5, 10, TimeUnit.SECONDS);}}
}

对于上面的Bean需要几点说明:

  1. 该Bean是多例的,每个websocket连接都会创建一个实例。在上面连接建立的方法里面输出当前实例对象的内容每个连接输出的内容都不同:
客户端连接|zhangsan|token_JTrFGlBW01gHxFZHFG|org.apache.tomcat.websocket.WsSession@7ef1b79f
org.example.websocket.WebSocketEndpoint@33141901
org.example.service.UserService@46db8a12
客户端断开|zhangsan|token_JTrFGlBW01gHxFZHFG|org.apache.tomcat.websocket.WsSession@7ef1b79f
客户端连接|zhangsan|token_JTrFGlBW01gHxFZHFG|org.apache.tomcat.websocket.WsSession@7116a4f3
org.example.websocket.WebSocketEndpoint@341424b5
org.example.service.UserService@46db8a12
客户端断开|zhangsan|token_JTrFGlBW01gHxFZHFG|org.apache.tomcat.websocket.WsSession@7116a4f3
客户端连接|zhangsan|token_JTrFGlBW01gHxFZHFG|org.apache.tomcat.websocket.WsSession@737a3e9b
org.example.websocket.WebSocketEndpoint@3678be90
org.example.service.UserService@46db8a12
  1. 在该类中注入其他的Bean要设置为静态属性,并且注入要通过set方法,否则注入失败,之前在项目中使用时就出现过这种问题,将属性定义为成员变量并且直接在属性上面添加@Autowired注解,导致该属性一直是null。
    比如我的UserService服务就是通过这种方式注入的:
private static UserService userService;@Autowired
public void setUserService(UserService userService) {WebSocketEndpoint.userService = userService;
}

上面几个类定义好后就实现了在springboot中使用websocket,添加启动类就可以进行前后通信:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** 应用启动类* * @Author xingo* @Date 2023/11/22*/
@SpringBootApplication
public class WsApplication {public static void main(String[] args) {SpringApplication.run(WsApplication.class, args);}
}

为了方便测试,再添加一个controller用于接收消息并将消息转发到客户端:

import org.example.websocket.WebSocketEndpoint;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;/*** @Author xingo* @Date 2023/11/22*/
@RestController
public class MessageController {/*** 发送信息*/@GetMapping("/sendmessage")public String sendMessage(String userName, String message) {WebSocketEndpoint.sendMessage(userName, message);return "ok";}
}

测试服务是否正常。我这里选择使用postman进行测试,通过postman建立连接并发送消息。
postman测试连接和发送数据
连接建立成功,并且正常的发送和接收到数据。
下面测试一下通过http发送数据到服务端,服务端根据用户名查找到对应连接将消息转发到客户端。
这里是http发送的请求
这里是websocket客户端接收到的数据
这种模拟了服务端主动推送数据给客户端场景,实现了双向通信。

以上就是使用springboot搭建websocket的全部内容,发现还是非常简单,最主要的是可以与现有的项目实行完全融合,不需要做太多的改变。

上面这种方式只是单体服务简单的实现,对于稍微有一点规模的应用都会采用集群化部署,用一个nginx做反向代理后端搭配几个应用服务器组成集群模式,对于集群服务就会涉及到服务间通信的问题,需要将消息转发到用户正在连接的服务上面发送给客户端。后面会讲一下如何通过redis作为中心服务实现服务发现和请求转发的功能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/202332.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python -opencv形态学操作

python -opencv形态学操作 1.服饰和膨胀 2.开运算和闭运算 3.礼帽运算和黑帽运算 1.服饰和膨胀 opencv 腐蚀通过cv2.erode实现&#xff0c;膨胀通过cv2.dilate实现&#xff0c;看一下下面代码&#xff1a; from ctypes.wintypes import SIZE from multiprocessing.pool i…

技术细分|推荐系统——推荐系统中的数据去偏方法

本篇的主要脉络同样依据中科大何向南教授、合工大汪萌教授联合在 TKDE 上的一篇综述文章展开&#xff1a;Bias and Debias in Recommender System: A Survey and Future Directions。 下面按照前导文章中介绍的数据偏差 Selection Bias、Conformity Bias、Exposure Bias、Posit…

跨境电商包装的可持续性:EPR的视角

跨境电商的崛起已经改变了我们购物的方式&#xff0c;使我们能够轻松购买来自世界各地的产品。然而&#xff0c;这种便捷也伴随着一个不容忽视的问题&#xff1a;包装和废物管理。 跨境电商平台通常需要在全球范围内运送产品&#xff0c;这意味着大量的包装材料和废弃物。在这…

【计算机网络学习之路】TCP socket编程

文章目录 前言一. 服务器1. 初始化服务器2. 启动服务器 二. 客户端三. 多进程服务器结束语 前言 本系列文章是计算机网络学习的笔记&#xff0c;欢迎大佬们阅读&#xff0c;纠错&#xff0c;分享相关知识。希望可以与你共同进步。 本篇博客基于UDP socket基础&#xff0c;介绍…

企业建数仓的第一步是选择一个好用的ETL工具

当企业决定建立数据仓库&#xff08;Data Warehouse&#xff09;&#xff0c;第一步就是选择一款优秀的ETL&#xff08;Extract, Transform, Load&#xff09;工具。数据仓库是企业数据管理的核心&#xff0c;它存储、整合并管理各种数据&#xff0c;为商业决策和数据分析提供支…

模电知识点总结(二)二极管

系列文章目录 文章目录 系列文章目录二极管二极管电路分析方法理想模型恒压降模型折线模型小信号模型高频/开关 二极管应用整流限幅/钳位开关齐纳二极管变容二极管肖特基二极管光电器件光电二极管发光二极管激光二极管太阳能电池 二极管 硅二极管&#xff1a;死区电压&#xf…

从零开始的c语言日记day36——指针进阶

一、什么是指针: 指针的概念:1.指针就是个变量&#xff0c;用来存放地址&#xff0c;地址唯一标识一块内存空间。 ⒉指针的大小是固定的4/8个字节(32位平台/64位平台)。 指针是有类型&#xff0c;指针的类型决定了指针的-整数的步长&#xff0c;指针解引用操作的时候的权限。…

RTS 客户端-服务器网络

Stone Monarch 从一开始就支持多人游戏&#xff0c;但随着时间的推移&#xff0c;网络模型经历了多次迭代。我最初基于这篇著名的帝国时代文章实现了点对点锁步模型。 点对点锁定步骤有一些众所周知的问题。点对点方面使玩家很难相互连接&#xff0c;并增加了每个新玩家的网络…

spring boot 热部署

相信小伙伴们在日常的开发中&#xff0c;调试代码时&#xff0c;免不了经常修改代码&#xff0c;这个时候&#xff0c;为了验证效果&#xff0c;必须要重启 Spring Boot 应用。 频繁地重启应用&#xff0c;导致开发效率降低&#xff0c;加班随之而来。有没有什么办法&#xff0…

UEC++ day8

伤害系统 给敌人创建血条 首先添加一个UI界面用来显示敌人血条设置背景图像为黑色半透明 填充颜色 给敌人类添加两种状态表示血量与最大血量&#xff0c;添加一个UWidegtComponet组件与UProgressBar组件 UPROPERTY(EditAnywhere, BlueprintReadWrite, Category "Enemy …

浏览器没收到返回,后端也没报错,php的json_encode问题bug

今天网站遇到个问题&#xff0c;后端返回异常&#xff0c;但是浏览器状态码200&#xff0c;但是看不到结果。经过排查发现&#xff0c;我们在返回结果的时候使用了json_encode返回给前端&#xff0c;结果里面的字符编码异常&#xff0c;导致json_encode异常&#xff0c;但是php…

禁止linux shell 终端显示完整工作路径,如何让linux bash终端不显示当前工作路径

在操作linux时&#xff0c;默认安装的linux终端会显示当前完整的工作目录&#xff0c;如果目录比较短还是可以接收&#xff0c;如果目录比较长&#xff0c;就显得比较别扭&#xff0c;操作起来不方便&#xff0c;因此需要关闭这种功能。 要关闭这个功能&#xff0c;请按如下步骤…

生命周期评估(LCA)与SimaPro碳足迹分析

SimaPro提供最新的科学方法和数据库以及丰富的数据&#xff0c;使您可以收集和评估产品和流程的环境绩效。通过这种方式&#xff0c;您可以将改变公司产品生命周期的想法提交给您的同事&#xff0c;以便阐明您的业务未来。 SimaPro软件的特点和功能&#xff1a; 完全控制产品生…

供应链和物流的自动化新时代

今天&#xff0c;当大多数人想到物流自动化时&#xff0c;他们会想到设备。机器人、无人机和自主卡车运输在大家的谈话中占主导地位。全自动化仓库的视频在网上流传&#xff0c;新闻主播们为就业问题绞尽脑汁。这种炒作是不完整的&#xff0c;它错过了供应链和物流公司的机会。…

基于安卓android微信小程序的刷题系统

项目介绍 面试刷题系统的开发过程中&#xff0c;采用B / S架构&#xff0c;主要使用jsp技术进行开发&#xff0c;中间件服务器是Tomcat服务器&#xff0c;使用Mysql数据库和Eclipse开发环境。该面试刷题系统包括会员、答题录入员和管理员。其主要功能包括管理员&#xff1a;个…

selenium 简单案例 <批量下载文件> <网页自动化点击上报>

一、批量下载文件 网页分析 点击跳转到下载页面 from selenium import webdriver import timedef get_link_list():# 创建浏览器对象driver webdriver.Chrome(executable_pathrC:\Users\nlp_1\Desktop\chromedriver\chromedriver-win32\chromedriver.exe)url https://www…

几款Java源码扫描工具(FindBugs、PMD、SonarQube、Fortify、WebInspect)

说明 有几个常用的Java源码扫描工具可以帮助您进行源代码分析和检查。以下是其中一些工具&#xff1a; FindBugs&#xff1a;FindBugs是一个静态分析工具&#xff0c;用于查找Java代码中的潜在缺陷和错误。它可以检测出空指针引用、资源未关闭、不良的代码实践等问题。FindBu…

从Github登录的双因子验证到基于时间戳的一次性密码:2FA、OTP与TOTP

Github于2023-03-09推出一项提高软件安全标准的措施&#xff0c;所有在Github上贡献过代码的开发人员在年底前必须完成 2FA&#xff08;Two-factory authentication&#xff0c;双因子认证&#xff09;。初听此事之时&#xff0c;不以为意&#xff0c;因为自己之前就知道双因子…

再探MDG cloud-ready模式!看未来MDG的发展路线

紧跟上一篇博客&#xff0c;我们将更加深入探讨一些MDG Cloud-Ready模式的相关内容。 背景 在2021年9月&#xff0c;Harald Kuck&#xff0c;SAP ABAP Platform老大&#xff0c;介绍了未来ABAP的发展路线&#xff0c;并最终在一年后正式推出了ABAP Cloud。他在会上是这么说的…

担忧CentOS停服?KeyarchOS系统来支撑

担忧CentOS停服&#xff1f;KeyarchOS系统来支撑 近年发生的“微软黑屏门”、“微软操作系统停更”等安全事件&#xff0c;敲响了我国 IT 产业的警钟&#xff0c;建立由我国主导的 IT 产业生态尤为迫切。对此&#xff0c;我国信息技术应用创新行业乘势而起&#xff0c;旨在通过…