diff --git a/app/api/permissions/permissionsContext.ts b/app/api/permissions/permissionsContext.ts index 4bc8571198..491dfa271c 100644 --- a/app/api/permissions/permissionsContext.ts +++ b/app/api/permissions/permissionsContext.ts @@ -23,4 +23,8 @@ export const permissionsContext = { setCommandContext: () => { appContext.set('user', permissionsContext.commandUser); }, + + setCommandContextAsDefault: () => { + appContext.setValueAsDefault('user', permissionsContext.commandUser); + }, }; diff --git a/app/api/utils/AppContext.ts b/app/api/utils/AppContext.ts index bf14475915..238da00b29 100644 --- a/app/api/utils/AppContext.ts +++ b/app/api/utils/AppContext.ts @@ -7,6 +7,8 @@ interface ContextData { class AppContext { private storage = new AsyncLocalStorage(); + private defaultData: { [k: string]: any } = {}; + private getContextObject() { const data = this.storage.getStore(); if (!data) { @@ -17,7 +19,7 @@ class AppContext { async run(cb: () => Promise, data: ContextData = {}): Promise { return new Promise((resolve, reject) => { - this.storage.run(data, () => { + this.storage.run({ ...this.defaultData, ...data }, () => { cb().then(resolve).catch(reject); }); }); @@ -30,6 +32,10 @@ class AppContext { set(key: string, value: unknown) { this.getContextObject()[key] = value; } + + setValueAsDefault(key: string, value: unknown) { + this.defaultData[key] = value; + } } const appContext = new AppContext(); diff --git a/app/api/utils/specs/appContext.spec.ts b/app/api/utils/specs/appContext.spec.ts index 9c88db75ce..c770acda16 100644 --- a/app/api/utils/specs/appContext.spec.ts +++ b/app/api/utils/specs/appContext.spec.ts @@ -48,6 +48,25 @@ describe('appContext', () => { }); }); + describe('setValueAsDefault', () => { + it('should set a value that will be used as default when run is executed', async () => { + appContext.setValueAsDefault('defaultKey', 'defaultValue'); + await appContext.run( + async () => { + expect(appContext.get('defaultKey')).toBe('defaultValue'); + expect(appContext.get('tenant')).toBe('test_tenant'); + }, + { tenant: 'test_tenant' } + ); + await appContext.run( + async () => { + expect(appContext.get('defaultKey')).toBe('another value'); + }, + { defaultKey: 'another value' } + ); + }); + }); + describe('when outside a context', () => { const error = new Error('Accessing nonexistent async context'); diff --git a/app/worker.ts b/app/worker.ts index 5bdbbe2880..26890af529 100644 --- a/app/worker.ts +++ b/app/worker.ts @@ -41,89 +41,82 @@ process.on('uncaughtException', uncaughtError); DB.connect(config.DBHOST, dbAuth) .then(async () => { await tenants.setupTenants(); + permissionsContext.setCommandContextAsDefault(); setupWorkerSockets(); - await tenants.run(async () => { - permissionsContext.setCommandContext(); - - systemLogger.info('[Worker] - ==> 📡 starting external services...'); - - const services: Record = { - ocr_manager: ocrManager, - at_service: new ATServiceListener(), - information_extractor: new InformationExtraction(), - convert_pdf: new ConvertToPdfWorker(), - preserve_integration: new DistributedLoop( - 'preserve_integration', - async () => preserveSync.syncAllTenants(), - { - port: config.redis.port, - host: config.redis.host, - delayTimeBetweenTasks: 30000, - } - ), - toc_service: new DistributedLoop( - 'toc_service', - async () => tocService.processAllTenants(), - { - port: config.redis.port, - host: config.redis.host, - delayTimeBetweenTasks: 30000, - } - ), - sync_job: new DistributedLoop('sync_job', async () => syncWorker.runAllTenants(), { + systemLogger.info('[Worker] - ==> 📡 starting external services...'); + + const services: Record = { + ocr_manager: ocrManager, + at_service: new ATServiceListener(), + information_extractor: new InformationExtraction(), + convert_pdf: new ConvertToPdfWorker(), + preserve_integration: new DistributedLoop( + 'preserve_integration', + async () => preserveSync.syncAllTenants(), + { port: config.redis.port, host: config.redis.host, - delayTimeBetweenTasks: 1000, - }), - - pdf_segmentation: new PDFSegmentation(), - twitter_integration: new TwitterIntegration(), - }; - - services.segmentation_distributed_loop = new DistributedLoop( - 'segmentation_repeat', - services.pdf_segmentation.segmentPdfs, - { port: config.redis.port, host: config.redis.host, delayTimeBetweenTasks: 5000 } + delayTimeBetweenTasks: 30000, + } + ), + toc_service: new DistributedLoop('toc_service', async () => tocService.processAllTenants(), { + port: config.redis.port, + host: config.redis.host, + delayTimeBetweenTasks: 30000, + }), + sync_job: new DistributedLoop('sync_job', async () => syncWorker.runAllTenants(), { + port: config.redis.port, + host: config.redis.host, + delayTimeBetweenTasks: 1000, + }), + + pdf_segmentation: new PDFSegmentation(), + twitter_integration: new TwitterIntegration(), + }; + + services.segmentation_distributed_loop = new DistributedLoop( + 'segmentation_repeat', + services.pdf_segmentation.segmentPdfs, + { port: config.redis.port, host: config.redis.host, delayTimeBetweenTasks: 5000 } + ); + + services.twitter_distributed_loop = new DistributedLoop( + 'twitter_repeat', + services.twitter_integration.addTweetsRequestsToQueue, + { port: config.redis.port, host: config.redis.host, delayTimeBetweenTasks: 120000 } + ); + + Object.values(services).forEach(service => service.start()); + + process.on('SIGINT', async () => { + systemLogger.info( + '[Worker Graceful shutdown] - Received SIGINT, waiting for graceful stop...' ); - services.twitter_distributed_loop = new DistributedLoop( - 'twitter_repeat', - services.twitter_integration.addTweetsRequestsToQueue, - { port: config.redis.port, host: config.redis.host, delayTimeBetweenTasks: 120000 } + const stoppedServices: string[] = []; + + const stopPromises = Promise.all( + Object.entries(services).map(async ([name, service]) => { + await service.stop(); + stoppedServices.push(name); + }) ); + const firstToFinish = await Promise.race([stopPromises, sleep(10_000)]); - Object.values(services).forEach(service => service.start()); + if (Array.isArray(firstToFinish)) { + systemLogger.info('[Worker Graceful shutdown] - Services stopped successfully!'); + } else { + const notStoppedServices = Object.keys(services) + .filter(service => !stoppedServices.includes(service)) + .join(', '); - process.on('SIGINT', async () => { systemLogger.info( - '[Worker Graceful shutdown] - Received SIGINT, waiting for graceful stop...' + `[Worker Graceful shutdown] - These services [${notStoppedServices}] did not stop in time, initiating forceful shutdown...` ); + } - const stoppedServices: string[] = []; - - const stopPromises = Promise.all( - Object.entries(services).map(async ([name, service]) => { - await service.stop(); - stoppedServices.push(name); - }) - ); - const firstToFinish = await Promise.race([stopPromises, sleep(10_000)]); - - if (Array.isArray(firstToFinish)) { - systemLogger.info('[Worker Graceful shutdown] - Services stopped successfully!'); - } else { - const notStoppedServices = Object.keys(services) - .filter(service => !stoppedServices.includes(service)) - .join(', '); - - systemLogger.info( - `[Worker Graceful shutdown] - These services [${notStoppedServices}] did not stop in time, initiating forceful shutdown...` - ); - } - - process.exit(0); - }); + process.exit(0); }); }) .catch(error => { diff --git a/package.json b/package.json index c7e1bb966d..9e3e8e00ea 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "uwazi", - "version": "1.196.0-rc3", + "version": "1.197.0-rc2", "description": "Uwazi is a free, open-source solution for organising, analysing and publishing your documents.", "keywords": [ "react"