Skip to content

Commit

Permalink
[RND-644] - Investigate alternatives to MongoDB backend read-for-writ…
Browse files Browse the repository at this point in the history
…e-locking (#304)

* Investigate alternatives to MongoDB backend read-for-write-locking

* Delete from concurrences collection

* Some fixes.

* Trying to fix tests.

* Adds locking update test

* Some changes on repo files and its tests.

* Changes on locking tests.

* Removes unnecessary comments.

* Removes unnecessary comments. locking tests

* Some documentation.

* Trying to fix table.

* Removing unnecessary code.

* Documentation organization.

* [RND-644] some locking updates

* Adds session. Centralize catch.

* Deletes unnecessary removeDocumentLocks function

* Adds test for repository/DB

* More tests on repository/Db

* Keeping just DocumentUuid in ConcurrencyDocument

* Documentation - Raw data - Materialized conflict approach.

* Summarize documentation.

* Removes unnecessary ANDs

* Updating documentation based on the latest changes and the new performance test results.

* Fixing filenames with typos and typo on md file.

* Small changes on locking tests.

---------

Co-authored-by: Brad Banister <[email protected]>
  • Loading branch information
DavidJGapCR and bradbanister authored Nov 6, 2023
1 parent c4b69e4 commit 5d07e64
Show file tree
Hide file tree
Showing 35 changed files with 2,516 additions and 78 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// SPDX-License-Identifier: Apache-2.0
// Licensed to the Ed-Fi Alliance under one or more agreements.
// The Ed-Fi Alliance licenses this file to you under the Apache License, Version 2.0.
// See the LICENSE and NOTICES files in the project root for more information.

import { DocumentUuid } from '@edfi/meadowlark-core';

/**
 * Shape of the documents stored in the concurrency collection.
 *
 * By having a unique DocumentUuid at a given time, we handle concurrency.
 */
export interface ConcurrencyDocument {
// The DocumentUuid of the Meadowlark document being locked; used as the MongoDB primary key
_id: DocumentUuid;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
// The Ed-Fi Alliance licenses this file to you under the Apache License, Version 2.0.
// See the LICENSE and NOTICES files in the project root for more information.

import { Collection, MongoClient, ReadConcernLevel, W, ClientSession, ObjectId, FindOptions, ReplaceOptions } from 'mongodb';
import { Logger, Config } from '@edfi//meadowlark-utilities';
import { MeadowlarkId } from '@edfi/meadowlark-core';
import { MeadowlarkDocument } from '../model/MeadowlarkDocument';
import { AuthorizationDocument } from '../model/AuthorizationDocument';
import { Collection, MongoClient, ReadConcernLevel, W, ClientSession, FindOptions, ReplaceOptions } from 'mongodb';
import { Logger, Config } from '@edfi/meadowlark-utilities';
import type { DocumentUuid } from '@edfi/meadowlark-core';
import type { MeadowlarkDocument } from '../model/MeadowlarkDocument';
import type { ConcurrencyDocument } from '../model/ConcurrencyDocument';
import type { AuthorizationDocument } from '../model/AuthorizationDocument';

// Collection name for Meadowlark documents. NOTE(review): presumably consumed by getDocumentCollection - confirm
export const DOCUMENT_COLLECTION_NAME = 'documents';
// Collection name for authorization documents
export const AUTHORIZATION_COLLECTION_NAME = 'authorizations';
// Collection name for the concurrency (locking) documents
export const CONCURRENCY_COLLECTION_NAME = 'concurrency';

// Module-level MongoClient holder, null until assigned. NOTE(review): looks like a cached shared client - confirm
let singletonClient: MongoClient | null = null;

Expand Down Expand Up @@ -42,6 +44,12 @@ export async function getNewClient(): Promise<MongoClient> {
.collection(AUTHORIZATION_COLLECTION_NAME);
await authorizationCollection.createIndex({ clientName: 1 });

// Create concurrency collection if not exists.
const concurrencyCollection: Collection<ConcurrencyDocument> = newClient
.db(databaseName)
.collection(CONCURRENCY_COLLECTION_NAME);
await concurrencyCollection.createIndex({ _id: 1 });

return newClient;
} catch (e) {
const message = e instanceof Error ? e.message : 'unknown';
Expand Down Expand Up @@ -89,23 +97,8 @@ export function getAuthorizationCollection(client: MongoClient): Collection<Auth
return client.db(Config.get<string>('MEADOWLARK_DATABASE_NAME')).collection(AUTHORIZATION_COLLECTION_NAME);
}

/**
* Write lock referenced documents as part of the upsert/update process. This will prevent the issue of
* a concurrent delete operation removing a to-be referenced document in the middle of the transaction.
* See https://www.mongodb.com/blog/post/how-to-select--for-update-inside-mongodb-transactions
*
* This function expects Session to have an active transaction. Aborting the transaction on error is left to the caller.
*/
export async function writeLockReferencedDocuments(
mongoCollection: Collection<MeadowlarkDocument>,
referencedMeadowlarkIds: MeadowlarkId[],
session: ClientSession,
): Promise<void> {
await mongoCollection.updateMany(
{ aliasMeadowlarkIds: { $in: referencedMeadowlarkIds } },
{ $set: { lock: new ObjectId() } },
{ session },
);
/**
 * Returns the concurrency collection from the database named by the
 * MEADOWLARK_DATABASE_NAME configuration value, using the given client.
 */
export function getConcurrencyCollection(client: MongoClient): Collection<ConcurrencyDocument> {
  const databaseName = Config.get<string>('MEADOWLARK_DATABASE_NAME');
  const database = client.db(databaseName);
  return database.collection(CONCURRENCY_COLLECTION_NAME);
}

// MongoDB FindOption to return only the indexed _id field, making this a covered query (MongoDB will optimize)
Expand Down Expand Up @@ -134,3 +127,26 @@ export const asUpsert = (session: ClientSession): ReplaceOptions => ({ upsert: t

// Builds the MongoDB FindOptions that cap a query at 5 documents and carry the given session
export const limitFive = (session: ClientSession): FindOptions => {
  const options: FindOptions = { limit: 5, session };
  return options;
};

/**
 * Lock in-use meadowlark documents, both those being directly updated and those being referenced.
 *
 * Each distinct DocumentUuid is inserted into the concurrency collection and then deleted again,
 * both through the given session. The insert is what creates the conflict with any other session
 * working on the same DocumentUuid; the delete keeps the collection from accumulating entries.
 *
 * @param concurrencyCollection - the collection the lock documents are written to
 * @param concurrencyDocuments - one entry per document to lock; duplicate _ids are collapsed,
 *   and an empty list is a no-op
 * @param session - the session passed to both the insert and the delete
 * @returns a promise that resolves once the insert and delete have completed
 */
export async function lockDocuments(
  concurrencyCollection: Collection<ConcurrencyDocument>,
  concurrencyDocuments: ConcurrencyDocument[],
  session: ClientSession,
): Promise<void> {
  // Collapse repeated DocumentUuids: the same _id twice in one batch would fail with a
  // duplicate key error rather than behaving as a lock
  const documentsByUuid = new Map<DocumentUuid, ConcurrencyDocument>(
    // eslint-disable-next-line no-underscore-dangle
    concurrencyDocuments.map((document): [DocumentUuid, ConcurrencyDocument] => [document._id, document]),
  );

  // Nothing to lock - insertMany rejects an empty batch, so skip the round trips entirely
  if (documentsByUuid.size === 0) return;

  const uniqueDocuments: ConcurrencyDocument[] = Array.from(documentsByUuid.values());
  const documentUuids: DocumentUuid[] = Array.from(documentsByUuid.keys());

  await concurrencyCollection.insertMany(uniqueDocuments, { session });

  await concurrencyCollection.deleteMany({ _id: { $in: documentUuids } }, { session });
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import { DeleteResult, DeleteRequest, ReferringDocumentInfo, DocumentUuid, Trace
import { ClientSession, Collection, MongoClient, WithId } from 'mongodb';
import retry from 'async-retry';
import { MeadowlarkDocument } from '../model/MeadowlarkDocument';
import { getDocumentCollection, limitFive, onlyReturnId } from './Db';
import { getConcurrencyCollection, getDocumentCollection, lockDocuments, limitFive, onlyReturnId } from './Db';
import { onlyReturnAliasIds, onlyDocumentsReferencing } from './ReferenceValidation';
import { ConcurrencyDocument } from '../model/ConcurrencyDocument';

const moduleName: string = 'mongodb.repository.Delete';

Expand Down Expand Up @@ -75,6 +76,7 @@ async function checkForReferencesToDocument(
export async function deleteDocumentByMeadowlarkIdTransaction(
{ documentUuid, validateNoReferencesToDocument, traceId }: DeleteRequest,
mongoCollection: Collection<MeadowlarkDocument>,
concurrencyCollection: Collection<ConcurrencyDocument>,
session: ClientSession,
): Promise<DeleteResult> {
if (validateNoReferencesToDocument) {
Expand All @@ -93,6 +95,14 @@ export async function deleteDocumentByMeadowlarkIdTransaction(
traceId,
);

const concurrencyDocuments: ConcurrencyDocument[] = [
{
_id: documentUuid,
},
];

await lockDocuments(concurrencyCollection, concurrencyDocuments, session);

const { acknowledged, deletedCount } = await mongoCollection.deleteOne({ documentUuid }, { session });

if (!acknowledged) {
Expand All @@ -119,13 +129,19 @@ export async function deleteDocumentByDocumentUuid(
let deleteResult: DeleteResult = { response: 'UNKNOWN_FAILURE', failureMessage: '' };
try {
const mongoCollection: Collection<MeadowlarkDocument> = getDocumentCollection(client);
const concurrencyCollection: Collection<ConcurrencyDocument> = getConcurrencyCollection(client);

const numberOfRetries: number = Config.get('MONGODB_MAX_NUMBER_OF_RETRIES');

await retry(
async () => {
await session.withTransaction(async () => {
deleteResult = await deleteDocumentByMeadowlarkIdTransaction(deleteRequest, mongoCollection, session);
deleteResult = await deleteDocumentByMeadowlarkIdTransaction(
deleteRequest,
mongoCollection,
concurrencyCollection,
session,
);
if (deleteResult.response !== 'DELETE_SUCCESS') {
await session.abortTransaction();
}
Expand All @@ -143,20 +159,16 @@ export async function deleteDocumentByDocumentUuid(
);
} catch (e) {
Logger.error(`${moduleName}.deleteDocumentByDocumentUuid`, deleteRequest.traceId, e);
await session.abortTransaction();

let response: DeleteResult = { response: 'UNKNOWN_FAILURE', failureMessage: e.message };

// If this is a MongoError, it has a codeName
if (e.codeName === 'WriteConflict') {
response = {
return {
response: 'DELETE_FAILURE_WRITE_CONFLICT',
failureMessage: 'Write conflict due to concurrent access to this or related resources',
};
}

await session.abortTransaction();

return response;
return { response: 'UNKNOWN_FAILURE', failureMessage: e.message };
} finally {
await session.endSession();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import { Logger, Config } from '@edfi/meadowlark-utilities';
import { Collection, ClientSession, MongoClient, WithId } from 'mongodb';
import retry from 'async-retry';
import { MeadowlarkDocument, meadowlarkDocumentFrom } from '../model/MeadowlarkDocument';
import { getDocumentCollection, limitFive, onlyReturnTimestamps, writeLockReferencedDocuments } from './Db';
import { getDocumentCollection, limitFive, onlyReturnTimestamps, lockDocuments, getConcurrencyCollection } from './Db';
import { deleteDocumentByMeadowlarkIdTransaction } from './Delete';
import { onlyDocumentsReferencing, validateReferences } from './ReferenceValidation';
import { upsertDocumentTransaction } from './Upsert';
import { ConcurrencyDocument } from '../model/ConcurrencyDocument';

const moduleName: string = 'mongodb.repository.Update';

Expand All @@ -26,6 +27,7 @@ const moduleName: string = 'mongodb.repository.Update';
async function insertUpdatedDocument(
{ meadowlarkId, resourceInfo, documentInfo, edfiDoc, traceId, security }: UpdateRequest,
mongoCollection: Collection<MeadowlarkDocument>,
concurrencyCollection: Collection<ConcurrencyDocument>,
session: ClientSession,
document: MeadowlarkDocument,
): Promise<UpdateResult> {
Expand All @@ -40,6 +42,7 @@ async function insertUpdatedDocument(
security,
},
mongoCollection,
concurrencyCollection,
session,
document,
);
Expand Down Expand Up @@ -143,10 +146,31 @@ async function updateAllowingIdentityChange(
document: MeadowlarkDocument,
updateRequest: UpdateRequest,
mongoCollection: Collection<MeadowlarkDocument>,
concurrencyCollection: Collection<ConcurrencyDocument>,
session: ClientSession,
): Promise<UpdateResult> {
const { documentUuid, resourceInfo, traceId, security } = updateRequest;

const referringDocumentUuids: WithId<MeadowlarkDocument>[] = await mongoCollection
.find(
{
aliasMeadowlarkIds: {
$in: document.outboundRefs,
},
},

{ projection: { documentUuid: 1 } },
)
.toArray();

const concurrencyDocuments: ConcurrencyDocument[] = referringDocumentUuids.map((referringDocumentUuid) => ({
_id: referringDocumentUuid.documentUuid,
}));

concurrencyDocuments.push({ _id: updateRequest.documentUuid });

await lockDocuments(concurrencyCollection, concurrencyDocuments, session);

// Optimize happy path by trying a replacement update, which will succeed if there is no identity change
const tryUpdateByReplacementResult: UpdateResult | null = await tryUpdateByReplacement(
document,
Expand All @@ -156,8 +180,6 @@ async function updateAllowingIdentityChange(
);

if (tryUpdateByReplacementResult != null) {
// Ensure referenced documents are not modified in other transactions
await writeLockReferencedDocuments(mongoCollection, document.outboundRefs, session);
return tryUpdateByReplacementResult;
}

Expand Down Expand Up @@ -189,14 +211,15 @@ async function updateAllowingIdentityChange(
const deleteResult = await deleteDocumentByMeadowlarkIdTransaction(
{ documentUuid, resourceInfo, security, validateNoReferencesToDocument: true, traceId },
mongoCollection,
concurrencyCollection,
session,
);
Logger.debug(`${moduleName}.updateAllowingIdentityChange: Updating document uuid ${documentUuid}`, traceId);

switch (deleteResult.response) {
case 'DELETE_SUCCESS':
// document was deleted, so we can insert the new version
return insertUpdatedDocument(updateRequest, mongoCollection, session, document);
return insertUpdatedDocument(updateRequest, mongoCollection, concurrencyCollection, session, document);
case 'DELETE_FAILURE_NOT_EXISTS':
// document was not found on delete, which shouldn't happen
return {
Expand Down Expand Up @@ -227,6 +250,7 @@ async function updateDisallowingIdentityChange(
document: MeadowlarkDocument,
updateRequest: UpdateRequest,
mongoCollection: Collection<MeadowlarkDocument>,
concurrencyCollection: Collection<ConcurrencyDocument>,
session: ClientSession,
): Promise<UpdateResult> {
// Perform the document update
Expand All @@ -235,8 +259,26 @@ async function updateDisallowingIdentityChange(
updateRequest.traceId,
);

// Ensure referenced documents are not modified in other transactions
await writeLockReferencedDocuments(mongoCollection, document.outboundRefs, session);
const referringDocumentUuids: WithId<MeadowlarkDocument>[] = await mongoCollection
.find(
{
aliasMeadowlarkIds: {
$in: document.outboundRefs,
},
},
{ projection: { documentUuid: 1 } },
)
.toArray();

const concurrencyDocuments: ConcurrencyDocument[] = referringDocumentUuids.map((referringDocumentUuid) => ({
_id: referringDocumentUuid.documentUuid,
}));

concurrencyDocuments.push({ _id: updateRequest.documentUuid });

// Inserting the same DocumentUuid in Concurrency Collection will result in a WriteConflict error
// By generating this conflict we handle concurrency
await lockDocuments(concurrencyCollection, concurrencyDocuments, session);

const tryUpdateByReplacementResult: UpdateResult | null = await tryUpdateByReplacement(
document,
Expand Down Expand Up @@ -319,6 +361,7 @@ async function checkForInvalidReferences(
async function updateDocumentByDocumentUuidTransaction(
updateRequest: UpdateRequest,
mongoCollection: Collection<MeadowlarkDocument>,
concurrencyCollection: Collection<ConcurrencyDocument>,
session: ClientSession,
): Promise<UpdateResult> {
const { meadowlarkId, documentUuid, resourceInfo, documentInfo, edfiDoc, validateDocumentReferencesExist, security } =
Expand All @@ -345,9 +388,9 @@ async function updateDocumentByDocumentUuidTransaction(
lastModifiedAt: documentInfo.requestTimestamp,
});
if (resourceInfo.allowIdentityUpdates) {
return updateAllowingIdentityChange(document, updateRequest, mongoCollection, session);
return updateAllowingIdentityChange(document, updateRequest, mongoCollection, concurrencyCollection, session);
}
return updateDisallowingIdentityChange(document, updateRequest, mongoCollection, session);
return updateDisallowingIdentityChange(document, updateRequest, mongoCollection, concurrencyCollection, session);
}

/**
Expand All @@ -362,6 +405,7 @@ export async function updateDocumentByDocumentUuid(
Logger.info(`${moduleName}.updateDocumentByDocumentUuid ${documentUuid}`, traceId);

const mongoCollection: Collection<MeadowlarkDocument> = getDocumentCollection(client);
const concurrencyCollection: Collection<ConcurrencyDocument> = getConcurrencyCollection(client);
const session: ClientSession = client.startSession();
let updateResult: UpdateResult = { response: 'UNKNOWN_FAILURE' };

Expand All @@ -371,7 +415,12 @@ export async function updateDocumentByDocumentUuid(
await retry(
async () => {
await session.withTransaction(async () => {
updateResult = await updateDocumentByDocumentUuidTransaction(updateRequest, mongoCollection, session);
updateResult = await updateDocumentByDocumentUuidTransaction(
updateRequest,
mongoCollection,
concurrencyCollection,
session,
);
if (updateResult.response !== 'UPDATE_SUCCESS') {
await session.abortTransaction();
}
Expand All @@ -391,7 +440,6 @@ export async function updateDocumentByDocumentUuid(
Logger.error(`${moduleName}.updateDocumentByDocumentUuid`, traceId, e);
await session.abortTransaction();

// If this is a MongoError, it has a codeName
if (e.codeName === 'WriteConflict') {
return {
response: 'UPDATE_FAILURE_WRITE_CONFLICT',
Expand Down
Loading

0 comments on commit 5d07e64

Please sign in to comment.