@@ -32,9 +32,14 @@ export default class GrouperWorker extends Worker {
32
32
public readonly type : string = pkg . workerType ;
33
33
34
34
/**
35
- * Database Controller
35
+ * Events database Controller
36
36
*/
37
- private db : DatabaseController = new DatabaseController ( process . env . MONGO_EVENTS_DATABASE_URI ) ;
37
+ private eventsDb : DatabaseController = new DatabaseController ( process . env . MONGO_EVENTS_DATABASE_URI ) ;
38
+
39
+ /**
40
+ * Accounts database Controller
41
+ */
42
+ private accountsDb : DatabaseController = new DatabaseController ( process . env . MONGO_ACCOUNTS_DATABASE_URI ) ;
38
43
39
44
/**
40
45
* This class will filter sensitive information
@@ -52,7 +57,8 @@ export default class GrouperWorker extends Worker {
52
57
public async start ( ) : Promise < void > {
53
58
console . log ( 'starting grouper worker' ) ;
54
59
55
- await this . db . connect ( ) ;
60
+ await this . eventsDb . connect ( ) ;
61
+ await this . accountsDb . connect ( ) ;
56
62
this . prepareCache ( ) ;
57
63
console . log ( 'redis initializing' ) ;
58
64
@@ -67,7 +73,8 @@ export default class GrouperWorker extends Worker {
67
73
public async finish(): Promise<void> {
  await super.finish();
  this.prepareCache();

  /**
   * Close every connection even if closing one of them fails:
   * each next close() sits in a finally block, so an error thrown while closing
   * the events database no longer leaves the accounts database and redis connections open.
   * On the success path the order is the same as before: events db, accounts db, redis.
   */
  try {
    await this.eventsDb.close();
  } finally {
    try {
      await this.accountsDb.close();
    } finally {
      await this.redis.close();
    }
  }
}
73
80
@@ -85,12 +92,13 @@ export default class GrouperWorker extends Worker {
85
92
let existedEvent = await this . getEvent ( task . projectId , uniqueEventHash ) ;
86
93
87
94
/**
88
- * If we couldn't group by group hash (title), try grouping by Levenshtein distance with last N events
95
+ * If we couldn't group by group hash (title), try grouping by Levenshtein distance or patterns
89
96
*/
90
97
if ( ! existedEvent ) {
91
98
const similarEvent = await this . findSimilarEvent ( task . projectId , task . event ) ;
92
99
93
100
if ( similarEvent ) {
101
+ this . logger . info ( `similar event: ${ JSON . stringify ( similarEvent ) } ` ) ;
94
102
/**
95
103
* Override group hash with found event's group hash
96
104
*/
@@ -226,7 +234,7 @@ export default class GrouperWorker extends Worker {
226
234
}
227
235
228
236
/**
229
- * Tries to find events with a small Levenshtein distance of a title
237
+ * Tries to find events with a small Levenshtein distance of a title or by matching grouping patterns
230
238
*
231
239
* @param projectId - where to find
232
240
* @param event - event to compare
@@ -237,23 +245,101 @@ export default class GrouperWorker extends Worker {
237
245
238
246
const lastUniqueEvents = await this . findLastEvents ( projectId , eventsCountToCompare ) ;
239
247
240
- return lastUniqueEvents . filter ( prevEvent => {
248
+ /**
249
+ * First try to find by Levenshtein distance
250
+ */
251
+ const similarByLevenshtein = lastUniqueEvents . filter ( prevEvent => {
241
252
const distance = levenshtein ( event . title , prevEvent . payload . title ) ;
242
253
const threshold = event . title . length * diffTreshold ;
243
254
244
255
return distance < threshold ;
245
256
} ) . pop ( ) ;
257
+
258
+ if ( similarByLevenshtein ) {
259
+ return similarByLevenshtein ;
260
+ }
261
+
262
+ /**
263
+ * If no match by Levenshtein, try matching by patterns
264
+ */
265
+ const patterns = await this . getProjectPatterns ( projectId ) ;
266
+
267
+ if ( patterns && patterns . length > 0 ) {
268
+ const matchingPattern = await this . findMatchingPattern ( patterns , event ) ;
269
+
270
+ if ( matchingPattern !== null ) {
271
+ const originalEvent = await this . cache . get ( `${ projectId } :${ matchingPattern } :originalEvent` , async ( ) => {
272
+ return await this . eventsDb . getConnection ( )
273
+ . collection ( `events:${ projectId } ` )
274
+ . findOne (
275
+ { 'payload.title' : { $regex : matchingPattern } } ,
276
+ { sort : { _id : 1 } }
277
+ ) ;
278
+ } ) ;
279
+
280
+ this . logger . info ( `original event for pattern: ${ JSON . stringify ( originalEvent ) } ` ) ;
281
+
282
+ if ( originalEvent ) {
283
+ return originalEvent ;
284
+ }
285
+ }
286
+ }
287
+
288
+ return undefined ;
289
+ }
290
+
291
/**
 * Returns the grouping pattern that the event title matches, or null if no pattern matches
 *
 * Each pattern is compiled as a regular expression and tested against the event title.
 * When several patterns match, the one that comes last in the list is returned.
 * Empty patterns are ignored (an empty regular expression matches any title),
 * and a pattern that is not a valid regular expression is logged and skipped,
 * so a single broken pattern does not fail processing of the event.
 *
 * @param patterns - list of the patterns of the related project
 * @param event - event whose title is checked
 * @returns matched pattern or null if there is no match
 */
private async findMatchingPattern(patterns: string[], event: EventDataAccepted<EventAddons>): Promise<string | null> {
  if (!patterns || patterns.length === 0) {
    return null;
  }

  /**
   * Walk a reversed copy so the last matching pattern wins and the search stops on the first hit
   */
  for (const pattern of [ ...patterns ].reverse()) {
    if (!pattern) {
      continue;
    }

    try {
      if (new RegExp(pattern).test(event.title)) {
        return pattern;
      }
    } catch (error) {
      /**
       * RegExp constructor throws SyntaxError for a malformed pattern
       */
      const reason = error instanceof Error ? error.message : String(error);

      this.logger.info(`skipping invalid grouping pattern "${pattern}": ${reason}`);
    }
  }

  return null;
}
309
+
310
/**
 * Loads the event grouping patterns configured for a project
 *
 * The lookup goes through the worker cache; on a cache miss the project document
 * is read from the `projects` collection of the accounts database.
 *
 * @param projectId - id of the project whose grouping patterns are requested
 * @returns list of the project's grouping patterns, empty if the project is not found or has no patterns
 */
private async getProjectPatterns(projectId: string): Promise<string[]> {
  const cacheKey = `project:${projectId}:patterns`;

  /**
   * Project patterns don't change frequently, so they are cached for 5 minutes
   */
  /* eslint-disable-next-line @typescript-eslint/no-magic-numbers */
  const cacheTtl = 5 * TimeMs.MINUTE / MS_IN_SEC;

  const loadPatterns = async () => {
    const projectsCollection = this.accountsDb.getConnection().collection('projects');
    const project = await projectsCollection.findOne({
      _id: new mongodb.ObjectId(projectId),
    });

    return project?.eventGroupingPatterns || [];
  };

  return this.cache.get(cacheKey, loadPatterns, cacheTtl);
}
247
332
248
333
/**
249
334
* Returns last N unique events by a project id
250
335
*
251
336
* @param projectId - where to find
252
337
* @param count - how many events to return
338
+ * @returns {GroupedEventDBScheme[] } list of the last N unique events
253
339
*/
254
340
private findLastEvents ( projectId : string , count ) : Promise < GroupedEventDBScheme [ ] > {
255
341
return this . cache . get ( `last:${ count } :eventsOf:${ projectId } ` , async ( ) => {
256
- return this . db . getConnection ( )
342
+ return this . eventsDb . getConnection ( )
257
343
. collection ( `events:${ projectId } ` )
258
344
. find ( )
259
345
. sort ( {
@@ -308,7 +394,7 @@ export default class GrouperWorker extends Worker {
308
394
*/
309
395
const repetitionCacheKey = `repetitions:${ task . projectId } :${ existedEvent . groupHash } :${ eventUser . id } ` ;
310
396
const repetition = await this . cache . get ( repetitionCacheKey , async ( ) => {
311
- return this . db . getConnection ( ) . collection ( `repetitions:${ task . projectId } ` )
397
+ return this . eventsDb . getConnection ( ) . collection ( `repetitions:${ task . projectId } ` )
312
398
. findOne ( {
313
399
groupHash : existedEvent . groupHash ,
314
400
'payload.user.id' : eventUser . id ,
@@ -342,7 +428,7 @@ export default class GrouperWorker extends Worker {
342
428
*/
343
429
const repetitionDailyCacheKey = `repetitions:${ task . projectId } :${ existedEvent . groupHash } :${ eventUser . id } :${ eventMidnight } ` ;
344
430
const repetitionDaily = await this . cache . get ( repetitionDailyCacheKey , async ( ) => {
345
- return this . db . getConnection ( ) . collection ( `repetitions:${ task . projectId } ` )
431
+ return this . eventsDb . getConnection ( ) . collection ( `repetitions:${ task . projectId } ` )
346
432
. findOne ( {
347
433
groupHash : existedEvent . groupHash ,
348
434
'payload.user.id' : eventUser . id ,
@@ -377,7 +463,7 @@ export default class GrouperWorker extends Worker {
377
463
* Finds an event by its group hash in the project with the passed ID
378
464
*
379
465
* @param projectId - project's identifier
380
- * @param groupHash - group hash of the event
466
+ * @param groupHash - group hash of the event
381
467
*/
382
468
private async getEvent ( projectId : string , groupHash : string ) : Promise < GroupedEventDBScheme > {
383
469
if ( ! mongodb . ObjectID . isValid ( projectId ) ) {
@@ -387,7 +473,7 @@ export default class GrouperWorker extends Worker {
387
473
const eventCacheKey = await this . getEventCacheKey ( projectId , groupHash ) ;
388
474
389
475
return this . cache . get ( eventCacheKey , async ( ) => {
390
- return this . db . getConnection ( )
476
+ return this . eventsDb . getConnection ( )
391
477
. collection ( `events:${ projectId } ` )
392
478
. findOne ( {
393
479
groupHash,
@@ -400,12 +486,13 @@ export default class GrouperWorker extends Worker {
400
486
401
487
/**
 * Builds the cache key under which an event of a project is stored
 *
 * @param projectId - project the event belongs to, used as the key prefix
 * @param groupHash - group hash of the event, serialized into the key
 * @returns cache key for the event
 */
private async getEventCacheKey(projectId: string, groupHash: string): Promise<string> {
  const serializedQuery = JSON.stringify({ groupHash });

  return `${projectId}:${serializedQuery}`;
}
410
497
411
498
/**
@@ -421,7 +508,7 @@ export default class GrouperWorker extends Worker {
421
508
throw new ValidationError ( 'Controller.saveEvent: Project ID is invalid or missed' ) ;
422
509
}
423
510
424
- const collection = this . db . getConnection ( ) . collection ( `events:${ projectId } ` ) ;
511
+ const collection = this . eventsDb . getConnection ( ) . collection ( `events:${ projectId } ` ) ;
425
512
426
513
encodeUnsafeFields ( groupedEventData ) ;
427
514
@@ -441,7 +528,7 @@ export default class GrouperWorker extends Worker {
441
528
}
442
529
443
530
try {
444
- const collection = this . db . getConnection ( ) . collection ( `repetitions:${ projectId } ` ) ;
531
+ const collection = this . eventsDb . getConnection ( ) . collection ( `repetitions:${ projectId } ` ) ;
445
532
446
533
encodeUnsafeFields ( repetition ) ;
447
534
@@ -480,7 +567,7 @@ export default class GrouperWorker extends Worker {
480
567
} ,
481
568
} ;
482
569
483
- return ( await this . db . getConnection ( )
570
+ return ( await this . eventsDb . getConnection ( )
484
571
. collection ( `events:${ projectId } ` )
485
572
. updateOne ( query , updateQuery ) ) . modifiedCount ;
486
573
} catch ( err ) {
@@ -512,7 +599,7 @@ export default class GrouperWorker extends Worker {
512
599
try {
513
600
const midnight = this . getMidnightByEventTimestamp ( eventTimestamp ) ;
514
601
515
- await this . db . getConnection ( )
602
+ await this . eventsDb . getConnection ( )
516
603
. collection ( `dailyEvents:${ projectId } ` )
517
604
. updateOne (
518
605
{
0 commit comments