perf: sse response

This commit is contained in:
archer 2023-06-23 23:11:22 +08:00
parent 6787f19d78
commit 986206b691
No known key found for this signature in database
GPG Key ID: 569A5660D2379E28
13 changed files with 150 additions and 132 deletions

View File

@ -26,7 +26,6 @@
"crypto": "^1.0.1", "crypto": "^1.0.1",
"date-fns": "^2.30.0", "date-fns": "^2.30.0",
"dayjs": "^1.11.7", "dayjs": "^1.11.7",
"eventsource-parser": "^0.1.0",
"formidable": "^2.1.1", "formidable": "^2.1.1",
"framer-motion": "^9.0.6", "framer-motion": "^9.0.6",
"hyperdown": "^2.4.29", "hyperdown": "^2.4.29",

10
client/pnpm-lock.yaml generated
View File

@ -56,9 +56,6 @@ dependencies:
dayjs: dayjs:
specifier: ^1.11.7 specifier: ^1.11.7
version: registry.npmmirror.com/dayjs@1.11.7 version: registry.npmmirror.com/dayjs@1.11.7
eventsource-parser:
specifier: ^0.1.0
version: registry.npmmirror.com/eventsource-parser@0.1.0
formidable: formidable:
specifier: ^2.1.1 specifier: ^2.1.1
version: registry.npmmirror.com/formidable@2.1.1 version: registry.npmmirror.com/formidable@2.1.1
@ -7510,13 +7507,6 @@ packages:
version: 2.0.3 version: 2.0.3
engines: {node: '>=0.10.0'} engines: {node: '>=0.10.0'}
registry.npmmirror.com/eventsource-parser@0.1.0:
resolution: {integrity: sha512-M9QjFtEIkwytUarnx113HGmgtk52LSn3jNAtnWKi3V+b9rqSfQeVdLsaD5AG/O4IrGQwmAAHBIsqbmURPTd2rA==, registry: https://registry.npm.taobao.org/, tarball: https://registry.npmmirror.com/eventsource-parser/-/eventsource-parser-0.1.0.tgz}
name: eventsource-parser
version: 0.1.0
engines: {node: '>=14.18'}
dev: false
registry.npmmirror.com/execa@5.1.1: registry.npmmirror.com/execa@5.1.1:
resolution: {integrity: sha512-8uSpZZocAZRBAPIEINJj3Lo9HyGitllczc27Eh5YYojjMFMn8yHMDMaUHE2Jqfq05D/wucwI4JGURyXt1vchyg==, registry: https://registry.npm.taobao.org/, tarball: https://registry.npmmirror.com/execa/-/execa-5.1.1.tgz} resolution: {integrity: sha512-8uSpZZocAZRBAPIEINJj3Lo9HyGitllczc27Eh5YYojjMFMn8yHMDMaUHE2Jqfq05D/wucwI4JGURyXt1vchyg==, registry: https://registry.npm.taobao.org/, tarball: https://registry.npmmirror.com/execa/-/execa-5.1.1.tgz}
name: execa name: execa

View File

@ -1,6 +1,7 @@
import { Props, ChatResponseType } from '@/pages/api/openapi/v1/chat/completions'; import { Props, ChatResponseType } from '@/pages/api/openapi/v1/chat/completions';
import { sseResponseEventEnum } from '@/constants/chat'; import { sseResponseEventEnum } from '@/constants/chat';
import { getErrText } from '@/utils/tools'; import { getErrText } from '@/utils/tools';
import { parseStreamChunk } from '@/utils/adapt';
interface StreamFetchProps { interface StreamFetchProps {
data: Props; data: Props;
@ -32,7 +33,6 @@ export const streamFetch = ({ data, onMessage, abortSignal }: StreamFetchProps)
} }
const reader = response.body?.getReader(); const reader = response.body?.getReader();
const decoder = new TextDecoder('utf-8');
// response data // response data
let responseText = ''; let responseText = '';
@ -53,21 +53,7 @@ export const streamFetch = ({ data, onMessage, abortSignal }: StreamFetchProps)
return reject('响应过程出现异常~'); return reject('响应过程出现异常~');
} }
} }
const chunk = decoder.decode(value); const chunkResponse = parseStreamChunk(value);
const chunkLines = chunk.split('\n\n').filter((item) => item);
const chunkResponse = chunkLines.map((item) => {
const splitEvent = item.split('\n');
if (splitEvent.length === 2) {
return {
event: splitEvent[0].replace('event: ', ''),
data: splitEvent[1].replace('data: ', '')
};
}
return {
event: '',
data: splitEvent[0].replace('data: ', '')
};
});
chunkResponse.forEach((item) => { chunkResponse.forEach((item) => {
// parse json data // parse json data
@ -87,6 +73,8 @@ export const streamFetch = ({ data, onMessage, abortSignal }: StreamFetchProps)
const chatResponse = data as ChatResponseType; const chatResponse = data as ChatResponseType;
newChatId = chatResponse.newChatId; newChatId = chatResponse.newChatId;
quoteLen = chatResponse.quoteLen || 0; quoteLen = chatResponse.quoteLen || 0;
} else if (item.event === sseResponseEventEnum.error) {
return reject(getErrText(data, '流响应错误'));
} }
}); });
read(); read();

View File

@ -1,4 +1,5 @@
export enum sseResponseEventEnum { export enum sseResponseEventEnum {
error = 'error',
answer = 'answer', answer = 'answer',
chatResponse = 'chatResponse' chatResponse = 'chatResponse'
} }

View File

@ -1,6 +1,6 @@
import type { NextApiRequest, NextApiResponse } from 'next'; import type { NextApiRequest, NextApiResponse } from 'next';
import { jsonRes } from '@/service/response'; import { jsonRes } from '@/service/response';
import { authUser, getApiKey } from '@/service/utils/auth'; import { authUser, getApiKey, getSystemOpenAiKey } from '@/service/utils/auth';
import { withNextCors } from '@/service/utils/tools'; import { withNextCors } from '@/service/utils/tools';
import { getOpenAIApi } from '@/service/utils/chat/openai'; import { getOpenAIApi } from '@/service/utils/chat/openai';
import { embeddingModel } from '@/constants/model'; import { embeddingModel } from '@/constants/model';
@ -39,14 +39,10 @@ export async function openaiEmbedding({
input, input,
mustPay = false mustPay = false
}: { userId: string; mustPay?: boolean } & Props) { }: { userId: string; mustPay?: boolean } & Props) {
const { userOpenAiKey, systemAuthKey } = await getApiKey({ const apiKey = getSystemOpenAiKey();
model: OpenAiChatEnum.GPT35,
userId,
mustPay
});
// 获取 chatAPI // 获取 chatAPI
const chatAPI = getOpenAIApi(); const chatAPI = getOpenAIApi(apiKey);
// 把输入的内容转成向量 // 把输入的内容转成向量
const result = await chatAPI const result = await chatAPI
@ -57,16 +53,22 @@ export async function openaiEmbedding({
}, },
{ {
timeout: 60000, timeout: 60000,
...axiosConfig(userOpenAiKey || systemAuthKey) ...axiosConfig(apiKey)
} }
) )
.then((res) => ({ .then((res) => {
tokenLen: res.data?.usage?.total_tokens || 0, if (!res.data?.usage?.total_tokens) {
// @ts-ignore
return Promise.reject(res.data?.error?.message || 'Embedding Error');
}
return {
tokenLen: res.data.usage.total_tokens || 0,
vectors: res.data.data.map((item) => item.embedding) vectors: res.data.data.map((item) => item.embedding)
})); };
});
pushGenerateVectorBill({ pushGenerateVectorBill({
isPay: !userOpenAiKey, isPay: mustPay,
userId, userId,
text: input.join(''), text: input.join(''),
tokenLen: result.tokenLen tokenLen: result.tokenLen

View File

@ -14,9 +14,9 @@ import { gptMessage2ChatType, textAdaptGptResponse } from '@/utils/adapt';
import { getChatHistory } from './getHistory'; import { getChatHistory } from './getHistory';
import { saveChat } from '@/pages/api/chat/saveChat'; import { saveChat } from '@/pages/api/chat/saveChat';
import { sseResponse } from '@/service/utils/tools'; import { sseResponse } from '@/service/utils/tools';
import { getErrText } from '@/utils/tools';
import { type ChatCompletionRequestMessage } from 'openai'; import { type ChatCompletionRequestMessage } from 'openai';
import { Types } from 'mongoose'; import { Types } from 'mongoose';
import { sensitiveCheck } from '../../text/sensitiveCheck';
export type MessageItemType = ChatCompletionRequestMessage & { _id?: string }; export type MessageItemType = ChatCompletionRequestMessage & { _id?: string };
type FastGptWebChatProps = { type FastGptWebChatProps = {
@ -175,6 +175,10 @@ export default withNextCors(async function handler(req: NextApiRequest, res: Nex
2 2
); );
await sensitiveCheck({
input: `${prompt.value}`
});
// start model api. responseText and totalTokens: valid only if stream = false // start model api. responseText and totalTokens: valid only if stream = false
const { streamResponse, responseMessages, responseText, totalTokens } = const { streamResponse, responseMessages, responseText, totalTokens } =
await modelServiceToolMap[model.chat.chatModel].chatCompletion({ await modelServiceToolMap[model.chat.chatModel].chatCompletion({
@ -231,8 +235,7 @@ export default withNextCors(async function handler(req: NextApiRequest, res: Nex
tokens: totalTokens tokens: totalTokens
}; };
} catch (error) { } catch (error) {
console.log('stream response error', error); return Promise.reject(error);
return {};
} }
} else { } else {
return { return {
@ -301,7 +304,12 @@ export default withNextCors(async function handler(req: NextApiRequest, res: Nex
} catch (err: any) { } catch (err: any) {
res.status(500); res.status(500);
if (step === 1) { if (step === 1) {
res.end(getErrText(err, 'Stream response error')); sseResponse({
res,
event: sseResponseEventEnum.error,
data: JSON.stringify(err)
});
res.end();
} else { } else {
jsonRes(res, { jsonRes(res, {
code: 500, code: 500,

View File

@ -42,6 +42,12 @@ const Test = () => {
pushKbTestItem(testItem); pushKbTestItem(testItem);
setInputText(''); setInputText('');
setKbTestItem(testItem); setKbTestItem(testItem);
},
onError(err) {
toast({
title: getErrText(err),
status: 'error'
});
} }
}); });

View File

@ -163,7 +163,7 @@ export const authUser = async ({
/* random get openai api key */ /* random get openai api key */
export const getSystemOpenAiKey = () => { export const getSystemOpenAiKey = () => {
return process.env.OPENAIKEY || ''; return process.env.ONEAPI_KEY || process.env.OPENAIKEY || '';
}; };
/* 获取 api 请求的 key */ /* 获取 api 请求的 key */

View File

@ -6,8 +6,8 @@ import { sseResponse } from '../tools';
import { OpenAiChatEnum } from '@/constants/model'; import { OpenAiChatEnum } from '@/constants/model';
import { chatResponse, openAiStreamResponse } from './openai'; import { chatResponse, openAiStreamResponse } from './openai';
import type { NextApiResponse } from 'next'; import type { NextApiResponse } from 'next';
import { createParser, ParsedEvent, ReconnectInterval } from 'eventsource-parser';
import { textAdaptGptResponse } from '@/utils/adapt'; import { textAdaptGptResponse } from '@/utils/adapt';
import { parseStreamChunk } from '@/utils/adapt';
export type ChatCompletionType = { export type ChatCompletionType = {
apiKey: string; apiKey: string;
@ -185,24 +185,22 @@ export const V2_StreamResponse = async ({
model: ChatModelType; model: ChatModelType;
}) => { }) => {
let responseContent = ''; let responseContent = '';
let error: any = null;
try { const clientRes = async (data: string) => {
const onParse = async (e: ParsedEvent | ReconnectInterval) => {
if (e.type !== 'event') return;
const data = e.data;
const { content = '' } = (() => { const { content = '' } = (() => {
try { try {
const json = JSON.parse(data); const json = JSON.parse(data);
const content: string = json?.choices?.[0].delta.content || ''; const content: string = json?.choices?.[0].delta.content || '';
error = json.error;
responseContent += content; responseContent += content;
return { content }; return { content };
} catch (error) {} } catch (error) {
return {}; return {};
}
})(); })();
if (res.closed) return; if (res.closed || error) return;
if (data === '[DONE]') { if (data === '[DONE]') {
sseResponse({ sseResponse({
@ -230,20 +228,19 @@ export const V2_StreamResponse = async ({
}; };
try { try {
const parser = createParser(onParse);
const decoder = new TextDecoder();
for await (const chunk of chatResponse.data as any) { for await (const chunk of chatResponse.data as any) {
if (res.closed) { if (res.closed) break;
break; const parse = parseStreamChunk(chunk);
} parse.forEach((item) => clientRes(item.data));
parser.feed(decoder.decode(chunk, { stream: true }));
} }
} catch (error) { } catch (error) {
console.log('pipe error', error); console.log('pipe error', error);
} }
} catch (error) {
console.log('stream error', error); if (error) {
return Promise.reject(error);
} }
// count tokens // count tokens
const finishMessages = prompts.concat({ const finishMessages = prompts.concat({
obj: ChatRoleEnum.AI, obj: ChatRoleEnum.AI,

View File

@ -1,18 +1,20 @@
import { Configuration, OpenAIApi } from 'openai'; import { Configuration, OpenAIApi } from 'openai';
import { createParser, ParsedEvent, ReconnectInterval } from 'eventsource-parser';
import { axiosConfig } from '../tools'; import { axiosConfig } from '../tools';
import { ChatModelMap, OpenAiChatEnum } from '@/constants/model'; import { ChatModelMap, OpenAiChatEnum } from '@/constants/model';
import { adaptChatItem_openAI } from '@/utils/plugin/openai'; import { adaptChatItem_openAI } from '@/utils/plugin/openai';
import { modelToolMap } from '@/utils/plugin'; import { modelToolMap } from '@/utils/plugin';
import { ChatCompletionType, ChatContextFilter, StreamResponseType } from './index'; import { ChatCompletionType, ChatContextFilter, StreamResponseType } from './index';
import { ChatRoleEnum } from '@/constants/chat'; import { ChatRoleEnum } from '@/constants/chat';
import { parseStreamChunk } from '@/utils/adapt';
export const getOpenAIApi = () => export const getOpenAIApi = (apiKey: string) => {
new OpenAIApi( const openaiBaseUrl = process.env.OPENAI_BASE_URL || 'https://api.openai.com/v1';
return new OpenAIApi(
new Configuration({ new Configuration({
basePath: process.env.OPENAI_BASE_URL || 'https://api.openai.com/v1' basePath: apiKey === process.env.ONEAPI_KEY ? process.env.ONEAPI_URL : openaiBaseUrl
}) })
); );
};
/* 模型对话 */ /* 模型对话 */
export const chatResponse = async ({ export const chatResponse = async ({
@ -31,7 +33,7 @@ export const chatResponse = async ({
}); });
const adaptMessages = adaptChatItem_openAI({ messages: filterMessages, reserveId: false }); const adaptMessages = adaptChatItem_openAI({ messages: filterMessages, reserveId: false });
const chatAPI = getOpenAIApi(); const chatAPI = getOpenAIApi(apiKey);
const promptsToken = modelToolMap[model].countTokens({ const promptsToken = modelToolMap[model].countTokens({
messages: filterMessages messages: filterMessages
@ -80,29 +82,29 @@ export const openAiStreamResponse = async ({
try { try {
let responseContent = ''; let responseContent = '';
const onParse = async (event: ParsedEvent | ReconnectInterval) => { const clientRes = async (data: string) => {
if (event.type !== 'event') return; const { content = '' } = (() => {
const data = event.data;
if (data === '[DONE]') return;
try { try {
const json = JSON.parse(data); const json = JSON.parse(data);
const content: string = json?.choices?.[0].delta.content || ''; const content: string = json?.choices?.[0].delta.content || '';
responseContent += content; responseContent += content;
return { content };
} catch (error) {
return {};
}
})();
if (data === '[DONE]') return;
!res.closed && content && res.write(content); !res.closed && content && res.write(content);
} catch (error) {
error;
}
}; };
try { try {
const decoder = new TextDecoder();
const parser = createParser(onParse);
for await (const chunk of chatResponse.data as any) { for await (const chunk of chatResponse.data as any) {
if (res.closed) { if (res.closed) break;
break;
} const parse = parseStreamChunk(chunk);
parser.feed(decoder.decode(chunk, { stream: true })); parse.forEach((item) => clientRes(item.data));
} }
} catch (error) { } catch (error) {
console.log('pipe error', error); console.log('pipe error', error);

View File

@ -34,14 +34,18 @@ export const clearCookie = (res: NextApiResponse) => {
}; };
/* openai axios config */ /* openai axios config */
export const axiosConfig = (apikey: string) => ({ export const axiosConfig = (apikey: string) => {
baseURL: process.env.OPENAI_BASE_URL || 'https://api.openai.com/v1', const openaiBaseUrl = process.env.OPENAI_BASE_URL || 'https://api.openai.com/v1';
return {
baseURL: apikey === process.env.ONEAPI_KEY ? process.env.ONEAPI_URL : openaiBaseUrl, // 此处仅对非 npm 模块有效
httpsAgent: global.httpsAgent, httpsAgent: global.httpsAgent,
headers: { headers: {
Authorization: `Bearer ${apikey}`, Authorization: `Bearer ${apikey}`,
auth: process.env.OPENAI_BASE_URL_AUTH || '' auth: process.env.OPENAI_BASE_URL_AUTH || ''
} }
}); };
};
export function withNextCors(handler: NextApiHandler): NextApiHandler { export function withNextCors(handler: NextApiHandler): NextApiHandler {
return async function nextApiHandlerWrappedWithNextCors( return async function nextApiHandlerWrappedWithNextCors(

View File

@ -54,3 +54,24 @@ export const textAdaptGptResponse = ({
choices: [{ delta: text === null ? {} : { content: text }, index: 0, finish_reason }] choices: [{ delta: text === null ? {} : { content: text }, index: 0, finish_reason }]
}); });
}; };
const decoder = new TextDecoder();
export const parseStreamChunk = (value: BufferSource) => {
const chunk = decoder.decode(value);
const chunkLines = chunk.split('\n\n').filter((item) => item);
const chunkResponse = chunkLines.map((item) => {
const splitEvent = item.split('\n');
if (splitEvent.length === 2) {
return {
event: splitEvent[0].replace('event: ', ''),
data: splitEvent[1].replace('data: ', '')
};
}
return {
event: '',
data: splitEvent[0].replace('data: ', '')
};
});
return chunkResponse;
};

View File

@ -70,7 +70,7 @@ services:
# openai, 推荐使用 one-api 管理key # openai, 推荐使用 one-api 管理key
- OPENAIKEY=sk-xxxxx - OPENAIKEY=sk-xxxxx
- OPENAI_BASE_URL=https://api.openai.com/v1 - OPENAI_BASE_URL=https://api.openai.com/v1
- OPENAI_BASE_URL_AUTH=可选的安全凭证 - OPENAI_BASE_URL_AUTH=可选的安全凭证,会放到 header.auth 里
fastgpt-admin: fastgpt-admin:
image: registry.cn-hangzhou.aliyuncs.com/fastgpt/fastgpt-admin:latest image: registry.cn-hangzhou.aliyuncs.com/fastgpt/fastgpt-admin:latest
container_name: fastgpt-admin container_name: fastgpt-admin