perf: random queue

This commit is contained in:
archer 2023-05-28 16:16:59 +08:00
parent a287ace126
commit 7e99f905bc
No known key found for this signature in database
GPG Key ID: 166CA6BF2383B2BB
3 changed files with 87 additions and 20 deletions

View File

@ -7,6 +7,7 @@ import { authKb } from '@/service/utils/auth';
import { withNextCors } from '@/service/utils/tools'; import { withNextCors } from '@/service/utils/tools';
import { TrainingModeEnum } from '@/constants/plugin'; import { TrainingModeEnum } from '@/constants/plugin';
import { startQueue } from '@/service/utils/tools'; import { startQueue } from '@/service/utils/tools';
import { PgClient } from '@/service/pg';
export type Props = { export type Props = {
kbId: string; kbId: string;
@ -60,10 +61,23 @@ export async function pushDataToKb({
return {}; return {};
} }
// 去重
// 过滤重复的 qa 内容 // 过滤重复的 qa 内容
const set = new Set();
const filterData: {
a: string;
q: string;
}[] = [];
data.forEach((item) => {
const text = item.q + item.a;
if (!set.has(text)) {
filterData.push(item);
set.add(text);
}
});
// 数据库去重
// const searchRes = await Promise.allSettled( // const searchRes = await Promise.allSettled(
// dataItems.map(async ({ q, a = '' }) => { // data.map(async ({ q, a = '' }) => {
// if (!q) { // if (!q) {
// return Promise.reject('q为空'); // return Promise.reject('q为空');
// } // }

View File

@ -10,6 +10,10 @@ import { pushDataToKb } from '@/pages/api/openapi/kb/pushData';
import { TrainingModeEnum } from '@/constants/plugin'; import { TrainingModeEnum } from '@/constants/plugin';
import { ERROR_ENUM } from '../errorCode'; import { ERROR_ENUM } from '../errorCode';
const reduceQueue = () => {
global.qaQueueLen = global.qaQueueLen > 0 ? global.qaQueueLen - 1 : 0;
};
export async function generateQA(): Promise<any> { export async function generateQA(): Promise<any> {
const maxProcess = Number(process.env.QA_MAX_PROCESS || 10); const maxProcess = Number(process.env.QA_MAX_PROCESS || 10);
@ -20,11 +24,34 @@ export async function generateQA(): Promise<any> {
let userId = ''; let userId = '';
try { try {
// 找出一个需要生成的 dataItem (4分钟锁) const match = {
mode: TrainingModeEnum.qa,
lockTime: { $lte: new Date(Date.now() - 4 * 60 * 1000) }
};
// random get task
const agree = await TrainingData.aggregate([
{
$match: match
},
{ $sample: { size: 1 } },
{
$project: {
_id: 1
}
}
]);
// no task
if (agree.length === 0) {
reduceQueue();
global.qaQueueLen <= 0 && console.log(`没有需要【QA】的数据, ${global.qaQueueLen}`);
return;
}
const data = await TrainingData.findOneAndUpdate( const data = await TrainingData.findOneAndUpdate(
{ {
mode: TrainingModeEnum.qa, _id: agree[0]._id,
lockTime: { $lte: new Date(Date.now() - 2 * 60 * 1000) } ...match
}, },
{ {
lockTime: new Date() lockTime: new Date()
@ -37,11 +64,10 @@ export async function generateQA(): Promise<any> {
q: 1 q: 1
}); });
/* 无待生成的任务 */ // task preemption
if (!data) { if (!data) {
global.qaQueueLen--; reduceQueue();
!global.qaQueueLen && console.log(`没有需要【QA】的数据`); return generateQA();
return;
} }
trainingId = data._id; trainingId = data._id;
@ -123,10 +149,10 @@ A2:
console.log('生成QA成功time:', `${(Date.now() - startTime) / 1000}s`); console.log('生成QA成功time:', `${(Date.now() - startTime) / 1000}s`);
global.qaQueueLen--; reduceQueue();
generateQA(); generateQA();
} catch (err: any) { } catch (err: any) {
global.qaQueueLen--; reduceQueue();
// log // log
if (err?.response) { if (err?.response) {
console.log('openai error: 生成QA错误'); console.log('openai error: 生成QA错误');

View File

@ -1,10 +1,14 @@
import { openaiError2 } from '../errorCode'; import { openaiError2 } from '../errorCode';
import { insertKbItem, PgClient } from '@/service/pg'; import { insertKbItem } from '@/service/pg';
import { openaiEmbedding } from '@/pages/api/openapi/plugin/openaiEmbedding'; import { openaiEmbedding } from '@/pages/api/openapi/plugin/openaiEmbedding';
import { TrainingData } from '../models/trainingData'; import { TrainingData } from '../models/trainingData';
import { ERROR_ENUM } from '../errorCode'; import { ERROR_ENUM } from '../errorCode';
import { TrainingModeEnum } from '@/constants/plugin'; import { TrainingModeEnum } from '@/constants/plugin';
const reduceQueue = () => {
global.vectorQueueLen = global.vectorQueueLen > 0 ? global.vectorQueueLen - 1 : 0;
};
/* 索引生成队列。每导入一次,就是一个单独的线程 */ /* 索引生成队列。每导入一次,就是一个单独的线程 */
export async function generateVector(): Promise<any> { export async function generateVector(): Promise<any> {
const maxProcess = Number(process.env.VECTOR_MAX_PROCESS || 10); const maxProcess = Number(process.env.VECTOR_MAX_PROCESS || 10);
@ -16,10 +20,34 @@ export async function generateVector(): Promise<any> {
let userId = ''; let userId = '';
try { try {
const match = {
mode: TrainingModeEnum.index,
lockTime: { $lte: new Date(Date.now() - 2 * 60 * 1000) }
};
// random get task
const agree = await TrainingData.aggregate([
{
$match: match
},
{ $sample: { size: 1 } },
{
$project: {
_id: 1
}
}
]);
// no task
if (agree.length === 0) {
reduceQueue();
global.vectorQueueLen <= 0 && console.log(`没有需要【索引】的数据, ${global.vectorQueueLen}`);
return;
}
const data = await TrainingData.findOneAndUpdate( const data = await TrainingData.findOneAndUpdate(
{ {
mode: TrainingModeEnum.index, _id: agree[0]._id,
lockTime: { $lte: new Date(Date.now() - 2 * 60 * 1000) } ...match
}, },
{ {
lockTime: new Date() lockTime: new Date()
@ -32,11 +60,10 @@ export async function generateVector(): Promise<any> {
a: 1 a: 1
}); });
/* 无待生成的任务 */ // task preemption
if (!data) { if (!data) {
global.vectorQueueLen--; reduceQueue();
!global.vectorQueueLen && console.log(`没有需要【索引】的数据`); return generateVector();
return;
} }
trainingId = data._id; trainingId = data._id;
@ -72,10 +99,10 @@ export async function generateVector(): Promise<any> {
await TrainingData.findByIdAndDelete(data._id); await TrainingData.findByIdAndDelete(data._id);
console.log(`生成向量成功: ${data._id}`); console.log(`生成向量成功: ${data._id}`);
global.vectorQueueLen--; reduceQueue();
generateVector(); generateVector();
} catch (err: any) { } catch (err: any) {
global.vectorQueueLen--; reduceQueue();
// log // log
if (err?.response) { if (err?.response) {
console.log('openai error: 生成向量错误'); console.log('openai error: 生成向量错误');