Skip to content

Commit

Permalink
add snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
LLiuJJ committed Feb 3, 2024
1 parent cb5a057 commit 503ff19
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 148 deletions.
36 changes: 4 additions & 32 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,44 +7,16 @@ jobs:
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2
- name: Install gcc-10 and g++-10
- name: Install golang
run: |
sudo apt install -y gcc-10 g++-10
sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-10 20
sudo update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-10 20
- name: Install build dependencies
run: sudo apt-get update && sudo apt-get install -y clang-format build-essential autoconf automake libtool lcov libgflags-dev libsnappy-dev zlib1g-dev libbz2-dev libzstd-dev
- name: Install RocksDB
run: sudo apt-get update && sudo apt-get install librocksdb-dev
- name: Install latest version cmake
sudo apt install golang
- name: Run test
run: |
sudo wget https://github.com/Kitware/CMake/releases/download/v3.26.3/cmake-3.26.3-linux-x86_64.sh -q -O /tmp/cmake-install.sh && sudo chmod u+x /tmp/cmake-install.sh
sudo mkdir /opt/cmake-3.26.3 && sudo /tmp/cmake-install.sh --skip-license --prefix=/opt/cmake-3.26.3
sudo rm /tmp/cmake-install.sh && sudo ln -sf /opt/cmake-3.26.3/bin/* /usr/local/bin
- name: Install gtest manually
run: sudo apt-get install libgtest-dev && cd /usr/src/gtest && sudo cmake CMakeLists.txt && sudo make && sudo cp lib/*.a /usr/lib && sudo ln -s /usr/lib/libgtest.a /usr/local/lib/libgtest.a && sudo ln -s /usr/lib/libgtest_main.a /usr/local/lib/libgtest_main.a
- name: Install
run: sudo git clone https://github.com/gabime/spdlog.git && cd spdlog && sudo git checkout v1.9.2 && sudo mkdir build && cd build && sudo cmake .. && sudo make -j4 && sudo make install
- name: Install grpc
run: sudo git clone https://github.com/grpc/grpc.git && cd grpc && sudo git checkout v1.28.0 && sudo git submodule update --init && sudo mkdir .build && cd .build && sudo cmake .. -DgRPC_BUILD_TESTS=OFF -DCMAKE_BUILD_TYPE=Release && sudo make install -j4 && cd .. && sudo rm -rf .build/CMakeCache.txt && cd .build && sudo cmake .. -DgRPC_INSTALL=ON -DgRPC_BUILD_TESTS=OFF -DgRPC_PROTOBUF_PROVIDER=package -DgRPC_ZLIB_PROVIDER=package -DgRPC_CARES_PROVIDER=package -DgRPC_SSL_PROVIDER=package -DCMAKE_BUILD_TYPE=Release && sudo make install -j4
- name: Install Google benchmark
run: sudo git clone https://github.com/google/benchmark.git && sudo git clone https://github.com/google/googletest.git benchmark/googletest && cd benchmark && sudo cmake -E make_directory "build" && sudo cmake -E chdir "build" cmake -DCMAKE_BUILD_TYPE=Release ../ && sudo cmake --build "build" --config Release --target install
- name: Build
run: |
mkdir build && cd build
cmake ..
make -j4
- name: Run Main
run: |
./build/rocksdb_storage_impl_tests
./build/log_entry_cache_tests
./build/log_entry_cache_tests
./build/log_entry_cache_benchmark
go test -run TestClusterRwBench tests/integration_test.go -v
test-sanitizer:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install build dependencies
run: sudo apt-get update && sudo apt-get install -y build-essential autoconf automake libtool cmake lcov

131 changes: 53 additions & 78 deletions raftcore/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package raftcore

import (
"context"
"errors"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -59,24 +60,23 @@ func NodeToString(role NodeRole) string {

// raft stack definition
type Raft struct {
mu sync.RWMutex
peers []*RaftPeerNode // rpc client end
id int64
dead int32
applyCh chan *pb.ApplyMsg
applyCond *sync.Cond
replicatorCond []*sync.Cond
role NodeRole
curTerm int64
votedFor int64
grantedVotes int
logs *RaftLog
commitIdx int64
lastApplied int64
nextIdx []int
matchIdx []int
isSnapshoting bool

mu sync.RWMutex
peers []*RaftPeerNode // rpc client end
id int64
dead int32
applyCh chan *pb.ApplyMsg
applyCond *sync.Cond
replicatorCond []*sync.Cond
role NodeRole
curTerm int64
votedFor int64
grantedVotes int
logs *RaftLog
commitIdx int64
lastApplied int64
nextIdx []int
matchIdx []int
isSnapshoting bool
leaderId int64
electionTimer *time.Timer
heartbeatTimer *time.Timer
Expand Down Expand Up @@ -172,10 +172,6 @@ func (rf *Raft) IncrGrantedVotes() {
rf.grantedVotes += 1
}

// ReInitLog asks the underlying raft log to reinitialize itself by calling
// RaftLog.ReInitLogs.
func (rf *Raft) ReInitLog() {
	// ReInitLogs returns an error; this method has no return value, so the
	// result is explicitly discarded here.
	_ = rf.logs.ReInitLogs()
}

// HandleRequestVote handle request vote from other node
func (rf *Raft) HandleRequestVote(req *pb.RequestVoteRequest, resp *pb.RequestVoteResponse) {
rf.mu.Lock()
Expand Down Expand Up @@ -277,7 +273,7 @@ func (rf *Raft) HandleAppendEntries(req *pb.AppendEntriesRequest, resp *pb.Appen
resp.Success = true
}

func (rf *Raft) CondInstallSnapshot(lastIncluedTerm int, lastIncludedIndex int, snapshot []byte) bool {
func (rf *Raft) CondInstallSnapshot(lastIncluedTerm int, lastIncludedIndex int) bool {
rf.mu.Lock()
defer rf.mu.Unlock()

Expand All @@ -289,43 +285,15 @@ func (rf *Raft) CondInstallSnapshot(lastIncluedTerm int, lastIncludedIndex int,
rf.logs.ReInitLogs()
} else {
rf.logs.EraseBefore(int64(lastIncludedIndex), true)
rf.logs.SetEntFirstData([]byte{})
}
// update dummy entry with lastIncludedTerm and lastIncludedIndex
rf.logs.ResetFirstEntryTermAndIndex(int64(lastIncluedTerm), int64(lastIncludedIndex))
rf.logs.ResetFirstLogEntry(int64(lastIncluedTerm), int64(lastIncludedIndex))

rf.lastApplied = int64(lastIncludedIndex)
rf.commitIdx = int64(lastIncludedIndex)

return true
}

// Snapshot takes a snapshot at index while holding rf.mu.
//
// If index is not greater than the current first log id the request is
// rejected with a warning and nothing is changed. Otherwise the log entries
// before index are erased, the data of the new first entry is cleared, and the
// supplied snapshot bytes are handed to the log layer for persistence.
//
// rf.isSnapshoting is set for the duration of the log truncation.
func (rf *Raft) Snapshot(index int, snapshot []byte) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	rf.isSnapshoting = true
	firstLogID := rf.logs.GetFirstLogId()
	if index <= int(firstLogID) {
		rf.isSnapshoting = false
		logger.ELogger().Sugar().Warnf("reject snapshot, current snapshotIndex is larger in cur term")
		return
	}

	rf.logs.EraseBefore(int64(index), true)
	// SetEntFirstData reports storage failures; surface them in the log
	// instead of dropping them. The snapshot is still persisted below because
	// the log has already been truncated at this point.
	if err := rf.logs.SetEntFirstData([]byte{}); err != nil {
		logger.ELogger().Sugar().Errorf("clear first log entry data at idx %d failed: %v", index, err)
	}
	logger.ELogger().Sugar().Debugf("del log entry before idx %d", index)
	rf.isSnapshoting = false
	rf.logs.PersisSnapshot(snapshot)
}

// ReadSnapshot returns the snapshot bytes read through the raft log layer.
// A read error is logged rather than returned; the bytes produced by the log
// layer are returned either way.
func (rf *Raft) ReadSnapshot() []byte {
	data, readErr := rf.logs.ReadSnapshot()
	if readErr == nil {
		return data
	}
	logger.ELogger().Sugar().Error(readErr.Error())
	return data
}

// install snapshot from leader
func (rf *Raft) HandleInstallSnapshot(request *pb.InstallSnapshotRequest, response *pb.InstallSnapshotResponse) {
rf.mu.Lock()
Expand Down Expand Up @@ -361,12 +329,6 @@ func (rf *Raft) HandleInstallSnapshot(request *pb.InstallSnapshotRequest, respon

}

// GetLogCount returns the value of RaftLog.LogItemCount, read while holding
// rf.mu.
func (rf *Raft) GetLogCount() int {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	count := rf.logs.LogItemCount()
	return count
}

func (rf *Raft) advanceCommitIndexForLeader() {
sort.Ints(rf.matchIdx)
n := len(rf.matchIdx)
Expand Down Expand Up @@ -416,31 +378,28 @@ func (rf *Raft) Election() {
}
go func(peer *RaftPeerNode) {
logger.ELogger().Sugar().Debugf("send request vote to %s %s", peer.addr, vote_req.String())

request_vote_resp, err := (*peer.raftServiceCli).RequestVote(context.Background(), vote_req)
if err != nil {
logger.ELogger().Sugar().Errorf("send request vote to %s failed %v", peer.addr, err.Error())
}
if request_vote_resp != nil {
rf.mu.Lock()
defer rf.mu.Unlock()
rf.mu.Lock()
defer rf.mu.Unlock()
if request_vote_resp != nil && rf.curTerm == vote_req.Term && rf.role == NodeRoleCandidate {
logger.ELogger().Sugar().Errorf("send request vote to %s recive -> %s, curterm %d, req term %d", peer.addr, request_vote_resp.String(), rf.curTerm, vote_req.Term)
if rf.curTerm == vote_req.Term && rf.role == NodeRoleCandidate {
if request_vote_resp.VoteGranted {
// success granted the votes
rf.IncrGrantedVotes()
if rf.grantedVotes > len(rf.peers)/2 {
logger.ELogger().Sugar().Debugf("I'm win this term, (node %d) get majority votes int term %d ", rf.id, rf.curTerm)
rf.SwitchRaftNodeRole(NodeRoleLeader)
rf.BroadcastHeartbeat()
rf.grantedVotes = 0
}
} else if request_vote_resp.Term > rf.curTerm {
// request vote reject
rf.SwitchRaftNodeRole(NodeRoleFollower)
rf.curTerm, rf.votedFor = request_vote_resp.Term, -1
rf.PersistRaftState()
if request_vote_resp.VoteGranted {
// success granted the votes
rf.IncrGrantedVotes()
if rf.grantedVotes > len(rf.peers)/2 {
logger.ELogger().Sugar().Debugf("I'm win this term, (node %d) get majority votes int term %d ", rf.id, rf.curTerm)
rf.SwitchRaftNodeRole(NodeRoleLeader)
rf.BroadcastHeartbeat()
rf.grantedVotes = 0
}
} else if request_vote_resp.Term > rf.curTerm {
// request vote reject
rf.SwitchRaftNodeRole(NodeRoleFollower)
rf.curTerm, rf.votedFor = request_vote_resp.Term, -1
rf.PersistRaftState()
}
}
}(peer)
Expand Down Expand Up @@ -562,12 +521,14 @@ func (rf *Raft) replicateOneRound(peer *RaftPeerNode) {
logger.ELogger().Sugar().Debugf("leader prev log index %d", prev_log_index)
if prev_log_index < uint64(rf.logs.GetFirst().Index) {
first_log := rf.logs.GetFirst()

// TODO: send kv leveldb snapshot

snap_shot_req := &pb.InstallSnapshotRequest{
Term: rf.curTerm,
LeaderId: int64(rf.id),
LastIncludedIndex: first_log.Index,
LastIncludedTerm: int64(first_log.Term),
Data: rf.ReadSnapshot(),
}

rf.mu.RUnlock()
Expand Down Expand Up @@ -681,3 +642,17 @@ func (rf *Raft) Applier() {
rf.mu.Unlock()
}
}

// StartSnapshot truncates the raft log at snap_idx: entries before snap_idx
// are erased and the first log entry is rewritten to carry the current term
// and snap_idx.
//
// It returns an error when snap_idx is not greater than the current first log
// id, or when rewriting the first log entry fails. rf.isSnapshoting is set
// while the method runs and cleared on every exit path.
//
// NOTE(review): this method does not acquire rf.mu itself — confirm that
// callers hold it.
func (rf *Raft) StartSnapshot(snap_idx uint64) error {
	rf.isSnapshoting = true
	// Clear the in-progress flag no matter how the function exits.
	defer func() { rf.isSnapshoting = false }()

	if snap_idx <= rf.logs.GetFirstLogId() {
		return errors.New("ety index is larger than the first log index")
	}
	rf.logs.EraseBefore(int64(snap_idx), true)
	// Propagate a storage failure instead of reporting success.
	if err := rf.logs.ResetFirstLogEntry(rf.curTerm, int64(snap_idx)); err != nil {
		return err
	}

	// create checkpoint for db
	return nil
}
53 changes: 15 additions & 38 deletions raftcore/raft_persistent_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,18 +96,6 @@ func (rfLog *RaftLog) ReadRaftState() (curTerm int64, votedFor int64) {
return rf_state.CurTerm, rf_state.VotedFor
}

// PersisSnapshot writes snapContext to the storage engine under
// SNAPSHOT_STATE_KEY.
func (rfLog *RaftLog) PersisSnapshot(snapContext []byte) {
	// PutBytesKv returns an error; this method has no return value, so the
	// result is explicitly discarded here.
	_ = rfLog.dbEng.PutBytesKv(SNAPSHOT_STATE_KEY, snapContext)
}

// ReadSnapshot fetches the bytes stored under SNAPSHOT_STATE_KEY from the
// storage engine. On a read error it returns a nil slice together with that
// error.
func (rfLog *RaftLog) ReadSnapshot() ([]byte, error) {
	snap, err := rfLog.dbEng.GetBytesValue(SNAPSHOT_STATE_KEY)
	if err == nil {
		return snap, nil
	}
	return nil, err
}

// GetFirstLogId
// get the first log id from storage engine
func (rfLog *RaftLog) GetFirstLogId() uint64 {
Expand Down Expand Up @@ -141,6 +129,21 @@ func (rfLog *RaftLog) SetEntFirstData(d []byte) error {
return rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(first_idx), newent_encode)
}

// ResetFirstLogEntry writes a fresh normal-type entry carrying the given term
// and index to the storage engine under the log key for index, and then
// records index as the log's first index. It holds rfLog.mu for the whole
// operation.
//
// If the write fails the error is returned and firstIdx is left unchanged.
func (rfLog *RaftLog) ResetFirstLogEntry(term int64, index int64) error {
	rfLog.mu.Lock()
	defer rfLog.mu.Unlock()

	encoded := EncodeEntry(&pb.Entry{
		EntryType: pb.EntryType_EntryNormal,
		Term:      uint64(term),
		Index:     index,
	})
	err := rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(uint64(index)), encoded)
	if err == nil {
		// Only advance the first index once the entry is stored.
		rfLog.firstIdx = uint64(index)
	}
	return err
}

// ReInitLogs
// make logs to init state
func (rfLog *RaftLog) ReInitLogs() error {
Expand All @@ -158,32 +161,6 @@ func (rfLog *RaftLog) ReInitLogs() error {
return rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(INIT_LOG_INDEX), empent_encode)
}

// ResetFirstEntryTermAndIndex moves the first log entry to a new position:
// it loads the current first entry, deletes it from the storage engine,
// overwrites its term and index with the given values, updates firstIdx, and
// stores the modified entry under the key for the new index.
//
// It panics if the current first entry cannot be read, and returns an error
// if the delete or the final write fails. rfLog.mu is held throughout.
func (rfLog *RaftLog) ResetFirstEntryTermAndIndex(term, index int64) error {
	rfLog.mu.Lock()
	defer rfLog.mu.Unlock()
	first_idx := rfLog.GetFirstLogId()
	encode_value, err := rfLog.dbEng.GetBytesValue(EncodeRaftLogKey(first_idx))
	if err != nil {
		logger.ELogger().Sugar().Panicf("get log entry with id %d error!", first_idx)
		panic(err)
	}
	// delete the old first entry
	if err := rfLog.dbEng.DeleteBytesK(EncodeRaftLogKey(first_idx)); err != nil {
		return err
	}
	ent := DecodeEntry(encode_value)
	ent.Term = uint64(term)
	ent.Index = index
	rfLog.firstIdx = uint64(index)
	// The format string needs a verb for the entry; without one the logger
	// appends the argument as a "%!(EXTRA ...)" marker.
	logger.ELogger().Sugar().Debugf("change first ent to -> %s", ent.String())
	new_ent_encode := EncodeEntry(ent)
	return rfLog.dbEng.PutBytesKv(EncodeRaftLogKey(uint64(index)), new_ent_encode)
}

// GetFirst
//
// get the first entry from storage engine
Expand Down
17 changes: 17 additions & 0 deletions shardkvserver/shard_kvserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ func (s *ShardKV) ApplingToStm(done <-chan interface{}) {
return
case appliedMsg := <-s.applyCh:
logger.ELogger().Sugar().Debugf("appling msg %s", appliedMsg.String())

if appliedMsg.SnapshotValid {
// TODO: install snapshot data to leveldb
s.rf.CondInstallSnapshot(int(appliedMsg.SnapshotTerm), int(appliedMsg.SnapshotIndex))
}

req := &pb.CommandRequest{}
if err := json.Unmarshal(appliedMsg.Command, req); err != nil {
logger.ELogger().Sugar().Errorf("Unmarshal CommandRequest err", err.Error())
Expand Down Expand Up @@ -368,6 +374,17 @@ func (s *ShardKV) AppendEntries(ctx context.Context, req *pb.AppendEntriesReques
return resp, nil
}

// Snapshot is the snapshot rpc interface. It logs the incoming request,
// passes it to the raft node's HandleInstallSnapshot together with a fresh
// response object, logs the populated response, and returns it. The returned
// error is always nil.
func (s *ShardKV) Snapshot(ctx context.Context, req *pb.InstallSnapshotRequest) (*pb.InstallSnapshotResponse, error) {
	logger.ELogger().Sugar().Debugf("handle snapshot req %s ", req.String())

	resp := new(pb.InstallSnapshotResponse)
	s.rf.HandleInstallSnapshot(req, resp)

	logger.ELogger().Sugar().Debugf("handle snapshot resp %s ", resp.String())
	return resp, nil
}

// rpc interface
// DoBucketsOperation hanlde bucket data get, delete and insert
func (s *ShardKV) DoBucketsOperation(ctx context.Context, req *pb.BucketOperationRequest) (*pb.BucketOperationResponse, error) {
Expand Down

0 comments on commit 503ff19

Please sign in to comment.