一个 Go 语言的 NSQ 消费者包装器,包含生产者和消费者的封装实现。
go get github.com/nevernet/go-nsqworker/v3
package main
import (
"log"
"os"
"os/signal"
"runtime"
"syscall"
"github.com/nsqio/go-nsq"
"github.com/nevernet/go-nsqworker/v3"
"your-project/handlers"
)
func main() {
var err error
defer func() {
if err != nil {
log.Fatalf("Error: %v", err)
}
}()
// 设置使用所有 CPU 核心
runtime.GOMAXPROCS(runtime.NumCPU())
// 初始化 handlers
handlers.Start()
// 配置信息
nsqConfig := &struct {
Addr string
LookupdAddr string
Topic string
ConsumerCount int
}{
Addr: "127.0.0.1:4150", // NSQ 地址
LookupdAddr: "127.0.0.1:4161", // NSQ Lookupd 地址
Topic: "test_topic", // 主题
ConsumerCount: 10, // 消费者数量
}
// 初始化生产者
var producer *nsqworker.Producer
producer, err = nsqworker.NewProducer(nsqConfig.Addr, nsq.NewConfig())
if err != nil {
log.Fatalf("Failed to create producer: %v", err)
return
}
log.Printf("NSQ producer initialized")
// 初始化消费者配置
consumerConfig := nsqworker.NewConfig(
nsqConfig.Addr,
nsqConfig.LookupdAddr,
nsqConfig.Topic,
nsqConfig.ConsumerCount,
)
// 初始化消费者
var consumer *nsqworker.Consumer
consumer, err = nsqworker.NewConsumer(consumerConfig, nsq.NewConfig())
if err != nil {
log.Fatalf("Failed to create consumer: %v", err)
return
}
log.Printf("NSQ consumer initialized")
// 优雅退出
exitChan := make(chan struct{})
signalChan := make(chan os.Signal, 1)
go func() {
<-signalChan
close(exitChan)
}()
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
<-exitChan
// 停止服务
producer.Stop()
log.Printf("Producer stopped")
consumer.Stop()
log.Printf("Consumer stopped")
}
mkdir handlers
package handlers
// Start 初始化所有 handlers
// 这个方法会在 main 函数中被调用
func Start() {
// 这里可以添加一些初始化代码
}
package handlers
import (
"log"
"github.com/nsqio/go-nsq"
"github.com/nevernet/go-nsqworker/v3"
)
// MailHandler 邮件处理器
type MailHandler struct {
}
func init() {
// 注册处理器,最后一个参数是并发数
nsqworker.RegisterConcurrentHandler("mail", &MailHandler{}, 10)
}
// HandleMessage 实现消息处理接口
func (h *MailHandler) HandleMessage(message *nsq.Message) error {
log.Printf("Handler: [mail], Message: [%s]", string(message.Body))
return nil
}
Addr
: NSQ 服务器地址LookupdAddr
: NSQ Lookupd 服务器地址Topic
: 消息主题ConsumerCount
: 消费者数量
- 确保 NSQ 服务已经启动
- 根据实际情况调整配置参数
- 建议在生产环境中添加适当的错误处理和日志记录
- Handler 的并发数要根据实际业务需求来设置
MIT