Skip to content

Commit d4e1ab1

Browse files
authored
Election scheduler should be cancelled after cluster state publication (opensearch-project#11699)
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
1 parent 7a6f8b0 commit d4e1ab1

File tree

4 files changed

+26
-6
lines changed

4 files changed

+26
-6
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
146146
- Allow composite aggregation to run under a parent filter aggregation ([#11499](https://github.com/opensearch-project/OpenSearch/pull/11499))
147147
- Quickly compute terms aggregations when the top-level query is functionally match-all for a segment ([#11643](https://github.com/opensearch-project/OpenSearch/pull/11643))
148148
- Mark fuzzy filter GA and remove experimental setting ([12631](https://github.com/opensearch-project/OpenSearch/pull/12631))
149+
- Keep the election scheduler open until cluster state has been applied ([#11699](https://github.com/opensearch-project/OpenSearch/pull/11699))
149150

150151
### Deprecated
151152

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

+16-4
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,7 @@ public void onFailure(String source, Exception e) {
386386

387387
@Override
388388
public void onSuccess(String source) {
389+
closePrevotingAndElectionScheduler();
389390
applyListener.onResponse(null);
390391
}
391392
});
@@ -472,17 +473,29 @@ private static Optional<Join> joinWithDestination(Optional<Join> lastJoin, Disco
472473
}
473474

474475
private void closePrevotingAndElectionScheduler() {
476+
closePrevoting();
477+
closeElectionScheduler();
478+
}
479+
480+
private void closePrevoting() {
475481
if (prevotingRound != null) {
476482
prevotingRound.close();
477483
prevotingRound = null;
478484
}
485+
}
479486

487+
private void closeElectionScheduler() {
480488
if (electionScheduler != null) {
481489
electionScheduler.close();
482490
electionScheduler = null;
483491
}
484492
}
485493

494+
// package-visible for testing
495+
boolean isElectionSchedulerRunning() {
496+
return electionScheduler != null;
497+
}
498+
486499
private void updateMaxTermSeen(final long term) {
487500
synchronized (mutex) {
488501
maxTermSeen = Math.max(maxTermSeen, term);
@@ -724,7 +737,7 @@ void becomeLeader(String method) {
724737
lastKnownLeader = Optional.of(getLocalNode());
725738
peerFinder.deactivate(getLocalNode());
726739
clusterFormationFailureHelper.stop();
727-
closePrevotingAndElectionScheduler();
740+
closePrevoting();
728741
preVoteCollector.update(getPreVoteResponse(), getLocalNode());
729742

730743
assert leaderChecker.leader() == null : leaderChecker.leader();
@@ -761,7 +774,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
761774
lastKnownLeader = Optional.of(leaderNode);
762775
peerFinder.deactivate(leaderNode);
763776
clusterFormationFailureHelper.stop();
764-
closePrevotingAndElectionScheduler();
777+
closePrevoting();
765778
cancelActivePublication("become follower: " + method);
766779
preVoteCollector.update(getPreVoteResponse(), leaderNode);
767780

@@ -927,7 +940,6 @@ public void invariant() {
927940
assert lastKnownLeader.isPresent() && lastKnownLeader.get().equals(getLocalNode());
928941
assert joinAccumulator instanceof JoinHelper.LeaderJoinAccumulator;
929942
assert peerFinderLeader.equals(lastKnownLeader) : peerFinderLeader;
930-
assert electionScheduler == null : electionScheduler;
931943
assert prevotingRound == null : prevotingRound;
932944
assert becomingClusterManager || getStateForClusterManagerService().nodes().getClusterManagerNodeId() != null
933945
: getStateForClusterManagerService();
@@ -972,7 +984,6 @@ assert getLocalNode().equals(applierState.nodes().getClusterManagerNode())
972984
assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false);
973985
assert joinAccumulator instanceof JoinHelper.FollowerJoinAccumulator;
974986
assert peerFinderLeader.equals(lastKnownLeader) : peerFinderLeader;
975-
assert electionScheduler == null : electionScheduler;
976987
assert prevotingRound == null : prevotingRound;
977988
assert getStateForClusterManagerService().nodes().getClusterManagerNodeId() == null : getStateForClusterManagerService();
978989
assert leaderChecker.currentNodeIsClusterManager() == false;
@@ -1693,6 +1704,7 @@ public void onSuccess(String source) {
16931704
updateMaxTermSeen(getCurrentTerm());
16941705

16951706
if (mode == Mode.LEADER) {
1707+
closePrevotingAndElectionScheduler();
16961708
// if necessary, abdicate to another node or improve the voting configuration
16971709
boolean attemptReconfiguration = true;
16981710
final ClusterState state = getLastAcceptedState(); // committed state

server/src/test/java/org/opensearch/cluster/coordination/CoordinatorTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ public void testNodesJoinAfterStableCluster() {
270270
public void testExpandsConfigurationWhenGrowingFromOneNodeToThreeButDoesNotShrink() {
271271
try (Cluster cluster = new Cluster(1)) {
272272
cluster.runRandomly();
273-
cluster.stabilise();
273+
cluster.stabilise(DEFAULT_STABILISATION_TIME * 2);
274274

275275
final ClusterNode leader = cluster.getAnyLeader();
276276

@@ -1750,7 +1750,7 @@ public void testDoesNotPerformElectionWhenRestartingFollower() {
17501750
public void testImproveConfigurationPerformsVotingConfigExclusionStateCheck() {
17511751
try (Cluster cluster = new Cluster(1)) {
17521752
cluster.runRandomly();
1753-
cluster.stabilise();
1753+
cluster.stabilise(DEFAULT_STABILISATION_TIME * 2);
17541754

17551755
final Coordinator coordinator = cluster.getAnyLeader().coordinator;
17561756
final ClusterState currentState = coordinator.getLastAcceptedState();

test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java

+7
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.opensearch.cluster.OpenSearchAllocationTestCase;
4848
import org.opensearch.cluster.coordination.AbstractCoordinatorTestCase.Cluster.ClusterNode;
4949
import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration;
50+
import org.opensearch.cluster.coordination.Coordinator.Mode;
5051
import org.opensearch.cluster.coordination.LinearizabilityChecker.History;
5152
import org.opensearch.cluster.coordination.LinearizabilityChecker.SequentialSpec;
5253
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
@@ -653,6 +654,12 @@ void stabilise(long stabilisationDurationMillis) {
653654
leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId)
654655
);
655656
}
657+
if (clusterNode.coordinator.getMode() == Mode.LEADER || clusterNode.coordinator.getMode() == Mode.FOLLOWER) {
658+
assertFalse(
659+
"Election scheduler should stop after cluster has stabilised",
660+
clusterNode.coordinator.isElectionSchedulerRunning()
661+
);
662+
}
656663
}
657664

658665
final Set<String> connectedNodeIds = clusterNodes.stream()

0 commit comments

Comments
 (0)