Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
Signed-off-by: aoiasd <[email protected]>
  • Loading branch information
aoiasd committed Feb 13, 2025
1 parent 352cae8 commit c39b543
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func (rmq *rocksmq) CreateTopic(topicName string) error {

topicMu.LoadOrStore(topicName, new(sync.Mutex))

rmq.consumers.Store(topicName, newConsumerList())
rmq.consumers.LoadOrStore(topicName, newConsumerList())
// msgSizeKey -> msgSize
// topicIDKey -> topic creating time
kvs := make(map[string]string)
Expand Down Expand Up @@ -638,12 +638,13 @@ func (rmq *rocksmq) RegisterConsumer(consumer *Consumer) error {
defer mu.Unlock()

start := time.Now()
if val, ok := rmq.consumers.Load(consumer.Topic); ok {
val.(*consumerList).Add(consumer)
} else {
log.Warn("create consumer for unknown topic", zap.String("topic", consumer.Topic), zap.String("group", consumer.GroupName))
return fmt.Errorf("create consumer for unknown topic: %s, group: %s", consumer.Topic, consumer.GroupName)

val, ok := rmq.consumers.LoadOrStore(consumer.Topic, newConsumerList())
if !ok {
log.Warn("create consumer for topic not exist", zap.String("topic", consumer.Topic), zap.String("group", consumer.GroupName))
}
val.(*consumerList).Add(consumer)

log.Ctx(rmq.ctx).Debug("Rocksmq register consumer successfully ", zap.String("topic", consumer.Topic), zap.String("group", consumer.GroupName), zap.Int64("elapsed", time.Since(start).Milliseconds()))
return nil
}
Expand Down

0 comments on commit c39b543

Please sign in to comment.