Commit f47f2a8

optimize log
1 parent ab4c347 commit f47f2a8

7 files changed: +54 -54 lines changed

cmd/shardsvr/shardsvr.go

Lines changed: 1 addition & 1 deletion
@@ -67,7 +67,7 @@ func main() {
 		svr_peer_map[i] = addr
 	}
 
-	shard_svr := shardkvserver.MakeShardKVServer(svr_peer_map, node_id, gid, os.Args[3])
+	shard_svr := shardkvserver.MakeShardKVServer(svr_peer_map, int64(node_id), gid, os.Args[3])
 	lis, err := net.Listen("tcp", svr_peer_map[node_id])
 	if err != nil {
 		fmt.Printf("failed to listen: %v", err)

metaserver/metaserver.go

Lines changed: 1 addition & 1 deletion
@@ -65,7 +65,7 @@ func MakeMetaServer(peerMaps map[int]string, nodeId int) *MetaServer {
 	newdb_eng := storage.EngineFactory("leveldb", "./data/db/metanode_"+strconv.Itoa(nodeId))
 	logdb_eng := storage.EngineFactory("leveldb", "./data/log/metanode_"+strconv.Itoa(nodeId))
 
-	newRf := raftcore.MakeRaft(client_ends, nodeId, logdb_eng, newApplyCh, 50, 150)
+	newRf := raftcore.MakeRaft(client_ends, int64(nodeId), logdb_eng, newApplyCh, 50, 150)
 	meta_server := &MetaServer{
 		Rf:      newRf,
 		applyCh: newApplyCh,

raftcore/raft.go

Lines changed: 30 additions & 26 deletions
@@ -61,7 +61,7 @@ func NodeToString(role NodeRole) string {
 type Raft struct {
 	mu        sync.RWMutex
 	peers     []*RaftClientEnd // rpc client end
-	me_       int
+	id        int64
 	dead      int32
 	applyCh   chan *pb.ApplyMsg
 	applyCond *sync.Cond
@@ -85,10 +85,10 @@ type Raft struct {
 	baseElecTimeout  uint64
 }
 
-func MakeRaft(peers []*RaftClientEnd, me int, newdbEng storage_eng.KvStore, applyCh chan *pb.ApplyMsg, heartbeatTimeOutMs uint64, baseElectionTimeOutMs uint64) *Raft {
+func MakeRaft(peers []*RaftClientEnd, me int64, newdbEng storage_eng.KvStore, applyCh chan *pb.ApplyMsg, heartbeatTimeOutMs uint64, baseElectionTimeOutMs uint64) *Raft {
 	rf := &Raft{
 		peers:          peers,
-		me_:            me,
+		id:             me,
 		dead:           0,
 		applyCh:        applyCh,
 		replicatorCond: make([]*sync.Cond, len(peers)),
@@ -107,13 +107,12 @@ func MakeRaft(peers []*RaftClientEnd, me int, newdbEng storage_eng.KvStore, appl
 		heartBeatTimeout: heartbeatTimeOutMs,
 	}
 	rf.curTerm, rf.votedFor = rf.persister.ReadRaftState()
-	rf.ReInitLog()
 	rf.applyCond = sync.NewCond(&rf.mu)
 	last_log := rf.logs.GetLast()
 	for _, peer := range peers {
 		logger.ELogger().Sugar().Debugf("peer addr %s id %d", peer.addr, peer.id)
 		rf.matchIdx[peer.id], rf.nextIdx[peer.id] = 0, int(last_log.Index+1)
-		if int(peer.id) != me {
+		if int64(peer.id) != me {
 			rf.replicatorCond[peer.id] = sync.NewCond(&sync.Mutex{})
 			go rf.Replicator(peer)
 		}
@@ -156,7 +155,7 @@ func (rf *Raft) SwitchRaftNodeRole(role NodeRole) {
 	case NodeRoleLeader:
 		// become leader,set replica (matchIdx and nextIdx) processs table
 		lastLog := rf.logs.GetLast()
-		rf.leaderId = int64(rf.me_)
+		rf.leaderId = int64(rf.id)
 		for i := 0; i < len(rf.peers); i++ {
 			rf.matchIdx[i], rf.nextIdx[i] = 0, int(lastLog.Index+1)
 		}
@@ -201,7 +200,7 @@ func (rf *Raft) HandleRequestVote(req *pb.RequestVoteRequest, resp *pb.RequestVo
 
 	last_log := rf.logs.GetLast()
 
-	if !(req.LastLogTerm > int64(last_log.Term) || (req.LastLogTerm == int64(last_log.Term) && req.LastLogIndex >= last_log.Index)) {
+	if req.LastLogTerm < int64(last_log.Term) || (req.LastLogTerm == int64(last_log.Term) && req.LastLogIndex < last_log.Index) {
 		resp.Term, resp.VoteGranted = rf.curTerm, false
 		return
 	}
@@ -244,7 +243,7 @@ func (rf *Raft) HandleAppendEntries(req *pb.AppendEntriesRequest, resp *pb.Appen
 	if req.PrevLogIndex < int64(rf.logs.GetFirst().Index) {
 		resp.Term = 0
 		resp.Success = false
-		logger.ELogger().Sugar().Debugf("peer %d reject append entires request from %d", rf.me_, req.LeaderId)
+		logger.ELogger().Sugar().Debugf("peer %d reject append entires request from %d", rf.id, req.LeaderId)
 		return
 	}
 
@@ -295,7 +294,7 @@ func (rf *Raft) CondInstallSnapshot(lastIncluedTerm int, lastIncludedIndex int,
 	if lastIncludedIndex > int(rf.logs.GetLast().Index) {
 		rf.logs.ReInitLogs()
 	} else {
-		rf.logs.EraseBeforeWithDel(int64(lastIncludedIndex) - rf.logs.GetFirst().Index)
+		rf.logs.EraseBefore(int64(lastIncludedIndex)-rf.logs.GetFirst().Index, true)
 		rf.logs.SetEntFirstData([]byte{})
 	}
 	// update dummy entry with lastIncludedTerm and lastIncludedIndex
@@ -318,7 +317,7 @@ func (rf *Raft) Snapshot(index int, snapshot []byte) {
 		logger.ELogger().Sugar().Warnf("reject snapshot, current snapshotIndex is larger in cur term")
 		return
 	}
-	rf.logs.EraseBeforeWithDel(int64(index) - int64(snapshot_index))
+	rf.logs.EraseBefore(int64(index)-int64(snapshot_index), true)
 	rf.logs.SetEntFirstData([]byte{})
 	logger.ELogger().Sugar().Debugf("del log entry before idx %d", index)
 	rf.isSnapshoting = false
@@ -377,10 +376,13 @@ func (rf *Raft) GetLogCount() int {
 func (rf *Raft) advanceCommitIndexForLeader() {
 	sort.Ints(rf.matchIdx)
 	n := len(rf.matchIdx)
+	// [18 18 '19 19 20] majority replicate log index 19
+	// [18 '18 19] majority replicate log index 18
+	// [18 '18 19 20] majority replicate log index 18
 	new_commit_index := rf.matchIdx[n-(n/2+1)]
 	if new_commit_index > int(rf.commitIdx) {
 		if rf.MatchLog(rf.curTerm, int64(new_commit_index)) {
-			logger.ELogger().Sugar().Debugf("peer %d advance commit index %d at term %d", rf.me_, rf.commitIdx, rf.curTerm)
+			logger.ELogger().Sugar().Debugf("peer %d advance commit index %d at term %d", rf.id, rf.commitIdx, rf.curTerm)
 			rf.commitIdx = int64(new_commit_index)
 			rf.applyCond.Signal()
 		}
@@ -390,7 +392,7 @@ func (rf *Raft) advanceCommitIndexForFollower(leaderCommit int) {
 func (rf *Raft) advanceCommitIndexForFollower(leaderCommit int) {
 	new_commit_index := Min(leaderCommit, int(rf.logs.GetLast().Index))
 	if new_commit_index > int(rf.commitIdx) {
-		logger.ELogger().Sugar().Debugf("peer %d advance commit index %d at term %d", rf.me_, rf.commitIdx, rf.curTerm)
+		logger.ELogger().Sugar().Debugf("peer %d advance commit index %d at term %d", rf.id, rf.commitIdx, rf.curTerm)
 		rf.commitIdx = int64(new_commit_index)
 		rf.applyCond.Signal()
 	}
@@ -403,19 +405,19 @@ func (rf *Raft) MatchLog(term, index int64) bool {
 
 // Election make a new election
 func (rf *Raft) Election() {
-	logger.ELogger().Sugar().Debugf("%d start election ", rf.me_)
+	logger.ELogger().Sugar().Debugf("%d start election ", rf.id)
 
 	rf.IncrGrantedVotes()
-	rf.votedFor = int64(rf.me_)
+	rf.votedFor = int64(rf.id)
 	vote_req := &pb.RequestVoteRequest{
 		Term:         rf.curTerm,
-		CandidateId:  int64(rf.me_),
+		CandidateId:  int64(rf.id),
 		LastLogIndex: int64(rf.logs.GetLast().Index),
 		LastLogTerm:  int64(rf.logs.GetLast().Term),
 	}
 	rf.PersistRaftState()
 	for _, peer := range rf.peers {
-		if int(peer.id) == rf.me_ {
+		if int64(peer.id) == rf.id {
 			continue
 		}
 		go func(peer *RaftClientEnd) {
@@ -434,7 +436,7 @@ func (rf *Raft) Election() {
 				// success granted the votes
 				rf.IncrGrantedVotes()
 				if rf.grantedVotes > len(rf.peers)/2 {
-					logger.ELogger().Sugar().Debugf("I'm win this term, (node %d) get majority votes int term %d ", rf.me_, rf.curTerm)
+					logger.ELogger().Sugar().Debugf("I'm win this term, (node %d) get majority votes int term %d ", rf.id, rf.curTerm)
 					rf.SwitchRaftNodeRole(NodeRoleLeader)
 					rf.BroadcastHeartbeat()
 					rf.grantedVotes = 0
@@ -457,7 +459,7 @@ func (rf *Raft) Election() {
 
 func (rf *Raft) BroadcastAppend() {
 	for _, peer := range rf.peers {
-		if peer.id == uint64(rf.me_) {
+		if peer.id == uint64(rf.id) {
 			continue
 		}
 		rf.replicatorCond[peer.id].Signal()
@@ -467,7 +469,7 @@
 // BroadcastHeartbeat broadcast heartbeat to peers
 func (rf *Raft) BroadcastHeartbeat() {
 	for _, peer := range rf.peers {
-		if int(peer.id) == rf.me_ {
+		if int64(peer.id) == rf.id {
 			continue
 		}
 		logger.ELogger().Sugar().Debugf("send heart beat to %s", peer.addr)
@@ -529,8 +531,8 @@ func (rf *Raft) Append(command []byte) *pb.Entry {
 		Data:  command,
 	}
 	rf.logs.Append(newLog)
-	rf.matchIdx[rf.me_] = int(newLog.Index)
-	rf.nextIdx[rf.me_] = int(newLog.Index) + 1
+	rf.matchIdx[rf.id] = int(newLog.Index)
+	rf.nextIdx[rf.id] = int(newLog.Index) + 1
 	rf.PersistRaftState()
 	return newLog
 }
@@ -568,7 +570,7 @@ func (rf *Raft) replicateOneRound(peer *RaftClientEnd) {
 		first_log := rf.logs.GetFirst()
 		snap_shot_req := &pb.InstallSnapshotRequest{
 			Term:              rf.curTerm,
-			LeaderId:          int64(rf.me_),
+			LeaderId:          int64(rf.id),
 			LastIncludedIndex: first_log.Index,
 			LastIncludedTerm:  int64(first_log.Term),
 			Data:              rf.ReadSnapshot(),
@@ -604,11 +606,13 @@ func (rf *Raft) replicateOneRound(peer *RaftClientEnd) {
 	} else {
 		first_index := rf.logs.GetFirst().Index
 		logger.ELogger().Sugar().Debugf("first log index %d", first_index)
-		entries := make([]*pb.Entry, len(rf.logs.EraseBefore(int64(prev_log_index)+1-first_index)))
-		copy(entries, rf.logs.EraseBefore(int64(prev_log_index)+1-first_index))
+		_, new_ents := rf.logs.EraseBefore(int64(prev_log_index)+1-first_index, false)
+		entries := make([]*pb.Entry, len(new_ents))
+		copy(entries, new_ents)
+
 		append_ent_req := &pb.AppendEntriesRequest{
 			Term:         rf.curTerm,
-			LeaderId:     int64(rf.me_),
+			LeaderId:     int64(rf.id),
 			PrevLogIndex: int64(prev_log_index),
 			PrevLogTerm:  int64(rf.logs.GetEntry(int64(prev_log_index) - first_index).Term),
 			Entries:      entries,
@@ -666,7 +670,7 @@ func (rf *Raft) Applier() {
 		first_index, commit_index, last_applied := rf.logs.GetFirst().Index, rf.commitIdx, rf.lastApplied
 		entries := make([]*pb.Entry, commit_index-last_applied)
 		copy(entries, rf.logs.GetRange(last_applied+1-int64(first_index), commit_index+1-int64(first_index)))
-		logger.ELogger().Sugar().Debugf("%d, applies entries %d-%d in term %d", rf.me_, rf.lastApplied, commit_index, rf.curTerm)
+		logger.ELogger().Sugar().Debugf("%d, applies entries %d-%d in term %d", rf.id, rf.lastApplied, commit_index, rf.curTerm)
 
 		rf.mu.Unlock()
 		for _, entry := range entries {
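
Note: the comments added to advanceCommitIndexForLeader above describe how the leader picks the highest index replicated on a quorum: after sorting matchIdx, the element at position n-(n/2+1) is covered by a majority of peers. The rewritten vote check in HandleRequestVote is logically equivalent to the old negated condition, just stated positively: reject the candidate when its last log term is older, or equal with a shorter log. A minimal, self-contained Go sketch of the quorum-index calculation (the helper name majorityIndex is illustrative, not code from this repository):

package main

import (
	"fmt"
	"sort"
)

// majorityIndex returns the largest log index known to be replicated on a
// majority of peers; matchIdx holds one entry per peer, leader included.
func majorityIndex(matchIdx []int) int {
	sorted := append([]int{}, matchIdx...)
	sort.Ints(sorted)
	n := len(sorted)
	return sorted[n-(n/2+1)]
}

func main() {
	fmt.Println(majorityIndex([]int{18, 18, 19, 19, 20})) // 19
	fmt.Println(majorityIndex([]int{18, 18, 19}))         // 18
	fmt.Println(majorityIndex([]int{18, 18, 19, 20}))     // 18
}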

raftcore/raft_persistent_log.go

Lines changed: 10 additions & 15 deletions
@@ -229,29 +229,24 @@ func (rfLog *RaftLog) Append(newEnt *pb.Entry) {
 // EraseBefore
 // erase log before from idx, and copy [idx:] log return
 // this operation don't modity log in storage engine
-func (rfLog *RaftLog) EraseBefore(idx int64) []*pb.Entry {
+func (rfLog *RaftLog) EraseBefore(idx int64, withDel bool) (error, []*pb.Entry) {
 	rfLog.mu.Lock()
 	defer rfLog.mu.Unlock()
 	ents := []*pb.Entry{}
 	lastlog_id := rfLog.GetLastLogId()
 	firstlog_id := rfLog.GetFirstLogId()
+	if withDel {
+		for i := firstlog_id; i < firstlog_id+uint64(idx); i++ {
+			if err := rfLog.dbEng.DeleteBytesK(EncodeRaftLogKey(i)); err != nil {
+				return err, ents
+			}
+			logger.ELogger().Sugar().Debugf("del log with id %d success", i)
+		}
+	}
 	for i := int64(firstlog_id) + idx; i <= int64(lastlog_id); i++ {
 		ents = append(ents, rfLog.GetEnt(i-int64(firstlog_id)))
 	}
-	return ents
-}
-
-func (rfLog *RaftLog) EraseBeforeWithDel(idx int64) error {
-	rfLog.mu.Lock()
-	defer rfLog.mu.Unlock()
-	firstlog_id := rfLog.GetFirstLogId()
-	for i := firstlog_id; i < firstlog_id+uint64(idx); i++ {
-		if err := rfLog.dbEng.DeleteBytesK(EncodeRaftLogKey(i)); err != nil {
-			return err
-		}
-		logger.ELogger().Sugar().Debugf("del log with id %d success", i)
-	}
-	return nil
+	return nil, ents
 }
 
 // EraseAfter
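
Note: this hunk folds the old EraseBeforeWithDel into EraseBefore behind a withDel flag. With withDel=false the method only copies the surviving suffix (used when building AppendEntries payloads); with withDel=true it also deletes the truncated prefix from the storage engine (used when a snapshot is installed). A standalone Go sketch of those semantics with an in-memory log; simpleLog and its fields are illustrative, not the repository's RaftLog, and the sketch omits the error value that the real method returns:

package main

import "fmt"

type entry struct {
	Index int64
	Data  string
}

type simpleLog struct {
	ents []entry
}

// eraseBefore returns a copy of the entries from position idx onward; when
// withDel is true it also drops the earlier entries from the log itself,
// mirroring what the old EraseBeforeWithDel did against the db engine.
func (l *simpleLog) eraseBefore(idx int64, withDel bool) []entry {
	kept := append([]entry{}, l.ents[idx:]...)
	if withDel {
		l.ents = kept
	}
	return kept
}

func main() {
	log := &simpleLog{ents: []entry{{0, "dummy"}, {1, "a"}, {2, "b"}, {3, "c"}}}

	fmt.Println(log.eraseBefore(2, false)) // [{2 b} {3 c}], log untouched
	fmt.Println(len(log.ents))             // 4

	fmt.Println(log.eraseBefore(2, true)) // [{2 b} {3 c}], prefix dropped
	fmt.Println(len(log.ents))            // 2
}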

raftcore/raft_persistent_log_test.go

Lines changed: 5 additions & 5 deletions
@@ -60,7 +60,7 @@ func TestEraseBefore1(t *testing.T) {
 	t.Logf("first log %s", fristEnt.String())
 	lastEnt := raftLog.GetLast()
 	t.Logf("last log %s", lastEnt.String())
-	ents := raftLog.EraseBefore(1)
+	_, ents := raftLog.EraseBefore(1, false)
 	t.Logf("%v", ents)
 	RemoveDir("./log_data_test")
 }
@@ -103,7 +103,7 @@ func TestPersisEraseBefore0And1(t *testing.T) {
 	t.Logf("first log %s", fristEnt.String())
 	lastEnt := raftLog.GetLast()
 	t.Logf("last log %s", lastEnt.String())
-	ents := raftLog.EraseBefore(0)
+	_, ents := raftLog.EraseBefore(0, false)
 	t.Logf("%v", ents)
 	raftLog.Append(&pb.Entry{
 		Index: 1,
@@ -113,7 +113,7 @@ func TestPersisEraseBefore0And1(t *testing.T) {
 		Index: 2,
 		Term:  1,
 	})
-	ents = raftLog.EraseBefore(1)
+	_, ents = raftLog.EraseBefore(1, false)
 	t.Logf("%v", ents)
 	t.Logf("%d", raftLog.LogItemCount())
 	RemoveDir("./log_data_test")
@@ -173,7 +173,7 @@ func TestTestPersisLogErase(t *testing.T) {
 		Term: 1,
 		Data: []byte{0x01, 0x02},
 	})
-	raftLog.EraseBefore(0)
+	raftLog.EraseBefore(0, false)
 	fristEnt := raftLog.GetFirst()
 	t.Logf("first log %s", fristEnt.String())
 	lastEnt := raftLog.GetLast()
@@ -262,7 +262,7 @@ func TestPersisLogGetRangeAfterGc(t *testing.T) {
 		Term: 1,
 		Data: []byte{0x01, 0x02},
 	})
-	raftLog.EraseBeforeWithDel(2)
+	raftLog.EraseBefore(2, true)
 	ents := raftLog.GetRange(1, 2)
 	for _, ent := range ents {
 		t.Logf("got ent %s", ent.String())

shardkvserver/shard_kvserver.go

Lines changed: 3 additions & 3 deletions
@@ -71,17 +71,17 @@ type ShardKV struct {
 // nodeId: the peer's nodeId in the raft group
 // gid: the node's raft group id
 // configServerAddr: config server addr (leader addr, need to optimized into config server peer map)
-func MakeShardKVServer(peerMaps map[int]string, nodeId int, gid int, configServerAddrs string) *ShardKV {
+func MakeShardKVServer(peerMaps map[int]string, nodeId int64, gid int, configServerAddrs string) *ShardKV {
 	client_ends := []*raftcore.RaftClientEnd{}
 	for id, addr := range peerMaps {
 		new_end := raftcore.MakeRaftClientEnd(addr, uint64(id))
 		client_ends = append(client_ends, new_end)
 	}
 	new_apply_ch := make(chan *pb.ApplyMsg)
 
-	log_db_eng := storage_eng.EngineFactory("leveldb", "./data/log/datanode_group_"+strconv.Itoa(gid)+"_nodeid_"+strconv.Itoa(nodeId))
+	log_db_eng := storage_eng.EngineFactory("leveldb", "./data/log/datanode_group_"+strconv.Itoa(gid)+"_nodeid_"+strconv.Itoa(int(nodeId)))
 	new_rf := raftcore.MakeRaft(client_ends, nodeId, log_db_eng, new_apply_ch, 50, 150)
-	newdb_eng := storage_eng.EngineFactory("leveldb", "./data/db/datanode_group_"+strconv.Itoa(gid)+"_nodeid_"+strconv.Itoa(nodeId))
+	newdb_eng := storage_eng.EngineFactory("leveldb", "./data/db/datanode_group_"+strconv.Itoa(gid)+"_nodeid_"+strconv.Itoa(int(nodeId)))
 
 	shard_kv := &ShardKV{
 		dead: 0,

tests/integration_test.go

Lines changed: 4 additions & 3 deletions
@@ -5,6 +5,7 @@ import (
 	"net"
 	"os"
 	"os/signal"
+	"strconv"
 	"strings"
 	"syscall"
 	"testing"
@@ -59,7 +60,7 @@ func RunShardKvServer(svrPeerMaps map[int]string, nodeId int, groupId int, metaa
 	sigs := make(chan os.Signal, 1)
 	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
 
-	shardSvr := shardkvserver.MakeShardKVServer(svrPeerMaps, nodeId, groupId, metaaddrs)
+	shardSvr := shardkvserver.MakeShardKVServer(svrPeerMaps, int64(nodeId), groupId, metaaddrs)
 	lis, err := net.Listen("tcp", svrPeerMaps[nodeId])
 	if err != nil {
 		fmt.Printf("failed to listen: %v", err)
@@ -165,12 +166,12 @@ func TestClusterRwBench(t *testing.T) {
 	// R-W test
 	shardkvcli := shardkvserver.MakeKvClient("127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090")
 
-	N := 1000
+	N := 300
 	KEY_SIZE := 64
 	VAL_SIZE := 64
 	bench_kvs := map[string]string{}
 	for i := 0; i <= N; i++ {
-		k := common.RandStringRunes(KEY_SIZE)
+		k := strconv.Itoa(i) + "-" + common.RandStringRunes(KEY_SIZE)
 		v := common.RandStringRunes(VAL_SIZE)
 		bench_kvs[k] = v
 	}
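
Note: the benchmark now prefixes each random key with its loop index, which keeps the keys distinct even if two 64-byte random suffixes collide, so bench_kvs always ends up with exactly N+1 entries. A small self-contained Go sketch of that key construction; randString here stands in for common.RandStringRunes and is not the repository's implementation:

package main

import (
	"fmt"
	"math/rand"
	"strconv"
)

const letters = "abcdefghijklmnopqrstuvwxyz"

// randString generates a random lowercase string of length n.
func randString(n int) string {
	b := make([]byte, n)
	for i := range b {
		b[i] = letters[rand.Intn(len(letters))]
	}
	return string(b)
}

func main() {
	kvs := map[string]string{}
	for i := 0; i <= 300; i++ {
		k := strconv.Itoa(i) + "-" + randString(64) // index prefix guarantees uniqueness
		kvs[k] = randString(64)
	}
	fmt.Println(len(kvs)) // 301
}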
