程序笔记   发布时间:2022-07-18  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了Elasticsearch-04-master选举大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。

3.2 master选举机制

3.2.1 选举算法

1)bully算法

核心思想

  • 假定所有的节点都具有一个可以比较的ID,通过比较这个ID来选举master

流程说明

  1. 节点向所有比自己ID大的节点发送选举信息(election),告诉他们我选你
  2. 如果收到了回复消息(alive),这说明有人比自己“资历”更老,要让他去做老大,他只能乖乖等着老大选举
    1. 等待老大成功选举的消息(victory)
    2. 如果超时之后还没有成功选举消息,那么重新发送选举信息
  3. 如果没有收到任何回复消息(alive),那么就自己当老大,同时向其他节点发送当选信息(victory)

示例

  1. 首先,我们有6个节点的集群,所有节点都互联,P6是master

    Elasticsearch-04-master选举

  2. P6挂了

    Elasticsearch-04-master选举

  3. P3发现P6挂了,于是向所有比自己ID大的节点发送选举消息(election)

    • 要给P6发的原因是P6有可能恢复了,所以P6也要发

    Elasticsearch-04-master选举

  4. P4和P5都收到了消息,并表示他们会接手,你就不用管了(bully P3)

    Elasticsearch-04-master选举

  5. P4开始接管选主流程,它开始向P5和P6发送选举信息

    Elasticsearch-04-master选举

  6. 只有P5相应了,P5从这里开始接管选举(bully p4)

    Elasticsearch-04-master选举

  7. P5发送选举信息

    Elasticsearch-04-master选举

  8. 没有人能响应P5的选举信息,于是P5当选master,同时告诉别人他是master

    Elasticsearch-04-master选举

优缺点

  • 优点
    • 简单粗暴,只要我比你大,我就来组织选举
  • 缺点
      @H_314_11@master假死会使得集群状态不稳定。假定P6在P5发布当选信息后重新上线,P5检测到P6的话,又会重新开启选举,因为P6的id比P5大
    • 脑裂问题,当出现网络分区的时候,一个集群可能会选举出两个master(因为网络通信受限)
2)raft算法

raft算法首先将系统中角色定义为三种:leader、follower、candidate。同时将系统一致性拆分为Leader选举(Leader election)、日志同步(Log Replication)、安全性(Safety)、日志压缩(Log compaction)、成员变更(Membership change)等多个子问题。这里,我们只讨论Leader election

核心思想

  • 每个leader都有一个任期(term),在它的任期内,他是老大;只要发现有人的任期比自己大,他就会无条件的加入

选主流程

  1. follower在一段时间内没有收到leader发送来的确认信息之后会转变为candidate
  2. candidate等待投票请求
    • 收到投票请求,投票,然后等待选举结果
    • 超时,给自己投票,发送投票请求
  3. 收到足够投票请求后,成功当选leader,开始维护集群

Elasticsearch-04-master选举

资料

  • 一文搞懂Raft算法
  • raft算法动画演示
  • raft论文

3.2.2 选举实现

1)es6.8
  • 逻辑流程图

    Elasticsearch-04-master选举

  • 源代码

    • 集群初始化

      /**
       * the main function of a join thread. This function is guaranteed to join the cluster
       * or spawn a new join thread upon failure to do so.
       */
      private void innerJoinCluster() {
          DiscoveryNode masterNode = null;
          final Thread currentThread = Thread.currentThread();
          nodeJoinController.startElectionContext();
          while (masterNode == null && joinThReadControl.joinThreadActive(currentThread)) {
              masterNode = findMaster();
          }
      
          if (joinThReadControl.joinThreadActive(currentThread) == falsE) {
              logger.trace("thread is no longer in currentJoinThread. Stopping.");
              return;
          }
      
          // 如果当前节点是被选出来的master,那么他就成功当选master,开始接受其他节点的连接请求
          // 如果没有成功当选master,那么就去加入master
          // 这里也解释了为什么在判断存活master的时候不能把自己算进去。因为把自己算进去的话,所有节点都会认为自己是master,
          if (transportservice.getLocalNode().equals(masterNodE)) {
              final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
              logger.debug("elected as master, waiTing for incoming joins ([{}] needed)", requiredJoins);
              nodeJoinController.waitToBeElectedAsmaster(requiredJoins, masterElectionWaitForJoinsTimeout,
                      new NodeJoinController.ElectionCallBACk() {
                          @Override
                          public void onElectedAsmaster(clusterstate statE) {
                              synchronized (stateMuteX) {
                                  joinThReadControl.markThreadAsDone(currentThread);
                              }
                          }
      
                          @Override
                          public void onFailure(Throwable t) {
                              logger.trace("failed while waiTing for nodes to join, rejoining", t);
                              synchronized (stateMuteX) {
                                  joinThReadControl.markThreadAsDoneAndStartNew(currentThread);
                              }
                          }
                      }
      
              );
          } else {
              // process any incoming joins (they will fail because we are not the master)
              nodeJoinController.stopElectionContext(masterNode + " elected");
      
              // send join request
              final Boolean success = joinElectedMaster(masterNodE);
      
              synchronized (stateMuteX) {
                  if (success) {
                      DiscoveryNode currentMasterNode = this.clusterstate().getNodes().getMasterNode();
                      if (currentMasterNode == null) {
                          // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                          // a valid master.
                          logger.debug("no master node is set, despite of join request compleTing. retrying pings.");
                          joinThReadControl.markThreadAsDoneAndStartNew(currentThread);
                      } else if (currentMasterNode.equals(masterNodE) == falsE) {
                          // update cluster state
                          joinThReadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
                      }
      
                      joinThReadControl.markThreadAsDone(currentThread);
                  } else {
                      // failed to join. Try again...
                      joinThReadControl.markThreadAsDoneAndStartNew(currentThread);
                  }
              }
          }
      }
      
    • 选主逻辑

      private DiscoveryNode findMaster() {
          logger.trace("starTing to ping");
          List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
          if (fullPingResponses == null) {
              logger.trace("No full ping responses");
              return null;
          }
          if (logger.isTraceEnabled()) {
              StringBuilder sb = new StringBuilder();
              if (fullPingResponses.size() == 0) {
                  sb.append(" {nonE}");
              } else {
                  for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                      sb.append("nt--> ").append(pingResponsE);
                  }
              }
              logger.trace("full ping responses:{}", sb);
          }
      
          final DiscoveryNode localNode = transportservice.getLocalNode();
      
          // add our selves
          assert fullPingResponses.stream().map(ZenPing.PingResponse::nodE)
              .filter(n -> n.equals(localNodE)).findAny().isPresent() == false;
      
          fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterstate()));
      
          // filter responses
          final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
      
          List<DiscoveryNode> activeMasters = new ArrayList<>();
          for (ZenPing.PingResponse pingResponse : pingResponses) {
              // We can't include the local node in pingMasters list, otherwise we may up elecTing ourselves without
              // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
              if (pingResponse.master() != null && localNode.equals(pingResponse.master()) == falsE) {
                  activeMasters.add(pingResponse.master());
              }
          }
      
          // nodes discovered during pinging
          List<ElectMasterservice.MasterCandidate> masterCandidates = new ArrayList<>();
          for (ZenPing.PingResponse pingResponse : pingResponses) {
              if (pingResponse.node().ismasterNode()) {
                  masterCandidates.add(new ElectMasterservice.MasterCandidate(pingResponse.node(), pingResponse.getclusterstateVersion()));
              }
          }
      
          // activeMasters为空的时候有两种情况:1.当前节点能看到的所有节点都选出了一个共同的master,且那个节点就是本地节点;2.没有master
          // 1 --> 需要发布选主信息,告诉别人,master是谁
          // 2 --> 既然大家都没有master,那么就来尝试选举master
          // activeMasters不为空时,表示其他节点已经选出了一个master,当前节点要做的事情就是加入这个master
          if (activeMasters.isEmpty()) {
              if (electMaster.hasEnoughCandidates(masterCandidates)) {
                  final ElectMasterservice.MasterCandidate winner = electMaster.electMaster(masterCandidates);
                  logger.trace("candidate {} won election", winner);
                  return winner.getNode();
              } else {
                  // if we don't have enough master nodes, we bail, because there are not enough master to elect from
                  logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                              masterCandidates, electMaster.minimumMasterNodes());
                  return null;
              }
          } else {
              assert activeMasters.contains(localNodE) == false :
                  "local node should never be elected as master when other nodes inDicate an active master";
              // lets tie break between discovered nodes
              return electMaster.tieBreakActiveMasters(activeMasters);
          }
      }
      
    • 法定人数判断逻辑

      // 变量 minimumMasterNodes 就是配置项 discovery.zen.minimum_master_nodes
      public Boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
          if (candidates.isEmpty()) {
              return false;
          }
          if (minimumMasterNodes < 1) {
              return true;
          }
          assert candidates.stream().map(MasterCandidate::getNodE).collect(Collectors.toSet()).size() == candidates.size() :
              "duplicates ahead: " + candidates;
          return candidates.size() >= minimumMasterNodes;
      }
      
    • 节点比较逻辑

      /**
       * compares two candidates to inDicate which the a better master.
       * A higher cluster state version is better
       *
       * @return -1 if c1 is a batter candidate, 1 if c2.
       */
      public static int compare(MasterCandidate c1, MasterCandidate c2) {
          // we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted
          // list, so if c2 has a higher cluster state version, it needs to come first.
          int ret = Long wangt.cc pare(c2.clusterstateVersion, c1.clusterstateVersion);
          if (ret == 0) {
              ret = compareNodes(c1.getNode(), c2.getNode());
          }
          return ret;
      }
      
  • 分析

    • 什么时候开始选主投票?

      • 集群刚启动时
      • @H_314_11@master检测到其他节点离开时
      • 其他节点检测到master离开时
    • 在选主进行的时候,有新的节点加怎么办?

      • ES会暂时搁置这些加入请求,直到选主结束之后再来处理。如果本地节点成功当选,就接收这些连接请求;如果没有成功当选,则丢弃这些请求
      • 这些新发现的节点不会被计算到候选者中
    • 每个节点选举出来的master可能不一样,是怎么做到不脑裂的?

      • ping过程中发现的候选者数量要大于等于设置项 discovery.zen.minimum_master_nodes
    • 为什么会出现脑裂,不是已经有 discovery.zen.minimum_master_nodes 配置了吗?

      • 假设一开始集群规模为3,那么配置为2是没有任何问题的。但是,一旦集群规模扩大到7,那么合理的配置因为为4。于是,新节点的配置为4,而老节点的配置为2。如果没有及时更新老节点的配置,就会存在脑裂的风险(试想一下,在主节点挂掉时,2个旧节点又恰好和4个新节点产生了网络分区,而由于节点配置项不统一,就会导致脑裂)
2)es7.13

流程图

Elasticsearch-04-master选举

  • 和标准raft算法的不同之处
    • 在集群启动时,节点默认为candidate
    • candidate不做任何投票限制,这有可能导致产生多个leader,ES选择的是最新的leader(term最大的)
    • candidate在投票的时候,最后才会给自己投票,防止出现同票现象

为什么说新版本杜绝了脑裂问题?

  • 因为新版本中的法定投票人数不再由设置决定,而是变成了一个动态更新的值。由ES在依据存活节点数量来判断是否有足够的参与人数

    public Boolean hasQuorum(Collection<String> votes) {
        final HashSet<String> intersection = new HashSet<>(nodEIDs);
        intersection.retainAll(votes);
        return intersection.size() * 2 > nodEIDs.size();
    }
    

大佬总结

以上是大佬教程为你收集整理的Elasticsearch-04-master选举全部内容,希望文章能够帮你解决Elasticsearch-04-master选举所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。