Skip to content

Commit cda0351

Browse files
author
刘硕
committed
over
1 parent 7d33a12 commit cda0351

File tree

10 files changed

+26
-11
lines changed

10 files changed

+26
-11
lines changed

cmd/proxy/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -218,11 +218,11 @@ Options:
218218

219219
switch {
220220
case dashboard != "":
221-
go AutoOnlineWithDashboard(s, dashboard)
221+
go AutoOnlineWithDashboard(s, dashboard) // 自动上线到dashboard
222222
case coordinator.name != "":
223-
go AutoOnlineWithCoordinator(s, coordinator.name, coordinator.addr, coordinator.auth)
223+
go AutoOnlineWithCoordinator(s, coordinator.name, coordinator.addr, coordinator.auth) // 自动注册到etcd 等...
224224
case slots != nil:
225-
go AutoOnlineWithFillSlots(s, slots)
225+
go AutoOnlineWithFillSlots(s, slots) //
226226
}
227227

228228
for !s.IsClosed() && !s.IsOnline() {

example/fe.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
#!/usr/bin/env python3
22

3-
from utils import *
4-
53
import atexit
6-
import json
74
import datetime
5+
import json
6+
7+
from utils import *
88

99

1010
class CodisFE(Process):
@@ -16,7 +16,7 @@ def __init__(self, port, assets):
1616
self.command = "codis-fe --filesystem rootfs --listen 0.0.0.0:{} --assets-dir={}".format(self.port, assets)
1717
Process.__init__(self, self.command, self.logfile)
1818

19-
dict = {"pid": self.proc.pid, "assets": assets}
19+
dict = {"pid": self.proc.pid, "assets": assets, "port": self.port}
2020
print(" >> codis.fe = " + json.dumps(dict, sort_keys=True))
2121

2222

extern/redis/src/slots_async.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1262,13 +1262,15 @@ slotsmgrtExecWrapperCommand(client *c) {
12621262
addReplyError(c, "wrong number of arguments for SLOTSMGRT-EXEC-WRAPPER");
12631263
return;
12641264
}
1265+
// //查找命令是否存在
12651266
struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr);
12661267
if (cmd == NULL) {
12671268
addReplyLongLong(c, -1);
12681269
addReplyErrorFormat(c,"invalid command specified (%s)",
12691270
(char *)c->argv[2]->ptr);
12701271
return;
12711272
}
1273+
12721274
if ((cmd->arity > 0 && cmd->arity != c->argc - 2) || (c->argc - 2 < -cmd->arity)) {
12731275
addReplyLongLong(c, -1);
12741276
addReplyErrorFormat(c, "wrong number of arguments for command (%s)",
@@ -1280,6 +1282,7 @@ slotsmgrtExecWrapperCommand(client *c) {
12801282
addReplyError(c, "the specified key doesn't exist");
12811283
return;
12821284
}
1285+
//如果正在迁移并且当前命令是写命令则返回错误
12831286
if (!(cmd->flags & CMD_READONLY) && getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, c->argv[1], 0) != 0) {
12841287
addReplyLongLong(c, 1);
12851288
addReplyError(c, "the specified key is being migrated");

pkg/proxy/forward.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func (d *forwardSync) process(s *Slot, r *Request, hkey []byte) (*BackendConn, e
4949
s.id, hkey)
5050
return nil, ErrSlotIsNotReady
5151
}
52+
//如果正在迁移,查询这个key是否迁移完成
5253
if s.migrate.bc != nil && len(hkey) != 0 {
5354
if err := d.slotsmgrt(s, hkey, r.Database, r.Seed16()); err != nil {
5455
log.Debugf("slot-%04d migrate from = %s to %s failed: hash key = '%s', database = %d, error = %s",

pkg/proxy/proxy.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ func (s *Proxy) rewatchSentinels(servers []string) {
340340
for !p.IsCanceled() {
341341
timeout := time.Minute * 15
342342
retryAt := time.Now().Add(time.Second * 10)
343-
if !p.Subscribe(servers, timeout, callback) { // ✅
343+
if !p.Subscribe(servers, timeout, callback) { // ✅ 订阅切主信息
344344
delayUntil(retryAt)
345345
} else {
346346
callback()
@@ -352,11 +352,13 @@ func (s *Proxy) rewatchSentinels(servers []string) {
352352
var success int
353353
for i := 0; i != 10 && !p.IsCanceled() && success != 2; i++ {
354354
timeout := time.Second * 5
355+
// 得到最新的Master
355356
masters, err := p.Masters(servers, timeout)
356357
if err != nil {
357358
log.WarnErrorf(err, "[%p] fetch group masters failed", s)
358359
} else {
359360
if !p.IsCanceled() {
361+
//切主
360362
s.SwitchMasters(masters)
361363
}
362364
success += 1

pkg/topom/topom.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func (s *Topom) Start(routines bool) error {
232232
go func() {
233233
for !s.IsClosed() {
234234
if s.IsOnline() {
235-
if err := s.ProcessSyncAction(); err != nil { // ✅
235+
if err := s.ProcessSyncAction(); err != nil { // ✅ 同步,选主
236236
log.WarnErrorf(err, "process sync action failed")
237237
time.Sleep(time.Second * 5)
238238
}

pkg/topom/topom_action.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"github.com/CodisLabs/codis/pkg/utils/sync2"
1515
)
1616

17-
func (s *Topom) ProcessSlotAction() error {
17+
func (s *Topom) ProcessSlotAction() error { // 更新solt相应的数据
1818
for s.IsOnline() {
1919
var (
2020
marks = make(map[int]bool)
@@ -39,13 +39,15 @@ func (s *Topom) ProcessSlotAction() error {
3939
}
4040
var parallel = math2.MaxInt(1, s.config.MigrationParallelSlots)
4141
for parallel > len(plans) {
42+
//状态转移在这里完成
4243
_, ok, err := s.SlotActionPrepareFilter(accept, update)
4344
if err != nil {
4445
return err
4546
} else if !ok {
4647
break
4748
}
4849
}
50+
// ActionPending =》ActionPreparing =》ActionPrepared => ActionMigrating => ActionFinished
4951
if len(plans) == 0 {
5052
return nil
5153
}
@@ -54,6 +56,7 @@ func (s *Topom) ProcessSlotAction() error {
5456
fut.Add()
5557
go func(sid int) {
5658
log.Warnf("slot-[%d] process action", sid)
59+
//重点,真正的数据迁移
5760
var err = s.processSlotAction(sid)
5861
if err != nil {
5962
status := fmt.Sprintf("[ERROR] Slot[%04d]: %s", sid, err)
@@ -87,7 +90,7 @@ func (s *Topom) processSlotAction(sid int) error {
8790
return err
8891
}
8992
log.Debugf("slot-[%d] action executor %d", sid, n)
90-
93+
//迁移完成判断
9194
if n == 0 && nextdb == -1 {
9295
return s.SlotActionComplete(sid)
9396
}

pkg/topom/topom_sentinel.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,19 +194,23 @@ func (s *Topom) ResyncSentinels() error {
194194
}
195195

196196
sentinel := redis.NewSentinel(s.config.ProductName, s.config.ProductAuth)
197+
//移除所有Master
197198
if err := sentinel.RemoveGroupsAll(p.Servers, s.config.SentinelClientTimeout.Duration()); err != nil {
198199
log.WarnErrorf(err, "remove sentinels failed")
199200
}
201+
//监听Group
200202
if err := sentinel.MonitorGroups(p.Servers, s.config.SentinelClientTimeout.Duration(), config, ctx.getGroupMasters()); err != nil {
201203
log.WarnErrorf(err, "resync sentinels failed")
202204
return err
203205
}
206+
//设置Group Master
204207
s.reWatchSentinels(p.Servers)
205208

206209
var fut sync2.Future
207210
for _, p := range ctx.proxy {
208211
fut.Add()
209212
go func(p *models.Proxy) {
213+
//通知Proxy更新
210214
err := s.newProxyClient(p).SetSentinels(ctx.sentinel)
211215
if err != nil {
212216
log.ErrorErrorf(err, "proxy-[%s] resync sentinel failed", p.Token)

pkg/utils/usage_linux.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright 2016 CodisLabs. All Rights Reserved.
22
// Licensed under the MIT (MIT-LICENSE.txt) license.
33

4+
//go:build linux
45
// +build linux
56

67
package utils

pkg/utils/usage_rusage.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright 2016 CodisLabs. All Rights Reserved.
22
// Licensed under the MIT (MIT-LICENSE.txt) license.
33

4+
//go:build !linux
45
// +build !linux
56

67
package utils

0 commit comments

Comments
 (0)