|
8 | 8 |
|
9 | 9 | package org.opensearch.gateway.remote;
|
10 | 10 |
|
11 |
| -import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; |
| 11 | +import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; |
12 | 12 | import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
13 | 13 | import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
|
14 | 14 | import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
|
15 | 15 | import org.opensearch.client.Client;
|
| 16 | +import org.opensearch.cluster.ClusterState; |
| 17 | +import org.opensearch.cluster.coordination.CoordinationState; |
| 18 | +import org.opensearch.cluster.coordination.PersistedStateRegistry; |
| 19 | +import org.opensearch.cluster.coordination.PublishClusterStateStats; |
16 | 20 | import org.opensearch.common.blobstore.BlobPath;
|
17 | 21 | import org.opensearch.common.settings.Settings;
|
18 |
| -import org.opensearch.common.util.FeatureFlags; |
19 | 22 | import org.opensearch.discovery.DiscoveryStats;
|
20 | 23 | import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
|
21 | 24 | import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
|
|
27 | 30 | import org.opensearch.repositories.RepositoriesService;
|
28 | 31 | import org.opensearch.repositories.blobstore.BlobStoreRepository;
|
29 | 32 | import org.opensearch.repositories.fs.ReloadableFsRepository;
|
| 33 | +import org.opensearch.test.InternalTestCluster; |
30 | 34 | import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
|
31 | 35 | import org.opensearch.test.OpenSearchIntegTestCase.Scope;
|
32 | 36 | import org.junit.Before;
|
33 | 37 |
|
34 | 38 | import java.io.IOException;
|
35 | 39 | import java.nio.charset.StandardCharsets;
|
36 | 40 | import java.util.Base64;
|
| 41 | +import java.util.HashSet; |
37 | 42 | import java.util.Locale;
|
38 | 43 | import java.util.Map;
|
| 44 | +import java.util.Objects; |
| 45 | +import java.util.Set; |
39 | 46 | import java.util.function.Function;
|
40 | 47 | import java.util.stream.Collectors;
|
41 | 48 |
|
| 49 | +import static org.opensearch.action.admin.cluster.node.info.NodesInfoRequest.Metric.SETTINGS; |
| 50 | +import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.DISCOVERY; |
| 51 | +import static org.opensearch.cluster.metadata.Metadata.isGlobalStateEquals; |
| 52 | +import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; |
| 53 | +import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL_SETTING; |
42 | 54 | import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES;
|
43 | 55 | import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
|
44 | 56 | import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
|
@@ -77,10 +89,7 @@ public void setup() {
|
77 | 89 |
|
78 | 90 | @Override
|
79 | 91 | protected Settings featureFlagSettings() {
|
80 |
| - return Settings.builder() |
81 |
| - .put(super.featureFlagSettings()) |
82 |
| - .put(FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL, isRemotePublicationEnabled) |
83 |
| - .build(); |
| 92 | + return Settings.builder().put(super.featureFlagSettings()).put(REMOTE_PUBLICATION_EXPERIMENTAL, isRemotePublicationEnabled).build(); |
84 | 93 | }
|
85 | 94 |
|
86 | 95 | @Override
|
@@ -220,11 +229,121 @@ public void testRemotePublicationDownloadStats() {
|
220 | 229 | NodesStatsResponse nodesStatsResponseDataNode = client().admin()
|
221 | 230 | .cluster()
|
222 | 231 | .prepareNodesStats(dataNode)
|
223 |
| - .addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName()) |
| 232 | + .addMetric(DISCOVERY.metricName()) |
224 | 233 | .get();
|
225 | 234 |
|
226 | 235 | assertDataNodeDownloadStats(nodesStatsResponseDataNode);
|
| 236 | + } |
| 237 | + |
| 238 | + public void testRemotePublicationDisabledByRollingRestart() throws Exception { |
| 239 | + prepareCluster(3, 2, INDEX_NAME, 1, 2); |
| 240 | + ensureStableCluster(5); |
| 241 | + ensureGreen(INDEX_NAME); |
| 242 | + |
| 243 | + Set<String> clusterManagers = internalCluster().getClusterManagerNames(); |
| 244 | + Set<String> restartedMasters = new HashSet<>(); |
| 245 | + |
| 246 | + for (String clusterManager : clusterManagers) { |
| 247 | + internalCluster().restartNode(clusterManager, new InternalTestCluster.RestartCallback() { |
| 248 | + @Override |
| 249 | + public Settings onNodeStopped(String nodeName) { |
| 250 | + restartedMasters.add(nodeName); |
| 251 | + return Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, false).build(); |
| 252 | + } |
| 253 | + |
| 254 | + @Override |
| 255 | + public void doAfterNodes(int n, Client client) { |
| 256 | + String activeCM = internalCluster().getClusterManagerName(); |
| 257 | + Set<String> followingCMs = clusterManagers.stream() |
| 258 | + .filter(node -> !Objects.equals(node, activeCM)) |
| 259 | + .collect(Collectors.toSet()); |
| 260 | + boolean activeCMRestarted = restartedMasters.contains(activeCM); |
| 261 | + NodesStatsResponse response = client().admin() |
| 262 | + .cluster() |
| 263 | + .prepareNodesStats(followingCMs.toArray(new String[0])) |
| 264 | + .clear() |
| 265 | + .addMetric(DISCOVERY.metricName()) |
| 266 | + .get(); |
| 267 | + // after master is flipped to restarted master, publication should happen on Transport |
| 268 | + response.getNodes().forEach(nodeStats -> { |
| 269 | + if (activeCMRestarted) { |
| 270 | + PublishClusterStateStats stats = nodeStats.getDiscoveryStats().getPublishStats(); |
| 271 | + assertTrue( |
| 272 | + stats.getFullClusterStateReceivedCount() > 0 || stats.getCompatibleClusterStateDiffReceivedCount() > 0 |
| 273 | + ); |
| 274 | + assertEquals(0, stats.getIncompatibleClusterStateDiffReceivedCount()); |
| 275 | + } else { |
| 276 | + DiscoveryStats stats = nodeStats.getDiscoveryStats(); |
| 277 | + assertEquals(0, stats.getPublishStats().getFullClusterStateReceivedCount()); |
| 278 | + assertEquals(0, stats.getPublishStats().getCompatibleClusterStateDiffReceivedCount()); |
| 279 | + assertEquals(0, stats.getPublishStats().getIncompatibleClusterStateDiffReceivedCount()); |
| 280 | + } |
| 281 | + }); |
| 282 | + |
| 283 | + NodesInfoResponse nodesInfoResponse = client().admin() |
| 284 | + .cluster() |
| 285 | + .prepareNodesInfo(activeCM) |
| 286 | + .clear() |
| 287 | + .addMetric(SETTINGS.metricName()) |
| 288 | + .get(); |
| 289 | + // if masterRestarted is true Publication Setting should be false, and vice versa |
| 290 | + assertTrue( |
| 291 | + REMOTE_PUBLICATION_EXPERIMENTAL_SETTING.get(nodesInfoResponse.getNodes().get(0).getSettings()) != activeCMRestarted |
| 292 | + ); |
| 293 | + |
| 294 | + followingCMs.forEach(node -> { |
| 295 | + PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, node); |
| 296 | + CoordinationState.PersistedState remoteState = registry.getPersistedState( |
| 297 | + PersistedStateRegistry.PersistedStateType.REMOTE |
| 298 | + ); |
| 299 | + if (activeCMRestarted) { |
| 300 | + assertNull(remoteState.getLastAcceptedState()); |
| 301 | + // assertNull(remoteState.getLastAcceptedManifest()); |
| 302 | + } else { |
| 303 | + ClusterState localState = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.LOCAL) |
| 304 | + .getLastAcceptedState(); |
| 305 | + ClusterState remotePersistedState = remoteState.getLastAcceptedState(); |
| 306 | + assertTrue(isGlobalStateEquals(localState.metadata(), remotePersistedState.metadata())); |
| 307 | + assertEquals(localState.nodes(), remotePersistedState.nodes()); |
| 308 | + assertEquals(localState.routingTable(), remotePersistedState.routingTable()); |
| 309 | + assertEquals(localState.customs(), remotePersistedState.customs()); |
| 310 | + } |
| 311 | + }); |
| 312 | + } |
| 313 | + }); |
| 314 | + |
| 315 | + } |
| 316 | + ensureGreen(INDEX_NAME); |
| 317 | + ensureStableCluster(5); |
| 318 | + |
| 319 | + String activeCM = internalCluster().getClusterManagerName(); |
| 320 | + Set<String> followingCMs = clusterManagers.stream().filter(node -> !Objects.equals(node, activeCM)).collect(Collectors.toSet()); |
| 321 | + NodesStatsResponse response = client().admin() |
| 322 | + .cluster() |
| 323 | + .prepareNodesStats(followingCMs.toArray(new String[0])) |
| 324 | + .clear() |
| 325 | + .addMetric(DISCOVERY.metricName()) |
| 326 | + .get(); |
| 327 | + response.getNodes().forEach(nodeStats -> { |
| 328 | + PublishClusterStateStats stats = nodeStats.getDiscoveryStats().getPublishStats(); |
| 329 | + assertTrue(stats.getFullClusterStateReceivedCount() > 0 || stats.getCompatibleClusterStateDiffReceivedCount() > 0); |
| 330 | + assertEquals(0, stats.getIncompatibleClusterStateDiffReceivedCount()); |
| 331 | + }); |
| 332 | + NodesInfoResponse nodesInfoResponse = client().admin() |
| 333 | + .cluster() |
| 334 | + .prepareNodesInfo(activeCM) |
| 335 | + .clear() |
| 336 | + .addMetric(SETTINGS.metricName()) |
| 337 | + .get(); |
| 338 | + // if masterRestarted is true Publication Setting should be false, and vice versa |
| 339 | + assertFalse(REMOTE_PUBLICATION_EXPERIMENTAL_SETTING.get(nodesInfoResponse.getNodes().get(0).getSettings())); |
227 | 340 |
|
| 341 | + followingCMs.forEach(node -> { |
| 342 | + PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, node); |
| 343 | + CoordinationState.PersistedState remoteState = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE); |
| 344 | + assertNull(remoteState.getLastAcceptedState()); |
| 345 | + // assertNull(remoteState.getLastAcceptedManifest()); |
| 346 | + }); |
228 | 347 | }
|
229 | 348 |
|
230 | 349 | private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) {
|
|
0 commit comments