实现功能
服务端维护已注册客户端的列表;服务端向所有客户端广播消息;服务端向指定客户端发送消息;服务端向多个客户端发送消息;客户端向服务端发送消息。
效果:
环境
jdk:1.8
SpringBoot:2.4.17
服务端
1.引入依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.在启动类上加上开启WebSocket的注解
@EnableWebSocket
3.配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** date created : Created in 2024/3/18 16:57* description : WebSocketConfig 主要解决使用了@ServerEndpoint注解的websocket endpoint不被springboot扫描到的问题* class name : WebSocketConfig*/
@Configuration
public class WebSocketConfig {

    /**
     * Exposes a {@link ServerEndpointExporter} bean.
     *
     * <p>Per the original author's note, this bean registers the WebSocket
     * endpoints that are declared with {@code @ServerEndpoint}.
     *
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
4.服务端实现
/*** date created : Created in 2024/3/18 16:31* description : 服务端实现,方法的封装* class name : WebSocketServer*/
@Component
@Slf4j
@ServerEndpoint("/websocket/{applicationName}")
public class WebSocketServer {//与某个客户端的连接会话,需要通过它来给客户端发送数据private Session session;// 应用名称private String applicationName;//虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。private static final CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();// 用来存在线连接用户信息private static final ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();/*** 链接成功调用的方法*/@OnOpenpublic void onOpen(Session session, @PathParam(value = "applicationName") String applicationName) {try {this.session = session;this.applicationName = applicationName;webSockets.add(this);sessionPool.put(applicationName, session);log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());log.info("【当前客户端列表】:"+ sessionPool.keySet());} catch (Exception e) {}}/*** description : 有连接断开之后的处理方法* method name : onClose* param : []* return : void*/@OnClosepublic void onClose() {try {webSockets.remove(this);sessionPool.remove(this.applicationName);log.info("【websocket消息】连接断开,总数为:" + webSockets.size());log.info("【当前客户端列表】:"+ sessionPool.keySet());} catch (Exception e) {}}/*** description : 收到客户端消息的处理方法* method name : onMessage* param : [message]* return : void*/@OnMessagepublic void onMessage(String message) {log.info("【websocket消息】收到客户端消息:" + message);}/*** description : 错误处理* method name : onError* param : [session, error]* return : void*/@OnErrorpublic void onError(Session session, Throwable error) {log.error("用户错误,原因:" + error.getMessage());error.printStackTrace();}/*** description : 广播消息 给所有注册的客户端发送消息* method name : sendBroadcastMessage* param : [message]* return : void*/public void sendBroadcastMessage(String message) {log.info("【websocket消息】广播消息:" + message);for (WebSocketServer webSocket : webSockets) {try {if (webSocket.session.isOpen()) {webSocket.session.getAsyncRemote().sendText(message);}} catch (Exception e) {e.printStackTrace();}}}/*** description : 给指定的客户端发送消息* method name : sendApplicationMessage* param : [applicationName 客户端的应用名称, message 
要发送的消息]* return : void*/public void sendApplicationMessage(String applicationName, String message) {Session session = sessionPool.get(applicationName);if (session != null && session.isOpen()) {try {log.info("【websocket消息】 单点消息:" + message);session.getAsyncRemote().sendText(message);} catch (Exception e) {e.printStackTrace();}}}/*** description : 给多个客户端发送消息* method name : sendMassApplicationMessage* param : [applicationNames 注册的客户端的应用名称, message 要发送的消息]* return : void*/public void sendMassApplicationMessage(String[] applicationNames, String message) {for (String userId : applicationNames) {Session session = sessionPool.get(userId);if (session != null && session.isOpen()) {try {log.info("【websocket消息】 单点消息:" + message);session.getAsyncRemote().sendText(message);} catch (Exception e) {e.printStackTrace();}}}}}
客户端
1.客户端配置
yaml文件的末尾添加
# websocket的配置
websocket:
  host: localhost
  port: 19022
  prefix: websocket
2.客户端配置类
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;/*** date created : Created in 2024/3/19 14:36* description : 注入配置文件中的参数 并生成服务端的对应的url* class name : WebSocketProperties*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Component
@ConfigurationProperties(prefix = "websocket")
@Configuration
public class WebSocketProperties {

    // Value of the spring.application.name property; last path segment of the URL.
    @Value("${spring.application.name}")
    String appName;

    // Host part of the server URL (bound from the "websocket" configuration prefix).
    String host;

    // Port part of the server URL.
    String port;

    // First path segment of the server URL.
    String prefix;

    /**
     * Builds the server URL in the form {@code ws://host:port/prefix/appName}.
     *
     * @return the WebSocket URL assembled from the current field values
     */
    public String getUrl() {
        return "ws://" + host + ":" + port + "/" + prefix + "/" + appName;
    }
}
3.客户端实现
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.stereotype.Component;import javax.websocket.ClientEndpoint;
import javax.websocket.*;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;/*** date created : Created in 2024/3/18 16:36* description : 客户端接收服务端的实时消息、发送消息等方法的封装* class name : WebSocketClient*/@ClientEndpoint
@AutoConfigureBefore(WebSocketProperties.class)
@Component
@Import(WebSocketProperties.class)
@Configuration
// WebSocket client endpoint: connects to the server when constructed and wraps
// receiving server messages and sending "register"/"unregister" messages.
public class WebSocketClient {

    // Session of the current connection; stays null until onOpen has been called.
    private Session session;

    /**
     * Looks up {@link WebSocketProperties} through {@code SpringUtils} and connects this
     * endpoint to the URL it provides. Connection failures are printed, not rethrown.
     */
    public WebSocketClient() {
        try {
            WebSocketProperties webSocketProperties = SpringUtils.getBean(WebSocketProperties.class);
            WebSocketContainer container = ContainerProvider.getWebSocketContainer();
            container.connectToServer(this, new URI(webSocketProperties.getUrl()));
        } catch (DeploymentException | URISyntaxException | IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Called when the connection has been established; stores the session for later sends.
     *
     * @param session the opened session
     */
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        System.out.println("Connected to server");
    }

    /**
     * Called for every text message received from the server.
     *
     * <p>NOTE(review): a non-void {@code @OnMessage} return value is sent back to the
     * peer by the WebSocket runtime, so this method echoes every received message to
     * the server. Confirm that this is intended.
     *
     * @param message the received text
     * @return the received text, unchanged
     */
    @OnMessage
    public String onMessage(String message) {
        System.out.println("来自WebSocket的消息: " + message);
        return message;
    }

    /** Called when the connection has been closed. */
    @OnClose
    public void onClose() {
        System.out.println("Disconnected from server");
    }

    /** Sends the text {@code "register"} to the server if the session is open. */
    public void register() {
        if (sendText("register")) {
            System.out.println("Registered with server");
        }
    }

    /** Sends the text {@code "unregister"} to the server if the session is open. */
    public void unregister() {
        if (sendText("unregister")) {
            System.out.println("Unregistered from server");
        }
    }

    // Sends one text message; returns true only when it was handed to the session.
    private boolean sendText(String text) {
        Session current = session;
        // The session is null when the connect in the constructor failed or onOpen has
        // not run yet, and it may have been closed since. Report that instead of failing
        // with a NullPointerException.
        if (current == null || !current.isOpen()) {
            System.err.println("WebSocket session is not open, message not sent: " + text);
            return false;
        }
        try {
            current.getBasicRemote().sendText(text);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }
}
说明:WebSocketClient在构造方法中就要用到配置类,而@Autowired的字段注入发生在构造方法执行之后,因此这里无法通过@Autowired拿到配置类,改为使用工具类从Spring容器中获取。工具类如下:
/*
 * Copyright (c) 2020 pig4cloud Authors. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;@Component
public class SpringUtils implements ApplicationContextAware {private static ApplicationContext context;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {context = applicationContext;}public static Object getBean(String name) {return context.getBean(name);}public static <T> T getBean(Class<T> clazz) {return context.getBean(clazz);}public static <T> T getBean(String name, Class<T> clazz) {return context.getBean(name, clazz);}
}