Skip to content

Commit 5687d12

Browse files
Pranshu-Smingshl
authored andcommitted
Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state (opensearch-project#16763)
* Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state Signed-off-by: Pranshu Shukla <pranshushukla06@gmail.com>
1 parent 5a8a822 commit 5687d12

File tree

7 files changed

+300
-1
lines changed

7 files changed

+300
-1
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6666
- [Tiered Caching] Fix bug in cache stats API ([#16560](https://github.com/opensearch-project/OpenSearch/pull/16560))
6767
- Bound the size of cache in deprecation logger ([16702](https://github.com/opensearch-project/OpenSearch/issues/16702))
6868
- Ensure consistency of system flag on IndexMetadata after diff is applied ([#16644](https://github.com/opensearch-project/OpenSearch/pull/16644))
69+
- Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state ([#16763](https://github.com/opensearch-project/OpenSearch/pull/16763))
6970

7071
### Security
7172

server/src/internalClusterTest/java/org/opensearch/discovery/DiscoveryDisruptionIT.java

+152
Original file line numberDiff line numberDiff line change
@@ -33,23 +33,37 @@
3333
package org.opensearch.discovery;
3434

3535
import org.opensearch.cluster.ClusterState;
36+
import org.opensearch.cluster.coordination.FailedToCommitClusterStateException;
3637
import org.opensearch.cluster.coordination.JoinHelper;
38+
import org.opensearch.cluster.coordination.PersistedStateRegistry;
3739
import org.opensearch.cluster.coordination.PublicationTransportHandler;
40+
import org.opensearch.cluster.metadata.RepositoriesMetadata;
41+
import org.opensearch.cluster.metadata.RepositoryMetadata;
3842
import org.opensearch.cluster.node.DiscoveryNode;
3943
import org.opensearch.cluster.node.DiscoveryNodes;
4044
import org.opensearch.cluster.service.ClusterService;
45+
import org.opensearch.common.Randomness;
4146
import org.opensearch.common.settings.Settings;
47+
import org.opensearch.repositories.RepositoriesService;
48+
import org.opensearch.repositories.Repository;
49+
import org.opensearch.repositories.RepositoryMissingException;
50+
import org.opensearch.repositories.fs.ReloadableFsRepository;
4251
import org.opensearch.test.OpenSearchIntegTestCase;
4352
import org.opensearch.test.disruption.NetworkDisruption;
4453
import org.opensearch.test.disruption.ServiceDisruptionScheme;
4554
import org.opensearch.test.disruption.SlowClusterStateProcessing;
4655
import org.opensearch.test.transport.MockTransportService;
4756
import org.opensearch.transport.Transport;
4857
import org.opensearch.transport.TransportService;
58+
import org.junit.Assert;
4959

60+
import java.util.Arrays;
5061
import java.util.HashSet;
62+
import java.util.List;
63+
import java.util.Objects;
5164
import java.util.Set;
5265
import java.util.concurrent.CountDownLatch;
66+
import java.util.stream.Collectors;
5367

5468
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING;
5569
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
@@ -250,4 +264,142 @@ public void testNodeNotReachableFromClusterManager() throws Exception {
250264
ensureStableCluster(3);
251265
}
252266

267+
/**
268+
* Tests the scenario where-in a cluster-state containing new repository meta-data as part of a node-join from a
269+
* repository-configured node fails on a commit stag and has a master switch. This would lead to master nodes
270+
* doing another round of node-joins with the new cluster-state as the previous attempt had a successful publish.
271+
*/
272+
public void testElectClusterManagerRemotePublicationConfigurationNodeJoinCommitFails() throws Exception {
273+
final String remoteStateRepoName = "remote-state-repo";
274+
final String remoteRoutingTableRepoName = "routing-table-repo";
275+
276+
Settings remotePublicationSettings = buildRemotePublicationNodeAttributes(
277+
remoteStateRepoName,
278+
ReloadableFsRepository.TYPE,
279+
remoteRoutingTableRepoName,
280+
ReloadableFsRepository.TYPE
281+
);
282+
internalCluster().startClusterManagerOnlyNodes(3);
283+
internalCluster().startDataOnlyNodes(3);
284+
285+
String clusterManagerNode = internalCluster().getClusterManagerName();
286+
List<String> nonClusterManagerNodes = Arrays.stream(internalCluster().getNodeNames())
287+
.filter(node -> !node.equals(clusterManagerNode))
288+
.collect(Collectors.toList());
289+
290+
ensureStableCluster(6);
291+
292+
MockTransportService clusterManagerTransportService = (MockTransportService) internalCluster().getInstance(
293+
TransportService.class,
294+
clusterManagerNode
295+
);
296+
logger.info("Blocking Cluster Manager Commit Request on all nodes");
297+
// This is to allow the new node to have commit failures on the nodes in the send path itself. This will lead to the
298+
// nodes have a successful publish operation but failed commit operation. This will come into play once the new node joins
299+
nonClusterManagerNodes.forEach(node -> {
300+
TransportService targetTransportService = internalCluster().getInstance(TransportService.class, node);
301+
clusterManagerTransportService.addSendBehavior(targetTransportService, (connection, requestId, action, request, options) -> {
302+
if (action.equals(PublicationTransportHandler.COMMIT_STATE_ACTION_NAME)) {
303+
logger.info("--> preventing {} request", PublicationTransportHandler.COMMIT_STATE_ACTION_NAME);
304+
throw new FailedToCommitClusterStateException("Blocking Commit");
305+
}
306+
connection.sendRequest(requestId, action, request, options);
307+
});
308+
});
309+
310+
logger.info("Starting Node with remote publication settings");
311+
// Start a node with remote-publication repositories configured. This will lead to the active cluster-manager create
312+
// a new cluster-state event with the new node-join along with new repositories setup in the cluster meta-data.
313+
internalCluster().startDataOnlyNodes(1, remotePublicationSettings, Boolean.TRUE);
314+
315+
// Checking if publish succeeded in the nodes before shutting down the blocked cluster-manager
316+
assertBusy(() -> {
317+
String randomNode = nonClusterManagerNodes.get(Randomness.get().nextInt(nonClusterManagerNodes.size()));
318+
PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, randomNode);
319+
320+
ClusterState state = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.LOCAL).getLastAcceptedState();
321+
RepositoriesMetadata repositoriesMetadata = state.metadata().custom(RepositoriesMetadata.TYPE);
322+
Boolean isRemoteStateRepoConfigured = Boolean.FALSE;
323+
Boolean isRemoteRoutingTableRepoConfigured = Boolean.FALSE;
324+
325+
assertNotNull(repositoriesMetadata);
326+
assertNotNull(repositoriesMetadata.repositories());
327+
328+
for (RepositoryMetadata repo : repositoriesMetadata.repositories()) {
329+
if (repo.name().equals(remoteStateRepoName)) {
330+
isRemoteStateRepoConfigured = Boolean.TRUE;
331+
} else if (repo.name().equals(remoteRoutingTableRepoName)) {
332+
isRemoteRoutingTableRepoConfigured = Boolean.TRUE;
333+
}
334+
}
335+
// Asserting that the metadata is present in the persisted cluster-state
336+
assertTrue(isRemoteStateRepoConfigured);
337+
assertTrue(isRemoteRoutingTableRepoConfigured);
338+
339+
RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, randomNode);
340+
341+
isRemoteStateRepoConfigured = isRepoPresentInRepositoryService(repositoriesService, remoteStateRepoName);
342+
isRemoteRoutingTableRepoConfigured = isRepoPresentInRepositoryService(repositoriesService, remoteRoutingTableRepoName);
343+
344+
// Asserting that the metadata is not present in the repository service.
345+
Assert.assertFalse(isRemoteStateRepoConfigured);
346+
Assert.assertFalse(isRemoteRoutingTableRepoConfigured);
347+
});
348+
349+
logger.info("Stopping current Cluster Manager");
350+
// We stop the current cluster-manager whose outbound paths were blocked. This is to force a new election onto nodes
351+
// we had the new cluster-state published but not commited.
352+
internalCluster().stopCurrentClusterManagerNode();
353+
354+
// We expect that the repositories validations are skipped in this case and node-joins succeeds as expected. The
355+
// repositories validations are skipped because even though the cluster-state is updated in the persisted registry,
356+
// the repository service will not be updated as the commit attempt failed.
357+
ensureStableCluster(6);
358+
359+
String randomNode = nonClusterManagerNodes.get(Randomness.get().nextInt(nonClusterManagerNodes.size()));
360+
361+
// Checking if the final cluster-state is updated.
362+
RepositoriesMetadata repositoriesMetadata = internalCluster().getInstance(ClusterService.class, randomNode)
363+
.state()
364+
.metadata()
365+
.custom(RepositoriesMetadata.TYPE);
366+
367+
Boolean isRemoteStateRepoConfigured = Boolean.FALSE;
368+
Boolean isRemoteRoutingTableRepoConfigured = Boolean.FALSE;
369+
370+
for (RepositoryMetadata repo : repositoriesMetadata.repositories()) {
371+
if (repo.name().equals(remoteStateRepoName)) {
372+
isRemoteStateRepoConfigured = Boolean.TRUE;
373+
} else if (repo.name().equals(remoteRoutingTableRepoName)) {
374+
isRemoteRoutingTableRepoConfigured = Boolean.TRUE;
375+
}
376+
}
377+
378+
Assert.assertTrue("RemoteState Repo is not set in RepositoriesMetadata", isRemoteStateRepoConfigured);
379+
Assert.assertTrue("RemoteRoutingTable Repo is not set in RepositoriesMetadata", isRemoteRoutingTableRepoConfigured);
380+
381+
RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, randomNode);
382+
383+
isRemoteStateRepoConfigured = isRepoPresentInRepositoryService(repositoriesService, remoteStateRepoName);
384+
isRemoteRoutingTableRepoConfigured = isRepoPresentInRepositoryService(repositoriesService, remoteRoutingTableRepoName);
385+
386+
Assert.assertTrue("RemoteState Repo is not set in RepositoryService", isRemoteStateRepoConfigured);
387+
Assert.assertTrue("RemoteRoutingTable Repo is not set in RepositoryService", isRemoteRoutingTableRepoConfigured);
388+
389+
logger.info("Stopping current Cluster Manager");
390+
}
391+
392+
private Boolean isRepoPresentInRepositoryService(RepositoriesService repositoriesService, String repoName) {
393+
try {
394+
Repository remoteStateRepo = repositoriesService.repository(repoName);
395+
if (Objects.nonNull(remoteStateRepo)) {
396+
return Boolean.TRUE;
397+
}
398+
} catch (RepositoryMissingException e) {
399+
return Boolean.FALSE;
400+
}
401+
402+
return Boolean.FALSE;
403+
}
404+
253405
}

server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java

+15
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.opensearch.repositories.RepositoriesService;
2222
import org.opensearch.repositories.Repository;
2323
import org.opensearch.repositories.RepositoryException;
24+
import org.opensearch.repositories.RepositoryMissingException;
2425
import org.opensearch.threadpool.ThreadPool;
2526

2627
import java.util.ArrayList;
@@ -183,6 +184,20 @@ public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode
183184
boolean repositoryAlreadyPresent = false;
184185
for (RepositoryMetadata existingRepositoryMetadata : existingRepositories.repositories()) {
185186
if (newRepositoryMetadata.name().equals(existingRepositoryMetadata.name())) {
187+
try {
188+
// This is to handle cases where-in the during a previous node-join attempt if the publish operation succeeded
189+
// but the commit operation failed, the cluster-state may have the repository metadata which is not applied
190+
// into the repository service. This may lead to assertion failures down the line.
191+
repositoriesService.get().repository(newRepositoryMetadata.name());
192+
} catch (RepositoryMissingException e) {
193+
logger.warn(
194+
"Skipping repositories metadata checks: Remote repository [{}] is in the cluster state but not present "
195+
+ "in the repository service.",
196+
newRepositoryMetadata.name()
197+
);
198+
break;
199+
}
200+
186201
try {
187202
// This will help in handling two scenarios -
188203
// 1. When a fresh cluster is formed and a node tries to join the cluster, the repository

server/src/main/java/org/opensearch/repositories/RepositoriesService.java

+7
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import java.util.Collections;
8181
import java.util.HashMap;
8282
import java.util.List;
83+
import java.util.Locale;
8384
import java.util.Map;
8485
import java.util.Objects;
8586
import java.util.Set;
@@ -904,6 +905,12 @@ public void ensureValidSystemRepositoryUpdate(RepositoryMetadata newRepositoryMe
904905
Settings newRepositoryMetadataSettings = newRepositoryMetadata.settings();
905906
Settings currentRepositoryMetadataSettings = currentRepositoryMetadata.settings();
906907

908+
assert Objects.nonNull(repository) : String.format(
909+
Locale.ROOT,
910+
"repository [%s] not present in RepositoryService",
911+
currentRepositoryMetadata.name()
912+
);
913+
907914
List<String> restrictedSettings = repository.getRestrictedSystemRepositorySettings()
908915
.stream()
909916
.map(setting -> setting.getKey())

server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java

+67
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.opensearch.common.util.FeatureFlags;
5656
import org.opensearch.node.remotestore.RemoteStoreNodeService;
5757
import org.opensearch.repositories.RepositoriesService;
58+
import org.opensearch.repositories.RepositoryMissingException;
5859
import org.opensearch.repositories.blobstore.BlobStoreRepository;
5960
import org.opensearch.test.OpenSearchTestCase;
6061
import org.opensearch.test.VersionUtils;
@@ -1378,6 +1379,72 @@ public void testJoinRemoteStoreClusterWithRemotePublicationNodeInMixedMode() {
13781379
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
13791380
}
13801381

1382+
public void testUpdatesClusterStateWithRepositoryMetadataNotInSync() throws Exception {
1383+
Map<String, String> newNodeAttributes = new HashMap<>();
1384+
newNodeAttributes.putAll(remoteStateNodeAttributes(CLUSTER_STATE_REPO));
1385+
newNodeAttributes.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO));
1386+
1387+
final AllocationService allocationService = mock(AllocationService.class);
1388+
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
1389+
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
1390+
RepositoriesService repositoriesService = mock(RepositoriesService.class);
1391+
when(repositoriesService.repository(any())).thenThrow(RepositoryMissingException.class);
1392+
final RemoteStoreNodeService remoteStoreNodeService = new RemoteStoreNodeService(new SetOnce<>(repositoriesService)::get, null);
1393+
1394+
final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(
1395+
Settings.EMPTY,
1396+
allocationService,
1397+
logger,
1398+
rerouteService,
1399+
remoteStoreNodeService
1400+
);
1401+
1402+
final DiscoveryNode clusterManagerNode = new DiscoveryNode(
1403+
UUIDs.base64UUID(),
1404+
buildNewFakeTransportAddress(),
1405+
newNodeAttributes,
1406+
DiscoveryNodeRole.BUILT_IN_ROLES,
1407+
Version.CURRENT
1408+
);
1409+
1410+
final RepositoryMetadata clusterStateRepo = buildRepositoryMetadata(clusterManagerNode, CLUSTER_STATE_REPO);
1411+
final RepositoryMetadata routingTableRepo = buildRepositoryMetadata(clusterManagerNode, ROUTING_TABLE_REPO);
1412+
List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>() {
1413+
{
1414+
add(clusterStateRepo);
1415+
add(routingTableRepo);
1416+
}
1417+
};
1418+
1419+
final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
1420+
.nodes(
1421+
DiscoveryNodes.builder()
1422+
.add(clusterManagerNode)
1423+
.localNodeId(clusterManagerNode.getId())
1424+
.clusterManagerNodeId(clusterManagerNode.getId())
1425+
)
1426+
.metadata(Metadata.builder().putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositoriesMetadata)))
1427+
.build();
1428+
1429+
final DiscoveryNode joiningNode = new DiscoveryNode(
1430+
UUIDs.base64UUID(),
1431+
buildNewFakeTransportAddress(),
1432+
newNodeAttributes,
1433+
DiscoveryNodeRole.BUILT_IN_ROLES,
1434+
Version.CURRENT
1435+
);
1436+
1437+
final ClusterStateTaskExecutor.ClusterTasksResult<JoinTaskExecutor.Task> result = joinTaskExecutor.execute(
1438+
clusterState,
1439+
List.of(new JoinTaskExecutor.Task(joiningNode, "test"))
1440+
);
1441+
assertThat(result.executionResults.entrySet(), hasSize(1));
1442+
final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next();
1443+
assertTrue(taskResult.isSuccess());
1444+
validatePublicationRepositoryMetadata(result.resultingState, clusterManagerNode);
1445+
1446+
}
1447+
13811448
private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode existingNode, int expectedRepositories)
13821449
throws Exception {
13831450

test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -2322,10 +2322,24 @@ public List<String> startNodes(int numOfNodes, Settings settings) {
23222322
return startNodes(Collections.nCopies(numOfNodes, settings).toArray(new Settings[0]));
23232323
}
23242324

2325+
/**
2326+
* Starts multiple nodes with the given settings and returns their names
2327+
*/
2328+
public List<String> startNodes(int numOfNodes, Settings settings, Boolean waitForNodeJoin) {
2329+
return startNodes(waitForNodeJoin, Collections.nCopies(numOfNodes, settings).toArray(new Settings[0]));
2330+
}
2331+
23252332
/**
23262333
* Starts multiple nodes with the given settings and returns their names
23272334
*/
23282335
public synchronized List<String> startNodes(Settings... extraSettings) {
2336+
return startNodes(false, extraSettings);
2337+
}
2338+
2339+
/**
2340+
* Starts multiple nodes with the given settings and returns their names
2341+
*/
2342+
public synchronized List<String> startNodes(Boolean waitForNodeJoin, Settings... extraSettings) {
23292343
final int newClusterManagerCount = Math.toIntExact(Stream.of(extraSettings).filter(DiscoveryNode::isClusterManagerNode).count());
23302344
final int defaultMinClusterManagerNodes;
23312345
if (autoManageClusterManagerNodes) {
@@ -2377,7 +2391,7 @@ public synchronized List<String> startNodes(Settings... extraSettings) {
23772391
nodes.add(nodeAndClient);
23782392
}
23792393
startAndPublishNodesAndClients(nodes);
2380-
if (autoManageClusterManagerNodes) {
2394+
if (autoManageClusterManagerNodes && !waitForNodeJoin) {
23812395
validateClusterFormed();
23822396
}
23832397
return nodes.stream().map(NodeAndClient::getName).collect(Collectors.toList());
@@ -2422,6 +2436,10 @@ public List<String> startDataOnlyNodes(int numNodes, Settings settings) {
24222436
return startNodes(numNodes, Settings.builder().put(onlyRole(settings, DiscoveryNodeRole.DATA_ROLE)).build());
24232437
}
24242438

2439+
public List<String> startDataOnlyNodes(int numNodes, Settings settings, Boolean ignoreNodeJoin) {
2440+
return startNodes(numNodes, Settings.builder().put(onlyRole(settings, DiscoveryNodeRole.DATA_ROLE)).build(), ignoreNodeJoin);
2441+
}
2442+
24252443
public List<String> startSearchOnlyNodes(int numNodes) {
24262444
return startSearchOnlyNodes(numNodes, Settings.EMPTY);
24272445
}

0 commit comments

Comments
 (0)