diff --git a/src/hooks.server.ts b/src/hooks.server.ts index e119140c45..783479cc3a 100644 --- a/src/hooks.server.ts +++ b/src/hooks.server.ts @@ -17,6 +17,12 @@ import { findUser, refreshSessionCookie, requiresUser } from "$lib/server/auth"; import { ERROR_MESSAGES } from "$lib/stores/errors"; import { sha256 } from "$lib/utils/sha256"; import { addWeeks } from "date-fns"; +import { checkAndRunMigrations } from "$lib/migrations/migrations"; +import { building } from "$app/environment"; + +if (!building) { + await checkAndRunMigrations(); +} export const handle: Handle = async ({ event, resolve }) => { if (event.url.pathname.startsWith(`${base}/api/`) && EXPOSE_API !== "true") { diff --git a/src/lib/migrations/lock.ts b/src/lib/migrations/lock.ts new file mode 100644 index 0000000000..0ea03bf765 --- /dev/null +++ b/src/lib/migrations/lock.ts @@ -0,0 +1,42 @@ +import { collections } from "$lib/server/database"; + +export async function acquireLock(key = "migrations") { + try { + const insert = await collections.semaphores.insertOne({ + key, + createdAt: new Date(), + updatedAt: new Date(), + }); + + return !!insert.acknowledged; // true if the document was inserted + } catch (e) { + // unique index violation, so there must already be a lock + return false; + } +} + +export async function releaseLock(key = "migrations") { + await collections.semaphores.deleteOne({ + key, + }); +} + +export async function isDBLocked(key = "migrations"): Promise { + const res = await collections.semaphores.countDocuments({ + key, + }); + return res > 0; +} + +export async function refreshLock(key = "migrations") { + await collections.semaphores.updateOne( + { + key, + }, + { + $set: { + updatedAt: new Date(), + }, + } + ); +} diff --git a/src/lib/migrations/migrations.spec.ts b/src/lib/migrations/migrations.spec.ts new file mode 100644 index 0000000000..165c263c3a --- /dev/null +++ b/src/lib/migrations/migrations.spec.ts @@ -0,0 +1,53 @@ +import { afterEach, describe, expect, it } from 
"vitest"; +import { migrations } from "./routines"; +import { acquireLock, isDBLocked, refreshLock, releaseLock } from "./lock"; +import { collections } from "$lib/server/database"; + +describe("migrations", () => { + it("should not have duplicates guid", async () => { + const guids = migrations.map((m) => m._id.toString()); + const uniqueGuids = [...new Set(guids)]; + expect(uniqueGuids.length).toBe(guids.length); + }); + + it("should acquire only one lock on DB", async () => { + const results = await Promise.all(new Array(1000).fill(0).map(() => acquireLock())); + const locks = results.filter((r) => r); + + const semaphores = await collections.semaphores.find({}).toArray(); + + expect(locks.length).toBe(1); + expect(semaphores).toBeDefined(); + expect(semaphores.length).toBe(1); + expect(semaphores?.[0].key).toBe("migrations"); + }); + + it("should read the lock correctly", async () => { + expect(await acquireLock()).toBe(true); + expect(await isDBLocked()).toBe(true); + expect(await acquireLock()).toBe(false); + await releaseLock(); + expect(await isDBLocked()).toBe(false); + }); + + it("should refresh the lock", async () => { + await acquireLock(); + + // get the updatedAt time + + const updatedAtInitially = (await collections.semaphores.findOne({}))?.updatedAt; + + await refreshLock(); + + const updatedAtAfterRefresh = (await collections.semaphores.findOne({}))?.updatedAt; + + expect(updatedAtInitially).toBeDefined(); + expect(updatedAtAfterRefresh).toBeDefined(); + expect(updatedAtInitially).not.toBe(updatedAtAfterRefresh); + }); +}); + +afterEach(async () => { + await collections.semaphores.deleteMany({}); + await collections.migrationResults.deleteMany({}); +}); diff --git a/src/lib/migrations/migrations.ts b/src/lib/migrations/migrations.ts new file mode 100644 index 0000000000..1142a25850 --- /dev/null +++ b/src/lib/migrations/migrations.ts @@ -0,0 +1,116 @@ +import { client, collections } from "$lib/server/database"; +import { migrations } from 
"./routines"; +import { acquireLock, releaseLock, isDBLocked, refreshLock } from "./lock"; +import { isHuggingChat } from "$lib/utils/isHuggingChat"; + +export async function checkAndRunMigrations() { + // make sure all GUIDs are unique + if (new Set(migrations.map((m) => m._id.toString())).size !== migrations.length) { + throw new Error("Duplicate migration GUIDs found."); + } + + // check if all migrations have already been run + const migrationResults = await collections.migrationResults.find().toArray(); + + // if all the migrations._id are in the migrationResults, we can exit early + if ( + migrations.every((m) => migrationResults.some((m2) => m2._id.toString() === m._id.toString())) + ) { + console.log("[MIGRATIONS] All migrations already applied."); + return; + } + + console.log("[MIGRATIONS] Begin check..."); + + // connect to the database + const connectedClient = await client.connect(); + + const hasLock = await acquireLock(); + + if (!hasLock) { + // another instance already has the lock, so we exit early + console.log( + "[MIGRATIONS] Another instance already has the lock. Waiting for DB to be unlocked." + ); + + // block until the lock is released + while (await isDBLocked()) { + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + return; + } + + // once here, we have the lock + // make sure to refresh it regularly while it's running + const refreshInterval = setInterval(async () => { + await refreshLock(); + }, 1000 * 10); + + // iterate over all migrations + for (const migration of migrations) { + // check if the migration has already been applied + const existingMigrationResult = migrationResults.find( + (m) => m._id.toString() === migration._id.toString() + ); + + // check if the migration has already been applied + if (existingMigrationResult) { + console.log(`[MIGRATIONS] "${migration.name}" already applied. 
Skipping...`); + } else { + // check the modifiers to see if some cases match + if ( + (migration.runForHuggingChat === "only" && !isHuggingChat) || + (migration.runForHuggingChat === "never" && isHuggingChat) + ) { + console.log( + `[MIGRATIONS] "${migration.name}" should not be applied for this run. Skipping...` + ); + continue; + } + + // otherwise all is good and we can run the migration + console.log(`[MIGRATIONS] "${migration.name}" not applied yet. Applying...`); + + await collections.migrationResults.updateOne( + { _id: migration._id }, + { + $set: { + name: migration.name, + status: "ongoing", + }, + }, + { upsert: true } + ); + + const session = connectedClient.startSession(); + let result = false; + + try { + await session.withTransaction(async () => { + result = await migration.up(connectedClient); + }); + } catch (e) { + console.log(`[MIGRATIONS] "${migration.name}" failed!`); + console.error(e); + } finally { + await session.endSession(); + } + + await collections.migrationResults.updateOne( + { _id: migration._id }, + { + $set: { + name: migration.name, + status: result ? "success" : "failure", + }, + }, + { upsert: true } + ); + } + } + + console.log("[MIGRATIONS] All migrations applied. 
Releasing lock"); + + clearInterval(refreshInterval); + await releaseLock(); +} diff --git a/src/lib/migrations/routines/01-update-search-assistants.ts b/src/lib/migrations/routines/01-update-search-assistants.ts new file mode 100644 index 0000000000..9f12b27d3f --- /dev/null +++ b/src/lib/migrations/routines/01-update-search-assistants.ts @@ -0,0 +1,50 @@ +import type { Migration } from "."; +import { getCollections } from "$lib/server/database"; +import { ObjectId, type AnyBulkWriteOperation } from "mongodb"; +import type { Assistant } from "$lib/types/Assistant"; +import { generateSearchTokens } from "$lib/utils/searchTokens"; + +const migration: Migration = { + _id: new ObjectId("5f9f3e3e3e3e3e3e3e3e3e3e"), + name: "Update search assistants", + up: async (client) => { + const { assistants } = getCollections(client); + let ops: AnyBulkWriteOperation[] = []; + + for await (const assistant of assistants + .find() + .project>({ _id: 1, name: 1 })) { + ops.push({ + updateOne: { + filter: { + _id: assistant._id, + }, + update: { + $set: { + searchTokens: generateSearchTokens(assistant.name), + }, + }, + }, + }); + + if (ops.length >= 1000) { + process.stdout.write("."); + await assistants.bulkWrite(ops, { ordered: false }); + ops = []; + } + } + + if (ops.length) { + await assistants.bulkWrite(ops, { ordered: false }); + } + + return true; + }, + down: async (client) => { + const { assistants } = getCollections(client); + await assistants.updateMany({}, { $unset: { searchTokens: "" } }); + return true; + }, +}; + +export default migration; diff --git a/src/lib/migrations/routines/index.ts b/src/lib/migrations/routines/index.ts new file mode 100644 index 0000000000..e000f95aff --- /dev/null +++ b/src/lib/migrations/routines/index.ts @@ -0,0 +1,14 @@ +import type { MongoClient, ObjectId } from "mongodb"; + +import updateSearchAssistant from "./01-update-search-assistants"; + +export interface Migration { + _id: ObjectId; + name: string; + up: (client: MongoClient) => 
Promise; + down?: (client: MongoClient) => Promise; + runForFreshInstall?: "only" | "never"; // leave unspecified to run for both + runForHuggingChat?: "only" | "never"; // leave unspecified to run for both +} + +export const migrations: Migration[] = [updateSearchAssistant]; diff --git a/src/lib/server/database.ts b/src/lib/server/database.ts index 7259f1e73a..8246691dd6 100644 --- a/src/lib/server/database.ts +++ b/src/lib/server/database.ts @@ -10,12 +10,15 @@ import type { Session } from "$lib/types/Session"; import type { Assistant } from "$lib/types/Assistant"; import type { Report } from "$lib/types/Report"; import type { ConversationStats } from "$lib/types/ConversationStats"; +import type { MigrationResult } from "$lib/types/MigrationResult"; +import type { Semaphore } from "$lib/types/Semaphore"; if (!MONGODB_URL) { throw new Error( "Please specify the MONGODB_URL environment variable inside .env.local. Set it to mongodb://localhost:27017 if you are running MongoDB locally, or to a MongoDB Atlas free instance for example." ); } +export const CONVERSATION_STATS_COLLECTION = "conversations.stats"; const client = new MongoClient(MONGODB_URL, { directConnection: MONGODB_DIRECT_CONNECTION === "true", @@ -23,24 +26,44 @@ const client = new MongoClient(MONGODB_URL, { export const connectPromise = client.connect().catch(console.error); -const db = client.db(MONGODB_DB_NAME + (import.meta.env.MODE === "test" ? "-test" : "")); +export function getCollections(mongoClient: MongoClient) { + const db = mongoClient.db(MONGODB_DB_NAME + (import.meta.env.MODE === "test" ? 
"-test" : "")); -export const CONVERSATION_STATS_COLLECTION = "conversations.stats"; + const conversations = db.collection("conversations"); + const conversationStats = db.collection(CONVERSATION_STATS_COLLECTION); + const assistants = db.collection("assistants"); + const reports = db.collection("reports"); + const sharedConversations = db.collection("sharedConversations"); + const abortedGenerations = db.collection("abortedGenerations"); + const settings = db.collection("settings"); + const users = db.collection("users"); + const sessions = db.collection("sessions"); + const messageEvents = db.collection("messageEvents"); + const bucket = new GridFSBucket(db, { bucketName: "files" }); + const migrationResults = db.collection("migrationResults"); + const semaphores = db.collection("semaphores"); + + return { + conversations, + conversationStats, + assistants, + reports, + sharedConversations, + abortedGenerations, + settings, + users, + sessions, + messageEvents, + bucket, + migrationResults, + semaphores, + }; +} +const db = client.db(MONGODB_DB_NAME + (import.meta.env.MODE === "test" ? 
"-test" : "")); -const conversations = db.collection("conversations"); -const conversationStats = db.collection(CONVERSATION_STATS_COLLECTION); -const assistants = db.collection("assistants"); -const reports = db.collection("reports"); -const sharedConversations = db.collection("sharedConversations"); -const abortedGenerations = db.collection("abortedGenerations"); -const settings = db.collection("settings"); -const users = db.collection("users"); -const sessions = db.collection("sessions"); -const messageEvents = db.collection("messageEvents"); -const bucket = new GridFSBucket(db, { bucketName: "files" }); +const collections = getCollections(client); -export { client, db }; -export const collections = { +const { conversations, conversationStats, assistants, @@ -51,8 +74,10 @@ export const collections = { users, sessions, messageEvents, - bucket, -}; + semaphores, +} = collections; + +export { client, db, collections }; client.on("open", () => { conversations @@ -120,4 +145,8 @@ assistants.createIndex({ searchTokens: 1 }).catch(console.error); reports.createIndex({ assistantId: 1 }).catch(console.error); reports.createIndex({ createdBy: 1, assistantId: 1 }).catch(console.error); + + // Unique index for semaphore and migration results + semaphores.createIndex({ key: 1 }, { unique: true }).catch(console.error); + semaphores.createIndex({ updatedAt: 1 }, { expireAfterSeconds: 60 }).catch(console.error); }); diff --git a/src/lib/types/MigrationResult.ts b/src/lib/types/MigrationResult.ts new file mode 100644 index 0000000000..aff17be616 --- /dev/null +++ b/src/lib/types/MigrationResult.ts @@ -0,0 +1,7 @@ +import type { ObjectId } from "mongodb"; + +export interface MigrationResult { + _id: ObjectId; + name: string; + status: "success" | "failure" | "ongoing"; +} diff --git a/src/lib/types/Semaphore.ts b/src/lib/types/Semaphore.ts new file mode 100644 index 0000000000..8ea0d8ccb1 --- /dev/null +++ b/src/lib/types/Semaphore.ts @@ -0,0 +1,5 @@ 
+import type { Timestamps } from "./Timestamps"; + +export interface Semaphore extends Timestamps { + key: string; +}