我们都知道chatgpt是生成式的,因此它返回给客户端的消息也是一段一段的,所以普通的HTTP协议无法满足,当然websocket是能满足的,但是这个是双向的通信,其实 SSE(Server-Sent Events) 正好满足这个需求。
SSE相比websocket的优点:
- SSE是使用http协议,而websocket是一种单独的协议。
- SSE是单向传输,只能服务端向客户端推送,websocket是双向。
- SSE支持断点续传,websocket需要自己实现。
- SSE支持自动重连、轻量级。
- SSE支持发送自定义类型消息。
- SSE的响应头Content-Typ:text/event-stream
要实现SSE,服务端需要设置以下Headers
"Content-Type":"text/event-stream"
"Cache-Control":"no-cache"
"Connection":"keep-alive"
"Access-Control-Allow-Origin": "*" // 跨域问题
一、前端代码
我看网络上有两种实现方式:fetch 和 EventSource
fetch方式
<!DOCTYPE html>
<html><head><meta charset="UTF-8"><title>Event Stream Demo</title><style type="text/css">body {font-family: Arial, sans-serif;text-align: center;}#event-stream-data {margin: 50px auto;max-width: 600px;border: 1px solid #ccc;padding: 10px;}</style>
</head><body><div id="event-stream-data"></div>
</body><script>const eventStreamDataElement = document.getElementById('event-stream-data');function handleEventStreamMessage(event) {console.log(event)const eventText = event.data;displayEvent(eventText);}function displayEvent(eventText) {const eventElement = document.createElement('p');eventElement.textContent = eventText;eventStreamDataElement.appendChild(eventElement);}function connectToEventStream() {fetch('http://127.0.0.1:8080/stream', {method: 'POST',headers: {'Content-Type': 'application/x-www-form-urlencoded'},body: {data: 'example'}}).then(response => {const reader = response.body.getReader();const decoder = new TextDecoder();return reader.read().then(function processResult(result) {// console.log(result)if (result.done) {return;}const chunk = decoder.decode(result.value, {stream: true});handleEventStreamMessage({data: chunk});return reader.read().then(processResult);});}).catch(error => {console.error('Error occurred while fetching event stream:', error);});}connectToEventStream();
</script></html>
EventSource方式
<!DOCTYPE html>
<html><head><meta charset="UTF-8"><title>Event Stream Demo</title><style type="text/css">body {font-family: Arial, sans-serif;text-align: center;}#event-stream-data {margin: 50px auto;max-width: 600px;border: 1px solid #ccc;padding: 10px;}</style>
</head><body><div id="event-stream-data"></div>
</body><script type="text/javascript">const eventStreamDataElement = document.getElementById('event-stream-data');function handleEventStreamMessage(event) {console.log(event)const eventText = event.data;displayEvent(eventText);}function displayEvent(eventText) {const eventElement = document.createElement('p');eventElement.textContent = eventText;eventStreamDataElement.appendChild(eventElement);}// 向后端服务器发起sse请求const es = new EventSource("http://127.0.0.1:8080/stream");// Event 和 Message 分开处理,需要显示的监听事件,否则不会处理事件es.onmessage = function (e) {handleEventStreamMessage(e);}// 监听事件流es.addEventListener("start", (e) => {handleEventStreamMessage(e);});es.addEventListener("end", (e) => {handleEventStreamMessage(e);// 一定要关闭连接,否则会一直轮训es.close()});es.onerror = function (e) {// readyState说明// 0:浏览器与服务端尚未建立连接或连接已被关闭// 1:浏览器与服务端已成功连接,浏览器正在处理接收到的事件及数据// 2:浏览器与服务端建立连接失败,客户端不再继续建立与服务端之间的连接console.log("readyState = " + e.currentTarget.readyState);}
</script></html>
二、GIN 中自带的 SSE
package mainimport ("time""github.com/gin-contrib/sse""github.com/gin-gonic/gin"
)func main() {engin := gin.Default()engin.Any("/stream", func(c *gin.Context) {c.Header("Access-Control-Allow-Origin", "*")c.Header("Access-Control-Allow-Headers", "*")// c.SSEvent("start", "start...")sse.Event{Id: "1",Event: "start",Data: "start...",}.Render(c.Writer)c.Writer.Flush()time.Sleep(1 * time.Second)for i := 0; i < 10; i++ {sse.Event{Id: "1",Data: "SSE data",}.Render(c.Writer)c.Writer.Flush() // 需要手动刷新输出缓冲区time.Sleep(1 * time.Second)}// c.SSEvent("end", "end...")sse.Event{Id: "1",Event: "end",Data: "end...",}.Render(c.Writer)})engin.Run(":8080")
}
说明
// sse.Event
type Event struct {Event stringId stringRetry uintData interface{}
}
sse.Event
结构在渲染的时候会自动加上前缀和后面的回车,比如id:xxx\nevent:xxx\nretry:xxx\ndata:xxx\n\n
,因此在设置内容的时候不需要关心format。
并且会自动填充两个响应头
Content-Type: text/event-stream
Cache-Control: no-cache
如果服务器端提供了event参数,那么客户端就需要使用addEventListener 显式监听这个事件,才会正常获取消息,否则事件不会触发。如果服务器端没有提供event 参数,只有id、data等,可以使用onmessage回调监听消息。
id 的意思是 lastEventId,用途不明。
完整的数据结构是:id:xxx\nevent:xxx\nretry:xxx\ndata:xxx\n\n
一般只需要data字段即可,后面接一个json串。
前端使用EventSource对象发起请求
使用 fetch 的方式发起请求,需要先打开调试并打开接口的响应预览tab,否则是看不到响应结果的。
使用EventSource对象发起请求与使用fetch方式的请求两者的区别在于,在处理响应结果的时候,前者是按照SSE协议来处理消息中的\n
和\n\n
以及那几个字段;而后者则不会。下面是打印结果
EventSource示例
fetch示例
上面的实现仅仅是为了满足ChatGPT这种对话形式,或者说仅仅实现了一个长连接下的流式传输,即使不适用SSE也能实现。
如果想要实现真正的消息推送还需要对客户端连接进行管理,在这一块,SSE和websocket要做的事情差不多,这里就不展开了。
三、使用golang请求chatgpt
大部分的时候,在客户端和chatgpt之间还需要有一个代理层,即它代替用户向chatgpt发起请求,接收数据流,然后将数据流转发给用户。前面已经实现了SSE,所以,这里需要处理的是golang发起stream request。
package mainimport ("bufio""bytes""errors""fmt""io""log""net/http""strings""time"
)func main() {client := &http.Client{Timeout: time.Second * 20}req, _ := http.NewRequest("POST", "http://127.0.0.1:8080/stream", strings.NewReader(""))resp, err := client.Do(req)if err != nil {log.Fatal(err)}reader := bufio.NewReader(resp.Body)defer resp.Body.Close()for {rawLine, err := reader.ReadBytes('\n')if errors.Is(err, io.EOF) {return} else if err != nil {fmt.Println(err)return}fmt.Println(string(bytes.TrimRight(rawLine, "\n")))}
}
id:1
event:start
data:start...id:1
data:SSE dataid:1
data:SSE dataid:1
data:SSE dataid:1
data:SSE dataid:1
data:SSE dataid:1
data:SSE dataid:1
data:SSE dataid:1
data:SSE dataid:1
data:SSE dataid:1
data:SSE dataid:1
event:end
data:end...
golang对接openai:https://github.com/sashabaranov/go-openai