Elasticsearch-04: Master Election
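Core idea
Process description
Example
First, we have a cluster of six nodes, all connected to one another, and P6 is the master.
P6 goes down.
P3 notices that P6 is down and sends an election message to every node whose ID is higher than its own.
P4 and P5 both receive the message and reply that they will take over, so P3 can stand down (they bully P3).
P4 takes over the election and sends election messages to P5 and P6.
Only P5 responds, and from this point P5 takes over the election (it bullies P4).
P5 sends its election messages.
No node responds to P5's election messages, so P5 becomes master and announces this to the other nodes.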
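The walkthrough above can be sketched as a small simulation. This is a minimal illustration, not Elasticsearch code: the class `BullyElection`, its fields, and the simplification that the lowest live higher-ID node is the one that takes over are all assumptions made for this example (in the full Bully algorithm every responder starts its own election).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical simulation of the Bully walkthrough; not Elasticsearch code.
class BullyElection {
    // IDs of the nodes that are currently alive, kept in ascending order.
    private final Set<Integer> alive = new TreeSet<>();
    // Order in which nodes took over the election, kept for inspection.
    final List<Integer> takeovers = new ArrayList<>();

    BullyElection(int... aliveIds) {
        for (int id : aliveIds) {
            alive.add(id);
        }
    }

    // Runs an election started by `starter` and returns the ID of the new master.
    int elect(int starter) {
        int current = starter;
        while (true) {
            takeovers.add(current);
            // "Send" an election message to every higher ID; only live nodes answer.
            Integer responder = null;
            for (int id : alive) {
                if (id > current) {
                    responder = id; // simplification: the lowest live higher ID takes over
                    break;
                }
            }
            if (responder == null) {
                return current; // nobody bullied this node, so it becomes master
            }
            current = responder;
        }
    }

    public static void main(String[] args) {
        // P6 is down, P1..P5 are alive, and P3 is the node that notices the failure.
        BullyElection election = new BullyElection(1, 2, 3, 4, 5);
        int master = election.elect(3);
        System.out.println("takeovers=" + election.takeovers + " master=P" + master);
    }
}
```

With P6 removed from the live set, the election started by P3 passes through P4 and ends at P5, matching the steps described in the example.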
Pros and cons
The Raft algorithm first defines three roles in the system: leader, follower, and candidate. It then breaks the consistency problem into several subproblems: leader election, log replication, safety, log compaction, and membership change. Here we discuss only leader election.
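As a rough illustration of the three roles, the sketch below models a single Raft election round: a follower whose election timeout fires becomes a candidate, starts a new term, votes for itself, and becomes leader only with a strict majority. The class `RaftElectionSketch` and the way vote replies are passed in as a list are assumptions made for this example; real Raft also handles timeouts, log comparison, and higher terms seen from other nodes.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of one Raft election round; not a full Raft implementation.
class RaftElectionSketch {
    enum Role { FOLLOWER, CANDIDATE, LEADER }

    Role role = Role.FOLLOWER;
    long currentTerm = 0;

    // Called when the election timeout fires without hearing from a leader.
    // `grants` holds the replies of the other nodes to this node's vote requests.
    Role runElection(List<Boolean> grants) {
        role = Role.CANDIDATE;
        currentTerm++;   // every election starts a new term
        int votes = 1;   // a candidate always votes for itself
        for (boolean granted : grants) {
            if (granted) {
                votes++;
            }
        }
        int clusterSize = grants.size() + 1;
        // A strict majority of the whole cluster is needed to become leader;
        // otherwise the node stays a candidate and would retry after another timeout.
        if (votes * 2 > clusterSize) {
            role = Role.LEADER;
        }
        return role;
    }

    public static void main(String[] args) {
        RaftElectionSketch node = new RaftElectionSketch();
        // Five-node cluster: two of the four other nodes grant their vote.
        System.out.println(node.runElection(Arrays.asList(true, true, false, false)));
    }
}
```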
Core idea
Election process
References
Logic flowchart
Source code
Cluster initialization
/**
 * the main function of a join thread. This function is guaranteed to join the cluster
 * or spawn a new join thread upon failure to do so.
 */
private void innerJoinCluster() {
    DiscoveryNode masterNode = null;
    final Thread currentThread = Thread.currentThread();
    nodeJoinController.startElectionContext();
    while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
        masterNode = findMaster();
    }
    if (joinThreadControl.joinThreadActive(currentThread) == false) {
        logger.trace("thread is no longer in currentJoinThread. Stopping.");
        return;
    }
    // If the current node is the elected master, it has won the election and starts accepting join requests from other nodes.
    // If it was not elected, it joins the elected master instead.
    // This also explains why a node must not count itself when collecting active masters:
    // if it did, every node would consider itself the master.
    if (transportService.getLocalNode().equals(masterNode)) {
        final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
        logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
        nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
            new NodeJoinController.ElectionCallback() {
                @Override
                public void onElectedAsMaster(ClusterState state) {
                    synchronized (stateMutex) {
                        joinThreadControl.markThreadAsDone(currentThread);
                    }
                }
                @Override
                public void onFailure(Throwable t) {
                    logger.trace("failed while waiting for nodes to join, rejoining", t);
                    synchronized (stateMutex) {
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    }
                }
            }
        );
    } else {
        // process any incoming joins (they will fail because we are not the master)
        nodeJoinController.stopElectionContext(masterNode + " elected");
        // send join request
        final boolean success = joinElectedMaster(masterNode);
        synchronized (stateMutex) {
            if (success) {
                DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
                if (currentMasterNode == null) {
                    // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                    // a valid master.
                    logger.debug("no master node is set, despite of join request completing. retrying pings.");
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                } else if (currentMasterNode.equals(masterNode) == false) {
                    // update cluster state
                    joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
                }
                joinThreadControl.markThreadAsDone(currentThread);
            } else {
                // failed to join. Try again...
                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
            }
        }
    }
}
Master election logic
private DiscoveryNode findMaster() {
    logger.trace("starting to ping");
    List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
    if (fullPingResponses == null) {
        logger.trace("No full ping responses");
        return null;
    }
    if (logger.isTraceEnabled()) {
        StringBuilder sb = new StringBuilder();
        if (fullPingResponses.size() == 0) {
            sb.append(" {none}");
        } else {
            for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                sb.append("\n\t--> ").append(pingResponse);
            }
        }
        logger.trace("full ping responses:{}", sb);
    }
    final DiscoveryNode localNode = transportService.getLocalNode();
    // add our selves
    assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
        .filter(n -> n.equals(localNode)).findAny().isPresent() == false;
    fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));
    // filter responses
    final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
    List<DiscoveryNode> activeMasters = new ArrayList<>();
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        // We can't include the local node in pingMasters list, otherwise we may end up electing ourselves without
        // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
        if (pingResponse.master() != null && localNode.equals(pingResponse.master()) == false) {
            activeMasters.add(pingResponse.master());
        }
    }
    // nodes discovered during pinging
    List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        if (pingResponse.node().isMasterNode()) {
            masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
        }
    }
    // activeMasters is empty in two cases: 1. every node visible to the current node has agreed on a master,
    // and that master is the local node; 2. there is no master.
    // 1 --> the election result must be published so that the other nodes know who the master is
    // 2 --> since no node has a master, try to elect one
    // When activeMasters is not empty, other nodes have already elected a master, and the current node only needs to join it
    if (activeMasters.isEmpty()) {
        if (electMaster.hasEnoughCandidates(masterCandidates)) {
            final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
            logger.trace("candidate {} won election", winner);
            return winner.getNode();
        } else {
            // if we don't have enough master nodes, we bail, because there are not enough master to elect from
            logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                masterCandidates, electMaster.minimumMasterNodes());
            return null;
        }
    } else {
        assert activeMasters.contains(localNode) == false :
            "local node should never be elected as master when other nodes indicate an active master";
        // lets tie break between discovered nodes
        return electMaster.tieBreakActiveMasters(activeMasters);
    }
}
Quorum check logic
// The variable minimumMasterNodes is the setting discovery.zen.minimum_master_nodes
public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
    if (candidates.isEmpty()) {
        return false;
    }
    if (minimumMasterNodes < 1) {
        return true;
    }
    assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() :
        "duplicates ahead: " + candidates;
    return candidates.size() >= minimumMasterNodes;
}
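To see what this check buys in practice, the following sketch replays it for a five-node cluster that splits into groups of three and two. The class `QuorumCheckSketch` and the use of plain strings as node names are assumptions made for this example; the value N / 2 + 1 is the commonly recommended setting for discovery.zen.minimum_master_nodes, where N is the number of master-eligible nodes.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for the quorum check; node names are plain strings here.
class QuorumCheckSketch {
    // Commonly recommended value: a strict majority of the master-eligible nodes.
    static int recommendedMinimumMasterNodes(int masterEligibleNodes) {
        return masterEligibleNodes / 2 + 1;
    }

    // Same decision as the method above, with the setting passed in as a parameter.
    static boolean hasEnoughCandidates(Collection<String> candidates, int minimumMasterNodes) {
        if (candidates.isEmpty()) {
            return false;
        }
        if (minimumMasterNodes < 1) {
            return true;
        }
        return candidates.size() >= minimumMasterNodes;
    }

    public static void main(String[] args) {
        int minimum = recommendedMinimumMasterNodes(5);
        // A network partition splits the five nodes into a group of three and a group of two.
        List<String> majoritySide = Arrays.asList("node-1", "node-2", "node-3");
        List<String> minoritySide = Arrays.asList("node-4", "node-5");
        System.out.println("minimum_master_nodes = " + minimum);
        System.out.println("majority side may elect: " + hasEnoughCandidates(majoritySide, minimum));
        System.out.println("minority side may elect: " + hasEnoughCandidates(minoritySide, minimum));
    }
}
```

Only the side that still sees a majority passes the check, so at most one side of a partition can go on to elect a master, provided the setting really is a majority of the master-eligible nodes.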
Node comparison logic
/**
 * compares two candidates to indicate which is the better master.
 * A higher cluster state version is better
 *
 * @return -1 if c1 is a better candidate, 1 if c2.
 */
public static int compare(MasterCandidate c1, MasterCandidate c2) {
    // we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted
    // list, so if c2 has a higher cluster state version, it needs to come first.
    int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
    if (ret == 0) {
        ret = compareNodes(c1.getNode(), c2.getNode());
    }
    return ret;
}
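The effect of this ordering can be checked with a small, self-contained sketch. The `Candidate` class and the tie-break by node ID string are assumptions made for this example, since the body of `compareNodes` is not shown in the article; only the swapped `Long.compare` call is taken from the code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for MasterCandidate ordering; not Elasticsearch code.
class CandidateOrderingSketch {
    static final class Candidate {
        final String nodeId;
        final long clusterStateVersion;

        Candidate(String nodeId, long clusterStateVersion) {
            this.nodeId = nodeId;
            this.clusterStateVersion = clusterStateVersion;
        }
    }

    // The higher cluster state version sorts first; ties fall back to the node ID.
    static int compare(Candidate c1, Candidate c2) {
        int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
        if (ret == 0) {
            ret = c1.nodeId.compareTo(c2.nodeId); // assumption: tie-break on node ID
        }
        return ret;
    }

    // The best candidate is the first element after sorting.
    static Candidate elect(List<Candidate> candidates) {
        List<Candidate> sorted = new ArrayList<>(candidates);
        sorted.sort(CandidateOrderingSketch::compare);
        return sorted.get(0);
    }

    public static void main(String[] args) {
        List<Candidate> candidates = Arrays.asList(
            new Candidate("node-a", 7),
            new Candidate("node-c", 9),
            new Candidate("node-b", 9));
        System.out.println("winner: " + elect(candidates).nodeId);
    }
}
```

Because every node applies the same deterministic ordering, nodes that see the same set of candidates pick the same winner, and a node holding a newer cluster state is preferred over one with a stale state.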
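Analysis
What happens if a new node joins while an election is in progress?
Each node may elect a different master, so how is split-brain avoided?
discovery.zen.minimum_master_nodes
Why can split-brain still occur when discovery.zen.minimum_master_nodes is already configured?
Flowchart
Why is the new version said to eliminate the split-brain problem?
Because in the new version the number of votes required for a quorum is no longer fixed by a setting; it is a value that is updated dynamically. ES maintains the set of voting nodes as nodes join and leave, and checks whether more than half of that set has taken part in the vote.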
public boolean hasQuorum(Collection<String> votes) {
    final HashSet<String> intersection = new HashSet<>(nodeIds);
    intersection.retainAll(votes);
    return intersection.size() * 2 > nodeIds.size();
}
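The method above can be exercised in isolation with the sketch below. The wrapper class `VotingConfigurationSketch` and its constructor are assumptions made for this example; only the body of `hasQuorum` is taken from the code above.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical wrapper around the hasQuorum logic; not Elasticsearch code.
class VotingConfigurationSketch {
    // IDs of the nodes whose votes count towards the quorum.
    private final Set<String> nodeIds;

    VotingConfigurationSketch(Collection<String> nodeIds) {
        this.nodeIds = new HashSet<>(nodeIds);
    }

    // True when more than half of the configured voters appear in `votes`.
    boolean hasQuorum(Collection<String> votes) {
        final HashSet<String> intersection = new HashSet<>(nodeIds);
        intersection.retainAll(votes); // votes from nodes outside the configuration are ignored
        return intersection.size() * 2 > nodeIds.size();
    }

    public static void main(String[] args) {
        VotingConfigurationSketch config =
            new VotingConfigurationSketch(Arrays.asList("n1", "n2", "n3"));
        System.out.println(config.hasQuorum(Arrays.asList("n1", "n2")));
        System.out.println(config.hasQuorum(Arrays.asList("n1", "outsider")));
    }
}
```

Votes from nodes that are not part of the voting set do not count, and two disjoint groups of voters can never both hold more than half of the same set, which is why two masters cannot be elected from it at the same time.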