In Elasticsearch 7.0, the old Zen Discovery module was replaced by a new cluster coordination module (Coordinator), and the Bully-based master election algorithm was replaced along with it. According to the ES team, the new coordination module brings the following advantages:

1. The discovery.zen.minimum_master_nodes setting of Zen Discovery is gone. ES now works out by itself which nodes can form a quorum; users only need to supply an initial list of master-eligible nodes. When scaling the cluster in or out, there is no longer any risk of forgetting or misconfiguring discovery.zen.minimum_master_nodes.
2. Leader election is much faster than before. Under Zen Discovery every node had to complete three rounds of ZenPing before node discovery and leader election could finish, whereas the new algorithm typically completes within 100ms.
3. Hard problems in Zen Discovery are fixed, such as "repeated network partitions can cause cluster state updates to be lost".

Although the theory and engineering practice of distributed consensus algorithms are mature by now, the ES team did not adopt a ready-made third-party consensus library. To allow smooth upgrades between old and new versions and to cope with ES-specific operational realities, they developed their own implementation. Its design, however, closely follows the Raft consensus algorithm: nodes likewise take the roles of Leader, Candidate, and Follower; time is divided into terms of arbitrary length, numbered with consecutive integers, with each term beginning with a leader election, which allows stale information to be detected; and to guarantee progress, each node starts its election after a randomized timeout. The community's record of the new coordination module can be found in issue-32006 (https://github.com/elastic/elasticsearch/issues/32006). Below we walk through the source code of the new master-election flow.
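
As a side note, the randomized-timeout idea can be illustrated in a few lines of Java. The sketch below is self-contained and only loosely modeled on ES's ElectionSchedulerFactory; the constants echo the spirit of the cluster.election.* settings but are illustrative, not the real API. Each failed attempt widens the retry window, and the actual delay is drawn uniformly at random from it, so two candidates are unlikely to keep colliding:

import java.util.Random;

// Minimal sketch of randomized election backoff (illustrative; not the real
// ElectionSchedulerFactory API). Each failed attempt widens the window from
// which the next delay is drawn, and randomization breaks candidate ties.
public class ElectionBackoffSketch {
    private static final long INITIAL_TIMEOUT_MS = 100;  // first retry window
    private static final long BACK_OFF_MS = 100;         // window growth per failed attempt
    private static final long MAX_TIMEOUT_MS = 10_000;   // upper bound on the window

    private final Random random = new Random();

    long nextElectionDelayMs(int failedAttempts) {
        long window = Math.min(MAX_TIMEOUT_MS, INITIAL_TIMEOUT_MS + failedAttempts * BACK_OFF_MS);
        return 1 + (long) (random.nextDouble() * window); // uniform in [1, window]
    }

    public static void main(String[] args) {
        ElectionBackoffSketch sketch = new ElectionBackoffSketch();
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.println("attempt " + attempt + " -> delay " + sketch.nextElectionDelayMs(attempt) + "ms");
        }
    }
}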

Leader Election

As with the old Zen Discovery module, the entry point of the new node election flow is startInitialJoin(). When a node starts up and finishes its local initialization, it calls Discovery's startInitialJoin() to begin leader election:

@Override
public void startInitialJoin() {
    synchronized (mutex) {
        becomeCandidate("startInitialJoin");
    }
    clusterBootstrapService.scheduleUnconfiguredBootstrap();
}

To take part in an election, a node first has to make itself a candidate, i.e. take the Candidate role; only after the election completes does it learn whether it ends up as Leader or Follower. startInitialJoin() therefore begins with becomeCandidate(). becomeCandidate() mainly prepares the environment: it stops or clears any activity and state that does not belong to the Candidate role and sets up a clean context for the new candidate. The actual election flow follows in clusterBootstrapService.scheduleUnconfiguredBootstrap(). scheduleUnconfiguredBootstrap() first checks the node itself: if the node is not master-eligible, it returns immediately and goes no further. Likewise, if all discovered nodes are legacy (Zen Discovery) nodes, the new election does not proceed:

void scheduleUnconfiguredBootstrap() {
    ...
    if (transportService.getLocalNode().isMasterNode() == false) {
        return;
    }
    ...
    transportService.getThreadPool().scheduleUnlessShuttingDown(unconfiguredBootstrapTimeout, Names.GENERIC, new Runnable() {
        @Override
        public void run() {
            final Set<DiscoveryNode> discoveredNodes = getDiscoveredNodes();
            final List<DiscoveryNode> zen1Nodes = discoveredNodes.stream().filter(Coordinator::isZen1Node).collect(Collectors.toList());
            if (zen1Nodes.isEmpty()) {
                logger.debug("performing best-effort cluster bootstrapping with {}", discoveredNodes);
                startBootstrap(discoveredNodes, emptyList());
            } else {
                logger.info("avoiding best-effort cluster bootstrapping due to discovery of pre-7.0 nodes {}", zen1Nodes);
            }
        }
        ...
    });
}

After scheduleUnconfiguredBootstrap() calls startBootstrap(), doBootstrap() is invoked next.

private void doBootstrap(VotingConfiguration votingConfiguration) {
    ...
    try {
        votingConfigurationConsumer.accept(votingConfiguration);
    } catch (Exception e) {
        ...
        transportService.getThreadPool().scheduleUnlessShuttingDown(TimeValue.timeValueSeconds(10), Names.GENERIC,
            new Runnable() {
                @Override
                public void run() {
                    doBootstrap(votingConfiguration);
                }
                ...
            }
        );
    }
}

doBootstrap() continues the work through votingConfigurationConsumer.accept(). If an exception occurs along the way, it is caught and doBootstrap() is scheduled to run again. The votingConfigurationConsumer is passed in during node initialization; its concrete target is Coordinator's setInitialConfiguration():

public boolean setInitialConfiguration(final VotingConfiguration votingConfiguration) {
    synchronized (mutex) {
        ...
        coordinationState.get().setInitialState(ClusterState.builder(currentState).metaData(metaDataBuilder).build());
        ...
        preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version
        startElectionScheduler();
        return true;
    }
}

After a number of precondition checks (is this a master-eligible node, does the voting configuration contain the local node, can the voting nodes form a majority, and so on), setInitialConfiguration() initializes coordinationState and the response state inside preVoteCollector, and then calls startElectionScheduler() to run the election.
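
As an aside, the votingConfigurationConsumer mentioned above is nothing exotic: it is a plain java.util.function.Consumer that the bootstrap code invokes, wired to setInitialConfiguration() with a method reference. A minimal, self-contained illustration of that wiring (the VotingConfiguration type below is a placeholder, not the real ES class):

import java.util.function.Consumer;

// Sketch of how votingConfigurationConsumer is wired: the bootstrap code only
// sees a Consumer, while the Coordinator supplies its own method as the target.
// VotingConfiguration here is a stand-in for the real ES class.
public class ConsumerWiringSketch {
    static class VotingConfiguration { /* ids of the initial voting master nodes */ }

    // Plays the role of Coordinator.setInitialConfiguration(); the boolean
    // return value is simply discarded when called through the Consumer.
    static boolean setInitialConfiguration(VotingConfiguration config) {
        System.out.println("initial configuration accepted, starting election scheduler");
        return true;
    }

    public static void main(String[] args) {
        Consumer<VotingConfiguration> votingConfigurationConsumer =
            ConsumerWiringSketch::setInitialConfiguration;
        votingConfigurationConsumer.accept(new VotingConfiguration()); // what doBootstrap() does
    }
}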

startElectionScheduler() eventually calls down into PreVoteCollector's start(). There the current node sends a pre_vote request to every discovered node that is not a legacy node. The request works like an election announcement, telling all other eligible nodes: I have started running for leader:

void start(final Iterable<DiscoveryNode> broadcastNodes) {
    ...
    broadcastNodes.forEach(n -> transportService.sendRequest(n, REQUEST_PRE_VOTE_ACTION_NAME, preVoteRequest,
        new TransportResponseHandler<PreVoteResponse>() {
            ...

            @Override
            public void handleResponse(PreVoteResponse response) {
                handlePreVoteResponse(response, n);
            }

            @Override
            public void handleException(TransportException exp) {
                logger.debug(new ParameterizedMessage("{} failed", this), exp);
            }

            ...
        }));
}

After the pre_vote request goes out, there are two pieces of logic to examine: how a node that receives the request handles it, and how the local node handles the responses. Following the order of events, we first look at the receiving side.

PreVoteCollector registers the handler for the pre_vote request (internal:cluster/request_pre_vote); it is just a lambda expression:

transportService.registerRequestHandler(REQUEST_PRE_VOTE_ACTION_NAME, Names.GENERIC, false, false,
    PreVoteRequest::new,
    (request, channel, task) -> channel.sendResponse(handlePreVoteRequest(request)));

In the lambda, the receiving node first calls handlePreVoteRequest(request) to process the request and then returns the result directly to the sender. The relevant code of handlePreVoteRequest() is:

private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) {
    updateMaxTermSeen.accept(request.getCurrentTerm());

    Tuple<DiscoveryNode, PreVoteResponse> state = this.state;
    assert state != null : "received pre-vote request before fully initialised";

    final DiscoveryNode leader = state.v1();
    final PreVoteResponse response = state.v2();

    if (leader == null) {
        return response;
    }

    if (leader.equals(request.getSourceNode())) {
        // This is a _rare_ case where our leader has detected a failure and stepped down, but we are still a follower. It's possible
        // that the leader lost its quorum, but while we're still a follower we will not offer joins to any other node so there is no
        // major drawback in offering a join to our old leader. The advantage of this is that it makes it slightly more likely that the
        // leader won't change, and also that its re-election will happen more quickly than if it had to wait for a quorum of followers
        // to also detect its failure.
        return response;
    }

    throw new CoordinationStateRejectedException("rejecting " + request + " as there is already a leader");
}

The handler first checks the term of the incoming request via updateMaxTermSeen, which is again a lambda/method reference; its concrete implementation is the updateMaxTermSeen() method:

private void updateMaxTermSeen(final long term) {
    synchronized (mutex) {
        maxTermSeen = Math.max(maxTermSeen, term);
        final long currentTerm = getCurrentTerm();
        if (mode == Mode.LEADER && maxTermSeen > currentTerm) {
            // Bump our term. However if there is a publication in flight then doing so would cancel the publication, so don't do that
            // since we check whether a term bump is needed at the end of the publication too.
            if (publicationInProgress()) {
                logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump", maxTermSeen, currentTerm);
            } else {
                try {
                    logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, bumping term", maxTermSeen, currentTerm);
                    ensureTermAtLeast(getLocalNode(), maxTermSeen);
                    startElection();
                } catch (Exception e) {
                    logger.warn(new ParameterizedMessage("failed to bump term to {}", maxTermSeen), e);
                    becomeCandidate("updateMaxTermSeen");
                }
            }
        }
    }
}

Its main logic compares the request's term with the largest term this node has ever seen. If the current node's role is Leader and the newly received request's term exceeds its own, then the node's own information may be stale: some other node has already begun campaigning for leader. The node therefore uses ensureTermAtLeast() to immediately give up its leadership, revert to Candidate, and then rejoins the race via startElection().

If the term checks out, handlePreVoteRequest() proceeds: if this node has not yet heard of any node being elected leader (if (leader == null)), it returns its response directly, signalling support for the requester's candidacy; if it does know a leader but that leader is the requesting node itself, it likewise returns the response in agreement. In all other cases the candidacy is rejected (throw new CoordinationStateRejectedException), because a leader already exists.

That covers the handling of the pre_vote request; now let's look at how the local campaigning node handles the pre_vote responses:

private void handlePreVoteResponse(final PreVoteResponse response, final DiscoveryNode sender) {
    ...
    updateMaxTermSeen.accept(response.getCurrentTerm());

    if (response.getLastAcceptedTerm() > clusterState.term()
        || (response.getLastAcceptedTerm() == clusterState.term()
        && response.getLastAcceptedVersion() > clusterState.getVersionOrMetaDataVersion())) {
        logger.debug("{} ignoring {} from {} as it is fresher", this, response, sender);
        return;
    }

    preVotesReceived.add(sender);
    final VoteCollection voteCollection = new VoteCollection();
    preVotesReceived.forEach(voteCollection::addVote);

    if (isElectionQuorum(voteCollection, clusterState) == false) {
        logger.debug("{} added {} from {}, no quorum yet", this, response, sender);
        return;
    }

    if (electionStarted.compareAndSet(false, true) == false) {
        logger.debug("{} added {} from {} but election has already started", this, response, sender);
        return;
    }

    logger.debug("{} added {} from {}, starting election", this, response, sender);
    startElection.run();
}

handlePreVoteResponse() also begins by validating the term of the response, with the same logic as above. If the response carries a last-accepted term greater than the local one, or an equal term but a newer cluster-state version, then the responding node has fresher state and the local node does nothing with this response. Otherwise, the responder is added to voteCollection as a vote in favour of the local node's candidacy. If the votes do not yet form a majority, nothing further happens; once they do, the node proceeds toward becoming Leader by executing startElection.run().
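
The quorum test behind isElectionQuorum() is, at its core, a strict-majority check of the granted votes against the voting configuration. A simplified, self-contained stand-in (names are illustrative; the real logic lives in ES's VoteCollection and VotingConfiguration classes):

import java.util.HashSet;
import java.util.Set;

// Simplified stand-in for the majority test behind isElectionQuorum():
// only votes from nodes in the voting configuration count, and a strict
// majority of that configuration wins the election. Illustrative only.
public class QuorumSketch {
    static boolean hasQuorum(Set<String> votesReceived, Set<String> votingConfig) {
        Set<String> granted = new HashSet<>(votesReceived);
        granted.retainAll(votingConfig);                  // ignore non-voting nodes
        return granted.size() * 2 > votingConfig.size();  // strict majority
    }

    public static void main(String[] args) {
        Set<String> config = Set.of("node-1", "node-2", "node-3");
        System.out.println(hasQuorum(Set.of("node-1"), config));           // false: 1 of 3
        System.out.println(hasQuorum(Set.of("node-1", "node-2"), config)); // true: 2 of 3
    }
}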

The startElection lambda runs the startElection() method, which sends a StartJoinRequest to the discovered nodes:

private void startElection() {
    synchronized (mutex) {
        // The preVoteCollector is only active while we are candidate, but it does not call this method with synchronisation, so we have
        // to check our mode again here.
        if (mode == Mode.CANDIDATE) {
            if (electionQuorumContainsLocalNode(getLastAcceptedState()) == false) {
                logger.trace("skip election as local node is not part of election quorum: {}",
                    getLastAcceptedState().coordinationMetaData());
                return;
            }

            final StartJoinRequest startJoinRequest
                = new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen) + 1);
            logger.debug("starting election with {}", startJoinRequest);
            getDiscoveredNodes().forEach(node -> {
                if (isZen1Node(node) == false) {
                    joinHelper.sendStartJoinRequest(startJoinRequest, node);
                }
            });
        }
    }
}

A StartJoin request differs from a Join request: a Join request is a node asking to join the leader's cluster, while a StartJoin is the prospective leader telling other nodes to join it. The code that sends the StartJoinRequest is:

public void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) {
    ...
    transportService.sendRequest(destination, START_JOIN_ACTION_NAME,
        startJoinRequest, new TransportResponseHandler<Empty>() {
            ...
            @Override
            public void handleResponse(Empty response) {
                logger.debug("successful response to {} from {}", startJoinRequest, destination);
            }

            @Override
            public void handleException(TransportException exp) {
                logger.debug(new ParameterizedMessage("failure in response to {} from {}", startJoinRequest, destination), exp);
            }
            ...
        });
}

As you can see, its handleResponse does nothing. And as the registered handler below shows, the StartJoin request serves purely as a notification, telling the other nodes that they may now send Join requests:

transportService.registerRequestHandler(START_JOIN_ACTION_NAME, Names.GENERIC, false, false,
    StartJoinRequest::new,
    (request, channel, task) -> {
        final DiscoveryNode destination = request.getSourceNode();
        sendJoinRequest(destination, Optional.of(joinLeaderInTerm.apply(request)));
        channel.sendResponse(Empty.INSTANCE);
    });

Before sending its Join request, a receiving node first calls joinLeaderInTerm() via joinLeaderInTerm.apply() to construct the Join: it adopts the new term, switches to the Candidate role if necessary, and so on:

private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) {
    synchronized (mutex) {
        logger.debug("joinLeaderInTerm: for [{}] with term {}", startJoinRequest.getSourceNode(), startJoinRequest.getTerm());
        final Join join = coordinationState.get().handleStartJoin(startJoinRequest);
        lastJoin = Optional.of(join);
        peerFinder.setCurrentTerm(getCurrentTerm());
        if (mode != Mode.CANDIDATE) {
            becomeCandidate("joinLeaderInTerm"); // updates followersChecker and preVoteCollector
        } else {
            followersChecker.updateFastResponseState(getCurrentTerm(), mode);
            preVoteCollector.update(getPreVoteResponse(), null);
        }
        return join;
    }
}
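
The interesting work in joinLeaderInTerm() happens inside coordinationState.get().handleStartJoin(startJoinRequest): the node accepts only a strictly higher term, adopts it, and answers with a Join vote carrying its own last-accepted term and version so the candidate can judge freshness. A simplified sketch of that contract (field and type names are illustrative, not the real CoordinationState API):

// Simplified sketch of the handleStartJoin() contract: accept only a strictly
// higher term, adopt it, and answer with a Join vote carrying this node's
// last-accepted term/version. Names are illustrative, not the real ES API.
final class StartJoinSketch {
    record Join(String votedFor, long term, long lastAcceptedTerm, long lastAcceptedVersion) {}

    long currentTerm = 0;
    long lastAcceptedTerm = 0;
    long lastAcceptedVersion = 0;

    Join handleStartJoin(String sourceNode, long requestTerm) {
        if (requestTerm <= currentTerm) {
            // same guard as ES: a vote for an old term would be stale
            throw new IllegalArgumentException(
                "incoming term " + requestTerm + " not greater than current term " + currentTerm);
        }
        currentTerm = requestTerm; // adopting the higher term implies stepping down
        return new Join(sourceNode, currentTerm, lastAcceptedTerm, lastAcceptedVersion);
    }

    public static void main(String[] args) {
        StartJoinSketch node = new StartJoinSketch();
        System.out.println(node.handleStartJoin("candidate-1", 5));
        // Join[votedFor=candidate-1, term=5, lastAcceptedTerm=0, lastAcceptedVersion=0]
    }
}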

Finally, the Join request is sent via sendJoinRequest():

public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin, Runnable onCompletion) {
    ...
    final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin);
    final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
    if (pendingOutgoingJoins.add(dedupKey)) {
        ...
        final String actionName;
        final TransportRequest transportRequest;
        if (Coordinator.isZen1Node(destination)) {
            ... // fall back to the legacy Zen Discovery join action for pre-7.0 nodes
        } else {
            actionName = JOIN_ACTION_NAME;
            transportRequest = joinRequest;
        }
        transportService.sendRequest(destination, actionName, transportRequest,
            TransportRequestOptions.builder().withTimeout(joinTimeout).build(),
            new TransportResponseHandler<Empty>() {
                ...
                @Override
                public void handleResponse(Empty response) {
                    pendingOutgoingJoins.remove(dedupKey);
                    logger.debug("successfully joined {} with {}", destination, joinRequest);
                    onCompletion.run();
                    lastFailedJoinAttempt.set(null);
                }

                @Override
                public void handleException(TransportException exp) {
                    pendingOutgoingJoins.remove(dedupKey);
                    logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp);
                    onCompletion.run();
                    FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp);
                    attempt.logNow();
                    lastFailedJoinAttempt.set(attempt);
                }
                ...
            });
    } else {
        ...
    }
}

On the receiving side, a Join request is handled by a lambda that dispatches to handleJoinRequest():

transportService.registerRequestHandler(JOIN_ACTION_NAME, ThreadPool.Names.GENERIC, false, false, JoinRequest::new,
    (request, channel, task) -> joinHandler.accept(request, transportJoinCallback(request, channel)));

handleJoinRequest() checks whether the local node is the currently elected master. If so, after a series of checks (such as the major-version barrier) it sends a ValidateJoin request to verify the join's legality; otherwise it processes the join directly via processJoinRequest():

private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
    ...
    if (stateForJoinValidation.nodes().isLocalNodeElectedMaster()) {
        onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation));
        if (stateForJoinValidation.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
            // we do this in a couple of places including the cluster update thread. This one here is really just best effort
            // to ensure we fail as fast as possible.
            JoinTaskExecutor.ensureMajorVersionBarrier(joinRequest.getSourceNode().getVersion(),
                stateForJoinValidation.getNodes().getMinNodeVersion());
        }
        sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback);

    } else {
        processJoinRequest(joinRequest, joinCallback);
    }
}

In the response handler of sendValidateJoinRequest(), processJoinRequest() is invoked to process the join, and processJoinRequest() in turn calls handleJoin():

private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
    final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
    synchronized (mutex) {
        final CoordinationState coordState = coordinationState.get();
        final boolean prevElectionWon = coordState.electionWon();

        optionalJoin.ifPresent(this::handleJoin);
        joinAccumulator.handleJoinRequest(joinRequest.getSourceNode(), joinCallback);

        if (prevElectionWon == false && coordState.electionWon()) {
            becomeLeader("handleJoinRequest");
        }
    }
}

handleJoin() once more validates the term via ensureTermAtLeast(); if all is well, it calls coordinationState.get().handleJoin(join):

private void handleJoin(Join join) {
    synchronized (mutex) {
        ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(this::handleJoin);

        if (coordinationState.get().electionWon()) {
            // If we have already won the election then the actual join does not matter for election purposes, so swallow any exception
            final boolean isNewJoin = handleJoinIgnoringExceptions(join);

            // If we haven't completely finished becoming master then there's already a publication scheduled which will, in turn,
            // schedule a reconfiguration if needed. It's benign to schedule a reconfiguration anyway, but it might fail if it wins the
            // race against the election-winning publication and log a big error message, which we can prevent by checking this here:
            final boolean establishedAsMaster = mode == Mode.LEADER && getLastAcceptedState().term() == getCurrentTerm();
            if (isNewJoin && establishedAsMaster && publicationInProgress() == false) {
                scheduleReconfigurationIfNeeded();
            }
        } else {
            coordinationState.get().handleJoin(join); // this might fail and bubble up the exception
        }
    }
}
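
One subtlety in the code above: ensureTermAtLeast() returns an Optional<Join>. If the incoming join carries a newer term, bumping the local term produces a join from the local node itself, and .ifPresent(this::handleJoin) feeds that self-vote back through the same path so it is counted like any other join. A self-contained sketch of the pattern (names and types are illustrative, not the real ES API):

import java.util.Optional;

// Sketch of the ensureTermAtLeast(...).ifPresent(this::handleJoin) pattern in
// Coordinator.handleJoin: if the incoming join carries a newer term, bumping
// the local term produces a self-vote, which is recursively counted first.
class SelfVoteSketch {
    record Join(String voter, long term) {}

    long currentTerm = 0;

    Optional<Join> ensureTermAtLeast(long targetTerm) {
        if (currentTerm < targetTerm) {
            currentTerm = targetTerm;                           // adopt the new term
            return Optional.of(new Join("local", currentTerm)); // our own vote in that term
        }
        return Optional.empty();
    }

    void handleJoin(Join join) {
        ensureTermAtLeast(join.term()).ifPresent(this::handleJoin); // count self-vote first
        System.out.println("counted vote from " + join.voter() + " in term " + currentTerm);
    }

    public static void main(String[] args) {
        new SelfVoteSketch().handleJoin(new Join("node-2", 3));
        // prints: counted vote from local in term 3
        //         counted vote from node-2 in term 3
    }
}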

CoordinationState's handleJoin() performs the various precondition checks and renders the final verdict on whether the leader election has been won: if isElectionQuorum() holds, the campaign has succeeded:

public boolean handleJoin(Join join) {
    ...
    boolean added = joinVotes.addVote(join.getSourceNode());
    boolean prevElectionWon = electionWon;
    electionWon = isElectionQuorum(joinVotes);
    assert !prevElectionWon || electionWon; // we cannot go from won to not won
    logger.debug("handleJoin: added join {} from [{}] for election, electionWon={} lastAcceptedTerm={} lastAcceptedVersion={}", join,
        join.getSourceNode(), electionWon, lastAcceptedTerm, getLastAcceptedVersion());

    if (electionWon && prevElectionWon == false) {
        logger.debug("handleJoin: election won in term [{}] with {}", getCurrentTerm(), joinVotes);
        lastPublishedVersion = getLastAcceptedVersion();
    }
    return added;
}

With the election verdict (electionWon) determined, control returns to processJoinRequest(), which goes through joinAccumulator.handleJoinRequest(); if the node is the Leader, this publishes a cluster-state update:

@Override
public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
    final JoinTaskExecutor.Task task = new JoinTaskExecutor.Task(sender, "join existing leader");
    masterService.submitStateUpdateTask("node-join", task, ClusterStateTaskConfig.build(Priority.URGENT),
        joinTaskExecutor, new JoinTaskListener(task, joinCallback));
}

Finally, if the node had not won the election before but this join pushed it over the line, becomeLeader() is executed and the node becomes the Leader, completing the pre-vote, StartJoin, Join, becomeLeader chain that makes up the new election flow.