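Our company recently needed to integrate GPT-based Q&A into its product. I hit quite a few pitfalls along the way, so I am writing them down here.
Streaming
I went through many GPT integration examples online but never got the effect I wanted, and for a long time I assumed I had wired something up wrong. Eventually it clicked: although all of them are called "streaming", there is a fundamental difference. Many of the examples online use a one-shot stream, similar to transferring an image, where the stream you receive is already the final result. What our business needs is a chunked stream: as soon as a piece of text arrives from GPT, it is forwarded to the front end, instead of waiting for GPT's complete answer and returning that. This shortens the time the user spends waiting.
Implementation
- The data received in each callback is not necessarily a complete message: it may contain one message, N messages, or N and a half messages, so it has to be buffered and reassembled.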
public function ask()
{
    $messages = [['role' => 'user', 'content' => '你好']];
    $json = json_encode([
        'model' => 'gpt-3.5-turbo',
        'messages' => $messages,
        'temperature' => 0.6,
        'stream' => true,
    ]);
    $headers = array(
        "Content-Type: application/json",
        "Authorization: Bearer " . $this->api_key,
    );
    // I originally used GuzzleHttp but could not get the desired effect and never
    // found out why, so in frustration I switched to native cURL.
    $ch = curl_init();
    curl_setopt($ch, CURLOPT_URL, $this->api_url);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    curl_setopt($ch, CURLOPT_HEADER, false);
    curl_setopt($ch, CURLOPT_FOLLOWLOCATION, true);
    // Certificate verification is disabled here; enable it in production.
    curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
    curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
    curl_setopt($ch, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
    curl_setopt($ch, CURLOPT_POSTFIELDS, $json);
    curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
    // Pass the instance rather than the class name: callback() is non-static and uses $this.
    curl_setopt($ch, CURLOPT_WRITEFUNCTION, [$this, 'callback']);
    $response = curl_exec($ch);
    if (curl_errno($ch)) {
        file_put_contents('./log/curl.error.log', curl_error($ch) . PHP_EOL . PHP_EOL, FILE_APPEND);
    }
    curl_close($ch);
}

/**
 * Streaming callback
 * @param $ch
 * @param $data
 * @return int
 */
public function callback($ch, $data)
{
    $this->counter += 1;
    $result = json_decode($data, TRUE);
    if (is_array($result)) {
        // A chunk that parses as JSON on its own is an error response, not an SSE message
        $this->end('OpenAI request error: ' . json_encode($result));
        return strlen($data);
    }
    /*
     * The data received by each callback is not necessarily a single
     * data: {"key":"value"} message. It may be half a message, several
     * messages, or N and a half messages.
     */
    // Prepend what was left in the buffer last time to the current data
    $buffer = $this->data_buffer . $data;
    // After concatenating, clear the buffer
    $this->data_buffer = '';
    // Replace every 'data: {' with '{' and every 'data: [' with '['
    $buffer = str_replace('data: {', '{', $buffer);
    $buffer = str_replace('data: [', '[', $buffer);
    // Replace every '}\n\n{' with '}[br]{' and every '}\n\n[' with '}[br]['
    $buffer = str_replace("}\n\n{", '}[br]{', $buffer);
    $buffer = str_replace("}\n\n[", '}[br][', $buffer);
    // Split on '[br]' into an array of lines
    $chunks = explode('[br]', $buffer);
    $chunkCount = count($chunks);
    foreach ($chunks as $key => $chunk) {
        $line = trim($chunk);
        // Transfer complete
        if ($line == '[DONE]') {
            $this->data_buffer = "";
            $this->counter = 0;
            $this->end();
            break;
        }
        $chunkData = json_decode($line, true);
        if (!is_array($chunkData) || !isset($chunkData['choices']) || !isset($chunkData['choices'][0])) {
            // This is the tail of the current data: keep it for the next callback
            if ($key == ($chunkCount - 1)) {
                $this->data_buffer = $chunk;
                break;
            }
            // A middle line that cannot be parsed as JSON is skipped
            // (this is where it would be written to the error log)
            continue;
        }
        // Output the data
        if (isset($chunkData['choices'][0]['delta']) && isset($chunkData['choices'][0]['delta']['content'])) {
            $this->write($chunkData['choices'][0]['delta']['content']);
        }
    }
    return strlen($data);
}

private function write($content = NULL, $flush = TRUE)
{
    if ($content != NULL) {
        echo 'data: ' . json_encode(['time' => date('Y-m-d H:i:s'), 'content' => $content], JSON_UNESCAPED_UNICODE) . PHP_EOL . PHP_EOL;
    }
    if ($flush) {
        flush();
    }
}

private function end($content = NULL)
{
    if (!empty($content)) {
        $this->write($content, FALSE);
    }
    echo 'data: Connection closed' . PHP_EOL . PHP_EOL;
    flush();
}

// Return the stream to the front end
public function ai()
{
    // Front-end response
    // Turn off output buffering so that output is sent to the browser immediately
    // instead of waiting for the buffer to fill or the script to finish.
    // Note: output_buffering is a per-directory setting, so this ini_set() call may
    // have no effect at runtime; the ob_end_flush() loop below clears active buffers.
    ini_set('output_buffering', 'off');
    // Disable zlib compression. Compression reduces the amount of data sent, but for
    // server-sent events latency matters more.
    ini_set('zlib.output_compression', false);
    // Flush and close every active output buffer, innermost first.
    // The @ suppresses any errors or warnings.
    while (@ob_end_flush()) {}
    // CORS. Note: browsers reject credentialed requests when Allow-Origin is '*';
    // name a specific origin if cookies have to be sent.
    header('Access-Control-Allow-Credentials: true');
    header('Access-Control-Allow-Origin: *');
    header('Access-Control-Allow-Methods: GET, POST, OPTIONS');
    header('Access-Control-Allow-Headers: Content-Type');
    // text/event-stream is the MIME type for server-sent events (SSE)
    header('Content-Type: text/event-stream;charset=UTF-8');
    // Tell the browser not to cache this response
    header('Cache-Control: no-cache');
    // Keep the connection open so the server can keep sending events
    header('Connection: keep-alive');
    // Disable buffering in proxies or web servers such as Nginx, so that
    // events are not held back in transit
    header('X-Accel-Buffering: no');
    $this->ask();
}
Another developer's GuzzleHttp version
public function createChatCompletionStream($messages = [])
{
    if (empty($messages)) {
        exit();
    }
    try {
        $response = $this->guzzle->request("POST", '/v1/chat/completions', [
            'json' => [
                'model' => 'gpt-3.5-turbo',
                'messages' => $messages,
                'stream' => true,
            ],
            'stream' => true,
        ]);
        $body = $response->getBody();
        $buffer = '';
        while (!$body->eof()) {
            $buffer .= $body->read(128);
            // A while loop is used because one read of n bytes may contain several EventSource messages
            while (($pos = strpos($buffer, "\n\n")) !== false) {
                $msg = substr($buffer, 0, $pos); // one event message
                $buffer = substr($buffer, $pos + 2); // drop the part that has been parsed
                // Only data is parsed; a real EventSource message also has event, id and retry
                if (substr($msg, 0, 6) === 'data: ') {
                    $obj = json_decode(substr($msg, 6));
                    if (isset($obj->choices[0]->delta->content)) {
                        echo $obj->choices[0]->delta->content;
                        ob_flush();
                        flush();
                    }
                }
            }
        }
        exit();
    } catch (GuzzleException $e) {
        Log::error($e->getMessage());
        return response('Request failed, please try again later', 500);
    }
}
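Both PHP versions do the same thing to each complete message: strip the `data: ` prefix, stop on `[DONE]`, and otherwise pull out `choices[0].delta.content`. The following is a minimal JavaScript sketch of that per-message step, written in JavaScript only to match the front-end code in this post. The function name `parseCompletionMessage` and its return shape are my own assumptions for illustration, not part of either implementation above.

```javascript
// Hypothetical helper: turns one complete SSE message from the completions
// stream into { done, content }.
function parseCompletionMessage(message) {
  const trimmed = message.trim();
  // Only `data:` lines are handled; `event`, `id` and `retry` are ignored.
  if (!trimmed.startsWith('data:')) {
    return { done: false, content: '' };
  }
  const payload = trimmed.slice('data:'.length).trim();
  // The stream signals completion with a literal [DONE] payload.
  if (payload === '[DONE]') {
    return { done: true, content: '' };
  }
  let parsed;
  try {
    parsed = JSON.parse(payload);
  } catch (err) {
    // Incomplete or malformed JSON: nothing to emit for this message.
    return { done: false, content: '' };
  }
  const delta = parsed && parsed.choices && parsed.choices[0] && parsed.choices[0].delta;
  const content = delta && typeof delta.content === 'string' ? delta.content : '';
  return { done: false, content };
}
```

Messages that carry no text, such as the first delta that only contains a role, come back with an empty `content`, so the caller can skip them without special cases.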
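- The underlying mechanism is SSE (Server-Sent Events). To get the desired effect, the back end has to write to the response multiple times, and the front end has to receive the stream with EventSource and handle each piece as it arrives.
- EventSource only supports UTF-8 encoded text.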
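- Browsers limit the number of connections that can be kept open (reportedly Chrome allows only 6). Requests beyond that limit stay blocked on the client.
- Some browsers have limited support for EventSource.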
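To make the "write multiple times" point concrete, here is a small JavaScript sketch of the wire format the PHP `write()` and `end()` methods produce: each message is a `data: ` line followed by a blank line. The helper names `formatSseMessage` and `splitSseMessages` are assumptions made for this illustration, and the sketch assumes `\n` line endings and a body that contains only complete messages.

```javascript
// Serialize one payload as an SSE message: "data: <json>\n\n".
// JSON.stringify never emits a raw newline, so the payload fits on one data line.
function formatSseMessage(payload) {
  return 'data: ' + JSON.stringify(payload) + '\n\n';
}

// Split a body made of complete messages and return the text after each "data: " prefix.
function splitSseMessages(body) {
  return body
    .split('\n\n')
    .filter((message) => message.startsWith('data: '))
    .map((message) => message.slice('data: '.length));
}

// Two content messages followed by the closing marker used in this post.
const body =
  formatSseMessage({ content: 'Hel' }) +
  formatSseMessage({ content: 'lo' }) +
  'data: Connection closed\n\n';
const received = splitSseMessages(body);
console.log(received);
```

EventSource performs this splitting itself and hands each payload to the `message` listener as `event.data`, which is why the listener only has to deal with the JSON text or the closing marker.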
Front end
EventSource
createEventSource() {
  this.resultText = ''
  const url = ``
  const eventSource = new EventSource(url)
  eventSource.addEventListener('open', (event) => {
    console.log('Connection established', JSON.stringify(event))
  })
  eventSource.addEventListener('message', (event) => {
    console.log('Received data:', event.data)
    if (event.data.indexOf('closed') !== -1) {
      eventSource.close()
    } else {
      var result = JSON.parse(event.data)
      if (result.time && result.content) {
        this.resultText += result.content
      }
    }
  })
  eventSource.addEventListener('error', (event) => {
    console.error('An error occurred:', JSON.stringify(event))
    eventSource.close()
  })
},
EventSource only supports GET requests. For POST requests the native EventSource cannot be used, so the stream is read with fetch instead.
async fetchAiResponse(message) {
  try {
    const response = await fetch('http://127.0.0.1/api/test', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        messages: [{ role: 'user', content: message }]
      })
    })
    console.log(response, 'response')
    if (!response.ok) {
      throw new Error(response.statusText)
    }
    const reader = response.body.getReader()
    const decoder = new TextDecoder('utf-8')
    const readChunk = async () => {
      return reader.read().then(({ value, done }) => {
        if (!done) {
          let partialResponse = decoder.decode(value, { stream: true })
          // Skip the chunk that carries the closing marker
          if (partialResponse.indexOf('closed') !== -1) return readChunk()
          partialResponse = partialResponse.replaceAll('data: {', '{')
          let chunks = partialResponse.split(/\n{2}/g)
          chunks = chunks.filter((item) => {
            return item.trim()
          })
          for (let i = 0; i < chunks.length; i++) {
            const chunk = chunks[i]
            // The first item may be null
            if (chunk) {
              let payloads
              // A single chunk may contain several objects
              if (chunk.indexOf('}{') !== -1) {
                const _arr = chunk.split('}{')
                payloads = _arr.map((item, i) => {
                  let _str
                  if (i === 0) {
                    _str = item + '}'
                  } else if (i === _arr.length - 1) {
                    _str = '{' + item
                  } else {
                    _str = `{${item}}`
                  }
                  return JSON.parse(_str)
                })
              } else {
                payloads = [JSON.parse(chunk)]
              }
              if (payloads) {
                for (let k = 0; k < payloads.length; k++) {
                  const _item = payloads[k]
                  if (_item.content) {
                    this.resultText += _item.content
                    break
                  }
                }
              }
            }
          }
          return readChunk()
        } else {
          console.log('Stream finished')
        }
      })
    }
    await readChunk()
  } catch (error) {
    console.error('Error fetching AI response:', error)
    console.log('assistant', 'Error: Failed to fetch AI response.')
  }
},
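The fetch version above parses each read on its own, so a message that is split across two reads would fail in `JSON.parse`. Applying the buffering idea from the PHP side to the front end might look like the sketch below. `createSseParser` and `readSse` are hypothetical names introduced here, and the sketch assumes the back end separates messages with `\n\n` and sends the `Connection closed` marker shown earlier.

```javascript
// Hypothetical helper: accumulates decoded text and returns the payload of
// every complete "data: ..." message; an unfinished tail stays in the buffer.
function createSseParser() {
  let buffer = '';
  return {
    feed(text) {
      buffer += text;
      const payloads = [];
      let boundary = buffer.indexOf('\n\n');
      while (boundary !== -1) {
        const message = buffer.slice(0, boundary);
        buffer = buffer.slice(boundary + 2);
        if (message.startsWith('data: ')) {
          payloads.push(message.slice('data: '.length));
        }
        boundary = buffer.indexOf('\n\n');
      }
      return payloads;
    },
  };
}

// Sketch of wiring the parser to a fetch() response body.
// onContent is a caller-supplied callback that receives each piece of text.
async function readSse(response, onContent) {
  const reader = response.body.getReader();
  const decoder = new TextDecoder('utf-8');
  const parser = createSseParser();
  for (;;) {
    const { value, done } = await reader.read();
    if (done) break;
    const text = decoder.decode(value, { stream: true });
    for (const payload of parser.feed(text)) {
      if (payload === 'Connection closed') return;
      const item = JSON.parse(payload);
      if (item.content) onContent(item.content);
    }
  }
}
```

With this shape, `readSse(response, (text) => { this.resultText += text })` could stand in for the `readChunk` recursion, and every payload in a read is appended rather than only the first one.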
EventSource cannot set custom request headers; event-source-polyfill can be used instead.
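As a rough sketch of the polyfill approach, the function below takes the EventSource implementation as a parameter and passes a `headers` option to its constructor, which is how event-source-polyfill accepts custom headers. The function name `openAuthorizedStream`, the bearer-token header, and the `closed` marker check are assumptions carried over from the examples in this post, not part of the library.

```javascript
// `EventSourceImpl` is expected to be the EventSourcePolyfill class, e.g.
//   import { EventSourcePolyfill } from 'event-source-polyfill'
// It is injected so the function does not depend on how the package is loaded.
function openAuthorizedStream(EventSourceImpl, url, token, onMessage) {
  const source = new EventSourceImpl(url, {
    // Custom headers are the reason for using the polyfill here.
    headers: { Authorization: 'Bearer ' + token },
  });
  source.addEventListener('message', (event) => {
    // Same closing convention as the native EventSource example above.
    if (event.data.indexOf('closed') !== -1) {
      source.close();
      return;
    }
    onMessage(event.data);
  });
  source.addEventListener('error', () => {
    source.close();
  });
  return source;
}
```

A call might look like `openAuthorizedStream(EventSourcePolyfill, url, token, (data) => { ... })`, where the callback parses `data` the same way the native `message` listener does.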
References
Ruan Yifeng
EventSource
Streaming calls to OpenAI GPT in pure PHP