Fix multiple data sources for a single conversation #11509

Merged: 4 commits, Mar 20, 2025

Changes from all commits
3 changes: 2 additions & 1 deletion front/lib/api/redis.ts
@@ -18,7 +18,8 @@ export type RedisUsageTagsType =
   | "update_authors"
   | "user_message_events"
   | "reasoning_generation"
-  | "notion_url_sync";
+  | "notion_url_sync"
+  | "lock";
 
 export async function getRedisClient({
   origin,
48 changes: 21 additions & 27 deletions front/lib/lock.ts
@@ -1,45 +1,39 @@
-export class Lock {
-  private static locks = new Map<string, Promise<void>>();
+import { getRedisClient } from "./api/redis";
+
+const WAIT_BETWEEN_RETRIES = 100;
 
+export class Lock {
   static async executeWithLock<T>(
     lockName: string,
     callback: () => Promise<T>,
     timeoutMs: number = 30000
   ): Promise<T> {
+    const client = await getRedisClient({ origin: "lock" });
+    const lockKey = `lock:${lockName}`;
+    const lockValue = Date.now().toString();
     const start = Date.now();
 
-    if (Lock.locks.has(lockName)) {
-      const currentLock = Lock.locks.get(lockName);
-      if (currentLock) {
-        const remainingTime = timeoutMs - (Date.now() - start);
-        if (remainingTime <= 0) {
-          throw new Error(`Lock acquisition timed out for ${lockName}`);
-        }
-
-        const timeoutPromise = new Promise((_, reject) => {
-          setTimeout(() => {
-            reject(new Error(`Lock acquisition timed out for ${lockName}`));
-          }, remainingTime);
-        });
-
-        await Promise.race([currentLock, timeoutPromise]);
+    // Try to acquire the lock
+    while (
+      !(await client.set(lockKey, lockValue, { NX: true, PX: timeoutMs }))
+    ) {
+      // Check for timeout
+      if (Date.now() - start >= timeoutMs) {
+        throw new Error(`Lock acquisition timed out for ${lockName}`);
       }
+      // Wait a bit before retrying
+      await new Promise((resolve) => setTimeout(resolve, WAIT_BETWEEN_RETRIES));
     }
 
-    // Initialize resolveLock with a no-op function to satisfy TypeScript
-    let resolveLock = () => {};
-    const lockPromise = new Promise<void>((resolve) => {
-      resolveLock = resolve;
-    });
-
-    Lock.locks.set(lockName, lockPromise);
-
     try {
       const result = await callback();
       return result;
     } finally {
-      Lock.locks.delete(lockName);
-      resolveLock();
+      // Release the lock if we still own it
+      const currentValue = await client.get(lockKey);
+      if (currentValue === lockValue) {
+        await client.del(lockKey);
+      }
     }
   }
 }
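
For context, a minimal usage sketch of the new Redis-backed lock shown above. The import path and the lock name are assumptions for illustration; note that in this implementation timeoutMs is used both as the acquisition timeout and as the PX expiry of the Redis key, so the critical section should comfortably fit within it.

import { Lock } from "@app/lib/lock"; // assumed import path

// Hypothetical example: serialize datasource creation for a conversation so
// that two concurrent requests cannot each create one for the same conversation.
async function withConversationDataSourceLock<T>(
  conversationSId: string,
  work: () => Promise<T>
): Promise<T> {
  return Lock.executeWithLock(
    `conversation_datasource_${conversationSId}`, // hypothetical lock name
    work, // runs only while the Redis key `lock:<name>` is held
    10000 // stop retrying (and let the key expire) after 10 seconds
  );
}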
4 changes: 2 additions & 2 deletions front/lib/resources/data_source_resource.ts
@@ -386,11 +386,11 @@ export class DataSourceResource extends ResourceWithSpace<DataSourceModel> {
   }
 
   static async fetchByModelIdWithAuth(auth: Authenticator, id: ModelId) {
-    const [dataSource] = await this.baseFetch(auth, undefined, {
+    const r = await this.baseFetch(auth, undefined, {
       where: { id },
     });
 
-    return dataSource ?? null;
+    return r.length > 0 ? r[0] : null;
   }
 
   protected async softDelete(
2 changes: 1 addition & 1 deletion front/lib/resources/storage/models/data_source.ts
@@ -85,7 +85,7 @@ DataSourceModel.init(
       { fields: ["workspaceId", "name", "deletedAt"], unique: true },
       { fields: ["workspaceId", "connectorProvider"] },
       { fields: ["workspaceId", "vaultId"] },
-      { fields: ["workspaceId", "conversationId"] },
+      { fields: ["workspaceId", "conversationId"], unique: true },
       { fields: ["dustAPIProjectId"] },
     ],
   }
207 changes: 207 additions & 0 deletions front/migrations/20250320_fix_conversation_datasources.ts
@@ -0,0 +1,207 @@
import { QueryTypes } from "sequelize";

import { hardDeleteDataSource } from "@app/lib/api/data_sources";
import { processAndUpsertToDataSource } from "@app/lib/api/files/upsert";
import { Authenticator } from "@app/lib/auth";
import { Conversation } from "@app/lib/models/assistant/conversation";
import { Workspace } from "@app/lib/models/workspace";
import { DataSourceResource } from "@app/lib/resources/data_source_resource";
import { FileResource } from "@app/lib/resources/file_resource";
import { frontSequelize } from "@app/lib/resources/storage";
import { FileModel } from "@app/lib/resources/storage/models/files";
import logger from "@app/logger/logger";
import { makeScript } from "@app/scripts/helpers";

interface DuplicateConversation {
conversationId: number;
workspaceId: number;
datasource_ids: string[];
}

const worker = async ({ execute }: { execute: boolean }) => {
// Find all conversations with multiple datasources
const duplicateConversations =
await frontSequelize.query<DuplicateConversation>(
`
SELECT
"conversationId",
"workspaceId",
array_agg("id") as datasource_ids
FROM
data_sources
WHERE
"deletedAt" IS NULL AND "conversationId" is not null
GROUP BY
"workspaceId",
"conversationId"
HAVING
COUNT(*) > 1
`,
{ type: QueryTypes.SELECT }
);

console.log(duplicateConversations);

if (duplicateConversations.length === 0) {
logger.info("No conversations with duplicate datasources found");
return;
}

logger.info(
{ duplicateCount: duplicateConversations.length },
"Found conversations with duplicate datasources"
);

for (const conv of duplicateConversations) {
const conversationId = conv.conversationId;
const workspaceId = conv.workspaceId;
const datasourceIds = conv.datasource_ids.sort();
const firstDatasourceId = parseInt(datasourceIds[0], 10);

logger.info(
{
conversationId,
datasourceCount: datasourceIds.length,
firstDatasourceId,
dryRun: !execute,
},
execute ? "Processing conversation" : "Would process conversation"
);

const workspace = await Workspace.findByPk(workspaceId);
if (!workspace) {
logger.error({ workspaceId }, "Could not find workspace");
continue;
}

// Get conversation
const conversation = await Conversation.findByPk(conversationId);

if (!conversation) {
logger.error({ conversationId }, "Could not find conversation");
continue;
}

if (!execute) {
// Dry run - show what would happen
const files = await FileModel.findAll({
where: {
workspaceId: workspace.id,
useCase: "conversation",
useCaseMetadata: {
conversationId: conversation.sId,
},
},
});

logger.info(
{
conversationId,
workspaceId,
fileCount: files.length,
datasourcesToDelete: datasourceIds.slice(1),
dryRun: true,
},
`Process ${files.length} files to datasource ${firstDatasourceId}. Delete ${datasourceIds.length - 1} duplicate datasources: ${datasourceIds.slice(1).join(", ")}`
);
continue;
}

// First create a temporary auth to fetch the first datasource
const tempAuth = await Authenticator.internalAdminForWorkspace(
workspace.sId
);

// Get workspace from first datasource to create auth
const firstDataSource = await DataSourceResource.fetchByModelIdWithAuth(
tempAuth,
firstDatasourceId
);
if (!firstDataSource) {
logger.error(
{ conversationId, firstDatasourceId },
"Could not find first datasource"
);
continue;
}

// Find all files for this conversation
const files = await FileModel.findAll({
where: {
workspaceId: workspace.id,
useCaseMetadata: {
conversationId: conversation.sId,
},
},
});

logger.info(
{ conversationId, fileCount: files.length },
"Found files for conversation"
);

// Process each file to the first datasource
for (const file of files) {
try {
const fileResource = new FileResource(FileResource.model, file.get());
const res = await processAndUpsertToDataSource(
tempAuth,
firstDataSource,
{
file: fileResource,
}
);

if (res.isErr()) {
logger.error(
{
conversationId,
fileId: file.id,
error: res.error,
},
"Failed to process file"
);
continue;
}

logger.info(
{ conversationId, fileId: file.id },
"Successfully processed file"
);
} catch (error) {
logger.error(
[Review comment, Contributor] Don't want to throw/exit if we hit those errors? I guess if we have a lot of volume, those errors might end up being "unnoticed".

{ conversationId, fileId: file.id, error },
"Error processing file"
);
}
}

// Delete all other datasources
for (const dsId of datasourceIds.slice(1)) {
try {
const ds = await DataSourceResource.fetchByModelIdWithAuth(
tempAuth,
parseInt(dsId, 10)
);
if (!ds) {
logger.warn({ dsId }, "Could not find datasource to delete");
continue;
}

await hardDeleteDataSource(tempAuth, ds);
[Review comment, Contributor] You need to soft delete them first!

[Review comment, Contributor Author] Really? Why? (It was working just fine in my test.)

[Review comment, Contributor] IIRC, we had a check to ensure data sources were soft deleted first.

[Review comment, Contributor] If you look at the soft delete, it also deletes the data source view directly, to ensure it's no longer visible in the product: see protected async softDelete( in front/lib/resources/data_source_resource.ts.

[Review comment, Contributor] Also, it's probably best to simply use softDeleteDataSourceAndLaunchScrubWorkflow. (A hedged sketch of this alternative appears after this file's diff.)

logger.info({ dsId }, "Successfully deleted duplicate datasource");
} catch (error) {
logger.error({ dsId, error }, "Error deleting duplicate datasource");
}
}
}
};

makeScript({}, worker);

export const up = worker;

export const down = async () => {
// This migration cannot be reversed as it deletes data
logger.info("This migration cannot be reversed");
};
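
Following up on the review thread above about deletion order, here is a hedged sketch (not part of the merged diff) of how the duplicate-cleanup loop could soft delete instead of hard deleting. The helper name softDeleteDataSourceAndLaunchScrubWorkflow comes from the reviewer's comment; its import location and exact signature are assumed here.

// Hypothetical alternative to the hard-delete loop in the migration above, per
// the review thread: soft delete the duplicates (which also removes their data
// source views) and let the scrub workflow perform the actual data removal.
import { softDeleteDataSourceAndLaunchScrubWorkflow } from "@app/lib/api/data_sources"; // assumed location
import type { Authenticator } from "@app/lib/auth";
import { DataSourceResource } from "@app/lib/resources/data_source_resource";
import logger from "@app/logger/logger";

async function softDeleteDuplicateDataSources(
  auth: Authenticator,
  datasourceIds: string[]
) {
  for (const dsId of datasourceIds.slice(1)) {
    const ds = await DataSourceResource.fetchByModelIdWithAuth(
      auth,
      parseInt(dsId, 10)
    );
    if (!ds) {
      logger.warn({ dsId }, "Could not find datasource to delete");
      continue;
    }
    // Assumed helper signature, taken from the reviewer's suggestion.
    await softDeleteDataSourceAndLaunchScrubWorkflow(auth, ds);
    logger.info({ dsId }, "Soft deleted duplicate datasource, scrub workflow launched");
  }
}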
6 changes: 6 additions & 0 deletions front/migrations/db/migration_187.sql
@@ -0,0 +1,6 @@
DROP INDEX CONCURRENTLY "data_sources_workspace_id_conversation_id";
[Review comment, Contributor] So we drop the index before creating the new one?

[Review comment, Contributor Author] Yep, it seems we can't update the existing index in place.

CREATE UNIQUE INDEX CONCURRENTLY "data_sources_workspace_conversation_unique_idx" ON "data_sources" ("workspaceId", "conversationId")
WHERE
"deletedAt" IS NULL
AND "conversationId" IS NOT NULL;