Skip to content

Commit

Permalink
WIP, include elasticsearch entities index into the transaction process
Browse files Browse the repository at this point in the history
  • Loading branch information
daneryl committed Jan 31, 2025
1 parent 442feb4 commit f91c11d
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 11 deletions.
6 changes: 3 additions & 3 deletions app/api/entities/routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { search } from 'api/search';
import { withTransaction } from 'api/utils/withTransaction';
import needsAuthorization from '../auth/authMiddleware';
import templates from '../templates/templates';
import thesauri from '../thesauri/thesauri';
import { thesauri } from '../thesauri/thesauri';
import { parseQuery, validation } from '../utils';
import date from '../utils/date';
import entities from './entities';
Expand Down Expand Up @@ -81,7 +81,7 @@ export default app => {
activitylogMiddleware,
async (req, res, next) => {
try {
await withTransaction(async () => {
await withTransaction(async ({ abort }) => {
const entityToSave = req.body.entity ? JSON.parse(req.body.entity) : req.body;
const result = await saveEntity(entityToSave, {
user: req.user,
Expand All @@ -92,7 +92,7 @@ export default app => {
const { entity, errors } = result;
await updateThesauriWithEntity(entity, req);
if (errors.length) {
console.log(errors);
await abort();
}
res.json(req.body.entity ? result : entity);
});
Expand Down
15 changes: 15 additions & 0 deletions app/api/odm/sessionsContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,30 @@ export const dbSessionContext = {
return appContext.get('mongoSession') as ClientSession | undefined;
},

getReindexOperations() {
return (appContext.get('reindexOperations') as []) || [];
},

clearSession() {
appContext.set('mongoSession', undefined);
},

clearContext() {
appContext.set('mongoSession', undefined);
appContext.set('reindexOperations', undefined);
},

async startSession() {
const currentTenant = tenants.current();
const connection = DB.connectionForDB(currentTenant.dbName);
const session = await connection.startSession();
appContext.set('mongoSession', session);
return session;
},

registerESIndexOperation(args) {
const reindexOperations = dbSessionContext.getReindexOperations();
reindexOperations.push(args);
appContext.set('reindexOperations', reindexOperations);
},
};
148 changes: 140 additions & 8 deletions app/api/utils/specs/withTransaction.spec.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
import { instanceModel } from 'api/odm/model';
import { dbSessionContext } from 'api/odm/sessionsContext';
import { testingEnvironment } from 'api/utils/testingEnvironment';
import { withTransaction } from 'api/utils/withTransaction';
import { ClientSession } from 'mongodb';
import { Schema } from 'mongoose';

import entities from 'api/entities';
import { instanceModel } from 'api/odm/model';
import { dbSessionContext } from 'api/odm/sessionsContext';

import { appContext } from '../AppContext';
import { elasticTesting } from '../elastic_testing';
import { getFixturesFactory } from '../fixturesFactory';
import { testingEnvironment } from '../testingEnvironment';
import { withTransaction } from '../withTransaction';

const factory = getFixturesFactory();

interface TestDoc {
title: string;
value?: number;
}

afterAll(async () => {
await testingEnvironment.tearDown();
});

describe('withTransaction utility', () => {
let model: any;

Expand All @@ -27,10 +38,6 @@ describe('withTransaction utility', () => {
testingEnvironment.unsetFakeContext();
});

afterAll(async () => {
await testingEnvironment.tearDown();
});

it('should commit transaction when operation succeeds', async () => {
await appContext.run(async () => {
await withTransaction(async () => {
Expand Down Expand Up @@ -212,3 +219,128 @@ describe('withTransaction utility', () => {
});
});
});

describe('withTransaction utility with ElasticSearch indexing', () => {
beforeEach(async () => {
await testingEnvironment.setUp(
{
transactiontests: [],
templates: [factory.template('template1')],
entities: [
factory.entity('existing-entity1', 'template1'),
factory.entity('existing-entity2', 'template1'),
],
settings: [{ languages: [{ label: 'English', key: 'en', default: true }] }],
},
'with_transaction_index'
);
testingEnvironment.unsetFakeContext();
});

it('should handle delayed reindexing after a successful transaction', async () => {
await appContext.run(async () => {
await withTransaction(async () => {
await entities.save(
{ ...factory.entity('test1', 'template1'), _id: undefined, sharedId: undefined },
{ user: {}, language: 'es' },
{ updateRelationships: false }
);
await entities.save(
{ ...factory.entity('test2', 'template1'), _id: undefined, sharedId: undefined },
{ user: {}, language: 'es' },
{ updateRelationships: false }
);
});

await elasticTesting.refresh();
const indexedEntities = await elasticTesting.getIndexedEntities();
expect(indexedEntities).toHaveLength(4);
expect(indexedEntities).toEqual(
expect.arrayContaining([
expect.objectContaining({ title: 'test1' }),
expect.objectContaining({ title: 'test2' }),
expect.objectContaining({ title: 'existing-entity1' }),
expect.objectContaining({ title: 'existing-entity2' }),
])
);
});
});

describe('entities update', () => {
// this test should test what happens if the errors occurs after reindex happened
it('should store indexing operations during a transaction and not execute them on abort', async () => {
await appContext.run(async () => {
let error;
try {
await withTransaction(async ({ abort }) => {
await entities.save(
{
...factory.entity('existing-entity1', 'template1'),
title: 'update-existing-1',
},
{ user: {}, language: 'es' },
{ updateRelationships: false }
);
await entities.save(
{
...factory.entity('existing-entity2', 'template1'),
title: 'update-existing-2',
},
{ user: {}, language: 'es' },
{ updateRelationships: false }
);
await abort();
});
} catch (e) {
error = e;
}

expect(error).toBeUndefined();

const indexedEntities = await elasticTesting.getIndexedEntities();
expect(indexedEntities).toMatchObject([
{ title: 'existing-entity1' },
{ title: 'existing-entity2' },
]);
});
});

// this test should test what happens if the errors occurs after reindex happened
it('should make sure the version of the indexed entities is the correct one if the operation fails', async () => {
await appContext.run(async () => {
let error;
try {
await withTransaction(async () => {
await entities.save(
{
...factory.entity('existing-entity1', 'template1'),
title: 'update-existing-1',
},
{ user: {}, language: 'es' },
{ updateRelationships: false }
);
await entities.save(
{
...factory.entity('existing-entity2', 'template1'),
title: 'update-existing-2',
},
{ user: {}, language: 'es' },
{ updateRelationships: false }
);
throw new Error('Testing error');
});
} catch (e) {
error = e;
}

expect(error.message).toBe('Testing error');

const indexedEntities = await elasticTesting.getIndexedEntities();
expect(indexedEntities).toMatchObject([
{ title: 'existing-entity1' },
{ title: 'existing-entity2' },
]);
});
});
});
});
1 change: 1 addition & 0 deletions app/api/utils/testing_db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ const testingDB: {

async tearDown() {
await this.disconnect();
connected = false;
},

async disconnect() {
Expand Down
22 changes: 22 additions & 0 deletions app/api/utils/withTransaction.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,27 @@
import { dbSessionContext } from 'api/odm/sessionsContext';
import { search } from 'api/search';
import { inspect } from 'util';

Check failure on line 3 in app/api/utils/withTransaction.ts

View workflow job for this annotation

GitHub Actions / eslint

'inspect' is defined but never used

interface TransactionOperation {
abort: () => Promise<void>;
}

const originalIndexEntities = search.indexEntities.bind(search);
search.indexEntities = async (...args) => {
if (dbSessionContext.getSession()) {
return dbSessionContext.registerESIndexOperation(args);
}
return originalIndexEntities(...args);
};

const performDelayedReindexes = async () => {
await Promise.all(
dbSessionContext.getReindexOperations().map(reindexArgs => {

Check failure on line 19 in app/api/utils/withTransaction.ts

View workflow job for this annotation

GitHub Actions / eslint

Functions that return promises must be async
return originalIndexEntities(...reindexArgs);
})
);
};

const withTransaction = async <T>(
operation: (context: TransactionOperation) => Promise<T>
): Promise<T> => {
Expand All @@ -15,6 +33,7 @@ const withTransaction = async <T>(
abort: async () => {
if (session.inTransaction()) {
await session.abortTransaction();
// await performDelayedReindexes();
}
wasManuallyAborted = true;
},
Expand All @@ -24,12 +43,15 @@ const withTransaction = async <T>(
const result = await operation(context);
if (!wasManuallyAborted) {
await session.commitTransaction();
dbSessionContext.clearSession();
await performDelayedReindexes();
}
return result;
} catch (e) {
if (!wasManuallyAborted) {
await session.abortTransaction();
}
// await performDelayedReindexes();
throw e;
} finally {
dbSessionContext.clearSession();
Expand Down

0 comments on commit f91c11d

Please sign in to comment.