Lab‐3A
Lab 3A does not require log snapshots. The code builds on Lab 2C, and the main implementation lives in the src/kvraft directory.
Overview
The figure above is a swimlane diagram of the kvraft request flow. Note that it does not include every implementation detail and only shows the case where a request succeeds; the network failures and node crashes exercised by the test program are not depicted.
Summary
Key points to note about the Lab 3 KV service:
- The program has two parts: the client (Clerk) and the server (KV service).
- There can be more than one client, and the server side of course consists of multiple nodes.
- Every server keeps a KV database (a Go map in this implementation); the Raft layer keeps the databases consistent across servers.
- Raft is embedded in each server's service struct and is started by the service (each service maintains its own Raft instance).
- All reads and writes are handled only by the leader (followers may lag behind, so clients cannot read from a follower without risking stale data).
- The lab requires linearizability, and write operations must be idempotent (a retried request must not be applied twice).
- The end goal is a reliable distributed KV store: if one node in the group goes down, a new leader is elected quickly so the service remains highly available.
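The code below also relies on a few shared definitions that, in this lab's skeleton, live in a shared file under src/kvraft (typically common.go) and are not reproduced on this page. The following is only a minimal sketch of what they are assumed to look like; the Debug flag and the exact DPrintf signature are filled in for completeness:

package kvraft

import "log"

// Assumed contents of common.go; the names below are exactly the ones used by
// client.go and server.go on this page.
const (
	OK             = "OK"
	ErrNoKey       = "ErrNoKey"
	ErrWrongLeader = "ErrWrongLeader"
)

type Err string

type PutAppendArgs struct {
	Key      string
	Value    string
	Op       string // "Put" or "Append"
	ClientId int64  // which client issued the request
	SeqId    int64  // per-client, monotonically increasing request id
}

type PutAppendReply struct {
	Err Err
}

type GetArgs struct {
	Key      string
	ClientId int64
	SeqId    int64
}

type GetReply struct {
	Err   Err
	Value string
}

// DPrintf is the lab's debug-logging helper; a minimal version.
const Debug = 0

func DPrintf(format string, a ...interface{}) {
	if Debug > 0 {
		log.Printf(format, a...)
	}
}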
package kvraft
import (
"../labrpc"
"sync"
"sync/atomic"
"time"
)
import "crypto/rand"
import "math/big"
type Clerk struct {
servers []*labrpc.ClientEnd
// You will have to modify this struct.
clientId int64 // unique id of this client
seqId int64 // monotonically increasing request id issued by this client
leaderId int // cached id of the server we most recently believed to be the leader
mu sync.Mutex // protects the Clerk's state (leaderId)
}
func nrand() int64 {
max := big.NewInt(int64(1) << 62)
bigx, _ := rand.Int(rand.Reader, max)
x := bigx.Int64()
return x
}
func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
ck := new(Clerk)
ck.servers = servers
// You'll have to add code here.
ck.clientId = nrand()
return ck
}
// Get
// fetch the current value for a key.
// returns "" if the key does not exist.
// keeps trying forever in the face of all other errors.
//
// you can send an RPC with code like this:
// ok := ck.servers[i].Call("KVServer.Get", &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
func (ck *Clerk) Get(key string) string {
// You will have to modify this function.
args := GetArgs{
Key: key,
ClientId: ck.clientId,
SeqId: atomic.AddInt64(&ck.seqId, 1),
}
DPrintf("Client[%d] Get starts, Key=%s ", ck.clientId, key)
ck.mu.Lock()
leaderId := ck.leaderId
ck.mu.Unlock()
for { // retry loop; note that Key, ClientId and SeqId never change across retries
reply := GetReply{}
if ck.servers[leaderId].Call("KVServer.Get", &args, &reply) {
if reply.Err == OK { // key found
return reply.Value
} else if reply.Err == ErrNoKey { // key does not exist
return ""
}
}
// this server was not the leader; move on to the next one
ck.mu.Lock()
leaderId = (leaderId + 1) % len(ck.servers)
ck.leaderId = leaderId
ck.mu.Unlock()
time.Sleep(1 * time.Millisecond)
}
}
// PutAppend
// shared by Put and Append.
//
// you can send an RPC with code like this:
// ok := ck.servers[i].Call("KVServer.PutAppend", &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
func (ck *Clerk) PutAppend(key string, value string, op string) {
// You will have to modify this function.
args := PutAppendArgs{
Key: key,
Value: value,
Op: op,
ClientId: ck.clientId,
SeqId: atomic.AddInt64(&ck.seqId, 1),
}
DPrintf("Client[%d] PutAppend, Key=%s Value=%s", ck.clientId, key, value)
ck.mu.Lock()
leaderId := ck.leaderId
ck.mu.Unlock()
for { // retry loop; note that Key, Value, Op, ClientId and SeqId never change across retries
reply := PutAppendReply{}
if ck.servers[leaderId].Call("KVServer.PutAppend", &args, &reply) {
if reply.Err == OK { // success
break
}
}
// this server was not the leader; move on to the next one
ck.mu.Lock()
leaderId = (leaderId + 1) % len(ck.servers)
ck.leaderId = leaderId
ck.mu.Unlock()
time.Sleep(1 * time.Millisecond)
}
}
func (ck *Clerk) Put(key string, value string) {
ck.PutAppend(key, value, "Put")
}
func (ck *Clerk) Append(key string, value string) {
ck.PutAppend(key, value, "Append")
}
Inside `Get` and `PutAppend` the Clerk mainly builds the request arguments and then calls the server-side handler via RPC. The part to focus on is the for loop: it is essentially automatic retry, and it only exits once the server returns OK (or ErrNoKey), which completes one operation.
Each retry rotates the leader guess with `leaderId = (leaderId + 1) % len(ck.servers)` and stores it back with `ck.leaderId = leaderId`, so that the next request can go straight to the right node.
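As a quick illustration of how a Clerk is driven (the tester does something very similar), here is a hypothetical helper. exampleClerkUsage is not part of the lab code, and ends is assumed to be a []*labrpc.ClientEnd already wired to the KV servers:

// exampleClerkUsage is illustrative only; the tester constructs the ClientEnds
// and calls MakeClerk in much the same way.
func exampleClerkUsage(ends []*labrpc.ClientEnd) string {
	ck := MakeClerk(ends)
	ck.Put("k", "v1")    // blocks (retrying) until the write is committed and applied
	ck.Append("k", "v2") // "k" now maps to "v1v2"
	return ck.Get("k")   // returns "v1v2"; Get returns "" for a missing key
}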
There are several reasons a retry can happen:
- The request reached a service node that is currently a follower: the Raft layer's `Start` returns `-1, -1, false`, so the handler concludes this node is not the leader and replies as follows, which makes the client retry:
if !isLeader {
reply.Err = ErrWrongLeader
return
}
- Every handler on the service side runs a 2-second timer; if nothing comes back within those 2 seconds, the client is also made to retry:
timer := time.NewTimer(2000 * time.Millisecond)
...
case <-timer.C: // nothing was committed within 2 seconds, make the client retry
reply.Err = ErrWrongLeader
}
- Note that the for loop never changes the other arguments such as `SeqId`, so every retry carries exactly the same request id; this is what the server later relies on for its idempotency check (a standalone sketch of that check follows this list).
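To make the idempotency point concrete, here is a tiny standalone program (not part of the lab code) that mimics the duplicate check the server's applyLoop performs: a retried write carrying an already-seen SeqId has no effect.

package main

import "fmt"

// applyAppend mimics applyLoop's rule: apply a write only if its seqId is newer
// than the largest one recorded for that client.
func applyAppend(kvStore map[string]string, seqMap map[int64]int64, clientId, seqId int64, key, val string) {
	if prev, ok := seqMap[clientId]; ok && seqId <= prev {
		return // duplicate of a retried request: already applied, ignore it
	}
	seqMap[clientId] = seqId
	kvStore[key] += val
}

func main() {
	kvStore := map[string]string{}
	seqMap := map[int64]int64{}
	applyAppend(kvStore, seqMap, 1, 1, "x", "a")
	applyAppend(kvStore, seqMap, 1, 1, "x", "a") // same SeqId: a retry, no effect
	applyAppend(kvStore, seqMap, 1, 2, "x", "b")
	fmt.Println(kvStore["x"]) // prints "ab", not "aab"
}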
package kvraft
import (
"../labgob"
"../labrpc"
"../raft"
"sync"
"sync/atomic"
"time"
)
const (
OP_TYPE_PUT = "Put"
OP_TYPE_APPEND = "Append"
OP_TYPE_GET = "Get"
)
type Op struct {
// Your definitions here.
// Field names must start with capital letters,
// otherwise RPC will break.
Index int // index assigned when the op was written to the raft log
Term int // leader term when the op was written to the raft log
Type string // PutAppend, Get
Key string
Value string
SeqId int64
ClientId int64
}
// OpContext is the per-op context kept while waiting for Raft to commit; it is used to wake up the blocked RPC handler
type OpContext struct {
op *Op
committed chan byte // signalled once applyLoop has applied (or skipped) this op
wrongLeader bool // the term at this log index changed, i.e. the leader has been replaced
ignored bool // the request id was stale, so this log entry was skipped
// result of a Get operation
keyExist bool
value string
}
type KVServer struct {
mu sync.Mutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
dead int32 // set by Kill()
maxraftstate int // snapshot if log grows this big
// Your definitions here.
kvStore map[string]string // kvStore holds the application-level state (the actual key/value data)
reqMap map[int]*OpContext // reqMap holds the RPC calls currently in flight: log index -> request context
seqMap map[int64]int64 // seqMap records the largest committed request id per clientId, used for write idempotency: client id -> client seq
}
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
// Your code here.
reply.Err = OK
op := &Op{
Type: OP_TYPE_GET,
Key: args.Key,
ClientId: args.ClientId,
SeqId: args.SeqId,
}
// hand the op to the Raft layer
var isLeader bool
op.Index, op.Term, isLeader = kv.rf.Start(op)
if !isLeader {
reply.Err = ErrWrongLeader
return
}
opCtx := &OpContext{
op: op,
committed: make(chan byte, 1), // buffered so applyLoop never blocks on the send if this RPC has already timed out
}
func() {
kv.mu.Lock()
defer kv.mu.Unlock()
// save the RPC context and wait for the commit callback; a leader change may overwrite the same index, but the earlier RPC will time out and make its client retry
kv.reqMap[op.Index] = opCtx
}()
// clean up the context before the RPC returns
defer func() {
kv.mu.Lock()
defer kv.mu.Unlock()
if one, ok := kv.reqMap[op.Index]; ok {
if one == opCtx {
delete(kv.reqMap, op.Index)
}
}
}()
timer := time.NewTimer(2000 * time.Millisecond)
defer timer.Stop()
select {
case <-opCtx.committed: // block until the kv layer has applied this op
if opCtx.wrongLeader { // the term at this index changed, so the leader changed and the client must re-read from the new leader
reply.Err = ErrWrongLeader
} else if !opCtx.keyExist { // key does not exist
reply.Err = ErrNoKey
} else {
reply.Value = opCtx.value // return the value
}
case <-timer.C: // nothing was committed within 2 seconds, make the client retry
reply.Err = ErrWrongLeader
}
}
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
// Your code here.
reply.Err = OK
op := &Op{
Type: args.Op,
Key: args.Key,
Value: args.Value,
ClientId: args.ClientId,
SeqId: args.SeqId,
}
// hand the op to the Raft layer
var isLeader bool
op.Index, op.Term, isLeader = kv.rf.Start(op)
if !isLeader {
reply.Err = ErrWrongLeader
return
}
opCtx := &OpContext{
op: op,
committed: make(chan byte, 1), // buffered so applyLoop never blocks on the send if this RPC has already timed out
}
func() {
kv.mu.Lock()
defer kv.mu.Unlock()
// save the RPC context and wait for the commit callback; a leader change may overwrite the same index, but the earlier RPC will time out and make its client retry
kv.reqMap[op.Index] = opCtx
}()
// clean up the context before the RPC returns
defer func() {
kv.mu.Lock()
defer kv.mu.Unlock()
if one, ok := kv.reqMap[op.Index]; ok {
if one == opCtx {
delete(kv.reqMap, op.Index)
}
}
}()
timer := time.NewTimer(2000 * time.Millisecond)
defer timer.Stop()
select {
case <-opCtx.committed: // block until the kv layer has applied this op
if opCtx.wrongLeader { // the term at this index changed, so the leader changed and the client must re-send to the new leader
reply.Err = ErrWrongLeader
} else if opCtx.ignored {
// the request id was stale and the write was skipped; for this lab it is enough to leave OK so the client moves on
}
case <-timer.C: // nothing was committed within 2 seconds, make the client retry
reply.Err = ErrWrongLeader
}
}
// Kill
// the tester calls Kill() when a KVServer instance won't
// be needed again. for your convenience, we supply
// code to set rf.dead (without needing a lock),
// and a killed() method to test rf.dead in
// long-running loops. you can also add your own
// code to Kill(). you're not required to do anything
// about this, but it may be convenient (for example)
// to suppress debug output from a Kill()ed instance.
func (kv *KVServer) Kill() {
atomic.StoreInt32(&kv.dead, 1)
kv.rf.Kill()
// Your code here, if desired.
}
func (kv *KVServer) killed() bool {
z := atomic.LoadInt32(&kv.dead)
return z == 1
}
// StartKVServer
// servers[] contains the ports of the set of
// servers that will cooperate via Raft to
// form the fault-tolerant key/value service.
// me is the index of the current server in servers[].
// the k/v server should store snapshots through the underlying Raft
// implementation, which should call persister.SaveStateAndSnapshot() to
// atomically save the Raft state along with the snapshot.
// the k/v server should snapshot when Raft's saved state exceeds maxraftstate bytes,
// in order to allow Raft to garbage-collect its log. if maxraftstate is -1,
// you don't need to snapshot.
// StartKVServer() must return quickly, so it should start goroutines
// for any long-running work.
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
// call labgob.Register on structures you want
// Go's RPC library to marshall/unmarshall.
labgob.Register(&Op{})
kv := new(KVServer)
kv.me = me
kv.maxraftstate = maxraftstate
// You may need initialization code here.
// kv.applyCh is the same applyCh the Raft layer uses; all communication between the two layers goes through this channel
kv.applyCh = make(chan raft.ApplyMsg, 1)
kv.rf = raft.Make(servers, me, persister, kv.applyCh)
kv.kvStore = make(map[string]string)
kv.reqMap = make(map[int]*OpContext)
kv.seqMap = make(map[int64]int64)
go kv.applyLoop()
return kv
}
// applyLoop keeps consuming log entries that the Raft layer has already applied and processes them.
// For a write it checks whether the Op's sequence id is stale: stale ops are skipped, otherwise the write takes effect on kvStore.
// For a read it stores the key's current value from kvStore into opCtx and wakes up the blocked RPC.
func (kv *KVServer) applyLoop() {
for !kv.killed() {
select {
case msg := <-kv.applyCh:
cmd := msg.Command
index := msg.CommandIndex
func() {
kv.mu.Lock()
defer kv.mu.Unlock()
// the committed operation from the log
op := cmd.(*Op)
opCtx, existOp := kv.reqMap[index]
prevSeq, existSeq := kv.seqMap[op.ClientId]
kv.seqMap[op.ClientId] = op.SeqId
if existOp { // an RPC is waiting on this index, so check whether the state still matches what it wrote
if opCtx.op.Term != op.Term {
opCtx.wrongLeader = true
}
}
// only apply client writes whose request id is new (monotonically increasing)
if op.Type == OP_TYPE_PUT || op.Type == OP_TYPE_APPEND {
if !existSeq || op.SeqId > prevSeq { // accept the change only for a fresh request id; retried ops are skipped, which makes writes idempotent
if op.Type == OP_TYPE_PUT { // Put: overwrite
kv.kvStore[op.Key] = op.Value
} else if op.Type == OP_TYPE_APPEND { // Append: concatenate
if val, exist := kv.kvStore[op.Key]; exist {
kv.kvStore[op.Key] = val + op.Value
} else {
kv.kvStore[op.Key] = op.Value
}
}
} else if existOp {
opCtx.ignored = true
}
} else { // OP_TYPE_GET
if existOp {
opCtx.value, opCtx.keyExist = kv.kvStore[op.Key]
}
}
DPrintf("RaftNode[%d] applyLoop, kvStore[%v]", kv.me, len(kv.kvStore))
// wake up the Get/PutAppend handler blocked at "case <-opCtx.committed:" so it can answer the client's RPC
if existOp {
opCtx.committed <- 1
}
}()
}
}
}
The handlers here are the two functions `Get` and `PutAppend`. They are covered together because they differ very little; see the code for the details. The main points to notice (a possible refactor of their shared logic is sketched after this list):
- The handlers never read or write the `kvStore` map directly.
- The write into the Raft log is the line `op.Index, op.Term, isLeader = kv.rf.Start(op)`; a follower node rejects the request outright.
- In the meantime the handler records `kv.reqMap[op.Index] = opCtx`; this opCtx is used later in applyLoop.
- On the way out the handler must clean the context up again: `delete(kv.reqMap, op.Index)`.
- At `case <-opCtx.committed:` the handler blocks, waiting for the operation's final result from applyLoop; once a value arrives on `opCtx.committed` it unblocks and returns the result to the Clerk.
- Expiry of the 2-second timer also makes the handler return (with ErrWrongLeader, so the client retries).
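Because Get and PutAppend repeat the submit-then-wait pattern almost line for line, the common part could be pulled into a single helper. The sketch below is only a possible refactor, not code from the lab: waitForApply is a hypothetical name, and the Err type and constants are the ones assumed above for common.go.

// waitForApply submits op to Raft, registers an OpContext, and waits until
// applyLoop signals completion or the 2-second timer fires.
func (kv *KVServer) waitForApply(op *Op) (*OpContext, Err) {
	var isLeader bool
	op.Index, op.Term, isLeader = kv.rf.Start(op)
	if !isLeader {
		return nil, ErrWrongLeader
	}
	opCtx := &OpContext{op: op, committed: make(chan byte, 1)}
	kv.mu.Lock()
	kv.reqMap[op.Index] = opCtx
	kv.mu.Unlock()
	defer func() {
		// remove the context on the way out, unless a newer RPC already replaced it
		kv.mu.Lock()
		if cur, ok := kv.reqMap[op.Index]; ok && cur == opCtx {
			delete(kv.reqMap, op.Index)
		}
		kv.mu.Unlock()
	}()
	timer := time.NewTimer(2000 * time.Millisecond)
	defer timer.Stop()
	select {
	case <-opCtx.committed:
		if opCtx.wrongLeader {
			return nil, ErrWrongLeader
		}
		return opCtx, OK
	case <-timer.C:
		return nil, ErrWrongLeader
	}
}

// Get would then reduce to something like:
//   opCtx, err := kv.waitForApply(&Op{Type: OP_TYPE_GET, Key: args.Key, ClientId: args.ClientId, SeqId: args.SeqId})
//   if err != OK { reply.Err = err; return }
//   if !opCtx.keyExist { reply.Err = ErrNoKey } else { reply.Err = OK; reply.Value = opCtx.value }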
StartKVServer mainly initializes the server and starts the applyLoop goroutine; the test program calls it to create each server node.
The line `kv.rf = raft.Make(servers, me, persister, kv.applyCh)` shows how tightly kv and rf are bound: rf delivers data up to the application layer through applyCh.
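For reference, the messages traveling over applyCh have roughly the following shape in the Lab 2 raft skeleton (reproduced from the 6.824 handout code from memory, so verify against your own raft package; applyLoop below only uses Command and CommandIndex):

// In package raft (Lab 2 skeleton).
type ApplyMsg struct {
	CommandValid bool        // true when the message carries a newly committed log entry
	Command      interface{} // the value that was passed to Start(), i.e. a *Op here
	CommandIndex int         // log index of the committed entry
}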
applyLoop's job is to watch the applyCh channel for new data; the apply loop in the Raft layer pushes each newly applied log entry into applyCh. The key points:
- The index of the applied entry is used to look up a matching OpContext in reqMap, which is needed further down.
- `prevSeq, existSeq := kv.seqMap[op.ClientId]` fetches the client's previous seqId; `op.SeqId > prevSeq` is then the idempotency test, so retried requests do not pass it.
- `opCtx.op.Term != op.Term` compares the term saved in the context with the term just received; if they differ, a re-election happened in between and the client must retry.
- For linearizability, reads are also written to Raft as log entries, and the RPC is answered only after they commit (see the contrast sketch after this list).
- `opCtx.value, opCtx.keyExist = kv.kvStore[op.Key]`: the value returned by a Get is handed back through opCtx.
- `opCtx.committed <- 1` wakes the handler so it can continue and reply to the client.
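To see why reads go through the log at all, consider the naive alternative, which is not in this lab's code and is shown here only for contrast: answering a Get straight from the local map. A node that still believes it is the leader but has been deposed, or has not yet applied newer committed writes, would return stale data and break linearizability.

// WRONG (hypothetical, for contrast only): serving Get directly from kvStore
// without writing it to the Raft log. A stale or deposed leader could answer
// with old data, violating linearizability.
func (kv *KVServer) unsafeGet(args *GetArgs, reply *GetReply) {
	kv.mu.Lock()
	defer kv.mu.Unlock()
	if value, exist := kv.kvStore[args.Key]; exist {
		reply.Err = OK
		reply.Value = value
	} else {
		reply.Err = ErrNoKey
	}
}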