Skip to content

Commit 6f325d8

Browse files
authored
fix(./class/eventManagement/dispatcher/transaction-handler): remove ping transactions (#307)
* fix(./class/eventManagement/dispatcher/transaction-handler): remove ping transactions * refactor(./class/eventManagement/dispatcher/Transaction-handler): fix shared options * test(): updated UT
1 parent 76032b5 commit 6f325d8

File tree

4 files changed

+36
-17
lines changed

4 files changed

+36
-17
lines changed

src/class/eventManagement/dispatcher.class.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -778,7 +778,12 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
778778
}
779779
} as Transaction<"incomer">);
780780

781-
await this.incomerStore.updateIncomerState(origin);
781+
try {
782+
await this.incomerStore.updateIncomerState(origin);
783+
}
784+
catch {
785+
// Do Nothing
786+
}
782787

783788
this.#logger.info(this.#standardLogFn(
784789
Object.assign({}, logData, {

src/class/eventManagement/dispatcher/transaction-handler.class.ts

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ interface DistributeMainTransactionOptions {
4343
interface ResolveTransactions {
4444
incomers: Set<RegisteredIncomer>;
4545
backupIncomerTransactions: Transactions<"incomer">;
46-
dispatcherTransactions: Transactions<"dispatcher">;
4746
}
4847

4948
interface FindISOIncomerOptions {
@@ -151,10 +150,9 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
151150
this.backupDispatcherTransactionStore.getTransactions()
152151
]);
153152

154-
let options = {
153+
let sharedOptions = {
155154
incomers,
156-
backupIncomerTransactions,
157-
dispatcherTransactions
155+
backupIncomerTransactions
158156
};
159157

160158
const mappedBackupDispatcherTransactions = new Map([...backupDispatcherTransactions.entries()]
@@ -167,10 +165,12 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
167165
return [id, formatted];
168166
}));
169167

170-
options = await this.handleBackupIncomerTransactions(options);
168+
sharedOptions = await this.handleBackupIncomerTransactions(sharedOptions);
169+
170+
await this.resolvePingTransactions(dispatcherTransactions);
171171

172172
await this.resolveMainTransactions({
173-
...options,
173+
...sharedOptions,
174174
dispatcherTransactions: new Map([...dispatcherTransactions, ...mappedBackupDispatcherTransactions])
175175
});
176176
}
@@ -414,7 +414,7 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
414414
}
415415

416416
private async handleBackupIncomerTransactions(options: ResolveTransactions): Promise<ResolveTransactions> {
417-
const { incomers, backupIncomerTransactions, dispatcherTransactions } = options;
417+
const { incomers, backupIncomerTransactions } = options;
418418

419419
const toResolve = [];
420420

@@ -437,7 +437,7 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
437437

438438
await Promise.all(toResolve);
439439

440-
return { incomers, backupIncomerTransactions, dispatcherTransactions };
440+
return { incomers, backupIncomerTransactions };
441441
}
442442

443443
private findISOIncomer(options: FindISOIncomerOptions) : RegisteredIncomer | undefined {
@@ -474,7 +474,22 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
474474
]);
475475
}
476476

477-
private async resolveMainTransactions(options: ResolveTransactions) {
477+
private async resolvePingTransactions(dispatcherTransactions: Transactions<"dispatcher">) {
478+
for (const dispatcherTransaction of dispatcherTransactions.values()) {
479+
if (dispatcherTransaction.name === "PING" && dispatcherTransaction.redisMetadata.resolved) {
480+
try {
481+
await this.incomerStore.updateIncomerState(dispatcherTransaction.redisMetadata.to);
482+
}
483+
catch {
484+
// Do Nothing
485+
}
486+
487+
await this.dispatcherTransactionStore.deleteTransaction(dispatcherTransaction.redisMetadata.transactionId);
488+
}
489+
}
490+
}
491+
492+
private async resolveMainTransactions(options: ResolveTransactions & { dispatcherTransactions: Transactions<"dispatcher">}) {
478493
const { incomers, backupIncomerTransactions, dispatcherTransactions } = options;
479494

480495
const toResolve = [];

test/UT/class/eventManagement/dispatcher.spec.ts

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -365,13 +365,7 @@ describe("Dispatcher", () => {
365365

366366
const pingTransaction = await dispatcherTransactionStore.getTransactionById(pingTransactionId);
367367

368-
expect(pingTransaction).toEqual({
369-
...pingTransaction,
370-
redisMetadata: {
371-
...pingTransaction?.redisMetadata,
372-
resolved: true
373-
}
374-
});
368+
expect(pingTransaction).toEqual(null);
375369
expect(mockedCheckLastActivity).toHaveBeenCalled();
376370
});
377371
});

test/UT/class/eventManagement/handle-inactive-with-backup.spec.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,11 @@ describe("Publishing/exploiting a custom event & inactive incomer", () => {
168168
// Do nothing
169169
});
170170

171+
jest.spyOn(concernedIncomer as any, "handlePing")
172+
.mockImplementation(async(opts: any) => {
173+
// Do nothing
174+
});
175+
171176
secondConcernedIncomer = new Incomer({
172177
redis,
173178
subscriber,

0 commit comments

Comments
 (0)