diff --git a/src/components/chat-view/SearchView.tsx b/src/components/chat-view/SearchView.tsx index 9212039..aa49ee6 100644 --- a/src/components/chat-view/SearchView.tsx +++ b/src/components/chat-view/SearchView.tsx @@ -354,8 +354,6 @@ const SearchView = () => { setShowRAGInitConfirm(true) }, []) - - // 确认初始化 RAG 向量 const confirmInitWorkspaceRAG = useCallback(async () => { setShowRAGInitConfirm(false) diff --git a/src/core/rag/rag-engine.ts b/src/core/rag/rag-engine.ts index d25b2ae..f4ace14 100644 --- a/src/core/rag/rag-engine.ts +++ b/src/core/rag/rag-engine.ts @@ -16,7 +16,7 @@ import { getEmbeddingModel } from './embedding' type EmbeddingManager = { modelLoaded: boolean currentModel: string | null - loadModel(modelId: string, useGpu: boolean): Promise + loadModel(modelId: string, useGpu: boolean): Promise embed(text: string): Promise<{ vec: number[] }> embedBatch(texts: string[]): Promise<{ vec: number[] }[]> } @@ -248,7 +248,6 @@ export class RAGEngine { if (workspace) { // 获取工作区中的所有文件路径 - const folders: string[] = [] const files: string[] = [] for (const item of workspace.content) { diff --git a/src/database/modules/vector/vector-manager.ts b/src/database/modules/vector/vector-manager.ts index fdb7d83..6b2998b 100644 --- a/src/database/modules/vector/vector-manager.ts +++ b/src/database/modules/vector/vector-manager.ts @@ -18,6 +18,7 @@ import { getFilesWithTag } from '../../../utils/glob-utils'; import { openSettingsModalWithError } from '../../../utils/open-settings-modal'; import { DBManager } from '../../database-manager'; import { Workspace } from '../../json/workspace/types'; +import { vectorTables } from '../../schema'; import { VectorRepository } from './vector-repository'; @@ -240,7 +241,7 @@ export class VectorManager { const embeddingProgress = { completed: 0 } // 减少批量大小以降低内存压力 - const batchSize = options.batchSize + const batchSize = Math.min(options.batchSize, 20) // 限制最大批量大小 let batchCount = 0 try { @@ -288,9 +289,10 @@ export class VectorManager { }, ) - // 立即插入当前批次,避免内存累积 + // 使用事务批量插入,减少数据库连接压力 if (embeddedBatch.length > 0) { - await this.repository.insertVectors(embeddedBatch, embeddingModel) + await this.insertVectorsWithTransaction(embeddedBatch, embeddingModel) + console.log("insert vectors with transaction success, batch size: ", embeddedBatch.length) // 清理批次数据 embeddedBatch.length = 0 } @@ -302,12 +304,12 @@ export class VectorManager { totalFiles: filesToIndex.length, }) - // 定期内存清理 - await this.memoryCleanup(batchCount) + // 定期内存清理和延迟 + await this.memoryCleanupWithDelay(batchCount) } } else { - // 不支持批量处理的提供商:使用流式处理逻辑 - const limit = pLimit(32) // 从50降低到10,减少并发压力 + // 不支持批量处理的提供商:大幅降低并发度 + const limit = pLimit(8) // 从32降低到8,大幅减少并发压力 const abortController = new AbortController() // 流式处理:分批处理并立即插入 @@ -346,24 +348,24 @@ export class VectorManager { embeddedBatch.push(embeddedChunk) }, { - numOfAttempts: 3, // 减少重试次数 - startingDelay: 500, // 减少延迟 - timeMultiple: 1.5, + numOfAttempts: 3, + startingDelay: 1000, // 增加延迟 + timeMultiple: 2.0, jitter: 'full', }, ) } catch (error) { - abortController.abort() - throw error + console.error('Error in embedding task:', error) + // 不要立即中止,继续处理其他任务 } }), ) await Promise.all(tasks) - // 立即插入当前批次 + // 使用事务批量插入 if (embeddedBatch.length > 0) { - await this.repository.insertVectors(embeddedBatch, embeddingModel) + await this.insertVectorsWithTransaction(embeddedBatch, embeddingModel) // 清理批次数据 embeddedBatch.length = 0 } @@ -375,8 +377,8 @@ export class VectorManager { totalFiles: filesToIndex.length, }) - // 定期内存清理 - await this.memoryCleanup(batchCount) + // 定期内存清理和延迟 + await this.memoryCleanupWithDelay(batchCount) } } } catch (error) { @@ -523,7 +525,7 @@ export class VectorManager { const embeddingProgress = { completed: 0 } // 减少批量大小以降低内存压力 - const batchSize = options.batchSize + const batchSize = Math.min(options.batchSize, 20) // 限制最大批量大小 let batchCount = 0 try { @@ -564,16 +566,16 @@ export class VectorManager { } }, { - numOfAttempts: 3, // 减少重试次数 - startingDelay: 500, // 减少延迟 - timeMultiple: 1.5, + numOfAttempts: 3, + startingDelay: 1000, // 增加延迟 + timeMultiple: 2.0, jitter: 'full', }, ) - // 立即插入当前批次,避免内存累积 + // 使用事务批量插入,减少数据库连接压力 if (embeddedBatch.length > 0) { - await this.repository.insertVectors(embeddedBatch, embeddingModel) + await this.insertVectorsWithTransaction(embeddedBatch, embeddingModel) // 清理批次数据 embeddedBatch.length = 0 } @@ -585,12 +587,12 @@ export class VectorManager { totalFiles: filesToIndex.length, }) - // 定期内存清理 - await this.memoryCleanup(batchCount) + // 定期内存清理和延迟 + await this.memoryCleanupWithDelay(batchCount) } } else { - // 不支持批量处理的提供商:使用流式处理逻辑 - const limit = pLimit(32) // 从50降低到10,减少并发压力 + // 不支持批量处理的提供商:大幅降低并发度 + const limit = pLimit(8) // 从32降低到8,大幅减少并发压力 const abortController = new AbortController() // 流式处理:分批处理并立即插入 @@ -629,24 +631,24 @@ export class VectorManager { embeddedBatch.push(embeddedChunk) }, { - numOfAttempts: 3, // 减少重试次数 - startingDelay: 500, // 减少延迟 - timeMultiple: 1.5, + numOfAttempts: 3, + startingDelay: 1000, // 增加延迟 + timeMultiple: 2.0, jitter: 'full', }, ) } catch (error) { - abortController.abort() - throw error + console.error('Error in embedding task:', error) + // 不要立即中止,继续处理其他任务 } }), ) await Promise.all(tasks) - // 立即插入当前批次 + // 使用事务批量插入 if (embeddedBatch.length > 0) { - await this.repository.insertVectors(embeddedBatch, embeddingModel) + await this.insertVectorsWithTransaction(embeddedBatch, embeddingModel) // 清理批次数据 embeddedBatch.length = 0 } @@ -658,8 +660,8 @@ export class VectorManager { totalFiles: filesToIndex.length, }) - // 定期内存清理 - await this.memoryCleanup(batchCount) + // 定期内存清理和延迟 + await this.memoryCleanupWithDelay(batchCount) } } } catch (error) { @@ -1029,4 +1031,56 @@ export class VectorManager { return [] } } + + // 增强的内存清理方法,增加延迟 + private async memoryCleanupWithDelay(batchCount: number) { + // 每5批次强制垃圾回收和延迟 + if (batchCount % 5 === 0) { + this.forceGarbageCollection() + // 增加延迟让系统有时间处理 + await new Promise(resolve => setTimeout(resolve, 500)) + } + } + + // 使用事务插入向量的方法 + private async insertVectorsWithTransaction( + data: InsertVector[], + embeddingModel: EmbeddingModel, + ): Promise { + const db = this.dbManager.getPgClient() + if (!db) { + throw new Error('Database not initialized') + } + + const tableName = this.getTableName(embeddingModel) + + // 使用 .exec 方法进行批量插入,性能更好 + const insertStatements = data.map((vector) => { + // 转义字符串值以防止SQL注入 + const escapedPath = vector.path.replace(/'/g, "''") + const escapedContent = vector.content.replace(/\0/g, '').replace(/'/g, "''") + const embeddingVector = `[${vector.embedding.join(',')}]` + const escapedMetadata = JSON.stringify(vector.metadata).replace(/'/g, "''") + + return `INSERT INTO "${tableName}" (path, mtime, content, embedding, metadata) VALUES ('${escapedPath}', ${vector.mtime}, '${escapedContent}', '${embeddingVector}', '${escapedMetadata}');` + }).join('\n') + + // 使用事务包装批量插入 + const sql = ` + BEGIN; + ${insertStatements} + COMMIT; + ` + + await db.exec(sql) + } + + // 获取表名的辅助方法 + private getTableName(embeddingModel: EmbeddingModel): string { + const tableDefinition = vectorTables[embeddingModel.dimension] + if (!tableDefinition) { + throw new Error(`No table definition found for model: ${embeddingModel.id}`) + } + return tableDefinition.name + } }