Skip to content

Commit

Permalink
Automated migration setup (#897)
Browse files Browse the repository at this point in the history
* Initial migration setup for chat-ui

* refresh the lock regularly while the migrations are running

* add index for migrationResults

* clean up code a bit

* Don't try to run migrations when sveltekit is building

* simplified lock code

* Add early check for migrations being done
Reduce timer for lock

* migration use `generateSearchTokens`

* Update src/lib/migrations/migrations.spec.ts

Co-authored-by: Mishig <[email protected]>

---------

Co-authored-by: Mishig Davaadorj <[email protected]>
Co-authored-by: Mishig <[email protected]>
  • Loading branch information
3 people committed Mar 5, 2024
1 parent 10dbbd6 commit 4dbcbb6
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 17 deletions.
6 changes: 6 additions & 0 deletions src/hooks.server.ts
Expand Up @@ -17,6 +17,12 @@ import { findUser, refreshSessionCookie, requiresUser } from "$lib/server/auth";
import { ERROR_MESSAGES } from "$lib/stores/errors";
import { sha256 } from "$lib/utils/sha256";
import { addWeeks } from "date-fns";
import { checkAndRunMigrations } from "$lib/migrations/migrations";
import { building } from "$app/environment";

if (!building) {
await checkAndRunMigrations();
}

export const handle: Handle = async ({ event, resolve }) => {
if (event.url.pathname.startsWith(`${base}/api/`) && EXPOSE_API !== "true") {
Expand Down
42 changes: 42 additions & 0 deletions src/lib/migrations/lock.ts
@@ -0,0 +1,42 @@
import { collections } from "$lib/server/database";

export async function acquireLock(key = "migrations") {
try {
const insert = await collections.semaphores.insertOne({
key,
createdAt: new Date(),
updatedAt: new Date(),
});

return !!insert.acknowledged; // true if the document was inserted
} catch (e) {
// unique index violation, so there must already be a lock
return false;
}
}

export async function releaseLock(key = "migrations") {
await collections.semaphores.deleteOne({
key,
});
}

export async function isDBLocked(key = "migrations"): Promise<boolean> {
const res = await collections.semaphores.countDocuments({
key,
});
return res > 0;
}

export async function refreshLock(key = "migrations") {
await collections.semaphores.updateOne(
{
key,
},
{
$set: {
updatedAt: new Date(),
},
}
);
}
53 changes: 53 additions & 0 deletions src/lib/migrations/migrations.spec.ts
@@ -0,0 +1,53 @@
import { afterEach, describe, expect, it } from "vitest";
import { migrations } from "./routines";
import { acquireLock, isDBLocked, refreshLock, releaseLock } from "./lock";
import { collections } from "$lib/server/database";

describe("migrations", () => {
it("should not have duplicates guid", async () => {
const guids = migrations.map((m) => m._id.toString());
const uniqueGuids = [...new Set(guids)];
expect(uniqueGuids.length).toBe(guids.length);
});

it("should acquire only one lock on DB", async () => {
const results = await Promise.all(new Array(1000).fill(0).map(() => acquireLock()));
const locks = results.filter((r) => r);

const semaphores = await collections.semaphores.find({}).toArray();

expect(locks.length).toBe(1);
expect(semaphores).toBeDefined();
expect(semaphores.length).toBe(1);
expect(semaphores?.[0].key).toBe("migrations");
});

it("should read the lock correctly", async () => {
expect(await acquireLock()).toBe(true);
expect(await isDBLocked()).toBe(true);
expect(await acquireLock()).toBe(false);
await releaseLock();
expect(await isDBLocked()).toBe(false);
});

it("should refresh the lock", async () => {
await acquireLock();

// get the updatedAt time

const updatedAtInitially = (await collections.semaphores.findOne({}))?.updatedAt;

await refreshLock();

const updatedAtAfterRefresh = (await collections.semaphores.findOne({}))?.updatedAt;

expect(updatedAtInitially).toBeDefined();
expect(updatedAtAfterRefresh).toBeDefined();
expect(updatedAtInitially).not.toBe(updatedAtAfterRefresh);
});
});

afterEach(async () => {
await collections.semaphores.deleteMany({});
await collections.migrationResults.deleteMany({});
});
116 changes: 116 additions & 0 deletions src/lib/migrations/migrations.ts
@@ -0,0 +1,116 @@
import { client, collections } from "$lib/server/database";
import { migrations } from "./routines";
import { acquireLock, releaseLock, isDBLocked, refreshLock } from "./lock";
import { isHuggingChat } from "$lib/utils/isHuggingChat";

export async function checkAndRunMigrations() {
// make sure all GUIDs are unique
if (new Set(migrations.map((m) => m._id.toString())).size !== migrations.length) {
throw new Error("Duplicate migration GUIDs found.");
}

// check if all migrations have already been run
const migrationResults = await collections.migrationResults.find().toArray();

// if all the migrations._id are in the migrationResults, we can exit early
if (
migrations.every((m) => migrationResults.some((m2) => m2._id.toString() === m._id.toString()))
) {
console.log("[MIGRATIONS] All migrations already applied.");
return;
}

console.log("[MIGRATIONS] Begin check...");

// connect to the database
const connectedClient = await client.connect();

const hasLock = await acquireLock();

if (!hasLock) {
// another instance already has the lock, so we exit early
console.log(
"[MIGRATIONS] Another instance already has the lock. Waiting for DB to be unlocked."
);

// block until the lock is released
while (await isDBLocked()) {
await new Promise((resolve) => setTimeout(resolve, 1000));
}
return;
}

// once here, we have the lock
// make sure to refresh it regularly while it's running
const refreshInterval = setInterval(async () => {
await refreshLock();
}, 1000 * 10);

// iterate over all migrations
for (const migration of migrations) {
// check if the migration has already been applied
const existingMigrationResult = migrationResults.find(
(m) => m._id.toString() === migration._id.toString()
);

// check if the migration has already been applied
if (existingMigrationResult) {
console.log(`[MIGRATIONS] "${migration.name}" already applied. Skipping...`);
} else {
// check the modifiers to see if some cases match
if (
(migration.runForHuggingChat === "only" && !isHuggingChat) ||
(migration.runForHuggingChat === "never" && isHuggingChat)
) {
console.log(
`[MIGRATIONS] "${migration.name}" should not be applied for this run. Skipping...`
);
continue;
}

// otherwise all is good and we cna run the migration
console.log(`[MIGRATIONS] "${migration.name}" not applied yet. Applying...`);

await collections.migrationResults.updateOne(
{ _id: migration._id },
{
$set: {
name: migration.name,
status: "ongoing",
},
},
{ upsert: true }
);

const session = connectedClient.startSession();
let result = false;

try {
await session.withTransaction(async () => {
result = await migration.up(connectedClient);
});
} catch (e) {
console.log(`[MIGRATION[] "${migration.name}" failed!`);
console.error(e);
} finally {
await session.endSession();
}

await collections.migrationResults.updateOne(
{ _id: migration._id },
{
$set: {
name: migration.name,
status: result ? "success" : "failure",
},
},
{ upsert: true }
);
}
}

console.log("[MIGRATIONS] All migrations applied. Releasing lock");

clearInterval(refreshInterval);
await releaseLock();
}
50 changes: 50 additions & 0 deletions src/lib/migrations/routines/01-update-search-assistants.ts
@@ -0,0 +1,50 @@
import type { Migration } from ".";
import { getCollections } from "$lib/server/database";
import { ObjectId, type AnyBulkWriteOperation } from "mongodb";
import type { Assistant } from "$lib/types/Assistant";
import { generateSearchTokens } from "$lib/utils/searchTokens";

const migration: Migration = {
_id: new ObjectId("5f9f3e3e3e3e3e3e3e3e3e3e"),
name: "Update search assistants",
up: async (client) => {
const { assistants } = getCollections(client);
let ops: AnyBulkWriteOperation<Assistant>[] = [];

for await (const assistant of assistants
.find()
.project<Pick<Assistant, "_id" | "name">>({ _id: 1, name: 1 })) {
ops.push({
updateOne: {
filter: {
_id: assistant._id,
},
update: {
$set: {
searchTokens: generateSearchTokens(assistant.name),
},
},
},
});

if (ops.length >= 1000) {
process.stdout.write(".");
await assistants.bulkWrite(ops, { ordered: false });
ops = [];
}
}

if (ops.length) {
await assistants.bulkWrite(ops, { ordered: false });
}

return true;
},
down: async (client) => {
const { assistants } = getCollections(client);
await assistants.updateMany({}, { $unset: { searchTokens: "" } });
return true;
},
};

export default migration;
14 changes: 14 additions & 0 deletions src/lib/migrations/routines/index.ts
@@ -0,0 +1,14 @@
import type { MongoClient, ObjectId } from "mongodb";

import updateSearchAssistant from "./01-update-search-assistants";

export interface Migration {
_id: ObjectId;
name: string;
up: (client: MongoClient) => Promise<boolean>;
down?: (client: MongoClient) => Promise<boolean>;
runForFreshInstall?: "only" | "never"; // leave unspecified to run for both
runForHuggingChat?: "only" | "never"; // leave unspecified to run for both
}

export const migrations: Migration[] = [updateSearchAssistant];
63 changes: 46 additions & 17 deletions src/lib/server/database.ts
Expand Up @@ -10,37 +10,60 @@ import type { Session } from "$lib/types/Session";
import type { Assistant } from "$lib/types/Assistant";
import type { Report } from "$lib/types/Report";
import type { ConversationStats } from "$lib/types/ConversationStats";
import type { MigrationResult } from "$lib/types/MigrationResult";
import type { Semaphore } from "$lib/types/Semaphore";

if (!MONGODB_URL) {
throw new Error(
"Please specify the MONGODB_URL environment variable inside .env.local. Set it to mongodb://localhost:27017 if you are running MongoDB locally, or to a MongoDB Atlas free instance for example."
);
}
export const CONVERSATION_STATS_COLLECTION = "conversations.stats";

const client = new MongoClient(MONGODB_URL, {
directConnection: MONGODB_DIRECT_CONNECTION === "true",
});

export const connectPromise = client.connect().catch(console.error);

const db = client.db(MONGODB_DB_NAME + (import.meta.env.MODE === "test" ? "-test" : ""));
export function getCollections(mongoClient: MongoClient) {
const db = mongoClient.db(MONGODB_DB_NAME + (import.meta.env.MODE === "test" ? "-test" : ""));

export const CONVERSATION_STATS_COLLECTION = "conversations.stats";
const conversations = db.collection<Conversation>("conversations");
const conversationStats = db.collection<ConversationStats>(CONVERSATION_STATS_COLLECTION);
const assistants = db.collection<Assistant>("assistants");
const reports = db.collection<Report>("reports");
const sharedConversations = db.collection<SharedConversation>("sharedConversations");
const abortedGenerations = db.collection<AbortedGeneration>("abortedGenerations");
const settings = db.collection<Settings>("settings");
const users = db.collection<User>("users");
const sessions = db.collection<Session>("sessions");
const messageEvents = db.collection<MessageEvent>("messageEvents");
const bucket = new GridFSBucket(db, { bucketName: "files" });
const migrationResults = db.collection<MigrationResult>("migrationResults");
const semaphores = db.collection<Semaphore>("semaphores");

return {
conversations,
conversationStats,
assistants,
reports,
sharedConversations,
abortedGenerations,
settings,
users,
sessions,
messageEvents,
bucket,
migrationResults,
semaphores,
};
}
const db = client.db(MONGODB_DB_NAME + (import.meta.env.MODE === "test" ? "-test" : ""));

const conversations = db.collection<Conversation>("conversations");
const conversationStats = db.collection<ConversationStats>(CONVERSATION_STATS_COLLECTION);
const assistants = db.collection<Assistant>("assistants");
const reports = db.collection<Report>("reports");
const sharedConversations = db.collection<SharedConversation>("sharedConversations");
const abortedGenerations = db.collection<AbortedGeneration>("abortedGenerations");
const settings = db.collection<Settings>("settings");
const users = db.collection<User>("users");
const sessions = db.collection<Session>("sessions");
const messageEvents = db.collection<MessageEvent>("messageEvents");
const bucket = new GridFSBucket(db, { bucketName: "files" });
const collections = getCollections(client);

export { client, db };
export const collections = {
const {
conversations,
conversationStats,
assistants,
Expand All @@ -51,8 +74,10 @@ export const collections = {
users,
sessions,
messageEvents,
bucket,
};
semaphores,
} = collections;

export { client, db, collections };

client.on("open", () => {
conversations
Expand Down Expand Up @@ -120,4 +145,8 @@ client.on("open", () => {
assistants.createIndex({ searchTokens: 1 }).catch(console.error);
reports.createIndex({ assistantId: 1 }).catch(console.error);
reports.createIndex({ createdBy: 1, assistantId: 1 }).catch(console.error);

// Unique index for semaphore and migration results
semaphores.createIndex({ key: 1 }, { unique: true }).catch(console.error);
semaphores.createIndex({ createdAt: 1 }, { expireAfterSeconds: 60 }).catch(console.error);
});
7 changes: 7 additions & 0 deletions src/lib/types/MigrationResult.ts
@@ -0,0 +1,7 @@
import type { ObjectId } from "mongodb";

export interface MigrationResult {
_id: ObjectId;
name: string;
status: "success" | "failure" | "ongoing";
}

0 comments on commit 4dbcbb6

Please sign in to comment.