一、总结框架图对于Leader选举,其总体框架图如下图所示
说明: 选举的父接口为Election,其定义了lookForLeader和shutdown两个方法,lookForLeader表示寻找Leader,shutdown则表示关闭,如关闭服务端之间的连接。 AuthFastLeaderElection,同FastLeaderElection算法基本一致,只是在消息中加入了认证信息,其在3.4.0之后的版本中已经不建议使用。 FastLeaderElection,其是标准的fast paxos算法的实现,基于TCP协议进行选举。 LeaderElection,也表示一种选举算法,其在3.4.0之后的版本中已经不建议使用。二、Election源码分析 说明:可以看到Election接口定义的方法相当简单。三、FastLeaderElection源码分析2.1 类的继承关系 说明:FastLeaderElection实现了Election接口,其需要实现接口中定义的lookForLeader方法和shutdown方法,其是标准的Fast Paxos算法的实现,各服务器之间基于TCP协议进行选举。2.2 类的内部类FastLeaderElection有三个较为重要的内部类,分别为Notification、ToSend、Messenger。1. Notification类 说明:Notification表示收到的选举投票信息(其他服务器发来的选举投票信息),其包含了被选举者的id、zxid、选举周期等信息,其buildMsg方法将选举信息封装至ByteBuffer中再进行发送。2. ToSend类 说明:ToSend表示发送给其他服务器的选举投票信息,也包含了被选举者的id、zxid、选举周期等信息。3. Messenger类3.1 类的内部类Messenger包含了WorkerReceiver和WorkerSender两个内部类3.1.1 WorkerReceiver 说明:WorkerReceiver实现了Runnable接口,是选票接收器。其会不断地从QuorumCnxManager中获取其他服务器发来的选举消息,并将其转换成一个选票,然后保存到recvqueue中,在选票接收过程中,如果发现该外部选票的选举轮次小于当前服务器的,那么忽略该外部投票,同时立即发送自己的内部投票。其是QuorumCnxManager的Message转化为FastLeaderElection的Notification。 其中,WorkerReceiver的主要逻辑在run方法中,其首先会从QuorumCnxManager中的recvQueue队列中取出其他服务器发来的选举消息,消息封装在Message数据结构中。然后判断消息中的服务器id是否包含在可以投票的服务器集合中,若不是,则会将本服务器的内部投票发送给该服务器,其流程如下 若包含该服务器,则根据消息(Message)解析出投票服务器的投票信息并将其封装为Notification,然后判断当前服务器是否为LOOKING,若为LOOKING,则直接将Notification放入FastLeaderElection的recvqueue(区别于recvQueue)中。然后判断投票服务器是否为LOOKING状态,并且其选举周期小于当前服务器的逻辑时钟,则将本(当前)服务器的内部投票发送给该服务器,否则,直接忽略掉该投票。其流程如下 若本服务器的状态不为LOOKING,则会根据投票服务器中解析的version信息来构造ToSend消息,放入sendqueue,等待发送,起流程如下 3.1.2 WorkerSender说明:WorkerSender也实现了Runnable接口,为选票发送器,其会不断地从sendqueue中获取待发送的选票,并将其传递到底层QuorumCnxManager中,其过程是将FastLeaderElection的ToSend转化为QuorumCnxManager的Message。3.2 类的属性说明:Messenger中维护了一个WorkerSender和WorkerReceiver,分别表示选票发送器和选票接收器3.3 类的构造函数 说明:会启动WorkerSender和WorkerReceiver,并设置为守护线程。2.3 类的属性 说明:其维护了服务器之间的连接(用于发送消息)、发送消息队列、接收消息队列、推选者的一些信息(zxid、id)、是否停止选举流程标识等。2.4 类的构造函数 说明:构造函数中初始化了stop字段和manager字段,并且调用了starter函数,其源码如下 说明:其完成在构造函数中未完成的部分,如会初始化FastLeaderElection的sendqueue和recvqueue,并且启动接收器和发送器线程。2.5 核心函数分析1. sendNotifications函数 说明:其会遍历所有的参与者投票集合,然后将自己的选票信息发送至上述所有的投票者集合,其并非同步发送,而是将ToSend消息放置于sendqueue中,之后由WorkerSender进行发送。2. totalOrderPredicate函数 说明:该函数将接收的投票与自身投票进行PK,查看是否消息中包含的服务器id是否更优,其按照epoch、zxid、id的优先级进行PK。3. termPredicate函数说明:该函数用于判断Leader选举是否结束,即是否有一半以上的服务器选出了相同的Leader,其过程是将收到的选票与当前选票进行对比,选票相同的放入同一个集合,之后判断选票相同的集合是否超过了半数。4. checkLeader函数说明:该函数检查是否已经完成了Leader的选举,此时Leader的状态应该是LEADING状态。5. lookForLeader函数说明:该函数用于开始新一轮的Leader选举,其首先会将逻辑时钟自增,然后更新本服务器的选票信息(初始化选票),之后将选票信息放入sendqueue等待发送给其他服务器,其流程如下 之后每台服务器会不断地从recvqueue队列中获取外部选票。如果服务器发现无法获取到任何外部投票,就立即确认自己是否和集群中其他服务器保持着有效的连接,如果没有连接,则马上建立连接,如果已经建立了连接,则再次发送自己当前的内部投票,其流程如下 在发送完初始化选票之后,接着开始处理外部投票。在处理外部投票时,会根据选举轮次来进行不同的处理。 外部投票的选举轮次大于内部投票。若服务器自身的选举轮次落后于该外部投票对应服务器的选举轮次,那么就会立即更新自己的选举轮次(logicalclock),并且清空所有已经收到的投票,然后使用初始化的投票来进行PK以确定是否变更内部投票。最终再将内部投票发送出去。外部投票的选举轮次小于内部投票。若服务器接收的外选票的选举轮次落后于自身的选举轮次,那么Zookeeper就会直接忽略该外部投票,不做任何处理。外部投票的选举轮次等于内部投票。此时可以开始进行选票PK,如果消息中的选票更优,则需要更新本服务器内部选票,再发送给其他服务器。之后再对选票进行归档操作,无论是否变更了投票,都会将刚刚收到的那份外部投票放入选票集合recvset中进行归档,其中recvset用于记录当前服务器在本轮次的Leader选举中收到的所有外部投票,然后开始统计投票,统计投票是为了统计集群中是否已经有过半的服务器认可了当前的内部投票,如果确定已经有过半服务器认可了该投票,然后再进行最后一次确认,判断是否又有更优的选票产生,若无,则终止投票,然后最终的选票,其流程如下若选票中的服务器状态为FOLLOWING或者LEADING时,其大致步骤会判断选举周期是否等于逻辑时钟,归档选票,是否已经完成了Leader选举,设置服务器状态,修改逻辑时钟等于选举周期,返回最终选票,其流程如下
Java
运行代码复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
if(n.electionEpoch == logicalclock){ // 与逻辑时钟相等
// 将该服务器和选票信息放入recvset中
recvset.put(n.sid, new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch));
if(ooePredicate(recvset, outofelection, n)) { // 已经完成了leader选举
// 设置本服务器的状态
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
// 最终的选票
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
// 清空recvqueue队列的选票
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify
* a majority is following the same leader.
*/
outofelection.put(n.sid, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state));
if(ooePredicate(outofelection, outofelection, n)) { // 已经完成了leader选举
synchronized(this){
// 设置逻辑时钟
logicalclock = n.electionEpoch;
// 设置状态
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
// 最终选票
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
// 清空recvqueue队列的选票
leaveInstance(endVote);
// 返回选票
return endVote;
}
四、总结 本篇博文详细分析了FastLeaderElection的算法,其是ZooKeeper的核心部分,结合前面的理论学习部分,可以比较轻松的理解其具体过程。