
Lab‐3A


Lab 3A does not require log snapshots. The code builds on Lab 2C, and the implementation lives mainly in the src/kvraft directory.

[Figure: kvraft swimlane diagram]

Notes

The figure above is a swimlane diagram of kvraft. Note that it does not cover every implementation detail and only shows the successful-request path. The actual tests include all kinds of network failures and node faults, which are not reflected in the diagram.

Summary

Key points about the Lab 3 KV service:

  • The program has two parts: a client (Clerk) and a server (KV service); the RPC types they exchange are sketched right after this list
  • There can be more than one client, and there are of course multiple servers
  • Every server holds a KV database (a map in this program); the Raft layer keeps the databases of all servers in sync
  • Raft is embedded in the server's service struct and is started by the service (each service maintains its own Raft instance)
  • All reads and writes go only to the leader (followers may lag during replication, so they cannot serve reads)
  • The lab requires linearizability, and operations must be idempotent (a retried request must not be applied twice)
  • The end goal is a reliable distributed KV store: if one node in the group goes down, a new leader is elected quickly to keep the service highly available
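
The Clerk and KVServer code below relies on RPC argument/reply types and error constants that are normally kept in common.go, which is not shown on this page. Here is a minimal sketch of what that file is assumed to contain, inferred from how the fields are used below; the ClientId and SeqId fields are this implementation's additions, the rest follows the lab skeleton.

package kvraft

// Error values returned to the Clerk (from the lab skeleton).
const (
	OK             = "OK"
	ErrNoKey       = "ErrNoKey"
	ErrWrongLeader = "ErrWrongLeader"
)

type Err string

type PutAppendArgs struct {
	Key      string
	Value    string
	Op       string // "Put" or "Append"
	ClientId int64  // which client sent the request
	SeqId    int64  // per-client, monotonically increasing request id
}

type PutAppendReply struct {
	Err Err
}

type GetArgs struct {
	Key      string
	ClientId int64
	SeqId    int64
}

type GetReply struct {
	Err   Err
	Value string
}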

Client

Code

package kvraft

import (
	"../labrpc"
	"sync"
	"sync/atomic"
	"time"
)
import "crypto/rand"
import "math/big"

type Clerk struct {
	servers []*labrpc.ClientEnd
	// You will have to modify this struct.
	clientId int64      // unique identifier for this client
	seqId    int64      // monotonically increasing request id for this client
	leaderId int        // cache of the leader node we talked to most recently
	mu       sync.Mutex // mutex protecting the Clerk
}

func nrand() int64 {
	max := big.NewInt(int64(1) << 62)
	bigx, _ := rand.Int(rand.Reader, max)
	x := bigx.Int64()
	return x
}

func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
	ck := new(Clerk)
	ck.servers = servers
	// You'll have to add code here.
	ck.clientId = nrand()
	return ck
}

// Get
// fetch the current value for a key.
// returns "" if the key does not exist.
// keeps trying forever in the face of all other errors.
//
// you can send an RPC with code like this:
// ok := ck.servers[i].Call("KVServer.Get", &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
func (ck *Clerk) Get(key string) string {
	// You will have to modify this function.
	args := GetArgs{
		Key:      key,
		ClientId: ck.clientId,
		SeqId:    atomic.AddInt64(&ck.seqId, 1),
	}

	DPrintf("Client[%d] Get starts, Key=%s ", ck.clientId, key)

	ck.mu.Lock()
	leaderId := ck.leaderId
	ck.mu.Unlock()

	for { // retry loop; note that Key, ClientId and SeqId never change across retries
		reply := GetReply{}
		if ck.servers[leaderId].Call("KVServer.Get", &args, &reply) {
			if reply.Err == OK { // hit
				return reply.Value
			} else if reply.Err == ErrNoKey { // key does not exist
				return ""
			}
		}
		// didn't reach the leader; move on to the next server
		ck.mu.Lock()
		leaderId = (leaderId + 1) % len(ck.servers)
		ck.leaderId = leaderId
		ck.mu.Unlock()

		time.Sleep(1 * time.Millisecond)
	}
}

// PutAppend
// shared by Put and Append.
//
// you can send an RPC with code like this:
// ok := ck.servers[i].Call("KVServer.PutAppend", &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
func (ck *Clerk) PutAppend(key string, value string, op string) {
	// You will have to modify this function.
	args := PutAppendArgs{
		Key:      key,
		Value:    value,
		Op:       op,
		ClientId: ck.clientId,
		SeqId:    atomic.AddInt64(&ck.seqId, 1),
	}

	DPrintf("Client[%d] PutAppend, Key=%s Value=%s", ck.clientId, key, value)

	ck.mu.Lock()
	leaderId := ck.leaderId
	ck.mu.Unlock()

	for { // retry loop; note that Key, Value, Op, ClientId and SeqId never change across retries
		reply := PutAppendReply{}
		if ck.servers[leaderId].Call("KVServer.PutAppend", &args, &reply) {
			if reply.Err == OK { // success
				break
			}
		}
		// didn't reach the leader; move on to the next server
		ck.mu.Lock()
		leaderId = (leaderId + 1) % len(ck.servers)
		ck.leaderId = leaderId
		ck.mu.Unlock()

		time.Sleep(1 * time.Millisecond)
	}
}

func (ck *Clerk) Put(key string, value string) {
	ck.PutAppend(key, value, "Put")
}
func (ck *Clerk) Append(key string, value string) {
	ck.PutAppend(key, value, "Append")
}

Key points

Get and PutAppend mainly build the request arguments and then call the server-side handler via RPC. The key part is the for loop, which is essentially automatic retry: the loop exits and the operation completes only when the server returns OK (or ErrNoKey for Get).

Each retry advances leaderId with leaderId = (leaderId + 1) % len(ck.servers), and also updates ck.leaderId = leaderId so that the next request can go straight to the right node.
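
For context, the hypothetical snippet below shows how a caller such as the tester might drive the Clerk; exerciseClerk and the key/value literals are made up for illustration, and servers is the []*labrpc.ClientEnd the tester provides.

// Hypothetical usage of the Clerk (not part of the lab code).
func exerciseClerk(servers []*labrpc.ClientEnd) {
	ck := MakeClerk(servers)
	ck.Put("k1", "a")    // blocks and retries internally until a leader commits it
	ck.Append("k1", "b") // a single call reuses the same SeqId across all of its retries
	v := ck.Get("k1")    // linearizable read; "ab" once both writes have been applied
	DPrintf("k1=%s", v)
}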

Several things can cause a retry:

  • Hitting a service node that is currently a follower makes the raft layer's Start function return -1, -1, false, so the node knows it is not the leader; the service then replies as follows and the client retries (the Start signature is sketched after this list):
if !isLeader {
	reply.Err = ErrWrongLeader
	return
}
  • The handler on the service side runs a 2-second timer; if nothing comes back within 2 seconds, the client also retries:
timer := time.NewTimer(2000 * time.Millisecond)
...
case <-timer.C: // if nothing was committed within 2 seconds, let the client retry
	reply.Err = ErrWrongLeader
}
  • Note that the for loop never changes the other fields such as SeqId, so every retry carries the same request id; this is the key input to the idempotency check described later!
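
For reference, the Start entry point the handlers call has the following signature in the lab's Raft skeleton. The body shown here is only an illustrative sketch (state, Leader and appendToLog are hypothetical names, not necessarily what your Lab 2 code uses); the point is the (-1, -1, false) path that leads to ErrWrongLeader.

// In the raft package. The signature comes from the lab skeleton; the body is a sketch.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if rf.state != Leader { // hypothetical state field
		// Follower/candidate: kvraft sees isLeader == false and replies ErrWrongLeader.
		return -1, -1, false
	}
	index := rf.appendToLog(command) // hypothetical helper: append locally and start replicating
	return index, rf.currentTerm, true
}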

Server

Code

package kvraft

import (
	"../labgob"
	"../labrpc"
	"../raft"
	"sync"
	"sync/atomic"
	"time"
)

const (
	OP_TYPE_PUT    = "Put"
	OP_TYPE_APPEND = "Append"
	OP_TYPE_GET    = "Get"
)

type Op struct {
	// Your definitions here.
	// Field names must start with capital letters,
	// otherwise RPC will break.
	Index    int    // index at which this op was written into the raft log
	Term     int    // term at which this op was written into the raft log
	Type     string // PutAppend, Get
	Key      string
	Value    string
	SeqId    int64
	ClientId int64
}

// OpContext is the per-Op context kept while waiting for Raft to commit; it is used to wake up the blocked RPC
type OpContext struct {
	op        *Op
	committed chan byte

	wrongLeader bool // the term of the entry at this index differs, which means the leader has changed
	ignored     bool // the request id was stale, so this entry was skipped

	// result of the Get operation
	keyExist bool
	value    string
}

type KVServer struct {
	mu      sync.Mutex
	me      int
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg
	dead    int32 // set by Kill()

	maxraftstate int // snapshot if log grows this big

	// Your definitions here.
	kvStore map[string]string  // kvStore holds the application-level state
	reqMap  map[int]*OpContext // reqMap holds in-flight RPC calls: log index -> request context
	seqMap  map[int64]int64    // seqMap records the largest committed request id per client for write idempotency: client id -> client seq
}

func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
	// Your code here.
	reply.Err = OK

	op := &Op{
		Type:     OP_TYPE_GET,
		Key:      args.Key,
		ClientId: args.ClientId,
		SeqId:    args.SeqId,
	}

	// submit the op to the raft layer
	var isLeader bool
	op.Index, op.Term, isLeader = kv.rf.Start(op)
	if !isLeader {
		reply.Err = ErrWrongLeader
		return
	}

	opCtx := &OpContext{
		op:        op,
		committed: make(chan byte),
	}

	func() {
		kv.mu.Lock()
		defer kv.mu.Unlock()

		// Save the RPC context while waiting for the commit callback. A leader change may overwrite the same index, but the earlier RPC will time out and make its client retry.
		kv.reqMap[op.Index] = opCtx
	}()

	// clean up the context before the RPC returns
	defer func() {
		kv.mu.Lock()
		defer kv.mu.Unlock()
		if one, ok := kv.reqMap[op.Index]; ok {
			if one == opCtx {
				delete(kv.reqMap, op.Index)
			}
		}
	}()

	timer := time.NewTimer(2000 * time.Millisecond)
	defer timer.Stop()
	select {
	case <-opCtx.committed: // block until the kv layer has applied this operation
		if opCtx.wrongLeader { // the term at this index changed, so the leader changed and the client must re-read from the new leader
			reply.Err = ErrWrongLeader
		} else if !opCtx.keyExist { // key does not exist
			reply.Err = ErrNoKey
		} else {
			reply.Value = opCtx.value // return the value
		}
	case <-timer.C: // nothing was committed within 2 seconds, let the client retry
		reply.Err = ErrWrongLeader
	}
}

func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
	// Your code here.
	reply.Err = OK

	op := &Op{
		Type:     args.Op,
		Key:      args.Key,
		Value:    args.Value,
		ClientId: args.ClientId,
		SeqId:    args.SeqId,
	}

	// submit the op to the raft layer
	var isLeader bool
	op.Index, op.Term, isLeader = kv.rf.Start(op)
	if !isLeader {
		reply.Err = ErrWrongLeader
		return
	}

	opCtx := &OpContext{
		op:        op,
		committed: make(chan byte),
	}

	func() {
		kv.mu.Lock()
		defer kv.mu.Unlock()

		// Save the RPC context while waiting for the commit callback. A leader change may overwrite the same index, but the earlier RPC will time out and make its client retry.
		kv.reqMap[op.Index] = opCtx
	}()

	// clean up the context before the RPC returns
	defer func() {
		kv.mu.Lock()
		defer kv.mu.Unlock()
		if one, ok := kv.reqMap[op.Index]; ok {
			if one == opCtx {
				delete(kv.reqMap, op.Index)
			}
		}
	}()
	timer := time.NewTimer(2000 * time.Millisecond)
	defer timer.Stop()
	select {
	case <-opCtx.committed: // block until the kv layer has applied this operation
		if opCtx.wrongLeader { // the term at this index changed, so the leader changed and the client must re-send to the new leader
			reply.Err = ErrWrongLeader
		} else if opCtx.ignored {
			// The request id was stale and the request was skipped; for this MIT lab it is enough to just reply OK to the client.
		}
	case <-timer.C: // nothing was committed within 2 seconds, let the client retry
		reply.Err = ErrWrongLeader
	}
}

// Kill
// the tester calls Kill() when a KVServer instance won't
// be needed again. for your convenience, we supply
// code to set rf.dead (without needing a lock),
// and a killed() method to test rf.dead in
// long-running loops. you can also add your own
// code to Kill(). you're not required to do anything
// about this, but it may be convenient (for example)
// to suppress debug output from a Kill()ed instance.
func (kv *KVServer) Kill() {
	atomic.StoreInt32(&kv.dead, 1)
	kv.rf.Kill()
	// Your code here, if desired.
}

func (kv *KVServer) killed() bool {
	z := atomic.LoadInt32(&kv.dead)
	return z == 1
}

// StartKVServer
// servers[] contains the ports of the set of
// servers that will cooperate via Raft to
// form the fault-tolerant key/value service.
// me is the index of the current server in servers[].
// the k/v server should store snapshots through the underlying Raft
// implementation, which should call persister.SaveStateAndSnapshot() to
// atomically save the Raft state along with the snapshot.
// the k/v server should snapshot when Raft's saved state exceeds maxraftstate bytes,
// in order to allow Raft to garbage-collect its log. if maxraftstate is -1,
// you don't need to snapshot.
// StartKVServer() must return quickly, so it should start goroutines
// for any long-running work.
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
	// call labgob.Register on structures you want
	// Go's RPC library to marshall/unmarshall.
	labgob.Register(&Op{})

	kv := new(KVServer)
	kv.me = me
	kv.maxraftstate = maxraftstate

	// You may need initialization code here.

	// kv.applyCh is the same applyCh used by the raft layer; the two layers communicate through this channel
	kv.applyCh = make(chan raft.ApplyMsg, 1)
	kv.rf = raft.Make(servers, me, persister, kv.applyCh)

	kv.kvStore = make(map[string]string)
	kv.reqMap = make(map[int]*OpContext)
	kv.seqMap = make(map[int64]int64)

	go kv.applyLoop()

	return kv
}

// applyLoop keeps consuming log entries that the raft layer has already applied and processes them.
// For writes, it checks whether the Op's sequence id is stale: stale ops are skipped, otherwise the change takes effect in kvStore.
// For reads, it copies the key's current value from kvStore into opCtx and wakes up the blocked RPC.
func (kv *KVServer) applyLoop() {
	for !kv.killed() {
		select {
		case msg := <-kv.applyCh:
			cmd := msg.Command
			index := msg.CommandIndex

			func() {
				kv.mu.Lock()
				defer kv.mu.Unlock()

				// the operation carried by the log entry
				op := cmd.(*Op)

				opCtx, existOp := kv.reqMap[index]
				prevSeq, existSeq := kv.seqMap[op.ClientId]
				kv.seqMap[op.ClientId] = op.SeqId

				if existOp { // an RPC is still waiting for the result; check whether the state matches what was submitted
					if opCtx.op.Term != op.Term {
						opCtx.wrongLeader = true
					}
				}

				// only apply client writes whose request id is strictly increasing
				if op.Type == OP_TYPE_PUT || op.Type == OP_TYPE_APPEND {
					if !existSeq || op.SeqId > prevSeq { // accept the change only for an increasing request id; retried ops are skipped, giving idempotency
						if op.Type == OP_TYPE_PUT { // Put
							kv.kvStore[op.Key] = op.Value
						} else if op.Type == OP_TYPE_APPEND { // Append
							if val, exist := kv.kvStore[op.Key]; exist {
								kv.kvStore[op.Key] = val + op.Value
							} else {
								kv.kvStore[op.Key] = op.Value
							}
						}
					} else if existOp {
						opCtx.ignored = true
					}
				} else { // OP_TYPE_GET
					if existOp {
						opCtx.value, opCtx.keyExist = kv.kvStore[op.Key]
					}
				}
				DPrintf("RaftNode[%d] applyLoop, kvStore[%v]", kv.me, len(kv.kvStore))

				// wake up the code after "case <-opCtx.committed:" in Get/PutAppend so the client's RPC can be answered
				if existOp {
					opCtx.committed <- 1
				}
			}()
		}
	}
}

Key points

RPC handler

The handlers here are the Get and PutAppend functions. They are covered together since they differ very little; read the code for the details. The main points:

  • The handlers never read or write the kvStore map directly
  • Writing into the raft log happens in the line op.Index, op.Term, isLeader = kv.rf.Start(op); a follower node rejects the request right away
  • The handler saves kv.reqMap[op.Index] = opCtx; this opCtx is used later in applyLoop
  • On exit, remember to clean up the opCtx: delete(kv.reqMap, op.Index)
  • At case <-opCtx.committed: the handler blocks, waiting for the final result produced in applyLoop (a condensed sketch of this shared wait logic follows the list)
  • Once a value arrives on opCtx.committed, the handler unblocks, continues, and returns the result to the Clerk
  • The 2-second timer expiring also makes the handler return
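
Both handlers share this wait-for-commit logic; as a design note, it could be pulled into a small helper, roughly like the hypothetical sketch below (not part of the code above).

// Hypothetical helper (not in the code above): the wait logic shared by Get and PutAppend.
func (kv *KVServer) waitCommit(opCtx *OpContext) Err {
	timer := time.NewTimer(2000 * time.Millisecond)
	defer timer.Stop()
	select {
	case <-opCtx.committed: // applyLoop has processed the op and signalled us
		if opCtx.wrongLeader {
			return ErrWrongLeader // term changed at this index: the leader switched
		}
		return OK
	case <-timer.C: // nothing committed within 2 seconds
		return ErrWrongLeader // make the client retry, possibly on another node
	}
}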

StartKVServer

This function mainly initializes the server and starts the applyLoop goroutine; the test harness calls it to create server nodes.

The line kv.rf = raft.Make(servers, me, persister, kv.applyCh) shows how tightly kv and rf are bound: rf delivers applied entries to the application layer through applyCh.
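
For reference, the message type carried on applyCh looks roughly like this in the lab skeleton; the applyLoop below only uses Command and CommandIndex.

// raft.ApplyMsg, roughly as defined in the lab skeleton (shown for reference).
type ApplyMsg struct {
	CommandValid bool        // true if this message carries a newly committed log entry
	Command      interface{} // the command handed to Start(), here a *Op
	CommandIndex int         // log index of the command
}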

applyLoop

The main job of this function is to watch the applyCh channel for new data; the raft layer's apply loop pushes newly applied log entries into applyCh. Key points:

  • The index of the applied entry is used to look up the corresponding OpContext in reqMap; it is needed below
  • prevSeq, existSeq := kv.seqMap[op.ClientId] fetches the client's previous seqId; op.SeqId > prevSeq is then the idempotency test, so retried requests fail it (summarized in the sketch after this list)
  • opCtx.op.Term != op.Term compares the term saved in the context with the term of the op actually applied at that index; if they differ, a re-election happened in between and the client must retry
  • For linearizability, reads are also written into raft as log entries; the RPC is answered only after the entry commits
  • opCtx.value, opCtx.keyExist = kv.kvStore[op.Key] passes the Get result back through opCtx
  • opCtx.committed <- 1 wakes up the handler so it can continue and reply to the client
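
The duplicate check above can be read as a small predicate. A hypothetical sketch (the caller must already hold kv.mu; this helper is not part of the code above):

// Hypothetical helper summarizing the idempotency check in applyLoop.
func (kv *KVServer) isDuplicate(clientId, seqId int64) bool {
	prevSeq, ok := kv.seqMap[clientId]
	return ok && seqId <= prevSeq // already applied once: skip the write
}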