大家好,我是程序员xiao熊,本篇内容将分享如何使用 Zookeeper 来实现Leader的选举,涉及的内容主要包含Zookeeper数据模型、监听相关的知识点,更多内容可阅读以下文章。
此外,我们需要准备一个可以运行的ZooKeeper 服务,单机版的即可;接下来进入正题,本篇文章主要分为以下几个部分
- Leader选举介绍
- 使用Zookeeper实现Leader选举的实现原理
- Zookeeper实现Leader选举的执行逻辑以及具体实现步骤
- 测试验证
Leader选举介绍
相信小伙伴们都听过主从架构的概念,简单说就是在提供相同服务的众多节点中,有一个节点作为主节点对外提供服务,其他节点作为从节点进行数据同步,提供一些读操作缓解主节点压力,以及进行故障切换保障服务的正常运行;
本篇分享的Leader选举是主从架构中的其中一个环节——Leader选举;除了我们熟悉的主从架构中会使用Leader选举之外,还有一个场景就是定时任务;小伙伴们在实际开发中会发现始终只有一台服务器执行定时任务,面试中也会问如何实现只需要在一台服务器上执行定时任务的问题;Zookeeper实现的Leader选举的流程如下图所示:
使用Zookeeper实现Leader选举的实现原理
Leader选举的实现原理是通过在Zookeeper中创建有序号的数据节点表示参与Leader选举、删除有序列号的数据节点表示退出leader选举的功能或者表示故障,以及通过Zookeeper的监听功能来实现Leader的切换。相关的关键问题如下:
- 如何使用Zookeeper存储参与Leader选举的服务信息
- 如何使用Zookeeper表示服务参与Leader选举
- 如何使用Zookeeper判断某个服务已经成为Leader节点,什么时候进行判断
- 如何使用Zookeeper表示退出Leader节点选举,或者不需要成为Leader节点
- 服务出现故障之后,Leader选举如何处理
针对这些问题,在zookeeper中,我们可以使用如下的解决方案(主要涉及zookeeper的数据模型、监听的知识点), 解决方案的序号对应问题的序号:
- 在Zookeeper中创建一个”/leader”节点,并在该节点下创建子节点来表示参与Leader选举的服务信息
- 通过在节点”/leader”下创建一个子节点来表示该服务需要参与Leader的选举,创建的子节点必须是临时、有序列号的节点
- 通过获取节点”/ leader”下的所有子节点,判断序列号最小的节点是否等于当前进程对应的节点,如果相等,则表示创建该节点的服务成为了Leader,该服务可以继续执行后续的业务逻辑;通常在服务启动,以及Leader出现故障,并收到通知之后进行判断;
- 删除Zookeeper中,当前服务的进程创建的节点即可表示不参与Leader选举,或者不再担任Leader角色
- 服务出现故障之后,利用创建的子节点是临时的特性,Zookeeper可以自动删除该服务对应的子节点,表示该服务不参与Leader选举,或者不再担任Leader角色
Zookeeper实现Leader选举的执行逻辑以及具体实现步骤
在了解了Leader选举的流程以及原理之后,接下来看看Leader选举的具体执行逻辑和实现步骤,具体的执行逻辑如下图:
接下来是具体的实现步骤:
在执行流程和实现步骤的基础上来看代码实现:
一、初始化Leader选举对象
(1)创建zookeeper客户端对象:
ZooKeeper zk = new ZooKeeper(address, timeouts, watcher);
参数说明:
- address:表示Zookeeper的连接地址
- timeouts:表示超时时间
- watcher:表示监听器,指定为this,同时队列类需要实现Watcher接口
(2)创建Leader选举节点,存储选举信息:
zk.create(root, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
参数说明:
- root:表示Leader选举节点,例如“/leader”
- data:表示节点的数据,可以不传值,例如:new byte[0]
- ZooDefs.Ids.OPEN_ACL_UNSAFE:表示ACL类型
- CreateMode.PERSISTENT:表示是永久性的节点
(3)创建同步对象:
Integer mutex = new Integer(-1)
(3)监听队列节点:
参考(1)步骤的watcher参数,在本例中,使用this对象,表示使用当前对象作为监听器,因此需要初始化zookeeper客户端的类实现Watcher接口,并实现如下方法:
public void process(WatchedEvent event){}
完整代码如下:
public class ZookeeperLeader implements Watcher{
static ZooKeeper zk = null;
static Integer mutex;
String root;
public ZookeeperLeader(String address, String root, int size) {
try {
// 1. 创建zookeeper客户端对象
if(zk == null){
System.out.println("Starting ZK:");
zk = new ZooKeeper(address, 3000, this);
System.out.println("Finished starting ZK: " + zk);
}
// 2. 创建Leader选举节点
this.root = root;
if (zk != null) {
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
// 3. 设置同步对象
mutex = new Integer(-1);
} catch (IOException e) {
System.out.println(e.toString());
zk = null;
} catch (KeeperException e) {
System.out.println("Keeper exception when instantiating queue: " + e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception");
}
}
// 4. 监听Leader选举节点
synchronized public void process(WatchedEvent event) {
// todo
}
}
二、创建start(ILeaderElectionListener listener)方法,参与Leader选举
(1) 在Leader选举节点”/leader”下创建子节点
zk.create(root +"/guid-leader-", data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
参数说明:
- root + "/guid-leader-" :参与Leader选举的服务对应的节点路径,其中root表示Leader选举节点,”/ guid-leader-”是参与Leader选举的服务对应的子节点的前缀,再利用CreateMode.EPHEMERAL_SEQUENTIAL参数的序列号即可生成完整的节点;
- guid:表示UUID,
- ZooDefs.Ids.OPEN_ACL_UNSAFE:表示ACL类型
- CreateMode.EPHEMERAL_SEQUENTIAL:表示是临时的并且是有序列号的节点类型,因为在参与Leader选举的过程中,参与Leader选举的服务出现故障,则应该不再参与Leader选举,避免出现选举故障的服务作为Leader的情况,利用Zookeeper临时节点的特性,在该服务出现故障之后,Zookeeper能够自动删除对应的子节点,表示不再参与Leader选举;
(2)判断服务是否已经成为了Leader,并通过ILeaderElectionListener通知应用程序
- 获取分布式锁节点下的所有子节点
- 获取最小的序列号
- 如果当前服务创建的子节点的序列号等于最小序列号,则表示当前服务成为了Leader,通过Listener通知业务端,并完成选举;
- 如果当前服务创建的子节点的序列号不等于最小序列号,则在序列号比当前服务创建的子节点的序列号小的所有子节点中,再获取这些子节点中序列号最大的子节点(最大不超过当前节点的序列号),如果这类节点不存在,则返回步骤1,进行下一轮的判断
- 如果序列号比当前服务创建的子节点的序列号小,但是序列号最大的子节点存在,则则调用exists()方法再次判断是否存在,如果存在,则监听该节点,然后等待创建该节点的服务放弃成为Leader或者不再参与Leader选举
- 如果exists()方法判断结果不存在,则回到步骤1,进行下一轮的判断当前服务是否成为Leader
完整代码如下:
public void start(ILeaderElectionListener listener) throws InterruptedException, KeeperException {
//1. 创建子节点
String guid = UUID.randomUUID().toString();
String curLeaderNodePathPrefix = guid + "-leader-";
curLeaderNodePath = zk.create(leaderNodePath + "/" + curLeaderNodePathPrefix, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + "创建的节点:" + curLeaderNodePath);
Integer curLeaderPathSequence = getNodeSequence(curLeaderNodePath);
while(true) {
// 2. 获取子节点
List<String> leaders = zk.getChildren(leaderNodePath, false);
Integer minSequence = null;
if (CollectionUtils.isNotEmpty(leaders)) {
minSequence = leaders.stream().map(this::getNodeSequence).min(Comparator.comparing(Function.identity())).get();
}
// 3. 判断是否拥有锁,如果当前线程创建的节点的序列号是最小序列号,则表示当前线程拥有锁
if (curLeaderPathSequence == minSequence) {
listener.notice(true);
return;
}
if (CollectionUtils.isEmpty(leaders)) {
continue;
}
// 4. 判断是否需要等待其他线程释放锁
String preLeaderPath = leaders.stream().filter(leader-> getNodeSequence(leaderNodePath) < curLeaderPathSequence).max(Comparator.comparingInt(this::getNodeSequence)).get();
Stat leaderStat = zk.exists(leaderNodePath + "/" + preLeaderPath, true);
if (null != leaderStat) {
listener.notice(false);
synchronized (mutex) {
mutex.wait();
}
}
}
}
ILeaderElectionListener接口代码如下:
public interface ILeaderElectionListener {
void notice(Boolean isCurNodeLeader);
}
三、 创建close()方法,放弃参与Leader选举/放弃Leader角色
(1)删除Leader选举节点下服务对应的子节点:
zk.delete(lockPath, 0)
完整代码如下:
public void close() throws InterruptedException, KeeperException {
System.out.println("放弃参与Leader选举:" + curLeaderNodePath);
zk.delete(curLeaderNodePath, 0);
}
四、处理Leader选举节点的监听事件
在监听事件处理方法中,直接唤醒下一个需要成为Leader的服务,然后让处于等待状态的服务可以继续判断是否已经成为Leader,代码如下
// 监听Leader选举节点
synchronized public void process(WatchedEvent event) {
// 接收到监听事件,则本次监听失效,需要通过后续的exists方法设置下一轮的监听
synchronized (mutex) {
mutex.notify();
}
}
测试验证
public class App {
public static void main( String[] args ) {
try {
ZookeeperLeader leader = new ZookeeperLeader("localhost:2181", "/leader");
leader.start(isCurNodeLeader -> System.out.println(Thread.currentThread().getName()+"是否是leader:" + isCurNodeLeader));
Thread.sleep(100000000);
} catch (Exception e) {
System.out.println("启动异常");
}
}
}
总结
本示例中的Leader选举是一个简单的示例,还有很多优化的空间,开发小伙伴们可以尝试着去优化;
以上就是为大家分享的内容,欢迎大家在评论区里面留言,后续也会分享Zookeeper实践相关的文章以及其他技术的文章;
私信我可获取全部完整的代码以及文章的高清图片!!!