We are the champions my friend
And we'll keep on fighting till the end
We are the champions
——We Are The Champions
完整代码见: GitHub - SnowLegend-star/6.824: As we advance, the trials grow ever more arduous, and now we stand before an even mightier summit, one that demands our highest prowess and unwavering resolve to conquer.
还是那句话,尽量只是参考思路而不是照抄
这次的Lab和Lab2确实有点类似,不同之处是Put和Append都不应向客户端返回值。这里不返回值指的是忽略Value,但是可以设置Err字段来确保操作的成功处理。
令人辛酸的是貌似我踩完了这个Lab所有的坑,从最粗糙的方法一步步改进直到最后构架出一份几乎无懈可击的代码,可谓是筚路蓝缕
下面来看具体的实验要求。
Part A: Key/value service without snapshots (moderate/hard)
实验说明的开头就有点令人费解——Clerks将Put()、Append()和Get()的RPC发送到与Raft节点关联的kvserver的leader。我们先弄清楚Client和Clerk和区别,然后再分析Clerk请求处理流程。
- 某个Client A通过它的抽象Clerk A 向kvserver发送操作请求operation
- Kvserver中的某个Server B接收到了这个operation。由于kvraft中和Server和raft中的Server一一对应,所以这个Server B在raft中的peer我们记为Server B1。接下来就有两种情况
- Server B1是raft中的Follower,Server B也是kvraft中的Follower。则Server B会拒绝这个operation,Clerk A1收到错误信息Err会继续向其他kvserver发送operation
- Server B2是Leader,Server B也是Leader。则Server B会接收这个operation,处理完毕后向Clerk A1返回相应的Value和Err
具体的代码实现主要Clerk和Server。我们先分析Clerk端的实现。
Clerk端和Lab02的Clerk端实现大同小异——如果向kvserver发送operation返回失败,就继续向下一个kvserver继续发送。值得注意的一点是:我们该用nrand()来给谁生成唯一的id。给每个operation都生成一个唯一id?还是给每个Client对于的Clerk都生成唯一的id呢?这正是我在上一个lab中埋下的伏笔。
Raft的作者在论文的6.3节中其实给出了可行的方案。我们可以利用(CliendId, CommandIndex)来确定一条唯一的operation。当然,为每条operation都生成唯一的id也是可以通过part A的,但是有个隐患,在此先按下不表。我一开始没看到这张图,用的方法就是后者。
Clerk端的实现还有一个小tip——记录上一次成功通信的Leader。这样就不必每次都从kvserver 0进行尝试了,虽然测试点一共就设置5个kvserver。
然后就是重点了——如何实现Server端?先梳理kvraft处理operation的流程
Get()、Put()、Append()的大致框架就不再赘述了,记得加上去重判定。
- kvserver收到来自Clerk的operation后,会立即把operation提交给下一层的raft进行一致性同步。由于Clerk可能会提交重复的operation,所以需要记录operation完成情况的映射opCompleteState来去重。
- Raft对这个operation的同步结束后,会把它提交给kvserver。
- Kvserver接收这个operation后,会在自己的kvstorage上应用这个operation。注意,raft提交的operation也可能是重复的。
会遇到的第一个难点就是#3。在kvserver等待raft传回的msg时,该选择阻塞式等待还是利用goroutine进行并行等待?很显然应该实现后者。“Test: ops complete fast enough (4A)”这个测试点就是测试并行等待的实现。我最初也是抱着试一试的心态实现阻塞式等待,果不其然连“TestBasic4A”这个最初的测试点也没能通过。
如果并行执行,假设kvserver发送了operation X给raft进行同步,那如何确保kvserver正确收到raft同步完成之后的operation X?毕竟在kvserver(身份为Leader)等待raft返回X的期间,Clerk又持续发送了许多不同的operation。首先,我们可以利用channel实现并行等待。又利用map进行适当映射,确保每个channel和operation一一对应。
由于我们需要用到kv.rf.Start(operation)来获取raft中log的最新index,将这个index作为key,operation绑定的channel作为value,以此来维护一张map。
func (kv *KVServer) waitForResult(indexOld int) ResultFromRaft {kv.mu.Lock()//如果没有这个Ch就创建一个if _, ok := kv.reslutCh[indexOld]; !ok {kv.reslutCh[indexOld] = make(chan ResultFromRaft)}waitCh := kv.reslutCh[indexOld]kv.mu.Unlock()select {case result := <-waitCh:return resultcase <-time.After(time.Millisecond * 1000):return ResultFromRaft{Err: ErrTimeout}}
}
这段并行等待的代码我愿称之为画龙点睛之笔。实现并行等待的另一个重要部分是——kvserver如何接收来自raft提交的msg。我们可以模仿Lab03,创建一个goroutine来监听raft的提价情况。
func (kv *KVServer) applier() {for !kv.killed() {// var msg raft.ApplyMsgselect {case msg := <-kv.applyCh:if msg.CommandValid {op, ok := msg.Command.(Op) //将command重新转化为Op格式的内容if !ok {DPrintf("command转化为Op失败")}}}
}
结合applier()和waitForResult()两个函数就可以大致构建出并行等待的框架了。我还画了个流程图,可以更加直观地理解这两个函数的相互作用。
还有一点就是raft提交的operation是否过期的问题,我采用是“注意Raft的任期已改变”这一方案。这个问题一开始可能会让人摸不着脑袋,举个例子:
- 当 Clerk 发出请求并且 Leader 调用了 Start(),此时 Leader 将请求放入日志并等待日志复制和提交。然而,Leader 可能在完成这些步骤之前失去领导地位(例如,可能因为选举超时、网络分区等原因)。
这种情况下,Clerk 的请求没有完全处理好,新的 Leader 还未接管该请求并将其成功提交。因此,需要处理这种“前 Leader 丢失领导地位”的情况。
实验说明的hint给出了两种可行的方法:
- Raft 任期(Term)已改变:当节点发现自己的任期(Term)变得比其他节点的任期小(例如通过心跳或投票请求),它会意识到自己不再是 Leader,并返回一个错误状态,通知 Clerk 请求失败。
- 日志索引中出现了不同的请求:如果在 Start() 返回的索引处出现了不同的请求,说明这个索引被新 Leader 覆盖了,前 Leader 的日志项可能已经被放弃或覆盖,因此该请求没有被成功提交。
说实话上述两个方法还是显得有点晦涩,但是实现起来只需要几行代码即可解决。我这里选择的是方法1。如果raft中的Leader在对operation进行一致性同步的时候失去了领导地位,那这条operation就不再会被提交了,这种情况其实我们已经处理过。kvserver中 的Leader由于长时间没有收到这个operation的提交,会进行超时处理。
我们真正要考虑的情况是:raft中的Leader把operation提交诶kvraft层面了,但是此时它因为某种缘故变成了Follower,那么kvraft收到的operation就不应该被应用并返回给Clerk。如果kvserver应用了这个operation,但是因为网络波动reply过了很长时间才返回给Clerk。Clerk可能会先进行超时处理,再次发送这个operation给kvserver中新的Leader。但是一段时间后Clerk又会重新收到这个迟到的reply,导致Clerk最终接收两次operation的结果。在我的实现中Clerk不会收到旧的reply。
这里可以直接用continue的原因:waitForResult()中对应index的channel一直在等待applier()向channel传入内容。如果直接进行continue,就会把过期的msg直接丢弃,applier()也不会将这个msg传入channel中,所以waitForResult()会出现超时等待。
Part A最后一个难点就是实现——如何在kvraft层面实现数据的一致性同步。我一开始理解错了实验说明给出的hint,还以为kvserver中的Leader只能接收raft中Leader提交的msg,忽略所有Follower提交的msg。真正的解决方法是让所有kvserver接收来自raft的peer所提交上来的msg,并把它们应用于各自的kvstorage。至于Get()方法也一样应用,只不过非Leader的kvservers可以丢弃Get()获得的value,只有Leader可以才可以返回处理结果value和Err。
分析理解了上述三个难点,我们可以顺利通过4A的第一个测试了,但是一般会倒在第二个测试点“Test: ops complete fast enough (4A)”。究其原因就是在raft的实现层面中,Leader是每隔固定的时间发送Heartbeat。如果Client一瞬间发送大量的command也不能立即发送Heartbeat进行一致性同步。应该修改的地方就是一旦Leader收到新的command就立即发送Heartbeat进行同步。
下面分享下我在完成Part A遇到的各种bug。
首先在applier()的实现上踩了大坑。
①前文提到,从raft提交的operation是可能重复的。存在这样一种情况:在某一时刻,大量Clients同时通过Clerk向kvserver发送大量op。其中包括操op X,但是raft没能在规定时间内处理完op X,于是一段时间后Clerk由于超时重发op X。注意此时kv.requestComplete[args.Identifier]=false,所以kvserver又会再次把op X提交给raft。
所以事实上由于一瞬间向raft提交的op数量过多,第一个op X已经被添加到Leader的Log中等待进行一致性同步。现在kvserver又向Leader 的Log中添加了op X。一段时间后,这两个op X都会由raft重新提交给kvserver。
②判断raft提交的msg合法性出现逻辑问题。我直接把逻辑判写在了最前面,这就导致了kvserver中只有Leader可以接收到来自raft的msg,而非Leader的kvserver会直接忽略它raft中的peer提交的msg。这样一旦出现了Leader的切换,新的Leader就无法得知旧Leader的kvstorage已经应用了哪些内容。
就像下面这样。最初所有的operation都由kvserver 3进行应用,而其他四个kvserver什么都不做。一旦kvserver 0成为了新Leader,它的kvstorage会从nil开始。
可恶的是当时我还没意识到这个逻辑判断存在的bug,想了个昏招——直接建立个全局的kvstorage应付了事。这样你还别说,几乎可以通过Part A了,就是不符合linearizability。而且引出了最棘手的bug。
单独测试TestPersistPartitionUnreliableLinearizable4A的时候没有任何问题,但是一旦结合其他测试点进行测试,TestPersistPartitionUnreliableLinearizable4A就无法满足linearizability。开始我还以为是偶然性问题,但是多次测试每次都是如此。这就不得不着手找出问题所在了。我猜测可能是多线程同步进行多种测试的时候,测试函数之间的全局变量globalKVStorage和globalRequestCompleteState产生了相互污染。
现在又回到了之前的交叉口——到底是选择全局变量还是想办法进行kvserver之间的数据同步。前者被否定了,我们再次把目光转向后者。
其实在TestPersistOneClient4A这个测试的时候,我就考虑数据同步的事情,但是没有深入思考就搁置了。
关于msg合法性判断可行性的思考——对于身份为Follower的kvservers,屏蔽Get(),保留Put()/Append()的辨析。
对于身份为Leader的kvserver,我们暂且忽略。对于普通的kvserver,由于Clerk不会与之通信,所以它们不具有reslutCh。Leader 提交给raft的op进行一致性同步完成后,raft中所有的Server都会向上提交。也就是说对于kvserver中的Follower,也会同步接收到peer提交过来的msg。此时,这些Follower也应该把msg转化为op应用到自己的StateMachine中。但是,op应用到StateMachine的结果应该直接丢弃?Get()的结果可以直接丢弃。Put()/Append()的操作一旦应用返回值就无关紧要了。
③关于程序卡住的小心得。一旦发现程序突然卡住,就往死锁的方向上靠,仔细检查自己在某处逻辑判断处是否及时释放了锁。
像这种遇到特殊条件需要返回的情况,一定要记得先释放锁在返回。这就是一个很容易忽略的点。
Part B: Key/value service with snapshots (hard)
完成Part A后,如果Lab03实现的raft不存在bug,那Part B简直就是手到擒来。很遗憾,我又成了倒霉蛋,raft存在的小bug搞得我汗流浃背。这里就不再赘述我再次和raft斗争的过程,希望你们看到这里的时候raft是浑然一体、无懈可击的masterpiece。
涉及到snapshot,我们首先即应该考虑应该保存哪些状态。参考在raft中实现snapshot的过程,显然本地存储kvstorage和operation完成情况的映射opCompleteState是都应该被保存的。别的状态是否需要保存存疑,就先尝试只保存这两个状态。
有趣的是我终于在这里理解了CommandValid和SnapshotValid真正的作用,之前一直不知道这两个变量到底怎么使用。
值得注意的一点是raft和kvraft在加载snapshot传入的参数是不一样的。之所以raft读入snapshot时需要第二个参数而kvsraft不需要,是因为所谓的第二个参数就是snapshotData。这个snapshotData是kvraft主动发送给raft进行保存的,对于kvraft来说就相当于一个临时变量,当然不用加载这个参数了。
SequenceNum提出的思考过程
还记得我在本文刚开始提到的两种方法来表示operation的唯一性吗?我用的就是为operation生成唯一的id。这种方法最终折戟于“Test: snapshot size is reasonable (4B) "snapshot too large 9497, should not be used when maxraftstate = 500"”
如果利用operation的唯一id作为opCompletetState的key,那opCompleteState的大小会随着operation数量的增多而线性递增。一旦operation数量过多,那就要花费大量空间来存储opCompleteState。
我们仔细观察Clerk端发送operation的过程:前文提到每个Client都利用自己包装后的Clerk与kvraft进行通信。虽然不同Clerk可能并发访问kvraft,但是在每个Clerk内部发送operation却是线性的。
举个例子,有A B C三个Clerk与kvraft进行通信。A B C可能同时向kvraft发送operation,这是并发现象。考察单个Clerk A,它开始向kvraft 发送了operation A0,在operation A0被成功处理前,它不可以再发送相同类型(记住这一会儿要考的)的operation A1。
通过观察以上现象,我们为每个Clerk设置一个递增序列commandIndex就变得很自然了。对于某个commandIndex,任何比它小的commandIndex都是已经被处理过的。在Server端,或许我们只保存一个最后被应用的commandIndex就可以了。任何小于此commandIndex都视作重复op。
自此,opCompleteState的大小就可以被压缩到一个理想的值了。将不同Clerk的id作为key,将Clerk发送的operation附带的commandIndex作为value。对于任意operation,先获取它的ClientId,再比较它的commandIndex和opCompleteState[ClientId]的大小即可达成去重的判断。
美中不足的是采用这种方案还有一个漏洞。前面说道对于每个Clerk,上一个operation处理完之前不能继续处理下一个同类型的operation。但是却存在这样一种情况,Clerk A发送第一个操作Get(),用(ClientId=A, commandIndex=0)来标识这个operation。由于此时kvserver的kvstorage为空,将会返回Err=“NoErrkey”。Clerk A如果又接着发送第二个操作Put(),那它的标识符也是(ClientId=A, commandIndex=0)。Clerk中是单线程操作的,如果卡在Get()就不可以同时调用Put()/Append()。
最后补充下Part B遇到的小bug。
①args.PrevLogIndex-rf.lastIncludedIndex<0。说明Follower的snapshot已经应用了,但是Leader的PrevLogIndex还没有更新。这种情况直接返回Success就行。
②类似的问题。Follower的snapshot要比Leader发送的这个snapshot更新。
说实话由于踩到了所有的坑,这个Lab我的完成时间还是相当长的。不停地重构代码简直要了我的老命。