Skip to content

Commit

Permalink
Investigate alternatives to MongoDB backend read-for-write-locking
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidJGapCR committed Oct 8, 2023
1 parent f339095 commit 8e5c807
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// SPDX-License-Identifier: Apache-2.0
// Licensed to the Ed-Fi Alliance under one or more agreements.
// The Ed-Fi Alliance licenses this file to you under the Apache License, Version 2.0.
// See the LICENSE and NOTICES files in the project root for more information.

import { MeadowlarkId } from '@edfi/meadowlark-core';

export interface ConcurrencyDocument {
meadowlarkId: MeadowlarkId;
meadowlarkIds: MeadowlarkId[];
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ import { Collection, MongoClient, ReadConcernLevel, W, ClientSession, ObjectId,
import { Logger, Config } from '@edfi//meadowlark-utilities';
import { MeadowlarkId } from '@edfi/meadowlark-core';
import { MeadowlarkDocument } from '../model/MeadowlarkDocument';
import { ConcurrencyDocument } from '../model/ConcurrencyDocument';
import { AuthorizationDocument } from '../model/AuthorizationDocument';

export const DOCUMENT_COLLECTION_NAME = 'documents';
export const AUTHORIZATION_COLLECTION_NAME = 'authorizations';

// RND-644
export const CONCURRENCY_COLLECTION_NAME = 'concurrences';

let singletonClient: MongoClient | null = null;

/**
Expand Down Expand Up @@ -42,6 +46,12 @@ export async function getNewClient(): Promise<MongoClient> {
.collection(AUTHORIZATION_COLLECTION_NAME);
await authorizationCollection.createIndex({ clientName: 1 });

// Create concurrency collection if not exists
const concurrencyCollection: Collection<ConcurrencyDocument> = newClient
.db(databaseName)
.collection(CONCURRENCY_COLLECTION_NAME);
await concurrencyCollection.createIndex({ meadowlarkId: 1 });

return newClient;
} catch (e) {
const message = e instanceof Error ? e.message : 'unknown';
Expand Down Expand Up @@ -88,6 +98,10 @@ export function getDocumentCollection(client: MongoClient): Collection<Meadowlar
export function getAuthorizationCollection(client: MongoClient): Collection<AuthorizationDocument> {
return client.db(Config.get<string>('MEADOWLARK_DATABASE_NAME')).collection(AUTHORIZATION_COLLECTION_NAME);
}
// RND-644
export function getConcurrencyCollection(client: MongoClient): Collection<ConcurrencyDocument> {
return client.db(Config.get<string>('MEADOWLARK_DATABASE_NAME')).collection(CONCURRENCY_COLLECTION_NAME);
}

/**
* Write lock referenced documents as part of the upsert/update process. This will prevent the issue of
Expand Down Expand Up @@ -134,3 +148,21 @@ export const asUpsert = (session: ClientSession): ReplaceOptions => ({ upsert: t

// MongoDB FindOption to return at most 5 documents
export const limitFive = (session: ClientSession): FindOptions => ({ limit: 5, session });

/**
* Alternative to writeLockReferencedDocuments function. RND-644
* */
export async function insertMeadowlarkIdOnConcurrencyCollection(
concurrencyCollection: Collection<ConcurrencyDocument>,
meadowlarkId: MeadowlarkId,
document: ConcurrencyDocument,
session: ClientSession,
): Promise<void> {
const { acknowledged, upsertedCount, modifiedCount } = await concurrencyCollection.replaceOne(
{ _id: meadowlarkId },
document,
asUpsert(session),
);

console.log(acknowledged, upsertedCount, modifiedCount);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,18 @@ import { Logger, Config } from '@edfi/meadowlark-utilities';
import { Collection, ClientSession, MongoClient, WithId } from 'mongodb';
import retry from 'async-retry';
import { MeadowlarkDocument, meadowlarkDocumentFrom } from '../model/MeadowlarkDocument';
import { getDocumentCollection, limitFive, onlyReturnTimestamps, writeLockReferencedDocuments } from './Db';
import {
getDocumentCollection,
limitFive,
onlyReturnTimestamps,
writeLockReferencedDocuments,
insertMeadowlarkIdOnConcurrencyCollection,
getConcurrencyCollection,
} from './Db';
import { deleteDocumentByMeadowlarkIdTransaction } from './Delete';
import { onlyDocumentsReferencing, validateReferences } from './ReferenceValidation';
import { upsertDocumentTransaction } from './Upsert';
import { ConcurrencyDocument } from '../model/ConcurrencyDocument';

const moduleName: string = 'mongodb.repository.Update';

Expand All @@ -26,6 +34,7 @@ const moduleName: string = 'mongodb.repository.Update';
async function insertUpdatedDocument(
{ meadowlarkId, resourceInfo, documentInfo, edfiDoc, traceId, security }: UpdateRequest,
mongoCollection: Collection<MeadowlarkDocument>,
concurrencyCollection: Collection<ConcurrencyDocument>,
session: ClientSession,
document: MeadowlarkDocument,
): Promise<UpdateResult> {
Expand All @@ -40,6 +49,7 @@ async function insertUpdatedDocument(
security,
},
mongoCollection,
concurrencyCollection,
session,
document,
);
Expand Down Expand Up @@ -143,6 +153,7 @@ async function updateAllowingIdentityChange(
document: MeadowlarkDocument,
updateRequest: UpdateRequest,
mongoCollection: Collection<MeadowlarkDocument>,
concurrencyCollection: Collection<ConcurrencyDocument>, // RND-644
session: ClientSession,
): Promise<UpdateResult> {
const { documentUuid, resourceInfo, traceId, security } = updateRequest;
Expand All @@ -158,6 +169,19 @@ async function updateAllowingIdentityChange(
if (tryUpdateByReplacementResult != null) {
// Ensure referenced documents are not modified in other transactions
await writeLockReferencedDocuments(mongoCollection, document.outboundRefs, session);

const concurrencyDocument: ConcurrencyDocument = {
meadowlarkId: updateRequest.meadowlarkId,
meadowlarkIds: document.outboundRefs,
};

await insertMeadowlarkIdOnConcurrencyCollection(
concurrencyCollection,
updateRequest.meadowlarkId,
concurrencyDocument,
session,
); // RND-644

return tryUpdateByReplacementResult;
}

Expand Down Expand Up @@ -196,7 +220,7 @@ async function updateAllowingIdentityChange(
switch (deleteResult.response) {
case 'DELETE_SUCCESS':
// document was deleted, so we can insert the new version
return insertUpdatedDocument(updateRequest, mongoCollection, session, document);
return insertUpdatedDocument(updateRequest, mongoCollection, concurrencyCollection, session, document);
case 'DELETE_FAILURE_NOT_EXISTS':
// document was not found on delete, which shouldn't happen
return {
Expand Down Expand Up @@ -227,6 +251,7 @@ async function updateDisallowingIdentityChange(
document: MeadowlarkDocument,
updateRequest: UpdateRequest,
mongoCollection: Collection<MeadowlarkDocument>,
concurrencyCollection: Collection<ConcurrencyDocument>, // RND-644
session: ClientSession,
): Promise<UpdateResult> {
// Perform the document update
Expand All @@ -238,6 +263,18 @@ async function updateDisallowingIdentityChange(
// Ensure referenced documents are not modified in other transactions
await writeLockReferencedDocuments(mongoCollection, document.outboundRefs, session);

const concurrencyDocument: ConcurrencyDocument = {
meadowlarkId: updateRequest.meadowlarkId,
meadowlarkIds: document.outboundRefs,
};

await insertMeadowlarkIdOnConcurrencyCollection(
concurrencyCollection,
updateRequest.meadowlarkId,
concurrencyDocument,
session,
); // RND-644

const tryUpdateByReplacementResult: UpdateResult | null = await tryUpdateByReplacement(
document,
updateRequest,
Expand Down Expand Up @@ -319,6 +356,7 @@ async function checkForInvalidReferences(
async function updateDocumentByDocumentUuidTransaction(
updateRequest: UpdateRequest,
mongoCollection: Collection<MeadowlarkDocument>,
concurrencyCollection: Collection<ConcurrencyDocument>, // RND-644
session: ClientSession,
): Promise<UpdateResult> {
const { meadowlarkId, documentUuid, resourceInfo, documentInfo, edfiDoc, validateDocumentReferencesExist, security } =
Expand All @@ -345,9 +383,9 @@ async function updateDocumentByDocumentUuidTransaction(
lastModifiedAt: documentInfo.requestTimestamp,
});
if (resourceInfo.allowIdentityUpdates) {
return updateAllowingIdentityChange(document, updateRequest, mongoCollection, session);
return updateAllowingIdentityChange(document, updateRequest, mongoCollection, concurrencyCollection, session); // RND-644
}
return updateDisallowingIdentityChange(document, updateRequest, mongoCollection, session);
return updateDisallowingIdentityChange(document, updateRequest, mongoCollection, concurrencyCollection, session);
}

/**
Expand All @@ -362,6 +400,7 @@ export async function updateDocumentByDocumentUuid(
Logger.info(`${moduleName}.updateDocumentByDocumentUuid ${documentUuid}`, traceId);

const mongoCollection: Collection<MeadowlarkDocument> = getDocumentCollection(client);
const concurrencyCollection: Collection<ConcurrencyDocument> = getConcurrencyCollection(client); // RND-644
const session: ClientSession = client.startSession();
let updateResult: UpdateResult = { response: 'UNKNOWN_FAILURE' };

Expand All @@ -371,7 +410,12 @@ export async function updateDocumentByDocumentUuid(
await retry(
async () => {
await session.withTransaction(async () => {
updateResult = await updateDocumentByDocumentUuidTransaction(updateRequest, mongoCollection, session);
updateResult = await updateDocumentByDocumentUuidTransaction(
updateRequest,
mongoCollection,
concurrencyCollection, // RND-644
session,
);
if (updateResult.response !== 'UPDATE_SUCCESS') {
await session.abortTransaction();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@ import {
limitFive,
getDocumentCollection,
onlyReturnDocumentUuidAndTimestamps,
insertMeadowlarkIdOnConcurrencyCollection,
getConcurrencyCollection,
} from './Db';
import { onlyDocumentsReferencing, validateReferences } from './ReferenceValidation';
import { ConcurrencyDocument } from '../model/ConcurrencyDocument';

const moduleName: string = 'mongodb.repository.Upsert';

export async function upsertDocumentTransaction(
{ resourceInfo, documentInfo, meadowlarkId, edfiDoc, validateDocumentReferencesExist, traceId, security }: UpsertRequest,
mongoCollection: Collection<MeadowlarkDocument>,
concurrencyCollection: Collection<ConcurrencyDocument>, // RND-644
session: ClientSession,
documentFromUpdate?: MeadowlarkDocument,
): Promise<UpsertResult> {
Expand Down Expand Up @@ -136,6 +140,13 @@ export async function upsertDocumentTransaction(
});

await writeLockReferencedDocuments(mongoCollection, document.outboundRefs, session);

const concurrencyDocument: ConcurrencyDocument = {
meadowlarkId,
meadowlarkIds: document.outboundRefs,
};

await insertMeadowlarkIdOnConcurrencyCollection(concurrencyCollection, meadowlarkId, concurrencyDocument, session); // RND-644
// Perform the document upsert
Logger.debug(`${moduleName}.upsertDocumentTransaction Upserting document uuid ${documentUuid}`, traceId);

Expand Down Expand Up @@ -172,6 +183,7 @@ export async function upsertDocumentTransaction(
*/
export async function upsertDocument(upsertRequest: UpsertRequest, client: MongoClient): Promise<UpsertResult> {
const mongoCollection: Collection<MeadowlarkDocument> = getDocumentCollection(client);
const concurrencyCollection: Collection<ConcurrencyDocument> = getConcurrencyCollection(client);
const session: ClientSession = client.startSession();
let upsertResult: UpsertResult = { response: 'UNKNOWN_FAILURE' };
try {
Expand All @@ -180,7 +192,7 @@ export async function upsertDocument(upsertRequest: UpsertRequest, client: Mongo
await retry(
async () => {
await session.withTransaction(async () => {
upsertResult = await upsertDocumentTransaction(upsertRequest, mongoCollection, session);
upsertResult = await upsertDocumentTransaction(upsertRequest, mongoCollection, concurrencyCollection, session);
if (upsertResult.response !== 'UPDATE_SUCCESS' && upsertResult.response !== 'INSERT_SUCCESS') {
await session.abortTransaction();
}
Expand Down

0 comments on commit 8e5c807

Please sign in to comment.