四时宝库

程序员的知识宝库

使用Zookeeper实现Leader选举的设计原理以及示例

大家好,我是程序员xiao熊,本篇内容将分享如何使用 Zookeeper 来实现Leader的选举,涉及的内容主要包含Zookeeper数据模型、监听相关的知识点,更多内容可阅读以下文章。

此外,我们需要准备一个可以运行的ZooKeeper 服务,单机版的即可;接下来进入正题,本篇文章主要分为以下几个部分

  1. Leader选举介绍
  2. 使用Zookeeper实现Leader选举的实现原理
  3. Zookeeper实现Leader选举的执行逻辑以及具体实现步骤
  4. 测试验证

Leader选举介绍

相信小伙伴们都听过主从架构的概念,简单说就是在提供相同服务的众多节点中,有一个节点作为主节点对外提供服务,其他节点作为从节点进行数据同步,提供一些读操作缓解主节点压力,以及进行故障切换保障服务的正常运行;

本篇分享的Leader选举是主从架构中的其中一个环节——Leader选举;除了我们熟悉的主从架构中会使用Leader选举之外,还有一个场景就是定时任务;小伙伴们在实际开发中会发现始终只有一台服务器执行定时任务,面试中也会问如何实现只需要在一台服务器上执行定时任务的问题;Zookeeper实现的Leader选举的流程如下图所示:

使用Zookeeper实现Leader选举的实现原理

Leader选举的实现原理是通过在Zookeeper中创建有序号的数据节点表示参与Leader选举、删除有序列号的数据节点表示退出leader选举的功能或者表示故障,以及通过Zookeeper的监听功能来实现Leader的切换相关的关键问题如下:

  1. 如何使用Zookeeper存储参与Leader选举的服务信息
  2. 如何使用Zookeeper表示服务参与Leader选举
  3. 如何使用Zookeeper判断某个服务已经成为Leader节点,什么时候进行判断
  4. 如何使用Zookeeper表示退出Leader节点选举,或者不需要成为Leader节点
  5. 服务出现故障之后,Leader选举如何处理

针对这些问题,在zookeeper中,我们可以使用如下的解决方案(主要涉及zookeeper的数据模型、监听的知识点), 解决方案的序号对应问题的序号:

  1. 在Zookeeper中创建一个”/leader”节点,并在该节点下创建子节点来表示参与Leader选举的服务信息
  2. 通过在节点”/leader”下创建一个子节点来表示该服务需要参与Leader的选举,创建的子节点必须是临时、有序列号的节点
  3. 通过获取节点”/ leader”下的所有子节点,判断序列号最小的节点是否等于当前进程对应的节点,如果相等,则表示创建该节点的服务成为了Leader,该服务可以继续执行后续的业务逻辑;通常在服务启动,以及Leader出现故障,并收到通知之后进行判断;
  4. 删除Zookeeper中,当前服务的进程创建的节点即可表示不参与Leader选举,或者不再担任Leader角色
  5. 服务出现故障之后,利用创建的子节点是临时的特性,Zookeeper可以自动删除该服务对应的子节点,表示该服务不参与Leader选举,或者不再担任Leader角色

Zookeeper实现Leader选举的执行逻辑以及具体实现步骤

在了解了Leader选举的流程以及原理之后,接下来看看Leader选举的具体执行逻辑和实现步骤,具体的执行逻辑如下图:

接下来是具体的实现步骤:

在执行流程和实现步骤的基础上来看代码实现:

一、初始化Leader选举对象

(1)创建zookeeper客户端对象:

ZooKeeper zk = new ZooKeeper(address, timeouts, watcher);

参数说明:

  • address:表示Zookeeper的连接地址
  • timeouts:表示超时时间
  • watcher:表示监听器,指定为this,同时队列类需要实现Watcher接口

(2)创建Leader选举节点,存储选举信息:

zk.create(root, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)

参数说明:

  • root:表示Leader选举节点,例如“/leader”
  • data:表示节点的数据,可以不传值,例如:new byte[0]
  • ZooDefs.Ids.OPEN_ACL_UNSAFE:表示ACL类型
  • CreateMode.PERSISTENT:表示是永久性的节点

(3)创建同步对象:

Integer mutex = new Integer(-1)

(3)监听队列节点:

参考(1)步骤的watcher参数,在本例中,使用this对象,表示使用当前对象作为监听器,因此需要初始化zookeeper客户端的类实现Watcher接口,并实现如下方法:

public void process(WatchedEvent event){}

完整代码如下:

public class ZookeeperLeader implements Watcher{
static ZooKeeper zk = null;
    static Integer mutex;
    String root;
   public ZookeeperLeader(String address, String root, int size) {
        try {
            // 1. 创建zookeeper客户端对象
            if(zk == null){

                System.out.println("Starting ZK:");
                zk = new ZooKeeper(address, 3000, this);
                System.out.println("Finished starting ZK: " + zk);
            }

            // 2. 创建Leader选举节点
            this.root = root;
            if (zk != null) {
                Stat s = zk.exists(root, false);
                if (s == null) {
                    zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
            }

            // 3. 设置同步对象
            mutex = new Integer(-1);

        } catch (IOException e) {
            System.out.println(e.toString());
            zk = null;
        } catch (KeeperException e) {
            System.out.println("Keeper exception when instantiating queue: " + e.toString());
        } catch (InterruptedException e) {
            System.out.println("Interrupted exception");
        }
    }

   // 4. 监听Leader选举节点
synchronized public void process(WatchedEvent event) {
        // todo
    }
}

二、创建start(ILeaderElectionListener listener)方法,参与Leader选举

(1) 在Leader选举节点”/leader”下创建子节点

zk.create(root +"/guid-leader-", data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);

参数说明:

  • root + "/guid-leader-" :参与Leader选举的服务对应的节点路径,其中root表示Leader选举节点,”/ guid-leader-”是参与Leader选举的服务对应的子节点的前缀,再利用CreateMode.EPHEMERAL_SEQUENTIAL参数的序列号即可生成完整的节点;
  • guid:表示UUID,
  • ZooDefs.Ids.OPEN_ACL_UNSAFE:表示ACL类型
  • CreateMode.EPHEMERAL_SEQUENTIAL:表示是临时的并且是有序列号的节点类型,因为在参与Leader选举的过程中,参与Leader选举的服务出现故障,则应该不再参与Leader选举,避免出现选举故障的服务作为Leader的情况,利用Zookeeper临时节点的特性,在该服务出现故障之后,Zookeeper能够自动删除对应的子节点,表示不再参与Leader选举;

(2)判断服务是否已经成为了Leader,并通过ILeaderElectionListener通知应用程序

  1. 获取分布式锁节点下的所有子节点
  2. 获取最小的序列号
  3. 如果当前服务创建的子节点的序列号等于最小序列号,则表示当前服务成为了Leader,通过Listener通知业务端,并完成选举;
  4. 如果当前服务创建的子节点的序列号不等于最小序列号,则在序列号比当前服务创建的子节点的序列号小的所有子节点中,再获取这些子节点中序列号最大的子节点(最大不超过当前节点的序列号),如果这类节点不存在,则返回步骤1,进行下一轮的判断
  5. 如果序列号比当前服务创建的子节点的序列号小,但是序列号最大的子节点存在,则则调用exists()方法再次判断是否存在,如果存在,则监听该节点,然后等待创建该节点的服务放弃成为Leader或者不再参与Leader选举
  6. 如果exists()方法判断结果不存在,则回到步骤1,进行下一轮的判断当前服务是否成为Leader

完整代码如下:

public void start(ILeaderElectionListener listener) throws InterruptedException, KeeperException {
        //1. 创建子节点
        String guid = UUID.randomUUID().toString();
        String curLeaderNodePathPrefix = guid + "-leader-";
        curLeaderNodePath = zk.create(leaderNodePath + "/" + curLeaderNodePathPrefix, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(Thread.currentThread().getName() + "创建的节点:" + curLeaderNodePath);
        Integer curLeaderPathSequence = getNodeSequence(curLeaderNodePath);

        while(true) {

            // 2. 获取子节点
            List<String> leaders = zk.getChildren(leaderNodePath, false);
            Integer minSequence = null;
            if (CollectionUtils.isNotEmpty(leaders)) {
                minSequence = leaders.stream().map(this::getNodeSequence).min(Comparator.comparing(Function.identity())).get();
            }

            // 3. 判断是否拥有锁,如果当前线程创建的节点的序列号是最小序列号,则表示当前线程拥有锁
            if (curLeaderPathSequence == minSequence) {
                listener.notice(true);
                return;
            }

            if (CollectionUtils.isEmpty(leaders)) {
                continue;
            }
            // 4. 判断是否需要等待其他线程释放锁
            String preLeaderPath = leaders.stream().filter(leader-> getNodeSequence(leaderNodePath) < curLeaderPathSequence).max(Comparator.comparingInt(this::getNodeSequence)).get();
            Stat leaderStat = zk.exists(leaderNodePath + "/" + preLeaderPath, true);
            if (null != leaderStat) {
                listener.notice(false);
                synchronized (mutex) {
                    mutex.wait();
                }
            }
        }
    }

ILeaderElectionListener接口代码如下:

public interface ILeaderElectionListener {
    void notice(Boolean isCurNodeLeader);
}

三、 创建close()方法,放弃参与Leader选举/放弃Leader角色

(1)删除Leader选举节点下服务对应的子节点:

zk.delete(lockPath, 0)

完整代码如下:

public void close() throws InterruptedException, KeeperException {
    System.out.println("放弃参与Leader选举:" +  curLeaderNodePath);
    zk.delete(curLeaderNodePath, 0);
}

四、处理Leader选举节点的监听事件

在监听事件处理方法中,直接唤醒下一个需要成为Leader的服务,然后让处于等待状态的服务可以继续判断是否已经成为Leader,代码如下

// 监听Leader选举节点
synchronized public void process(WatchedEvent event) {
    // 接收到监听事件,则本次监听失效,需要通过后续的exists方法设置下一轮的监听
synchronized (mutex) {
        mutex.notify();
    }
}

测试验证

public class App {
public static void main( String[] args ) {
        try {
            ZookeeperLeader leader = new ZookeeperLeader("localhost:2181", "/leader");
            leader.start(isCurNodeLeader -> System.out.println(Thread.currentThread().getName()+"是否是leader:" + isCurNodeLeader));
            Thread.sleep(100000000);
        } catch (Exception e) {
            System.out.println("启动异常");
        }
    }
}

总结

本示例中的Leader选举是一个简单的示例,还有很多优化的空间,开发小伙伴们可以尝试着去优化;

以上就是为大家分享的内容,欢迎大家在评论区里面留言,后续也会分享Zookeeper实践相关的文章以及其他技术的文章;

私信我可获取全部完整的代码以及文章的高清图片!!!

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言
    友情链接