Skip to content

Commit

Permalink
[RND-644] some locking updates
Browse files Browse the repository at this point in the history
  • Loading branch information
bradbanister committed Oct 25, 2023
1 parent 0e81b7a commit 3acb050
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
// See the LICENSE and NOTICES files in the project root for more information.

import { Collection, MongoClient, ReadConcernLevel, W, ClientSession, FindOptions, ReplaceOptions } from 'mongodb';
import { Logger, Config } from '@edfi//meadowlark-utilities';
import { MeadowlarkDocument } from '../model/MeadowlarkDocument';
import { ConcurrencyDocument } from '../model/ConcurrencyDocument';
import { AuthorizationDocument } from '../model/AuthorizationDocument';
import { Logger, Config } from '@edfi/meadowlark-utilities';
import type { DocumentUuid, MeadowlarkId } from '@edfi/meadowlark-core';
import type { MeadowlarkDocument } from '../model/MeadowlarkDocument';
import type { ConcurrencyDocument } from '../model/ConcurrencyDocument';
import type { AuthorizationDocument } from '../model/AuthorizationDocument';

export const DOCUMENT_COLLECTION_NAME = 'documents';
export const AUTHORIZATION_COLLECTION_NAME = 'authorizations';
Expand Down Expand Up @@ -127,19 +128,27 @@ export const asUpsert = (session: ClientSession): ReplaceOptions => ({ upsert: t
// MongoDB FindOption to return at most 5 documents
export const limitFive = (session: ClientSession): FindOptions => ({ limit: 5, session });

export async function insertMeadowlarkIdOnConcurrencyCollection(
/**
* Lock in-use meadowlark documents, both those being directly updated and those being referenced.
*/
export async function writeLockDocuments(
concurrencyCollection: Collection<ConcurrencyDocument>,
concurrencyDocuments: ConcurrencyDocument[],
): Promise<void> {
await concurrencyCollection.insertMany(concurrencyDocuments);
}

export async function deleteMeadowlarkIdOnConcurrencyCollection(
/**
* Remove the locks on in-use meadowlark documents, both those being directly updated and those being referenced.
*/
export async function removeDocumentLocks(
concurrencyCollection: Collection<ConcurrencyDocument>,
concurrencyDocuments: ConcurrencyDocument[],
): Promise<void> {
const meadowlarkIds: any[] = concurrencyDocuments.map((document) => document.meadowlarkId);
const documentUuids: any[] = concurrencyDocuments.map((document) => document.documentUuid);
const meadowlarkIds: MeadowlarkId[] = concurrencyDocuments
.map((document: ConcurrencyDocument) => document.meadowlarkId)
.filter((meadowlarkId: MeadowlarkId | null) => meadowlarkId != null) as MeadowlarkId[];
const documentUuids: DocumentUuid[] = concurrencyDocuments.map((document) => document.documentUuid);

await concurrencyCollection.deleteMany({
$and: [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import { ClientSession, Collection, MongoClient, WithId } from 'mongodb';
import retry from 'async-retry';
import { MeadowlarkDocument } from '../model/MeadowlarkDocument';
import {
deleteMeadowlarkIdOnConcurrencyCollection,
removeDocumentLocks,
getConcurrencyCollection,
getDocumentCollection,
insertMeadowlarkIdOnConcurrencyCollection,
writeLockDocuments,
limitFive,
onlyReturnId,
} from './Db';
Expand Down Expand Up @@ -109,11 +109,22 @@ export async function deleteDocumentByMeadowlarkIdTransaction(
},
];

await insertMeadowlarkIdOnConcurrencyCollection(concurrencyCollection, concurrencyDocuments);
try {
await writeLockDocuments(concurrencyCollection, concurrencyDocuments);
} catch (e) {
// Codes 11000 and 11001 are both Duplicate Key Error
if (e.code === 11000 || e.code === 11001) {
return {
response: 'DELETE_FAILURE_WRITE_CONFLICT',
failureMessage: 'Write conflict due to concurrent access to this or related resources',
};
}

throw e;
}
const { acknowledged, deletedCount } = await mongoCollection.deleteOne({ documentUuid }, { session });

await deleteMeadowlarkIdOnConcurrencyCollection(concurrencyCollection, concurrencyDocuments);
await removeDocumentLocks(concurrencyCollection, concurrencyDocuments);

if (!acknowledged) {
const msg =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import {
getDocumentCollection,
limitFive,
onlyReturnTimestamps,
insertMeadowlarkIdOnConcurrencyCollection,
writeLockDocuments,
getConcurrencyCollection,
deleteMeadowlarkIdOnConcurrencyCollection,
removeDocumentLocks,
} from './Db';
import { deleteDocumentByMeadowlarkIdTransaction } from './Delete';
import { onlyDocumentsReferencing, validateReferences } from './ReferenceValidation';
Expand Down Expand Up @@ -164,7 +164,7 @@ async function updateAllowingIdentityChange(
}));
concurrencyDocuments.push({ meadowlarkId: updateRequest.meadowlarkId, documentUuid: updateRequest.documentUuid });

await insertMeadowlarkIdOnConcurrencyCollection(concurrencyCollection, concurrencyDocuments);
await writeLockDocuments(concurrencyCollection, concurrencyDocuments);

// Optimize happy path by trying a replacement update, which will succeed if there is no identity change
const tryUpdateByReplacementResult: UpdateResult | null = await tryUpdateByReplacement(
Expand All @@ -174,7 +174,7 @@ async function updateAllowingIdentityChange(
session,
);

await deleteMeadowlarkIdOnConcurrencyCollection(concurrencyCollection, concurrencyDocuments);
await removeDocumentLocks(concurrencyCollection, concurrencyDocuments);

if (tryUpdateByReplacementResult != null) {
return tryUpdateByReplacementResult;
Expand Down Expand Up @@ -262,7 +262,7 @@ async function updateDisallowingIdentityChange(
}));
concurrencyDocuments.push({ meadowlarkId: updateRequest.meadowlarkId, documentUuid: updateRequest.documentUuid });

await insertMeadowlarkIdOnConcurrencyCollection(concurrencyCollection, concurrencyDocuments);
await writeLockDocuments(concurrencyCollection, concurrencyDocuments);

const tryUpdateByReplacementResult: UpdateResult | null = await tryUpdateByReplacement(
document,
Expand All @@ -271,7 +271,7 @@ async function updateDisallowingIdentityChange(
session,
);

await deleteMeadowlarkIdOnConcurrencyCollection(concurrencyCollection, concurrencyDocuments);
await removeDocumentLocks(concurrencyCollection, concurrencyDocuments);

if (tryUpdateByReplacementResult != null) return tryUpdateByReplacementResult;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import {
limitFive,
getDocumentCollection,
onlyReturnDocumentUuidAndTimestamps,
insertMeadowlarkIdOnConcurrencyCollection,
writeLockDocuments,
getConcurrencyCollection,
deleteMeadowlarkIdOnConcurrencyCollection,
removeDocumentLocks,
} from './Db';
import { onlyDocumentsReferencing, validateReferences } from './ReferenceValidation';
import { ConcurrencyDocument } from '../model/ConcurrencyDocument';
Expand Down Expand Up @@ -145,7 +145,7 @@ export async function upsertDocumentTransaction(
}));
concurrencyDocuments.push({ meadowlarkId, documentUuid });

await insertMeadowlarkIdOnConcurrencyCollection(concurrencyCollection, concurrencyDocuments);
await writeLockDocuments(concurrencyCollection, concurrencyDocuments);
// Perform the document upsert
Logger.debug(`${moduleName}.upsertDocumentTransaction Upserting document uuid ${documentUuid}`, traceId);

Expand All @@ -155,7 +155,7 @@ export async function upsertDocumentTransaction(
asUpsert(session),
);

await deleteMeadowlarkIdOnConcurrencyCollection(concurrencyCollection, concurrencyDocuments);
await removeDocumentLocks(concurrencyCollection, concurrencyDocuments);

if (!acknowledged) {
const msg =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {
getConcurrencyCollection,
getDocumentCollection,
getNewClient,
insertMeadowlarkIdOnConcurrencyCollection,
writeLockDocuments,
onlyReturnId,
} from '../../../src/repository/Db';
import {
Expand Down Expand Up @@ -199,7 +199,7 @@ describe('given a delete concurrent with an insert referencing the to-be-deleted
concurrencyDocumentsAcademicWeek.push({ meadowlarkId: academicWeekMeadowlarkId, documentUuid });
concurrencyDocumentsAcademicWeek.push({ meadowlarkId: schoolMeadowlarkId, documentUuid: schoolDocument.documentUuid });

await insertMeadowlarkIdOnConcurrencyCollection(mongoConcurrencyCollection, concurrencyDocumentsAcademicWeek);
await writeLockDocuments(mongoConcurrencyCollection, concurrencyDocumentsAcademicWeek);

// ----
// End transaction to insert the AcademicWeek document
Expand All @@ -214,7 +214,7 @@ describe('given a delete concurrent with an insert referencing the to-be-deleted

// Try deleting the School document - should fail thanks to AcademicWeek's read-for-write lock
try {
await insertMeadowlarkIdOnConcurrencyCollection(mongoConcurrencyCollection, concurrencyDocumentsSchool);
await writeLockDocuments(mongoConcurrencyCollection, concurrencyDocumentsSchool);

await mongoDocumentCollection.deleteOne({ _id: schoolMeadowlarkId }, { session: deleteSession });
} catch (e) {
Expand Down

0 comments on commit 3acb050

Please sign in to comment.