diff --git a/app/api/entities/routes.js b/app/api/entities/routes.js index 0a8f35b14b..fb005cb472 100644 --- a/app/api/entities/routes.js +++ b/app/api/entities/routes.js @@ -5,7 +5,7 @@ import { search } from 'api/search'; import { withTransaction } from 'api/utils/withTransaction'; import needsAuthorization from '../auth/authMiddleware'; import templates from '../templates/templates'; -import thesauri from '../thesauri/thesauri'; +import { thesauri } from '../thesauri/thesauri'; import { parseQuery, validation } from '../utils'; import date from '../utils/date'; import entities from './entities'; @@ -81,7 +81,7 @@ export default app => { activitylogMiddleware, async (req, res, next) => { try { - await withTransaction(async () => { + await withTransaction(async ({ abort }) => { const entityToSave = req.body.entity ? JSON.parse(req.body.entity) : req.body; const result = await saveEntity(entityToSave, { user: req.user, @@ -92,7 +92,7 @@ export default app => { const { entity, errors } = result; await updateThesauriWithEntity(entity, req); if (errors.length) { - console.log(errors); + await abort(); } res.json(req.body.entity ? result : entity); }); diff --git a/app/api/odm/sessionsContext.ts b/app/api/odm/sessionsContext.ts index fa74e4b845..f527558e39 100644 --- a/app/api/odm/sessionsContext.ts +++ b/app/api/odm/sessionsContext.ts @@ -8,10 +8,19 @@ export const dbSessionContext = { return appContext.get('mongoSession') as ClientSession | undefined; }, + getReindexOperations() { + return (appContext.get('reindexOperations') as []) || []; + }, + clearSession() { appContext.set('mongoSession', undefined); }, + clearContext() { + appContext.set('mongoSession', undefined); + appContext.set('reindexOperations', undefined); + }, + async startSession() { const currentTenant = tenants.current(); const connection = DB.connectionForDB(currentTenant.dbName); @@ -19,4 +28,10 @@ export const dbSessionContext = { appContext.set('mongoSession', session); return session; }, + + registerESIndexOperation(args) { + const reindexOperations = dbSessionContext.getReindexOperations(); + reindexOperations.push(args); + appContext.set('reindexOperations', reindexOperations); + }, }; diff --git a/app/api/utils/specs/withTransaction.spec.ts b/app/api/utils/specs/withTransaction.spec.ts index 2b5faaa57e..139ec7b774 100644 --- a/app/api/utils/specs/withTransaction.spec.ts +++ b/app/api/utils/specs/withTransaction.spec.ts @@ -1,16 +1,27 @@ -import { instanceModel } from 'api/odm/model'; -import { dbSessionContext } from 'api/odm/sessionsContext'; -import { testingEnvironment } from 'api/utils/testingEnvironment'; -import { withTransaction } from 'api/utils/withTransaction'; import { ClientSession } from 'mongodb'; import { Schema } from 'mongoose'; + +import entities from 'api/entities'; +import { instanceModel } from 'api/odm/model'; +import { dbSessionContext } from 'api/odm/sessionsContext'; + import { appContext } from '../AppContext'; +import { elasticTesting } from '../elastic_testing'; +import { getFixturesFactory } from '../fixturesFactory'; +import { testingEnvironment } from '../testingEnvironment'; +import { withTransaction } from '../withTransaction'; + +const factory = getFixturesFactory(); interface TestDoc { title: string; value?: number; } +afterAll(async () => { + await testingEnvironment.tearDown(); +}); + describe('withTransaction utility', () => { let model: any; @@ -27,10 +38,6 @@ describe('withTransaction utility', () => { testingEnvironment.unsetFakeContext(); }); - afterAll(async () => { - await testingEnvironment.tearDown(); - }); - it('should commit transaction when operation succeeds', async () => { await appContext.run(async () => { await withTransaction(async () => { @@ -212,3 +219,199 @@ describe('withTransaction utility', () => { }); }); }); + +describe('withTransaction utility with ElasticSearch indexing', () => { + beforeEach(async () => { + await testingEnvironment.setUp( + { + transactiontests: [], + templates: [factory.template('template1')], + entities: [ + factory.entity('existing-entity1', 'template1'), + factory.entity('existing-entity2', 'template1'), + ], + settings: [{ languages: [{ label: 'English', key: 'en', default: true }] }], + }, + 'with_transaction_index' + ); + testingEnvironment.unsetFakeContext(); + }); + + it('should handle delayed reindexing after a successful transaction', async () => { + await appContext.run(async () => { + await withTransaction(async () => { + await entities.save( + { ...factory.entity('test1', 'template1'), _id: undefined, sharedId: undefined }, + { user: {}, language: 'es' }, + { updateRelationships: false } + ); + await entities.save( + { ...factory.entity('test2', 'template1'), _id: undefined, sharedId: undefined }, + { user: {}, language: 'es' }, + { updateRelationships: false } + ); + }); + + await elasticTesting.refresh(); + const indexedEntities = await elasticTesting.getIndexedEntities(); + expect(indexedEntities).toHaveLength(4); + expect(indexedEntities).toEqual( + expect.arrayContaining([ + expect.objectContaining({ title: 'test1' }), + expect.objectContaining({ title: 'test2' }), + expect.objectContaining({ title: 'existing-entity1' }), + expect.objectContaining({ title: 'existing-entity2' }), + ]) + ); + }); + }); + + describe('entities update', () => { + // this test should test what happens if the errors occurs after reindex happened + it('should store indexing operations during a transaction and not execute them on abort', async () => { + await appContext.run(async () => { + let error; + try { + await withTransaction(async ({ abort }) => { + await entities.save( + { + ...factory.entity('existing-entity1', 'template1'), + title: 'update-existing-1', + }, + { user: {}, language: 'es' }, + { updateRelationships: false } + ); + await entities.save( + { + ...factory.entity('existing-entity2', 'template1'), + title: 'update-existing-2', + }, + { user: {}, language: 'es' }, + { updateRelationships: false } + ); + await abort(); + }); + } catch (e) { + error = e; + } + + expect(error).toBeUndefined(); + + const indexedEntities = await elasticTesting.getIndexedEntities(); + expect(indexedEntities).toMatchObject([ + { title: 'existing-entity1' }, + { title: 'existing-entity2' }, + ]); + }); + }); + + // this test should test what happens if the errors occurs after reindex happened + it('should make sure the version of the indexed entities is the correct one if the operation fails', async () => { + await appContext.run(async () => { + let error; + try { + await withTransaction(async () => { + await entities.save( + { + ...factory.entity('existing-entity1', 'template1'), + title: 'update-existing-1', + }, + { user: {}, language: 'es' }, + { updateRelationships: false } + ); + await entities.save( + { + ...factory.entity('existing-entity2', 'template1'), + title: 'update-existing-2', + }, + { user: {}, language: 'es' }, + { updateRelationships: false } + ); + throw new Error('Testing error'); + }); + } catch (e) { + error = e; + } + + expect(error.message).toBe('Testing error'); + + const indexedEntities = await elasticTesting.getIndexedEntities(); + expect(indexedEntities).toMatchObject([ + { title: 'existing-entity1' }, + { title: 'existing-entity2' }, + ]); + }); + }); + }); + + describe('entities create', () => { + xit('should store indexing operations during a transaction and not execute them on abort', async () => { + await appContext.run(async () => { + let error; + try { + await withTransaction(async ({ abort }) => { + await entities.save( + { + ...factory.entity('abort-test1', 'template1'), + _id: undefined, + sharedId: undefined, + }, + { user: {}, language: 'es' }, + { updateRelationships: false } + ); + await entities.save( + { + ...factory.entity('abort-test2', 'template1'), + _id: undefined, + sharedId: undefined, + }, + { user: {}, language: 'es' }, + { updateRelationships: false } + ); + await abort(); + }); + } catch (e) { + error = e; + } + + expect(error).toBeUndefined(); + + const indexedEntities = await elasticTesting.getIndexedEntities(); + expect(indexedEntities).toHaveLength(0); + }); + }); + + xit('should reindex if necessary when the transaction fails', async () => { + await appContext.run(async () => { + let error; + try { + await withTransaction(async () => { + await entities.save( + { + ...factory.entity('reindex-test1', 'template1'), + _id: undefined, + sharedId: undefined, + }, + { user: {}, language: 'es' }, + { updateRelationships: false } + ); + throw new Error('Intentional error'); + }); + } catch (e) { + error = e; + } + + expect(error?.message).toBe('Intentional error'); + + const indexedEntities = await elasticTesting.getIndexedEntities(); + expect(indexedEntities).toHaveLength(0); + + // Reindex after the transaction failure + await elasticTesting.reindex(); + + const reindexedEntities = await elasticTesting.getIndexedEntities(); + expect(reindexedEntities).toHaveLength(0); // No documents should be indexed yet + }); + }); + }); +}); diff --git a/app/api/utils/testing_db.ts b/app/api/utils/testing_db.ts index 6281865912..f6a27484b0 100644 --- a/app/api/utils/testing_db.ts +++ b/app/api/utils/testing_db.ts @@ -126,6 +126,7 @@ const testingDB: { async tearDown() { await this.disconnect(); + connected = false; }, async disconnect() { diff --git a/app/api/utils/withTransaction.ts b/app/api/utils/withTransaction.ts index d4235876f6..351bb67b87 100644 --- a/app/api/utils/withTransaction.ts +++ b/app/api/utils/withTransaction.ts @@ -1,9 +1,27 @@ import { dbSessionContext } from 'api/odm/sessionsContext'; +import { search } from 'api/search'; +import { inspect } from 'util'; interface TransactionOperation { abort: () => Promise; } +const originalIndexEntities = search.indexEntities.bind(search); +search.indexEntities = async (...args) => { + if (dbSessionContext.getSession()) { + return dbSessionContext.registerESIndexOperation(args); + } + return originalIndexEntities(...args); +}; + +const performDelayedReindexes = async () => { + await Promise.all( + dbSessionContext.getReindexOperations().map(reindexArgs => { + return originalIndexEntities(...reindexArgs); + }) + ); +}; + const withTransaction = async ( operation: (context: TransactionOperation) => Promise ): Promise => { @@ -15,6 +33,7 @@ const withTransaction = async ( abort: async () => { if (session.inTransaction()) { await session.abortTransaction(); + // await performDelayedReindexes(); } wasManuallyAborted = true; }, @@ -24,12 +43,15 @@ const withTransaction = async ( const result = await operation(context); if (!wasManuallyAborted) { await session.commitTransaction(); + dbSessionContext.clearSession(); + await performDelayedReindexes(); } return result; } catch (e) { if (!wasManuallyAborted) { await session.abortTransaction(); } + // await performDelayedReindexes(); throw e; } finally { dbSessionContext.clearSession();