diff --git a/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go b/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go index c259ffc4452ba..caed2e9e4ce14 100644 --- a/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go +++ b/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go @@ -508,7 +508,7 @@ func (rmq *rocksmq) CreateTopic(topicName string) error { topicMu.LoadOrStore(topicName, new(sync.Mutex)) - rmq.consumers.Store(topicName, newConsumerList()) + rmq.consumers.LoadOrStore(topicName, newConsumerList()) // msgSizeKey -> msgSize // topicIDKey -> topic creating time kvs := make(map[string]string) @@ -638,12 +638,13 @@ func (rmq *rocksmq) RegisterConsumer(consumer *Consumer) error { defer mu.Unlock() start := time.Now() - if val, ok := rmq.consumers.Load(consumer.Topic); ok { - val.(*consumerList).Add(consumer) - } else { - log.Warn("create consumer for unknown topic", zap.String("topic", consumer.Topic), zap.String("group", consumer.GroupName)) - return fmt.Errorf("create consumer for unknown topic: %s, group: %s", consumer.Topic, consumer.GroupName) + + val, ok := rmq.consumers.LoadOrStore(consumer.Topic, newConsumerList()) + if !ok { + log.Warn("create consumer for topic not exist", zap.String("topic", consumer.Topic), zap.String("group", consumer.GroupName)) } + val.(*consumerList).Add(consumer) + log.Ctx(rmq.ctx).Debug("Rocksmq register consumer successfully ", zap.String("topic", consumer.Topic), zap.String("group", consumer.GroupName), zap.Int64("elapsed", time.Since(start).Milliseconds())) return nil }