diff --git a/front/lib/api/redis.ts b/front/lib/api/redis.ts
index de4955ed352e..981068f40143 100644
--- a/front/lib/api/redis.ts
+++ b/front/lib/api/redis.ts
@@ -18,7 +18,8 @@ export type RedisUsageTagsType =
   | "update_authors"
   | "user_message_events"
   | "reasoning_generation"
-  | "notion_url_sync";
+  | "notion_url_sync"
+  | "lock";
 
 export async function getRedisClient({
   origin,
diff --git a/front/lib/lock.ts b/front/lib/lock.ts
index d142fcad5ea3..5ec318108609 100644
--- a/front/lib/lock.ts
+++ b/front/lib/lock.ts
@@ -1,45 +1,39 @@
-export class Lock {
-  private static locks = new Map<string, Promise<void>>();
+import { getRedisClient } from "./api/redis";
+
+const WAIT_BETWEEN_RETRIES = 100;
 
+export class Lock {
   static async executeWithLock<T>(
     lockName: string,
     callback: () => Promise<T>,
     timeoutMs: number = 30000
   ): Promise<T> {
+    const client = await getRedisClient({ origin: "lock" });
+    const lockKey = `lock:${lockName}`;
+    const lockValue = Date.now().toString();
     const start = Date.now();
 
-    if (Lock.locks.has(lockName)) {
-      const currentLock = Lock.locks.get(lockName);
-      if (currentLock) {
-        const remainingTime = timeoutMs - (Date.now() - start);
-        if (remainingTime <= 0) {
-          throw new Error(`Lock acquisition timed out for ${lockName}`);
-        }
-
-        const timeoutPromise = new Promise((_, reject) => {
-          setTimeout(() => {
-            reject(new Error(`Lock acquisition timed out for ${lockName}`));
-          }, remainingTime);
-        });
-
-        await Promise.race([currentLock, timeoutPromise]);
+    // Try to acquire the lock
+    while (
+      !(await client.set(lockKey, lockValue, { NX: true, PX: timeoutMs }))
+    ) {
+      // Check for timeout
+      if (Date.now() - start >= timeoutMs) {
+        throw new Error(`Lock acquisition timed out for ${lockName}`);
       }
+      // Wait a bit before retrying
+      await new Promise((resolve) => setTimeout(resolve, WAIT_BETWEEN_RETRIES));
     }
 
-    // Initialize resolveLock with a no-op function to satisfy TypeScript
-    let resolveLock = () => {};
-    const lockPromise = new Promise<void>((resolve) => {
-      resolveLock = resolve;
-    });
-
-    Lock.locks.set(lockName, lockPromise);
-
     try {
       const result = await callback();
       return result;
     } finally {
-      Lock.locks.delete(lockName);
-      resolveLock();
+      // Release the lock if we still own it
+      const currentValue = await client.get(lockKey);
+      if (currentValue === lockValue) {
+        await client.del(lockKey);
+      }
    }
  }
}
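The rewrite above swaps the in-process promise map for a Redis-based mutex (`SET` with `NX`/`PX`), so the lock now holds across processes, and `timeoutMs` doubles as the acquisition deadline and the key's TTL. A minimal usage sketch (hypothetical caller; the lock name, helper, and return value are made up for illustration and not part of this patch):

```typescript
import { Lock } from "@app/lib/lock";

// Hypothetical caller: the callback runs only while the Redis key
// `lock:conversation-<id>` is held. If another worker holds the key,
// acquisition retries every WAIT_BETWEEN_RETRIES ms until timeoutMs
// elapses; the PX TTL bounds how long a crashed holder can block others.
async function createConversationDataSourceOnce(
  conversationId: number
): Promise<string> {
  return Lock.executeWithLock(
    `conversation-${conversationId}`,
    async () => {
      // ... create the datasource only if it does not already exist ...
      return "created-or-fetched";
    },
    10_000 // acquisition deadline and lock TTL, in ms
  );
}
```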
diff --git a/front/lib/resources/data_source_resource.ts b/front/lib/resources/data_source_resource.ts
index 9e27473131d9..5477d436abb9 100644
--- a/front/lib/resources/data_source_resource.ts
+++ b/front/lib/resources/data_source_resource.ts
@@ -386,11 +386,11 @@ export class DataSourceResource extends ResourceWithSpace<DataSourceModel> {
   }
 
   static async fetchByModelIdWithAuth(auth: Authenticator, id: ModelId) {
-    const [dataSource] = await this.baseFetch(auth, undefined, {
+    const r = await this.baseFetch(auth, undefined, {
       where: { id },
     });
 
-    return dataSource ?? null;
+    return r.length > 0 ? r[0] : null;
   }
 
   protected async softDelete(
diff --git a/front/lib/resources/storage/models/data_source.ts b/front/lib/resources/storage/models/data_source.ts
index b927f9ee9c60..8bf75a6060cb 100644
--- a/front/lib/resources/storage/models/data_source.ts
+++ b/front/lib/resources/storage/models/data_source.ts
@@ -85,7 +85,7 @@ DataSourceModel.init(
     { fields: ["workspaceId", "name", "deletedAt"], unique: true },
     { fields: ["workspaceId", "connectorProvider"] },
     { fields: ["workspaceId", "vaultId"] },
-    { fields: ["workspaceId", "conversationId"] },
+    { fields: ["workspaceId", "conversationId"], unique: true },
     { fields: ["dustAPIProjectId"] },
   ],
 }
diff --git a/front/migrations/20250413_cleanup_github_platform_actions.ts b/front/migrations/20250313_cleanup_github_platform_actions.ts
similarity index 100%
rename from front/migrations/20250413_cleanup_github_platform_actions.ts
rename to front/migrations/20250313_cleanup_github_platform_actions.ts
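With the `["workspaceId", "conversationId"]` index now unique, a concurrent creation of a conversation datasource can surface as a `UniqueConstraintError` at insert time. A hedged sketch of how a caller might treat that as "already exists" rather than a hard failure (the helper and its names are hypothetical, not part of this patch):

```typescript
import { UniqueConstraintError } from "sequelize";

// Hypothetical helper: attempt the insert, and on a unique-constraint
// violation fall back to fetching the row the concurrent writer created.
async function createOrFetch<T>(
  create: () => Promise<T>,
  fetchExisting: () => Promise<T | null>
): Promise<T> {
  try {
    return await create();
  } catch (err) {
    if (err instanceof UniqueConstraintError) {
      const existing = await fetchExisting();
      if (existing) {
        return existing;
      }
    }
    throw err;
  }
}
```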
"Processing conversation" : "Would process conversation" + ); + + const workspace = await Workspace.findByPk(workspaceId); + if (!workspace) { + logger.error({ workspaceId }, "Could not find workspace"); + continue; + } + + // Get conversation + const conversation = await Conversation.findByPk(conversationId); + + if (!conversation) { + logger.error({ conversationId }, "Could not find conversation"); + continue; + } + + if (!execute) { + // Dry run - show what would happen + const files = await FileModel.findAll({ + where: { + workspaceId: workspace.id, + useCase: "conversation", + useCaseMetadata: { + conversationId: conversation.sId, + }, + }, + }); + + logger.info( + { + conversationId, + workspaceId, + fileCount: files.length, + datasourcesToDelete: datasourceIds.slice(1), + dryRun: true, + }, + `Process ${files.length} files to datasource ${firstDatasourceId}. Delete ${datasourceIds.length - 1} duplicate datasources: ${datasourceIds.slice(1).join(", ")}` + ); + continue; + } + + // First create a temporary auth to fetch the first datasource + const tempAuth = await Authenticator.internalAdminForWorkspace( + workspace.sId + ); + + // Get workspace from first datasource to create auth + const firstDataSource = await DataSourceResource.fetchByModelIdWithAuth( + tempAuth, + firstDatasourceId + ); + if (!firstDataSource) { + logger.error( + { conversationId, firstDatasourceId }, + "Could not find first datasource" + ); + continue; + } + + // Find all files for this conversation + const files = await FileModel.findAll({ + where: { + workspaceId: workspace.id, + useCaseMetadata: { + conversationId: conversation.sId, + }, + }, + }); + + logger.info( + { conversationId, fileCount: files.length }, + "Found files for conversation" + ); + + // Process each file to the first datasource + for (const file of files) { + try { + const fileResource = new FileResource(FileResource.model, file.get()); + const res = await processAndUpsertToDataSource( + tempAuth, + firstDataSource, + { + file: fileResource, + } + ); + + if (res.isErr()) { + logger.error( + { + conversationId, + fileId: file.id, + error: res.error, + }, + "Failed to process file" + ); + continue; + } + + logger.info( + { conversationId, fileId: file.id }, + "Successfully processed file" + ); + } catch (error) { + logger.error( + { conversationId, fileId: file.id, error }, + "Error processing file" + ); + } + } + + // Delete all other datasources + for (const dsId of datasourceIds.slice(1)) { + try { + const ds = await DataSourceResource.fetchByModelIdWithAuth( + tempAuth, + parseInt(dsId, 10) + ); + if (!ds) { + logger.warn({ dsId }, "Could not find datasource to delete"); + continue; + } + + await hardDeleteDataSource(tempAuth, ds); + logger.info({ dsId }, "Successfully deleted duplicate datasource"); + } catch (error) { + logger.error({ dsId, error }, "Error deleting duplicate datasource"); + } + } + } +}; + +makeScript({}, worker); + +export const up = worker; + +export const down = async () => { + // This migration cannot be reversed as it deletes data + logger.info("This migration cannot be reversed"); +}; diff --git a/front/migrations/db/migration_187.sql b/front/migrations/db/migration_187.sql new file mode 100644 index 000000000000..e22afb42f3c1 --- /dev/null +++ b/front/migrations/db/migration_187.sql @@ -0,0 +1,6 @@ +DROP INDEX CONCURRENTLY "data_sources_workspace_id_conversation_id"; + +CREATE UNIQUE INDEX CONCURRENTLY "data_sources_workspace_conversation_unique_idx" ON "data_sources" ("workspaceId", "conversationId") +WHERE + 
"deletedAt" IS NULL + AND "conversationId" IS NOT NULL; \ No newline at end of file