一个java项目中,如何使用sse协议,构造一个chatgpt的流式对话接口

前言
如何注册chatGPT,怎么和它交互,本文就不讲了;因为网上教程一大堆,而且你要使用的话,通常会再包一个算法服务,用来做一些数据训练和过滤处理之类的,业务服务基本不会直接与原生chatGPT交互。
而下面阐述的,就是业务服务与算法服务的交互。

业务需求-需要实现什么样的功能

想要一个类似与AI问答助手的机器人,可以实现根据某些场景对话提问的功能

  1. 可以直接提问,类似直接使用chatGPT,只不过这个提问的过程会做一些业务通用处理,比如问答数据的归纳反馈、敏感词过滤等等。
  2. 也可以给它喂一篇论文,喂一批近期的资讯,或者是一本小说之类的,根据指定的上下文去进行问答(这种场景需要先投递数据建立相关索引)
  3. ai的回答要求和chatGPT一样保持流式返回(也就是一个字一个字,一边生成一边返回,而不是等整个回答生成完之后一股脑返回)

剖析

重点是流式,这里我们预设算法侧已经有了一个流式返回的接口,整体的交互如下图所示
在这里插入图片描述
下面分别介绍几个关键节点的数据交互设计,仅供参考

q1

简述:页面发送问答数据给业务服务端

{"chatId": 233,"question": "这篇论文有几个论点?"
}
  • 这里的chatId可以理解为一个对话框id,业务服务端可以根据这个来进行问答归类、批量删除收藏、问答上下文查询等操作。
  • question就是问题的内容

这里需要注意就是,交互数据格式尽可能简单、易拓展,有些产品的页面交互设计的非常复杂,什么历史问答、角色信息之类的,套了一层又一层,其实很多都没必要的,这样前端组装起来也麻烦,也不利于数据的管理与后期功能的拓展。

q2

简述:就是业务服务根据前端传来的问题和所属的对话框,把相应的上下文查询出来(甚至可以前端维护一个是否发送上下文的开关,更动态一点),包装成算法服务所需要的问题数据,发给它。

{"chatId":  233,"userName":  "张三","messageKey":  "0a795f6a-a029-435f-8d67-6f6f8e078cfe","message":  "这篇论文有几个论点?","chatHistory":  [{"messageKey":  "0a795f6a-a029-435f-8d67-6f6f8e07dasd","question":  "这篇论文的作者是谁","answer":  "这篇论文的作者是李四博士。"}],"callbackUrl":  "http://xxx/chat/question/callback"
}
  • 上述的messageKey就是一个消息的key,用以常规的接口调试
  • chatHistory就是历史问答记录,即上下文,众所周知chatGPT带不带上下文,回答的结果可能截然不同
  • callbackUrl是业务定义的一个回调接口,用来回调一些算法侧异步生产的信息,比如原文的定位信息、根据当前问题生成的推荐问答等,这些和流式的回答是不会一起返回的,所以额外提供一个接口来接收。

q3和a3

这两步不详述(主要我也不是研究算法模型的哈哈,不是很清楚细节)
我们只需要定义好a2返回的结果即可

a2

简述:主要是算法侧返回给业务服务的同步的流式回答,同时还可能有异步的额外信息的回调(q2的callbackUrl来接收)。所以a2的返回结果分为两个response
response1:同步的流式回答,一般在2-7s内返回第一个字符

data:data:data:... // 省略一些输出
data:data:

流式问答的规范可以参考:流式接口协议规范

response2:异步的拓展信息(可有可无)

{"messageKey":"0a795f6a-a029-435f-8d67-6f6f8e078cfe", //必传 回调的消息key,每次问答唯一"expand":{"recommendedQuestions":[ // 推荐问题"这篇论文的主要论点是什么?"],"originalIndex":[{"sourceId":3432,"text":"首先第一个论点是......","textIndex":90}]}
}

a1

简述:a1要返回的格式很好理解,就是把a2中的两个response组合在一起,需要注意的有几点

  1. a2的response2不一定有,需要设置超时策略,且需要在流式回答最后输出
  2. a2中的response1是流式,response2不是;但输出到a1的时候,需要保证都在流中
  3. 最好需要约定一些event来作为标识符
event:messageKey // 消息key事件data:0a795f6a-a029-435f-8d67-6f6f8e078cfeevent:answer // 流式回答开始事件data:"在论文"data:"中"data:","data:"我"data:"们"data:"一"data:"共"
...
data:"几"data:"个"data:"论"data:"点"event:endTimedata:2024-02-27 17:05:24event:expand // 拓展信息开始事件,此处等待15s超时data:{"recommendedQuestions":["这篇论文的主要论点是什么"],"originalIndex":{"sourceId":32133,"text":"首先第一个论点是......","textIndex":90}}

代码

代码省略了一些无关紧要的业务特有的部分,只保留通用的部分
工具类:SSEUtils,用来操作SSE客户端

import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** description** @author luhui* @date 2024/1/25*/
@Slf4j
public class SSEUtils {/*** timeout 30min*/private static final Long DEFAULT_TIME_OUT = 30 * 60 * 1000L;/*** 订阅表*/private static final Map<String, EvaEmitter> EMITTER_MAP = new ConcurrentHashMap<>();public static final String MSG_DATA_PREFIX = "data:";public static final String MSG_EVENT_PREFIX = "event:";/*** description: 创建流** @param messageKey 本次问答的消息key* @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter* @author luhui* @date 2024/2/23 17:09*/public static EvaEmitter getEmitter(String messageKey) {if (null == messageKey || "".equals(messageKey)) {return null;}EvaEmitter emitter = EMITTER_MAP.get(messageKey);if (null == emitter) {emitter = new EvaEmitter(DEFAULT_TIME_OUT);EMITTER_MAP.put(messageKey, emitter);}return emitter;}/*** description: 发消息** @param messageKey 本次问答的消息key* @param msg        消息* @author luhui* @date 2024/2/23 17:09*/public static void pushMsg(String messageKey, String msg) throws IOException {EvaEmitter emitter = EMITTER_MAP.get(messageKey);if (null != emitter) {emitter.send(EvaEmitter.event().data(msg));}}public static void pushEvent(String messageKey, String eventDesc) throws IOException{EvaEmitter emitter = EMITTER_MAP.get(messageKey);if (null != emitter) {emitter.send(EvaEmitter.event().name(eventDesc));}}/*** description: 关闭流** @param messageKey 本次问答的消息key* @author luhui* @date 2024/2/23 17:08*/public static void closeEmitter(String messageKey) {EvaEmitter emitter = EMITTER_MAP.get(messageKey);if (null != emitter) {try {emitter.complete();EMITTER_MAP.remove(messageKey);} catch (Exception e) {e.printStackTrace();}}}
}

工具类:SSEClient ,用来获取SSE流

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;/*** description** @author luhui* @date 2024/1/25*/
@Slf4j
public class SSEClient {// timeoutpublic static Integer DEFAULT_TIME_OUT = 60 * 1000;/*** 获取SSE输入流*/public static InputStream getSseInputStream(String urlPath, JSONObject param, int timeoutMill) {HttpURLConnection urlConnection = null;try {urlConnection = getHttpURLConnection(urlPath, timeoutMill);putData(urlConnection, param);InputStream inputStream = urlConnection.getInputStream();return new BufferedInputStream(inputStream);} catch (IOException e) {e.printStackTrace();}return null;}/*** 读流数据*/public static void readStream(InputStream is, MsgHandler msgHandler) throws IOException {BufferedReader reader = new BufferedReader(new InputStreamReader(is));try {String line = "";while ((line = reader.readLine()) != null) {if ("".equals(line)) {continue;}msgHandler.handleMsg(line);}} catch (Exception e) {e.printStackTrace();// 目前这里抛出的显式异常来自与用户手动关闭的连接,此时服务端与算法端的连接也捕获并关闭,无需存储} finally {// 服务器端主动关闭时,客户端手动关闭reader.close();is.close();}}private static HttpURLConnection getHttpURLConnection(String urlPath, int timeoutMill) throws IOException {URL url = new URL(urlPath);HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();urlConnection.setDoOutput(true);urlConnection.setDoInput(true);urlConnection.setUseCaches(false);urlConnection.setRequestMethod("POST");urlConnection.setRequestProperty("Connection", "Keep-Alive");urlConnection.setRequestProperty("Charset", "UTF-8");urlConnection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");urlConnection.setRequestProperty("accept", "text/event-stream");// 读过期时间urlConnection.setReadTimeout(timeoutMill);return urlConnection;}public static void putData(HttpURLConnection connection, JSONObject jsonStr) throws IOException {byte[] writebytes = jsonStr.toJSONString().getBytes();connection.setRequestProperty("Content-Length", String.valueOf(writebytes.length));DataOutputStream wr = new DataOutputStream(connection.getOutputStream());wr.write(jsonStr.toJSONString().getBytes());wr.flush();wr.close();}/*** 消息处理接口*/public interface MsgHandler {void handleMsg(String line) throws IOException;}
}

工具类:EvaEmitter,用来封装一些流信息

import cn.hutool.core.date.DateTime;
import com.alibaba.fastjson.JSONObject;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;/*** description EvaEmitter** @author luhui* @date 2024/02/22*/
@Data
public class EvaEmitter extends SseEmitter {public EvaEmitter(Long timeout) {super(timeout);}@ApiModelProperty("版本id")private Long versionId;@ApiModelProperty("用户问题")private String question;@ApiModelProperty("唯一消息key")private String messageKey;@ApiModelProperty("当前用户")private Long currentUid;@ApiModelProperty("当前用户名")private String currentUserName;@ApiModelProperty("项目id")private Long projectId;@ApiModelProperty("ai回答")private String aiAnswer;@ApiModelProperty("拓展信息")private JSONObject expand;@ApiModelProperty("错误信息")private JSONObject error;@ApiModelProperty("提问开始时间")private DateTime startTime;public JSONObject getHistory() {JSONObject history = new JSONObject();history.put("question", question);history.put("answer", aiAnswer);history.put("expand", expand);history.put("error", error);return history;}
}

具体的chat交互方法

	String messageKey = UUID.randomUUID().toString();EvaEmitter emitter = SSEUtils.getEmitter(messageKey);emitter.setProjectId(111);// 初始化相关字段sseService.chatTransfer(messageKey);
    @Async@Overridepublic void chatTransfer(String messageKey) {EvaEmitter emitter = SSEUtils.getEmitter(messageKey);// 正式参数JSONObject params = new JSONObject(true);params.put("versionId", emitter.getVersionId().toString());params.put("userName", emitter.getCurrentUserName());params.put("messageKey", emitter.getMessageKey());params.put("message", emitter.getQuestion());params.put("chatHistory", chatHistoryService.getChatHistory(emitter));params.put("callbackUrl", gateway + "/xxxchat/question/callback");InputStream inputStream = SSEClient.getSseInputStream(aiChatUrl, params, SSEClient.DEFAULT_TIME_OUT);try {StringBuilder answer = new StringBuilder();SSEUtils.pushEvent(messageKey, "messageKey");SSEUtils.pushMsg(messageKey, messageKey);SSEUtils.pushEvent(messageKey, "answer");AtomicReference<Boolean> sdkError = new AtomicReference<>(false);SSEClient.readStream(inputStream, line -> {log.info("messageKey:{}, chatTransfer:{}", emitter.getMessageKey(), line);String message = "";if (sdkError.get()) {String errorStr = line.split(SSEUtils.MSG_DATA_PREFIX)[1].trim();if (StringUtils.isNotBlank(errorStr)) {// 做一些错误处理message = "算法未知错误,请稍后再试";emitter.setError(message);}} else if (line.contains(SSEUtils.MSG_DATA_PREFIX)) {message = line.split(SSEUtils.MSG_DATA_PREFIX)[1].trim();} else if (line.contains(SSEUtils.MSG_EVENT_PREFIX)) {sdkError.set(true);} else {message = "";}if (StringUtils.isNotBlank(message)) {answer.append(message.replaceAll("\"", ""));SSEUtils.pushMsg(messageKey, message);}});emitter.setAiAnswer(answer.toString());// 保存当前问答消息,自行实现ChatHistoryEntity message = chatHistoryService.saveHistory(messageKey);SSEUtils.pushEvent(messageKey, "endTime");SSEUtils.pushMsg(messageKey, DateUtil.formatDateTime(message.getEndTime()));SSEUtils.pushEvent(messageKey, "expand");chatHistoryService.pushExpand(messageKey);} catch (IllegalStateException | IOException e) {log.error("pushMsg error, web端流已被关闭");} catch (Exception e) {e.printStackTrace();} finally {// 消息发送完或者出现异常的话,存储当前的消息,然后关闭流try {chatHistoryService.saveHistory(messageKey);} catch (Exception e) {e.printStackTrace();} finally {SSEUtils.closeEmitter(messageKey);}}}
	@Override@Retryable(value = Exception.class, maxAttempts = 6, backoff = @Backoff(delay = 500, multiplier = 2))public void pushExpand(String messageKey) throws IOException {// 如果异步的拓展信息,即a2中的response2回调成功的话,会存储到这里Object expandObj = redisService.hGet(RedisConstants.CHAT_AI_RECOMMENDED_QUESTIONS, messageKey);if (expandObj == null) {log.error("未获取到相关拓展信息, 稍后重试");throw new RuntimeException("未获取到相关拓展信息");} else {JSONObject expand = JSONObject.parseObject(expandObj.toString());EvaEmitter emitter = SSEUtils.getEmitter(messageKey);emitter.setExpand(expand);SSEUtils.pushMsg(messageKey, expand.toJSONString());log.info("messageKey:{}, chatTransfer:{}", emitter.getMessageKey(), expand);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/313736.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Git操作与异常处理

文章目录 常用操作1、代码拉取2、代码提交3、暂存区状态4、提交代码5、推送远程仓库 异常处理【1】报错信息&#xff1a;Cannot pull into a repository with state: MERGING【2】报错信息&#xff1a;You have not concluded your merge (MERGE_HEAD exists)【3】报错信息&…

BGP的基本概念和工作原理

AS的由来 l Autonomous System 自治系统&#xff0c;为了便于管理规模不断扩大的网络&#xff0c;将网络划分为不同的AS l 不同AS通过AS号区分&#xff0c;AS号取值范围1&#xff0d;65535&#xff0c;其中64512&#xff0d;65535是私有AS号 l IANA机构负责AS号的分发 AS之…

NumPy简单学习(需要结合书本)

NumPy简单学习&#xff08;需要结合书本&#xff1a;Python数据分析与应用&#xff09; 文章目录 NumPy简单学习&#xff08;需要结合书本&#xff1a;Python数据分析与应用&#xff09;前言导库&#xff1a; 一、大概内容1.掌握NumPy数组对象ndarray&#xff08;1&#xff09;…

Excel 公式的定义、语法和应用(LOOKUP 函数、HLOOKUP 函数、VLOOKUP 函数;MODE.MULT 函数; ROUND 函数)

一、公式的定义和语法 二、公式的应用 附录 查找Excel公式使用方法的官方工具【强烈推荐&#xff01;&#xff01;&#xff01;】&#xff1a;Excel 函数&#xff08;按字母顺序&#xff09;【微软官网】 excel 函数说明语法LOOKUP 函数在向量或数组中查找值LOOKUP(lookup_va…

Linux-文件系统

1. 物理结构 计算机的存储硬件有很多&#xff0c;这里讲磁盘。 磁盘的物理结构大致分为&#xff1a; 磁盘&#xff08;数据存储&#xff09;磁头音圈马达主轴 所有的数据都存储在磁盘上&#xff0c;磁盘有很多片&#xff0c;每一个面都有对应的磁头来对数据进行更改 磁头是…

轻松处理文件名,告别重复命名烦恼!一键覆盖复制操作,让文件管理更高效!

我们每天都在与大量的文件打交道。从工作文档到生活照片&#xff0c;从学习资料到娱乐视频&#xff0c;每一个文件都承载着我们的记忆和辛勤付出。然而&#xff0c;随着文件数量的不断增加&#xff0c;文件名冲突、重复命名等问题也愈发突出&#xff0c;给我们的文件管理带来了…

Python:解析pyserial串口通讯

简介&#xff1a;串行接口简称串口&#xff0c;也称串行通信接口或串行通讯接口&#xff08;通常指COM接口&#xff09;&#xff0c;是采用串行通信方式的扩展接口。串行接口 &#xff08;Serial Interface&#xff09;是指数据一位一位地顺序传送。其特点是通信线路简单&#…

DC-DC电源芯片规格书上的各种参数详解

1.输出电压精确度 输出电压的精确度,也被称为设定点精度,它描述了输出电压的允许误差。该参数通常是在常温,满载和额定输入电压的条件下测得的,它是这样定义的: 输出电压之所以产生误差,是因为元器件本身存在误差,特别是输出端的分压电阻,它将输出电压降低后比PWM比较…

【白盒测试】单元测试的理论基础及用例设计技术(6种)详解

目录 &#x1f31e;前言 &#x1f3de;️1. 单元测试的理论基础 &#x1f30a;1.1 单元测试是什么 &#x1f30a;1.2 单元测试的好处 &#x1f30a;1.3 单元测试的要求 &#x1f30a;1.4 测试框架-Junit4的介绍 &#x1f30a;1.5 单元测试为什么要mock &#x1f3de;️…

BGP配置和应用案例

策略路由的配置步骤 l 策略路由的配置步骤如下&#xff1a; 创建route-map 通过ACL匹配感兴趣的数据&#xff0c;定义策略动作 在指定接口下通过ip policy 命令应用route-map l 最终实现对通过该接口进入设备的数据进行检查&#xff0c;对匹配的数据执行规定的策略…

pytest参数化数据驱动(数据库/execl/yaml)

常见的数据驱动 数据结构&#xff1a; 列表、字典、json串 文件&#xff1a; txt、csv、excel 数据库&#xff1a; 数据库链接 数据库提取 参数化&#xff1a; pytest.mark.parametrize() pytest.fixture()…

大语言模型在研究领域的应用——信息检索中的大语言模型

信息检索中的大语言模型 大语言模型提升信息检索任务利用大语言模型进行信息检索大语言模型增强的信息检索模型. 检索增强的大语言模型输入优化策略.指令微调策略.预训练策略. 总结应用建议未来方向 大语言模型对于传统信息检索技术与应用范式带来了重要影响。这两者在技术路径…

数字接龙(蓝桥杯)

文章目录 数字接龙【问题描述】解题思路DFS 数字接龙 【问题描述】 小蓝最近迷上了一款名为《数字接龙》的迷宫游戏&#xff0c;游戏在一个大小为N N 的格子棋盘上展开&#xff0c;其中每一个格子处都有着一个 0 . . . K − 1 之间的整数。游戏规则如下&#xff1a; 从左上…

游戏发行困境及OgGame云游戏解决方案简述

随着全球化浪潮的持续推进&#xff0c;中国游戏开发者们不再满足于国内市场的发展&#xff0c;而是开始将目光投向更为广阔的海外市场。这一趋势的崛起背后&#xff0c;是中国企业意识到国际化是其发展的必由之路&#xff0c;也是游戏行业突破国内困境的体现。本文将简要阐述游…

【线性代数 C++】求逆矩阵

对于 n n n阶矩阵 A A A&#xff0c;如果有 n n n阶矩阵 B B B&#xff0c;使 A B B A E ABBAE ABBAE&#xff0c;则说 A A A是可逆的&#xff0c;并把 B B B称为 A A A的逆矩阵. A A A的逆矩阵记作 A − 1 A^{-1} A−1&#xff0c;则 B A − 1 BA^{-1} BA−1.若 ∣ A ∣ ≠…

Recommended Azure Monitors

General This document describes the recommended Azure monitors which can be implemented in Azure cloud application subscriptions. SMT incident priority mapping The priority “Blocker” is mostly used by Developers to prioritize their tasks and its not a…

Hive-Sql复杂面试题

参考链接&#xff1a;hive sql面试题及答案 - 知乎 有哪些好的题目都可以给我哦 我来汇总到一起 1、编写sql实现每个用户截止到每月为止的最大单月访问次数和累计到该月的总访问次数 数据&#xff1a; userid,month,visits A,2015-01,5 A,2015-01,15 B,2015-01,5 A,2015-01,…

MySQL面试——聚簇/非聚簇索引

存储引擎是针对表结构&#xff0c;不是数据库 引擎层&#xff1a;对数据层以何种方式进行组织 update&#xff1a;加索引&#xff1a;行级锁&#xff1b;不加索引&#xff1a;表级锁

Java 网络编程之TCP(三):基于NIO实现服务端,BIO实现客户端

前面的文章&#xff0c;我们讲述了BIO的概念&#xff0c;以及编程模型&#xff0c;由于BIO中服务器端的一些阻塞的点&#xff0c;导致服务端对于每一个客户端连接&#xff0c;都要开辟一个线程来处理&#xff0c;导致资源浪费&#xff0c;效率低。 为此&#xff0c;Linux 内核…

Stable Diffusion WebUI 使用 VAE 增加滤镜效果

本文收录于《AI绘画从入门到精通》专栏&#xff0c;专栏总目录&#xff1a;点这里&#xff0c;订阅后可阅读专栏内所有文章。 大家好&#xff0c;我是水滴~~ 本文主要介绍 VAE 模型&#xff0c;主要内容有&#xff1a;VAE 模型的概念、如果下载 VAE 模型、如何安装 VAE 模型、如…