Optimize the vector manager: add transactional batch insertion and delayed memory cleanup, cap the maximum batch size to reduce memory pressure, and improve overall system performance and stability.

This commit is contained in:
duanfuxiang 2025-07-07 23:12:49 +08:00
parent d3271f85e9
commit 627a19206e
3 changed files with 90 additions and 39 deletions

View File

@ -354,8 +354,6 @@ const SearchView = () => {
setShowRAGInitConfirm(true) setShowRAGInitConfirm(true)
}, []) }, [])
// 确认初始化 RAG 向量 // 确认初始化 RAG 向量
const confirmInitWorkspaceRAG = useCallback(async () => { const confirmInitWorkspaceRAG = useCallback(async () => {
setShowRAGInitConfirm(false) setShowRAGInitConfirm(false)

View File

@ -16,7 +16,7 @@ import { getEmbeddingModel } from './embedding'
type EmbeddingManager = { type EmbeddingManager = {
modelLoaded: boolean modelLoaded: boolean
currentModel: string | null currentModel: string | null
loadModel(modelId: string, useGpu: boolean): Promise<any> loadModel(modelId: string, useGpu: boolean): Promise<unknown>
embed(text: string): Promise<{ vec: number[] }> embed(text: string): Promise<{ vec: number[] }>
embedBatch(texts: string[]): Promise<{ vec: number[] }[]> embedBatch(texts: string[]): Promise<{ vec: number[] }[]>
} }
@ -248,7 +248,6 @@ export class RAGEngine {
if (workspace) { if (workspace) {
// 获取工作区中的所有文件路径 // 获取工作区中的所有文件路径
const folders: string[] = []
const files: string[] = [] const files: string[] = []
for (const item of workspace.content) { for (const item of workspace.content) {

View File

@ -18,6 +18,7 @@ import { getFilesWithTag } from '../../../utils/glob-utils';
import { openSettingsModalWithError } from '../../../utils/open-settings-modal'; import { openSettingsModalWithError } from '../../../utils/open-settings-modal';
import { DBManager } from '../../database-manager'; import { DBManager } from '../../database-manager';
import { Workspace } from '../../json/workspace/types'; import { Workspace } from '../../json/workspace/types';
import { vectorTables } from '../../schema';
import { VectorRepository } from './vector-repository'; import { VectorRepository } from './vector-repository';
@ -240,7 +241,7 @@ export class VectorManager {
const embeddingProgress = { completed: 0 } const embeddingProgress = { completed: 0 }
// 减少批量大小以降低内存压力 // 减少批量大小以降低内存压力
const batchSize = options.batchSize const batchSize = Math.min(options.batchSize, 20) // 限制最大批量大小
let batchCount = 0 let batchCount = 0
try { try {
@ -288,9 +289,10 @@ export class VectorManager {
}, },
) )
// 立即插入当前批次,避免内存累积 // 使用事务批量插入,减少数据库连接压力
if (embeddedBatch.length > 0) { if (embeddedBatch.length > 0) {
await this.repository.insertVectors(embeddedBatch, embeddingModel) await this.insertVectorsWithTransaction(embeddedBatch, embeddingModel)
console.log("insert vectors with transaction success, batch size: ", embeddedBatch.length)
// 清理批次数据 // 清理批次数据
embeddedBatch.length = 0 embeddedBatch.length = 0
} }
@ -302,12 +304,12 @@ export class VectorManager {
totalFiles: filesToIndex.length, totalFiles: filesToIndex.length,
}) })
// 定期内存清理 // 定期内存清理和延迟
await this.memoryCleanup(batchCount) await this.memoryCleanupWithDelay(batchCount)
} }
} else { } else {
// 不支持批量处理的提供商:使用流式处理逻辑 // 不支持批量处理的提供商:大幅降低并发度
const limit = pLimit(32) // 从50降低到10减少并发压力 const limit = pLimit(8) // 从32降低到8大幅减少并发压力
const abortController = new AbortController() const abortController = new AbortController()
// 流式处理:分批处理并立即插入 // 流式处理:分批处理并立即插入
@ -346,24 +348,24 @@ export class VectorManager {
embeddedBatch.push(embeddedChunk) embeddedBatch.push(embeddedChunk)
}, },
{ {
numOfAttempts: 3, // 减少重试次数 numOfAttempts: 3,
startingDelay: 500, // 减少延迟 startingDelay: 1000, // 增加延迟
timeMultiple: 1.5, timeMultiple: 2.0,
jitter: 'full', jitter: 'full',
}, },
) )
} catch (error) { } catch (error) {
abortController.abort() console.error('Error in embedding task:', error)
throw error // 不要立即中止,继续处理其他任务
} }
}), }),
) )
await Promise.all(tasks) await Promise.all(tasks)
// 立即插入当前批次 // 使用事务批量插入
if (embeddedBatch.length > 0) { if (embeddedBatch.length > 0) {
await this.repository.insertVectors(embeddedBatch, embeddingModel) await this.insertVectorsWithTransaction(embeddedBatch, embeddingModel)
// 清理批次数据 // 清理批次数据
embeddedBatch.length = 0 embeddedBatch.length = 0
} }
@ -375,8 +377,8 @@ export class VectorManager {
totalFiles: filesToIndex.length, totalFiles: filesToIndex.length,
}) })
// 定期内存清理 // 定期内存清理和延迟
await this.memoryCleanup(batchCount) await this.memoryCleanupWithDelay(batchCount)
} }
} }
} catch (error) { } catch (error) {
@ -523,7 +525,7 @@ export class VectorManager {
const embeddingProgress = { completed: 0 } const embeddingProgress = { completed: 0 }
// 减少批量大小以降低内存压力 // 减少批量大小以降低内存压力
const batchSize = options.batchSize const batchSize = Math.min(options.batchSize, 20) // 限制最大批量大小
let batchCount = 0 let batchCount = 0
try { try {
@ -564,16 +566,16 @@ export class VectorManager {
} }
}, },
{ {
numOfAttempts: 3, // 减少重试次数 numOfAttempts: 3,
startingDelay: 500, // 减少延迟 startingDelay: 1000, // 增加延迟
timeMultiple: 1.5, timeMultiple: 2.0,
jitter: 'full', jitter: 'full',
}, },
) )
// 立即插入当前批次,避免内存累积 // 使用事务批量插入,减少数据库连接压力
if (embeddedBatch.length > 0) { if (embeddedBatch.length > 0) {
await this.repository.insertVectors(embeddedBatch, embeddingModel) await this.insertVectorsWithTransaction(embeddedBatch, embeddingModel)
// 清理批次数据 // 清理批次数据
embeddedBatch.length = 0 embeddedBatch.length = 0
} }
@ -585,12 +587,12 @@ export class VectorManager {
totalFiles: filesToIndex.length, totalFiles: filesToIndex.length,
}) })
// 定期内存清理 // 定期内存清理和延迟
await this.memoryCleanup(batchCount) await this.memoryCleanupWithDelay(batchCount)
} }
} else { } else {
// 不支持批量处理的提供商:使用流式处理逻辑 // 不支持批量处理的提供商:大幅降低并发度
const limit = pLimit(32) // 从50降低到10减少并发压力 const limit = pLimit(8) // 从32降低到8大幅减少并发压力
const abortController = new AbortController() const abortController = new AbortController()
// 流式处理:分批处理并立即插入 // 流式处理:分批处理并立即插入
@ -629,24 +631,24 @@ export class VectorManager {
embeddedBatch.push(embeddedChunk) embeddedBatch.push(embeddedChunk)
}, },
{ {
numOfAttempts: 3, // 减少重试次数 numOfAttempts: 3,
startingDelay: 500, // 减少延迟 startingDelay: 1000, // 增加延迟
timeMultiple: 1.5, timeMultiple: 2.0,
jitter: 'full', jitter: 'full',
}, },
) )
} catch (error) { } catch (error) {
abortController.abort() console.error('Error in embedding task:', error)
throw error // 不要立即中止,继续处理其他任务
} }
}), }),
) )
await Promise.all(tasks) await Promise.all(tasks)
// 立即插入当前批次 // 使用事务批量插入
if (embeddedBatch.length > 0) { if (embeddedBatch.length > 0) {
await this.repository.insertVectors(embeddedBatch, embeddingModel) await this.insertVectorsWithTransaction(embeddedBatch, embeddingModel)
// 清理批次数据 // 清理批次数据
embeddedBatch.length = 0 embeddedBatch.length = 0
} }
@ -658,8 +660,8 @@ export class VectorManager {
totalFiles: filesToIndex.length, totalFiles: filesToIndex.length,
}) })
// 定期内存清理 // 定期内存清理和延迟
await this.memoryCleanup(batchCount) await this.memoryCleanupWithDelay(batchCount)
} }
} }
} catch (error) { } catch (error) {
@ -1029,4 +1031,56 @@ export class VectorManager {
return [] return []
} }
} }
/**
 * Periodic memory cleanup that also pauses briefly.
 *
 * On every fifth batch (i.e. when `batchCount` is a multiple of 5) this calls
 * `forceGarbageCollection()` and then waits 500 ms before resolving. For all
 * other batch counts it resolves immediately without doing anything.
 *
 * @param batchCount - Running count of processed batches, supplied by the caller.
 */
private async memoryCleanupWithDelay(batchCount: number) {
	const cleanupEveryNBatches = 5
	const pauseMs = 500

	// Not a cleanup batch: nothing to do.
	if (batchCount % cleanupEveryNBatches !== 0) {
		return
	}

	this.forceGarbageCollection()

	// Pause so the rest of the system gets time to catch up.
	await new Promise((resolve) => {
		setTimeout(resolve, pauseMs)
	})
}
/**
 * Inserts a batch of vectors into the table for `embeddingModel`, wrapping all
 * INSERT statements in a single `BEGIN … COMMIT` sent through the client's
 * `exec` method.
 *
 * If the batch fails, a `ROLLBACK` is issued before the original error is
 * rethrown, so the connection is not left inside an aborted transaction.
 *
 * @param data - Rows to insert; an empty array is a no-op.
 * @param embeddingModel - Model used to resolve the destination table.
 * @throws Error 'Database not initialized' when no client is available.
 * @throws Whatever `getTableName` or the client's `exec` throws.
 */
private async insertVectorsWithTransaction(
	data: InsertVector[],
	embeddingModel: EmbeddingModel,
): Promise<void> {
	const db = this.dbManager.getPgClient()
	if (!db) {
		throw new Error('Database not initialized')
	}

	const tableName = this.getTableName(embeddingModel)

	// Nothing to write: do not open (and commit) an empty transaction.
	if (data.length === 0) {
		return
	}

	// Render a value as a single-quoted SQL string literal. NUL characters are
	// dropped and single quotes are doubled.
	// NOTE(review): doubling quotes is only sufficient when the server treats
	// backslashes literally (standard_conforming_strings = on) — confirm.
	const quoteLiteral = (value: string): string =>
		`'${value.replace(/\0/g, '').replace(/'/g, "''")}'`

	// Double any embedded double quotes so the identifier cannot break out of
	// its quoting.
	const quotedTable = `"${tableName.replace(/"/g, '""')}"`

	// One INSERT per row, sent together in a single exec() call.
	const insertStatements = data
		.map((vector) => {
			const pathLiteral = quoteLiteral(vector.path)
			const contentLiteral = quoteLiteral(vector.content)
			const embeddingLiteral = `'[${vector.embedding.join(',')}]'`
			const metadataLiteral = quoteLiteral(JSON.stringify(vector.metadata))

			return `INSERT INTO ${quotedTable} (path, mtime, content, embedding, metadata) VALUES (${pathLiteral}, ${vector.mtime}, ${contentLiteral}, ${embeddingLiteral}, ${metadataLiteral});`
		})
		.join('\n')

	try {
		await db.exec(`BEGIN;\n${insertStatements}\nCOMMIT;`)
	} catch (error: unknown) {
		// NOTE(review): assumes exec() runs the whole string on one session, so a
		// failing statement leaves the explicit BEGIN open — confirm against the
		// client in use.
		try {
			await db.exec('ROLLBACK;')
		} catch (rollbackError: unknown) {
			// Log only: the insert failure below is the error callers need to see.
			console.error('Failed to roll back vector insert transaction:', rollbackError)
		}
		throw error
	}
}
/**
 * Resolves the name of the vector table for an embedding model.
 *
 * The table is looked up in `vectorTables` by the model's `dimension`.
 *
 * @param embeddingModel - Model whose `dimension` selects the table.
 * @returns The `name` of the matching table definition.
 * @throws Error when `vectorTables` has no entry for that dimension.
 */
private getTableName(embeddingModel: EmbeddingModel): string {
	const tableDefinition = vectorTables[embeddingModel.dimension]
	if (!tableDefinition) {
		// Report the dimension as well: it is the actual lookup key, so the model
		// id alone does not explain why the lookup failed.
		throw new Error(
			`No table definition found for model: ${embeddingModel.id} (dimension: ${embeddingModel.dimension})`,
		)
	}
	return tableDefinition.name
}
} }