通过okhttp调用SSE流式接口,并将消息返回给客户端

通过一个完整的java示例来演示如何通过okhttp来调用远程的sse流式接口
背景:我们有一个智能AI的聊天界面,需要调用三方厂商的大模型chat接口,返回答案(因为AI去理解并检索你的问题的时候这个是比较耗时的,这个时候客户端需要同步的在等待最终结果),所以我们的方案是通过流的方式把结果陆续的返回给客户端,这样能极大的提高用户的体验

1.引入相关依赖

		<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp</artifactId><version>4.2.0</version></dependency><dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp-sse</artifactId><version>4.2.0</version></dependency><dependency><groupId>io.jsonwebtoken</groupId><artifactId>jjwt</artifactId><version>0.9.1</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.78</version></dependency>

2. controller

package com.demo.controller;
import com.alibaba.fastjson.JSON;
import com.demo.listener.SSEListener;
import com.demo.params.req.ChatGlmDto;
import com.demo.utils.ApiTokenUtil;
import com.demo.utils.ExecuteSSEUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletResponse;@RestController
@Slf4j
public class APITestController {private static final String API_KEY = "xxxx";private static final String URL = "xxx";@PostMapping(value = "/sse-invoke", produces = "text/event-stream;charset=UTF-8")public void sse(@RequestBody ChatGlmDto chatGlmDto, HttpServletResponse rp) {try {String token = ApiTokenUtil.generateClientToken(API_KEY);SSEListener sseListener = new SSEListener(chatGlmDto, rp);ExecuteSSEUtil.executeSSE(URL, token, sseListener, JSON.toJSONString(chatGlmDto));} catch (Exception e) {log.error("请求SSE错误处理", e);}}
}

3. 监听器

监听器里的事件可以自己定义,然后自己去实现自己相关的业务逻辑,onEvent主要用来接收消息

package com.demo.listener;
import com.alibaba.fastjson.JSON;
import com.demo.params.req.ChatGlmDto;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;import javax.servlet.http.HttpServletResponse;
import java.util.concurrent.CountDownLatch;@Slf4j
@Data
public class SSEListener extends EventSourceListener {private CountDownLatch countDownLatch = new CountDownLatch(1);private ChatGlmDto chatGlmDto;private HttpServletResponse rp;private StringBuffer output = new StringBuffer();public SSEListener(ChatGlmDto chatGlmDto, HttpServletResponse response) {this.chatGlmDto = chatGlmDto;this.rp = response;}/*** {@inheritDoc}* 建立sse连接*/@Overridepublic void onOpen(final EventSource eventSource, final Responseresponse) {if (rp != null) {rp.setContentType("text/event-stream");rp.setCharacterEncoding("UTF-8");rp.setStatus(200);log.info("建立sse连接..." + JSON.toJSONString(chatGlmDto));} else {log.info("客户端非sse推送" + JSON.toJSONString(chatGlmDto));}}/*** 事件** @param eventSource* @param id* @param type* @param data*/@Overridepublic void onEvent(EventSource eventSource, String id, String type, String data) {try {output.append(data);if ("finish".equals(type)) {log.info("请求结束{} {}", chatGlmDto.getMessageId(), output.toString());}if ("error".equals(type)) {log.info("{}: {}source {}", chatGlmDto.getMessageId(), data, JSON.toJSONString(chatGlmDto));}if (rp != null) {if ("\n".equals(data)) {rp.getWriter().write("event:" + type + "\n");rp.getWriter().write("id:" + chatGlmDto.getMessageId() + "\n");rp.getWriter().write("data:\n\n");rp.getWriter().flush();} else {String[] dataArr = data.split("\\n");for (int i = 0; i < dataArr.length; i++) {if (i == 0) {rp.getWriter().write("event:" + type + "\n");rp.getWriter().write("id:" + chatGlmDto.getMessageId() + "\n");}if (i == dataArr.length - 1) {rp.getWriter().write("data:" + dataArr[i] + "\n\n");rp.getWriter().flush();} else {rp.getWriter().write("data:" + dataArr[i] + "\n");rp.getWriter().flush();}}}}} catch (Exception e) {log.error("消息错误[" + JSON.toJSONString(chatGlmDto) + "]", e);countDownLatch.countDown();throw new RuntimeException(e);}}/*** {@inheritDoc}*/@Overridepublic void onClosed(final EventSource eventSource) {log.info("sse连接关闭:{}", chatGlmDto.getMessageId());log.info("结果输出:{}" + output.toString());countDownLatch.countDown();}/*** {@inheritDoc}*/@Overridepublic void onFailure(final EventSource eventSource, final Throwable t, final Response response) {log.error("使用事件源时出现异常... [响应:{}]...", chatGlmDto.getMessageId());countDownLatch.countDown();}public CountDownLatch getCountDownLatch() {return this.countDownLatch;}
}

4. 相关工具类

获取token ApiTokenUtil类,这个根据自己的业务需求看是否需要,我这里为了程序能跑起来,就保留了

package com.demo.utils;import com.alibaba.fastjson.JSON;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;public class ApiTokenUtil {public static String generateClientToken(String apikey) {String[] apiKeyParts = apikey.split("\\.");String api_key = apiKeyParts[0];String secret = apiKeyParts[1];Map<String, Object> header = new HashMap<>();header.put("alg", SignatureAlgorithm.HS256);header.put("sign_type", "SIGN");Map<String, Object> payload = new HashMap<>();payload.put("api_key", api_key);payload.put("exp", System.currentTimeMillis() + 5 * 600 * 1000);payload.put("timestamp", System.currentTimeMillis());String token = null;try {token = Jwts.builder().setHeader(header).setPayload(JSON.toJSONString(payload)).signWith(SignatureAlgorithm.HS256, secret.getBytes(StandardCharsets.UTF_8)).compact();} catch (Exception e) {System.out.println();}return token;}
}

ExecuteSSEUtil 类

package com.demo.utils;import com.demo.listener.SSEListener;
import lombok.extern.slf4j.Slf4j;
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSources;@Slf4j
public class ExecuteSSEUtil {public static void executeSSE(String url, String authToken, SSEListener eventSourceListener, String chatGlm) throws Exception {RequestBody formBody = RequestBody.create(chatGlm, MediaType.parse("application/json; charset=utf-8"));Request.Builder requestBuilder = new Request.Builder();requestBuilder.addHeader("Authorization", authToken);Request request = requestBuilder.url(url).post(formBody).build();EventSource.Factory factory = EventSources.createFactory(OkHttpUtil.getInstance());//创建事件factory.newEventSource(request, eventSourceListener);eventSourceListener.getCountDownLatch().await();}}

OkHttpUtil 类

package com.demo.utils;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import java.net.Proxy;
import java.util.concurrent.TimeUnit;public class OkHttpUtil {private static OkHttpClient okHttpClient;public static ConnectionPool connectionPool = new ConnectionPool(10, 5, TimeUnit.MINUTES);public static OkHttpClient getInstance() {if (okHttpClient == null) { //加同步安全synchronized (OkHttpClient.class) {if (okHttpClient == null) { //okhttp可以缓存数据....指定缓存路径okHttpClient = new OkHttpClient.Builder()//构建器.proxy(Proxy.NO_PROXY) //来屏蔽系统代理.connectionPool(connectionPool).connectTimeout(600, TimeUnit.SECONDS)//连接超时.writeTimeout(600, TimeUnit.SECONDS)//写入超时.readTimeout(600, TimeUnit.SECONDS)//读取超时.build();okHttpClient.dispatcher().setMaxRequestsPerHost(200);okHttpClient.dispatcher().setMaxRequests(200);}}}return okHttpClient;}
}

ChatGlmDto 请求实体类

package com.demo.params.req;import lombok.Data;/*** Created by WeiRan on  2023.03.20 19:19*/
@Data
public class ChatGlmDto {private String messageId;private Object prompt;private String requestTaskNo;private boolean incremental = true;private boolean notSensitive = true;
}

5. 接口调用调试

我这里就直接使用curl命令来调用了

curl 'http://localhost:8080/sse-invoke' --data '{"prompt":[{"role":"user","content":"泰山有多高?"}]}' -H 'Content-Type: application/json'

返回结果:
在这里插入图片描述

分割线---------------------------------------------------------------------------------------------------------------------------------

创作不易,三连支持一下吧 👍

最后的最后送大家一句话

白驹过隙,沧海桑田

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/162309.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

超实用的Web兼容性测试经验总结,建议Mark

在日常工作中&#xff0c;我们经常碰到网页不兼容的问题。我们之所以要做兼容性测试&#xff0c;目的在于保证待测试项目在不同的操作系统平台上正常运行。 主要包括待测试项目能在同一操作系统平台的不同版本上正常运行&#xff1b;待测试项目能与相关的其他软件或系统的“和…

数据结构-----红黑树的删除操作

目录 前言 一、左旋和右旋 左旋&#xff08;Left Rotation&#xff09; 右旋&#xff08;Right Rotation&#xff09; 二、红黑树的查找 三、红黑树的删除 1.删除的是叶子节点 1.1删除节点颜色为红色 1.2删除节点颜色为黑色 1.2-1 要删除节点D为黑色&#xff0c;兄弟节…

创新与重塑,佛塑科技打造集团型 CRM 建设标杆

“十四五”时期是我国全面建成小康社会、实现第一个百年奋斗目标之后&#xff0c;乘势而上开启全面建设社会主义现代化国家新征程、向第二个百年奋斗目标进军的第一个五年。 在政府有序推进“十四五”规划的进程中&#xff0c;佛山佛塑科技集团股份有限公司&#xff08;证券简…

uni-app--》基于小程序开发的电商平台项目实战(七)完结篇

&#x1f3cd;️作者简介&#xff1a;大家好&#xff0c;我是亦世凡华、渴望知识储备自己的一名在校大学生 &#x1f6f5;个人主页&#xff1a;亦世凡华、 &#x1f6fa;系列专栏&#xff1a;uni-app &#x1f6b2;座右铭&#xff1a;人生亦可燃烧&#xff0c;亦可腐败&#xf…

LeetCode【17】电话号码的字母组合

题目&#xff1a; 思路&#xff1a; 参考&#xff1a;https://blog.csdn.net/weixin_46429290/article/details/121888154 和上一个题《子集》的思路一样&#xff0c;先画出树结构&#xff0c;看树的深度&#xff08;遍历层级&#xff09;&#xff0c;树的宽度&#xff08;横向…

【监督学习】基于合取子句进化算法(CCEA)和析取范式进化算法(DNFEA)解决分类问题(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

AI机器视觉多场景应用迸发检测活力,引领食品及包装行业新发展

随着食品安全意识的广泛传播&#xff0c;人们对食品质量和安全的要求越来越高&#xff0c;众多食品包装厂商加速产线数智化转型&#xff0c;迫切需要高效、准确且智能化的检测技术。 在现代食品及包装行业的自动化生产中&#xff0c;涉及到各种各样的识别、检测、测量等环节&a…

用友GRP-U8 SQL注入漏洞复现

0x01 产品简介 用友GRP-U8R10行政事业财务管理软件是用友公司专注于国家电子政务事业&#xff0c;基于云计算技术所推出的新一代产品&#xff0c;是我国行政事业财务领域最专业的政府财务管理软件。 0x02 漏洞概述 用友GRP-U8的bx_historyDataCheck jsp、slbmbygr.jsp等接口存…

C++基础——内存分区模型

1 概述 C程序在执行是&#xff0c;将内存大致分为4个区域&#xff1a; 代码区&#xff1a;用于存放二进制代码&#xff0c;由操作系统进行管理全局区&#xff1a;存放全局变量和静态变量及常量栈区&#xff1a;由编译器自动分配释放&#xff0c;存放函数的参数、局部变量等堆…

React中的key有什么作用

一、是什么 首先&#xff0c;先给出react组件中进行列表渲染的一个示例&#xff1a; const data [{ id: 0, name: abc },{ id: 1, name: def },{ id: 2, name: ghi },{ id: 3, name: jkl } ];const ListItem (props) > {return <li>{props.name}</li>; };co…

Python中的循环语句Cycle学习

二、循环语句 1、什么是循环语句 一般编程语言都有循环语句,为什么呢? 那就问一下自己,我们弄程序是为了干什么? 那肯定是为了方便我们工作,优化我们的工作效率啊。 而计算机和人类不同,计算机不怕苦也不怕累,也不需要休息,可以一直做。 你要知道,计算机最擅长就…

FPR3346501R1012 数据科学与人工智能:主要区别

FPR3346501R1012 数据科学与人工智能:主要区别 当谈到数据科学和人工智能(人工智能)&#xff0c;你会经常发现两个技能路径之间有很多交集。人工智能有许多子集&#xff0c;比如机器学习和深度学习&#xff0c;以及数据科学利用这些技术来解释和分析数据&#xff0c;发现模式…

云上攻防-云原生篇KubernetesK8s安全APIKubelet未授权访问容器执行

文章目录 K8S集群架构解释K8S集群攻击点-重点API Server未授权访问&kubelet未授权访问复现k8s集群环境搭建1、攻击8080端口&#xff1a;API Server未授权访问2、攻击6443端口&#xff1a;API Server未授权访问3、攻击10250端口&#xff1a;kubelet未授权访问 K8S集群架构解…

让GPT回复图片的咒语

咒语如下&#xff1a; 帮我画一张图关于XXXXX,用3/8Markdown 写&#xff0c;不要有反斜钱,不要用代码块。使用Unsplash APl(https://source.unsplash.com/1280x720/?<PUT YOUR QUERY HERE >) Over! ​​​​​​​

Android---DVM以及ART对JVM进行优化

Dalvik Dalvik 是 Google 公司自己设计用于 Android 平台的 Java 虚拟机&#xff0c;Android 工程师编写的 Java 或者 Kotlin 代码最终都是在这台虚拟机中被执行的。在 Android 5.0 之前叫作 DVM&#xff0c;5.0 之后改为 ART&#xff08;Android Runtime&#xff09;。在整个…

GPIO基本原理

名词解释 高低电平&#xff1a;GPIO引脚电平范围&#xff1a;0V~3.3V&#xff08;部分引脚可容忍5V&#xff09;数据0就是0V&#xff0c;代表低电平&#xff1b;数据1就是3.3V&#xff0c;代表高电平&#xff1b; STM32是32位的单片机&#xff0c;所以内部寄存器也都是32位的…

国产单片机PY32F002B,32位ARM架构Cortex -M0+内核,性价比高

PY32F002B是普冉推出的新一代入门级32位MCU&#xff0c;内核使用 ARM Cortex M0&#xff0c;主频最高支持到24M&#xff0c;24K FLASH 3K SRAM存储&#xff0c;并支持1.7V~5.5V宽工作电压&#xff0c;-40 ~ 85 C工作温度。拥有1 x 12 位ADC、I2C、SPI、USART、TIM、LPTIM、IWDT…

麻了,别再为难软件测试员了

前言 有不少技术友在测试群里讨论&#xff0c;近期的面试越来越难了&#xff0c;要背的八股文越来越多了,考察得越来越细&#xff0c;越来越底层&#xff0c;明摆着就是想让我们徒手造航母嘛&#xff01;实在是太为难我们这些测试工程师了。 这不&#xff0c;为了帮大家节约时…

C# OpenCvSharp 利用Lab空间把春天的场景改为秋天

效果 项目 代码 using OpenCvSharp; using System; using System.Diagnostics; using System.Drawing; using System.Drawing.Imaging; using System.Windows.Forms;namespace OpenCvSharp_Demo {public partial class Form1 : Form{public Form1(){InitializeComponent();}st…

LabVIEW生产者消费者架构

LabVIEW生产者消费者架构 生产者/消费者模式可以轻松地同时处理多个进程&#xff0c;同时还能以不同速率迭代。 缓冲通信 当多个进程以不同速度运行时&#xff0c;就适合采用进程间缓冲通信。有了足够大的缓冲区后&#xff0c;生产者循环可以以快于消费者循环的速度运行&…