Skip to content

Commit

Permalink
Merge branch 'development' into dependabot/npm_and_yarn/eslint-465552…
Browse files Browse the repository at this point in the history
…aa7c
  • Loading branch information
mfacar authored Jan 21, 2025
2 parents e90be2d + 44b8da5 commit 8a8e79e
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 73 deletions.
4 changes: 4 additions & 0 deletions app/api/permissions/permissionsContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,8 @@ export const permissionsContext = {
setCommandContext: () => {
appContext.set('user', permissionsContext.commandUser);
},

setCommandContextAsDefault: () => {
appContext.setValueAsDefault('user', permissionsContext.commandUser);
},
};
8 changes: 7 additions & 1 deletion app/api/utils/AppContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ interface ContextData {
class AppContext {
private storage = new AsyncLocalStorage<ContextData>();

private defaultData: { [k: string]: any } = {};

private getContextObject() {
const data = this.storage.getStore();
if (!data) {
Expand All @@ -17,7 +19,7 @@ class AppContext {

async run(cb: () => Promise<void>, data: ContextData = {}): Promise<void> {
return new Promise((resolve, reject) => {
this.storage.run(data, () => {
this.storage.run({ ...this.defaultData, ...data }, () => {
cb().then(resolve).catch(reject);
});
});
Expand All @@ -30,6 +32,10 @@ class AppContext {
set(key: string, value: unknown) {
this.getContextObject()[key] = value;
}

setValueAsDefault(key: string, value: unknown) {
this.defaultData[key] = value;
}
}

const appContext = new AppContext();
Expand Down
19 changes: 19 additions & 0 deletions app/api/utils/specs/appContext.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,25 @@ describe('appContext', () => {
});
});

describe('setValueAsDefault', () => {
it('should set a value that will be used as default when run is executed', async () => {
appContext.setValueAsDefault('defaultKey', 'defaultValue');
await appContext.run(
async () => {
expect(appContext.get('defaultKey')).toBe('defaultValue');
expect(appContext.get('tenant')).toBe('test_tenant');
},
{ tenant: 'test_tenant' }
);
await appContext.run(
async () => {
expect(appContext.get('defaultKey')).toBe('another value');
},
{ defaultKey: 'another value' }
);
});
});

describe('when outside a context', () => {
const error = new Error('Accessing nonexistent async context');

Expand Down
135 changes: 64 additions & 71 deletions app/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,89 +41,82 @@ process.on('uncaughtException', uncaughtError);
DB.connect(config.DBHOST, dbAuth)
.then(async () => {
await tenants.setupTenants();
permissionsContext.setCommandContextAsDefault();
setupWorkerSockets();

await tenants.run(async () => {
permissionsContext.setCommandContext();

systemLogger.info('[Worker] - ==> 📡 starting external services...');

const services: Record<string, any> = {
ocr_manager: ocrManager,
at_service: new ATServiceListener(),
information_extractor: new InformationExtraction(),
convert_pdf: new ConvertToPdfWorker(),
preserve_integration: new DistributedLoop(
'preserve_integration',
async () => preserveSync.syncAllTenants(),
{
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 30000,
}
),
toc_service: new DistributedLoop(
'toc_service',
async () => tocService.processAllTenants(),
{
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 30000,
}
),
sync_job: new DistributedLoop('sync_job', async () => syncWorker.runAllTenants(), {
systemLogger.info('[Worker] - ==> 📡 starting external services...');

const services: Record<string, any> = {
ocr_manager: ocrManager,
at_service: new ATServiceListener(),
information_extractor: new InformationExtraction(),
convert_pdf: new ConvertToPdfWorker(),
preserve_integration: new DistributedLoop(
'preserve_integration',
async () => preserveSync.syncAllTenants(),
{
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 1000,
}),

pdf_segmentation: new PDFSegmentation(),
twitter_integration: new TwitterIntegration(),
};

services.segmentation_distributed_loop = new DistributedLoop(
'segmentation_repeat',
services.pdf_segmentation.segmentPdfs,
{ port: config.redis.port, host: config.redis.host, delayTimeBetweenTasks: 5000 }
delayTimeBetweenTasks: 30000,
}
),
toc_service: new DistributedLoop('toc_service', async () => tocService.processAllTenants(), {
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 30000,
}),
sync_job: new DistributedLoop('sync_job', async () => syncWorker.runAllTenants(), {
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 1000,
}),

pdf_segmentation: new PDFSegmentation(),
twitter_integration: new TwitterIntegration(),
};

services.segmentation_distributed_loop = new DistributedLoop(
'segmentation_repeat',
services.pdf_segmentation.segmentPdfs,
{ port: config.redis.port, host: config.redis.host, delayTimeBetweenTasks: 5000 }
);

services.twitter_distributed_loop = new DistributedLoop(
'twitter_repeat',
services.twitter_integration.addTweetsRequestsToQueue,
{ port: config.redis.port, host: config.redis.host, delayTimeBetweenTasks: 120000 }
);

Object.values(services).forEach(service => service.start());

process.on('SIGINT', async () => {
systemLogger.info(
'[Worker Graceful shutdown] - Received SIGINT, waiting for graceful stop...'
);

services.twitter_distributed_loop = new DistributedLoop(
'twitter_repeat',
services.twitter_integration.addTweetsRequestsToQueue,
{ port: config.redis.port, host: config.redis.host, delayTimeBetweenTasks: 120000 }
const stoppedServices: string[] = [];

const stopPromises = Promise.all(
Object.entries(services).map(async ([name, service]) => {
await service.stop();
stoppedServices.push(name);
})
);
const firstToFinish = await Promise.race([stopPromises, sleep(10_000)]);

Object.values(services).forEach(service => service.start());
if (Array.isArray(firstToFinish)) {
systemLogger.info('[Worker Graceful shutdown] - Services stopped successfully!');
} else {
const notStoppedServices = Object.keys(services)
.filter(service => !stoppedServices.includes(service))
.join(', ');

process.on('SIGINT', async () => {
systemLogger.info(
'[Worker Graceful shutdown] - Received SIGINT, waiting for graceful stop...'
`[Worker Graceful shutdown] - These services [${notStoppedServices}] did not stop in time, initiating forceful shutdown...`
);
}

const stoppedServices: string[] = [];

const stopPromises = Promise.all(
Object.entries(services).map(async ([name, service]) => {
await service.stop();
stoppedServices.push(name);
})
);
const firstToFinish = await Promise.race([stopPromises, sleep(10_000)]);

if (Array.isArray(firstToFinish)) {
systemLogger.info('[Worker Graceful shutdown] - Services stopped successfully!');
} else {
const notStoppedServices = Object.keys(services)
.filter(service => !stoppedServices.includes(service))
.join(', ');

systemLogger.info(
`[Worker Graceful shutdown] - These services [${notStoppedServices}] did not stop in time, initiating forceful shutdown...`
);
}

process.exit(0);
});
process.exit(0);
});
})
.catch(error => {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "uwazi",
"version": "1.196.0-rc3",
"version": "1.197.0-rc2",
"description": "Uwazi is a free, open-source solution for organising, analysing and publishing your documents.",
"keywords": [
"react"
Expand Down

0 comments on commit 8a8e79e

Please sign in to comment.