Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated migration setup #897

Merged
merged 12 commits into from Mar 5, 2024
6 changes: 6 additions & 0 deletions src/hooks.server.ts
Expand Up @@ -17,6 +17,12 @@ import { findUser, refreshSessionCookie, requiresUser } from "$lib/server/auth";
import { ERROR_MESSAGES } from "$lib/stores/errors";
import { sha256 } from "$lib/utils/sha256";
import { addWeeks } from "date-fns";
import { checkAndRunMigrations } from "$lib/migrations/migrations";
import { building } from "$app/environment";

if (!building) {
await checkAndRunMigrations();
}

export const handle: Handle = async ({ event, resolve }) => {
if (event.url.pathname.startsWith(`${base}/api/`) && EXPOSE_API !== "true") {
Expand Down
42 changes: 42 additions & 0 deletions src/lib/migrations/lock.ts
nsarrazin marked this conversation as resolved.
Show resolved Hide resolved
@@ -0,0 +1,42 @@
import { collections } from "$lib/server/database";

export async function acquireLock(key = "migrations") {
mishig25 marked this conversation as resolved.
Show resolved Hide resolved
try {
const insert = await collections.semaphores.insertOne({
key,
createdAt: new Date(),
updatedAt: new Date(),
});

return !!insert.acknowledged; // true if the document was inserted
} catch (e) {
// unique index violation, so there must already be a lock
return false;
}
}

export async function releaseLock(key = "migrations") {
await collections.semaphores.deleteOne({
key,
});
}

export async function isDBLocked(key = "migrations"): Promise<boolean> {
const res = await collections.semaphores.countDocuments({
key,
});
return res > 0;
}

export async function refreshLock(key = "migrations") {
await collections.semaphores.updateOne(
{
key,
},
{
$set: {
updatedAt: new Date(),
},
}
);
}
52 changes: 52 additions & 0 deletions src/lib/migrations/migrations.spec.ts
@@ -0,0 +1,52 @@
import { afterEach, describe, expect, it } from "vitest";
import { migrations } from "./routines";
import { acquireLock, isDBLocked, refreshLock, releaseLock } from "./lock";
import { collections } from "$lib/server/database";

describe("migrations", () => {
it("should not have duplicates guid", async () => {
const guids = migrations.map((m) => m._id.toString());
const uniqueGuids = [...new Set(guids)];
expect(uniqueGuids.length).toBe(guids.length);
});

it("should acquire only one lock on DB", async () => {
const results = await Promise.all(new Array(1000).fill(0).map(() => acquireLock()));
const locks = results.filter((r) => r);

const semaphores = await collections.semaphores.find({}).toArray();

expect(locks.length).toBe(1);
expect(semaphores).toBeDefined();
expect(semaphores.length).toBe(1);
expect(semaphores?.[0].key).toBe("migrations");
});

it("should read the lock correctly", async () => {
expect(await acquireLock()).toBe(true);
expect(await isDBLocked()).toBe(true);
nsarrazin marked this conversation as resolved.
Show resolved Hide resolved
await releaseLock();
expect(await isDBLocked()).toBe(false);
});

it("should refresh the lock", async () => {
await acquireLock();

// get the updatedAt time

const updatedAtInitially = (await collections.semaphores.findOne({}))?.updatedAt;

await refreshLock();

const updatedAtAfterRefresh = (await collections.semaphores.findOne({}))?.updatedAt;

expect(updatedAtInitially).toBeDefined();
expect(updatedAtAfterRefresh).toBeDefined();
expect(updatedAtInitially).not.toBe(updatedAtAfterRefresh);
});
});

afterEach(async () => {
await collections.semaphores.deleteMany({});
await collections.migrationResults.deleteMany({});
});
116 changes: 116 additions & 0 deletions src/lib/migrations/migrations.ts
@@ -0,0 +1,116 @@
import { client, collections } from "$lib/server/database";
import { migrations } from "./routines";
import { acquireLock, releaseLock, isDBLocked, refreshLock } from "./lock";
import { isHuggingChat } from "$lib/utils/isHuggingChat";

export async function checkAndRunMigrations() {
// make sure all GUIDs are unique
if (new Set(migrations.map((m) => m._id.toString())).size !== migrations.length) {
throw new Error("Duplicate migration GUIDs found.");
}

// check if all migrations have already been run
const migrationResults = await collections.migrationResults.find().toArray();

// if all the migrations._id are in the migrationResults, we can exit early
if (
migrations.every((m) => migrationResults.some((m2) => m2._id.toString() === m._id.toString()))
) {
console.log("[MIGRATIONS] All migrations already applied.");
mishig25 marked this conversation as resolved.
Show resolved Hide resolved
return;
}

console.log("[MIGRATIONS] Begin check...");

// connect to the database
const connectedClient = await client.connect();

const hasLock = await acquireLock();
nsarrazin marked this conversation as resolved.
Show resolved Hide resolved

if (!hasLock) {
// another instance already has the lock, so we exit early
console.log(
"[MIGRATIONS] Another instance already has the lock. Waiting for DB to be unlocked."
);

// block until the lock is released
while (await isDBLocked()) {
await new Promise((resolve) => setTimeout(resolve, 1000));
}
return;
}

// once here, we have the lock
// make sure to refresh it regularly while it's running
const refreshInterval = setInterval(async () => {
await refreshLock();
}, 1000 * 10);

// iterate over all migrations
for (const migration of migrations) {
// check if the migration has already been applied
const existingMigrationResult = migrationResults.find(
(m) => m._id.toString() === migration._id.toString()
);

// check if the migration has already been applied
if (existingMigrationResult) {
console.log(`[MIGRATIONS] "${migration.name}" already applied. Skipping...`);
nsarrazin marked this conversation as resolved.
Show resolved Hide resolved
} else {
// check the modifiers to see if some cases match
if (
(migration.runForHuggingChat === "only" && !isHuggingChat) ||
(migration.runForHuggingChat === "never" && isHuggingChat)
) {
console.log(
`[MIGRATIONS] "${migration.name}" should not be applied for this run. Skipping...`
);
continue;
}

// otherwise all is good and we cna run the migration
console.log(`[MIGRATIONS] "${migration.name}" not applied yet. Applying...`);

await collections.migrationResults.updateOne(
{ _id: migration._id },
{
$set: {
name: migration.name,
status: "ongoing",
},
},
{ upsert: true }
);

const session = connectedClient.startSession();
let result = false;

try {
await session.withTransaction(async () => {
result = await migration.up(connectedClient);
});
} catch (e) {
console.log(`[MIGRATION[] "${migration.name}" failed!`);
console.error(e);
} finally {
await session.endSession();
}

await collections.migrationResults.updateOne(
{ _id: migration._id },
{
$set: {
name: migration.name,
status: result ? "success" : "failure",
},
},
{ upsert: true }
);
}
}

console.log("[MIGRATIONS] All migrations applied. Releasing lock");

clearInterval(refreshInterval);
await releaseLock();
}
50 changes: 50 additions & 0 deletions src/lib/migrations/routines/01-update-search-assistants.ts
@@ -0,0 +1,50 @@
import type { Migration } from ".";
import { getCollections } from "$lib/server/database";
import { ObjectId, type AnyBulkWriteOperation } from "mongodb";
import type { Assistant } from "$lib/types/Assistant";
import { generateSearchTokens } from "$lib/utils/searchTokens";

const migration: Migration = {
_id: new ObjectId("5f9f3e3e3e3e3e3e3e3e3e3e"),
name: "Update search assistants",
up: async (client) => {
const { assistants } = getCollections(client);
let ops: AnyBulkWriteOperation<Assistant>[] = [];

for await (const assistant of assistants
.find()
.project<Pick<Assistant, "_id" | "name">>({ _id: 1, name: 1 })) {
ops.push({
updateOne: {
filter: {
_id: assistant._id,
},
update: {
$set: {
searchTokens: generateSearchTokens(assistant.name),
},
},
},
});

if (ops.length >= 1000) {
process.stdout.write(".");
await assistants.bulkWrite(ops, { ordered: false });
ops = [];
}
}

if (ops.length) {
await assistants.bulkWrite(ops, { ordered: false });
}

return true;
},
down: async (client) => {
const { assistants } = getCollections(client);
await assistants.updateMany({}, { $unset: { searchTokens: "" } });
return true;
},
};

export default migration;
14 changes: 14 additions & 0 deletions src/lib/migrations/routines/index.ts
@@ -0,0 +1,14 @@
import type { MongoClient, ObjectId } from "mongodb";

import updateSearchAssistant from "./01-update-search-assistants";

export interface Migration {
_id: ObjectId;
name: string;
up: (client: MongoClient) => Promise<boolean>;
down?: (client: MongoClient) => Promise<boolean>;
runForFreshInstall?: "only" | "never"; // leave unspecified to run for both
runForHuggingChat?: "only" | "never"; // leave unspecified to run for both
}

export const migrations: Migration[] = [updateSearchAssistant];
63 changes: 46 additions & 17 deletions src/lib/server/database.ts
Expand Up @@ -10,37 +10,60 @@ import type { Session } from "$lib/types/Session";
import type { Assistant } from "$lib/types/Assistant";
import type { Report } from "$lib/types/Report";
import type { ConversationStats } from "$lib/types/ConversationStats";
import type { MigrationResult } from "$lib/types/MigrationResult";
import type { Semaphore } from "$lib/types/Semaphore";

if (!MONGODB_URL) {
throw new Error(
"Please specify the MONGODB_URL environment variable inside .env.local. Set it to mongodb://localhost:27017 if you are running MongoDB locally, or to a MongoDB Atlas free instance for example."
);
}
export const CONVERSATION_STATS_COLLECTION = "conversations.stats";

const client = new MongoClient(MONGODB_URL, {
directConnection: MONGODB_DIRECT_CONNECTION === "true",
});

export const connectPromise = client.connect().catch(console.error);

const db = client.db(MONGODB_DB_NAME + (import.meta.env.MODE === "test" ? "-test" : ""));
export function getCollections(mongoClient: MongoClient) {
const db = mongoClient.db(MONGODB_DB_NAME + (import.meta.env.MODE === "test" ? "-test" : ""));

export const CONVERSATION_STATS_COLLECTION = "conversations.stats";
const conversations = db.collection<Conversation>("conversations");
const conversationStats = db.collection<ConversationStats>(CONVERSATION_STATS_COLLECTION);
const assistants = db.collection<Assistant>("assistants");
const reports = db.collection<Report>("reports");
const sharedConversations = db.collection<SharedConversation>("sharedConversations");
const abortedGenerations = db.collection<AbortedGeneration>("abortedGenerations");
const settings = db.collection<Settings>("settings");
const users = db.collection<User>("users");
const sessions = db.collection<Session>("sessions");
const messageEvents = db.collection<MessageEvent>("messageEvents");
const bucket = new GridFSBucket(db, { bucketName: "files" });
const migrationResults = db.collection<MigrationResult>("migrationResults");
const semaphores = db.collection<Semaphore>("semaphores");

return {
conversations,
conversationStats,
assistants,
reports,
sharedConversations,
abortedGenerations,
settings,
users,
sessions,
messageEvents,
bucket,
migrationResults,
semaphores,
};
}
const db = client.db(MONGODB_DB_NAME + (import.meta.env.MODE === "test" ? "-test" : ""));

const conversations = db.collection<Conversation>("conversations");
const conversationStats = db.collection<ConversationStats>(CONVERSATION_STATS_COLLECTION);
const assistants = db.collection<Assistant>("assistants");
const reports = db.collection<Report>("reports");
const sharedConversations = db.collection<SharedConversation>("sharedConversations");
const abortedGenerations = db.collection<AbortedGeneration>("abortedGenerations");
const settings = db.collection<Settings>("settings");
const users = db.collection<User>("users");
const sessions = db.collection<Session>("sessions");
const messageEvents = db.collection<MessageEvent>("messageEvents");
const bucket = new GridFSBucket(db, { bucketName: "files" });
const collections = getCollections(client);

export { client, db };
export const collections = {
const {
conversations,
conversationStats,
assistants,
Expand All @@ -51,8 +74,10 @@ export const collections = {
users,
sessions,
messageEvents,
bucket,
};
semaphores,
} = collections;

export { client, db, collections };

client.on("open", () => {
conversations
Expand Down Expand Up @@ -120,4 +145,8 @@ client.on("open", () => {
assistants.createIndex({ searchTokens: 1 }).catch(console.error);
reports.createIndex({ assistantId: 1 }).catch(console.error);
reports.createIndex({ createdBy: 1, assistantId: 1 }).catch(console.error);

// Unique index for semaphore and migration results
semaphores.createIndex({ key: 1 }, { unique: true }).catch(console.error);
nsarrazin marked this conversation as resolved.
Show resolved Hide resolved
semaphores.createIndex({ createdAt: 1 }, { expireAfterSeconds: 60 }).catch(console.error);
});
7 changes: 7 additions & 0 deletions src/lib/types/MigrationResult.ts
@@ -0,0 +1,7 @@
import type { ObjectId } from "mongodb";

export interface MigrationResult {
_id: ObjectId;
name: string;
status: "success" | "failure" | "ongoing";
}