Skip to content

Commit 1563e1a

Browse files
Fix for race condition in node-join/node-left loop (#15521)
* Add custom connect to node for handleJoinRequest Signed-off-by: Rahul Karajgikar <karajgik@amazon.com>
1 parent b50117b commit 1563e1a

21 files changed

+844
-22
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4444
- Fix search_as_you_type not supporting multi-fields ([#15988](https://github.com/opensearch-project/OpenSearch/pull/15988))
4545
- Avoid infinite loop when `flat_object` field contains invalid token ([#15985](https://github.com/opensearch-project/OpenSearch/pull/15985))
4646
- Fix infinite loop in nested agg ([#15931](https://github.com/opensearch-project/OpenSearch/pull/15931))
47+
- Fix race condition in node-join and node-left ([#15521](https://github.com/opensearch-project/OpenSearch/pull/15521))
4748

4849
### Security
4950

server/src/internalClusterTest/java/org/opensearch/cluster/coordination/NodeJoinLeftIT.java

+355
Large diffs are not rendered by default.

server/src/main/java/org/opensearch/cluster/NodeConnectionsService.java

+17-4
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,10 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
103103

104104
// contains an entry for every node in the latest cluster state, as well as for nodes from which we are in the process of
105105
// disconnecting
106-
private final Map<DiscoveryNode, ConnectionTarget> targetsByNode = new HashMap<>();
106+
protected final Map<DiscoveryNode, ConnectionTarget> targetsByNode = new HashMap<>();
107107

108108
private final TimeValue reconnectInterval;
109-
private volatile ConnectionChecker connectionChecker;
109+
protected volatile ConnectionChecker connectionChecker;
110110

111111
@Inject
112112
public NodeConnectionsService(Settings settings, ThreadPool threadPool, TransportService transportService) {
@@ -115,6 +115,11 @@ public NodeConnectionsService(Settings settings, ThreadPool threadPool, Transpor
115115
this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings);
116116
}
117117

118+
// exposed for testing
119+
protected ConnectionTarget createConnectionTarget(DiscoveryNode discoveryNode) {
120+
return new ConnectionTarget(discoveryNode);
121+
}
122+
118123
/**
119124
* Connect to all the given nodes, but do not disconnect from any extra nodes. Calls the completion handler on completion of all
120125
* connection attempts to _new_ nodes, but not on attempts to re-establish connections to nodes that are already known.
@@ -159,6 +164,14 @@ public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion)
159164
runnables.forEach(Runnable::run);
160165
}
161166

167+
public void setPendingDisconnections(Set<DiscoveryNode> nodes) {
168+
nodes.forEach(transportService::setPendingDisconnection);
169+
}
170+
171+
public void clearPendingDisconnections() {
172+
transportService.clearPendingDisconnections();
173+
}
174+
162175
/**
163176
* Disconnect from any nodes to which we are currently connected which do not appear in the given nodes. Does not wait for the
164177
* disconnections to complete, because they might have to wait for ongoing connection attempts first.
@@ -211,7 +224,7 @@ private void awaitPendingActivity(Runnable onCompletion) {
211224
* nodes which are in the process of disconnecting. The onCompletion handler is called after all ongoing connection/disconnection
212225
* attempts have completed.
213226
*/
214-
private void connectDisconnectedTargets(Runnable onCompletion) {
227+
protected void connectDisconnectedTargets(Runnable onCompletion) {
215228
final List<Runnable> runnables = new ArrayList<>();
216229
synchronized (mutex) {
217230
final Collection<ConnectionTarget> connectionTargets = targetsByNode.values();
@@ -321,7 +334,7 @@ private enum ActivityType {
321334
*
322335
* @opensearch.internal
323336
*/
324-
private class ConnectionTarget {
337+
protected class ConnectionTarget {
325338
private final DiscoveryNode discoveryNode;
326339

327340
private PlainListenableActionFuture<Void> future = PlainListenableActionFuture.newListenableFuture();

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

+21-2
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.opensearch.cluster.ClusterStateTaskConfig;
4343
import org.opensearch.cluster.ClusterStateUpdateTask;
4444
import org.opensearch.cluster.LocalClusterUpdateTask;
45+
import org.opensearch.cluster.NodeConnectionsService;
4546
import org.opensearch.cluster.block.ClusterBlocks;
4647
import org.opensearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState;
4748
import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion;
@@ -187,6 +188,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
187188
private final NodeHealthService nodeHealthService;
188189
private final PersistedStateRegistry persistedStateRegistry;
189190
private final RemoteStoreNodeService remoteStoreNodeService;
191+
private NodeConnectionsService nodeConnectionsService;
190192

191193
/**
192194
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
@@ -418,7 +420,11 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {
418420

419421
synchronized (mutex) {
420422
final DiscoveryNode sourceNode = publishRequest.getAcceptedState().nodes().getClusterManagerNode();
421-
logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode);
423+
logger.debug(
424+
"handlePublishRequest: handling version [{}] from [{}]",
425+
publishRequest.getAcceptedState().getVersion(),
426+
sourceNode
427+
);
422428

423429
if (sourceNode.equals(getLocalNode()) && mode != Mode.LEADER) {
424430
// Rare case in which we stood down as leader between starting this publication and receiving it ourselves. The publication
@@ -630,7 +636,6 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback
630636

631637
transportService.connectToNode(joinRequest.getSourceNode(), ActionListener.wrap(ignore -> {
632638
final ClusterState stateForJoinValidation = getStateForClusterManagerService();
633-
634639
if (stateForJoinValidation.nodes().isLocalNodeElectedClusterManager()) {
635640
onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation));
636641
if (stateForJoinValidation.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
@@ -814,6 +819,10 @@ public void onFailure(String source, Exception e) {
814819
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) {
815820
if (currentState.nodes().isLocalNodeElectedClusterManager() == false) {
816821
allocationService.cleanCaches();
822+
// This set only needs to be maintained on active cluster-manager
823+
// This is cleaned up to avoid stale entries which would block future reconnections
824+
logger.trace("Removing all pending disconnections as part of cluster-manager cleanup");
825+
nodeConnectionsService.clearPendingDisconnections();
817826
}
818827
return unchanged();
819828
}
@@ -914,11 +923,18 @@ public DiscoveryStats stats() {
914923
@Override
915924
public void startInitialJoin() {
916925
synchronized (mutex) {
926+
logger.trace("Starting initial join, becoming candidate");
917927
becomeCandidate("startInitialJoin");
918928
}
919929
clusterBootstrapService.scheduleUnconfiguredBootstrap();
920930
}
921931

932+
@Override
933+
public void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) {
934+
assert this.nodeConnectionsService == null : "nodeConnectionsService is already set";
935+
this.nodeConnectionsService = nodeConnectionsService;
936+
}
937+
922938
@Override
923939
protected void doStop() {
924940
configuredHostsResolver.stop();
@@ -1356,6 +1372,9 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
13561372
currentPublication = Optional.of(publication);
13571373

13581374
final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes();
1375+
// marking pending disconnects before publish
1376+
// if a nodes tries to send a joinRequest while it is pending disconnect, it should fail
1377+
nodeConnectionsService.setPendingDisconnections(new HashSet<>(clusterChangedEvent.nodesDelta().removedNodes()));
13591378
leaderChecker.setCurrentNodes(publishNodes);
13601379
followersChecker.setCurrentNodes(publishNodes);
13611380
lagDetector.setTrackedNodes(publishNodes);

server/src/main/java/org/opensearch/cluster/coordination/Publication.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public Publication(PublishRequest publishRequest, AckListener ackListener, LongS
8585
}
8686

8787
public void start(Set<DiscoveryNode> faultyNodes) {
88-
logger.trace("publishing {} to {}", publishRequest, publicationTargets);
88+
logger.debug("publishing version {} to {}", publishRequest.getAcceptedState().getVersion(), publicationTargets);
8989

9090
for (final DiscoveryNode faultyNode : faultyNodes) {
9191
onFaultyNode(faultyNode);

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,7 @@ public String executor() {
542542
}
543543

544544
public void sendClusterState(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
545-
logger.debug("sending cluster state over transport to node: {}", destination.getName());
545+
logger.trace("sending cluster state over transport to node: {}", destination.getName());
546546
if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
547547
logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
548548
sendFullClusterState(destination, listener);

server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java

+4
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,7 @@ private void runTask(UpdateTask task) {
502502
try {
503503
applyChanges(task, previousClusterState, newClusterState, stopWatch);
504504
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
505+
// At this point, cluster state appliers and listeners are completed
505506
logger.debug(
506507
"processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})",
507508
task.source,
@@ -510,6 +511,7 @@ private void runTask(UpdateTask task) {
510511
newClusterState.stateUUID()
511512
);
512513
warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
514+
// Then we call the ClusterApplyListener of the task
513515
task.listener.onSuccess(task.source);
514516
} catch (Exception e) {
515517
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
@@ -578,6 +580,7 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl
578580

579581
logger.debug("apply cluster state with version {}", newClusterState.version());
580582
callClusterStateAppliers(clusterChangedEvent, stopWatch);
583+
logger.debug("completed calling appliers of cluster state for version {}", newClusterState.version());
581584

582585
nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());
583586

@@ -594,6 +597,7 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl
594597
state.set(newClusterState);
595598

596599
callClusterStateListeners(clusterChangedEvent, stopWatch);
600+
logger.debug("completed calling listeners of cluster state for version {}", newClusterState.version());
597601
}
598602

599603
protected void connectToNodesAndWait(ClusterState newClusterState) {

server/src/main/java/org/opensearch/discovery/Discovery.java

+5
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.discovery;
3434

35+
import org.opensearch.cluster.NodeConnectionsService;
3536
import org.opensearch.cluster.coordination.ClusterStatePublisher;
3637
import org.opensearch.common.lifecycle.LifecycleComponent;
3738

@@ -54,4 +55,8 @@ public interface Discovery extends LifecycleComponent, ClusterStatePublisher {
5455
*/
5556
void startInitialJoin();
5657

58+
/**
59+
* Sets the NodeConnectionsService which is an abstraction used for connection management
60+
*/
61+
void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService);
5762
}

server/src/main/java/org/opensearch/node/Node.java

+1
Original file line numberDiff line numberDiff line change
@@ -1602,6 +1602,7 @@ public Node start() throws NodeValidationException {
16021602

16031603
injector.getInstance(GatewayService.class).start();
16041604
Discovery discovery = injector.getInstance(Discovery.class);
1605+
discovery.setNodeConnectionsService(nodeConnectionsService);
16051606
clusterService.getClusterManagerService().setClusterStatePublisher(discovery::publish);
16061607

16071608
// Start the transport service now so the publish address will be added to the local disco node in ClusterService

server/src/main/java/org/opensearch/transport/ClusterConnectionManager.java

+30
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,15 @@ public class ClusterConnectionManager implements ConnectionManager {
6464

6565
private final ConcurrentMap<DiscoveryNode, Transport.Connection> connectedNodes = ConcurrentCollections.newConcurrentMap();
6666
private final ConcurrentMap<DiscoveryNode, ListenableFuture<Void>> pendingConnections = ConcurrentCollections.newConcurrentMap();
67+
/**
68+
This set is used only by cluster-manager nodes.
69+
Nodes are marked as pending disconnect right before cluster state publish phase.
70+
They are cleared up as part of cluster state apply commit phase
71+
This is to avoid connections from being made to nodes that are in the process of leaving the cluster
72+
Note: If a disconnect is initiated while a connect is in progress, this Set will not handle this case.
73+
Callers need to ensure that connects and disconnects are sequenced.
74+
*/
75+
private final Set<DiscoveryNode> pendingDisconnections = ConcurrentCollections.newConcurrentSet();
6776
private final AbstractRefCounted connectingRefCounter = new AbstractRefCounted("connection manager") {
6877
@Override
6978
protected void closeInternal() {
@@ -122,12 +131,19 @@ public void connectToNode(
122131
ConnectionValidator connectionValidator,
123132
ActionListener<Void> listener
124133
) throws ConnectTransportException {
134+
logger.trace("connecting to node [{}]", node);
125135
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
126136
if (node == null) {
127137
listener.onFailure(new ConnectTransportException(null, "can't connect to a null node"));
128138
return;
129139
}
130140

141+
// if node-left is still in progress, we fail the connect request early
142+
if (pendingDisconnections.contains(node)) {
143+
listener.onFailure(new IllegalStateException("cannot make a new connection as disconnect to node [" + node + "] is pending"));
144+
return;
145+
}
146+
131147
if (connectingRefCounter.tryIncRef() == false) {
132148
listener.onFailure(new IllegalStateException("connection manager is closed"));
133149
return;
@@ -170,6 +186,7 @@ public void connectToNode(
170186
conn.addCloseListener(ActionListener.wrap(() -> {
171187
logger.trace("unregistering {} after connection close and marking as disconnected", node);
172188
connectedNodes.remove(node, finalConnection);
189+
pendingDisconnections.remove(node);
173190
connectionListener.onNodeDisconnected(node, conn);
174191
}));
175192
}
@@ -226,6 +243,19 @@ public void disconnectFromNode(DiscoveryNode node) {
226243
// if we found it and removed it we close
227244
nodeChannels.close();
228245
}
246+
pendingDisconnections.remove(node);
247+
logger.trace("Removed node [{}] from pending disconnections list", node);
248+
}
249+
250+
@Override
251+
public void setPendingDisconnection(DiscoveryNode node) {
252+
logger.trace("marking disconnection as pending for node: [{}]", node);
253+
pendingDisconnections.add(node);
254+
}
255+
256+
@Override
257+
public void clearPendingDisconnections() {
258+
pendingDisconnections.clear();
229259
}
230260

231261
/**

server/src/main/java/org/opensearch/transport/ConnectionManager.java

+4
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ void connectToNode(
6565

6666
void disconnectFromNode(DiscoveryNode node);
6767

68+
void setPendingDisconnection(DiscoveryNode node);
69+
70+
void clearPendingDisconnections();
71+
6872
Set<DiscoveryNode> getAllConnectedNodes();
6973

7074
int size();

server/src/main/java/org/opensearch/transport/RemoteConnectionManager.java

+10
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,16 @@ public void disconnectFromNode(DiscoveryNode node) {
114114
delegate.disconnectFromNode(node);
115115
}
116116

117+
@Override
118+
public void setPendingDisconnection(DiscoveryNode node) {
119+
delegate.setPendingDisconnection(node);
120+
}
121+
122+
@Override
123+
public void clearPendingDisconnections() {
124+
delegate.clearPendingDisconnections();
125+
}
126+
117127
@Override
118128
public ConnectionProfile getConnectionProfile() {
119129
return delegate.getConnectionProfile();

server/src/main/java/org/opensearch/transport/TransportService.java

+12
Original file line numberDiff line numberDiff line change
@@ -773,6 +773,18 @@ public void disconnectFromNode(DiscoveryNode node) {
773773
connectionManager.disconnectFromNode(node);
774774
}
775775

776+
public void setPendingDisconnection(DiscoveryNode node) {
777+
connectionManager.setPendingDisconnection(node);
778+
}
779+
780+
/**
781+
* Wipes out all pending disconnections.
782+
* This is called on cluster-manager failover to remove stale entries
783+
*/
784+
public void clearPendingDisconnections() {
785+
connectionManager.clearPendingDisconnections();
786+
}
787+
776788
public void addMessageListener(TransportMessageListener listener) {
777789
messageListener.listeners.add(listener);
778790
}

0 commit comments

Comments
 (0)