java模拟GPT流式问答

流式请求gpt并且流式推送相关前端页面

1)java流式获取gpt答案

1、读取文件流的方式

使用post请求数据,由于gpt是eventsource的方式返回数据,所以格式是data:,需要手动替换一下值

/**
org.apache.http.client.methods
**/
@SneakyThrowsprivate void chatStream(List<ChatParamMessagesBO> messagesBOList) {CloseableHttpClient httpclient = HttpClients.createDefault();HttpPost httpPost = new HttpPost("https://api.openai.com/v1/chat/completions");httpPost.setHeader("Authorization","xxxxxxxxxxxx");httpPost.setHeader("Content-Type","application/json; charset=UTF-8");ChatParamBO build = ChatParamBO.builder().temperature(0.7).model("gpt-3.5-turbo").messages(messagesBOList).stream(true).build();System.out.println(JsonUtils.toJson(build));httpPost.setEntity(new StringEntity(JsonUtils.toJson(build),"utf-8"));CloseableHttpResponse response = httpclient.execute(httpPost);try {HttpEntity entity = response.getEntity();if (entity != null) {InputStream inputStream = entity.getContent();BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));String line;while ((line = reader.readLine()) != null) {// 处理 event stream 数据try {
//                        System.out.println(line);ChatResultBO chatResultBO = JsonUtils.toObject(line.replace("data:", ""), ChatResultBO.class);String content = chatResultBO.getChoices().get(0).getDelta().getContent();log.info(content);//                        System.out.println(chatResultBO.getChoices().get(0).getMessage().getContent());} catch (Exception e) {
//                        e.printStackTrace();}}}} finally {response.close();}}

2、sse链接的方式获取数据

用到了okhttp

需要先引用相关maven:

        <dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp</artifactId></dependency><dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp-sse</artifactId></dependency>
       // 定义see接口Request request = new Request.Builder().url("https://api.openai.com/v1/chat/completions").header("Authorization","xxx").post(okhttp3.RequestBody.create(okhttp3.MediaType.parse("application/json; charset=utf-8"),param.toJSONString())).build();OkHttpClient okHttpClient = new OkHttpClient.Builder().connectTimeout(10, TimeUnit.MINUTES).readTimeout(10, TimeUnit.MINUTES)//这边需要将超时显示设置长一点,不然刚连上就断开,之前以为调用方式错误被坑了半天.build();// 实例化EventSource,注册EventSource监听器RealEventSource realEventSource = new RealEventSource(request, new EventSourceListener() {@Overridepublic void onOpen(EventSource eventSource, Response response) {log.info("onOpen");}@SneakyThrows@Overridepublic void onEvent(EventSource eventSource, String id, String type, String data) {
//                log.info("onEvent");log.info(data);//请求到的数据}@Overridepublic void onClosed(EventSource eventSource) {log.info("onClosed");
//                emitter.complete();}@Overridepublic void onFailure(EventSource eventSource, Throwable t, Response response) {log.info("onFailure,t={},response={}",t,response);//这边可以监听并重新打开
//                emitter.complete();}});realEventSource.connect(okHttpClient);//真正开始请求的一步

2)流式推送答案

方法一:通过订阅式SSE/WebSocket

原理是先建立链接,然后不断发消息就可以

1、websocket

创建相关配置:


import javax.websocket.Session;import lombok.Data;/*** @description WebSocket客户端连接*/
@Data
public class WebSocketClient {// 与某个客户端的连接会话,需要通过它来给客户端发送数据private Session session;//连接的uriprivate String uri;}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configuration
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
配置相关service
@Slf4j
@Component
@ServerEndpoint("/websocket/chat/{chatId}")
public class ChatWebsocketService {static final ConcurrentHashMap<String, List<WebSocketClient>> webSocketClientMap= new ConcurrentHashMap<>();private String chatId;/*** 连接建立成功时触发,绑定参数* @param session 与某个客户端的连接会话,需要通过它来给客户端发送数据* @param chatId 商户ID*/@OnOpenpublic void onOpen(Session session, @PathParam("chatId") String chatId){WebSocketClient client = new WebSocketClient();client.setSession(session);client.setUri(session.getRequestURI().toString());List<WebSocketClient> webSocketClientList = webSocketClientMap.get(chatId);if(webSocketClientList == null){webSocketClientList = new ArrayList<>();}webSocketClientList.add(client);webSocketClientMap.put(chatId, webSocketClientList);this.chatId = chatId;}/*** 收到客户端消息后调用的方法** @param message 客户端发送过来的消息*/@OnMessagepublic void onMessage(String message) {log.info("chatId = {},message = {}",chatId,message);// 回复消息this.chatStream(BaseUtil.newList(ChatParamMessagesBO.builder().content(message).role("user").build()));
//        this.sendMessage(chatId,message+"233");}/*** 连接关闭时触发,注意不能向客户端发送消息了* @param chatId*/@OnClosepublic void onClose(@PathParam("chatId") String chatId){webSocketClientMap.remove(chatId);}/*** 通信发生错误时触发* @param session* @param error*/@OnErrorpublic void onError(Session session, Throwable error) {System.out.println("发生错误");error.printStackTrace();}/*** 向客户端发送消息* @param chatId* @param message*/public void sendMessage(String chatId,String message){try {List<WebSocketClient> webSocketClientList = webSocketClientMap.get(chatId);if(webSocketClientList!=null){for(WebSocketClient webSocketServer:webSocketClientList){webSocketServer.getSession().getBasicRemote().sendText(message);}}} catch (IOException e) {e.printStackTrace();throw new RuntimeException(e.getMessage());}}/*** 流式调用查询gpt* @param messagesBOList* @throws IOException*/@SneakyThrowsprivate void chatStream(List<ChatParamMessagesBO> messagesBOList) {// TODO 和GPT的访问请求}
}
测试,postman建立链接

2、SSE

本质也是基于订阅推送方式

前端:
<!DOCTYPE html>
<html lang="en"><head><meta charset="UTF-8"><title>SseEmitter</title>
</head><body>
<button onclick="closeSse()">关闭连接</button>
<div id="message"></div>
</body>
<script>let source = null;// 用时间戳模拟登录用户//const id = new Date().getTime();const id = '7829083B42464C5B9C445A087E873C7D';if (window.EventSource) {// 建立连接source = new EventSource('http://172.28.54.27:8902/api/sse/connect?conversationId=' + id);setMessageInnerHTML("连接用户=" + id);/*** 连接一旦建立,就会触发open事件* 另一种写法:source.onopen = function (event) {}*/source.addEventListener('open', function(e) {setMessageInnerHTML("建立连接。。。");}, false);/*** 客户端收到服务器发来的数据* 另一种写法:source.onmessage = function (event) {}*/source.addEventListener('message', function(e) {//console.log(e);setMessageInnerHTML(e.data);});source.addEventListener("close", function (event) {// 在这里处理关闭事件console.log("Server closed the connection");// 可以选择关闭EventSource连接source.close();});/*** 如果发生通信错误(比如连接中断),就会触发error事件* 或者:* 另一种写法:source.onerror = function (event) {}*/source.addEventListener('error', function(e) {console.log(e);if (e.readyState === EventSource.CLOSED) {setMessageInnerHTML("连接关闭");} else {console.log(e);}}, false);} else {setMessageInnerHTML("你的浏览器不支持SSE");}// 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据window.onbeforeunload = function() {//closeSse();};// 关闭Sse连接function closeSse() {source.close();const httpRequest = new XMLHttpRequest();httpRequest.open('GET', 'http://172.28.54.27:8902/api/sse/disconnection?conversationId=' + id, true);httpRequest.send();console.log("close");}// 将消息显示在网页上function setMessageInnerHTML(innerHTML) {document.getElementById('message').innerHTML += innerHTML + '<br/>';}
</script></html>
后端:
controller
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.util.Set;
import java.util.function.Consumer;import javax.annotation.Resource;import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;@Validated
@RestController
@RequestMapping("/api/sse")
@Slf4j
@RefreshScope  // 会监听变化实时变化值
public class SseController {@Resourceprivate SseBizService sseBizService;/*** 创建用户连接并返回 SseEmitter** @param conversationId 用户ID* @return SseEmitter*/@SneakyThrows@GetMapping(value = "/connect", produces = "text/event-stream; charset=utf-8")public SseEmitter connect(String conversationId) {// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutExceptionSseEmitter sseEmitter = new SseEmitter(0L);// 注册回调sseEmitter.onCompletion(completionCallBack(conversationId));sseEmitter.onError(errorCallBack(conversationId));sseEmitter.onTimeout(timeoutCallBack(conversationId));log.info("创建新的sse连接,当前用户:{}", conversationId);sseBizService.addConnect(conversationId,sseEmitter);sseBizService.sendMsg(conversationId,"链接成功");
//        sseCache.get(conversationId).send(SseEmitter.event().reconnectTime(10000).data("链接成功"),MediaType.TEXT_EVENT_STREAM);return sseEmitter;}/*** 给指定用户发送信息  -- 单播*/@GetMapping(value = "/send", produces = "text/event-stream; charset=utf-8")public void sendMessage(String conversationId, String msg) {sseBizService.sendMsg(conversationId,msg);}/*** 移除用户连接*/@GetMapping(value = "/disconnection", produces = "text/event-stream; charset=utf-8")public void removeUser(String conversationId) {log.info("移除用户:{}", conversationId);sseBizService.deleteConnect(conversationId);}/*** 向多人发布消息   -- 组播* @param groupId 开头标识* @param message 消息内容*/public void groupSendMessage(String groupId, String message) {/* if (!BaseUtil.isNullOrEmpty(sseCache)) {*//*Set<String> ids = sseEmitterMap.keySet().stream().filter(m -> m.startsWith(groupId)).collect(Collectors.toSet());batchSendMessage(message, ids);*//*sseCache.forEach((k, v) -> {try {if (k.startsWith(groupId)) {v.send(message, MediaType.APPLICATION_JSON);}} catch (IOException e) {log.error("用户[{}]推送异常:{}", k, e.getMessage());removeUser(k);}});}*/}/*** 群发所有人   -- 广播*/public void batchSendMessage(String message) {/*sseCache.forEach((k, v) -> {try {v.send(message, MediaType.APPLICATION_JSON);} catch (IOException e) {log.error("用户[{}]推送异常:{}", k, e.getMessage());removeUser(k);}});*/}/*** 群发消息*/public void batchSendMessage(String message, Set<String> ids) {ids.forEach(userId -> sendMessage(userId, message));}/*** 获取当前连接信息*/
//    public List<String> getIds() {
//        return new ArrayList<>(sseCache.keySet());
//    }/*** 获取当前连接数量*/
//    public int getUserCount() {
//        return count.intValue();
//    }private Runnable completionCallBack(String userId) {return () -> {log.info("结束连接:{}", userId);removeUser(userId);};}private Runnable timeoutCallBack(String userId) {return () -> {log.info("连接超时:{}", userId);removeUser(userId);};}private Consumer<Throwable> errorCallBack(String userId) {return throwable -> {log.info("连接异常:{}", userId);removeUser(userId);};}
}
service
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;@Component
@Slf4j
@RefreshScope  // 会监听变化实时变化值
public class SseBizService {/*** * 当前连接数*/private AtomicInteger count = new AtomicInteger(0);/*** 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面*/private Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();/*** 添加用户* @author pengbin <pengbin>* @date 2023/9/11 11:37* @param* @return*/public void addConnect(String id,SseEmitter sseEmitter){sseCache.put(id, sseEmitter);// 数量+1count.getAndIncrement();}/*** 删除用户* @author pengbin <pengbin>* @date 2023/9/11 11:37* @param* @return*/public void deleteConnect(String id){sseCache.remove(id);// 数量+1count.getAndDecrement();}/*** 发送消息* @author pengbin <pengbin>* @date 2023/9/11 11:38* @param* @return*/@SneakyThrowspublic void sendMsg(String id, String msg){if(sseCache.containsKey(id)){sseCache.get(id).send(msg, MediaType.TEXT_EVENT_STREAM);}}}

方法二:SSE建立eventSource,使用完成后即刻销毁

前端:在接收到结束标识后立即销毁

/*** 客户端收到服务器发来的数据* 另一种写法:source.onmessage = function (event) {}*/source.addEventListener('message', function(e) {//console.log(e);setMessageInnerHTML(e.data);if(e.data == '[DONE]'){source.close();}});

后端:
 

@SneakyThrows@GetMapping(value = "/stream/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter completionsStream(@RequestParam String conversationId){//List<ChatParamMessagesBO> messagesBOList =new ArrayList();// 获取内容信息ChatParamBO build = ChatParamBO.builder().temperature(0.7).stream(true).model("xxxx").messages(messagesBOList).build();SseEmitter emitter = new SseEmitter();// 定义see接口Request request = new Request.Builder().url("xxx").header("Authorization","xxxx").post(okhttp3.RequestBody.create(okhttp3.MediaType.parse("application/json; charset=utf-8"),JsonUtils.toJson(build))).build();OkHttpClient okHttpClient = new OkHttpClient.Builder().connectTimeout(10, TimeUnit.MINUTES).readTimeout(10, TimeUnit.MINUTES)//这边需要将超时显示设置长一点,不然刚连上就断开,之前以为调用方式错误被坑了半天.build();StringBuffer sb = new StringBuffer("");// 实例化EventSource,注册EventSource监听器RealEventSource realEventSource = null;realEventSource = new RealEventSource(request, new EventSourceListener() {@Overridepublic void onOpen(EventSource eventSource, Response response) {log.info("onOpen");}@SneakyThrows@Overridepublic void onEvent(EventSource eventSource, String id, String type, String data) {log.info(data);//请求到的数据try {ChatResultBO chatResultBO = JsonUtils.toObject(data.replace("data:", ""), ChatResultBO.class);String content = chatResultBO.getChoices().get(0).getDelta().getContent();sb.append(content);emitter.send(SseEmitter.event().data(JsonUtils.toJson(ChatContentBO.builder().content(content).build())));} catch (Exception e) {
//                        e.printStackTrace();}if("[DONE]".equals(data)){emitter.send(SseEmitter.event().data(data));emitter.complete();log.info("result={}",sb);}}@Overridepublic void onClosed(EventSource eventSource) {log.info("onClosed,eventSource={}",eventSource);//这边可以监听并重新打开
//                emitter.complete();}@Overridepublic void onFailure(EventSource eventSource, Throwable t, Response response) {log.info("onFailure,t={},response={}",t,response);//这边可以监听并重新打开
//                emitter.complete();}});realEventSource.connect(okHttpClient);//真正开始请求的一步return emitter;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/154802.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据库系统工程师------流水线

流水线 流水线周期&#xff1a;工序中最长的那段执行时间。 流水线计算公式&#xff1a;第一条指令计算时间 &#xff08;指令条数 - 1&#xff09;*流水线周期。 流水线吞吐率&#xff1a;指单位时间内流水线完成的任务数量或输出的结果数量。 流水线的加速比&#xff1a;完…

机器学习与模式识别作业----决策树属性划分计算

文章目录 1.决策树划分原理1.1.特征选择1--信息增益1.2.特征选择2--信息增益比1.3.特征选择3--基尼系数 2.决策树属性划分计算题2.1.信息增益计算2.2.1.属性1的信息增益计算2.2.2.属性2的信息增益计算2.2.3.属性信息增益比较 2.2.信息增益比计算2.3.基尼系数计算 1.决策树划分原…

VMware Workstation Player 17 下载安装教程

虚拟机系列文章 VMware Workstation Player 17 免费下载安装教程 VMware Workstation 17 Pro 免费下载安装教程 windows server 2012安装教程 Ubuntu22.04.3安装教程 FTP服务器搭建 VMware Workstation Player 17 下载安装教程 虚拟机系列文章前言一、 VMware Workstation Pla…

手机APP也可以学习Sui啦,通过EasyA开启你的学习之旅

Sui基金会与EasyA合作&#xff0c;开发了一门面向初学者的Sui课程。这一适用于Android和iOS移动端的学习体验&#xff0c;是进入更广泛的Sui社区和生态系统的入口。在这门课程中&#xff0c;学习者将以有趣和互动的方式获得对Sui的基本了解&#xff0c;最终能够在测试网络上部署…

Wifi列表扫描和Wifi链接

上面的截图&#xff0c;就是本文要介绍的主要功能。 1.准备工作&#xff0c;声明权限&#xff1a; <uses-permission android:name"android.permission.CHANGE_WIFI_STATE" /><uses-permission android:name"android.permission.ACCESS_WIFI_STATE&quo…

10.selenium进阶

文章目录 1、嵌套网页1、1 什么是嵌套页面1、2 selenium获取嵌套页面的数据 2、执行JavaScript代码3、鼠标动作链4、selenium键盘事件5、其他方法5、1 选择下拉框5、2 弹窗的处理 6、selenium设置无头模式7、selenium应对检测小结 1、嵌套网页 ​ 在前端开发中如果有这么一个需…

使用css 与 js 两种方式实现导航栏吸顶效果

position的属性我们一般认为有 position:absolute postion: relative position:static position:fixed position:inherit; position:initial; position:unset; 但是我最近发现了一个定位position:sticky 这个可以称为粘性定位。 这个粘性定位的元素会始终在那个位置 <st…

rust cfg的使用

前提是一个crate倒入另一个crate。 先看结构 test_lib目录结构 这与另一个crate处于同一个目录,所以另一crate倒入的时候在Cargo.toml中使用如下语句。 test_lib = {path = "../test_lib" }先在test_lib/src/abc/abc.rs中添加没有cfg的两个函数做测试。 pub fn…

虹科方案丨自动驾驶多传感器数据融合方法

文章来源&#xff1a;雅名特自动驾驶 点此阅读原文&#xff1a;https://mp.weixin.qq.com/s/QsPMWZDGZaPdEx47L2VmeA 近年来&#xff0c;深度学习技术在涉及高维非结构化数据领域展现出了最先进的性能&#xff0c;如计算机视觉、语音、自然语言处理等方面&#xff0c;并且开始涉…

Django实战项目-学习任务系统-用户登录

第一步&#xff1a;先创建一个Django应用程序框架代码 1&#xff0c;先创建一个Django项目 django-admin startproject mysite将创建一个目录&#xff0c;其布局如下&#xff1a;mysite/manage.pymysite/__init__.pysettings.pyurls.pyasgi.pywsgi.py 2&#xff0c;再创建一个…

智能视频分析系统AI智能分析网关V3触发告警图片不显示该如何解决?

AI智能分析网关V3包含有20多种算法&#xff0c;包括人脸、人体、车辆、车牌、行为分析、烟火、入侵、聚集、安全帽、反光衣等等&#xff0c;可应用在安全生产、通用园区、智慧食安、智慧城管、智慧煤矿等场景中。将网关硬件结合我们的视频监控系统EasyCVR一起使用&#xff0c;可…

Android免杀小结

目录 msfvenom 自动生成 自带免杀 工具免杀 Thefatrat backdoor-apk old-fatrat msfVenom嵌入式方法 venom 工具小记 加壳 源码免杀 加壳 源码混淆 数据通信 启动和运行方式修改 对抗反编译 反编译工具崩溃 ZIP文件格式对抗 ZIP通用位伪加密 AndroidManife…

MySQL 面试知识脑图 初高级知识点

脑图下载地址&#xff1a;https://mm.edrawsoft.cn/mobile-share/index.html?uuid18b10870122586-src&share_type1 sql_mode 基本语法及校验规则 ONLY_FULL_GROUP_BY 对于GROUP BY聚合操作&#xff0c;如果在SELECT中的列&#xff0c;没有在GROUP BY中出现&#xff…

网络初识(JAVA EE)

文章目录 一、网络发展史二、网络通信基础三、协议分层四、封装和分用 一、网络发展史 独立模式&#xff1a;计算机之间相互独立&#xff0c;每个终端都各自持有客户数据&#xff0c;且当处理一个业务时&#xff0c;按照业务流程进行 网络互连&#xff1a;将多台计算机连接在一…

如何退出commit_message页面

虽然提示命令了&#xff0c;但我试了&#xff0c;退不出去。我没搞明白。。。 退出编辑 Crtl Z设置git的编辑器为vim或vi git config --global core.editor vim如果没有vim编辑器&#xff0c;设置成vi编辑器也行 git config --global core.editor vi重新提交 再次进入commi…

【HTML5】语义化标签记录

前言 防止一个页面中全部都是div&#xff0c;或者ul li&#xff0c;在html5推出了很多语义化标签 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 常用语义化案例 一般我用的多的是header&#xff0c;main&#xff0c;footer 这些标签不难理解&#x…

怎么通过Fiddler对APP进行抓包?以及高级应用场景分析

前言 我们经常需要用到Fiddler做代理服务器对Web、APP应用进行抓包&#xff0c;以便我们对接口功能进行测试调试&#xff0c;定位问题等。这篇将讲述怎么通过Fiddler对APP进行抓包&#xff0c;以及简单介绍一些高级应用场景。 首先&#xff0c;附上Fiddler使用的环境配置清单…

MidJourney | AI绘画也有艺术

免费绘画&#xff0c;体验更多AI可关&注公&众&号&#xff1a;AI研究工厂

华测监测预警系统 2.2 存在任意文件读取漏洞

华测监测预警系统 2.2 存在任意文件读取漏洞 一、 华测监测预警系统 2.2 简介二、漏洞描述三、影响版本四、fofa查询语句五、漏洞复现1、手动复现2、自动复现 六、修复建议 免责声明&#xff1a;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信…

暴涨100万粉仅用一个月,B站内容趋势前线洞察

- 导语 在这个9月&#xff0c;B站涌现多位黑马UP主&#xff0c;有的UP主自入驻B站以来&#xff0c;一个月的时间就涨粉百万晋升为头部UP主&#xff0c;有的UP主因内容受到B站百万年轻人的追捧&#xff0c;展现账号爆发力。 接下来&#xff0c;飞瓜数据&#xff08;B站版&…