webSocket对接参考
话不多说直接上代码
WebSocket
package com.student.config;import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;/*** @Description:* @Author: hwk* @Date: 2024-07-17 17:46* @Version: 1.0**/
@Slf4j
@Component
@ServerEndpoint("/websocket/{userId}")
public class WebSocket {/*** 与某个客户端的连接会话,需要通过它来给客户端发送数据*/private Session session;/*** gpt密钥*/private static final String key = "";/*** 请求地址*/private static final String url = "";/*** 用户ID*/private String userId;//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。//虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。// 注:底下WebSocket是当前类名private static CopyOnWriteArraySet<WebSocket> webSockets = new CopyOnWriteArraySet<>();// 用来存在线连接用户信息private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<String, Session>();/*** 链接成功调用的方法*/@OnOpenpublic void onOpen(Session session, @PathParam(value = "userId") String userId) {try {this.session = session;this.userId = userId;webSockets.add(this);sessionPool.put(userId, session);log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());} catch (Exception e) {}}/*** 链接关闭调用的方法*/@OnClosepublic void onClose() {try {webSockets.remove(this);sessionPool.remove(this.userId);log.info("【websocket消息】连接断开,总数为:" + webSockets.size());} catch (Exception e) {}}/*** 收到客户端消息后调用的方法** @param message*/@OnMessagepublic void onMessage(String message) {log.info("【websocket消息】收到客户端消息:" + message);JSONObject jsonObject = new JSONObject();JSONArray objects = new JSONArray();JSONObject messages = new JSONObject();messages.put("role", "user");messages.put("content", message);objects.add(messages);jsonObject.put("model", "gpt-3.5-turbo");jsonObject.put("messages", objects);jsonObject.put("max_tokens", 1024);jsonObject.put("temperature", 0);jsonObject.put("stream", true);Map<String, String> heads = new HashMap<>();heads.put("Content-Type", "application/json");heads.put("Accept", "application/json");heads.put("Authorization", "Bearer "+key);WebClient webClient = WebClient.create();Flux<String> stringFlux = webClient.post().uri(url).header("Content-Type", "application/json").header("Accept", "application/json").header("Authorization", "Bearer " + key).accept(MediaType.TEXT_EVENT_STREAM).bodyValue(jsonObject).retrieve().bodyToFlux(String.class);stringFlux.subscribe(s -> {if (!Objects.equals(s, "[DONE]")) {JSONObject parsed = JSONObject.parseObject(s);JSONArray choices = parsed.getJSONArray("choices");if (!choices.isEmpty()) {JSONObject dataJson = JSONObject.parseObject(choices.get(0).toString());String content = dataJson.getJSONObject("delta").getString("content");if (StringUtils.hasLength(content)) {try {content = content.replaceAll("\n", "<br>");content = content.replace(" ", "");log.info(content);if (sessionPool != null) {sessionPool.get(userId).getBasicRemote().sendText(content);}} catch (Exception e) {e.printStackTrace();}}}}});}/*** 发送错误时的处理** @param session* @param error*/@OnErrorpublic void onError(Session session, Throwable error) {log.error("用户错误,原因:" + error.getMessage());error.printStackTrace();}/*** 此为广播消息** @param message*/public void sendAllMessage(String message) {log.info("【websocket消息】广播消息:" + message);for (WebSocket webSocket : webSockets) {try {if (webSocket.session.isOpen()) {webSocket.session.getAsyncRemote().sendText(message);}} catch (Exception e) {e.printStackTrace();}}}/*** 此为单点消息** @param userId* @param message*/public void sendOneMessage(String userId, String message) {Session session = sessionPool.get(userId);if (session != null && session.isOpen()) {try {log.info("【websocket消息】 单点消息:" + message);session.getAsyncRemote().sendText(message);} catch (Exception e) {e.printStackTrace();}}}/*** 此为单点消息(多人)** @param userIds* @param message*/public void sendMoreMessage(String[] userIds, String message) {for (String userId : userIds) {Session session = sessionPool.get(userId);if (session != null && session.isOpen()) {try {log.info("【websocket消息】 单点消息:" + message);session.getAsyncRemote().sendText(message);} catch (Exception e) {e.printStackTrace();}}}}
}
WebSocketConfig
package com.student.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** @Description: WebSocketConfig配置* @Author: hwk* @Date: 2024-07-17 17:44* @Version: 1.0**/
@Configuration
public class WebSocketConfig {/*** 注入ServerEndpointExporter,* 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
html代码
<!DOCTYPE html>
<html>
<head><meta charset="utf-8" /><title>ChatGPT</title><script src="marked.min.js"></script><link rel="stylesheet" type="text/css" href="index.css"><style>.normal-text {color: black;}.rich-text {color: blue;font-weight: bold;}</style>
</head>
<body><h2>ChatGPT</h2><div id="message-container"><p id="message"></p></div><div id="footer"><input id="text" class="my-input" type="text" /><button onclick="send()">发送</button></div><div id="footer1"><br /><button onclick="closeWebSocket()">关闭WebSocket连接</button><button onclick="openWebSocket()">建立WebSocket连接</button></div><script>marked.setOptions({highlight: function (code, lang) {return hljs.highlightAuto(code).value;}});var websocket = null;// 判断当前浏览器是否支持WebSocket,是则创建WebSocketif ('WebSocket' in window) {console.log("浏览器支持WebSocket");websocket = new WebSocket("ws://127.0.0.1:8056/websocket/1");} else {alert('当前浏览器不支持WebSocket');}// 连接发生错误的回调方法websocket.onerror = function () {console.log("WebSocket连接发生错误");setMessageInnerHTML("WebSocket连接发生错误");};// 连接成功建立的回调方法websocket.onopen = function () {console.log("WebSocket连接成功");};// 接收到消息的回调方法websocket.onmessage = function (event) {if (event.data) {setMessageInnerHTML(event.data);}console.log(event.data);};// 连接关闭的回调方法websocket.onclose = function () {console.log("WebSocket连接关闭");};// 关闭WebSocket连接function closeWebSocket() {websocket.close();}// 发送消息function send() {var message = document.getElementById('text').value;websocket.send(message);}// 建立连接的方法function openWebSocket() {websocket = new WebSocket("ws://127.0.0.1:8056/websocket/1");websocket.onopen = function () {console.log("WebSocket连接成功");};}// 将消息显示在网页上function setMessageInnerHTML(innerHTML) {console.log(innerHTML);// var element = document.getElementById('message');// if (innerHTML.match(/```/g)) {// element.innerHTML += marked(innerHTML); // 使用marked渲染Markdown// } else {// element.innerHTML += innerHTML; // 直接添加普通文本消息// }document.getElementById('message').innerHTML += innerHTML;}// 如果websocket连接还没断开就关闭了窗口,后台server端会抛异常。// 所以增加监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接window.onbeforeunload = function () {closeWebSocket();};</script>
</body>
</html>
效果