一、背景
在初次使用 ChatGPT 时,我就被打字机的视觉效果吸引。总是感觉似曾相识,因为经常在一些科幻电影中看到,高级文明回传的信息在通讯设备的屏幕上以打字机效果逐步出现,在紧张的氛围下,输出人类可读的内容,拉动着观众的神经,一步步将故事情节拉向高潮。
在很早之前我就了解过 Server-Sent Events 这门服务端推送技术,当时看过很多博客介绍其原理和使用场景,最后也没有留下深刻的印象。这一次 ChatGPT 的使用感受带给我一些触动,也激发了对技术的思考,究竟什么样的技术是一门好的技术 ”需要一个杀手级的应用,现实应用会促进技术发展“,技术不是冰冷无情的,贴近生活挖掘其实用价值,一样可以表现出感性的艺术效果。
二、SSE 工作原理
Server-Sent Events(SSE)是一种允许服务器单向推送信息到客户端的技术,与传统的请求/响应模式相比,这种模式更加适合处理实时数据。以下是一些常见的 Server-Sent Events 应用场景:
- ChatGPT 大型语言模型处理自然语言需要大量的计算资源和时间,响应速度肯定比普通的 HTTP 请求要慢的多。对于这种单项对话场景,ChagtGPT 将先计算出的数据 “推送” 给用户,边计算边返回,提升用户体验。
- 实时通知:SSE 非常适合于实时通知的场景,例如电子邮件或社交媒体通知。一旦有新消息,服务器可以立即将其推送给客户端,而无需客户端定时轮询检查新消息。
- 实时数据流:在金融服务、股票市场、体育比赛等场景中,SSE 可以用于实时推送数据流,如股票价格等。
2.1 SSE 工作原理
SSE 的基本工作原理是客户端首先向服务器发送一个 HTTP 请求,然后服务器保持这个连接打开,并周期性地通过这个连接向客户端发送数据。每个数据块都是一个独立的消息,每个消息都以一个空行结束。
使用 SSE 的主要步骤如下:
- 客户端创建一个新的EventSource对象,参数是服务器的URL。
let source = new EventSource("http://xxx/chat/completions");
- 服务器返回一个 HTTP 响应,Content-Type 为 "text/event-stream",并保持连接打开。
HTTP/1.1 200 OK
Content-Type: text/event-stream
Connection: keep-alive
Cache-Control: no-cache
- 服务器通过打开的连接向客户端发送消息。每个消息都包含一些数据,数据可以是任何格式的文本,比如 JSON。消息以两个连续的换行符结束。
data: This is a message\n\n
- 客户端监听 "message" 事件,当收到新的消息时,这个事件会被触发。
source.onmessage = function(event) {console.log(event.data);
};
注意,由于 SSE 是基于 HTTP 的,因此它受到同源策略的限制。如果你需要进行跨域 SSE,你需要在服务器端设置适当的 CORS 头部信息。另外,SSE 只支持文本数据,不支持二进制数据。如果你需要发送二进制数据,你可能需要考虑使用 WebSockets。
2.2 Fetch API 模拟 SSE
Fetch API 是一种通用的 HTTP 请求和响应模型,它可以用于发送和接收任何类型的 HTTP 请求,支持文本和二进制数据。由于其对流(Stream)的支持,可以模拟 Server-Sent Events (SSE),需要手动处理重连和流式数据。
在某些情况下,你可能会选择使用 Fetch API 模拟 SSE,而不是直接使用 SSE:
- 发送二进制数据:如果你需要发送或接收二进制数据,你必须使用 Fetch API 或其他技术,因为 SSE 只支持文本数据。
- 双向通信:如果你需要进行双向通信,你必须使用 Fetch API 或其他技术,因为 SSE 只支持单向通信。
- 更大的灵活性:Fetch API 提供了更大的灵活性,例如,你可以控制请求头、请求方法、响应处理等。
const url = 'https://your-server.com/events';fetch(url).then(response => {const reader = response.body.getReader();const decoder = new TextDecoder();// done 为数据是否接收完成 boolean 值// value 为接收到的数据, Uint8Array 格式return reader.read().then(function processMessage({ done, value }) {if (done) {return;}console.log(decoder.decode(value));return reader.read().then(processMessage);});});
在这个示例中,我们使用 fetch() 函数发起 HTTP 请求。然后,使用 response.body.getReader() 获取一个可读流的 reader,用来读取数据。还创建了一个 TextDecoder 对象,用来将二进制数据解码为文本,然后打印出来。然后,再次调用 reader.read() 方法,等待下一批数据。
这样,就可以使用 Fetch API 来接收服务器推送的实时更新,就像使用 SSE 一样,ChatGPT 采用的就是这种实现。
三、SSE 服务端
Server-Sent Events (SSE) 是一种服务器推送技术,允许服务器向客户端发送实时更新。在服务器端,我们需要创建一个 endpoint,发送正确的 HTTP 头部并持续推送数据。
func main() {http.HandleFunc("/v1/chat/completions", func(w http.ResponseWriter, r *http.Request) {flusher, ok := w.(http.Flusher)if !ok {http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)return}// 事件流媒体 (MIME 类型)w.Header().Set("Content-Type", "text/event-stream")// 阻止缓存w.Header().Set("Cache-Control", "no-cache")// 保持长连接w.Header().Set("Connection", "keep-alive")// 跨域支持w.Header().Set("Access-Control-Allow-Origin", "*")phrase := []string{"dolor ", "sit amet", ", consectetur", " adipiscing elit. ", "Ut consequat", " diam at ", "justo efficitur", " mattis."}for _, delta := range phrase {// 数据内容用 data 表示, 如果数据很长, 可以分成多行用 \n 结尾,fmt.Fprintf(w, "data: %s\n", delta)flusher.Flush()time.Sleep(200 * time.Millisecond)}// 最后一行使用 \n\n 结尾fmt.Fprintf(w, "data: %s\n\n", "[DONE]")})if err := http.ListenAndServe("127.0.0.1:8080", nil); err != nil {panic(err)}
}
在 Go 语言中,http.Flusher 是一个接口,它允许 HTTP 响应数据在写入后立即发送到客户端,而不是等待所有响应数据都写入后再一次性发送。这对于长连接和服务器推送的场景非常有用。
// Flush 将用户层的数据写入到 TCP 缓冲区,内核会尽快将 TCP 缓存区数据发送出去
type Flusher interface {Flush()
}
扩展:每个 TCP socket 连接在内核中都有一个发送缓存区和接收缓冲区
发送缓冲区用于暂存应用程序写入的数据,直到数据被发送出去并得到对方的确认。接收缓冲区用于暂存收到的数据,直到应用程序读取这些数据。
当应用程序调用发送数据的系统调用(如 write 或 send)时,数据会被复制到发送缓冲区。然后,内核会尽快将这些数据发送出去。但具体发送的时机取决于许多因素,包括但不限于以下几点:
- Nagle 算法:为了减少小包在网络上的传输,Nagle 算法规定,除非上一个发送的数据包已经得到确认,否则不能发送新的数据包。所以,如果发送缓冲区中的数据量较小,并且上一个数据包还未得到确认,数据可能会在缓冲区中等待。
- TCP 拥塞控制:TCP 协议通过拥塞控制算法,动态地调整发送速率,以避免网络拥塞。如果网络拥塞,数据可能会在发送缓冲区中等待,直到网络状况改善。
- 接收方的接收窗口:接收方通过 TCP 的滑动窗口机制,告诉发送方它的接收缓冲区还有多少空间。如果接收方的接收窗口满了,数据必须在发送缓冲区等待,直到接收方的接收窗口有空间。
当数据成功发送并得到确认后,内核会从发送缓冲区中删除这些数据,释放缓冲区空间。
四、实现一个打字机效果
上面我们讨论下 SSE 的工作原理,也知道由于 Web API EventSource 的局限性,ChatGPT 采用了 Fetch API 来手动处理和解析 SSE 服务端端点接收的数据流。那么接下来通过一个简单的打字机案例,加深对所学内容的理解。
这里借鉴了 《ChatGPT 打字机消息回复实现原理》 文章中的前端代码,在其基础上增加了消息处理逻辑,用于适配上面的 SSE 服务端。
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Chat Completion</title>
</head>
<body><button onclick="connectFetch()">建立 fetchSSE 连接</button><button onclick="closeSSE()">断开 fetchSSE 连接</button><br/><br/><div id="text"></div><script>const divTyping = document.getElementById('text')let ctrlconst connectFetch = () => {ctrl = new AbortController()fetchEventSource('http://127.0.0.1:8080/v1/chat/completions', {method: 'POST',body: JSON.stringify({prompt: 'Lorem ipsum',max_tokens: 20,stream: true,}),signal: ctrl.signal,onopen: () => {console.log('Connection successful.')},onclose: () => {console.log('Connection closed.')},onmessage: (delta) => {let prefix = 'data: 'if (!delta.startsWith(prefix)) {return}delta = delta.slice(prefix.length)delta = delta.replace(/\n$/, '')if (delta === '[DONE]\n') {return}divTyping.innerText += delta}})}const closeSSE = () => {if (ctrl) {ctrl.abort()ctrl = null}}const fetchEventSource = (url, options) => {fetch(url, options).then(resp => {if (resp.status === 200) {options.onopen && options.onopen()return resp.body}}).then(rb => {const reader = rb.getReader()const push = () => {// done 为数据是否接收完成 boolean 值// value 为接收到的数据, Uint8Array 格式return reader.read().then(({done, value}) => {if (done) {options.onclose && options.onclose()return}options.onmessage && options.onmessage(new TextDecoder().decode(value))return push()});}// 开始读取流信息return push()}).catch((e) => {options.error && options.error(e)})}</script>
</body>
</html>
五、参考资料
- MDN - EventSource EventSource - Web APIs | MDN
- MDN - Server-sent events Server-sent events - Web APIs | MDN
- Server-Sent Events 教程 Server-Sent Events 教程 - 阮一峰的网络日志
- Go 实现 SSE 服务端 Go实现SSE的方式和需要注意的事项 | Laravel China 社区
- ChatGPT 打字机消息回复实现原理 ChatGPT 打字机消息回复实现原理 - 掘金
- Create chat completion https://platform.openai.com/docs/api-reference/chat/create
- ChatGPT Web 开源项目 https://github.com/Chanzhaoyu/chatgpt-web
- Go clients for OpenAI API https://github.com/sashabaranov/go-openai