前言
Raft作为一个简单的一致性算法,实现一下还是挺好玩的。代码基于6.824 lab-raft,6.824是麻省理工的分布式课程的一个编号,里面有4个lab,第二个就是raft协议的实现,第三个是基于raft协议的kv存储设计,有待实现(oh我居然在做麻省理工的课程设计)。该lab要求使用go实现算法,并提供了一个具有故障模拟功能的RPC,即通过模拟网络,在单台机器我们就可以运行raft算法。
做实验前,你应该熟读raft论文,这里是中文版
实现
参照raft论文和lab提示,整体利用channel作为事件驱动、mutex保证线程安全,写出一个raft算法骨架还是比较容易的。不过在跑test的时候,小小的细节不对就会导致test failed
。
raft-lab提供了17个test,检验了各种情况下的一致性,模拟了各种奇葩网络变化(网络变成这样还是跑路吧),要求4分钟内pass。
数据结构
参照论文,定义几个数据结构
const (
Follower = iota
Candidate
Leader
HeartbeatInterval = 100 * time.Millisecond
)
type ApplyMsg struct {
CommandValid bool
Command interface{}
CommandIndex int
}
type LogEntry struct {
Term int
Command interface{}
}
type AppendEntriesArgs struct {
Term int
LeaderId int
PrevLogIndex int
PrevLogTerm int
Entries []LogEntry
LeaderCommit int
}
type AppendEntriesReply struct {
Term int
Success bool
NextIndex int
}
type Raft struct {
currentTerm int
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
state int // 0:Follower 1:Candidate 2:Leader
votedFor int // 这个实验用index来代替节点
voteCount int
commitIndex int
lastApplied int
currentLeaderId int
log []LogEntry
nextIndex []int
matchIndex []int
applyCh chan ApplyMsg
heartbeatCh chan bool
leaderCh chan bool
commitCh chan bool
}
type RequestVoteArgs struct {
// Your data here (2A, 2B).
Term int
CandidateId int // 这个实验用index来代替节点
LastLogIndex int
LastLogTerm int
}
type RequestVoteReply struct {
// Your data here (2A).
VoteGranted bool // 是否支持
Term int
}
小声bb,AppendEntriesReply
论文是没有返回nextIndex
的,而是由leader自己去减一重试,这其实是比较慢的,在设置了网络故障unreliable的test中,单纯的减一重试会导致raft集群在一定时间内不能达到一致。让follower过滤掉同一个term的index,并返回应该尝试的nextIndex
,虽然会导致一次复制的日志变多,不过提高了集群达到一致的速度。
一些封装
// 获取锁/释放锁的封装,可以在利用`runtime.Caller`打印获取锁的调用点,虽然性能损失比较大。
func (rf *Raft) Lock() {
rf.mu.Lock()
}
func (rf *Raft) Unlock() {
rf.mu.Unlock()
}
// 字如其名
func (rf *Raft) getLastLogTerm() int {
return rf.log[len(rf.log)-1].Term
}
func (rf *Raft) getLastLogIndex() int {
return len(rf.log) - 1
}
raft实例初始化
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me
// Your initialization code here (2A, 2B, 2C).
rf.state = Follower
rf.currentTerm = 0
rf.votedFor = -1
rf.currentLeaderId = -1
// 初始化空白日志
rf.log = append(rf.log, LogEntry{Term: 0})
rf.applyCh = applyCh
rf.heartbeatCh = make(chan bool, 100)
rf.leaderCh = make(chan bool, 100)
rf.commitCh = make(chan bool, 100)
// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())
// 初始化随机数资源库
rand.Seed(time.Now().UnixNano())
go func() {
for {
switch rf.state {
case Follower:
select {
case <-rf.heartbeatCh:
// 这是lab要求心跳
case <-time.After(time.Duration(rand.Int63()%333+550) * time.Millisecond):
rf.state = Candidate
}
case Leader:
rf.broadcastAppendEntries()
time.Sleep(HeartbeatInterval)
case Candidate:
go rf.broadcastVote()
select {
case <-time.After(time.Duration(rand.Int63()%300+500) * time.Millisecond): //随机投票超时是必须的,为了防止票被瓜分完。
case <-rf.heartbeatCh:
rf.state = Follower
case <-rf.leaderCh:
}
}
}
}()
go func() {
for {
<-rf.commitCh
rf.applyMsg(applyCh)
}
}()
return rf
}
这个方法返回一个raft实例,读取持久化数据,起了两个goroutine。
goroutine1是raft三种状态的转化,这里的超时时间不宜设的太短(太短指论文里的时间),在lab文档里有指出为了配合test,选举超时时间应该larger than the paper’s 150 to 300 milliseconds
goroutine2应用已提交日志。
在初始化channel的时候应该设置缓冲大于1。多余的事件并不会导致系统不一致,但是若由于channel缓冲不够而导致阻塞,就会使raft节点死锁。
votedFor清空时机
一次rpc,无论是发起端还是接收端,只要收到更大的term,就要调整自己的状态,发生下面变化:
rf.votedFor = -1
rf.state = Follower
rf.currentTerm = remoteTerm
可以看到state
会变成Follower
。
假设一种情景,ABC三个节点下,A为leader,此时C发生分区,那么C一定会不断循环进行超时选举,C的term会一直增大,当C网络恢复重新加入集群后会继续发投票请求rpc。由于C的投票请求rpc中的term
较大,集群就会调整currentTerm
以及state
,已有leader会废掉。而问题是,C的请求投票是无意义的,却使集群进行了一次选举。针对这个问题有个preVote方案,就是在投票前调研一下自己是否有投票必要,如果没必要,就不发起投票。这篇文章暂无涉及preVote。
投票发起与接收
broadcastVote() 发起投票
func (rf *Raft) broadcastVote() {
rf.Lock()
rf.currentTerm++
rf.voteCount = 1
rf.votedFor = rf.me
vote := &RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
LastLogIndex: rf.getLastLogIndex(),
LastLogTerm: rf.getLastLogTerm(),
}
rf.persist()
rf.Unlock()
for i := 0; i < len(rf.peers); i++ {
if rf.state != Candidate { // 发送
break
}
if vote.CandidateId == i { // 自己的票已经给自己了
continue
}
go func(server int) {
var reply RequestVoteReply
ok := rf.sendRequestVote(server, vote, &reply)
if !ok {
return
}
rf.Lock()
defer rf.Unlock()
// 一般来说,reply.Term > rf.currentTerm 的情况下 reply.VoteGranted 不会为true
if reply.Term > rf.currentTerm {
rf.votedFor = -1
rf.state = Follower
rf.currentTerm = reply.Term
rf.persist()
}
if reply.VoteGranted {
rf.voteCount++
if rf.state == Candidate && rf.voteCount > len(rf.peers)/2 {
rf.becomeLeader()
}
}
}(i)
}
}
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
return ok
}
在接收到投票reply后,查看票根是否过半,如果过半转化为leader。
// 没有加锁,外部调用已经加锁了
func (rf *Raft) becomeLeader() {
rf.state = Leader
rf.nextIndex = make([]int, len(rf.peers))
rf.matchIndex = make([]int, len(rf.peers))
// 初始化为0
for i := 0; i < len(rf.peers); i++ {
rf.nextIndex[i] = rf.getLastLogIndex() + 1
}
rf.leaderCh <- true // 结束选举阻塞
}
RequestVote 接收投票
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.Lock()
defer rf.Unlock()
defer rf.persist()
// Your code here (2A, 2B).
reply.VoteGranted = false
reply.Term = rf.currentTerm
// 过期的投票请求
if rf.currentTerm > args.Term {
return
}
// 如果发起方的term比接收方大
// adjust current term
if rf.currentTerm < args.Term {
rf.currentTerm = args.Term
rf.state = Follower
rf.votedFor = -1
}
upToDate := false
if args.LastLogTerm > rf.getLastLogTerm() {
upToDate = true
}
if args.LastLogTerm == rf.getLastLogTerm() && args.LastLogIndex >= rf.getLastLogIndex() {
upToDate = true
}
if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) && // 保证有票
upToDate {
reply.VoteGranted = true
rf.state = Follower
rf.votedFor = args.CandidateId
rf.heartbeatCh <- true
return
}
}
1,进行投票后要发送心跳rf.heartbeatCh <- true
,不然节点会由Follower
超时,从而使集群选举循环下去。
2,判断日志是否较新要满足其中一个条件:一,term较大,二,term一样,但日志index比较大
日志复制与接收
broadcastAppendEntries 广播日志/心跳
日志复制lab文档要求一秒不能超过10次。
func (rf *Raft) broadcastAppendEntries() {
rf.Lock()
defer rf.Unlock()
N := rf.commitIndex
for i := rf.commitIndex + 1; i <= rf.getLastLogIndex(); i++ {
// 1 是leader本身
num := 1
for j := range rf.peers {
// 只能提交本term的,一旦提交了本term的,旧term也算提交了
if rf.me != j && rf.matchIndex[j] >= i && rf.log[i].Term == rf.currentTerm {
num++
}
}
if num > len(rf.peers)/2 {
N = i
}
}
if N != rf.commitIndex {
rf.commitIndex = N
rf.commitCh <- true
}
for i := 0; i < len(rf.peers); i++ {
if rf.state != Leader {
break
}
if i == rf.me { // 不用给自己心跳
continue
}
var args AppendEntriesArgs
args.Term = rf.currentTerm
args.LeaderCommit = rf.commitIndex
args.LeaderId = rf.me
args.PrevLogIndex = rf.nextIndex[i] - 1
args.PrevLogTerm = rf.log[args.PrevLogIndex].Term
args.Entries = make([]LogEntry, len(rf.log[args.PrevLogIndex+1:]))
// 复制
copy(args.Entries, rf.log[args.PrevLogIndex+1:])
go func(i int, args AppendEntriesArgs) {
var reply AppendEntriesReply
ok := rf.sendAppendEntries(i, &args, &reply)
if !ok {
return
}
rf.handleAppendEntriesReply(&args, &reply, i)
}(i, args)
}
}
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
return ok
}
每次发送日志前,leader从matchIndex[]
里统计出应该commit的index,如果index前进,发送commit事件。统计时要判断rf.log[i].Term == rf.currentTerm
,也就是说只能提交自己term的log,一旦提交了自己term的log,之前term未被提交的log也算提交了。这个在论文有提到。
下面是复制日志的响应代码,也很直白。
// reply 为 false, 如果不是任期问题,就是日志不匹配
func (rf *Raft) handleAppendEntriesReply(args *AppendEntriesArgs, reply *AppendEntriesReply, i int) {
rf.Lock()
defer rf.Unlock()
if rf.state != Leader { // 获取锁后校验自己的状态
return
}
if args.Term != rf.currentTerm {
return
}
if reply.Term > rf.currentTerm {
rf.votedFor = -1
rf.currentTerm = reply.Term
rf.state = Follower
rf.persist()
return
}
if reply.Success {
// len(args.Entries) == 0 就是心跳了,不用处理
if len(args.Entries) > 0 {
rf.matchIndex[i] = args.PrevLogIndex + len(args.Entries)
rf.nextIndex[i] = rf.matchIndex[i] + 1
}
} else {
rf.nextIndex[i] = reply.NextIndex // 直接采用follower的建议
}
}
AppendEntries 接收日志
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.Lock()
defer rf.Unlock()
defer rf.persist()
reply.Success = false
// 告诉老term的节点该更新啦
if rf.currentTerm > args.Term {
reply.Term = rf.currentTerm
return
}
// 心跳
rf.heartbeatCh <- true
// adjust current term
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.state = Follower
rf.votedFor = -1
}
reply.Term = rf.currentTerm
// 这坨是在日志不匹配的情况下,对leader的NextIndex建议
if rf.getLastLogIndex() < args.PrevLogIndex {
reply.NextIndex = rf.getLastLogIndex() + 1
return
} else if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
term := rf.log[args.PrevLogIndex].Term
if args.PrevLogTerm != term {
for i := args.PrevLogIndex - 1; i >= 0; i-- {
if rf.log[i].Term != term {
reply.NextIndex = i + 1
break
}
}
return
}
}
reply.Success = true
reply.NextIndex = rf.getLastLogIndex() + 1
// 删除已存在日志
rf.log = rf.log[:args.PrevLogIndex+1]
// 附加新日志
rf.log = append(rf.log, args.Entries...)
if args.LeaderCommit > rf.commitIndex {
rf.commitIndex = Min(args.LeaderCommit, rf.getLastLogIndex())
rf.commitCh <- true
}
return
}
这理主要是NextIndex
建议值的计算。
将提交的日志应用至状态机
func (rf *Raft) applyMsg(applyCh chan ApplyMsg) {
rf.Lock()
defer rf.Unlock()
for i := rf.lastApplied + 1; i <= rf.commitIndex; i++ {
msg := ApplyMsg{
true,
rf.log[i].Command,
i,
}
applyCh <- msg
rf.lastApplied = i
}
}
应用过程其实是由test去管理的,我们只要负责把需要应用的日志放入chan ApplyMsg
。
持久化
func (rf *Raft) persist() {
// Your code here (2C).
// Example:
w := new(bytes.Buffer)
e := gob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.votedFor)
e.Encode(rf.log)
data := w.Bytes()
rf.persister.SaveRaftState(data)
}
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
r := bytes.NewBuffer(data)
d := gob.NewDecoder(r)
d.Decode(&rf.currentTerm)
d.Decode(&rf.votedFor)
d.Decode(&rf.log)
}
两个持久化函数,持久化了currentTerm
当前term,votedFor
得票者,log
日志数组,当这三个属性变化时,都执行一次rf.persist()
就没错啦。
后记
表面是在贴代码,实际就是在贴代码。
由于实验是并发过程,一旦test failed
是不容易按线性的过程来分析的。我的方法是多打日志,以及利用net/http/pprof
包对程序的goroutine、mutex状态进行分析。
实现完以后我感觉又变强了(并没有)。