diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 76d79c51..81c34f27 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,39 +7,12 @@ jobs: runs-on: ubuntu-20.04 steps: - uses: actions/checkout@v2 - - name: Install gcc-10 and g++-10 + - name: Install golang run: | - sudo apt install -y gcc-10 g++-10 - sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-10 20 - sudo update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-10 20 - - name: Install build dependencies - run: sudo apt-get update && sudo apt-get install -y clang-format build-essential autoconf automake libtool lcov libgflags-dev libsnappy-dev zlib1g-dev libbz2-dev libzstd-dev - - name: Install RocksDB - run: sudo apt-get update && sudo apt-get install librocksdb-dev - - name: Install latest version cmake + sudo apt install golang + - name: Run test run: | - sudo wget https://github.com/Kitware/CMake/releases/download/v3.26.3/cmake-3.26.3-linux-x86_64.sh -q -O /tmp/cmake-install.sh && sudo chmod u+x /tmp/cmake-install.sh - sudo mkdir /opt/cmake-3.26.3 && sudo /tmp/cmake-install.sh --skip-license --prefix=/opt/cmake-3.26.3 - sudo rm /tmp/cmake-install.sh && sudo ln -sf /opt/cmake-3.26.3/bin/* /usr/local/bin - - name: Install gtest manually - run: sudo apt-get install libgtest-dev && cd /usr/src/gtest && sudo cmake CMakeLists.txt && sudo make && sudo cp lib/*.a /usr/lib && sudo ln -s /usr/lib/libgtest.a /usr/local/lib/libgtest.a && sudo ln -s /usr/lib/libgtest_main.a /usr/local/lib/libgtest_main.a - - name: Install - run: sudo git clone https://github.com/gabime/spdlog.git && cd spdlog && sudo git checkout v1.9.2 && sudo mkdir build && cd build && sudo cmake .. && sudo make -j4 && sudo make install - - name: Install grpc - run: sudo git clone https://github.com/grpc/grpc.git && cd grpc && sudo git checkout v1.28.0 && sudo git submodule update --init && sudo mkdir .build && cd .build && sudo cmake .. -DgRPC_BUILD_TESTS=OFF -DCMAKE_BUILD_TYPE=Release && sudo make install -j4 && cd .. && sudo rm -rf .build/CMakeCache.txt && cd .build && sudo cmake .. -DgRPC_INSTALL=ON -DgRPC_BUILD_TESTS=OFF -DgRPC_PROTOBUF_PROVIDER=package -DgRPC_ZLIB_PROVIDER=package -DgRPC_CARES_PROVIDER=package -DgRPC_SSL_PROVIDER=package -DCMAKE_BUILD_TYPE=Release && sudo make install -j4 - - name: Install Google benchmark - run: sudo git clone https://github.com/google/benchmark.git && sudo git clone https://github.com/google/googletest.git benchmark/googletest && cd benchmark && sudo cmake -E make_directory "build" && sudo cmake -E chdir "build" cmake -DCMAKE_BUILD_TYPE=Release ../ && sudo cmake --build "build" --config Release --target install - - name: Build - run: | - mkdir build && cd build - cmake .. - make -j4 - - name: Run Main - run: | - ./build/rocksdb_storage_impl_tests - ./build/log_entry_cache_tests - ./build/log_entry_cache_tests - ./build/log_entry_cache_benchmark + go test -run TestClusterRwBench tests/integration_test.go -v test-sanitizer: runs-on: ubuntu-latest @@ -47,4 +20,3 @@ jobs: - uses: actions/checkout@v2 - name: Install build dependencies run: sudo apt-get update && sudo apt-get install -y build-essential autoconf automake libtool cmake lcov - diff --git a/raftcore/raft.go b/raftcore/raft.go index 11f538cb..fb7369bd 100644 --- a/raftcore/raft.go +++ b/raftcore/raft.go @@ -26,6 +26,7 @@ package raftcore import ( "context" + "errors" "sort" "sync" "sync/atomic" @@ -59,24 +60,23 @@ func NodeToString(role NodeRole) string { // raft stack definition type Raft struct { - mu sync.RWMutex - peers []*RaftPeerNode // rpc client end - id int64 - dead int32 - applyCh chan *pb.ApplyMsg - applyCond *sync.Cond - replicatorCond []*sync.Cond - role NodeRole - curTerm int64 - votedFor int64 - grantedVotes int - logs *RaftLog - commitIdx int64 - lastApplied int64 - nextIdx []int - matchIdx []int - isSnapshoting bool - + mu sync.RWMutex + peers []*RaftPeerNode // rpc client end + id int64 + dead int32 + applyCh chan *pb.ApplyMsg + applyCond *sync.Cond + replicatorCond []*sync.Cond + role NodeRole + curTerm int64 + votedFor int64 + grantedVotes int + logs *RaftLog + commitIdx int64 + lastApplied int64 + nextIdx []int + matchIdx []int + isSnapshoting bool leaderId int64 electionTimer *time.Timer heartbeatTimer *time.Timer @@ -172,10 +172,6 @@ func (rf *Raft) IncrGrantedVotes() { rf.grantedVotes += 1 } -func (rf *Raft) ReInitLog() { - rf.logs.ReInitLogs() -} - // HandleRequestVote handle request vote from other node func (rf *Raft) HandleRequestVote(req *pb.RequestVoteRequest, resp *pb.RequestVoteResponse) { rf.mu.Lock() @@ -277,7 +273,7 @@ func (rf *Raft) HandleAppendEntries(req *pb.AppendEntriesRequest, resp *pb.Appen resp.Success = true } -func (rf *Raft) CondInstallSnapshot(lastIncluedTerm int, lastIncludedIndex int, snapshot []byte) bool { +func (rf *Raft) CondInstallSnapshot(lastIncluedTerm int, lastIncludedIndex int) bool { rf.mu.Lock() defer rf.mu.Unlock() @@ -289,10 +285,8 @@ func (rf *Raft) CondInstallSnapshot(lastIncluedTerm int, lastIncludedIndex int, rf.logs.ReInitLogs() } else { rf.logs.EraseBefore(int64(lastIncludedIndex), true) - rf.logs.SetEntFirstData([]byte{}) } - // update dummy entry with lastIncludedTerm and lastIncludedIndex - rf.logs.ResetFirstEntryTermAndIndex(int64(lastIncluedTerm), int64(lastIncludedIndex)) + rf.logs.ResetFirstLogEntry(int64(lastIncluedTerm), int64(lastIncludedIndex)) rf.lastApplied = int64(lastIncludedIndex) rf.commitIdx = int64(lastIncludedIndex) @@ -300,32 +294,6 @@ func (rf *Raft) CondInstallSnapshot(lastIncluedTerm int, lastIncludedIndex int, return true } -// take a snapshot -func (rf *Raft) Snapshot(index int, snapshot []byte) { - rf.mu.Lock() - defer rf.mu.Unlock() - rf.isSnapshoting = true - snapshot_index := rf.logs.GetFirstLogId() - if index <= int(snapshot_index) { - rf.isSnapshoting = false - logger.ELogger().Sugar().Warnf("reject snapshot, current snapshotIndex is larger in cur term") - return - } - rf.logs.EraseBefore(int64(index), true) - rf.logs.SetEntFirstData([]byte{}) - logger.ELogger().Sugar().Debugf("del log entry before idx %d", index) - rf.isSnapshoting = false - rf.logs.PersisSnapshot(snapshot) -} - -func (rf *Raft) ReadSnapshot() []byte { - b, err := rf.logs.ReadSnapshot() - if err != nil { - logger.ELogger().Sugar().Error(err.Error()) - } - return b -} - // install snapshot from leader func (rf *Raft) HandleInstallSnapshot(request *pb.InstallSnapshotRequest, response *pb.InstallSnapshotResponse) { rf.mu.Lock() @@ -361,12 +329,6 @@ func (rf *Raft) HandleInstallSnapshot(request *pb.InstallSnapshotRequest, respon } -func (rf *Raft) GetLogCount() int { - rf.mu.Lock() - defer rf.mu.Unlock() - return rf.logs.LogItemCount() -} - func (rf *Raft) advanceCommitIndexForLeader() { sort.Ints(rf.matchIdx) n := len(rf.matchIdx) @@ -416,31 +378,28 @@ func (rf *Raft) Election() { } go func(peer *RaftPeerNode) { logger.ELogger().Sugar().Debugf("send request vote to %s %s", peer.addr, vote_req.String()) - request_vote_resp, err := (*peer.raftServiceCli).RequestVote(context.Background(), vote_req) if err != nil { logger.ELogger().Sugar().Errorf("send request vote to %s failed %v", peer.addr, err.Error()) } - if request_vote_resp != nil { - rf.mu.Lock() - defer rf.mu.Unlock() + rf.mu.Lock() + defer rf.mu.Unlock() + if request_vote_resp != nil && rf.curTerm == vote_req.Term && rf.role == NodeRoleCandidate { logger.ELogger().Sugar().Errorf("send request vote to %s recive -> %s, curterm %d, req term %d", peer.addr, request_vote_resp.String(), rf.curTerm, vote_req.Term) - if rf.curTerm == vote_req.Term && rf.role == NodeRoleCandidate { - if request_vote_resp.VoteGranted { - // success granted the votes - rf.IncrGrantedVotes() - if rf.grantedVotes > len(rf.peers)/2 { - logger.ELogger().Sugar().Debugf("I'm win this term, (node %d) get majority votes int term %d ", rf.id, rf.curTerm) - rf.SwitchRaftNodeRole(NodeRoleLeader) - rf.BroadcastHeartbeat() - rf.grantedVotes = 0 - } - } else if request_vote_resp.Term > rf.curTerm { - // request vote reject - rf.SwitchRaftNodeRole(NodeRoleFollower) - rf.curTerm, rf.votedFor = request_vote_resp.Term, -1 - rf.PersistRaftState() + if request_vote_resp.VoteGranted { + // success granted the votes + rf.IncrGrantedVotes() + if rf.grantedVotes > len(rf.peers)/2 { + logger.ELogger().Sugar().Debugf("I'm win this term, (node %d) get majority votes int term %d ", rf.id, rf.curTerm) + rf.SwitchRaftNodeRole(NodeRoleLeader) + rf.BroadcastHeartbeat() + rf.grantedVotes = 0 } + } else if request_vote_resp.Term > rf.curTerm { + // request vote reject + rf.SwitchRaftNodeRole(NodeRoleFollower) + rf.curTerm, rf.votedFor = request_vote_resp.Term, -1 + rf.PersistRaftState() } } }(peer) @@ -562,12 +521,14 @@ func (rf *Raft) replicateOneRound(peer *RaftPeerNode) { logger.ELogger().Sugar().Debugf("leader prev log index %d", prev_log_index) if prev_log_index < uint64(rf.logs.GetFirst().Index) { first_log := rf.logs.GetFirst() + + // TODO: send kv leveldb snapshot + snap_shot_req := &pb.InstallSnapshotRequest{ Term: rf.curTerm, LeaderId: int64(rf.id), LastIncludedIndex: first_log.Index, LastIncludedTerm: int64(first_log.Term), - Data: rf.ReadSnapshot(), } rf.mu.RUnlock() @@ -681,3 +642,17 @@ func (rf *Raft) Applier() { rf.mu.Unlock() } } + +func (rf *Raft) StartSnapshot(snap_idx uint64) error { + rf.isSnapshoting = true + if snap_idx <= rf.logs.GetFirstLogId() { + rf.isSnapshoting = false + return errors.New("ety index is larger than the first log index") + } + rf.logs.EraseBefore(int64(snap_idx), true) + rf.logs.ResetFirstLogEntry(rf.curTerm, int64(snap_idx)) + + // create checkpoint for db + rf.isSnapshoting = false + return nil +} diff --git a/raftcore/raft_persistent_log.go b/raftcore/raft_persistent_log.go index 378381bd..18224321 100644 --- a/raftcore/raft_persistent_log.go +++ b/raftcore/raft_persistent_log.go @@ -96,18 +96,6 @@ func (rfLog *RaftLog) ReadRaftState() (curTerm int64, votedFor int64) { return rf_state.CurTerm, rf_state.VotedFor } -func (rfLog *RaftLog) PersisSnapshot(snapContext []byte) { - rfLog.dbEng.PutBytesKv(SNAPSHOT_STATE_KEY, snapContext) -} - -func (rfLog *RaftLog) ReadSnapshot() ([]byte, error) { - bytes, err := rfLog.dbEng.GetBytesValue(SNAPSHOT_STATE_KEY) - if err != nil { - return nil, err - } - return bytes, nil -} - // GetFirstLogId // get the first log id from storage engine func (rfLog *RaftLog) GetFirstLogId() uint64 { @@ -141,6 +129,21 @@ func (rfLog *RaftLog) SetEntFirstData(d []byte) error { return rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(first_idx), newent_encode) } +func (rfLog *RaftLog) ResetFirstLogEntry(term int64, index int64) error { + rfLog.mu.Lock() + defer rfLog.mu.Unlock() + new_ent := &pb.Entry{} + new_ent.EntryType = pb.EntryType_EntryNormal + new_ent.Term = uint64(term) + new_ent.Index = index + newent_encode := EncodeEntry(new_ent) + if err := rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(uint64(index)), newent_encode); err != nil { + return err + } + rfLog.firstIdx = uint64(index) + return nil +} + // ReInitLogs // make logs to init state func (rfLog *RaftLog) ReInitLogs() error { @@ -158,32 +161,6 @@ func (rfLog *RaftLog) ReInitLogs() error { return rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(INIT_LOG_INDEX), empent_encode) } -// -// SetEntFirstTermAndIndex -// - -func (rfLog *RaftLog) ResetFirstEntryTermAndIndex(term, index int64) error { - rfLog.mu.Lock() - defer rfLog.mu.Unlock() - first_idx := rfLog.GetFirstLogId() - encode_value, err := rfLog.dbEng.GetBytesValue(EncodeRaftLogKey(uint64(first_idx))) - if err != nil { - logger.ELogger().Sugar().Panicf("get log entry with id %d error!", first_idx) - panic(err) - } - // del olf first ent - if err := rfLog.dbEng.DeleteBytesK(EncodeRaftLogKey(first_idx)); err != nil { - return err - } - ent := DecodeEntry(encode_value) - ent.Term = uint64(term) - ent.Index = index - rfLog.firstIdx = uint64(index) - logger.ELogger().Sugar().Debugf("change first ent to -> ", ent.String()) - new_ent_encode := EncodeEntry(ent) - return rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(uint64(index)), new_ent_encode) -} - // GetFirst // // get the first entry from storage engine diff --git a/shardkvserver/shard_kvserver.go b/shardkvserver/shard_kvserver.go index 1ae69094..6302b7c8 100644 --- a/shardkvserver/shard_kvserver.go +++ b/shardkvserver/shard_kvserver.go @@ -254,6 +254,12 @@ func (s *ShardKV) ApplingToStm(done <-chan interface{}) { return case appliedMsg := <-s.applyCh: logger.ELogger().Sugar().Debugf("appling msg %s", appliedMsg.String()) + + if appliedMsg.SnapshotValid { + // TODO: install snapshot data to leveldb + s.rf.CondInstallSnapshot(int(appliedMsg.SnapshotTerm), int(appliedMsg.SnapshotIndex)) + } + req := &pb.CommandRequest{} if err := json.Unmarshal(appliedMsg.Command, req); err != nil { logger.ELogger().Sugar().Errorf("Unmarshal CommandRequest err", err.Error()) @@ -368,6 +374,17 @@ func (s *ShardKV) AppendEntries(ctx context.Context, req *pb.AppendEntriesReques return resp, nil } +// snapshot rpc interface +func (s *ShardKV) Snapshot(ctx context.Context, req *pb.InstallSnapshotRequest) (*pb.InstallSnapshotResponse, error) { + resp := &pb.InstallSnapshotResponse{} + logger.ELogger().Sugar().Debugf("handle snapshot req %s ", req.String()) + + s.rf.HandleInstallSnapshot(req, resp) + logger.ELogger().Sugar().Debugf("handle snapshot resp %s ", resp.String()) + + return resp, nil +} + // rpc interface // DoBucketsOperation hanlde bucket data get, delete and insert func (s *ShardKV) DoBucketsOperation(ctx context.Context, req *pb.BucketOperationRequest) (*pb.BucketOperationResponse, error) {