Commit 6fafa98

Feat(grouper): manual event grouping (#382)
* Imp(javascript): babel source map parsing (#373)
* deps(): add babel dependencies
* imp(javascript): improve source-map parsing
* chore(javascript): add logs
* deps(hawk/types): update to new version
* feat(grouper): event grouping by pattern
* test(grouper): cover pattern grouping with tests
* fix(grouper): remove cache key on event saving
* fix(): tests mock fixed
* chore(grouper): remove redundant logs
* chore(grouper): move patterns to project object
* imp(grouper): cache getOriginalEvent db query
* chore(grouper): lint fix
* fix(tests): fix getPatterns method
* fix(grouper): fix findMatchingPattern method
* imp(grouper): new testcases added
* imp(grouper): move pattern grouping to findSimilarEvent method
* imp(grouper): cache getProjectPatterns response
* chore(): lint fixes
* deps(): update @hawk.so/types package version
* fix(tests): fix limiter tests
* fix(archiver): build fix
* feat(migrations): create index on event.payload.title
* remove duplicated code
1 parent 7b3958b commit 6fafa98

File tree

9 files changed (+343 -63 lines changed)

Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
+/**
+ * This migration creates indexes for all collections on payload.title field
+ */
+
+/**
+ * Index name for payload.title field
+ */
+const payloadTitleIndexName = 'payloadTitle';
+
+module.exports = {
+  async up(db) {
+    const collections = await db.listCollections({}, {
+      authorizedCollections: true,
+      nameOnly: true,
+    }).toArray();
+
+    const targetCollections = [];
+
+    collections.forEach((collection) => {
+      if (/events/.test(collection.name)) {
+        targetCollections.push(collection.name);
+      }
+    });
+
+    for (const collectionName of targetCollections) {
+      const hasIndexAlready = await db.collection(collectionName).indexExists(payloadTitleIndexName);
+
+      if (!hasIndexAlready) {
+        await db.collection(collectionName).createIndex({
+          'payload.title': 1,
+        }, {
+          name: payloadTitleIndexName,
+        });
+      }
+    }
+  },
+  async down(db) {
+    const collections = await db.listCollections({}, {
+      authorizedCollections: true,
+      nameOnly: true,
+    }).toArray();
+
+    const targetCollections = [];
+
+    collections.forEach((collection) => {
+      if (/events/.test(collection.name)) {
+        targetCollections.push(collection.name);
+      }
+    });
+
+    for (const collectionName of targetCollections) {
+      await db.collection(collectionName).dropIndex(payloadTitleIndexName);
+    }
+  },
+};
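Because the migration only calls createIndex when indexExists reports the index missing, it is safe to re-run. A minimal sketch for checking the result after the migration has been applied, assuming the `mongodb` Node driver and the same MONGO_EVENTS_DATABASE_URI connection string the workers use (the script itself is hypothetical and not part of this commit):

import { MongoClient } from 'mongodb';

/**
 * Hypothetical verification script: lists indexes of every `events:*` collection
 * and reports whether the `payloadTitle` index created by the migration is present.
 */
async function checkPayloadTitleIndexes(): Promise<void> {
  const client = new MongoClient(process.env.MONGO_EVENTS_DATABASE_URI as string);

  await client.connect();

  const db = client.db();
  const collections = await db.listCollections({}, { nameOnly: true }).toArray();

  for (const { name } of collections) {
    // Same filter the migration uses to select target collections
    if (!/events/.test(name)) {
      continue;
    }

    const indexes = await db.collection(name).indexes();
    const hasIndex = indexes.some((index) => index.name === 'payloadTitle');

    console.log(`${name}: payloadTitle index ${hasIndex ? 'present' : 'missing'}`);
  }

  await client.close();
}

checkPayloadTitleIndexes().catch(console.error);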

package.json

Lines changed: 1 addition & 1 deletion
@@ -47,7 +47,7 @@
   },
   "dependencies": {
     "@hawk.so/nodejs": "^3.1.1",
-    "@hawk.so/types": "^0.1.26",
+    "@hawk.so/types": "^0.1.28",
     "@types/amqplib": "^0.8.2",
     "@types/jest": "^29.2.3",
     "@types/mongodb": "^3.5.15",

workers/archiver/tests/index.test.ts

Lines changed: 5 additions & 4 deletions
@@ -18,6 +18,7 @@ process.env.MAX_DAYS_NUMBER = '30';

 const mockedProject: ProjectDBScheme = {
   notifications: [],
+  eventGroupingPatterns: [],
   token: '5342',
   integrationId: 'eyJpbnRlZ3JhdGlvbklkIjoiMzg3NGNkOWMtZjJiYS00ZDVkLTk5ZmQtM2UzZjYzMDcxYmJhIiwic2VjcmV0IjoiMGZhM2JkM2EtYmMyZC00YWRiLThlMWMtNjg2OGY0MzM1YjRiIn0=',
   uidAdded: new ObjectId('5e4ff518628a6c714515f4db'),
@@ -53,7 +54,7 @@ describe('Archiver worker', () => {

   beforeEach(async () => {
     await db.collection('releases').deleteMany({});
-  })
+  });

   test('Should correctly remove old events', async () => {
     /**
@@ -129,7 +130,7 @@ describe('Archiver worker', () => {
     /**
      * Insert one release with object id based on current time, it should not be removed
      */
-    await db.collection('releases').insert(releasesToStay)
+    await db.collection('releases').insert(releasesToStay);

     const worker = new ArchiverWorker();

@@ -173,9 +174,9 @@ describe('Archiver worker', () => {
     expect(newReleasesCollection).toEqual([
       mockedReleases[mockedReleasesLength - 2],
       mockedReleases[mockedReleasesLength - 1],
-    ])
+    ]);
     await worker.finish();
-  })
+  });

   afterAll(async () => {
     await db.dropCollection('releases');
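The new eventGroupingPatterns field in the mocked project mirrors what the grouper now reads from the project document in the accounts database. A hypothetical project document with patterns filled in might look like the sketch below; the field values other than eventGroupingPatterns are placeholders for illustration and are not taken from the repository:

import { ObjectId } from 'mongodb';

// Hypothetical shape of a document in the accounts DB 'projects' collection.
// Only eventGroupingPatterns is used by the grouper's pattern fallback.
const projectWithPatterns = {
  _id: new ObjectId('5e4ff518628a6c714515f4db'),
  token: '5342',
  name: 'Demo project',
  notifications: [],
  eventGroupingPatterns: [
    '^Failed to fetch .*',       // groups all fetch failures regardless of URL
    'Database connection lost',  // groups reconnect errors with differing suffixes
  ],
};

console.log(projectWithPatterns.eventGroupingPatterns);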

workers/grouper/src/index.ts

Lines changed: 105 additions & 18 deletions
@@ -32,9 +32,14 @@ export default class GrouperWorker extends Worker {
   public readonly type: string = pkg.workerType;

   /**
-   * Database Controller
+   * Events database Controller
    */
-  private db: DatabaseController = new DatabaseController(process.env.MONGO_EVENTS_DATABASE_URI);
+  private eventsDb: DatabaseController = new DatabaseController(process.env.MONGO_EVENTS_DATABASE_URI);
+
+  /**
+   * Accounts database Controller
+   */
+  private accountsDb: DatabaseController = new DatabaseController(process.env.MONGO_ACCOUNTS_DATABASE_URI);

   /**
    * This class will filter sensitive information
@@ -52,7 +57,8 @@ export default class GrouperWorker extends Worker {
   public async start(): Promise<void> {
     console.log('starting grouper worker');

-    await this.db.connect();
+    await this.eventsDb.connect();
+    await this.accountsDb.connect();
     this.prepareCache();
     console.log('redis initializing');

@@ -67,7 +73,8 @@ export default class GrouperWorker extends Worker {
   public async finish(): Promise<void> {
     await super.finish();
     this.prepareCache();
-    await this.db.close();
+    await this.eventsDb.close();
+    await this.accountsDb.close();
     await this.redis.close();
   }

@@ -85,12 +92,13 @@ export default class GrouperWorker extends Worker {
     let existedEvent = await this.getEvent(task.projectId, uniqueEventHash);

     /**
-     * If we couldn't group by group hash (title), try grouping by Levenshtein distance with last N events
+     * If we couldn't group by group hash (title), try grouping by Levenshtein distance or patterns
      */
     if (!existedEvent) {
       const similarEvent = await this.findSimilarEvent(task.projectId, task.event);

       if (similarEvent) {
+        this.logger.info(`similar event: ${JSON.stringify(similarEvent)}`);
         /**
          * Override group hash with found event's group hash
          */
@@ -226,7 +234,7 @@ export default class GrouperWorker extends Worker {
   }

   /**
-   * Tries to find events with a small Levenshtein distance of a title
+   * Tries to find events with a small Levenshtein distance of a title or by matching grouping patterns
    *
    * @param projectId - where to find
    * @param event - event to compare
@@ -237,23 +245,101 @@

     const lastUniqueEvents = await this.findLastEvents(projectId, eventsCountToCompare);

-    return lastUniqueEvents.filter(prevEvent => {
+    /**
+     * First try to find by Levenshtein distance
+     */
+    const similarByLevenshtein = lastUniqueEvents.filter(prevEvent => {
       const distance = levenshtein(event.title, prevEvent.payload.title);
       const threshold = event.title.length * diffTreshold;

       return distance < threshold;
     }).pop();
+
+    if (similarByLevenshtein) {
+      return similarByLevenshtein;
+    }
+
+    /**
+     * If no match by Levenshtein, try matching by patterns
+     */
+    const patterns = await this.getProjectPatterns(projectId);
+
+    if (patterns && patterns.length > 0) {
+      const matchingPattern = await this.findMatchingPattern(patterns, event);
+
+      if (matchingPattern !== null) {
+        const originalEvent = await this.cache.get(`${projectId}:${matchingPattern}:originalEvent`, async () => {
+          return await this.eventsDb.getConnection()
+            .collection(`events:${projectId}`)
+            .findOne(
+              { 'payload.title': { $regex: matchingPattern } },
+              { sort: { _id: 1 } }
+            );
+        });
+
+        this.logger.info(`original event for pattern: ${JSON.stringify(originalEvent)}`);
+
+        if (originalEvent) {
+          return originalEvent;
+        }
+      }
+    }
+
+    return undefined;
+  }
+
+  /**
+   * Method that returns the matched pattern for an event; if the event does not match any of the patterns, returns null
+   *
+   * @param patterns - list of the patterns of the related project
+   * @param event - event whose title will be checked
+   * @returns {string | null} matched pattern or null if no match
+   */
+  private async findMatchingPattern(patterns: string[], event: EventDataAccepted<EventAddons>): Promise<string | null> {
+    if (!patterns || patterns.length === 0) {
+      return null;
+    }
+
+    return patterns.filter(pattern => {
+      const patternRegExp = new RegExp(pattern);
+
+      return event.title.match(patternRegExp);
+    }).pop() || null;
+  }
+
+  /**
+   * Method that gets event grouping patterns for a project
+   *
+   * @param projectId - id of the project to find related event patterns
+   * @returns {string[]} list of the project's event grouping patterns
+   */
+  private async getProjectPatterns(projectId: string): Promise<string[]> {
+    return this.cache.get(`project:${projectId}:patterns`, async () => {
+      const project = await this.accountsDb.getConnection()
+        .collection('projects')
+        .findOne({
+          _id: new mongodb.ObjectId(projectId),
+        });

+      return project?.eventGroupingPatterns || [];
+    },
+    /**
+     * Cache project patterns for 5 minutes since they don't change frequently
+     */
+    /* eslint-disable-next-line @typescript-eslint/no-magic-numbers */
+    5 * TimeMs.MINUTE / MS_IN_SEC);
   }

   /**
    * Returns last N unique events by a project id
    *
    * @param projectId - where to find
    * @param count - how many events to return
+   * @returns {GroupedEventDBScheme[]} list of the last N unique events
    */
   private findLastEvents(projectId: string, count): Promise<GroupedEventDBScheme[]> {
     return this.cache.get(`last:${count}:eventsOf:${projectId}`, async () => {
-      return this.db.getConnection()
+      return this.eventsDb.getConnection()
         .collection(`events:${projectId}`)
         .find()
         .sort({
@@ -308,7 +394,7 @@ export default class GrouperWorker extends Worker {
      */
     const repetitionCacheKey = `repetitions:${task.projectId}:${existedEvent.groupHash}:${eventUser.id}`;
     const repetition = await this.cache.get(repetitionCacheKey, async () => {
-      return this.db.getConnection().collection(`repetitions:${task.projectId}`)
+      return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`)
         .findOne({
           groupHash: existedEvent.groupHash,
           'payload.user.id': eventUser.id,
@@ -342,7 +428,7 @@ export default class GrouperWorker extends Worker {
      */
     const repetitionDailyCacheKey = `repetitions:${task.projectId}:${existedEvent.groupHash}:${eventUser.id}:${eventMidnight}`;
     const repetitionDaily = await this.cache.get(repetitionDailyCacheKey, async () => {
-      return this.db.getConnection().collection(`repetitions:${task.projectId}`)
+      return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`)
         .findOne({
           groupHash: existedEvent.groupHash,
           'payload.user.id': eventUser.id,
@@ -377,7 +463,7 @@ export default class GrouperWorker extends Worker {
    * Returns finds event by query from project with passed ID
    *
    * @param projectId - project's identifier
-   * @param groupHash - group hash of the event
+   * @param groupHash - group hash of the event
    */
   private async getEvent(projectId: string, groupHash: string): Promise<GroupedEventDBScheme> {
     if (!mongodb.ObjectID.isValid(projectId)) {
@@ -387,7 +473,7 @@ export default class GrouperWorker extends Worker {
     const eventCacheKey = await this.getEventCacheKey(projectId, groupHash);

     return this.cache.get(eventCacheKey, async () => {
-      return this.db.getConnection()
+      return this.eventsDb.getConnection()
         .collection(`events:${projectId}`)
         .findOne({
           groupHash,
@@ -400,12 +486,13 @@ export default class GrouperWorker extends Worker {

   /**
    * Method that returns event cache key based on projectId and groupHash
+   *
    * @param projectId - used for cache key creation
    * @param groupHash - used for cache key creation
-   * @returns cache key
+   * @returns {string} cache key for event
    */
   private async getEventCacheKey(projectId: string, groupHash: string): Promise<string> {
-    return `${projectId}:${JSON.stringify({groupHash: groupHash})}`
+    return `${projectId}:${JSON.stringify({ groupHash: groupHash })}`;
   }

   /**
@@ -421,7 +508,7 @@ export default class GrouperWorker extends Worker {
       throw new ValidationError('Controller.saveEvent: Project ID is invalid or missed');
     }

-    const collection = this.db.getConnection().collection(`events:${projectId}`);
+    const collection = this.eventsDb.getConnection().collection(`events:${projectId}`);

     encodeUnsafeFields(groupedEventData);

@@ -441,7 +528,7 @@ export default class GrouperWorker extends Worker {
     }

     try {
-      const collection = this.db.getConnection().collection(`repetitions:${projectId}`);
+      const collection = this.eventsDb.getConnection().collection(`repetitions:${projectId}`);

       encodeUnsafeFields(repetition);

@@ -480,7 +567,7 @@ export default class GrouperWorker extends Worker {
        },
      };

-      return (await this.db.getConnection()
+      return (await this.eventsDb.getConnection()
        .collection(`events:${projectId}`)
        .updateOne(query, updateQuery)).modifiedCount;
    } catch (err) {
@@ -512,7 +599,7 @@ export default class GrouperWorker extends Worker {
     try {
       const midnight = this.getMidnightByEventTimestamp(eventTimestamp);

-      await this.db.getConnection()
+      await this.eventsDb.getConnection()
         .collection(`dailyEvents:${projectId}`)
         .updateOne(
           {
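The core of the new grouping path is easiest to see in isolation: a project stores eventGroupingPatterns as regex strings, and an event whose title matches one of them is grouped with the earliest stored event matching the same pattern. A minimal standalone sketch of the matching step, assuming the same filter-and-pop behavior as findMatchingPattern above (the function name and sample patterns here are illustrative, not the worker's API):

/**
 * Given a project's grouping patterns (regex strings) and an incoming event title,
 * return the last pattern that matches the title, or null if none match.
 */
function findMatchingPattern(patterns: string[], title: string): string | null {
  if (!patterns || patterns.length === 0) {
    return null;
  }

  return patterns.filter((pattern) => new RegExp(pattern).test(title)).pop() || null;
}

/**
 * Example: both fetch-failure titles below fall into the same group because they
 * match the same pattern, even though their Levenshtein distance is large.
 */
const patterns = ['^Failed to fetch .*', '^Timeout after \\d+ms'];

console.log(findMatchingPattern(patterns, 'Failed to fetch https://api.example.com/users')); // '^Failed to fetch .*'
console.log(findMatchingPattern(patterns, 'Timeout after 3000ms')); // '^Timeout after \d+ms'
console.log(findMatchingPattern(patterns, 'Unrelated error')); // null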
