WebSocket是什么
WebSocket 是一种用于在客户端和服务器之间建立双向通信的协议,它能实现实时、持久的连接。与传统的 HTTP 请求响应模式不同,WebSocket 在建立连接后允许客户端和服务器之间相互发送消息,直到连接关闭。由于 WebSocket 具有低延迟、双向通信和高效的特点,因此适用于多种实时应用场景。
源码在下面
相关依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
WebSocket 必需的配置(使用 Spring Boot 内嵌容器时,需要注册 ServerEndpointExporter,@ServerEndpoint 注解的端点才会生效)
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** WebSocket 配置*/
/**
 * Spring configuration that enables annotation-declared WebSocket endpoints.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Exposes a {@link ServerEndpointExporter} bean.
     *
     * <p>The exporter automatically registers the WebSocket endpoints that are
     * declared with the {@code @ServerEndpoint} annotation.
     *
     * @return a freshly created exporter
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
一、入门例子
代码demo
import jakarta.websocket.*;
import org.springframework.stereotype.Component;import java.io.IOException;/*** @Description:* @Author: sh* @Date: 2024/12/15 00:06*/
@Component
@ServerEndpoint("/websocket/WEBSOCKET_MSG_TOPIC")
public class WebSocketServer {@OnOpenpublic void onOpen(Session session) {System.out.println("客户端已连接: " + session.getId());}@OnMessagepublic void onMessage(Session session, String message) {System.out.println("收到消息: " + message);try {session.getBasicRemote().sendText("消息已收到: " + message);} catch (IOException e) {e.printStackTrace();}}@OnClosepublic void onClose(Session session) {System.out.println("客户端已关闭: " + session.getId());}@OnErrorpublic void onError(Session session, Throwable throwable) {System.out.println("发生错误: " + throwable.getMessage());}
}
测试demo
import com.alipay.api.java_websocket.client.WebSocketClient;
import com.alipay.api.java_websocket.handshake.ServerHandshake;import java.net.URI;
import java.net.URISyntaxException;/*** @Description:* @Author: sh* @Date: 2024/12/15 00:04*/
public class WebSocketClientExample {public static void main(String[] args) throws URISyntaxException {WebSocketClient client = new WebSocketClient(new URI("ws://localhost:8080/websocket/WEBSOCKET_MSG_TOPIC")) {@Overridepublic void onOpen(ServerHandshake handshakedata) {System.out.println("连接已打开");send("Hello, Server!"); // 发送消息到 WebSocket 服务器}@Overridepublic void onMessage(String message) {System.out.println("收到消息: " + message);}@Overridepublic void onClose(int code, String reason, boolean remote) {System.out.println("连接关闭: " + reason);}@Overridepublic void onError(Exception ex) {System.out.println("发生错误: " + ex.getMessage());}};client.connect();}
}
二、进阶与redis结合
进阶demo
本示例演示两种方式:1. 直接从 Redis 中获取数据;2. 通过订阅从 Redis 中获取数据
import com.macro.mall.websocket.listener.WebSocketSubscribeListener;
import jakarta.annotation.Resource;
import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;/*** @Description:* @Author: sh* @Date: 2024/12/13 15:38*/
@Component
//@ServerEndpoint(value="/websocket/WEBSOCKET_MSG_TOPIC", decoders = MyMessageDecoder.class)
@ServerEndpoint(value="/websocket/WEBSOCKET_MSG_TOPIC")
public class JavaDemo {/*** 日志对象*/private static Logger logger = LoggerFactory.getLogger(JavaDemo.class);/*** redis消息监听者容器,此处不好直接注入*/private static RedisMessageListenerContainer redisMessageListenerContainer;private static RedisTemplate redisTemplate;@Resourcepublic void setRedisMessageListenerContainer(RedisMessageListenerContainer redisMessageListenerContainer) {JavaDemo.redisMessageListenerContainer = redisMessageListenerContainer;}@Resourcepublic void setRedisTemplate(RedisTemplate redisTemplate) {JavaDemo.redisTemplate = redisTemplate;}/*** concurrent包的线程安全Set,用来存放每个客户端对应的webSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识*/private static CopyOnWriteArraySet<JavaDemo> webSocketSet = new CopyOnWriteArraySet<>();/*** websocket订阅监听器*/private WebSocketSubscribeListener subscribeListener;@OnOpenpublic void onOpen(Session session, EndpointConfig config) {webSocketSet.add(this);subscribeListener = new WebSocketSubscribeListener();subscribeListener.setSession(session);// 设置订阅topicredisMessageListenerContainer.addMessageListener(subscribeListener, new ChannelTopic("WEBSOCKET_MSG_TOPIC"));}@OnMessagepublic void onMessage(String message, Session session) {logger.debug("get msg from websocket client: {}", message);//1获取redis数据String result = (String) redisTemplate.opsForValue().get(message);//2.订阅获取redis数据
// Object result = null;
// // 处理消息并准备发送给前端
// if ("WEBSOCKET_MSG_TOPIC".equals(new String(message.getChannel()))) {
// String responseMessage = "服务器收到的消息: " + new String(message.getBody());
//
// result = redisTemplate.opsForValue().get(new String(message.getBody()));
// }// 使用 Session 发送消息回客户端try {session.getBasicRemote().sendText(result.toString());} catch (IOException e) {logger.error("发送消息失败: {}", e.getMessage());}}@OnClosepublic void onClose(Session session) {// 移除session对象webSocketSet.remove(this);// 移除订阅对象redisMessageListenerContainer.removeMessageListener(subscribeListener);}@OnErrorpublic void onError(Session session, Throwable error) {}
}
redis配置
/**
 * Redis wiring for the WebSocket demo: connection factory, pub/sub topic,
 * subscription listener and the listener container that ties them together.
 */
@Configuration
public class RedisConfig extends BaseRedisConfig {

    /** Name of the pub/sub channel; change it to the channel you actually need. */
    private static final String TOPIC_NAME = "WEBSOCKET_MSG_TOPIC";

    /**
     * @return a Lettuce connection factory created with its no-argument constructor
     */
    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        return new LettuceConnectionFactory();
    }

    /**
     * Builds the Redis message listener container and subscribes the injected
     * listener to the injected topic.
     *
     * <p>NOTE(review): the listener bean injected here is created without a
     * WebSocket session in the visible code — confirm it is meant to be
     * registered on the topic.
     *
     * @param redisConnectionFactory connection factory the container listens through
     * @param subscribeListener      the injected message listener
     * @param channelTopic           topic the listener is subscribed to
     * @return the configured container
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory redisConnectionFactory,
            MessageListener subscribeListener,
            ChannelTopic channelTopic) {
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(redisConnectionFactory);
        listenerContainer.addMessageListener(subscribeListener, channelTopic);
        return listenerContainer;
    }

    /**
     * @return the topic used for WebSocket messages
     */
    @Bean
    public ChannelTopic channelTopic() {
        return new ChannelTopic(TOPIC_NAME);
    }

    /**
     * @return the message listener bean, a {@link WebSocketSubscribeListener}
     */
    @Bean
    public MessageListener subscribeListener() {
        return new WebSocketSubscribeListener();
    }
}
Redis 的订阅(subscribe)监听器:监听 Redis 频道上发布的消息,并通过 WebSocket 推送给对应的客户端
/*** @Description:subscribe监听器* @Author: sh* @Date: 2024/12/13 16:00*/
public class WebSocketSubscribeListener implements MessageListener {/*** 日志对象*/private Logger logger = LoggerFactory.getLogger(WebSocketSubscribeListener.class);/*** websocket连接对象* -- GETTER --* 获取websocket连接对象** @return websocket连接对象*/@Getterprivate Session session;/*** 设置websocket连接对象** @param session websocket连接对象*/public void setSession(Session session) {this.session = session;}@Overridepublic void onMessage(Message message, byte[] bytes) {// 获取消息String msg = new String(message.getBody());try {session.getBasicRemote().sendText(msg);} catch (IOException e) {throw new RuntimeException(e);}}
}
Html页面测试demo
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket Test</title>
</head>
<body>
<h1>WebSocket 测试</h1>
<div id="status">未连接</div>
<textarea id="messages" rows="10" cols="30" readonly></textarea><br>
<input type="text" id="messageInput" placeholder="输入消息" />
<button onclick="sendMessage()">发送</button>
<button onclick="closeConnection()">关闭连接</button>

<script>
    // Current WebSocket connection; assigned by connect().
    let websocket;

    // Small DOM helpers shared by the handlers below.
    const byId = (id) => document.getElementById(id);

    function showStatus(text) {
        byId("status").textContent = text;
    }

    function appendToLog(line) {
        byId("messages").value += line;
    }

    // Opens the connection to the backend WebSocket service and wires the handlers.
    function connect() {
        websocket = new WebSocket('ws://127.0.0.1:8080/websocket/WEBSOCKET_MSG_TOPIC');

        // Connection established.
        websocket.onopen = function () {
            showStatus("连接已建立");
        };

        // Incoming message: try JSON of the form { "user": ..., "content": ... } first,
        // and fall back to showing the raw text when that fails.
        websocket.onmessage = function (event) {
            const raw = event.data;
            let line;
            try {
                const payload = JSON.parse(raw);
                line = `来自 ${payload.user}: ${payload.content}\n`;
            } catch (e) {
                line = '收到: ' + raw + '\n';
            }
            appendToLog(line);
        };

        // Connection closed.
        websocket.onclose = function () {
            showStatus("连接已关闭");
        };

        // Connection error.
        websocket.onerror = function (error) {
            console.error("WebSocket 错误:", error);
            showStatus("连接错误");
        };
    }

    // Sends the content of the input box to the server.
    function sendMessage() {
        const input = byId('messageInput');
        const text = input.value;

        if (!websocket || websocket.readyState !== WebSocket.OPEN) {
            alert("WebSocket 连接未打开");
            return;
        }

        websocket.send(text);
        input.value = '';   // clear the input box
        input.focus();      // and put the cursor back into it
    }

    // Closes the WebSocket connection if one was created.
    function closeConnection() {
        if (websocket) {
            websocket.close();
        }
    }

    // Connect automatically once the page has loaded.
    window.onload = connect;
</script>
</body>
</html>