[微信小程序]对接sse接口
在uni开发中,在微信小程序中实现sse接口请求
- 相关连接
微信小程序对接SSE接口记录
uni中实现sse代码
-
注意的坑点
- 接收的并不是字符串,而是
ArrayBuffer
- 模拟流推送并不是流推送,会有
data:
字符扰乱 - 推送并不是完全按照一条条推送,如果过快,可能一次收到多条,如果信息过大,可能收到半条,同理,收到的可能是第一条的后半部分,两条正常数据,第四条的前半部分
- 接收的并不是字符串,而是
-
格式解决方式
ArrayBuffer
转String
-
推送错乱
- 解析前,判断是否有未解析的部分,拼接后再解析
- 解析后,最后无法解析的部分,保留下来到下次推送再解析
import { BASE_URL } from '../config'// arrayBuffer 转 String
function arrayBufferToString(arr) {if (typeof arr === 'string') {return arr}var dataview = new DataView(arr)var ints = new Uint8Array(arr.byteLength)for (var i = 0; i < ints.length; i++) {ints[i] = dataview.getUint8(i)}var str = '',_arr = intsfor (var i = 0; i < _arr.length; i++) {if (_arr[i]) {var one = _arr[i].toString(2),v = one.match(/^1+?(?=0)/)if (v && one.length == 8) {var bytesLength = v[0].lengthvar store = _arr[i].toString(2).slice(7 - bytesLength)for (var st = 1; st < bytesLength; st++) {if (_arr[st + i]) {store += _arr[st + i].toString(2).slice(2)}}str += String.fromCharCode(parseInt(store, 2))i += bytesLength - 1} else {str += String.fromCharCode(_arr[i])}}}return str
}function parseSSEData(sseData) {// 使用正则表达式匹配每个data:开头的块,包括可能的多行内容const regex = /data:([\s\S]*?)(?=\n\s*data:|$)/gconst matches = [...sseData.matchAll(regex)]// 从匹配结果中提取JSON字符串const jsonStrings = matches.map(match => {// 获取匹配的内容并清理const jsonContent = match[0].trim().replace(/\n/g, '')return jsonContent})return jsonStrings
}// 解析`data:`开头的json字符串
const safeJsonParse = (str = '') => {const str1 = str.trim()if (str1.startsWith('data:')) {try {const data = JSON.parse(str1.slice(5))return data} catch (err) {throw new Error('[json解析失败]')}} else {throw new Error('[未匹配到消息头]')}
}/*** 遗留消息* 如果一条消息解析失败,则认为该消息为半条消息,和后续消息拼接后再进行解析*/
const LegacyMessage = new Map()// 微信小程序实现sse,通过wx自己的方式实现
export const SSE_WX = ({ url, data, success, error, finish }) => {let requestTask = nulltry {const uid = Math.random().toString(36).substring(2, 9)// 处理接收到的数据const listener = res => {// 1. 转换成字符串的格式const str1 = arrayBufferToString(res.data)console.log('------------------------------------------')console.log('接收消息:\n', str1)// 2. 判断是否存在未解析部分,如果存在,则解析合并后的字符串let prefix = ''if (LegacyMessage.has(uid)) {prefix = LegacyMessage.get(uid)}const str2 = `${prefix}${str1}`// 3. 进行解析const jsonStrings = parseSSEData(str2)if (!jsonStrings.length) {// 3.1. 如果解析为空 则代表该部分为片段部分LegacyMessage.set(uid, str2)} else {// 3.2. 解析内容不为空LegacyMessage.delete(uid)// 4.1. 判断解析后数组,是否是完整的数据,最后一项进行特殊处理for (let i = 0; i < jsonStrings.length - 1; i++) {const data = safeJsonParse(jsonStrings[i])success && success(data)}// 4.2. 最后一项特殊处理,判断正常解析,还是记录未处理的内容const last = jsonStrings[jsonStrings.length - 1]try {const data = safeJsonParse(last)success && success(data)} catch (err) {LegacyMessage.set(uid, last)}}}// 发起请求requestTask = wx.request({url: `${BASE_URL}${url}`,method: 'POST',enableChunked: true, // enableChunked必须为truedata: data,header: {'content-type': 'application/json'},// 执行完成complete(res) {// 移除监听 需传入与监听时同一个的函数对象requestTask.offChunkReceived(listener)// 触发完成回调if (finish && typeof finish === 'function') {finish(res)}}})// 监听服务端返回的数据requestTask.onChunkReceived(listener)return {abort: () => {// 移除监听 需传入与监听时同一个的函数对象requestTask.offChunkReceived(listener)requestTask.abort()}}} catch (err) {console.error('[sse请求异常]', err)error(err)requestTask?.abort()}
}