From c1fbd4da21c34f53f31891ac730084dba3f6b825 Mon Sep 17 00:00:00 2001
From: duanfuxiang
Date: Thu, 10 Jul 2025 12:54:57 +0800
Subject: [PATCH] Process files in batches when updating the index

---
 src/database/modules/vector/vector-manager.ts | 785 ++++++++++--------
 1 file changed, 419 insertions(+), 366 deletions(-)

diff --git a/src/database/modules/vector/vector-manager.ts b/src/database/modules/vector/vector-manager.ts
index 6b2998b..1100a88 100644
--- a/src/database/modules/vector/vector-manager.ts
+++ b/src/database/modules/vector/vector-manager.ts
@@ -30,7 +30,7 @@ export class VectorManager {
   constructor(app: App, dbManager: DBManager) {
     this.app = app
     this.dbManager = dbManager
-    this.repository = new VectorRepository(app, dbManager.getPgClient())
+    this.repository = new VectorRepository(app, dbManager.getPgClient() as any)
   }
 
   async performSimilaritySearch(
@@ -103,13 +103,25 @@ export class VectorManager {
   // Helper that forces garbage collection
   private forceGarbageCollection() {
     try {
-      if (typeof global !== 'undefined' && global.gc) {
-        global.gc()
-      } else if (typeof window !== 'undefined' && (window as any).gc) {
-        ((window as any).gc as () => void)();
+      // Run GC several times to make sure resources are released
+      for (let i = 0; i < 3; i++) {
+        if (typeof global !== 'undefined' && (global as any).gc) {
+          (global as any).gc()
+        } else if (typeof window !== 'undefined' && (window as any).gc) {
+          (window as any).gc()
+        }
+      }
+
+      // Also clear any lingering references
+      if (typeof global !== 'undefined' && (global as any).gc) {
+        // Node.js environment
+        setTimeout(() => {
+          (global as any).gc?.()
+        }, 0)
       }
     } catch (e) {
       // Ignore garbage-collection errors
+      console.debug('GC error (ignored):', e)
     }
   }
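
Note: `global.gc` (or `window.gc` in a renderer) exists only when V8 is started
with the `--expose-gc` flag, so the calls above may be silent no-ops depending
on how the host Electron process was launched. A minimal sketch of a guarded
helper under that assumption (the name `tryGc` is illustrative, not part of
this patch):

    // Invoke the V8 garbage collector if the host exposes it. Returns false
    // when --expose-gc was not passed, so callers can tell whether manual GC
    // is actually happening rather than silently doing nothing.
    function tryGc(): boolean {
      const g = globalThis as { gc?: () => void }
      if (typeof g.gc === 'function') {
        g.gc()
        return true
      }
      return false
    }
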
@@ -188,200 +200,207 @@ export class VectorManager {
     console.log("textSplitter chunkSize: ", options.chunkSize, "overlap: ", overlap)
 
     const skippedFiles: string[] = []
-    const contentChunks: InsertVector[] = (
-      await Promise.all(
-        filesToIndex.map(async (file) => {
-          try {
-            let fileContent = await this.app.vault.cachedRead(file)
-            // Strip null bytes to avoid PostgreSQL UTF8 encoding errors
-            fileContent = fileContent.replace(/\0/g, '')
-            const fileDocuments = await textSplitter.createDocuments([
-              fileContent,
-            ])
-            return fileDocuments
-              .map((chunk): InsertVector | null => {
-                // Keep the raw content; removeMarkdown is not called here
-                const rawContent = chunk.pageContent.replace(/\0/g, '')
-                if (!rawContent || rawContent.trim().length === 0) {
-                  return null
-                }
-                return {
-                  path: file.path,
-                  mtime: file.stat.mtime,
-                  content: rawContent, // Store the raw content
-                  embedding: [],
-                  metadata: {
-                    startLine: Number(chunk.metadata.loc.lines.from),
-                    endLine: Number(chunk.metadata.loc.lines.to),
-                  },
-                }
-              })
-              .filter((chunk): chunk is InsertVector => chunk !== null)
-          } catch (error) {
-            console.warn(`Skipping file ${file.path}:`, error.message)
-            skippedFiles.push(file.path)
-            return []
-          }
-        }),
-      )
-    ).flat()
-
-    console.log("contentChunks: ", contentChunks.length)
-
-    if (skippedFiles.length > 0) {
-      console.warn(`Skipped ${skippedFiles.length} problematic files:`, skippedFiles)
-      new Notice(`Skipped ${skippedFiles.length} problematic files`)
+    const embeddingProgress = { completed: 0, totalChunks: 0 }
+
+    // Process files in batches of at most 50 (kept small to avoid exhausting file handles)
+    const FILE_BATCH_SIZE = 50
+    // Use a smaller batch size to reduce memory pressure
+    const embeddingBatchSize = Math.min(options.batchSize, 10)
+
+    // First count the total number of chunks for progress reporting
+    let totalChunks = 0
+    for (let i = 0; i < filesToIndex.length; i += FILE_BATCH_SIZE) {
+      const fileBatch = filesToIndex.slice(i, Math.min(i + FILE_BATCH_SIZE, filesToIndex.length))
+      for (const file of fileBatch) {
+        try {
+          let fileContent = await this.app.vault.cachedRead(file)
+          fileContent = fileContent.replace(/\0/g, '')
+          const fileDocuments = await textSplitter.createDocuments([fileContent])
+          totalChunks += fileDocuments.length
+        } catch (error) {
+          // Skip unreadable files during the counting pass
+        }
+      }
+    }
-
+
+    embeddingProgress.totalChunks = totalChunks
     updateProgress?.({
       completedChunks: 0,
-      totalChunks: contentChunks.length,
+      totalChunks: totalChunks,
       totalFiles: filesToIndex.length,
     })
 
-    const embeddingProgress = { completed: 0 }
-    // Use a smaller batch size to reduce memory pressure
-    const batchSize = Math.min(options.batchSize, 20) // Cap the maximum batch size
-    let batchCount = 0
-
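
The counting pass above and the processing loop below both walk `filesToIndex`
in fixed-size slices. A minimal generic sketch of that batching pattern
(`chunkArray` is an illustrative name, not a helper in this repo):

    // Yield an array in fixed-size slices so the caller only ever holds one
    // batch's worth of derived data at a time.
    function* chunkArray<T>(items: readonly T[], size: number): Generator<T[]> {
      for (let i = 0; i < items.length; i += size) {
        yield items.slice(i, Math.min(i + size, items.length))
      }
    }

    // Usage mirroring the loops in this patch:
    // for (const fileBatch of chunkArray(filesToIndex, FILE_BATCH_SIZE)) { ... }
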
     try {
-      if (embeddingModel.supportsBatch) {
-        // Providers with batch support: stream the processing
-        for (let i = 0; i < contentChunks.length; i += batchSize) {
-          batchCount++
-          const batchChunks = contentChunks.slice(i, Math.min(i + batchSize, contentChunks.length))
-
-          const embeddedBatch: InsertVector[] = []
-
-          await backOff(
-            async () => {
-              // Strip markdown before embedding
-              const cleanedBatchData = batchChunks.map(chunk => {
-                const cleanContent = removeMarkdown(chunk.content)
-                return { chunk, cleanContent }
-              }).filter(({ cleanContent }) => cleanContent && cleanContent.trim().length > 0)
-
-              if (cleanedBatchData.length === 0) {
-                return
-              }
-
-              const batchTexts = cleanedBatchData.map(({ cleanContent }) => cleanContent)
-              const batchEmbeddings = await embeddingModel.getBatchEmbeddings(batchTexts)
-
-              // Merge the embedding results back into the chunk data
-              for (let j = 0; j < cleanedBatchData.length; j++) {
-                const { chunk, cleanContent } = cleanedBatchData[j]
-                const embeddedChunk: InsertVector = {
-                  path: chunk.path,
-                  mtime: chunk.mtime,
-                  content: cleanContent, // Use the already-cleaned content
-                  embedding: batchEmbeddings[j],
-                  metadata: chunk.metadata,
-                }
-                embeddedBatch.push(embeddedChunk)
-              }
-            },
-            {
-              numOfAttempts: 3, // Fewer retries
-              startingDelay: 500, // Shorter delay
-              timeMultiple: 1.5,
-              jitter: 'full',
-            },
-          )
-
-          // Bulk insert inside a transaction to reduce database connection pressure
-          if (embeddedBatch.length > 0) {
-            await this.insertVectorsWithTransaction(embeddedBatch, embeddingModel)
-            console.log("insert vectors with transaction success, batch size: ", embeddedBatch.length)
-            // Release the batch data
-            embeddedBatch.length = 0
-          }
-
-          embeddingProgress.completed += batchChunks.length
-          updateProgress?.({
-            completedChunks: embeddingProgress.completed,
-            totalChunks: contentChunks.length,
-            totalFiles: filesToIndex.length,
-          })
-
-          // Periodic memory cleanup and delay
-          await this.memoryCleanupWithDelay(batchCount)
-        }
-      } else {
-        // Providers without batch support: drastically reduce concurrency
-        const limit = pLimit(8) // Down from 32 to 8, greatly easing concurrency pressure
-        const abortController = new AbortController()
-
-        // Streaming: process in batches and insert immediately
-        for (let i = 0; i < contentChunks.length; i += batchSize) {
-          if (abortController.signal.aborted) {
-            throw new Error('Operation was aborted')
-          }
-
-          batchCount++
-          const batchChunks = contentChunks.slice(i, Math.min(i + batchSize, contentChunks.length))
-          const embeddedBatch: InsertVector[] = []
-
-          const tasks = batchChunks.map((chunk) =>
-            limit(async () => {
-              if (abortController.signal.aborted) {
-                throw new Error('Operation was aborted')
-              }
+      for (let i = 0; i < filesToIndex.length; i += FILE_BATCH_SIZE) {
+        const fileBatch = filesToIndex.slice(i, Math.min(i + FILE_BATCH_SIZE, filesToIndex.length))
+        console.log(`Processing file batch ${Math.floor(i / FILE_BATCH_SIZE) + 1}/${Math.ceil(filesToIndex.length / FILE_BATCH_SIZE)} (${fileBatch.length} files)`)
+
+        // Step 1: split files into chunks
+        const batchChunks = (
+          await Promise.all(
+            fileBatch.map(async (file) => {
               try {
-                await backOff(
-                  async () => {
-                    // Strip markdown before embedding
-                    const cleanContent = removeMarkdown(chunk.content).replace(/\0/g, '')
-                    // Skip content that is empty after cleaning
-                    if (!cleanContent || cleanContent.trim().length === 0) {
-                      return
+                let fileContent = await this.app.vault.cachedRead(file)
+                // Strip null bytes to avoid PostgreSQL UTF8 encoding errors
+                fileContent = fileContent.replace(/\0/g, '')
+                const fileDocuments = await textSplitter.createDocuments([
+                  fileContent,
+                ])
+                return fileDocuments
+                  .map((chunk): InsertVector | null => {
+                    // Keep the raw content; removeMarkdown is not called here
+                    const rawContent = chunk.pageContent.replace(/\0/g, '')
+                    if (!rawContent || rawContent.trim().length === 0) {
+                      return null
                     }
-
-                    const embedding = await embeddingModel.getEmbedding(cleanContent)
-                    const embeddedChunk = {
-                      path: chunk.path,
-                      mtime: chunk.mtime,
-                      content: cleanContent, // Use the cleaned content
-                      embedding,
-                      metadata: chunk.metadata,
+                    return {
+                      path: file.path,
+                      mtime: file.stat.mtime,
+                      content: rawContent, // Store the raw content
+                      embedding: [],
+                      metadata: {
+                        startLine: Number(chunk.metadata.loc.lines.from),
+                        endLine: Number(chunk.metadata.loc.lines.to),
+                      },
                     }
-                    embeddedBatch.push(embeddedChunk)
-                  },
-                  {
-                    numOfAttempts: 3,
-                    startingDelay: 1000, // Longer delay
-                    timeMultiple: 2.0,
-                    jitter: 'full',
-                  },
-                )
+                  })
+                  .filter((chunk): chunk is InsertVector => chunk !== null)
               } catch (error) {
-                console.error('Error in embedding task:', error)
-                // Don't abort immediately; keep processing the other tasks
+                console.warn(`Skipping file ${file.path}:`, error.message)
+                skippedFiles.push(file.path)
+                return []
               }
             }),
           )
-
-          await Promise.all(tasks)
-
-          // Bulk insert inside a transaction
-          if (embeddedBatch.length > 0) {
-            await this.insertVectorsWithTransaction(embeddedBatch, embeddingModel)
-            // Release the batch data
-            embeddedBatch.length = 0
-          }
-
-          embeddingProgress.completed += batchChunks.length
-          updateProgress?.({
-            completedChunks: embeddingProgress.completed,
-            totalChunks: contentChunks.length,
-            totalFiles: filesToIndex.length,
-          })
-
-          // Periodic memory cleanup and delay
-          await this.memoryCleanupWithDelay(batchCount)
         }
+        ).flat()
+
+        if (batchChunks.length === 0) {
+          continue
         }
+
+        // Step 2: embed the chunks
+        console.log(`Embedding ${batchChunks.length} chunks for current file batch`)
+
+        if (embeddingModel.supportsBatch) {
+          // Providers that support batch embedding
+          for (let j = 0; j < batchChunks.length; j += embeddingBatchSize) {
+            const embeddingBatch = batchChunks.slice(j, Math.min(j + embeddingBatchSize, batchChunks.length))
+            const embeddedBatch: InsertVector[] = []
+
+            await backOff(
+              async () => {
+                // Strip markdown before embedding
+                const cleanedBatchData = embeddingBatch.map(chunk => {
+                  const cleanContent = removeMarkdown(chunk.content)
+                  return { chunk, cleanContent }
+                }).filter(({ cleanContent }) => cleanContent && cleanContent.trim().length > 0)
+
+                if (cleanedBatchData.length === 0) {
+                  return
+                }
+
+                const batchTexts = cleanedBatchData.map(({ cleanContent }) => cleanContent)
+                const batchEmbeddings = await embeddingModel.getBatchEmbeddings(batchTexts)
+
+                // Merge the embedding results back into the chunk data
+                for (let k = 0; k < cleanedBatchData.length; k++) {
+                  const { chunk, cleanContent } = cleanedBatchData[k]
+                  const embeddedChunk: InsertVector = {
+                    path: chunk.path,
+                    mtime: chunk.mtime,
+                    content: cleanContent, // Use the already-cleaned content
+                    embedding: batchEmbeddings[k],
+                    metadata: chunk.metadata,
+                  }
+                  embeddedBatch.push(embeddedChunk)
+                }
+              },
+              {
+                numOfAttempts: 3,
+                startingDelay: 500,
+                timeMultiple: 1.5,
+                jitter: 'full',
+              },
+            )
+
+            // Step 3: store immediately
+            if (embeddedBatch.length > 0) {
+              await this.insertVectorsWithTransaction(embeddedBatch, embeddingModel)
+              console.log(`Stored ${embeddedBatch.length} embedded chunks`)
+            }
+
+            embeddingProgress.completed += embeddingBatch.length
+            updateProgress?.({
+              completedChunks: embeddingProgress.completed,
+              totalChunks: embeddingProgress.totalChunks,
+              totalFiles: filesToIndex.length,
+            })
+          }
+        } else {
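
Both retry sites above call `backOff` with the option names the
exponential-backoff npm package accepts (numOfAttempts, startingDelay,
timeMultiple, jitter). A self-contained sketch of that retry pattern, where
`fetchEmbedding` is a hypothetical stand-in rather than anything from this
patch:

    import { backOff } from 'exponential-backoff'

    // Hypothetical embedding call, used only to illustrate the retry wrapper.
    declare function fetchEmbedding(text: string): Promise<number[]>

    // Up to 3 attempts, starting at 500 ms and growing 1.5x per attempt with
    // full jitter -- the same options the batch path above passes.
    async function embedWithRetry(text: string): Promise<number[]> {
      return backOff(() => fetchEmbedding(text), {
        numOfAttempts: 3,
        startingDelay: 500,
        timeMultiple: 1.5,
        jitter: 'full',
      })
    }
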
+          // Providers without batch support (lower concurrency to avoid exhausting file handles)
+          const limit = pLimit(3)
+
+          for (let j = 0; j < batchChunks.length; j += embeddingBatchSize) {
+            const embeddingBatch = batchChunks.slice(j, Math.min(j + embeddingBatchSize, batchChunks.length))
+            const embeddedBatch: InsertVector[] = []
+
+            const tasks = embeddingBatch.map((chunk) =>
+              limit(async () => {
+                try {
+                  await backOff(
+                    async () => {
+                      // Strip markdown before embedding
+                      const cleanContent = removeMarkdown(chunk.content).replace(/\0/g, '')
+                      // Skip content that is empty after cleaning
+                      if (!cleanContent || cleanContent.trim().length === 0) {
+                        return
+                      }
+
+                      const embedding = await embeddingModel.getEmbedding(cleanContent)
+                      const embeddedChunk = {
+                        path: chunk.path,
+                        mtime: chunk.mtime,
+                        content: cleanContent, // Use the cleaned content
+                        embedding,
+                        metadata: chunk.metadata,
+                      }
+                      embeddedBatch.push(embeddedChunk)
+                    },
+                    {
+                      numOfAttempts: 3,
+                      startingDelay: 1000,
+                      timeMultiple: 2.0,
+                      jitter: 'full',
+                    },
+                  )
+                } catch (error) {
+                  console.error('Error in embedding task:', error)
+                }
+              }),
+            )
+
+            await Promise.all(tasks)
+
+            // Step 3: store immediately
+            if (embeddedBatch.length > 0) {
+              await this.insertVectorsWithTransaction(embeddedBatch, embeddingModel)
+              console.log(`Stored ${embeddedBatch.length} embedded chunks`)
+            }
+
+            embeddingProgress.completed += embeddingBatch.length
+            updateProgress?.({
+              completedChunks: embeddingProgress.completed,
+              totalChunks: embeddingProgress.totalChunks,
+              totalFiles: filesToIndex.length,
+            })
+          }
+        }
+
+        // Force resource cleanup after each file batch
+        await this.forceResourceCleanup()
+
+        // Extra delay so the OS can release file handles
+        await new Promise(resolve => setTimeout(resolve, 500))
       }
     } catch (error) {
       if (
@@ -395,8 +416,13 @@
         throw error
       }
     } finally {
-      // Final cleanup
-      this.forceGarbageCollection()
+      // Final forced cleanup
+      await this.forceResourceCleanup()
+    }
+
+    if (skippedFiles.length > 0) {
+      console.warn(`Skipped ${skippedFiles.length} problematic files:`, skippedFiles)
+      new Notice(`Skipped ${skippedFiles.length} problematic files`)
     }
   }
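
The fallback path in the method above (and its workspace twin below) throttles
per-chunk embedding with `pLimit(3)`. A minimal sketch of that p-limit
pattern, with a hypothetical `embedOne` standing in for the real provider
call:

    import pLimit from 'p-limit'

    // Hypothetical single-text embedding call, for illustration only.
    declare function embedOne(text: string): Promise<number[]>

    // At most 3 requests are in flight at once; the rest queue inside the
    // limiter instead of piling onto the network and file-handle pools.
    async function embedAllThrottled(texts: string[]): Promise<number[][]> {
      const limit = pLimit(3)
      return Promise.all(texts.map((t) => limit(() => embedOne(t))))
    }
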
@@ -472,197 +498,207 @@ export class VectorManager {
     console.log("textSplitter chunkSize: ", options.chunkSize, "overlap: ", overlap)
 
     const skippedFiles: string[] = []
-    const contentChunks: InsertVector[] = (
-      await Promise.all(
-        filesToIndex.map(async (file) => {
-          try {
-            let fileContent = await this.app.vault.cachedRead(file)
-            // Strip null bytes to avoid PostgreSQL UTF8 encoding errors
-            fileContent = fileContent.replace(/\0/g, '')
-            const fileDocuments = await textSplitter.createDocuments([
-              fileContent,
-            ])
-            return fileDocuments
-              .map((chunk): InsertVector | null => {
-                // Keep the raw content; removeMarkdown is not called here
-                const rawContent = chunk.pageContent.replace(/\0/g, '')
-                if (!rawContent || rawContent.trim().length === 0) {
-                  return null
-                }
-                return {
-                  path: file.path,
-                  mtime: file.stat.mtime,
-                  content: rawContent, // Store the raw content
-                  embedding: [],
-                  metadata: {
-                    startLine: Number(chunk.metadata.loc.lines.from),
-                    endLine: Number(chunk.metadata.loc.lines.to),
-                  },
-                }
-              })
-              .filter((chunk): chunk is InsertVector => chunk !== null)
-          } catch (error) {
-            console.warn(`Skipping file ${file.path}:`, error.message)
-            skippedFiles.push(file.path)
-            return []
-          }
-        }),
-      )
-    ).flat()
-
-    console.log("contentChunks: ", contentChunks.length)
-
-    if (skippedFiles.length > 0) {
-      console.warn(`Skipped ${skippedFiles.length} problematic files:`, skippedFiles)
-      new Notice(`Skipped ${skippedFiles.length} problematic files`)
+    const embeddingProgress = { completed: 0, totalChunks: 0 }
+
+    // Process files in batches of at most 50 (kept small to avoid exhausting file handles)
+    const FILE_BATCH_SIZE = 50
+    // Use a smaller batch size to reduce memory pressure
+    const embeddingBatchSize = Math.min(options.batchSize, 10)
+
+    // First count the total number of chunks for progress reporting
+    let totalChunks = 0
+    for (let i = 0; i < filesToIndex.length; i += FILE_BATCH_SIZE) {
+      const fileBatch = filesToIndex.slice(i, Math.min(i + FILE_BATCH_SIZE, filesToIndex.length))
+      for (const file of fileBatch) {
+        try {
+          let fileContent = await this.app.vault.cachedRead(file)
+          fileContent = fileContent.replace(/\0/g, '')
+          const fileDocuments = await textSplitter.createDocuments([fileContent])
+          totalChunks += fileDocuments.length
+        } catch (error) {
+          // Skip unreadable files during the counting pass
+        }
+      }
+    }
-
+
+    embeddingProgress.totalChunks = totalChunks
     updateProgress?.({
       completedChunks: 0,
-      totalChunks: contentChunks.length,
+      totalChunks: totalChunks,
       totalFiles: filesToIndex.length,
     })
 
-    const embeddingProgress = { completed: 0 }
-    // Use a smaller batch size to reduce memory pressure
-    const batchSize = Math.min(options.batchSize, 20) // Cap the maximum batch size
-    let batchCount = 0
-
     try {
-      if (embeddingModel.supportsBatch) {
-        // Providers with batch support: stream the processing
-        for (let i = 0; i < contentChunks.length; i += batchSize) {
-          batchCount++
-          const batchChunks = contentChunks.slice(i, Math.min(i + batchSize, contentChunks.length))
-
-          const embeddedBatch: InsertVector[] = []
-
-          await backOff(
-            async () => {
-              // Strip markdown before embedding, only once
-              const cleanedBatchData = batchChunks.map(chunk => {
-                const cleanContent = removeMarkdown(chunk.content)
-                return { chunk, cleanContent }
-              }).filter(({ cleanContent }) => cleanContent && cleanContent.trim().length > 0)
-
-              if (cleanedBatchData.length === 0) {
-                return
-              }
-
-              const batchTexts = cleanedBatchData.map(({ cleanContent }) => cleanContent)
-              const batchEmbeddings = await embeddingModel.getBatchEmbeddings(batchTexts)
-
-              // Merge the embedding results back into the chunk data
-              for (let j = 0; j < cleanedBatchData.length; j++) {
-                const { chunk, cleanContent } = cleanedBatchData[j]
-                const embeddedChunk: InsertVector = {
-                  path: chunk.path,
-                  mtime: chunk.mtime,
-                  content: cleanContent, // Use the already-cleaned content
-                  embedding: batchEmbeddings[j],
-                  metadata: chunk.metadata,
-                }
-                embeddedBatch.push(embeddedChunk)
-              }
-            },
-            {
-              numOfAttempts: 3,
-              startingDelay: 1000, // Longer delay
-              timeMultiple: 2.0,
-              jitter: 'full',
-            },
-          )
-
-          // Bulk insert inside a transaction to reduce database connection pressure
-          if (embeddedBatch.length > 0) {
-            await this.insertVectorsWithTransaction(embeddedBatch, embeddingModel)
-            // Release the batch data
-            embeddedBatch.length = 0
-          }
-
-          embeddingProgress.completed += batchChunks.length
-          updateProgress?.({
-            completedChunks: embeddingProgress.completed,
-            totalChunks: contentChunks.length,
-            totalFiles: filesToIndex.length,
-          })
-
-          // Periodic memory cleanup and delay
-          await this.memoryCleanupWithDelay(batchCount)
-        }
-      } else {
-        // Providers without batch support: drastically reduce concurrency
-        const limit = pLimit(8) // Down from 32 to 8, greatly easing concurrency pressure
-        const abortController = new AbortController()
-
-        // Streaming: process in batches and insert immediately
-        for (let i = 0; i < contentChunks.length; i += batchSize) {
-          if (abortController.signal.aborted) {
-            throw new Error('Operation was aborted')
-          }
-
-          batchCount++
-          const batchChunks = contentChunks.slice(i, Math.min(i + batchSize, contentChunks.length))
-          const embeddedBatch: InsertVector[] = []
-
-          const tasks = batchChunks.map((chunk) =>
-            limit(async () => {
-              if (abortController.signal.aborted) {
-                throw new Error('Operation was aborted')
-              }
+      for (let i = 0; i < filesToIndex.length; i += FILE_BATCH_SIZE) {
+        const fileBatch = filesToIndex.slice(i, Math.min(i + FILE_BATCH_SIZE, filesToIndex.length))
+        console.log(`Processing workspace file batch ${Math.floor(i / FILE_BATCH_SIZE) + 1}/${Math.ceil(filesToIndex.length / FILE_BATCH_SIZE)} (${fileBatch.length} files)`)
+
+        // Step 1: split files into chunks
+        const batchChunks = (
+          await Promise.all(
+            fileBatch.map(async (file) => {
              try {
-                await backOff(
-                  async () => {
-                    // Strip markdown before embedding
-                    const cleanContent = removeMarkdown(chunk.content).replace(/\0/g, '')
-                    // Skip content that is empty after cleaning
-                    if (!cleanContent || cleanContent.trim().length === 0) {
-                      return
+                let fileContent = await this.app.vault.cachedRead(file)
+                // Strip null bytes to avoid PostgreSQL UTF8 encoding errors
+                fileContent = fileContent.replace(/\0/g, '')
+                const fileDocuments = await textSplitter.createDocuments([
+                  fileContent,
+                ])
+                return fileDocuments
+                  .map((chunk): InsertVector | null => {
+                    // Keep the raw content; removeMarkdown is not called here
+                    const rawContent = chunk.pageContent.replace(/\0/g, '')
+                    if (!rawContent || rawContent.trim().length === 0) {
+                      return null
                     }
-
-                    const embedding = await embeddingModel.getEmbedding(cleanContent)
-                    const embeddedChunk = {
-                      path: chunk.path,
-                      mtime: chunk.mtime,
-                      content: cleanContent, // Use the cleaned content
-                      embedding,
-                      metadata: chunk.metadata,
+                    return {
+                      path: file.path,
+                      mtime: file.stat.mtime,
+                      content: rawContent, // Store the raw content
+                      embedding: [],
+                      metadata: {
+                        startLine: Number(chunk.metadata.loc.lines.from),
+                        endLine: Number(chunk.metadata.loc.lines.to),
+                      },
                     }
-                    embeddedBatch.push(embeddedChunk)
-                  },
-                  {
-                    numOfAttempts: 3,
-                    startingDelay: 1000, // Longer delay
-                    timeMultiple: 2.0,
-                    jitter: 'full',
-                  },
-                )
+                  })
+                  .filter((chunk): chunk is InsertVector => chunk !== null)
               } catch (error) {
-                console.error('Error in embedding task:', error)
-                // Don't abort immediately; keep processing the other tasks
+                console.warn(`Skipping file ${file.path}:`, error.message)
+                skippedFiles.push(file.path)
+                return []
              }
            }),
          )
-
-          await Promise.all(tasks)
-
-          // Bulk insert inside a transaction
-          if (embeddedBatch.length > 0) {
-            await this.insertVectorsWithTransaction(embeddedBatch, embeddingModel)
-            // Release the batch data
-            embeddedBatch.length = 0
-          }
-
-          embeddingProgress.completed += batchChunks.length
-          updateProgress?.({
-            completedChunks: embeddingProgress.completed,
-            totalChunks: contentChunks.length,
-            totalFiles: filesToIndex.length,
-          })
-
-          // Periodic memory cleanup and delay
-          await this.memoryCleanupWithDelay(batchCount)
         }
+        ).flat()
+
+        if (batchChunks.length === 0) {
+          continue
         }
+
+        // Step 2: embed the chunks
+        console.log(`Embedding ${batchChunks.length} chunks for current workspace file batch`)
+
+        if (embeddingModel.supportsBatch) {
+          // Providers that support batch embedding
+          for (let j = 0; j < batchChunks.length; j += embeddingBatchSize) {
+            const embeddingBatch = batchChunks.slice(j, Math.min(j + embeddingBatchSize, batchChunks.length))
+            const embeddedBatch: InsertVector[] = []
+
+            await backOff(
+              async () => {
+                // Strip markdown before embedding
+                const cleanedBatchData = embeddingBatch.map(chunk => {
+                  const cleanContent = removeMarkdown(chunk.content)
+                  return { chunk, cleanContent }
+                }).filter(({ cleanContent }) => cleanContent && cleanContent.trim().length > 0)
+
+                if (cleanedBatchData.length === 0) {
+                  return
+                }
+
+                const batchTexts = cleanedBatchData.map(({ cleanContent }) => cleanContent)
+                const batchEmbeddings = await embeddingModel.getBatchEmbeddings(batchTexts)
+
+                // Merge the embedding results back into the chunk data
+                for (let k = 0; k < cleanedBatchData.length; k++) {
+                  const { chunk, cleanContent } = cleanedBatchData[k]
+                  const embeddedChunk: InsertVector = {
+                    path: chunk.path,
+                    mtime: chunk.mtime,
+                    content: cleanContent, // Use the already-cleaned content
+                    embedding: batchEmbeddings[k],
+                    metadata: chunk.metadata,
+                  }
+                  embeddedBatch.push(embeddedChunk)
+                }
+              },
+              {
+                numOfAttempts: 3,
+                startingDelay: 1000,
+                timeMultiple: 2.0,
+                jitter: 'full',
+              },
+            )
+
+            // Step 3: store immediately
+            if (embeddedBatch.length > 0) {
+              await this.insertVectorsWithTransaction(embeddedBatch, embeddingModel)
+              console.log(`Stored ${embeddedBatch.length} embedded chunks for workspace`)
+            }
+
+            embeddingProgress.completed += embeddingBatch.length
+            updateProgress?.({
+              completedChunks: embeddingProgress.completed,
+              totalChunks: embeddingProgress.totalChunks,
+              totalFiles: filesToIndex.length,
+            })
+          }
+        } else {
+          // Providers without batch support (lower concurrency to avoid exhausting file handles)
+          const limit = pLimit(3)
+
+          for (let j = 0; j < batchChunks.length; j += embeddingBatchSize) {
+            const embeddingBatch = batchChunks.slice(j, Math.min(j + embeddingBatchSize, batchChunks.length))
+            const embeddedBatch: InsertVector[] = []
+
+            const tasks = embeddingBatch.map((chunk) =>
+              limit(async () => {
+                try {
+                  await backOff(
+                    async () => {
+                      // Strip markdown before embedding
+                      const cleanContent = removeMarkdown(chunk.content).replace(/\0/g, '')
+                      // Skip content that is empty after cleaning
+                      if (!cleanContent || cleanContent.trim().length === 0) {
+                        return
+                      }
+
+                      const embedding = await embeddingModel.getEmbedding(cleanContent)
+                      const embeddedChunk = {
+                        path: chunk.path,
+                        mtime: chunk.mtime,
+                        content: cleanContent, // Use the cleaned content
+                        embedding,
+                        metadata: chunk.metadata,
+                      }
+                      embeddedBatch.push(embeddedChunk)
+                    },
+                    {
+                      numOfAttempts: 3,
+                      startingDelay: 1000,
+                      timeMultiple: 2.0,
+                      jitter: 'full',
+                    },
+                  )
+                } catch (error) {
+                  console.error('Error in embedding task:', error)
+                }
+              }),
+            )
+
+            await Promise.all(tasks)
+
+            // Step 3: store immediately
+            if (embeddedBatch.length > 0) {
+              await this.insertVectorsWithTransaction(embeddedBatch, embeddingModel)
+              console.log(`Stored ${embeddedBatch.length} embedded chunks for workspace`)
+            }
+
+            embeddingProgress.completed += embeddingBatch.length
+            updateProgress?.({
+              completedChunks: embeddingProgress.completed,
+              totalChunks: embeddingProgress.totalChunks,
+              totalFiles: filesToIndex.length,
+            })
+          }
+        }
+
+        // Force resource cleanup after each file batch
+        await this.forceResourceCleanup()
+
+        // Extra delay so the OS can release file handles
+        await new Promise(resolve => setTimeout(resolve, 500))
       }
     } catch (error) {
       if (
@@ -678,8 +714,13 @@
        throw error
      }
    } finally {
-      // Final cleanup
-      this.forceGarbageCollection()
+      // Final forced cleanup
+      await this.forceResourceCleanup()
+    }
+
+    if (skippedFiles.length > 0) {
+      console.warn(`Skipped ${skippedFiles.length} problematic files:`, skippedFiles)
+      new Notice(`Skipped ${skippedFiles.length} problematic files`)
    }
  }
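
Each flushed batch goes through `insertVectorsWithTransaction` (its signature
appears in the hunk below), so a failed batch should never leave partial rows
behind. As a hedged sketch of the general shape of such a method with a
pg-style client -- the table and column names here are assumptions for
illustration, not the repository's actual schema:

    import type { PoolClient } from 'pg'

    // Illustrative row shape; the real InsertVector type lives in this repo.
    interface VectorRow {
      path: string
      mtime: number
      content: string
      embedding: number[]
    }

    async function insertRowsInTransaction(client: PoolClient, rows: VectorRow[]): Promise<void> {
      await client.query('BEGIN')
      try {
        for (const row of rows) {
          await client.query(
            // Assumed table/columns, for illustration only
            'INSERT INTO embeddings (path, mtime, content, embedding) VALUES ($1, $2, $3, $4)',
            [row.path, row.mtime, row.content, JSON.stringify(row.embedding)],
          )
        }
        await client.query('COMMIT') // all rows become visible atomically
      } catch (e) {
        await client.query('ROLLBACK') // leave no partial batch behind
        throw e
      }
    }
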
@@ -1034,14 +1075,26 @@ export class VectorManager {
   // Enhanced memory cleanup with an added delay
   private async memoryCleanupWithDelay(batchCount: number) {
-    // Force GC and delay every 5 batches
-    if (batchCount % 5 === 0) {
+    // Force GC and delay every 3 batches
+    if (batchCount % 3 === 0) {
       this.forceGarbageCollection()
-      // Delay to give the system time to catch up
-      await new Promise(resolve => setTimeout(resolve, 500))
+      // Delay so the system has time to catch up and release file handles
+      await new Promise(resolve => setTimeout(resolve, 1000))
     }
   }
 
+  // Forced memory and resource cleanup
+  private async forceResourceCleanup() {
+    // Run GC several times
+    for (let i = 0; i < 5; i++) {
+      this.forceGarbageCollection()
+      await new Promise(resolve => setTimeout(resolve, 100))
+    }
+
+    // Extra delay so the system can release resources
+    await new Promise(resolve => setTimeout(resolve, 500))
+  }
+
   // Insert vectors inside a transaction
   private async insertVectorsWithTransaction(
     data: InsertVector[],