Redis cache (#4436)
* perf: add Redis cache for vector counting (#4432) * feat: cache * perf: get cache key --------- Co-authored-by: a.e. <49438478+I-Info@users.noreply.github.com>
This commit is contained in:
parent
dc73f47486
commit
c2e088cf39
@ -44,7 +44,8 @@ curl --location --request POST 'https://{{host}}/api/admin/initv494' \
|
||||
1. 集合数据训练状态展示
|
||||
2. SMTP 发送邮件插件
|
||||
3. BullMQ 消息队列。
|
||||
4. 站点同步支持配置训练参数。
|
||||
4. 利用 redis 进行部分数据缓存。
|
||||
5. 站点同步支持配置训练参数。
|
||||
|
||||
## ⚙️ 优化
|
||||
|
||||
|
||||
38
packages/service/common/redis/cache.ts
Normal file
38
packages/service/common/redis/cache.ts
Normal file
@ -0,0 +1,38 @@
|
||||
import { getGlobalRedisCacheConnection } from './index';
|
||||
import { addLog } from '../system/log';
|
||||
import { retryFn } from '@fastgpt/global/common/system/utils';
|
||||
|
||||
export enum CacheKeyEnum {
|
||||
team_vector_count = 'team_vector_count'
|
||||
}
|
||||
|
||||
export const setRedisCache = async (
|
||||
key: string,
|
||||
data: string | Buffer | number,
|
||||
expireSeconds?: number
|
||||
) => {
|
||||
return await retryFn(async () => {
|
||||
try {
|
||||
const redis = getGlobalRedisCacheConnection();
|
||||
|
||||
if (expireSeconds) {
|
||||
await redis.set(key, data, 'EX', expireSeconds);
|
||||
} else {
|
||||
await redis.set(key, data);
|
||||
}
|
||||
} catch (error) {
|
||||
addLog.error('Set cache error:', error);
|
||||
return Promise.reject(error);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
export const getRedisCache = async (key: string) => {
|
||||
const redis = getGlobalRedisCacheConnection();
|
||||
return await retryFn(() => redis.get(key));
|
||||
};
|
||||
|
||||
export const delRedisCache = async (key: string) => {
|
||||
const redis = getGlobalRedisCacheConnection();
|
||||
await retryFn(() => redis.del(key));
|
||||
};
|
||||
@ -1,8 +1,9 @@
|
||||
import { addLog } from '../system/log';
|
||||
import Redis from 'ioredis';
|
||||
|
||||
const REDIS_URL = process.env.REDIS_URL ?? 'redis://localhost:6379';
|
||||
|
||||
export function newQueueRedisConnection() {
|
||||
export const newQueueRedisConnection = () => {
|
||||
const redis = new Redis(REDIS_URL);
|
||||
redis.on('connect', () => {
|
||||
console.log('Redis connected');
|
||||
@ -11,9 +12,9 @@ export function newQueueRedisConnection() {
|
||||
console.error('Redis connection error', error);
|
||||
});
|
||||
return redis;
|
||||
}
|
||||
};
|
||||
|
||||
export function newWorkerRedisConnection() {
|
||||
export const newWorkerRedisConnection = () => {
|
||||
const redis = new Redis(REDIS_URL, {
|
||||
maxRetriesPerRequest: null
|
||||
});
|
||||
@ -24,4 +25,19 @@ export function newWorkerRedisConnection() {
|
||||
console.error('Redis connection error', error);
|
||||
});
|
||||
return redis;
|
||||
}
|
||||
};
|
||||
|
||||
export const getGlobalRedisCacheConnection = () => {
|
||||
if (global.redisCache) return global.redisCache;
|
||||
|
||||
global.redisCache = new Redis(REDIS_URL, { keyPrefix: 'fastgpt:cache:' });
|
||||
|
||||
global.redisCache.on('connect', () => {
|
||||
addLog.info('Redis connected');
|
||||
});
|
||||
global.redisCache.on('error', (error) => {
|
||||
addLog.error('Redis connection error', error);
|
||||
});
|
||||
|
||||
return global.redisCache;
|
||||
};
|
||||
|
||||
5
packages/service/common/redis/type.d.ts
vendored
Normal file
5
packages/service/common/redis/type.d.ts
vendored
Normal file
@ -0,0 +1,5 @@
|
||||
import Redis from 'ioredis';
|
||||
|
||||
declare global {
|
||||
var redisCache: Redis | null;
|
||||
}
|
||||
@ -2,10 +2,11 @@
|
||||
import { PgVectorCtrl } from './pg/class';
|
||||
import { ObVectorCtrl } from './oceanbase/class';
|
||||
import { getVectorsByText } from '../../core/ai/embedding';
|
||||
import { InsertVectorProps } from './controller.d';
|
||||
import { DelDatasetVectorCtrlProps, InsertVectorProps } from './controller.d';
|
||||
import { EmbeddingModelItemType } from '@fastgpt/global/core/ai/model.d';
|
||||
import { MILVUS_ADDRESS, PG_ADDRESS, OCEANBASE_ADDRESS } from './constants';
|
||||
import { MilvusCtrl } from './milvus/class';
|
||||
import { setRedisCache, getRedisCache, delRedisCache, CacheKeyEnum } from '../redis/cache';
|
||||
|
||||
const getVectorObj = () => {
|
||||
if (PG_ADDRESS) return new PgVectorCtrl();
|
||||
@ -14,14 +15,29 @@ const getVectorObj = () => {
|
||||
|
||||
return new PgVectorCtrl();
|
||||
};
|
||||
const getChcheKey = (teamId: string) => `${CacheKeyEnum.team_vector_count}:${teamId}`;
|
||||
|
||||
const Vector = getVectorObj();
|
||||
|
||||
export const initVectorStore = Vector.init;
|
||||
export const deleteDatasetDataVector = Vector.delete;
|
||||
export const recallFromVectorStore = Vector.embRecall;
|
||||
export const getVectorDataByTime = Vector.getVectorDataByTime;
|
||||
export const getVectorCountByTeamId = Vector.getVectorCountByTeamId;
|
||||
|
||||
export const getVectorCountByTeamId = async (teamId: string) => {
|
||||
const key = getChcheKey(teamId);
|
||||
|
||||
const countStr = await getRedisCache(key);
|
||||
if (countStr) {
|
||||
return Number(countStr);
|
||||
}
|
||||
|
||||
const count = await Vector.getVectorCountByTeamId(teamId);
|
||||
|
||||
await setRedisCache(key, count, 30 * 60);
|
||||
|
||||
return count;
|
||||
};
|
||||
|
||||
export const getVectorCountByDatasetId = Vector.getVectorCountByDatasetId;
|
||||
export const getVectorCountByCollectionId = Vector.getVectorCountByCollectionId;
|
||||
|
||||
@ -43,8 +59,16 @@ export const insertDatasetDataVector = async ({
|
||||
vector: vectors[0]
|
||||
});
|
||||
|
||||
delRedisCache(getChcheKey(props.teamId));
|
||||
|
||||
return {
|
||||
tokens,
|
||||
insertId
|
||||
};
|
||||
};
|
||||
|
||||
export const deleteDatasetDataVector = async (props: DelDatasetVectorCtrlProps) => {
|
||||
const result = await Vector.delete(props);
|
||||
delRedisCache(getChcheKey(props.teamId));
|
||||
return result;
|
||||
};
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user