A structured Kafka integration for NestJS inspired by BullMQ, offering decorators, modular configuration, and a scalable consumer/producer architecture.
npm install kafka-nestjs
- 🎯 Decorator-based Kafka consumer configuration
- 🔄 Dynamic listener service for automatic consumer registration
- 📤 Producer service for sending messages
- ⚙️ Flexible configuration options (sync and async)
- 🔌 Global module support
- 🏗️ Built on top of KafkaJS
import { KafkaModule } from 'kafka-nestjs';
@Module({
imports: [
KafkaModule.forRoot({
clientId: 'my-app',
brokers: ['localhost:9092'],
}),
],
})
export class AppModule {}
import { KafkaConsumer } from 'kafka-nestjs';
@Controller()
export class MyController {
@KafkaConsumer({
subscribe: {
topics: ['example.event.stage'],
fromBeginning: true
},
consumerConfig: {
groupId: 'example.group.id'
}
})
async handleMessage(message: any) {
console.log('Received message:', message);
}
@KafkaConsumer({
subscribe: {
topics: ['example.event.stage'],
fromBeginning: true
},
consumerConfig: {
groupId: 'example.group.id'
}
})
async handleMessageWithPayload(message: any, payload: EachMessagePayload) {
console.log('Received message:', message);
}
}
import { KafkaProducerService } from 'kafka-nestjs';
@Controller()
export class MyController {
constructor(private readonly kafkaProducer: KafkaProducerService) {}
@Post('send')
async sendMessage() {
await this.kafkaProducer.send({
topic: 'my-topic',
messages: [{ value: 'Hello Kafka!' }],
});
}
}
KafkaModule.forRoot({
clientId: 'my-app',
brokers: ['localhost:9092'],
// ... other KafkaJS options
})
KafkaModule.forRootAsync({
useFactory: async (configService: ConfigService) => ({
clientId: configService.get('KAFKA_CLIENT_ID'),
brokers: configService.get('KAFKA_BROKERS'),
}),
inject: [ConfigService],
})
If you only need to produce messages:
import { KafkaModule } from 'kafka-nestjs';
import { Module } from '@nestjs/common';
import { ExampleService } from '.example.service';
@Module({
imports: [
KafkaModule.forProducer()
],
providers: [ExampleService],
exports: [ExampleService],
})
export class ExampleModule {}
import { KafkaProducerService } from 'kafka-nestjs';
export class ExampleService {
constructor(
private readonly kafkaProducerService: KafkaProducerService,
}{}
}
- @nestjs/common: ^11.1.2
- @nestjs/core: ^11.1.2
- kafkajs: ^2.2.4
MIT
Behrad Kazemi
Contributions are welcome! Please feel free to submit a Pull Request.