Skip to content

Commit

Permalink
fix rocksmq consumer register not concurrent safe
Browse files Browse the repository at this point in the history
Signed-off-by: aoiasd <[email protected]>
  • Loading branch information
aoiasd committed Feb 13, 2025
1 parent f391ea1 commit 352cae8
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 59 deletions.
2 changes: 1 addition & 1 deletion pkg/mq/mqimpl/rocksmq/client/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (c *client) consume(consumer *consumer) {
case _, ok := <-newIncomingMsgCh:
if !ok {
// consumer MsgMutex closed, goroutine exit
log.Info("Consumer MsgMutex closed")
log.Info("Consumer MsgMutex closed", zap.String("topic", consumer.topic), zap.String("groupName", consumer.consumerName))
return
}
case <-timerNotify:
Expand Down
171 changes: 113 additions & 58 deletions pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,96 @@ func checkRetention() bool {

var topicMu = sync.Map{}

type consumerList struct {
list []*Consumer
mu sync.RWMutex
}

func (l *consumerList) Add(consumer *Consumer) {
l.mu.Lock()
defer l.mu.Unlock()

for _, c := range l.list {
if c.GroupName == consumer.GroupName {
return
}
}
l.list = append(l.list, consumer)
}

func (l *consumerList) Remove(groupName string) *Consumer {
l.mu.Lock()
defer l.mu.Unlock()
for idx, c := range l.list {
if c.GroupName == groupName {
l.list = append(l.list[:idx], l.list[idx+1:]...)
return c
}
}
return nil
}

func (l *consumerList) Get(groupName string) *Consumer {
l.mu.RLock()
defer l.mu.RUnlock()

for _, consumer := range l.list {
if consumer.GroupName == groupName {
return consumer
}
}
return nil
}

func (l *consumerList) Notify() {
l.mu.RLock()
defer l.mu.RUnlock()

for _, v := range l.list {
select {
case v.MsgMutex <- struct{}{}:
continue
default:
continue
}
}
}

func (l *consumerList) Len() int {
return len(l.list)
}

func (l *consumerList) Range(fn func(*Consumer) bool) bool {
l.mu.RLock()
defer l.mu.RUnlock()

for _, consumer := range l.list {
if !fn(consumer) {
return false
}
}
return true
}

// fetch consumer list
// unsafe, only use after close mq
func (l *consumerList) Collect() []*Consumer {
return l.list
}

func newConsumerList() *consumerList {
return &consumerList{
list: make([]*Consumer, 0),
mu: sync.RWMutex{},
}
}

type rocksmq struct {
store *gorocksdb.DB
cfh []*gorocksdb.ColumnFamilyHandle
kv kv.BaseKV
storeMu *sync.Mutex
consumers sync.Map
consumers sync.Map // map topic -> consumer list
consumersID sync.Map

retentionInfo *retentionInfo
Expand Down Expand Up @@ -323,7 +407,7 @@ func (rmq *rocksmq) Close() {
rmq.consumers.Range(func(k, v interface{}) bool {
// TODO what happened if the server crashed? who handled the destroy consumer group? should we just handled it when rocksmq created?
// or we should not even make consumer info persistent?
for _, consumer := range v.([]*Consumer) {
for _, consumer := range v.(*consumerList).Collect() {
err := rmq.destroyConsumerGroupInternal(consumer.Topic, consumer.GroupName)
if err != nil {
log.Ctx(rmq.ctx).Warn("Failed to destroy consumer group in rocksmq!", zap.String("topic", consumer.Topic), zap.String("groupName", consumer.GroupName), zap.Error(err))
Expand All @@ -344,21 +428,23 @@ func (rmq *rocksmq) Info() bool {
rtn := true
rmq.consumers.Range(func(key, vals interface{}) bool {
topic, _ := key.(string)
consumerList, _ := vals.([]*Consumer)
consumerList, _ := vals.(*consumerList)

minConsumerPosition := UniqueID(-1)
minConsumerGroupName := ""
for _, consumer := range consumerList {
consumerPosition, ok := rmq.getCurrentID(consumer.Topic, consumer.GroupName)

consumerList.Range(func(c *Consumer) bool {
consumerPosition, ok := rmq.getCurrentID(c.Topic, c.GroupName)
if !ok {
log.Error("some group not regist", zap.String("topic", consumer.Topic), zap.String("groupName", consumer.GroupName))
continue
log.Error("some group not regist", zap.String("topic", c.Topic), zap.String("groupName", c.GroupName))
return true
}
if minConsumerPosition == UniqueID(-1) || consumerPosition < minConsumerPosition {
minConsumerPosition = consumerPosition
minConsumerGroupName = consumer.GroupName
minConsumerGroupName = c.GroupName
}
}
return true
})

pageTsSizeKey := constructKey(PageTsTitle, topic)
pages, _, err := rmq.kv.LoadWithPrefix(context.TODO(), pageTsSizeKey)
Expand All @@ -378,7 +464,7 @@ func (rmq *rocksmq) Info() bool {

log.Info("Rocksmq Info",
zap.String("topic", topic),
zap.Int("consumer num", len(consumerList)),
zap.Int("consumer num", consumerList.Len()),
zap.String("min position group names", minConsumerGroupName),
zap.Int64("min positions", minConsumerPosition),
zap.Int("page sum", len(pages)),
Expand Down Expand Up @@ -422,6 +508,7 @@ func (rmq *rocksmq) CreateTopic(topicName string) error {

topicMu.LoadOrStore(topicName, new(sync.Mutex))

rmq.consumers.Store(topicName, newConsumerList())
// msgSizeKey -> msgSize
// topicIDKey -> topic creating time
kvs := make(map[string]string)
Expand Down Expand Up @@ -514,12 +601,9 @@ func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Cons
key := constructCurrentID(topicName, groupName)
_, ok := rmq.consumersID.Load(key)
if ok {
if vals, ok := rmq.consumers.Load(topicName); ok {
for _, v := range vals.([]*Consumer) {
if v.GroupName == groupName {
return true, v, nil
}
}
if val, ok := rmq.consumers.Load(topicName); ok {
c := val.(*consumerList).Get(groupName)
return c != nil, c, nil
}
}
return false, nil, nil
Expand Down Expand Up @@ -554,21 +638,13 @@ func (rmq *rocksmq) RegisterConsumer(consumer *Consumer) error {
defer mu.Unlock()

start := time.Now()
if vals, ok := rmq.consumers.Load(consumer.Topic); ok {
for _, v := range vals.([]*Consumer) {
if v.GroupName == consumer.GroupName {
return nil
}
}
consumers := vals.([]*Consumer)
consumers = append(consumers, consumer)
rmq.consumers.Store(consumer.Topic, consumers)
if val, ok := rmq.consumers.Load(consumer.Topic); ok {
val.(*consumerList).Add(consumer)
} else {
consumers := make([]*Consumer, 1)
consumers[0] = consumer
rmq.consumers.Store(consumer.Topic, consumers)
log.Warn("create consumer for unknown topic", zap.String("topic", consumer.Topic), zap.String("group", consumer.GroupName))
return fmt.Errorf("create consumer for unknown topic: %s, group: %s", consumer.Topic, consumer.GroupName)
}
log.Ctx(rmq.ctx).Debug("Rocksmq register consumer successfully ", zap.String("topic", consumer.Topic), zap.Int64("elapsed", time.Since(start).Milliseconds()))
log.Ctx(rmq.ctx).Debug("Rocksmq register consumer successfully ", zap.String("topic", consumer.Topic), zap.String("group", consumer.GroupName), zap.Int64("elapsed", time.Since(start).Milliseconds()))
return nil
}

Expand Down Expand Up @@ -608,15 +684,10 @@ func (rmq *rocksmq) destroyConsumerGroupInternal(topicName, groupName string) er
key := constructCurrentID(topicName, groupName)
rmq.consumersID.Delete(key)
rmq.topicName2LatestMsgID.Delete(topicName)
if vals, ok := rmq.consumers.Load(topicName); ok {
consumers := vals.([]*Consumer)
for index, v := range consumers {
if v.GroupName == groupName {
close(v.MsgMutex)
consumers = append(consumers[:index], consumers[index+1:]...)
rmq.consumers.Store(topicName, consumers)
break
}
if val, ok := rmq.consumers.Load(topicName); ok {
c := val.(*consumerList).Remove(groupName)
if c != nil {
close(c.MsgMutex)
}
}
log.Ctx(rmq.ctx).Debug("Rocksmq destroy consumer group successfully ", zap.String("topic", topicName),
Expand Down Expand Up @@ -676,15 +747,8 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
return []UniqueID{}, err
}
writeTime := time.Since(start).Milliseconds()
if vals, ok := rmq.consumers.Load(topicName); ok {
for _, v := range vals.([]*Consumer) {
select {
case v.MsgMutex <- struct{}{}:
continue
default:
continue
}
}
if val, ok := rmq.consumers.Load(topicName); ok {
val.(*consumerList).Notify()
}

// Update message page info
Expand Down Expand Up @@ -1055,17 +1119,8 @@ func (rmq *rocksmq) getLatestMsg(topicName string) (int64, error) {

// Notify sends a mutex in MsgMutex channel to tell consumers to consume
func (rmq *rocksmq) Notify(topicName, groupName string) {
if vals, ok := rmq.consumers.Load(topicName); ok {
for _, v := range vals.([]*Consumer) {
if v.GroupName == groupName {
select {
case v.MsgMutex <- struct{}{}:
continue
default:
continue
}
}
}
if val, ok := rmq.consumers.Load(topicName); ok {
val.(*consumerList).Notify()
}
}

Expand Down

0 comments on commit 352cae8

Please sign in to comment.