Skip to content

Commit

Permalink
WIP, included storage.storeFile as part of the transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
daneryl committed Feb 4, 2025
1 parent ea845fa commit 6d311c2
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 69 deletions.
38 changes: 38 additions & 0 deletions app/api/files/specs/storage.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,44 @@ describe('storage', () => {
});
});

describe('storeMultipleFiles', () => {
beforeEach(async () => {
testingTenants.changeCurrentTenant({ featureFlags: { s3Storage: false } });
});

afterEach(async () => {
await storage.removeFile('file1.txt', 'document');
await storage.removeFile('file2.txt', 'document');
await storage.removeFile('file3.txt', 'document');
jest.restoreAllMocks();
});

it('should rollback already uploaded files if an error occurs', async () => {
const files = [
{ filename: 'file1.txt', file: Readable.from(['content1']), type: 'document' },
{ filename: 'file2.txt', file: Readable.from(['content2']), type: 'document' },
{ filename: 'file3.txt', file: Readable.from(['content3']), type: 'document' },
];

const originalStoreFile = storage.storeFile.bind(storage);
jest.spyOn(storage, 'storeFile').mockImplementation(async (filename, file, type) => {
if (filename === 'file2.txt') {
throw new Error('Upload error');
}
return originalStoreFile(filename, file, type);
});

await expect(storage.storeMultipleFiles(files)).rejects.toThrow('Upload error');

const file1Exists = await storage.fileExists('file1.txt', 'document');
const file2Exists = await storage.fileExists('file2.txt', 'document');
const file3Exists = await storage.fileExists('file3.txt', 'document');

expect(file1Exists).toBe(false);
expect(file2Exists).toBe(false);
expect(file3Exists).toBe(false);
});
});
describe('createDirectory', () => {
afterEach(async () => {
try {
Expand Down
33 changes: 31 additions & 2 deletions app/api/files/storage.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import { NoSuchKey, S3Client } from '@aws-sdk/client-s3';
import { config } from 'api/config';
import { tenants } from 'api/tenants';
import { NodeHttpHandler } from '@smithy/node-http-handler';
import { inspect } from 'util';
// eslint-disable-next-line node/no-restricted-import
import { createReadStream, createWriteStream } from 'fs';
// eslint-disable-next-line node/no-restricted-import
import { access, readdir } from 'fs/promises';
import path from 'path';

import { config } from 'api/config';
import { legacyLogger } from 'api/log';
import { tenants } from 'api/tenants';
import { FileType } from 'shared/types/fileType';
import { Readable } from 'stream';
import { pipeline } from 'stream/promises';

import { FileNotFound } from './FileNotFound';
import {
activityLogPath,
Expand Down Expand Up @@ -173,4 +177,29 @@ export const storage = {

return paths[type](filename);
},

async storeMultipleFiles(files: { filename: string; file: Readable; type: FileTypes }[]) {
const uploadedFiles: { filename: string; type: FileTypes }[] = [];

try {
await files.reduce(async (promise, { filename, file, type }) => {
await promise;
await this.storeFile(filename, file, type);
uploadedFiles.push({ filename, type });
}, Promise.resolve());
} catch (error) {
await Promise.all(
uploadedFiles.map(async ({ filename, type }) => {
try {
await this.removeFile(filename, type);
} catch (rollbackError) {
legacyLogger.error(
inspect(new Error('Failed to rollback file', { cause: rollbackError }))
);
}
})
);
throw error;
}
},
};
19 changes: 18 additions & 1 deletion app/api/odm/sessionsContext.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { ClientSession } from 'mongoose';
import { Readable } from 'stream';

import { tenants } from 'api/tenants';
import { appContext } from 'api/utils/AppContext';
import { ClientSession } from 'mongoose';

import { DB } from './DB';

export const dbSessionContext = {
Expand All @@ -15,13 +18,21 @@ export const dbSessionContext = {
);
},

getFileOperations() {
return (
(appContext.get('fileOperations') as { filename: string; file: Readable; type: string }[]) ||
[]
);
},

clearSession() {
appContext.set('mongoSession', undefined);
},

clearContext() {
appContext.set('mongoSession', undefined);
appContext.set('reindexOperations', undefined);
appContext.set('fileOperations', undefined);
},

async startSession() {
Expand All @@ -37,4 +48,10 @@ export const dbSessionContext = {
reindexOperations.push(args);
appContext.set('reindexOperations', reindexOperations);
},

registerFileOperation(args: { filename: string; file: Readable; type: string }) {
const fileOperations = dbSessionContext.getFileOperations();
fileOperations.push(args);
appContext.set('fileOperations', fileOperations);
},
};
1 change: 1 addition & 0 deletions app/api/odm/specs/mongooseModelWrapper_sessions.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ describe('MultiTenantMongooseModel Session operations', () => {

afterAll(async () => {
await testingEnvironment.tearDown();
await session.endSession();
});

describe('create()', () => {
Expand Down
189 changes: 123 additions & 66 deletions app/api/utils/specs/withTransaction.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ClientSession } from 'mongodb';
import { Schema } from 'mongoose';
import { model, Schema } from 'mongoose';

Check failure on line 2 in app/api/utils/specs/withTransaction.spec.ts

View workflow job for this annotation

GitHub Actions / eslint

'model' is defined but never used

import entities from 'api/entities';
import { instanceModel } from 'api/odm/model';
Expand All @@ -11,6 +11,8 @@ import { elasticTesting } from '../elastic_testing';
import { getFixturesFactory } from '../fixturesFactory';
import { testingEnvironment } from '../testingEnvironment';
import { withTransaction } from '../withTransaction';
import { storage } from 'api/files';
import { Readable } from 'stream';

const factory = getFixturesFactory();

Expand Down Expand Up @@ -229,86 +231,141 @@ describe('withTransaction utility', () => {
});
});
});
});
describe('Entities elasticsearch index', () => {
beforeEach(async () => {
await testingEnvironment.setUp(
{
transactiontests: [],
templates: [factory.template('template1')],
entities: [
factory.entity('existing1', 'template1'),
factory.entity('existing2', 'template1'),
],
settings: [{ languages: [{ label: 'English', key: 'en', default: true }] }],
},
'with_transaction_index'
);
testingEnvironment.unsetFakeContext();
});

describe('Entities elasticsearch index', () => {
beforeEach(async () => {
await testingEnvironment.setUp(
{
transactiontests: [],
templates: [factory.template('template1')],
entities: [
factory.entity('existing1', 'template1'),
factory.entity('existing2', 'template1'),
],
settings: [{ languages: [{ label: 'English', key: 'en', default: true }] }],
},
'with_transaction_index'
);
testingEnvironment.unsetFakeContext();
});
it('should handle delayed reindexing after a successful transaction', async () => {
await appContext.run(async () => {
await withTransaction(async () => {
await entities.save(
{ ...factory.entity('test1', 'template1'), _id: undefined, sharedId: undefined },
{ user: {}, language: 'es' },
{ updateRelationships: false }
);
await entities.save(
{ ...factory.entity('test2', 'template1'), _id: undefined, sharedId: undefined },
{ user: {}, language: 'es' },
{ updateRelationships: false }
);
});

it('should handle delayed reindexing after a successful transaction', async () => {
await appContext.run(async () => {
await withTransaction(async () => {
await entities.save(
{ ...factory.entity('test1', 'template1'), _id: undefined, sharedId: undefined },
{ user: {}, language: 'es' },
{ updateRelationships: false }
);
await entities.save(
{ ...factory.entity('test2', 'template1'), _id: undefined, sharedId: undefined },
{ user: {}, language: 'es' },
{ updateRelationships: false }
await elasticTesting.refresh();
const indexedEntities = await elasticTesting.getIndexedEntities();
expect(indexedEntities).toHaveLength(4);
expect(indexedEntities).toEqual(
expect.arrayContaining([
expect.objectContaining({ title: 'test1' }),
expect.objectContaining({ title: 'test2' }),
expect.objectContaining({ title: 'existing1' }),
expect.objectContaining({ title: 'existing2' }),
])
);
});

await elasticTesting.refresh();
const indexedEntities = await elasticTesting.getIndexedEntities();
expect(indexedEntities).toHaveLength(4);
expect(indexedEntities).toEqual(
expect.arrayContaining([
expect.objectContaining({ title: 'test1' }),
expect.objectContaining({ title: 'test2' }),
expect.objectContaining({ title: 'existing1' }),
expect.objectContaining({ title: 'existing2' }),
])
);
});
});

it('should not index changes to elasticsearch if transaction is aborted manually', async () => {
await appContext.run(async () => {
await withTransaction(async ({ abort }) => {
await saveEntity({ ...factory.entity('existing1', 'template1'), title: 'update1' });
await saveEntity({ ...factory.entity('existing2', 'template1'), title: 'update2' });
await createEntity(factory.entity('new', 'template1'));
await abort();
it('should not index changes to elasticsearch if transaction is aborted manually', async () => {
await appContext.run(async () => {
await withTransaction(async ({ abort }) => {
await saveEntity({ ...factory.entity('existing1', 'template1'), title: 'update1' });
await saveEntity({ ...factory.entity('existing2', 'template1'), title: 'update2' });
await createEntity(factory.entity('new', 'template1'));
await abort();
});

const indexedEntities = await elasticTesting.getIndexedEntities();
expect(indexedEntities).toMatchObject([{ title: 'existing1' }, { title: 'existing2' }]);
});
});

it('should not index changes to elasticsearch if transaction is aborted by an error', async () => {
await appContext.run(async () => {
let error;
try {
await withTransaction(async () => {
await saveEntity({ ...factory.entity('existing1', 'template1'), title: 'update1' });
await saveEntity({ ...factory.entity('existing2', 'template1'), title: 'update2' });
await createEntity(factory.entity('new', 'template1'));
throw new Error('Testing error');
});
} catch (e) {
error = e;
}

expect(error.message).toBe('Testing error');

const indexedEntities = await elasticTesting.getIndexedEntities();
expect(indexedEntities).toMatchObject([{ title: 'existing1' }, { title: 'existing2' }]);
const indexedEntities = await elasticTesting.getIndexedEntities();
expect(indexedEntities).toMatchObject([{ title: 'existing1' }, { title: 'existing2' }]);
});
});
});

it('should not index changes to elasticsearch if transaction is aborted by an error', async () => {
await appContext.run(async () => {
let error;
try {
describe('storeFile', () => {
it('should store file after transaction is committed', async () => {
await appContext.run(async () => {
await withTransaction(async () => {
await saveEntity({ ...factory.entity('existing1', 'template1'), title: 'update1' });
await saveEntity({ ...factory.entity('existing2', 'template1'), title: 'update2' });
await createEntity(factory.entity('new', 'template1'));
throw new Error('Testing error');
await model.save({ title: 'test-file', value: 1 });
await storage.storeFile('file_to_commit.txt', Readable.from(['content']), 'document');
});
} catch (e) {
error = e;
}

expect(error.message).toBe('Testing error');
const docs = await model.get({ title: 'test-file' });
expect(docs[0]).toBeTruthy();
expect(docs[0].value).toBe(1);

expect(await storage.fileExists('file_to_commit.txt', 'document')).toBe(true);
});
});

it('should rollback transaction when storeFile operation fails', async () => {
await appContext.run(async () => {
let errorThrown;
jest.spyOn(storage, 'storeMultipleFiles').mockImplementation(async () => {
throw new Error('Intentional storeFile error');
});

try {
await withTransaction(async () => {
await model.save({ title: 'test-file-fail', value: 1 });
await storage.storeFile('file_to_fail.txt', Readable.from(['content']), 'document');
});
} catch (error) {
errorThrown = error;
}

expect(errorThrown.message).toBe('Intentional storeFile error');

const docs = await model.get({ title: 'test-file-fail' });
expect(docs).toHaveLength(0);
});
});

const indexedEntities = await elasticTesting.getIndexedEntities();
expect(indexedEntities).toMatchObject([{ title: 'existing1' }, { title: 'existing2' }]);
it('should rollback transaction when manually aborted after storeFile operation', async () => {
await appContext.run(async () => {
jest.spyOn(storage, 'storeMultipleFiles').mockImplementation(async () => {
throw new Error('Intentional storeFile error');
});
await withTransaction(async ({ abort }) => {
await model.save({ title: 'test-file-abort', value: 1 });
await storage.storeFile('file_to_abort.txt', Readable.from(['content']), 'document');
await abort();
});

const docs = await model.get({ title: 'test-file-abort' });
expect(docs).toHaveLength(0);
});
});
});
});
Loading

0 comments on commit 6d311c2

Please sign in to comment.