Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RND-644] - Investigate alternatives to MongoDB backend read-for-write-locking #304

Merged
merged 25 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
4ef51ba
Investigate alternatives to MongoDB backend read-for-write-locking
DavidJGapCR Oct 8, 2023
1c51c51
Delete from concurrences collection
DavidJGapCR Oct 9, 2023
dae8d17
Some fixes.
DavidJGapCR Oct 12, 2023
31ef689
Trying to fix tests.
DavidJGapCR Oct 12, 2023
f4931cd
Adds locking update test
DavidJGapCR Oct 12, 2023
e447c29
Some changes on repo files and its tests.
DavidJGapCR Oct 13, 2023
c673483
Changes on locking tests.
DavidJGapCR Oct 13, 2023
bbf9d33
Removes unnecesary comments.
DavidJGapCR Oct 13, 2023
2a99a6c
Removes unnecesary comments. locking tests
DavidJGapCR Oct 13, 2023
3b82336
Some documentation.
DavidJGapCR Oct 19, 2023
cfc31c3
Trying to fix table.
DavidJGapCR Oct 19, 2023
c0735eb
Removing unnecessary code.
DavidJGapCR Oct 25, 2023
0a8e343
Documentation organization.
DavidJGapCR Oct 25, 2023
ed0f0a0
[RND-644] some locking updates
bradbanister Oct 25, 2023
ebe4e15
Adds session. Centralize catch.
DavidJGapCR Oct 25, 2023
3870967
Deletes unnecessary removeDocumentLocks function
DavidJGapCR Oct 25, 2023
361fb55
Adds test for repository/DB
DavidJGapCR Oct 25, 2023
ce64698
More tests on repository/Db
DavidJGapCR Oct 25, 2023
95bc57b
Keeping just DocumentUuid in ConcurrencyDocument
DavidJGapCR Oct 29, 2023
d4e8bf3
Documentation - Raw data - Materialized conflict approach.
DavidJGapCR Oct 29, 2023
0154c23
Summarize documentation.
DavidJGapCR Oct 29, 2023
2f4184d
Removes unnecessary ANDs
DavidJGapCR Oct 31, 2023
c3e2146
Updating documentation based on the latest changes and the new perfor…
DavidJGapCR Oct 31, 2023
e2c7cfa
Fixing filenames with typos and typo on md file.
DavidJGapCR Nov 3, 2023
7eb8c31
Small changes on locking tests.
DavidJGapCR Nov 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// SPDX-License-Identifier: Apache-2.0
// Licensed to the Ed-Fi Alliance under one or more agreements.
// The Ed-Fi Alliance licenses this file to you under the Apache License, Version 2.0.
// See the LICENSE and NOTICES files in the project root for more information.

import { DocumentUuid } from '@edfi/meadowlark-core';

// By having a unique DocumentUuid at a given time, we handle concurrency.
export interface ConcurrencyDocument {
_id: DocumentUuid;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
// The Ed-Fi Alliance licenses this file to you under the Apache License, Version 2.0.
// See the LICENSE and NOTICES files in the project root for more information.

import { Collection, MongoClient, ReadConcernLevel, W, ClientSession, ObjectId, FindOptions, ReplaceOptions } from 'mongodb';
import { Logger, Config } from '@edfi//meadowlark-utilities';
import { MeadowlarkId } from '@edfi/meadowlark-core';
import { MeadowlarkDocument } from '../model/MeadowlarkDocument';
import { AuthorizationDocument } from '../model/AuthorizationDocument';
import { Collection, MongoClient, ReadConcernLevel, W, ClientSession, FindOptions, ReplaceOptions } from 'mongodb';
import { Logger, Config } from '@edfi/meadowlark-utilities';
import type { DocumentUuid } from '@edfi/meadowlark-core';
import type { MeadowlarkDocument } from '../model/MeadowlarkDocument';
import type { ConcurrencyDocument } from '../model/ConcurrencyDocument';
import type { AuthorizationDocument } from '../model/AuthorizationDocument';

export const DOCUMENT_COLLECTION_NAME = 'documents';
export const AUTHORIZATION_COLLECTION_NAME = 'authorizations';
export const CONCURRENCY_COLLECTION_NAME = 'concurrency';

let singletonClient: MongoClient | null = null;

Expand Down Expand Up @@ -42,6 +44,12 @@ export async function getNewClient(): Promise<MongoClient> {
.collection(AUTHORIZATION_COLLECTION_NAME);
await authorizationCollection.createIndex({ clientName: 1 });

// Create concurrency collection if not exists.
const concurrencyCollection: Collection<ConcurrencyDocument> = newClient
.db(databaseName)
.collection(CONCURRENCY_COLLECTION_NAME);
await concurrencyCollection.createIndex({ _id: 1 });

return newClient;
} catch (e) {
const message = e instanceof Error ? e.message : 'unknown';
Expand Down Expand Up @@ -89,23 +97,8 @@ export function getAuthorizationCollection(client: MongoClient): Collection<Auth
return client.db(Config.get<string>('MEADOWLARK_DATABASE_NAME')).collection(AUTHORIZATION_COLLECTION_NAME);
}

/**
* Write lock referenced documents as part of the upsert/update process. This will prevent the issue of
* a concurrent delete operation removing a to-be referenced document in the middle of the transaction.
* See https://www.mongodb.com/blog/post/how-to-select--for-update-inside-mongodb-transactions
*
* This function expects Session to have an active transaction. Aborting the transaction on error is left to the caller.
*/
export async function writeLockReferencedDocuments(
mongoCollection: Collection<MeadowlarkDocument>,
referencedMeadowlarkIds: MeadowlarkId[],
session: ClientSession,
): Promise<void> {
await mongoCollection.updateMany(
{ aliasMeadowlarkIds: { $in: referencedMeadowlarkIds } },
{ $set: { lock: new ObjectId() } },
{ session },
);
export function getConcurrencyCollection(client: MongoClient): Collection<ConcurrencyDocument> {
return client.db(Config.get<string>('MEADOWLARK_DATABASE_NAME')).collection(CONCURRENCY_COLLECTION_NAME);
}

// MongoDB FindOption to return only the indexed _id field, making this a covered query (MongoDB will optimize)
Expand Down Expand Up @@ -134,3 +127,26 @@ export const asUpsert = (session: ClientSession): ReplaceOptions => ({ upsert: t

// MongoDB FindOption to return at most 5 documents
export const limitFive = (session: ClientSession): FindOptions => ({ limit: 5, session });

/**
* Lock in-use meadowlark documents, both those being directly updated and those being referenced.
*/
export async function lockDocuments(
concurrencyCollection: Collection<ConcurrencyDocument>,
concurrencyDocuments: ConcurrencyDocument[],
session: ClientSession,
): Promise<void> {
await concurrencyCollection.insertMany(concurrencyDocuments, { session });

// eslint-disable-next-line no-underscore-dangle
const documentUuids: DocumentUuid[] = concurrencyDocuments.map((document) => document._id);

await concurrencyCollection.deleteMany(
{
_id: {
$in: documentUuids,
},
},
{ session },
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import { DeleteResult, DeleteRequest, ReferringDocumentInfo, DocumentUuid, Trace
import { ClientSession, Collection, MongoClient, WithId } from 'mongodb';
import retry from 'async-retry';
import { MeadowlarkDocument } from '../model/MeadowlarkDocument';
import { getDocumentCollection, limitFive, onlyReturnId } from './Db';
import { getConcurrencyCollection, getDocumentCollection, lockDocuments, limitFive, onlyReturnId } from './Db';
import { onlyReturnAliasIds, onlyDocumentsReferencing } from './ReferenceValidation';
import { ConcurrencyDocument } from '../model/ConcurrencyDocument';

const moduleName: string = 'mongodb.repository.Delete';

Expand Down Expand Up @@ -75,6 +76,7 @@ async function checkForReferencesToDocument(
export async function deleteDocumentByMeadowlarkIdTransaction(
{ documentUuid, validateNoReferencesToDocument, traceId }: DeleteRequest,
mongoCollection: Collection<MeadowlarkDocument>,
concurrencyCollection: Collection<ConcurrencyDocument>,
session: ClientSession,
): Promise<DeleteResult> {
if (validateNoReferencesToDocument) {
Expand All @@ -93,6 +95,14 @@ export async function deleteDocumentByMeadowlarkIdTransaction(
traceId,
);

const concurrencyDocuments: ConcurrencyDocument[] = [
{
_id: documentUuid,
},
];

await lockDocuments(concurrencyCollection, concurrencyDocuments, session);

const { acknowledged, deletedCount } = await mongoCollection.deleteOne({ documentUuid }, { session });

if (!acknowledged) {
Expand All @@ -119,13 +129,19 @@ export async function deleteDocumentByDocumentUuid(
let deleteResult: DeleteResult = { response: 'UNKNOWN_FAILURE', failureMessage: '' };
try {
const mongoCollection: Collection<MeadowlarkDocument> = getDocumentCollection(client);
const concurrencyCollection: Collection<ConcurrencyDocument> = getConcurrencyCollection(client);

const numberOfRetries: number = Config.get('MONGODB_MAX_NUMBER_OF_RETRIES');

await retry(
async () => {
await session.withTransaction(async () => {
deleteResult = await deleteDocumentByMeadowlarkIdTransaction(deleteRequest, mongoCollection, session);
deleteResult = await deleteDocumentByMeadowlarkIdTransaction(
deleteRequest,
mongoCollection,
concurrencyCollection,
session,
);
if (deleteResult.response !== 'DELETE_SUCCESS') {
await session.abortTransaction();
}
Expand All @@ -143,20 +159,16 @@ export async function deleteDocumentByDocumentUuid(
);
} catch (e) {
Logger.error(`${moduleName}.deleteDocumentByDocumentUuid`, deleteRequest.traceId, e);
await session.abortTransaction();

let response: DeleteResult = { response: 'UNKNOWN_FAILURE', failureMessage: e.message };

// If this is a MongoError, it has a codeName
if (e.codeName === 'WriteConflict') {
response = {
return {
response: 'DELETE_FAILURE_WRITE_CONFLICT',
failureMessage: 'Write conflict due to concurrent access to this or related resources',
};
}

await session.abortTransaction();

return response;
return { response: 'UNKNOWN_FAILURE', failureMessage: e.message };
} finally {
await session.endSession();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import { Logger, Config } from '@edfi/meadowlark-utilities';
import { Collection, ClientSession, MongoClient, WithId } from 'mongodb';
import retry from 'async-retry';
import { MeadowlarkDocument, meadowlarkDocumentFrom } from '../model/MeadowlarkDocument';
import { getDocumentCollection, limitFive, onlyReturnTimestamps, writeLockReferencedDocuments } from './Db';
import { getDocumentCollection, limitFive, onlyReturnTimestamps, lockDocuments, getConcurrencyCollection } from './Db';
import { deleteDocumentByMeadowlarkIdTransaction } from './Delete';
import { onlyDocumentsReferencing, validateReferences } from './ReferenceValidation';
import { upsertDocumentTransaction } from './Upsert';
import { ConcurrencyDocument } from '../model/ConcurrencyDocument';

const moduleName: string = 'mongodb.repository.Update';

Expand All @@ -26,6 +27,7 @@ const moduleName: string = 'mongodb.repository.Update';
async function insertUpdatedDocument(
{ meadowlarkId, resourceInfo, documentInfo, edfiDoc, traceId, security }: UpdateRequest,
mongoCollection: Collection<MeadowlarkDocument>,
concurrencyCollection: Collection<ConcurrencyDocument>,
session: ClientSession,
document: MeadowlarkDocument,
): Promise<UpdateResult> {
Expand All @@ -40,6 +42,7 @@ async function insertUpdatedDocument(
security,
},
mongoCollection,
concurrencyCollection,
session,
document,
);
Expand Down Expand Up @@ -143,10 +146,31 @@ async function updateAllowingIdentityChange(
document: MeadowlarkDocument,
updateRequest: UpdateRequest,
mongoCollection: Collection<MeadowlarkDocument>,
concurrencyCollection: Collection<ConcurrencyDocument>,
session: ClientSession,
): Promise<UpdateResult> {
const { documentUuid, resourceInfo, traceId, security } = updateRequest;

const referringDocumentUuids: WithId<MeadowlarkDocument>[] = await mongoCollection
.find(
{
aliasMeadowlarkIds: {
$in: document.outboundRefs,
},
},

{ projection: { documentUuid: 1 } },
)
.toArray();

const concurrencyDocuments: ConcurrencyDocument[] = referringDocumentUuids.map((referringDocumentUuid) => ({
_id: referringDocumentUuid.documentUuid,
}));

concurrencyDocuments.push({ _id: updateRequest.documentUuid });

await lockDocuments(concurrencyCollection, concurrencyDocuments, session);

// Optimize happy path by trying a replacement update, which will succeed if there is no identity change
const tryUpdateByReplacementResult: UpdateResult | null = await tryUpdateByReplacement(
document,
Expand All @@ -156,8 +180,6 @@ async function updateAllowingIdentityChange(
);

if (tryUpdateByReplacementResult != null) {
// Ensure referenced documents are not modified in other transactions
await writeLockReferencedDocuments(mongoCollection, document.outboundRefs, session);
return tryUpdateByReplacementResult;
}

Expand Down Expand Up @@ -189,14 +211,15 @@ async function updateAllowingIdentityChange(
const deleteResult = await deleteDocumentByMeadowlarkIdTransaction(
{ documentUuid, resourceInfo, security, validateNoReferencesToDocument: true, traceId },
mongoCollection,
concurrencyCollection,
session,
);
Logger.debug(`${moduleName}.updateAllowingIdentityChange: Updating document uuid ${documentUuid}`, traceId);

switch (deleteResult.response) {
case 'DELETE_SUCCESS':
// document was deleted, so we can insert the new version
return insertUpdatedDocument(updateRequest, mongoCollection, session, document);
return insertUpdatedDocument(updateRequest, mongoCollection, concurrencyCollection, session, document);
case 'DELETE_FAILURE_NOT_EXISTS':
// document was not found on delete, which shouldn't happen
return {
Expand Down Expand Up @@ -227,6 +250,7 @@ async function updateDisallowingIdentityChange(
document: MeadowlarkDocument,
updateRequest: UpdateRequest,
mongoCollection: Collection<MeadowlarkDocument>,
concurrencyCollection: Collection<ConcurrencyDocument>,
session: ClientSession,
): Promise<UpdateResult> {
// Perform the document update
Expand All @@ -235,8 +259,26 @@ async function updateDisallowingIdentityChange(
updateRequest.traceId,
);

// Ensure referenced documents are not modified in other transactions
await writeLockReferencedDocuments(mongoCollection, document.outboundRefs, session);
const referringDocumentUuids: WithId<MeadowlarkDocument>[] = await mongoCollection
.find(
{
aliasMeadowlarkIds: {
$in: document.outboundRefs,
},
},
{ projection: { documentUuid: 1 } },
)
.toArray();

const concurrencyDocuments: ConcurrencyDocument[] = referringDocumentUuids.map((referringDocumentUuid) => ({
_id: referringDocumentUuid.documentUuid,
}));

concurrencyDocuments.push({ _id: updateRequest.documentUuid });

// Inserting the same DocumentUuid in Concurrency Collection will result in a WriteConflict error
// By generating this conflict we handle concurrency
await lockDocuments(concurrencyCollection, concurrencyDocuments, session);

const tryUpdateByReplacementResult: UpdateResult | null = await tryUpdateByReplacement(
document,
Expand Down Expand Up @@ -319,6 +361,7 @@ async function checkForInvalidReferences(
async function updateDocumentByDocumentUuidTransaction(
updateRequest: UpdateRequest,
mongoCollection: Collection<MeadowlarkDocument>,
concurrencyCollection: Collection<ConcurrencyDocument>,
session: ClientSession,
): Promise<UpdateResult> {
const { meadowlarkId, documentUuid, resourceInfo, documentInfo, edfiDoc, validateDocumentReferencesExist, security } =
Expand All @@ -345,9 +388,9 @@ async function updateDocumentByDocumentUuidTransaction(
lastModifiedAt: documentInfo.requestTimestamp,
});
if (resourceInfo.allowIdentityUpdates) {
return updateAllowingIdentityChange(document, updateRequest, mongoCollection, session);
return updateAllowingIdentityChange(document, updateRequest, mongoCollection, concurrencyCollection, session);
}
return updateDisallowingIdentityChange(document, updateRequest, mongoCollection, session);
return updateDisallowingIdentityChange(document, updateRequest, mongoCollection, concurrencyCollection, session);
}

/**
Expand All @@ -362,6 +405,7 @@ export async function updateDocumentByDocumentUuid(
Logger.info(`${moduleName}.updateDocumentByDocumentUuid ${documentUuid}`, traceId);

const mongoCollection: Collection<MeadowlarkDocument> = getDocumentCollection(client);
const concurrencyCollection: Collection<ConcurrencyDocument> = getConcurrencyCollection(client);
const session: ClientSession = client.startSession();
let updateResult: UpdateResult = { response: 'UNKNOWN_FAILURE' };

Expand All @@ -371,7 +415,12 @@ export async function updateDocumentByDocumentUuid(
await retry(
async () => {
await session.withTransaction(async () => {
updateResult = await updateDocumentByDocumentUuidTransaction(updateRequest, mongoCollection, session);
updateResult = await updateDocumentByDocumentUuidTransaction(
updateRequest,
mongoCollection,
concurrencyCollection,
session,
);
if (updateResult.response !== 'UPDATE_SUCCESS') {
await session.abortTransaction();
}
Expand All @@ -391,7 +440,6 @@ export async function updateDocumentByDocumentUuid(
Logger.error(`${moduleName}.updateDocumentByDocumentUuid`, traceId, e);
await session.abortTransaction();

// If this is a MongoError, it has a codeName
if (e.codeName === 'WriteConflict') {
return {
response: 'UPDATE_FAILURE_WRITE_CONFLICT',
Expand Down
Loading
Loading