切换语言为:繁体

使用 fetch 请求大模型流式响应结果转换格式的通用解决方案

  • 爱糖宝
  • 2024-11-21
  • 2014
  • 0
  • 0

首先,在处理流式响应请求这里卡了我大半天,遇到不少坑,大多数流式响应都是以单条完整数据返回的,可是有时候会遇到某些大模型会将数据块内容随机返回,每次返回的数据块不一定是完整的 json 数据,可能会掐头去尾! 如果大家有对接多个 LLM 的经验,想必可以感同身受,真的很痛苦!

于是我费劲巴拉的寻找解决方案,在网上找了一大圈,并没有一个人能讲清楚,最后决定自己解决。 下面就将我的解决方案分享给大家!!!

概述

在处理大模型的流式响应数据时,常遇到数据块不完整的问题,尤其是当数据以多个小的 JSON 对象形式分块返回时。这种现象在使用 Server-Sent Events (SSE) 或类似的长连接通信协议时尤为常见。

为什么有的大模型可以一次返回多个 data: xxx

  1. Server-Sent Events (SSE):允许服务器连续发送多个 data: 行,每个代表一个独立的数据块。

  2. 流式响应:大模型服务通常以流式响应方式返回数据,提高响应速度。

  3. 批量处理:服务器可能将多个数据块打包发送,减少网络开销。

踩坑过程

在处理流式响应时,遇到了以下挑战:

  • 数据块不完整,可能包含多个小的 JSON 对象。

  • 数据块可能被分割,导致 JSON 对象不完整。

  • JSON 对象中可能存在\n\n

解决方案

关键在于累积和识别完整的 JSON 对象。以下是详细步骤和代码实现:

关键步骤

  1. 发送请求并获取响应

    const response: Response = await fetch(requestInit);

  2. 初始化解码器和编码器

    const decoder = new TextDecoder("utf-8");
    const encode = new TextEncoder();
    let accumulatedData = "";

  3. 处理响应数据流

    • 使用 TransformStream 来处理数据块。

    • 累积数据并查找完整的 JSON 对象。

    • 提取并处理完整的 JSON 对象,移除已处理的部分,继续查找下一个。

关键代码实现

export function isValidJsonStr(str: string) {
  if (typeof str !== 'string') {
    return false;
  }

  try {
    const parsed = JSON.parse(str);
    return typeof parsed === 'object' && parsed !== null;
  } catch (e) {
    return false;
  }
}

const response: Response = await fetch(requestInit);

const decoder = new TextDecoder('utf-8');
const encode = new TextEncoder();

if (response.body) {

  let accumulatedData = ''; // 初始化累积的数据为空字符串
  let buffer = ""; // 初始化缓冲区为空字符串

  response.body
    .pipeThrough(
      new TransformStream({
        transform(chunk, controller) {
          const textChunk = decoder.decode(chunk, { stream: true }); // 解码数据块
          accumulatedData += textChunk; // 将解码后的数据块添加到累积的数据中

          // 查找完整的 JSON 对象
          while (true) {
            // 如果返回的是 json 字符串 一般都是大模型直接报错了
            if (isValidJsonStr(accumulatedData)) {
              const transformedChunk =
                adapter.transform2OpenAiResponse4Stream(accumulatedData); // 转换数据块 这里自己实现转换方法吧!
              const handledChunk = encode.encode(transformedChunk); // 编码转换后的数据块
              self.chunks.push(transformedChunk); // 将转换后的数据块添加到数据片段数组中
              controller.enqueue(handledChunk); // 将编码后的数据块推入控制器
              accumulatedData = ""; // 重置累积的数据为空字符串
              break; // 退出循环
            } else {
              const dataStartIndex = accumulatedData.indexOf("data: "); // 查找 'data: ' 开头的数据块
              if (dataStartIndex === -1) {
                // 如果没有找到 'data: ' 开头的数据块,保留当前累积的数据
                buffer = accumulatedData;
                accumulatedData = "";
                break;
              }

              const jsonStartIndex = dataStartIndex + 6; // 跳过 'data: '
              const jsonEndIndex = accumulatedData.indexOf(
                "}\n\n",
                jsonStartIndex
              ); // 查找完整的 JSON 对象结束位置  这里可以进一步优化,因为担心json中也会出现 "}\n\n"

              if (jsonEndIndex === -1) {
                // 如果没有找到完整的 JSON 对象,保留当前累积的数据
                buffer = accumulatedData;
                accumulatedData = "";
                break;
              }

              // 提取完整的 JSON 对象
              const jsonString = accumulatedData.slice(
                jsonStartIndex,
                jsonEndIndex + 1
              );
              try {
                // 处理完整的 JSON 对象
                const transformedChunk =
                  adapter.transform2OpenAiResponse4Stream(
                    "data: " + jsonString
                  ) + "\n\n"; // 转换数据块
                const handledChunk = encode.encode(transformedChunk); // 编码转换后的数据块
                self.chunks.push(transformedChunk); // 将转换后的数据块添加到数据片段数组中
                controller.enqueue(handledChunk); // 将编码后的数据块推入控制器
              } catch (error) {
                console.error("Error parsing JSON:", error); // 如果解析 JSON 失败,打印错误信息
                // 如果解析 JSON 失败,保留当前累积的数据
                buffer = accumulatedData;
                accumulatedData = "";
                break;
              }

              // 移除已处理的 JSON 对象
              accumulatedData = accumulatedData.slice(jsonEndIndex + 3);
            }
          }
        },
      })
    )
    .pipeTo(writable) // 将处理后的数据流管道连接到可写流
    .catch((error) => {
      console.error("Error:", error); // 捕获并打印错误
      throw new Error("Handle stream response error"); // 抛出错误
    });

  const processedResponse = new Response(readable, {
    status: response.status,
    statusText: response.statusText,
    headers: response.headers,
  });

  return processedResponse; // 构建并返回处理后的响应对象
}

重点解析

  1. 累积数据

    • 使用 accumulatedData 变量累积数据块。

  2. 查找完整的 JSON 对象

    • transform 方法中,通过循环查找完整的 JSON 对象。

    • 首先检查 accumulatedData 是否包含完整的 JSON 字符串(通过 isValidJsonStr 函数)。

    • 如果找到完整的 JSON 字符串,直接处理并清空 accumulatedData

    • 否则,查找 data: 开头的数据块,并尝试找到对应的 }\n\n 结束标记。

  3. 提取和处理完整的 JSON 对象

    • 提取从 data: 开头到 \n\n 结束标记之间的 JSON 字符串。

    • 使用 try-catch 块来捕获和处理可能的 JSON 解析错误。

    • 处理完一个完整的 JSON 对象后,移除已处理的部分,继续查找下一个 JSON 对象。

注意事项

  • JSON 字符串中的转义字符:正确处理 \n\n 等转义字符序列。

  • 错误处理:在解析 JSON 对象时,应使用 try-catch 块来捕获和处理可能的解析错误。

  • 数据完整性:确保每个处理的 JSON 对象都是完整的,避免处理不完整或损坏的数据。

通过以上步骤和注意事项,可以有效解析和处理大模型的流式数据,确保数据的准确性和完整性。

0条评论

您的电子邮件等信息不会被公开,以下所有项均必填

OK! You can skip this field.