java springboot 中使用webSocket接入openAI接口调用chatGPT3.5接口实现自由返回
在springboot中添加webSocketServer
@Component
@Anonymous
@ServerEndpoint(“/websocket/{id}”) // 访问路径: ws://localhost:8080/websocket
public class WebSocketServer {
protected static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);/*** 客户端ID*/
private String id="";/*** 与某个客户端的连接会话,需要通过它来给客户端发送数据*/
private Session session;/*** 记录当前在线连接数(为保证线程安全,须对使用此变量的方法加lock或synchronized)*/
private static int onlineCount = 0;/*** 用来存储当前在线的客户端(此map线程安全)*/
private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();private static ConcurrentHashMap<String, StringBuffer> stringBufferMap = new ConcurrentHashMap<>();/*** 连接建立成功后调用*/
@OnOpen
public void onOpen(@PathParam(value = "id") String id, Session session) {this.session = session;// 接收到发送消息的客户端编号this.id = id;// 加入map中StringBuffer stringBuffer = new StringBuffer();stringBufferMap.put(id,stringBuffer);webSocketMap.put(id, this);
// try {
// sendMessage(“WebSocket连接成功”);
// } catch (Exception e) {
//
// }
}
/**
* 发送消息
* @param message 要发送的消息
*/
private void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
// 关闭连接调用
@OnClose
public void close() {}// 接收消息
@OnMessage
public void message(String message, Session session) {System.out.println("client send: " + message);
}
/*** 推送信息给指定ID客户端,如客户端不在线,则返回不在线信息给自己** @param message 客户端发来的消息* @param sendClientId 客户端ID*/
public void sendToUser(String message, String sendClientId) throws IOException {if (webSocketMap.get(sendClientId) != null) {if (!id.equals(sendClientId)) {if("DONE".equals(message)){StringBuffer stringBuffers = new StringBuffer();stringBufferMap.put(sendClientId,stringBuffers);}else{StringBuffer stringBuffer = stringBufferMap.get(sendClientId);StringBuffer append = stringBuffer.append(message);stringBufferMap.put(sendClientId,append);}webSocketMap.get(sendClientId).sendMessage(message);} else {webSocketMap.get(sendClientId).sendMessage(message);}} else {// 如客户端不在线,则返回不在线信息给自己sendToUser("当前客户端不在线", id);}
}public String getString(String sendClientId){return stringBufferMap.get(sendClientId).toString();
}
}
添加用户询问缓存机制
public class LocalCache {
/**
* 缓存时长
/
public static final long TIMEOUT = 5 * DateUnit.MINUTE.getMillis();
/*
* 清理间隔
/
private static final long CLEAN_TIMEOUT = 5 * DateUnit.MINUTE.getMillis();
/*
* 缓存对象
*/
public static final TimedCache<String, Object> CACHE = CacheUtil.newTimedCache(TIMEOUT);
static {//启动定时任务CACHE.schedulePrune(CLEAN_TIMEOUT);
}
}
调用openai国内一位大佬写的jar 注意因为我项目使用了其他日志文件,所以我排除了jar包自带的slf4j
<dependency><groupId>com.unfbx</groupId><artifactId>chatgpt-java</artifactId><version>1.0.6</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId></exclusion></exclusions></dependency>
创建SSE接口转发类OpenAIEventSourceListener也可自定义其他长连接
public class OpenAIEventSourceListener extends EventSourceListener {
protected static final Logger log = LoggerFactory.getLogger(OpenAIEventSourceListener.class);
private String sockentId;private WebSocketServer webSocket;private final AppService appService;public OpenAIEventSourceListener(WebSocketServer webSocket, String sockentId) {this.sockentId = sockentId;this.webSocket = webSocket;
}/*** {@inheritDoc}*/
@Override
public void onOpen(EventSource eventSource, Response response) {log.info("OpenAI建立sse连接...");
}/*** {@inheritDoc}*/
@SneakyThrows
@Override
public void onEvent(EventSource eventSource, String id, String type, String data) {//log.info("OpenAI返回数据:{}", sockentId);try {if (data.equals("[DONE]")) {String string = webSocket.getString(sockentId);log.info("问题最终答案:"+string);//log.info("OpenAI返回数据结束了");webSocket.sendToUser("DONE",sockentId);webSocket.close();return;}ChatCompletionResponse completionResponse = JSON.parseObject(data, ChatCompletionResponse.class);if(ObjectUtil.isNotNull(completionResponse.getChoices().get(0).getDelta().getContent())){// log.info("返回数据"+completionResponse.getChoices().get(0).getDelta().getContent());webSocket.sendToUser(completionResponse.getChoices().get(0).getDelta().getContent(),sockentId);}}catch (Exception e){}
}@Override
public void onClosed(EventSource eventSource) {log.info("OpenAI关闭sse连接...");
}@SneakyThrows
@Override
public void onFailure(EventSource eventSource, Throwable t, Response response) {if(Objects.isNull(response)){return;}ResponseBody body = response.body();if (Objects.nonNull(body)) {log.error("OpenAI sse连接异常data:{},异常:{}", body.string(), t);} else {log.error("OpenAI sse连接异常data:{},异常:{}", response, t);}eventSource.cancel();
}
}
Controller层
@RepeatSubmit
@Operation(summary = "用户问询")
@PostMapping("/enquiry")
public R<String> enquiry(//需要定义对象接收参数) {String messageContext = (String) LocalCache.CACHE.get();List<Message> messages = new ArrayList<>();if (StrUtil.isNotBlank(messageContext)&&openAIBo.getType()==1) {messages = JSONUtil.toList(messageContext, Message.class);if (messages.size() >= 10) {messages = messages.subList(1, 10);}Message currentMessage = Message.builder().content(openAIBo.getLastPrompt()).role(Message.Role.USER).build();messages.add(currentMessage);} else {Message currentMessage = Message.builder().content(openAIBo.getLastPrompt()).role(Message.Role.USER).build();messages.add(currentMessage);}log.info("最终问题:{}",messages);OpenAiStreamClient client = OpenAiStreamClient.builder().connectTimeout(50).readTimeout(50).writeTimeout(50).apiKey(//openai的token).apiHost(//自己香港服务器或者国外服务器的转发).build();WebSocketServer webSocketServer = new WebSocketServer();OpenAIEventSourceListener openAIEventSourceListener = new OpenAIEventSourceListener(webSocketServer,openAIBo.getUid(), appService);client.streamChatCompletion(messages, openAIEventSourceListener);LocalCache.CACHE.put(userId.toString(), JSONUtil.toJsonStr(messages), LocalCache.TIMEOUT);return R.ok("询问成功");
}
香港/国外服务器nginx转发配置
server {
listen 8080;
server_name //自己服务器IP;
access_log /home/nginx/log/api.openai.com.log;
error_log /home/nginx/log/api.openai.com.err.log;
location / {
proxy_pass https://api.openai.com/;
client_max_body_size 10M;client_body_buffer_size 128k;proxy_connect_timeout 600;proxy_send_timeout 600;proxy_read_timeout 600;proxy_buffer_size 4k;proxy_buffers 4 32k;proxy_busy_buffers_size 64k;proxy_temp_file_write_size 64k;proxy_set_body $request_body;
}
}
结尾
项目中确实有自己项目中的对象需要自己添加,因为我是自己的项目也看了国内大佬的jar把国内大佬jar项目放到这里感兴趣可以研究研究其他连接方式。
https://github.com/Grt1228/chatgpt-steam-output
博主使用这套代码开发了一个小程序感兴趣朋友可以看看,自己使用。确实最近国内VPN被封的挺多的,所以有一部分问题还是能帮到我的。微信扫码打开小程序。目前还是第一版后续会继续研究,有感兴趣可以私信一起讨论。