Skip to content

Commit

Permalink
include elasticsearch entities index into the transaction process
Browse files Browse the repository at this point in the history
  • Loading branch information
daneryl committed Feb 3, 2025
1 parent 442feb4 commit a6f4bd8
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 11 deletions.
6 changes: 3 additions & 3 deletions app/api/entities/routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { search } from 'api/search';
import { withTransaction } from 'api/utils/withTransaction';
import needsAuthorization from '../auth/authMiddleware';
import templates from '../templates/templates';
import thesauri from '../thesauri/thesauri';
import { thesauri } from '../thesauri/thesauri';
import { parseQuery, validation } from '../utils';
import date from '../utils/date';
import entities from './entities';
Expand Down Expand Up @@ -81,7 +81,7 @@ export default app => {
activitylogMiddleware,
async (req, res, next) => {
try {
await withTransaction(async () => {
await withTransaction(async ({ abort }) => {
const entityToSave = req.body.entity ? JSON.parse(req.body.entity) : req.body;
const result = await saveEntity(entityToSave, {
user: req.user,
Expand All @@ -92,7 +92,7 @@ export default app => {
const { entity, errors } = result;
await updateThesauriWithEntity(entity, req);
if (errors.length) {
console.log(errors);
await abort();
}
res.json(req.body.entity ? result : entity);
});
Expand Down
18 changes: 18 additions & 0 deletions app/api/odm/sessionsContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,33 @@ export const dbSessionContext = {
return appContext.get('mongoSession') as ClientSession | undefined;
},

getReindexOperations() {
return (
(appContext.get('reindexOperations') as [query?: any, select?: string, limit?: number][]) ||
[]
);
},

clearSession() {
appContext.set('mongoSession', undefined);
},

clearContext() {
appContext.set('mongoSession', undefined);
appContext.set('reindexOperations', undefined);
},

async startSession() {
const currentTenant = tenants.current();
const connection = DB.connectionForDB(currentTenant.dbName);
const session = await connection.startSession();
appContext.set('mongoSession', session);
return session;
},

registerESIndexOperation(args: [query?: any, select?: string, limit?: number]) {
const reindexOperations = dbSessionContext.getReindexOperations();
reindexOperations.push(args);
appContext.set('reindexOperations', reindexOperations);
},
};
115 changes: 107 additions & 8 deletions app/api/utils/specs/withTransaction.spec.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,38 @@
import { instanceModel } from 'api/odm/model';
import { dbSessionContext } from 'api/odm/sessionsContext';
import { testingEnvironment } from 'api/utils/testingEnvironment';
import { withTransaction } from 'api/utils/withTransaction';
import { ClientSession } from 'mongodb';
import { Schema } from 'mongoose';

import entities from 'api/entities';
import { instanceModel } from 'api/odm/model';
import { dbSessionContext } from 'api/odm/sessionsContext';

import { appContext } from '../AppContext';
import { elasticTesting } from '../elastic_testing';
import { getFixturesFactory } from '../fixturesFactory';
import { testingEnvironment } from '../testingEnvironment';
import { withTransaction } from '../withTransaction';
import { EntitySchema } from 'shared/types/entityType';

const factory = getFixturesFactory();

interface TestDoc {
title: string;
value?: number;
}

afterAll(async () => {
await testingEnvironment.tearDown();
});

const saveEntity = async (entity: EntitySchema) =>
entities.save(entity, { user: {}, language: 'es' }, { updateRelationships: false });

const createEntity = async (entity: EntitySchema) =>
entities.save(
{ ...entity, _id: undefined, sharedId: undefined },
{ user: {}, language: 'es' },
{ updateRelationships: false }
);

describe('withTransaction utility', () => {
let model: any;

Expand All @@ -27,10 +49,6 @@ describe('withTransaction utility', () => {
testingEnvironment.unsetFakeContext();
});

afterAll(async () => {
await testingEnvironment.tearDown();
});

it('should commit transaction when operation succeeds', async () => {
await appContext.run(async () => {
await withTransaction(async () => {
Expand Down Expand Up @@ -212,3 +230,84 @@ describe('withTransaction utility', () => {
});
});
});

describe('Entities elasticsearch index', () => { beforeEach(async () => {

Check failure on line 234 in app/api/utils/specs/withTransaction.spec.ts

View workflow job for this annotation

GitHub Actions / eslint

Insert `⏎·`
await testingEnvironment.setUp(
{
transactiontests: [],
templates: [factory.template('template1')],
entities: [
factory.entity('existing1', 'template1'),
factory.entity('existing2', 'template1'),
],
settings: [{ languages: [{ label: 'English', key: 'en', default: true }] }],
},
'with_transaction_index'
);
testingEnvironment.unsetFakeContext();
});

it('should handle delayed reindexing after a successful transaction', async () => {
await appContext.run(async () => {
await withTransaction(async () => {
await entities.save(
{ ...factory.entity('test1', 'template1'), _id: undefined, sharedId: undefined },
{ user: {}, language: 'es' },
{ updateRelationships: false }
);
await entities.save(
{ ...factory.entity('test2', 'template1'), _id: undefined, sharedId: undefined },
{ user: {}, language: 'es' },
{ updateRelationships: false }
);
});

await elasticTesting.refresh();
const indexedEntities = await elasticTesting.getIndexedEntities();
expect(indexedEntities).toHaveLength(4);
expect(indexedEntities).toEqual(
expect.arrayContaining([
expect.objectContaining({ title: 'test1' }),
expect.objectContaining({ title: 'test2' }),
expect.objectContaining({ title: 'existing1' }),
expect.objectContaining({ title: 'existing2' }),
])
);
});
});

it('should not index changes to elasticsearch if transaction is aborted manually', async () => {
await appContext.run(async () => {
await withTransaction(async ({ abort }) => {
await saveEntity({ ...factory.entity('existing1', 'template1'), title: 'update1' });
await saveEntity({ ...factory.entity('existing2', 'template1'), title: 'update2' });
await createEntity(factory.entity('new', 'template1'));
await abort();
});

const indexedEntities = await elasticTesting.getIndexedEntities();
expect(indexedEntities).toMatchObject([{ title: 'existing1' }, { title: 'existing2' }]);
});
});

it('should not index changes to elasticsearch if transaction is aborted by an error', async () => {
await appContext.run(async () => {
let error;
try {
await withTransaction(async () => {
await saveEntity({ ...factory.entity('existing1', 'template1'), title: 'update1' });
await saveEntity({ ...factory.entity('existing2', 'template1'), title: 'update2' });
await createEntity(factory.entity('new', 'template1'));
throw new Error('Testing error');
});
} catch (e) {
error = e;
}

expect(error.message).toBe('Testing error');

const indexedEntities = await elasticTesting.getIndexedEntities();
expect(indexedEntities).toMatchObject([{ title: 'existing1' }, { title: 'existing2' }]);
});
});
});
1 change: 1 addition & 0 deletions app/api/utils/testing_db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ const testingDB: {

async tearDown() {
await this.disconnect();
connected = false;
},

async disconnect() {
Expand Down
19 changes: 19 additions & 0 deletions app/api/utils/withTransaction.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,26 @@
import { dbSessionContext } from 'api/odm/sessionsContext';
import { search } from 'api/search';

interface TransactionOperation {
abort: () => Promise<void>;
}

const originalIndexEntities = search.indexEntities.bind(search);
search.indexEntities = async (query, select, limit) => {
if (dbSessionContext.getSession()) {
return dbSessionContext.registerESIndexOperation([query, select, limit]);
}
return originalIndexEntities(query, select, limit);
};

const performDelayedReindexes = async () => {
await Promise.all(
dbSessionContext
.getReindexOperations()
.map(async reindexArgs => originalIndexEntities(...reindexArgs))
);
};

const withTransaction = async <T>(
operation: (context: TransactionOperation) => Promise<T>
): Promise<T> => {
Expand All @@ -24,6 +41,8 @@ const withTransaction = async <T>(
const result = await operation(context);
if (!wasManuallyAborted) {
await session.commitTransaction();
dbSessionContext.clearSession();
await performDelayedReindexes();
}
return result;
} catch (e) {
Expand Down

0 comments on commit a6f4bd8

Please sign in to comment.