首先,在处理流式响应请求这里卡了我大半天,遇到不少坑,大多数流式响应都是以单条完整数据返回的,可是有时候会遇到某些大模型会将数据块内容随机返回,每次返回的数据块不一定是完整的 json 数据,可能会掐头去尾! 如果大家有对接多个 LLM 的经验,想必可以感同身受,真的很痛苦!
于是我费劲巴拉的寻找解决方案,在网上找了一大圈,并没有一个人能讲清楚,最后决定自己解决。 下面就将我的解决方案分享给大家!!!
概述
在处理大模型的流式响应数据时,常遇到数据块不完整的问题,尤其是当数据以多个小的 JSON 对象形式分块返回时。这种现象在使用 Server-Sent Events (SSE) 或类似的长连接通信协议时尤为常见。
为什么有的大模型可以一次返回多个 data: xxx
?
Server-Sent Events (SSE):允许服务器连续发送多个 data: 行,每个代表一个独立的数据块。
流式响应:大模型服务通常以流式响应方式返回数据,提高响应速度。
批量处理:服务器可能将多个数据块打包发送,减少网络开销。
踩坑过程
在处理流式响应时,遇到了以下挑战:
数据块不完整,可能包含多个小的 JSON 对象。
数据块可能被分割,导致 JSON 对象不完整。
JSON 对象中可能存在
\n\n
。
解决方案
关键在于累积和识别完整的 JSON 对象。以下是详细步骤和代码实现:
关键步骤
发送请求并获取响应
const response: Response = await fetch(requestInit);
初始化解码器和编码器
const decoder = new TextDecoder("utf-8"); const encode = new TextEncoder(); let accumulatedData = "";
处理响应数据流
使用
TransformStream
来处理数据块。累积数据并查找完整的 JSON 对象。
提取并处理完整的 JSON 对象,移除已处理的部分,继续查找下一个。
关键代码实现
export function isValidJsonStr(str: string) { if (typeof str !== 'string') { return false; } try { const parsed = JSON.parse(str); return typeof parsed === 'object' && parsed !== null; } catch (e) { return false; } } const response: Response = await fetch(requestInit); const decoder = new TextDecoder('utf-8'); const encode = new TextEncoder(); if (response.body) { let accumulatedData = ''; // 初始化累积的数据为空字符串 let buffer = ""; // 初始化缓冲区为空字符串 response.body .pipeThrough( new TransformStream({ transform(chunk, controller) { const textChunk = decoder.decode(chunk, { stream: true }); // 解码数据块 accumulatedData += textChunk; // 将解码后的数据块添加到累积的数据中 // 查找完整的 JSON 对象 while (true) { // 如果返回的是 json 字符串 一般都是大模型直接报错了 if (isValidJsonStr(accumulatedData)) { const transformedChunk = adapter.transform2OpenAiResponse4Stream(accumulatedData); // 转换数据块 这里自己实现转换方法吧! const handledChunk = encode.encode(transformedChunk); // 编码转换后的数据块 self.chunks.push(transformedChunk); // 将转换后的数据块添加到数据片段数组中 controller.enqueue(handledChunk); // 将编码后的数据块推入控制器 accumulatedData = ""; // 重置累积的数据为空字符串 break; // 退出循环 } else { const dataStartIndex = accumulatedData.indexOf("data: "); // 查找 'data: ' 开头的数据块 if (dataStartIndex === -1) { // 如果没有找到 'data: ' 开头的数据块,保留当前累积的数据 buffer = accumulatedData; accumulatedData = ""; break; } const jsonStartIndex = dataStartIndex + 6; // 跳过 'data: ' const jsonEndIndex = accumulatedData.indexOf( "}\n\n", jsonStartIndex ); // 查找完整的 JSON 对象结束位置 这里可以进一步优化,因为担心json中也会出现 "}\n\n" if (jsonEndIndex === -1) { // 如果没有找到完整的 JSON 对象,保留当前累积的数据 buffer = accumulatedData; accumulatedData = ""; break; } // 提取完整的 JSON 对象 const jsonString = accumulatedData.slice( jsonStartIndex, jsonEndIndex + 1 ); try { // 处理完整的 JSON 对象 const transformedChunk = adapter.transform2OpenAiResponse4Stream( "data: " + jsonString ) + "\n\n"; // 转换数据块 const handledChunk = encode.encode(transformedChunk); // 编码转换后的数据块 self.chunks.push(transformedChunk); // 将转换后的数据块添加到数据片段数组中 controller.enqueue(handledChunk); // 将编码后的数据块推入控制器 } catch (error) { console.error("Error parsing JSON:", error); // 如果解析 JSON 失败,打印错误信息 // 如果解析 JSON 失败,保留当前累积的数据 buffer = accumulatedData; accumulatedData = ""; break; } // 移除已处理的 JSON 对象 accumulatedData = accumulatedData.slice(jsonEndIndex + 3); } } }, }) ) .pipeTo(writable) // 将处理后的数据流管道连接到可写流 .catch((error) => { console.error("Error:", error); // 捕获并打印错误 throw new Error("Handle stream response error"); // 抛出错误 }); const processedResponse = new Response(readable, { status: response.status, statusText: response.statusText, headers: response.headers, }); return processedResponse; // 构建并返回处理后的响应对象 }
重点解析
累积数据
使用
accumulatedData
变量累积数据块。查找完整的 JSON 对象
在
transform
方法中,通过循环查找完整的 JSON 对象。首先检查
accumulatedData
是否包含完整的 JSON 字符串(通过isValidJsonStr
函数)。如果找到完整的 JSON 字符串,直接处理并清空
accumulatedData
。否则,查找
data:
开头的数据块,并尝试找到对应的}\n\n
结束标记。提取和处理完整的 JSON 对象
提取从
data:
开头到\n\n
结束标记之间的 JSON 字符串。使用
try-catch
块来捕获和处理可能的 JSON 解析错误。处理完一个完整的 JSON 对象后,移除已处理的部分,继续查找下一个 JSON 对象。
注意事项
JSON 字符串中的转义字符:正确处理
\n\n
等转义字符序列。错误处理:在解析 JSON 对象时,应使用
try-catch
块来捕获和处理可能的解析错误。数据完整性:确保每个处理的 JSON 对象都是完整的,避免处理不完整或损坏的数据。
通过以上步骤和注意事项,可以有效解析和处理大模型的流式数据,确保数据的准确性和完整性。