Skip to content

Commit

Permalink
feat: expose retry in connection option configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
Hadi-E authored and raschan committed Jun 21, 2024
1 parent f8596c8 commit 931be8d
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 20 deletions.
13 changes: 13 additions & 0 deletions src/interface/queue-module-options.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,17 @@ export interface AMQPConnectionOptions {
* Connection options directly used by `rhea`
*/
connectionOptions?: ConnectionOptions;

/**
* Retry configuration for senders and receivers
*/
retryConnection?: {
receiver?: RetryConfig;
sender?: RetryConfig;
};
}

export interface RetryConfig {
retryDelay?: number;
maxRetryAttempts?: number;
}
95 changes: 95 additions & 0 deletions src/service/queue/queue.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,19 @@ describe('QueueService', () => {
createReceiver: jest.fn().mockResolvedValue(jest.fn().mockResolvedValue(new EventContextMock().receiver)),
createSender: jest.fn().mockResolvedValue(new EventContextMock().sender),
disconnect: jest.fn().mockResolvedValue(jest.fn()),
getConnectionOptions: jest.fn(() => ({
connectionUri: 'amqp://test',
retryConnection: {
receiver: {
retryDelay: 1000,
maxRetryAttempts: 3,
},
sender: {
retryDelay: 1000,
maxRetryAttempts: 3,
},
},
})),
getModuleOptions(): QueueModuleOptions {
return moduleOptions;
},
Expand Down Expand Up @@ -277,6 +290,47 @@ describe('QueueService', () => {
expect(result).toBeDefined();
expect(amqpService.createReceiver).toHaveBeenCalledTimes(2);
});

it('should not retry creating a receiver if maxRetryAttempts is 1', async () => {
(amqpService as any).getConnectionOptions.mockReturnValueOnce({
retryConnection: {
receiver: {
retryDelay: 1000,
maxRetryAttempts: 1,
},
},
});

const source = 'test-queue';
const messageHandler = jest.fn();

(amqpService as any).createReceiver.mockRejectedValue(new Error('Test error'));

await expect(queueService['getReceiver'](source, 1, messageHandler, 'default')).rejects.toThrow('Test error');

expect(amqpService.createReceiver).toHaveBeenCalledTimes(1);
});

it('should not retry if both retryDelay and maxRetryAttempts are set to zero', async () => {
(amqpService.getConnectionOptions as jest.Mock).mockReturnValueOnce({
connectionUri: 'amqp://test',
retryConnection: {
receiver: {
retryDelay: 0,
maxRetryAttempts: 0,
},
},
});

const source = 'test-queue';
const messageHandler = jest.fn();

(amqpService.createReceiver as jest.Mock).mockRejectedValue(new Error('Test error'));

await expect(queueService['getReceiver'](source, 1, messageHandler, 'default')).rejects.toThrow('Test error');

expect(amqpService.createReceiver).toHaveBeenCalledTimes(1);
});
});
});

Expand Down Expand Up @@ -417,6 +471,47 @@ describe('QueueService', () => {
expect(result).toBeDefined();
expect(amqpService.createSender).toHaveBeenCalledTimes(2);
});

it('should retry creating a sender with custom retry configuration', async () => {
(amqpService.getConnectionOptions as jest.Mock).mockReturnValueOnce({
connectionUri: 'amqp://test',
retryConnection: {
sender: {
retryDelay: 500,
maxRetryAttempts: 2,
},
},
});

const target = 'test-queue';

(amqpService.createSender as jest.Mock).mockRejectedValueOnce(new Error('Test error')).mockResolvedValueOnce({} as AwaitableSender);

const result = await queueService['getSender'](target, 'default');

expect(result).toBeDefined();
expect(amqpService.createSender).toHaveBeenCalledTimes(2);
});

it('should not retry creating a sender if maxRetryAttempts is 1', async () => {
(amqpService.getConnectionOptions as jest.Mock).mockReturnValueOnce({
connectionUri: 'amqp://test',
retryConnection: {
sender: {
retryDelay: 1000,
maxRetryAttempts: 1,
},
},
});

const target = 'test-queue';

(amqpService.createSender as jest.Mock).mockRejectedValue(new Error('Test error'));

await expect(queueService['getSender'](target, 'default')).rejects.toThrow('Test error');

expect(amqpService.createSender).toHaveBeenCalledTimes(1);
});
});

describe('removeListener()', () => {
Expand Down
68 changes: 48 additions & 20 deletions src/service/queue/queue.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ const toString = Object.prototype.toString;
export class QueueService {
private readonly receivers: Map<string, Receiver>;
private readonly senders: Map<string, AwaitableSender>;
private readonly reconnectDelay: number = 5000; // 5 seconds

constructor(private readonly amqpService: AMQPService, private readonly objectValidatorService: ObjectValidatorService) {
// this means only one sender and receiver / app / queue
Expand Down Expand Up @@ -332,22 +331,36 @@ export class QueueService {
connection: string,
): Promise<Receiver> {
const sourceToken = typeof source === 'string' ? source : JSON.stringify(source);

const receiverToken = this.getLinkToken(sourceToken, connection);

if (this.receivers.has(receiverToken)) {
return this.receivers.get(receiverToken);
}

try {
const receiver = await this.amqpService.createReceiver(source, credit, messageHandler.bind(this), connection);
this.receivers.set(receiverToken, receiver);
return receiver;
} catch (error) {
logger.error(`Error creating receiver: ${error.message}`, error.stack);
await sleep(this.reconnectDelay);
return this.getReceiver(source, credit, messageHandler, connection);
}
const connectionOptions = this.amqpService.getConnectionOptions(connection);
const retryDelay = connectionOptions.retryConnection?.receiver?.retryDelay ?? 0;
const maxRetryAttempts = connectionOptions.retryConnection?.receiver?.maxRetryAttempts ?? 1;

let attempt = 0;

do {
try {
const receiver = await this.amqpService.createReceiver(source, credit, messageHandler.bind(this), connection);
this.receivers.set(receiverToken, receiver);
return receiver;
} catch (error) {
logger.error(`Error creating receiver (attempt ${attempt + 1}): ${error.message}`, error.stack);

attempt = attempt + 1;
if (attempt >= maxRetryAttempts) {
throw new Error(`Max retry attempts reached for creating receiver: ${error.message}`);
}

if (retryDelay > 0) {
await sleep(retryDelay);
}
}
} while (attempt < maxRetryAttempts);
}

private async getSender(target: string, connection: string): Promise<AwaitableSender> {
Expand All @@ -357,15 +370,30 @@ export class QueueService {
return this.senders.get(senderToken);
}

try {
const sender = await this.amqpService.createSender(target, connection);
this.senders.set(senderToken, sender);
return sender;
} catch (error) {
logger.error(`Error creating sender: ${error.message}`, error.stack);
await sleep(this.reconnectDelay);
return this.getSender(target, connection);
}
const connectionOptions = this.amqpService.getConnectionOptions(connection);
const retryDelay = connectionOptions.retryConnection?.sender?.retryDelay ?? 0;
const maxRetryAttempts = connectionOptions.retryConnection?.sender?.maxRetryAttempts ?? 1;

let attempt = 0;

do {
try {
const sender = await this.amqpService.createSender(target, connection);
this.senders.set(senderToken, sender);
return sender;
} catch (error) {
logger.error(`Error creating sender (attempt ${attempt + 1}): ${error.message}`, error.stack);

attempt++;
if (attempt >= maxRetryAttempts) {
throw new Error(`Max retry attempts reached for creating sender: ${error.message}`);
}

if (retryDelay > 0) {
await sleep(retryDelay);
}
}
} while (attempt < maxRetryAttempts);
}

private encodeMessage(message: any): string {
Expand Down

0 comments on commit 931be8d

Please sign in to comment.